Source code for enaml.application

#------------------------------------------------------------------------------
# Copyright (c) 2014, Nucleic Development Team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
#------------------------------------------------------------------------------
from heapq import heappush, heappop
from itertools import count
from threading import Lock

from atom.api import (
    Atom, Bool, Typed, ForwardTyped, Tuple, Dict, Callable, Value, List,
    observe
)


class ScheduledTask(Atom):
    """ A single unit of work managed by the application scheduler.

    A task bundles a callable with its arguments, remembers the value
    the callable returned, and tracks whether it is still valid and
    still waiting to be run.

    """
    #: The callable to run when the task is executed.
    _callback = Callable()

    #: The args to pass to the callable.
    _args = Tuple()

    #: The keywords to pass to the callable.
    _kwargs = Dict()

    #: The result of invoking the callback.
    _result = Value()

    #: Whether or not the task is still valid.
    _valid = Bool(True)

    #: Whether or not the task is still pending.
    _pending = Bool(True)

    #: A callable to invoke with the result of running the task.
    _notify = Callable()

    def __init__(self, callback, args, kwargs):
        """ Initialize a ScheduledTask.

        Parameters
        ----------
        callback : callable
            The callable invoked when the task is executed.

        args : tuple
            The positional arguments forwarded to the callback.

        kwargs : dict
            The keyword arguments forwarded to the callback.

        """
        self._callback = callback
        self._args = args
        self._kwargs = kwargs

    #--------------------------------------------------------------------------
    # Private API
    #--------------------------------------------------------------------------
    def _execute(self):
        """ Run the task's callback and notify any registered listener.

        This is intended to be called only by the scheduler loop. A
        task which has been unscheduled is skipped entirely: neither
        the callback nor the notifier is invoked. In every case,
        including when the callback raises, the stored notifier is
        deleted and the task is marked as no longer pending.

        """
        try:
            if not self._valid:
                return
            outcome = self._callback(*self._args, **self._kwargs)
            self._result = outcome
            listener = self._notify
            if listener is not None:
                listener(outcome)
        finally:
            # Runs on the early return and on exceptions as well.
            del self._notify
            self._pending = False

    #--------------------------------------------------------------------------
    # Public API
    #--------------------------------------------------------------------------
    def notify(self, callback):
        """ Register a callback to receive the result of the task.

        Parameters
        ----------
        callback : callable
            A callable accepting a single argument, the result of the
            task. It is invoked immediately after the task's callback
            returns, from within the task execution.

        """
        self._notify = callback

    def pending(self):
        """ Get whether the task is still waiting to be executed.

        Returns
        -------
        result : bool
            True if the task has not yet been processed, False
            otherwise.

        """
        return self._pending

    def unschedule(self):
        """ Mark the task as invalid so that it will not be executed.

        If the task has already been executed, this call has no
        effect on the stored result.

        """
        self._valid = False

    def result(self):
        """ Get the stored result of the task.

        Returns
        -------
        result : object
            The value returned by the task's callback. If the task has
            not yet been executed, was unscheduled before execution,
            or raised an exception on execution, this is the untouched
            default of the ``_result`` member.

        """
        return self._result
class ProxyResolver(Atom):
    """ An object which maps proxy names onto proxy classes.

    """
    #: A dictionary of factory functions used to resolve proxies. Each
    #: function takes no arguments and returns the proxy class when
    #: called.
    factories = Dict()

    def resolve(self, name):
        """ Resolve the given name to a proxy class.

        For example, 'Field' should resolve to a class which implements
        the ProxyField interface.

        Parameters
        ----------
        name : string
            The name of the proxy object to resolve.

        Returns
        -------
        result : type or None
            Whatever the factory registered under `name` returns, or
            None when no factory is registered for that name.

        """
        make_proxy = self.factories.get(name)
        return None if make_proxy is None else make_proxy()
def StyleSheet():
    """ Lazily import and return the Enaml StyleSheet class.

    The import is performed inside the function body so that it only
    happens when the function is called.

    """
    from enaml.styling import StyleSheet as sheet_class
    return sheet_class
