import itertools, time, traceback, typing from src import Logging, utils PRIORITY_URGENT = 0 PRIORITY_HIGH = 1 PRIORITY_MEDIUM = 2 PRIORITY_LOW = 3 PRIORITY_MONITOR = 4 DEFAULT_PRIORITY = PRIORITY_MEDIUM DEFAULT_EVENT_DELIMITER = "." DEFAULT_MULTI_DELIMITER = "|" CALLBACK_TYPE = typing.Callable[["Event"], typing.Any] class Event(object): def __init__(self, name: str, **kwargs): self.name = name self.kwargs = kwargs self.eaten = False def __getitem__(self, key: str) -> typing.Any: return self.kwargs[key] def get(self, key: str, default=None) -> typing.Any: return self.kwargs.get(key, default) def __contains__(self, key: str) -> bool: return key in self.kwargs def eat(self): self.eaten = True class EventCallback(object): def __init__(self, function: CALLBACK_TYPE, priority: int, kwargs: dict): self.function = function self.priority = priority self.kwargs = kwargs self.docstring = utils.parse.docstring(function.__doc__) def call(self, event: Event) -> typing.Any: return self.function(event) def get_kwarg(self, name: str, default=None) -> typing.Any: item = self.kwargs.get(name, default) return item or self.docstring.items.get(name, default) class EventHook(object): def __init__(self, log: Logging.Log, name: str = None, parent: "EventHook" = None): self.log = log self.name = name self.parent = parent self._children = {} self._hooks = [] self._stored_events = [] self._context_hooks = {} def _make_event(self, kwargs: dict) -> Event: return Event(self._get_path(), **kwargs) def _get_path(self) -> str: path = [] parent = self while not parent == None and not parent.name == None: path.append(parent.name) parent = parent.parent return DEFAULT_EVENT_DELIMITER.join(path[::-1]) def new_context(self, context: str) -> "EventHookContext": return EventHookContext(self, context) def hook(self, function: CALLBACK_TYPE, priority: int = DEFAULT_PRIORITY, replay: bool = False, **kwargs) -> EventCallback: return self._hook(function, None, priority, replay, kwargs) def _context_hook(self, context: str, function: CALLBACK_TYPE, priority: int, replay: bool, kwargs: dict) -> EventCallback: return self._hook(function, context, priority, replay, kwargs) def _hook(self, function: CALLBACK_TYPE, context: str, priority: int, replay: bool, kwargs: dict) -> EventCallback: callback = EventCallback(function, priority, kwargs) if context == None: self._hooks.append(callback) else: if not context in self._context_hooks: self._context_hooks[context] = [] self._context_hooks[context].append(callback) if replay and not self._stored_events == None: for kwargs in self._stored_events: self._call(kwargs, True, None) self._stored_events = None return callback def unhook(self, callback: "EventHook"): if callback in self._hooks: self._hooks.remove(callback) empty = [] for context, hooks in self._context_hooks.items(): if callback in hooks: hooks.remove(callback) if not hooks: empty.append(context) for context in empty: del self._context_hooks[context] def _make_multiple_hook(self, source: "EventHook", context: str, events: typing.List[str]) -> "MultipleEventHook": multiple_event_hook = MultipleEventHook() for event in events: event_hook = source.get_child(event) if not context == None: event_hook = event_hook.new_context(context) multiple_event_hook._add(event_hook) return multiple_event_hook def on(self, subevent: str, *extra_subevents, delimiter: int = DEFAULT_EVENT_DELIMITER) -> "EventHook": return self._on(subevent, extra_subevents, None, delimiter) def _context_on(self, context: str, subevent: str, extra_subevents: typing.List[str], delimiter: str = DEFAULT_EVENT_DELIMITER) -> "EventHook": return self._on(subevent, extra_subevents, context, delimiter) def _on(self, subevent: str, extra_subevents: typing.List[str], context: str, delimiter: str) -> "EventHook": if delimiter in subevent: event_chain = subevent.split(delimiter) event_obj = self for event_name in event_chain: if DEFAULT_MULTI_DELIMITER in event_name: return self._make_multiple_hook(event_obj, context, event_name.split(DEFAULT_MULTI_DELIMITER)) event_obj = event_obj.get_child(event_name) if not context == None: return event_obj.new_context(context) return event_obj if extra_subevents: return self._make_multiple_hook(self, context, (subevent,)+extra_subevents) child = self.get_child(subevent) if not context == None: child = child.new_context(context) return child def call_for_result(self, default=None, **kwargs) -> typing.Any: return (self.call_limited(1, **kwargs) or [default])[0] def assure_call(self, **kwargs): if not self._stored_events == None: self._stored_events.append(kwargs) else: self._call(kwargs, True, None) def call(self, **kwargs) -> typing.List[typing.Any]: return self._call(kwargs, True, None) def call_limited(self, maximum: int, **kwargs) -> typing.List[typing.Any]: return self._call(kwargs, True, None) def call_unsafe_for_result(self, default=None, **kwargs) -> typing.Any: return (self.call_unsafe_limited(1, **kwargs) or [default])[0] def call_unsafe(self, **kwargs) -> typing.List[typing.Any]: return self._call(kwargs, False, None) def call_unsafe_limited(self, maximum: int, **kwargs ) -> typing.List[typing.Any]: return self._call(kwargs, False, maximum) def _call(self, kwargs: dict, safe: bool, maximum: int ) -> typing.List[typing.Any]: event_path = self._get_path() self.log.trace("calling event: \"%s\" (params: %s)", [event_path, kwargs]) start = time.monotonic() event = self._make_event(kwargs) returns = [] for hook in self.get_hooks()[:maximum]: if event.eaten: break try: returns.append(hook.call(event)) except Exception as e: if safe: self.log.error("failed to call event \"%s\"", [self._get_path()], exc_info=True) else: raise total_milliseconds = (time.monotonic() - start) * 1000 self.log.trace("event \"%s\" called in %fms", [ event_path, total_milliseconds]) self.check_purge() return returns def get_child(self, child_name: str) -> "EventHook": child_name_lower = child_name.lower() if not child_name_lower in self._children: self._children[child_name_lower] = EventHook(self.log, child_name_lower, self) return self._children[child_name_lower] def remove_child(self, child_name: str): child_name_lower = child_name.lower() if child_name_lower in self._children: del self._children[child_name_lower] def check_purge(self): if self.is_empty() and not self.parent == None: self.parent.remove_child(self.name) self.parent.check_purge() def remove_context(self, context: str): del self._context_hooks[context] def has_context(self, context: str) -> bool: return context in self._context_hooks def purge_context(self, context: str): if self.has_context(context): self.remove_context(context) for child_name in self.get_children()[:]: child = self.get_child(child_name) child.purge_context(context) def get_hooks(self) -> typing.List[EventCallback]: return sorted(self._hooks + sum(self._context_hooks.values(), []), key=lambda e: e.priority) def get_children(self) -> typing.List["EventHook"]: return list(self._children.keys()) def is_empty(self) -> bool: return len(self.get_hooks() + self.get_children()) == 0 class MultipleEventHook(object): def __init__(self): self._event_hooks = set([]) def _add(self, event_hook: EventHook): self._event_hooks.add(event_hook) def hook(self, function: CALLBACK_TYPE, **kwargs): for event_hook in self._event_hooks: event_hook.hook(function, **kwargs) def call_limited(self, maximum: int, **kwargs) -> typing.List[typing.Any]: returns = [] for event_hook in self._event_hooks: returns.append(event_hook.call_limited(maximum, **kwargs)) return returns def call(self, **kwargs) -> typing.List[typing.Any]: returns = [] for event_hook in self._event_hooks: returns.append(event_hook.call(**kwargs)) return returns class EventHookContext(object): def __init__(self, parent, context): self._parent = parent self.context = context def hook(self, function: CALLBACK_TYPE, priority: int = DEFAULT_PRIORITY, replay: bool = False, **kwargs) -> EventCallback: return self._parent._context_hook(self.context, function, priority, replay, kwargs) def unhook(self, callback: EventCallback): self._parent.unhook(callback) def on(self, subevent: str, *extra_subevents, delimiter: str = DEFAULT_EVENT_DELIMITER) -> EventHook: return self._parent._context_on(self.context, subevent, extra_subevents, delimiter) def call_for_result(self, default=None, **kwargs) -> typing.Any: return self._parent.call_for_result(default, **kwargs) def assure_call(self, **kwargs): self._parent.assure_call(**kwargs) def call(self, **kwargs) -> typing.List[typing.Any]: return self._parent.call(**kwargs) def call_limited(self, maximum: int, **kwargs) -> typing.List[typing.Any]: return self._parent.call_limited(maximum, **kwargs) def call_unsafe_for_result(self, default=None, **kwargs) -> typing.Any: return self._parent.call_unsafe_for_result(default, **kwargs) def call_unsafe(self, **kwargs) -> typing.List[typing.Any]: return self._parent.call_unsafe(**kwargs) def call_unsafe_limited(self, maximum: int, **kwargs ) -> typing.List[typing.Any]: return self._parent.call_unsafe_limited(maximum, **kwargs) def get_hooks(self) -> typing.List[EventCallback]: return self._parent.get_hooks() def get_children(self) -> typing.List[EventHook]: return self._parent.get_children()