1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
import time, typing, uuid
from src import Database, EventManager, Logging
class Timer(object):
    """A named, possibly repeating deadline.

    ``next_due`` is an absolute timestamp on the ``time.time()`` clock.
    A timer is "done" once ``finish()`` has been called and stays done
    until ``redo()`` re-arms it.
    """

    def __init__(self, id: str, context: typing.Optional[str], name: str,
            delay: float, next_due: typing.Optional[float], kwargs: dict):
        """Create a timer.

        :param id: unique identifier of this timer.
        :param context: owning context name, or None for a global timer.
        :param name: timer name.
        :param delay: seconds between (re)arming and the timer being due.
        :param next_due: absolute ``time.time()`` timestamp of the next
            deadline; None means "``delay`` seconds from now".
        :param kwargs: extra data carried along with the timer.
        """
        self.id = id
        self.context = context
        self.name = name
        self.delay = delay
        self.kwargs = kwargs
        self._done = False
        # Only None means "no deadline supplied". A falsy timestamp such as
        # 0 is a real (long overdue) deadline and must not be rescheduled.
        if next_due is None:
            self.set_next_due()
        else:
            self.next_due = next_due

    def set_next_due(self):
        """Schedule the next deadline ``delay`` seconds from now."""
        self.next_due = time.time()+self.delay

    def due(self) -> bool:
        """Return True once the deadline has been reached or passed."""
        return self.time_left() <= 0

    def time_left(self) -> float:
        """Return seconds until the deadline (negative when overdue)."""
        return self.next_due-time.time()

    def redo(self):
        """Re-arm the timer: clear the done flag and push the deadline."""
        self._done = False
        self.set_next_due()

    def finish(self):
        """Mark the timer as done."""
        self._done = True

    def done(self) -> bool:
        """Return True if the timer is currently marked as done."""
        return self._done
class Timers(object):
    """Registry of ``Timer`` objects.

    Timers are kept either in ``self.timers`` (no context) or in
    ``self.context_timers`` keyed by context name. Persistent timers are
    additionally written to ``database.bot_settings`` under the key
    ``"timer-<id>"`` and are always kept in ``self.timers``.
    """

    def __init__(self, database: Database.Database,
            events: EventManager.Events,
            log: Logging.Log):
        self.database = database
        self.events = events
        self.log = log
        self.timers = [] # type: typing.List[Timer]
        self.context_timers = {} # type: typing.Dict[str, typing.List[Timer]]

    def new_context(self, context: str) -> "TimersContext":
        """Return a ``TimersContext`` bound to this registry and `context`."""
        return TimersContext(self, context)

    def setup(self, timers: typing.List[typing.Tuple[str, dict]]):
        """Re-create timers from stored ``(setting name, data)`` pairs.

        The timer id is whatever follows the first ``"timer-"`` in the
        setting name. The timers are added without being persisted again.
        """
        for name, timer in timers:
            id = name.split("timer-", 1)[1]
            self._add(None, timer["name"], timer["delay"],
                timer["next-due"], id, False, timer["kwargs"])

    def _persist(self, timer: Timer):
        """Store `timer` in the bot settings under ``"timer-<id>"``."""
        self.database.bot_settings.set("timer-%s" % timer.id, {
            "name": timer.name, "delay": timer.delay,
            "next-due": timer.next_due, "kwargs": timer.kwargs})

    def _remove(self, timer: Timer):
        """Forget `timer` and delete its stored setting, if any.

        Tolerates a timer that is already gone (for example because its
        context was purged while its event handler was running).
        """
        bucket = None
        if timer.context:
            bucket = self.context_timers.get(timer.context)

        if bucket is not None and timer in bucket:
            bucket.remove(timer)
            if not bucket:
                # do not keep empty per-context lists around
                del self.context_timers[timer.context]
        elif timer in self.timers:
            self.timers.remove(timer)
        self.database.bot_settings.delete("timer-%s" % timer.id)

    def add(self, name: str, delay: float,
            next_due: typing.Optional[float]=None, **kwargs):
        """Add a non-persistent timer without a context."""
        self._add(None, name, delay, next_due, None, False, kwargs)

    def add_persistent(self, name: str, delay: float,
            next_due: typing.Optional[float]=None, **kwargs):
        """Add a timer without a context and store it in the database."""
        self._add(None, name, delay, next_due, None, True, kwargs)

    def _add(self, context: typing.Optional[str], name: str, delay: float,
            next_due: typing.Optional[float], id: typing.Optional[str],
            persist: bool, kwargs: dict):
        """Create a ``Timer`` and register it.

        A fresh UUID4 string is used when `id` is not given. Only
        non-persistent timers are filed under their context; everything
        else goes into the flat ``self.timers`` list.
        """
        id = id or str(uuid.uuid4())
        timer = Timer(id, context, name, delay, next_due, kwargs)
        if persist:
            self._persist(timer)

        if context and not persist:
            self.context_timers.setdefault(context, []).append(timer)
        else:
            self.timers.append(timer)

    def next(self) -> typing.Optional[float]:
        """Return seconds until the earliest timer is due.

        The result is clamped to 0 for overdue timers; None is returned
        when no timers are registered.
        """
        # Keep every value: a timer with exactly 0 seconds left is due now
        # and must not be discarded as "falsy".
        times = [timer.time_left() for timer in self.get_timers()]
        if not times:
            return None
        return max(min(times), 0)

    def get_timers(self) -> typing.List[Timer]:
        """Return a new list of all timers, context-less ones first."""
        context_timers = [timer
            for bucket in self.context_timers.values()
            for timer in bucket]
        return self.timers + context_timers

    def find_all(self, name: str) -> typing.List[Timer]:
        """Return all timers whose name equals `name`, ignoring case."""
        name_lower = name.lower()
        return [timer for timer in self.get_timers()
            if timer.name.lower() == name_lower]

    def call(self):
        """Fire the ``timer.<name>`` event for every due timer.

        A timer is marked finished before its event is dispatched, so a
        handler may call ``timer.redo()`` to keep it; timers that are
        still done afterwards are removed.
        """
        # get_timers() returns a fresh list, so removing while looping is
        # safe.
        for timer in self.get_timers():
            if not timer.due():
                continue
            timer.finish()
            self.events.on("timer.%s" % timer.name).call(timer=timer,
                **timer.kwargs)
            if timer.done():
                self._remove(timer)

    def purge_context(self, context: str):
        """Drop every timer registered under `context`, if any."""
        self.context_timers.pop(context, None)
class TimersContext(object):
    """Front end to a ``Timers`` registry that is bound to one context.

    Every method forwards to the parent registry; only ``add`` passes
    this object's context along.
    """

    def __init__(self, parent: Timers, context: str):
        self.context = context
        self._parent = parent

    def add(self, name: str, delay: float, next_due: float=None,
            **kwargs):
        """Register a non-persistent timer under this context."""
        register = self._parent._add
        register(self.context, name, delay, next_due, None, False, kwargs)

    def add_persistent(self, name: str, delay: float, next_due: float=None,
            **kwargs):
        """Register a persistent timer.

        The timer is handed to the parent with no context (None).
        """
        register = self._parent._add
        register(None, name, delay, next_due, None, True, kwargs)

    def find_all(self, name: str) -> typing.List[Timer]:
        """Return what the parent's ``find_all(name)`` returns.

        No filtering by this object's context is applied here.
        """
        matches = self._parent.find_all(name)
        return matches
|