class Application(Atom):
    """ The application object which manages the top-level communication
    protocol for serving Enaml views.

    Only one instance may exist at a time. The class also owns a
    priority queue of scheduled tasks which are processed one after
    another through `deferred_call`.

    """
    #: The proxy resolver to use for the application. This will normally
    #: be supplied by application subclasses, but can also be supplied
    #: by the developer to supply custom proxy resolution behavior.
    resolver = Typed(ProxyResolver)

    #: The style sheet to apply to the entire application.
    style_sheet = ForwardTyped(StyleSheet)

    #: The task heap for application tasks.
    _task_heap = List()

    #: The counter to break heap ties.
    _counter = Value(factory=count)

    #: The heap lock for protecting heap access.
    _heap_lock = Value(factory=Lock)

    #: Private class storage for the singleton application instance.
    _instance = None

    @staticmethod
    def instance():
        """ Get the global Application instance.

        Returns
        -------
        result : Application or None
            The global application instance, or None if one has not yet
            been created.

        """
        return Application._instance

    def __new__(cls, *args, **kwargs):
        """ Create a new Enaml Application.

        There may be only one application instance in existence at any
        point in time.

        Raises
        ------
        RuntimeError
            If an Application instance already exists.

        """
        if Application._instance is not None:
            raise RuntimeError('An Application instance already exists')
        app = super(Application, cls).__new__(cls, *args, **kwargs)
        Application._instance = app
        return app

    #--------------------------------------------------------------------------
    # Private API
    #--------------------------------------------------------------------------
    def _process_task(self, task):
        """ Execute the given task, then dispatch the next one.

        The next task is dispatched even when the current task raises.

        """
        try:
            task._execute()
        finally:
            self._next_task()

    def _next_task(self):
        """ Pop the highest priority task off the heap and defer its
        processing through `deferred_call`.

        Nothing happens when the heap is empty.

        """
        with self._heap_lock:
            queue = self._task_heap
            if not queue:
                return
            _neg_priority, _tie_breaker, task = heappop(queue)
            self.deferred_call(self._process_task, task)

    @observe('style_sheet.destroyed')
    def _clear_destroyed_style_sheet(self, change):
        """ An observer which drops the reference to a destroyed style
        sheet.

        """
        self.style_sheet = None

    @observe('style_sheet')
    def _invalidate_style_cache(self, change):
        """ An observer which notifies the style cache when the
        application style sheet is updated.

        """
        if change['type'] != 'update':
            return
        # Imported here rather than at module level, like the lazy
        # StyleSheet importer used for the `style_sheet` member.
        from enaml.styling import StyleCache
        StyleCache._app_sheet_changed()

    #--------------------------------------------------------------------------
    # Abstract API
    #--------------------------------------------------------------------------
    def start(self):
        """ Start the application's main event loop.

        Subclasses must implement this method.

        """
        raise NotImplementedError

    def stop(self):
        """ Stop the application's main event loop.

        Subclasses must implement this method.

        """
        raise NotImplementedError

    def deferred_call(self, callback, *args, **kwargs):
        """ Invoke a callable on the next cycle of the main event loop
        thread.

        Subclasses must implement this method.

        Parameters
        ----------
        callback : callable
            The callable object to execute at some point in the future.

        args, kwargs
            Any additional positional and keyword arguments to pass to
            the callback.

        """
        raise NotImplementedError

    def timed_call(self, ms, callback, *args, **kwargs):
        """ Invoke a callable on the main event loop thread at a
        specified time in the future.

        Subclasses must implement this method.

        Parameters
        ----------
        ms : int
            The time to delay, in milliseconds, before executing the
            callable.

        callback : callable
            The callable object to execute at some point in the future.

        args, kwargs
            Any additional positional and keyword arguments to pass to
            the callback.

        """
        raise NotImplementedError

    def is_main_thread(self):
        """ Indicates whether the caller is on the main gui thread.

        Subclasses must implement this method.

        Returns
        -------
        result : bool
            True if called from the main gui thread. False otherwise.

        """
        raise NotImplementedError

    def create_mime_data(self):
        """ Create a new mime data object to be filled by the user.

        Subclasses must implement this method.

        Returns
        -------
        result : MimeData
            A concrete implementation of the MimeData class.

        """
        raise NotImplementedError

    #--------------------------------------------------------------------------
    # Public API
    #--------------------------------------------------------------------------
    def resolve_proxy_class(self, declaration_class):
        """ Resolve the proxy implementation class for a declaration.

        The classes in the mro of the declaration class are tried in
        order, by name, and the first one the resolver recognizes wins.
        This can be reimplemented by Application subclasses if more
        control is needed.

        Parameters
        ----------
        declaration_class : type
            A ToolkitObject subclass for which the proxy implementation
            class should be resolved.

        Returns
        -------
        result : type
            A ProxyToolkitObject subclass for the given class, or None
            if one could not be resolved.

        """
        resolver = self.resolver
        # The generator is lazy, so resolution stops at the first hit.
        candidates = (
            resolver.resolve(klass.__name__)
            for klass in declaration_class.mro()
        )
        return next((c for c in candidates if c is not None), None)

    def create_proxy(self, declaration):
        """ Create the proxy object for the given declaration.

        This can be reimplemented by Application subclasses if more
        control is needed.

        Parameters
        ----------
        declaration : ToolkitObject
            The object for which a toolkit proxy should be created.

        Returns
        -------
        result : ProxyToolkitObject
            An instance of the resolved proxy class, constructed with
            the declaration as its `declaration` keyword argument.

        Raises
        ------
        TypeError
            If no proxy class can be resolved for the declaration.

        """
        declaration_type = type(declaration)
        proxy_class = self.resolve_proxy_class(declaration_type)
        if proxy_class is None:
            template = (
                "could not resolve a toolkit implementation for the '%s' "
                "component when running under a '%s'"
            )
            names = (declaration_type.__name__, type(self).__name__)
            raise TypeError(template % names)
        return proxy_class(declaration=declaration)

    def schedule(self, callback, args=None, kwargs=None, priority=0):
        """ Schedule a callable to be executed on the event loop thread.

        Access to the task heap is guarded by the heap lock.

        Parameters
        ----------
        callback : callable
            The callable object to be executed.

        args : tuple, optional
            The positional arguments to pass to the callable.

        kwargs : dict, optional
            The keyword arguments to pass to the callable.

        priority : int, optional
            The queue priority for the callable. Smaller values indicate
            lower priority, larger values indicate higher priority. The
            default priority is zero.

        Returns
        -------
        result : ScheduledTask
            A task object which can be used to unschedule the task or
            retrieve the results of the callback after the task has
            been executed.

        """
        args = () if args is None else args
        kwargs = {} if kwargs is None else kwargs
        task = ScheduledTask(callback, args, kwargs)
        queue = self._task_heap
        with self._heap_lock:
            was_empty = not queue
            # The priority is negated because heapq pops the smallest
            # entry first; the counter breaks ties between equal
            # priorities.
            heappush(queue, (-priority, next(self._counter), task))
        # Dispatch happens after the lock is released, since
        # `_next_task` acquires the same lock.
        if was_empty:
            if self.is_main_thread():
                self._next_task()
            else:
                self.deferred_call(self._next_task)
        return task

    def has_pending_tasks(self):
        """ Get whether or not the application has pending tasks.

        Returns
        -------
        result : bool
            True if the task heap is not empty. False otherwise.

        """
        with self._heap_lock:
            return bool(self._task_heap)

    def destroy(self):
        """ Destroy this application instance.

        The event loop is stopped and the singleton reference is
        cleared. Once an application is created, it must be destroyed
        before a new application can be instantiated.

        """
        self.stop()
        Application._instance = None
