diff options
Diffstat (limited to 'src/IRCLine.py')
| -rw-r--r-- | src/IRCLine.py | 81 |
1 files changed, 80 insertions, 1 deletions
diff --git a/src/IRCLine.py b/src/IRCLine.py index e4f5462a..75c88215 100644 --- a/src/IRCLine.py +++ b/src/IRCLine.py @@ -37,6 +37,21 @@ class Hostmask(object): def __str__(self): return self.hostmask +def parse_hostmask(hostmask: str) -> Hostmask: + nickname, _, username = hostmask.partition("!") + username, _, hostname = username.partition("@") + return Hostmask(nickname, username, hostname, hostmask) + +MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"] +MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"] +def message_tag_escape(s): + return utils.irc.multi_replace(s, MESSAGE_TAG_UNESCAPED, + MESSAGE_TAG_ESCAPED) +def message_tag_unescape(s): + unescaped = utils.irc.multi_replace(s, MESSAGE_TAG_ESCAPED, + MESSAGE_TAG_UNESCAPED) + return unescaped.replace("\\", "") + class ParsedLine(object): def __init__(self, command: str, args: typing.List[str], source: Hostmask=None, @@ -76,7 +91,7 @@ class ParsedLine(object): tag_pieces = [] for tag, value in tags.items(): if value: - value_escaped = utils.irc.message_tag_escape(value) + value_escaped = message_tag_escape(value) tag_pieces.append("%s=%s" % (tag, value_escaped)) else: tag_pieces.append(tag) @@ -144,6 +159,41 @@ class ParsedLine(object): return valid, overflow +def parse_line(line: str) -> ParsedLine: + tags = {} # type: typing.Dict[str, typing.Any] + source = None # type: typing.Optional[Hostmask] + command = None + + if line[0] == "@": + tags_prefix, line = line[1:].split(" ", 1) + + for tag in filter(None, tags_prefix.split(";")): + tag, sep, value = tag.partition("=") + if value: + tags[tag] = message_tag_unescape(value) + else: + tags[tag] = None + + line, trailing_separator, trailing_split = line.partition(" :") + + trailing = None # type: typing.Optional[str] + if trailing_separator: + trailing = trailing_split + + if line[0] == ":": + source_str, line = line[1:].split(" ", 1) + source = parse_hostmask(source_str) + + command, sep, line = line.partition(" ") + args = [] # type: typing.List[str] + if line: + # this is so that `args` is empty if `line` is empty + args = line.split(" ") + + if not trailing == None: + args.append(typing.cast(str, trailing)) + return ParsedLine(command, args, source, tags) + class SentLine(IRCObject.Object): def __init__(self, events: "EventManager.Events", send_time: datetime.datetime, hostmask: str, line: ParsedLine): @@ -161,3 +211,32 @@ class SentLine(IRCObject.Object): return self.parsed_line.truncate(self._hostmask)[0] def for_wire(self) -> bytes: return b"%s\r\n" % self._for_wire().encode("utf8") + +class IRCBatch(object): + def __init__(self, identifier: str, batch_type: str, args: typing.List[str], + tags: typing.Dict[str, str]=None, source: Hostmask=None): + self.identifier = identifier + self.type = batch_type + self.args = args + self.tags = tags or {} + self.source = source + self._lines = [] # type: typing.List[ParsedLine] + def add_line(self, line: ParsedLine): + self._lines.append(line) + def get_lines(self) -> typing.List[ParsedLine]: + return self._lines + +class IRCSendBatch(IRCBatch): + def __init__(self, batch_type: str, args: typing.List[str], + tags: typing.Dict[str, str]=None): + IRCBatch.__init__(self, str(uuid.uuid4()), batch_type, args, tags) + def get_lines(self) -> typing.List[ParsedLine]: + lines = [] + for line in self._lines: + line.add_tag("batch", self.identifier) + lines.append(line) + + lines.insert(0, ParsedLine("BATCH", + ["+%s" % self.identifier, self.type])) + lines.append(ParsedLine("BATCH", ["-%s" % self.identifier])) + return lines |
