diff options
| author | 2019-10-27 10:19:00 +0000 | |
|---|---|---|
| committer | 2019-10-27 10:19:00 +0000 | |
| commit | 8f4b5a0e70804f8f19f8b9032b7d93857cda40e8 (patch) | |
| tree | c522c8a04f4173cfc104796dd5cf942f2c44a0fb /src | |
| parent | fallback ActivityPub data encoding to utf8 (diff) | |
| signature | ||
move IRCLine related code from utils.irc to IRCLine.py
Diffstat (limited to 'src')
| -rw-r--r-- | src/IRCLine.py | 81 | ||||
| -rw-r--r-- | src/IRCServer.py | 4 | ||||
| -rw-r--r-- | src/utils/irc/__init__.py | 88 |
3 files changed, 87 insertions, 86 deletions
diff --git a/src/IRCLine.py b/src/IRCLine.py index e4f5462a..75c88215 100644 --- a/src/IRCLine.py +++ b/src/IRCLine.py @@ -37,6 +37,21 @@ class Hostmask(object): def __str__(self): return self.hostmask +def parse_hostmask(hostmask: str) -> Hostmask: + nickname, _, username = hostmask.partition("!") + username, _, hostname = username.partition("@") + return Hostmask(nickname, username, hostname, hostmask) + +MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"] +MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"] +def message_tag_escape(s): + return utils.irc.multi_replace(s, MESSAGE_TAG_UNESCAPED, + MESSAGE_TAG_ESCAPED) +def message_tag_unescape(s): + unescaped = utils.irc.multi_replace(s, MESSAGE_TAG_ESCAPED, + MESSAGE_TAG_UNESCAPED) + return unescaped.replace("\\", "") + class ParsedLine(object): def __init__(self, command: str, args: typing.List[str], source: Hostmask=None, @@ -76,7 +91,7 @@ class ParsedLine(object): tag_pieces = [] for tag, value in tags.items(): if value: - value_escaped = utils.irc.message_tag_escape(value) + value_escaped = message_tag_escape(value) tag_pieces.append("%s=%s" % (tag, value_escaped)) else: tag_pieces.append(tag) @@ -144,6 +159,41 @@ class ParsedLine(object): return valid, overflow +def parse_line(line: str) -> ParsedLine: + tags = {} # type: typing.Dict[str, typing.Any] + source = None # type: typing.Optional[Hostmask] + command = None + + if line[0] == "@": + tags_prefix, line = line[1:].split(" ", 1) + + for tag in filter(None, tags_prefix.split(";")): + tag, sep, value = tag.partition("=") + if value: + tags[tag] = message_tag_unescape(value) + else: + tags[tag] = None + + line, trailing_separator, trailing_split = line.partition(" :") + + trailing = None # type: typing.Optional[str] + if trailing_separator: + trailing = trailing_split + + if line[0] == ":": + source_str, line = line[1:].split(" ", 1) + source = parse_hostmask(source_str) + + command, sep, line = line.partition(" ") + args = [] # type: typing.List[str] + if line: + # this is so that `args` is empty if `line` is empty + args = line.split(" ") + + if not trailing == None: + args.append(typing.cast(str, trailing)) + return ParsedLine(command, args, source, tags) + class SentLine(IRCObject.Object): def __init__(self, events: "EventManager.Events", send_time: datetime.datetime, hostmask: str, line: ParsedLine): @@ -161,3 +211,32 @@ class SentLine(IRCObject.Object): return self.parsed_line.truncate(self._hostmask)[0] def for_wire(self) -> bytes: return b"%s\r\n" % self._for_wire().encode("utf8") + +class IRCBatch(object): + def __init__(self, identifier: str, batch_type: str, args: typing.List[str], + tags: typing.Dict[str, str]=None, source: Hostmask=None): + self.identifier = identifier + self.type = batch_type + self.args = args + self.tags = tags or {} + self.source = source + self._lines = [] # type: typing.List[ParsedLine] + def add_line(self, line: ParsedLine): + self._lines.append(line) + def get_lines(self) -> typing.List[ParsedLine]: + return self._lines + +class IRCSendBatch(IRCBatch): + def __init__(self, batch_type: str, args: typing.List[str], + tags: typing.Dict[str, str]=None): + IRCBatch.__init__(self, str(uuid.uuid4()), batch_type, args, tags) + def get_lines(self) -> typing.List[ParsedLine]: + lines = [] + for line in self._lines: + line.add_tag("batch", self.identifier) + lines.append(line) + + lines.insert(0, ParsedLine("BATCH", + ["+%s" % self.identifier, self.type])) + lines.append(ParsedLine("BATCH", ["-%s" % self.identifier])) + return lines diff --git a/src/IRCServer.py b/src/IRCServer.py index 8fed6f46..17b0bc13 100644 --- a/src/IRCServer.py +++ b/src/IRCServer.py @@ -229,7 +229,7 @@ class Server(IRCObject.Object): for line in lines: self.bot.log.debug("%s (raw recv) | %s", [str(self), line]) self.events.on("raw.received").call_unsafe(server=self, - line=utils.irc.parse_line(line)) + line=IRCLine.parse_line(line)) self.check_users() def check_users(self): for user in self.new_users: @@ -294,7 +294,7 @@ class Server(IRCObject.Object): return line_obj return None def send_raw(self, line: str): - return self.send(utils.irc.parse_line(line)) + return self.send(IRCLine.parse_line(line)) def send_user(self, username: str, realname: str ) -> typing.Optional[IRCLine.SentLine]: diff --git a/src/utils/irc/__init__.py b/src/utils/irc/__init__.py index 160d55c2..fa544c5d 100644 --- a/src/utils/irc/__init__.py +++ b/src/utils/irc/__init__.py @@ -1,5 +1,5 @@ import json, string, re, typing, uuid -from src import IRCLine, utils +from src import utils from . import protocol ASCII_UPPER = string.ascii_uppercase @@ -10,7 +10,7 @@ RFC1459_UPPER = STRICT_RFC1459_UPPER+"^" RFC1459_LOWER = STRICT_RFC1459_LOWER+"~" # case mapping lowercase/uppcase logic -def _multi_replace(s: str, +def multi_replace(s: str, chars1: typing.Iterable[str], chars2: typing.Iterable[str]) -> str: for char1, char2 in zip(chars1, chars2): @@ -18,11 +18,11 @@ def _multi_replace(s: str, return s def lower(case_mapping: str, s: str) -> str: if case_mapping == "ascii": - return _multi_replace(s, ASCII_UPPER, ASCII_LOWER) + return multi_replace(s, ASCII_UPPER, ASCII_LOWER) elif case_mapping == "rfc1459": - return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER) + return multi_replace(s, RFC1459_UPPER, RFC1459_LOWER) elif case_mapping == "strict-rfc1459": - return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER) + return multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER) else: raise ValueError("unknown casemapping '%s'" % case_mapping) @@ -30,55 +30,6 @@ def lower(case_mapping: str, s: str) -> str: def equals(case_mapping: str, s1: str, s2: str) -> bool: return lower(case_mapping, s1) == lower(case_mapping, s2) -def parse_hostmask(hostmask: str) -> IRCLine.Hostmask: - nickname, _, username = hostmask.partition("!") - username, _, hostname = username.partition("@") - return IRCLine.Hostmask(nickname, username, hostname, hostmask) - -MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"] -MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"] -def message_tag_escape(s): - return _multi_replace(s, MESSAGE_TAG_UNESCAPED, MESSAGE_TAG_ESCAPED) -def message_tag_unescape(s): - unescaped = _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED) - return unescaped.replace("\\", "") - -def parse_line(line: str) -> IRCLine.ParsedLine: - tags = {} # type: typing.Dict[str, typing.Any] - source = None # type: typing.Optional[IRCLine.Hostmask] - command = None - - if line[0] == "@": - tags_prefix, line = line[1:].split(" ", 1) - - for tag in filter(None, tags_prefix.split(";")): - tag, sep, value = tag.partition("=") - if value: - tags[tag] = message_tag_unescape(value) - else: - tags[tag] = None - - line, trailing_separator, trailing_split = line.partition(" :") - - trailing = None # type: typing.Optional[str] - if trailing_separator: - trailing = trailing_split - - if line[0] == ":": - source_str, line = line[1:].split(" ", 1) - source = parse_hostmask(source_str) - - command, sep, line = line.partition(" ") - args = [] # type: typing.List[str] - if line: - # this is so that `args` is empty if `line` is empty - args = line.split(" ") - - if not trailing == None: - args.append(typing.cast(str, trailing)) - - return IRCLine.ParsedLine(command, args, source, tags) - REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR) def color(s: str, foreground: utils.consts.IRCColor, @@ -256,35 +207,6 @@ def parse_ctcp(s: str) -> typing.Optional[CTCPMessage]: return None -class IRCBatch(object): - def __init__(self, identifier: str, batch_type: str, args: typing.List[str], - tags: typing.Dict[str, str]=None, source: IRCLine.Hostmask=None): - self.identifier = identifier - self.type = batch_type - self.args = args - self.tags = tags or {} - self.source = source - self._lines = [] # type: typing.List[IRCLine.ParsedLine] - def add_line(self, line: IRCLine.ParsedLine): - self._lines.append(line) - def get_lines(self) -> typing.List[IRCLine.ParsedLine]: - return self._lines - -class IRCSendBatch(IRCBatch): - def __init__(self, batch_type: str, args: typing.List[str], - tags: typing.Dict[str, str]=None): - IRCBatch.__init__(self, str(uuid.uuid4()), batch_type, args, tags) - def get_lines(self) -> typing.List[IRCLine.ParsedLine]: - lines = [] - for line in self._lines: - line.add_tag("batch", self.identifier) - lines.append(line) - - lines.insert(0, IRCLine.ParsedLine("BATCH", - ["+%s" % self.identifier, self.type])) - lines.append(IRCLine.ParsedLine("BATCH", ["-%s" % self.identifier])) - return lines - class Capability(object): def __init__(self, ratified_name: typing.Optional[str], draft_name: str=None, alias: str=None, |
