aboutsummaryrefslogtreecommitdiff
path: root/src/IRCLine.py
diff options
context:
space:
mode:
authorGravatar jesopo2019-02-23 21:33:04 +0000
committerGravatar jesopo2019-02-23 21:33:04 +0000
commit8c94bcf6caf0ae88b3a67d0a73389a7e60810e1c (patch)
tree8d72b0760aef71da12403696e9acd387d288e4e0 /src/IRCLine.py
parent!raw needs to parse the line it's given in to an IRCParsedLine now (diff)
Move utils.irc.IRCParsedLine to IRCLine.ParsedLine, improve truncation
mechanism, don't convert sent line from ParsedLine to text to ParsedLine for line_handler handling
Diffstat (limited to 'src/IRCLine.py')
-rw-r--r--src/IRCLine.py133
1 files changed, 109 insertions, 24 deletions
diff --git a/src/IRCLine.py b/src/IRCLine.py
index 3651d9f7..64aa573e 100644
--- a/src/IRCLine.py
+++ b/src/IRCLine.py
@@ -1,23 +1,90 @@
import datetime, typing
-from src import IRCObject, IRCServer, utils
+from src import IRCObject
# this should be 510 (RFC1459, 512 with \r\n) but a server BitBot uses is broken
LINE_CUTOFF = 470
-class Line(IRCObject.Object):
- def __init__(self, server: "IRCServer.Server", send_time: datetime.datetime,
- line: str):
- self.server = server
- self._line = line
- self.send_time = send_time
+class IRCArgs(object):
+ def __init__(self, args: typing.List[str]):
+ self._args = args
+
+ def get(self, index: int) -> typing.Optional[str]:
+ if len(self._args) > index:
+ return self._args[index]
+ return None
+
+ def __repr__(self):
+ return "IRCArgs(%s)" % self._args
+ def __len__(self) -> int:
+ return len(self._args)
+ def __getitem__(self, index) -> str:
+ return self._args[index]
+
+class Hostmask(object):
+ def __init__(self, nickname: str, username: str, hostname: str,
+ hostmask: str):
+ self.nickname = nickname
+ self.username = username
+ self.hostname = hostname
+ self.hostmask = hostmask
+ def __repr__(self):
+ return "Hostmask(%s)" % self.__str__()
+ def __str__(self):
+ return self.hostmask
+
+class ParsedLine(object):
+ def __init__(self, command: str, args: typing.List[str],
+ prefix: Hostmask=None,
+ tags: typing.Dict[str, str]={}):
+ self.command = command
+ self._args = args
+ self.args = IRCArgs(args)
+ self.prefix = prefix
+ self.tags = {} if tags == None else tags
+
+ def _tag_str(self, tags: typing.Dict[str, str]) -> str:
+ tag_str = ""
+ for tag, value in tags.items():
+ if tag_str:
+ tag_str += ","
+ tag_str += tag
+ if value:
+ tag_str += "=%s" % value
+ if tag_str:
+ tag_str = "@%s" % tag_str
+ return tag_str
+
+ def format(self) -> str:
+ s = ""
+ if self.tags:
+ s += "%s " % self._tag_str(self.tags)
+
+ if self.prefix:
+ s += "%s " % self.prefix
- data, truncated = utils.encode_truncate(line, "utf8",
- self._char_limit())
+ s += self.command.upper()
- self._data = data
- self._truncated = truncated
+ if self.args:
+ if len(self._args) > 1:
+ s += " %s" % " ".join(self._args[:-1])
- self._on_send = [] # type: typing.List[typing.Callable[[], None]]
+ s += " "
+ if " " in self._args[-1] or self._args[-1][0] == ":":
+ s += ":%s" % self._args[-1]
+ else:
+ s += self._args[-1]
+
+ return s
+
+class Line(IRCObject.Object):
+ def __init__(self, send_time: datetime.datetime, hostmask: str,
+ line: ParsedLine):
+ self.send_time = send_time
+ self._hostmask = hostmask
+ self.parsed_line = line
+
+ self._on_send: typing.List[typing.Callable[[], None]] = []
+ self.truncate_marker: typing.Optional[str] = None
def __repr__(self) -> str:
return "IRCLine.Line(%s)" % self.__str__()
@@ -25,25 +92,43 @@ class Line(IRCObject.Object):
return self.decoded_data()
def _char_limit(self) -> int:
- return LINE_CUTOFF-len(":%s " % self.server.hostmask())
+ return LINE_CUTOFF-len(":%s " % self._hostmask)
+ def _encode_truncate(self) -> typing.Tuple[bytes, str]:
+ line = self.parsed_line.format()
+ byte_max = self._char_limit()
+ encoded = b""
+ truncated = ""
+ truncate_marker = b""
+ if not self.truncate_marker == None:
+ truncate_marker = typing.cast(str, self.truncate_marker
+ ).encode("utf8")
+
+ for i, character in enumerate(line):
+ encoded_character = character.encode("utf8")
+ new_len = len(encoded + encoded_character)
+ if truncate_marker and (byte_max-new_len) < len(truncate_marker):
+ encoded += truncate_marker
+ truncated = line[i:]
+ break
+ elif new_len > byte_max:
+ truncated = line[i:]
+ break
+ else:
+ encoded += encoded_character
+ return (encoded, truncated)
+
+ def _data(self) -> bytes:
+ return self._encode_truncate()[0]
def data(self) -> bytes:
- return b"%s\r\n" % self._data
+ return b"%s\r\n" % self._data()
def decoded_data(self) -> str:
- return self._data.decode("utf8")
+ return self._data().decode("utf8")
def truncated(self) -> str:
- return self._truncated
+ return self._encode_truncate()[1]
def on_send(self, func: typing.Callable[[], None]):
self._on_send.append(func)
def sent(self):
for func in self._on_send[:]:
func()
-
- def end_replace(self, s: str):
- s_encoded = s.encode("utf8")
- s_len = len(s_encoded)
-
- removed = self._data[-s_len:].decode("utf8")
- self._truncated = removed+self._truncated
- self._data = self._data[:-s_len]+s_encoded