diff options
| author | 2018-10-30 14:58:48 +0000 | |
|---|---|---|
| committer | 2018-10-30 14:58:48 +0000 | |
| commit | e07553c3627b80f20cdc81a35030bf0540924db8 (patch) | |
| tree | 0a81640b280e007cbe5d2cb956681068ab80c58e /src/IRCBot.py | |
| parent | Don't needlessly search a youtube URL before getting the information for it's (diff) | |
| signature | ||
Add type/return hints throughout src/ and, in doing so, fix some cyclical
references.
Diffstat (limited to 'src/IRCBot.py')
| -rw-r--r-- | src/IRCBot.py | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/src/IRCBot.py b/src/IRCBot.py index 657fb135..61f21114 100644 --- a/src/IRCBot.py +++ b/src/IRCBot.py @@ -1,4 +1,4 @@ -import os, select, socket, sys, threading, time, traceback, uuid +import os, select, socket, sys, threading, time, traceback, typing, uuid from src import EventManager, Exports, IRCServer, Logging, ModuleManager from src import Socket, utils @@ -28,14 +28,15 @@ class Bot(object): self._trigger_functions = [] - def trigger(self, func=None): + def trigger(self, func: typing.Callable[[], typing.Any]=None): self.lock.acquire() if func: self._trigger_functions.append(func) self._trigger_client.send(b"TRIGGER") self.lock.release() - def add_server(self, server_id, connect=True): + def add_server(self, server_id: int, connect: bool = True + ) -> typing.Optional[IRCServer.Server]: (_, alias, hostname, port, password, ipv4, tls, bindhost, nickname, username, realname) = self.database.servers.get(server_id) @@ -49,20 +50,20 @@ class Bot(object): self.connect(new_server) return new_server - def add_socket(self, sock): + def add_socket(self, sock: socket.socket): self.other_sockets[sock.fileno()] = sock self.poll.register(sock.fileno(), select.EPOLLIN) - def remove_socket(self, sock): + def remove_socket(self, sock: socket.socket): del self.other_sockets[sock.fileno()] self.poll.unregister(sock.fileno()) - def get_server(self, id): + def get_server(self, id: int) -> typing.Optional[IRCServer.Server]: for server in self.servers.values(): if server.id == id: return server - def connect(self, server): + def connect(self, server: IRCServer.Server) -> bool: try: server.connect() except: @@ -73,7 +74,7 @@ class Bot(object): self.poll.register(server.fileno(), select.EPOLLOUT) return True - def next_send(self): + def next_send(self) -> typing.Optional[float]: next = None for server in self.servers.values(): timeout = server.send_throttle_timeout() @@ -81,7 +82,7 @@ class Bot(object): next = timeout return next - def next_ping(self): + def next_ping(self) -> typing.Optional[float]: timeouts = [] for server in self.servers.values(): timeout = server.until_next_ping() @@ -90,7 +91,8 @@ class Bot(object): if not timeouts: return None return min(timeouts) - def next_read_timeout(self): + + def next_read_timeout(self) -> typing.Optional[float]: timeouts = [] for server in self.servers.values(): timeouts.append(server.until_read_timeout()) @@ -98,7 +100,7 @@ class Bot(object): return None return min(timeouts) - def get_poll_timeout(self): + def get_poll_timeout(self) -> float: timeouts = [] timeouts.append(self._timers.next()) timeouts.append(self.next_send()) @@ -107,15 +109,15 @@ class Bot(object): timeouts.append(self.cache.next_expiration()) return min([timeout for timeout in timeouts if not timeout == None]) - def register_read(self, server): + def register_read(self, server: IRCServer.Server): self.poll.modify(server.fileno(), select.EPOLLIN) - def register_write(self, server): + def register_write(self, server: IRCServer.Server): self.poll.modify(server.fileno(), select.EPOLLOUT) - def register_both(self, server): + def register_both(self, server: IRCServer.Server): self.poll.modify(server.fileno(), select.EPOLLIN|select.EPOLLOUT) - def disconnect(self, server): + def disconnect(self, server: IRCServer.Server): try: self.poll.unregister(server.fileno()) except FileNotFoundError: @@ -123,23 +125,25 @@ class Bot(object): del self.servers[server.fileno()] @utils.hook("timer.reconnect") - def reconnect(self, event): + def reconnect(self, event: EventManager.Event): server = self.add_server(event["server_id"], False) if self.connect(server): self.servers[server.fileno()] = server else: event["timer"].redo() - def set_setting(self, setting, value): + def set_setting(self, setting: str, value: typing.Any): self.database.bot_settings.set(setting, value) - def get_setting(self, setting, default=None): + def get_setting(self, setting: str, default: typing.Any=None) -> typing.Any: return self.database.bot_settings.get(setting, default) - def find_settings(self, pattern, default=[]): + def find_settings(self, pattern: str, default: typing.Any=[] + ) -> typing.List[typing.Any]: return self.database.bot_settings.find(pattern, default) - def find_settings_prefix(self, prefix, default=[]): + def find_settings_prefix(self, prefix: str, default: typing.Any=[] + ) -> typing.List[typing.Any]: return self.database.bot_settings.find_prefix( prefix, default) - def del_setting(self, setting): + def del_setting(self, setting: str): self.database.bot_settings.delete(setting) def run(self): |
