diff options
| author | 2019-12-02 05:09:16 +0000 | |
|---|---|---|
| committer | 2019-12-02 05:09:16 +0000 | |
| commit | 8e5fbe765d934b9e992d47ac164003281b33112a (patch) | |
| tree | cdd8b7d77eefafad01b1573ab0dce995dcc2f22f /http2irc.py | |
| parent | Refactor into more flexible tool supporting multiple endpoints and channels (... (diff) | |
| signature | ||
Confirm message delivery by periodic PINGs
This assumes that a PONG response from the server means the connection is still intact. The messages could still have been dropped for weird reasons, but short of using a second client to confirm the delivery directly, there is no better option to test this.
RFC 1459 actually specifies that "servers should not respond to PING commands". However, this recommendation has been removed in RFC 2812, and in practice, all common IRC servers seem to respond to PINGs.
Diffstat (limited to 'http2irc.py')
| -rw-r--r-- | http2irc.py | 32 |
1 files changed, 29 insertions, 3 deletions
diff --git a/http2irc.py b/http2irc.py index dd786b4..5a1b5e3 100644 --- a/http2irc.py +++ b/http2irc.py @@ -147,8 +147,8 @@ class MessageQueue: if self._getter is not None: self._getter.set_result(None) - def putleft_nowait(self, item): - self._queue.appendleft(item) + def putleft_nowait(self, *item): + self._queue.extendleft(reversed(item)) if self._getter is not None: self._getter.set_result(None) @@ -166,6 +166,8 @@ class IRCClientProtocol(asyncio.Protocol): self.buffer = b'' self.connected = False self.channels = channels # Currently joined/supposed-to-be-joined channels; set(str) + self.unconfirmedMessages = [] + self.pongReceivedEvent = asyncio.Event() def connection_made(self, transport): logging.info('Connected') @@ -176,6 +178,7 @@ class IRCClientProtocol(asyncio.Protocol): self.send(b'USER ' + nickb + b' ' + nickb + b' ' + nickb + b' :' + self.config.irc.real.encode('utf-8')) self.send(b'JOIN ' + ','.join(self.channels).encode('utf-8')) #TODO: Split if too long asyncio.create_task(self.send_messages()) + asyncio.create_task(self.confirm_messages()) def update_channels(self, channels: set): channelsToPart = self.channels - channels @@ -220,10 +223,31 @@ class IRCClientProtocol(asyncio.Protocol): logging.debug(f'{id(self)}: got message: {message!r}') if message is None: break + self.unconfirmedMessages.append((channel, message)) self.send(b'PRIVMSG ' + channel.encode('utf-8') + b' :' + message.encode('utf-8')) - #TODO self.messageQueue.putleft_nowait if delivery fails await asyncio.sleep(1) # Rate limit + async def confirm_messages(self): + while self.connected: + await asyncio.wait((asyncio.sleep(60), self.connectionClosedEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED) # Confirm once per minute + if not self.connected: # Disconnected while sleeping, can't confirm unconfirmed messages, requeue them directly + self.messageQueue.putleft_nowait(*self.unconfirmedMessages) + self.unconfirmedMessages = [] + break + if not self.unconfirmedMessages: + logging.debug(f'{id(self)}: no messages to confirm') + continue + logging.debug(f'{id(self)}: trying to confirm message delivery') + self.pongReceivedEvent.clear() + self.send(b'PING :42') + await asyncio.wait((asyncio.sleep(5), self.pongReceivedEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED) + logging.debug(f'{id(self)}: message delivery success: {self.pongReceivedEvent.is_set()}') + if not self.pongReceivedEvent.is_set(): + # No PONG received in five seconds, assume connection's dead + self.messageQueue.putleft_nowait(*self.unconfirmedMessages) + self.transport.close() + self.unconfirmedMessages = [] + def data_received(self, data): logging.debug(f'Data received: {data!r}') # Split received data on CRLF. If there's any data left in the buffer, prepend it to the first message and process that. @@ -241,6 +265,8 @@ class IRCClientProtocol(asyncio.Protocol): logging.info(f'Message received: {message!r}') if message.startswith(b'PING '): self.send(b'PONG ' + message[5:]) + elif message.startswith(b'PONG '): + self.pongReceivedEvent.set() def connection_lost(self, exc): logging.info('The server closed the connection') |