#------------------------------------------------------------------------------ # Helper Functions #------------------------------------------------------------------------------
def deferred_call(callback, *args, **kwargs):
    """ Invoke a callable on the next cycle of the main event loop
    thread.

    This is a convenience function which forwards to the method of the
    same name on the current application instance.

    Parameters
    ----------
    callback : callable
        The callable object to execute at some point in the future.

    args, kwargs
        Any additional positional and keyword arguments to pass to the
        callback.

    Raises
    ------
    RuntimeError
        If an application instance does not exist.

    """
    current = Application.instance()
    if current is None:
        raise RuntimeError('Application instance does not exist')
    current.deferred_call(callback, *args, **kwargs)
def timed_call(ms, callback, *args, **kwargs):
    """ Invoke a callable on the main event loop thread at a specified
    time in the future.

    This is a convenience function which forwards to the method of the
    same name on the current application instance.

    Parameters
    ----------
    ms : int
        The time to delay, in milliseconds, before executing the
        callable.

    callback : callable
        The callable object to execute at some point in the future.

    args, kwargs
        Any additional positional and keyword arguments to pass to the
        callback.

    Raises
    ------
    RuntimeError
        If an application instance does not exist.

    """
    current = Application.instance()
    if current is None:
        raise RuntimeError('Application instance does not exist')
    current.timed_call(ms, callback, *args, **kwargs)
def is_main_thread():
    """ Indicates whether the caller is on the main gui thread.

    This is a convenience function which forwards to the method of the
    same name on the current application instance.

    Returns
    -------
    result : bool
        True if called from the main gui thread. False otherwise.

    Raises
    ------
    RuntimeError
        If an application instance does not exist.

    """
    current = Application.instance()
    if current is None:
        raise RuntimeError('Application instance does not exist')
    on_main_thread = current.is_main_thread()
    return on_main_thread
def schedule(callback, args=None, kwargs=None, priority=0):
    """ Schedule a callable to be executed on the event loop thread.

    This is a convenience function which forwards to the method of the
    same name on the current application instance.

    Parameters
    ----------
    callback : callable
        The callable object to be executed.

    args : tuple, optional
        The positional arguments to pass to the callable.

    kwargs : dict, optional
        The keyword arguments to pass to the callable.

    priority : int, optional
        The queue priority for the callable. Smaller values indicate
        lower priority, larger values indicate higher priority. The
        default priority is zero.

    Returns
    -------
    result : ScheduledTask
        A task object which can be used to unschedule the task or
        retrieve the results of the callback after the task has been
        executed.

    Raises
    ------
    RuntimeError
        If an application instance does not exist.

    """
    current = Application.instance()
    if current is None:
        raise RuntimeError('Application instance does not exist')
    task = current.schedule(callback, args, kwargs, priority)
    return task