aboutsummaryrefslogtreecommitdiff
path: root/src/Timers.py
blob: f913b7816d3aa492867d4babcab2f611728357ab (about) (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import time, typing, uuid
from src import Database, EventManager, Logging, PollHook

T_CALLBACK = typing.Callable[["Timer"], None]

class Timer(object):
    def __init__(self, id: str, context: typing.Optional[str], name: str,
            delay: float, next_due: typing.Optional[float], kwargs: dict,
            callback: T_CALLBACK):
        self.id = id
        self.context = context
        self.name = name
        self.delay = delay
        if next_due:
            self.next_due = next_due
        else:
            self.set_next_due()
        self.kwargs = kwargs
        self.callback = callback
        self._done = False

    def set_next_due(self):
        self.next_due = time.time()+self.delay
    def due(self) -> bool:
        return not self.done() and self.time_left() <= 0
    def time_left(self) -> float:
        return self.next_due-time.time()

    def redo(self):
        self._done = False
        self.set_next_due()
    def finish(self):
        self._done = True
    def cancel(self):
        self.finish()
    def done(self) -> bool:
        return self._done

class Timers(PollHook.PollHook):
    def __init__(self, database: Database.Database,
            events: EventManager.Events,
            log: Logging.Log):
        self.database = database
        self.events = events
        self.log = log
        self.timers = [] # type: typing.List[Timer]
        self.context_timers = {} # type: typing.Dict[str, typing.List[Timer]]

    def new_context(self, context: str) -> "TimersContext":
        return TimersContext(self, context)

    def setup(self, timers: typing.List[typing.Tuple[str, dict]]):
        for name, timer in timers:
            id = name.split("timer-", 1)[1]
            self._add(None, timer["name"], timer["delay"], timer[
                "next-due"], id, False, timer["kwargs"])

    def _persist(self, timer: Timer):
        self.database.bot_settings.set("timer-%s" % timer.id, {
            "name": timer.name, "delay": timer.delay,
            "next-due": timer.next_due, "kwargs": timer.kwargs})
    def _remove(self, timer: Timer):
        if timer.context:
            self.context_timers[timer.context].remove(timer)
            if not self.context_timers[timer.context]:
                del self.context_timers[timer.context]
        else:
            self.timers.remove(timer)
        self.database.bot_settings.delete("timer-%s" % timer.id)

    def add(self, name: str, callback: T_CALLBACK, delay: float,
            next_due: float=None, **kwargs) -> Timer:
        return self._add(None, name, delay, next_due, None, False, kwargs,
            callback=callback)
    def add_persistent(self, name: str, delay: float, next_due: float=None,
            **kwargs) -> Timer:
        return self._add(None, name, delay, next_due, None, True, kwargs)
    def _add(self, context: typing.Optional[str], name: str, delay: float,
            next_due: typing.Optional[float], id: typing.Optional[str],
            persist: bool, kwargs: dict, callback: T_CALLBACK=None) -> Timer:
        id = id or str(uuid.uuid4())

        if not callback:
            callback = lambda timer: self.events.on("timer.%s" % name).call(
                timer=timer, **kwargs)

        timer = Timer(id, context, name, delay, next_due, kwargs,
            callback=callback)
        if persist:
            self._persist(timer)

        if context and not persist:
            if not context in self.context_timers:
                self.context_timers[context] = []
            self.context_timers[context].append(timer)
        else:
            self.timers.append(timer)
        return timer

    def next(self) -> typing.Optional[float]:
        times = list(filter(None,
            [timer.time_left() for timer in self.get_timers()]))
        if not times:
            return None
        return max(min(times), 0)

    def get_timers(self) -> typing.List[Timer]:
        return self.timers + sum(self.context_timers.values(), [])

    def find_all(self, name: str) -> typing.List[Timer]:
        name_lower = name.lower()
        timers = self.get_timers()
        found = [] # type: typing.List[Timer]
        for timer in timers:
            if timer.name.lower() == name_lower:
                found.append(timer)

        return found

    def call(self):
        for timer in self.get_timers():
            if timer.due():
                timer.finish()
                timer.callback(timer)
            if timer.done():
                self._remove(timer)

    def purge_context(self, context: str):
        if context in self.context_timers:
            del self.context_timers[context]

class TimersContext(object):
    def __init__(self, parent: Timers, context: str):
        self._parent = parent
        self.context = context
    def add(self, name: str, callback: T_CALLBACK, delay: float,
            next_due: float=None, **kwargs) -> Timer:
        return self._parent._add(self.context, name, delay, next_due, None,
            False, kwargs, callback=callback)
    def add_persistent(self, name: str, delay: float, next_due: float=None,
            **kwargs) -> Timer:
        return self._parent._add(None, name, delay, next_due, None, True,
            kwargs)
    def find_all(self, name: str) -> typing.List[Timer]:
        return self._parent.find_all(name)