diff options
| author | 2019-10-27 10:19:00 +0000 | |
|---|---|---|
| committer | 2019-10-27 10:19:00 +0000 | |
| commit | 8f4b5a0e70804f8f19f8b9032b7d93857cda40e8 (patch) | |
| tree | c522c8a04f4173cfc104796dd5cf942f2c44a0fb /src/utils | |
| parent | fallback ActivityPub data encoding to utf8 (diff) | |
| signature | ||
move IRCLine related code from utils.irc to IRCLine.py
Diffstat (limited to 'src/utils')
| -rw-r--r-- | src/utils/irc/__init__.py | 88 |
1 files changed, 5 insertions, 83 deletions
diff --git a/src/utils/irc/__init__.py b/src/utils/irc/__init__.py index 160d55c2..fa544c5d 100644 --- a/src/utils/irc/__init__.py +++ b/src/utils/irc/__init__.py @@ -1,5 +1,5 @@ import json, string, re, typing, uuid -from src import IRCLine, utils +from src import utils from . import protocol ASCII_UPPER = string.ascii_uppercase @@ -10,7 +10,7 @@ RFC1459_UPPER = STRICT_RFC1459_UPPER+"^" RFC1459_LOWER = STRICT_RFC1459_LOWER+"~" # case mapping lowercase/uppcase logic -def _multi_replace(s: str, +def multi_replace(s: str, chars1: typing.Iterable[str], chars2: typing.Iterable[str]) -> str: for char1, char2 in zip(chars1, chars2): @@ -18,11 +18,11 @@ def _multi_replace(s: str, return s def lower(case_mapping: str, s: str) -> str: if case_mapping == "ascii": - return _multi_replace(s, ASCII_UPPER, ASCII_LOWER) + return multi_replace(s, ASCII_UPPER, ASCII_LOWER) elif case_mapping == "rfc1459": - return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER) + return multi_replace(s, RFC1459_UPPER, RFC1459_LOWER) elif case_mapping == "strict-rfc1459": - return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER) + return multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER) else: raise ValueError("unknown casemapping '%s'" % case_mapping) @@ -30,55 +30,6 @@ def lower(case_mapping: str, s: str) -> str: def equals(case_mapping: str, s1: str, s2: str) -> bool: return lower(case_mapping, s1) == lower(case_mapping, s2) -def parse_hostmask(hostmask: str) -> IRCLine.Hostmask: - nickname, _, username = hostmask.partition("!") - username, _, hostname = username.partition("@") - return IRCLine.Hostmask(nickname, username, hostname, hostmask) - -MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"] -MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"] -def message_tag_escape(s): - return _multi_replace(s, MESSAGE_TAG_UNESCAPED, MESSAGE_TAG_ESCAPED) -def message_tag_unescape(s): - unescaped = _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED) - return unescaped.replace("\\", "") - -def parse_line(line: str) -> IRCLine.ParsedLine: - tags = {} # type: typing.Dict[str, typing.Any] - source = None # type: typing.Optional[IRCLine.Hostmask] - command = None - - if line[0] == "@": - tags_prefix, line = line[1:].split(" ", 1) - - for tag in filter(None, tags_prefix.split(";")): - tag, sep, value = tag.partition("=") - if value: - tags[tag] = message_tag_unescape(value) - else: - tags[tag] = None - - line, trailing_separator, trailing_split = line.partition(" :") - - trailing = None # type: typing.Optional[str] - if trailing_separator: - trailing = trailing_split - - if line[0] == ":": - source_str, line = line[1:].split(" ", 1) - source = parse_hostmask(source_str) - - command, sep, line = line.partition(" ") - args = [] # type: typing.List[str] - if line: - # this is so that `args` is empty if `line` is empty - args = line.split(" ") - - if not trailing == None: - args.append(typing.cast(str, trailing)) - - return IRCLine.ParsedLine(command, args, source, tags) - REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR) def color(s: str, foreground: utils.consts.IRCColor, @@ -256,35 +207,6 @@ def parse_ctcp(s: str) -> typing.Optional[CTCPMessage]: return None -class IRCBatch(object): - def __init__(self, identifier: str, batch_type: str, args: typing.List[str], - tags: typing.Dict[str, str]=None, source: IRCLine.Hostmask=None): - self.identifier = identifier - self.type = batch_type - self.args = args - self.tags = tags or {} - self.source = source - self._lines = [] # type: typing.List[IRCLine.ParsedLine] - def add_line(self, line: IRCLine.ParsedLine): - self._lines.append(line) - def get_lines(self) -> typing.List[IRCLine.ParsedLine]: - return self._lines - -class IRCSendBatch(IRCBatch): - def __init__(self, batch_type: str, args: typing.List[str], - tags: typing.Dict[str, str]=None): - IRCBatch.__init__(self, str(uuid.uuid4()), batch_type, args, tags) - def get_lines(self) -> typing.List[IRCLine.ParsedLine]: - lines = [] - for line in self._lines: - line.add_tag("batch", self.identifier) - lines.append(line) - - lines.insert(0, IRCLine.ParsedLine("BATCH", - ["+%s" % self.identifier, self.type])) - lines.append(IRCLine.ParsedLine("BATCH", ["-%s" % self.identifier])) - return lines - class Capability(object): def __init__(self, ratified_name: typing.Optional[str], draft_name: str=None, alias: str=None, |
