aboutsummaryrefslogtreecommitdiff
path: root/http2irc.py
diff options
context:
space:
mode:
Diffstat (limited to 'http2irc.py')
-rw-r--r--http2irc.py32
1 files changed, 29 insertions, 3 deletions
diff --git a/http2irc.py b/http2irc.py
index dd786b4..5a1b5e3 100644
--- a/http2irc.py
+++ b/http2irc.py
@@ -147,8 +147,8 @@ class MessageQueue:
if self._getter is not None:
self._getter.set_result(None)
- def putleft_nowait(self, item):
- self._queue.appendleft(item)
+ def putleft_nowait(self, *item):
+ self._queue.extendleft(reversed(item))
if self._getter is not None:
self._getter.set_result(None)
@@ -166,6 +166,8 @@ class IRCClientProtocol(asyncio.Protocol):
self.buffer = b''
self.connected = False
self.channels = channels # Currently joined/supposed-to-be-joined channels; set(str)
+ self.unconfirmedMessages = []
+ self.pongReceivedEvent = asyncio.Event()
def connection_made(self, transport):
logging.info('Connected')
@@ -176,6 +178,7 @@ class IRCClientProtocol(asyncio.Protocol):
self.send(b'USER ' + nickb + b' ' + nickb + b' ' + nickb + b' :' + self.config.irc.real.encode('utf-8'))
self.send(b'JOIN ' + ','.join(self.channels).encode('utf-8')) #TODO: Split if too long
asyncio.create_task(self.send_messages())
+ asyncio.create_task(self.confirm_messages())
def update_channels(self, channels: set):
channelsToPart = self.channels - channels
@@ -220,10 +223,31 @@ class IRCClientProtocol(asyncio.Protocol):
logging.debug(f'{id(self)}: got message: {message!r}')
if message is None:
break
+ self.unconfirmedMessages.append((channel, message))
self.send(b'PRIVMSG ' + channel.encode('utf-8') + b' :' + message.encode('utf-8'))
- #TODO self.messageQueue.putleft_nowait if delivery fails
await asyncio.sleep(1) # Rate limit
+ async def confirm_messages(self):
+ while self.connected:
+ await asyncio.wait((asyncio.sleep(60), self.connectionClosedEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED) # Confirm once per minute
+ if not self.connected: # Disconnected while sleeping, can't confirm unconfirmed messages, requeue them directly
+ self.messageQueue.putleft_nowait(*self.unconfirmedMessages)
+ self.unconfirmedMessages = []
+ break
+ if not self.unconfirmedMessages:
+ logging.debug(f'{id(self)}: no messages to confirm')
+ continue
+ logging.debug(f'{id(self)}: trying to confirm message delivery')
+ self.pongReceivedEvent.clear()
+ self.send(b'PING :42')
+ await asyncio.wait((asyncio.sleep(5), self.pongReceivedEvent.wait()), return_when = concurrent.futures.FIRST_COMPLETED)
+ logging.debug(f'{id(self)}: message delivery success: {self.pongReceivedEvent.is_set()}')
+ if not self.pongReceivedEvent.is_set():
+ # No PONG received in five seconds, assume connection's dead
+ self.messageQueue.putleft_nowait(*self.unconfirmedMessages)
+ self.transport.close()
+ self.unconfirmedMessages = []
+
def data_received(self, data):
logging.debug(f'Data received: {data!r}')
# Split received data on CRLF. If there's any data left in the buffer, prepend it to the first message and process that.
@@ -241,6 +265,8 @@ class IRCClientProtocol(asyncio.Protocol):
logging.info(f'Message received: {message!r}')
if message.startswith(b'PING '):
self.send(b'PONG ' + message[5:])
+ elif message.startswith(b'PONG '):
+ self.pongReceivedEvent.set()
def connection_lost(self, exc):
logging.info('The server closed the connection')