aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGravatar jesopo2019-10-27 10:19:00 +0000
committerGravatar jesopo2019-10-27 10:19:00 +0000
commit8f4b5a0e70804f8f19f8b9032b7d93857cda40e8 (patch)
treec522c8a04f4173cfc104796dd5cf942f2c44a0fb /src
parentfallback ActivityPub data encoding to utf8 (diff)
signature
move IRCLine related code from utils.irc to IRCLine.py
Diffstat (limited to 'src')
-rw-r--r--src/IRCLine.py81
-rw-r--r--src/IRCServer.py4
-rw-r--r--src/utils/irc/__init__.py88
3 files changed, 87 insertions, 86 deletions
diff --git a/src/IRCLine.py b/src/IRCLine.py
index e4f5462a..75c88215 100644
--- a/src/IRCLine.py
+++ b/src/IRCLine.py
@@ -37,6 +37,21 @@ class Hostmask(object):
def __str__(self):
return self.hostmask
+def parse_hostmask(hostmask: str) -> Hostmask:
+ nickname, _, username = hostmask.partition("!")
+ username, _, hostname = username.partition("@")
+ return Hostmask(nickname, username, hostname, hostmask)
+
+MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
+MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
+def message_tag_escape(s):
+ return utils.irc.multi_replace(s, MESSAGE_TAG_UNESCAPED,
+ MESSAGE_TAG_ESCAPED)
+def message_tag_unescape(s):
+ unescaped = utils.irc.multi_replace(s, MESSAGE_TAG_ESCAPED,
+ MESSAGE_TAG_UNESCAPED)
+ return unescaped.replace("\\", "")
+
class ParsedLine(object):
def __init__(self, command: str, args: typing.List[str],
source: Hostmask=None,
@@ -76,7 +91,7 @@ class ParsedLine(object):
tag_pieces = []
for tag, value in tags.items():
if value:
- value_escaped = utils.irc.message_tag_escape(value)
+ value_escaped = message_tag_escape(value)
tag_pieces.append("%s=%s" % (tag, value_escaped))
else:
tag_pieces.append(tag)
@@ -144,6 +159,41 @@ class ParsedLine(object):
return valid, overflow
+def parse_line(line: str) -> ParsedLine:
+ tags = {} # type: typing.Dict[str, typing.Any]
+ source = None # type: typing.Optional[Hostmask]
+ command = None
+
+ if line[0] == "@":
+ tags_prefix, line = line[1:].split(" ", 1)
+
+ for tag in filter(None, tags_prefix.split(";")):
+ tag, sep, value = tag.partition("=")
+ if value:
+ tags[tag] = message_tag_unescape(value)
+ else:
+ tags[tag] = None
+
+ line, trailing_separator, trailing_split = line.partition(" :")
+
+ trailing = None # type: typing.Optional[str]
+ if trailing_separator:
+ trailing = trailing_split
+
+ if line[0] == ":":
+ source_str, line = line[1:].split(" ", 1)
+ source = parse_hostmask(source_str)
+
+ command, sep, line = line.partition(" ")
+ args = [] # type: typing.List[str]
+ if line:
+ # this is so that `args` is empty if `line` is empty
+ args = line.split(" ")
+
+ if not trailing == None:
+ args.append(typing.cast(str, trailing))
+ return ParsedLine(command, args, source, tags)
+
class SentLine(IRCObject.Object):
def __init__(self, events: "EventManager.Events",
send_time: datetime.datetime, hostmask: str, line: ParsedLine):
@@ -161,3 +211,32 @@ class SentLine(IRCObject.Object):
return self.parsed_line.truncate(self._hostmask)[0]
def for_wire(self) -> bytes:
return b"%s\r\n" % self._for_wire().encode("utf8")
+
+class IRCBatch(object):
+ def __init__(self, identifier: str, batch_type: str, args: typing.List[str],
+ tags: typing.Dict[str, str]=None, source: Hostmask=None):
+ self.identifier = identifier
+ self.type = batch_type
+ self.args = args
+ self.tags = tags or {}
+ self.source = source
+ self._lines = [] # type: typing.List[ParsedLine]
+ def add_line(self, line: ParsedLine):
+ self._lines.append(line)
+ def get_lines(self) -> typing.List[ParsedLine]:
+ return self._lines
+
+class IRCSendBatch(IRCBatch):
+ def __init__(self, batch_type: str, args: typing.List[str],
+ tags: typing.Dict[str, str]=None):
+ IRCBatch.__init__(self, str(uuid.uuid4()), batch_type, args, tags)
+ def get_lines(self) -> typing.List[ParsedLine]:
+ lines = []
+ for line in self._lines:
+ line.add_tag("batch", self.identifier)
+ lines.append(line)
+
+ lines.insert(0, ParsedLine("BATCH",
+ ["+%s" % self.identifier, self.type]))
+ lines.append(ParsedLine("BATCH", ["-%s" % self.identifier]))
+ return lines
diff --git a/src/IRCServer.py b/src/IRCServer.py
index 8fed6f46..17b0bc13 100644
--- a/src/IRCServer.py
+++ b/src/IRCServer.py
@@ -229,7 +229,7 @@ class Server(IRCObject.Object):
for line in lines:
self.bot.log.debug("%s (raw recv) | %s", [str(self), line])
self.events.on("raw.received").call_unsafe(server=self,
- line=utils.irc.parse_line(line))
+ line=IRCLine.parse_line(line))
self.check_users()
def check_users(self):
for user in self.new_users:
@@ -294,7 +294,7 @@ class Server(IRCObject.Object):
return line_obj
return None
def send_raw(self, line: str):
- return self.send(utils.irc.parse_line(line))
+ return self.send(IRCLine.parse_line(line))
def send_user(self, username: str, realname: str
) -> typing.Optional[IRCLine.SentLine]:
diff --git a/src/utils/irc/__init__.py b/src/utils/irc/__init__.py
index 160d55c2..fa544c5d 100644
--- a/src/utils/irc/__init__.py
+++ b/src/utils/irc/__init__.py
@@ -1,5 +1,5 @@
import json, string, re, typing, uuid
-from src import IRCLine, utils
+from src import utils
from . import protocol
ASCII_UPPER = string.ascii_uppercase
@@ -10,7 +10,7 @@ RFC1459_UPPER = STRICT_RFC1459_UPPER+"^"
RFC1459_LOWER = STRICT_RFC1459_LOWER+"~"
# case mapping lowercase/uppcase logic
-def _multi_replace(s: str,
+def multi_replace(s: str,
chars1: typing.Iterable[str],
chars2: typing.Iterable[str]) -> str:
for char1, char2 in zip(chars1, chars2):
@@ -18,11 +18,11 @@ def _multi_replace(s: str,
return s
def lower(case_mapping: str, s: str) -> str:
if case_mapping == "ascii":
- return _multi_replace(s, ASCII_UPPER, ASCII_LOWER)
+ return multi_replace(s, ASCII_UPPER, ASCII_LOWER)
elif case_mapping == "rfc1459":
- return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER)
+ return multi_replace(s, RFC1459_UPPER, RFC1459_LOWER)
elif case_mapping == "strict-rfc1459":
- return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER)
+ return multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER)
else:
raise ValueError("unknown casemapping '%s'" % case_mapping)
@@ -30,55 +30,6 @@ def lower(case_mapping: str, s: str) -> str:
def equals(case_mapping: str, s1: str, s2: str) -> bool:
return lower(case_mapping, s1) == lower(case_mapping, s2)
-def parse_hostmask(hostmask: str) -> IRCLine.Hostmask:
- nickname, _, username = hostmask.partition("!")
- username, _, hostname = username.partition("@")
- return IRCLine.Hostmask(nickname, username, hostname, hostmask)
-
-MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
-MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
-def message_tag_escape(s):
- return _multi_replace(s, MESSAGE_TAG_UNESCAPED, MESSAGE_TAG_ESCAPED)
-def message_tag_unescape(s):
- unescaped = _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED)
- return unescaped.replace("\\", "")
-
-def parse_line(line: str) -> IRCLine.ParsedLine:
- tags = {} # type: typing.Dict[str, typing.Any]
- source = None # type: typing.Optional[IRCLine.Hostmask]
- command = None
-
- if line[0] == "@":
- tags_prefix, line = line[1:].split(" ", 1)
-
- for tag in filter(None, tags_prefix.split(";")):
- tag, sep, value = tag.partition("=")
- if value:
- tags[tag] = message_tag_unescape(value)
- else:
- tags[tag] = None
-
- line, trailing_separator, trailing_split = line.partition(" :")
-
- trailing = None # type: typing.Optional[str]
- if trailing_separator:
- trailing = trailing_split
-
- if line[0] == ":":
- source_str, line = line[1:].split(" ", 1)
- source = parse_hostmask(source_str)
-
- command, sep, line = line.partition(" ")
- args = [] # type: typing.List[str]
- if line:
- # this is so that `args` is empty if `line` is empty
- args = line.split(" ")
-
- if not trailing == None:
- args.append(typing.cast(str, trailing))
-
- return IRCLine.ParsedLine(command, args, source, tags)
-
REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR)
def color(s: str, foreground: utils.consts.IRCColor,
@@ -256,35 +207,6 @@ def parse_ctcp(s: str) -> typing.Optional[CTCPMessage]:
return None
-class IRCBatch(object):
- def __init__(self, identifier: str, batch_type: str, args: typing.List[str],
- tags: typing.Dict[str, str]=None, source: IRCLine.Hostmask=None):
- self.identifier = identifier
- self.type = batch_type
- self.args = args
- self.tags = tags or {}
- self.source = source
- self._lines = [] # type: typing.List[IRCLine.ParsedLine]
- def add_line(self, line: IRCLine.ParsedLine):
- self._lines.append(line)
- def get_lines(self) -> typing.List[IRCLine.ParsedLine]:
- return self._lines
-
-class IRCSendBatch(IRCBatch):
- def __init__(self, batch_type: str, args: typing.List[str],
- tags: typing.Dict[str, str]=None):
- IRCBatch.__init__(self, str(uuid.uuid4()), batch_type, args, tags)
- def get_lines(self) -> typing.List[IRCLine.ParsedLine]:
- lines = []
- for line in self._lines:
- line.add_tag("batch", self.identifier)
- lines.append(line)
-
- lines.insert(0, IRCLine.ParsedLine("BATCH",
- ["+%s" % self.identifier, self.type]))
- lines.append(IRCLine.ParsedLine("BATCH", ["-%s" % self.identifier]))
- return lines
-
class Capability(object):
def __init__(self, ratified_name: typing.Optional[str],
draft_name: str=None, alias: str=None,