aboutsummaryrefslogtreecommitdiff
path: root/src/IRCLine.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/IRCLine.py')
-rw-r--r--src/IRCLine.py133
1 files changed, 109 insertions, 24 deletions
diff --git a/src/IRCLine.py b/src/IRCLine.py
index 3651d9f7..64aa573e 100644
--- a/src/IRCLine.py
+++ b/src/IRCLine.py
@@ -1,23 +1,90 @@
import datetime, typing
-from src import IRCObject, IRCServer, utils
+from src import IRCObject
# this should be 510 (RFC1459, 512 with \r\n) but a server BitBot uses is broken
LINE_CUTOFF = 470
-class Line(IRCObject.Object):
- def __init__(self, server: "IRCServer.Server", send_time: datetime.datetime,
- line: str):
- self.server = server
- self._line = line
- self.send_time = send_time
+class IRCArgs(object):
+ def __init__(self, args: typing.List[str]):
+ self._args = args
+
+ def get(self, index: int) -> typing.Optional[str]:
+ if len(self._args) > index:
+ return self._args[index]
+ return None
+
+ def __repr__(self):
+ return "IRCArgs(%s)" % self._args
+ def __len__(self) -> int:
+ return len(self._args)
+ def __getitem__(self, index) -> str:
+ return self._args[index]
+
+class Hostmask(object):
+ def __init__(self, nickname: str, username: str, hostname: str,
+ hostmask: str):
+ self.nickname = nickname
+ self.username = username
+ self.hostname = hostname
+ self.hostmask = hostmask
+ def __repr__(self):
+ return "Hostmask(%s)" % self.__str__()
+ def __str__(self):
+ return self.hostmask
+
+class ParsedLine(object):
+ def __init__(self, command: str, args: typing.List[str],
+ prefix: Hostmask=None,
+ tags: typing.Dict[str, str]={}):
+ self.command = command
+ self._args = args
+ self.args = IRCArgs(args)
+ self.prefix = prefix
+ self.tags = {} if tags == None else tags
+
+ def _tag_str(self, tags: typing.Dict[str, str]) -> str:
+ tag_str = ""
+ for tag, value in tags.items():
+ if tag_str:
+ tag_str += ","
+ tag_str += tag
+ if value:
+ tag_str += "=%s" % value
+ if tag_str:
+ tag_str = "@%s" % tag_str
+ return tag_str
+
+ def format(self) -> str:
+ s = ""
+ if self.tags:
+ s += "%s " % self._tag_str(self.tags)
+
+ if self.prefix:
+ s += "%s " % self.prefix
- data, truncated = utils.encode_truncate(line, "utf8",
- self._char_limit())
+ s += self.command.upper()
- self._data = data
- self._truncated = truncated
+ if self.args:
+ if len(self._args) > 1:
+ s += " %s" % " ".join(self._args[:-1])
- self._on_send = [] # type: typing.List[typing.Callable[[], None]]
+ s += " "
+ if " " in self._args[-1] or self._args[-1][0] == ":":
+ s += ":%s" % self._args[-1]
+ else:
+ s += self._args[-1]
+
+ return s
+
+class Line(IRCObject.Object):
+ def __init__(self, send_time: datetime.datetime, hostmask: str,
+ line: ParsedLine):
+ self.send_time = send_time
+ self._hostmask = hostmask
+ self.parsed_line = line
+
+ self._on_send: typing.List[typing.Callable[[], None]] = []
+ self.truncate_marker: typing.Optional[str] = None
def __repr__(self) -> str:
return "IRCLine.Line(%s)" % self.__str__()
@@ -25,25 +92,43 @@ class Line(IRCObject.Object):
return self.decoded_data()
def _char_limit(self) -> int:
- return LINE_CUTOFF-len(":%s " % self.server.hostmask())
+ return LINE_CUTOFF-len(":%s " % self._hostmask)
+ def _encode_truncate(self) -> typing.Tuple[bytes, str]:
+ line = self.parsed_line.format()
+ byte_max = self._char_limit()
+ encoded = b""
+ truncated = ""
+ truncate_marker = b""
+ if not self.truncate_marker == None:
+ truncate_marker = typing.cast(str, self.truncate_marker
+ ).encode("utf8")
+
+ for i, character in enumerate(line):
+ encoded_character = character.encode("utf8")
+ new_len = len(encoded + encoded_character)
+ if truncate_marker and (byte_max-new_len) < len(truncate_marker):
+ encoded += truncate_marker
+ truncated = line[i:]
+ break
+ elif new_len > byte_max:
+ truncated = line[i:]
+ break
+ else:
+ encoded += encoded_character
+ return (encoded, truncated)
+
+ def _data(self) -> bytes:
+ return self._encode_truncate()[0]
def data(self) -> bytes:
- return b"%s\r\n" % self._data
+ return b"%s\r\n" % self._data()
def decoded_data(self) -> str:
- return self._data.decode("utf8")
+ return self._data().decode("utf8")
def truncated(self) -> str:
- return self._truncated
+ return self._encode_truncate()[1]
def on_send(self, func: typing.Callable[[], None]):
self._on_send.append(func)
def sent(self):
for func in self._on_send[:]:
func()
-
- def end_replace(self, s: str):
- s_encoded = s.encode("utf8")
- s_len = len(s_encoded)
-
- removed = self._data[-s_len:].decode("utf8")
- self._truncated = removed+self._truncated
- self._data = self._data[:-s_len]+s_encoded