aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGravatar jesopo2019-02-23 21:33:04 +0000
committerGravatar jesopo2019-02-23 21:33:04 +0000
commit8c94bcf6caf0ae88b3a67d0a73389a7e60810e1c (patch)
tree8d72b0760aef71da12403696e9acd387d288e4e0 /src
parent!raw needs to parse the line it's given in to an IRCParsedLine now (diff)
signature
Move utils.irc.IRCParsedLine to IRCLine.ParsedLine, improve truncation
mechanism, don't convert sent line from ParsedLine to text to ParsedLine for line_handler handling
Diffstat (limited to 'src')
-rw-r--r--src/IRCLine.py133
-rw-r--r--src/IRCServer.py17
-rw-r--r--src/IRCSocket.py9
-rw-r--r--src/utils/__init__.py13
-rw-r--r--src/utils/irc/__init__.py89
-rw-r--r--src/utils/irc/protocol.py108
6 files changed, 184 insertions, 185 deletions
diff --git a/src/IRCLine.py b/src/IRCLine.py
index 3651d9f7..64aa573e 100644
--- a/src/IRCLine.py
+++ b/src/IRCLine.py
@@ -1,23 +1,90 @@
import datetime, typing
-from src import IRCObject, IRCServer, utils
+from src import IRCObject
# this should be 510 (RFC1459, 512 with \r\n) but a server BitBot uses is broken
LINE_CUTOFF = 470
-class Line(IRCObject.Object):
- def __init__(self, server: "IRCServer.Server", send_time: datetime.datetime,
- line: str):
- self.server = server
- self._line = line
- self.send_time = send_time
+class IRCArgs(object):
+ def __init__(self, args: typing.List[str]):
+ self._args = args
+
+ def get(self, index: int) -> typing.Optional[str]:
+ if len(self._args) > index:
+ return self._args[index]
+ return None
+
+ def __repr__(self):
+ return "IRCArgs(%s)" % self._args
+ def __len__(self) -> int:
+ return len(self._args)
+ def __getitem__(self, index) -> str:
+ return self._args[index]
+
+class Hostmask(object):
+ def __init__(self, nickname: str, username: str, hostname: str,
+ hostmask: str):
+ self.nickname = nickname
+ self.username = username
+ self.hostname = hostname
+ self.hostmask = hostmask
+ def __repr__(self):
+ return "Hostmask(%s)" % self.__str__()
+ def __str__(self):
+ return self.hostmask
+
+class ParsedLine(object):
+ def __init__(self, command: str, args: typing.List[str],
+ prefix: Hostmask=None,
+ tags: typing.Dict[str, str]={}):
+ self.command = command
+ self._args = args
+ self.args = IRCArgs(args)
+ self.prefix = prefix
+ self.tags = {} if tags == None else tags
+
+ def _tag_str(self, tags: typing.Dict[str, str]) -> str:
+ tag_str = ""
+ for tag, value in tags.items():
+ if tag_str:
+ tag_str += ","
+ tag_str += tag
+ if value:
+ tag_str += "=%s" % value
+ if tag_str:
+ tag_str = "@%s" % tag_str
+ return tag_str
+
+ def format(self) -> str:
+ s = ""
+ if self.tags:
+ s += "%s " % self._tag_str(self.tags)
+
+ if self.prefix:
+ s += "%s " % self.prefix
- data, truncated = utils.encode_truncate(line, "utf8",
- self._char_limit())
+ s += self.command.upper()
- self._data = data
- self._truncated = truncated
+ if self.args:
+ if len(self._args) > 1:
+ s += " %s" % " ".join(self._args[:-1])
- self._on_send = [] # type: typing.List[typing.Callable[[], None]]
+ s += " "
+ if " " in self._args[-1] or self._args[-1][0] == ":":
+ s += ":%s" % self._args[-1]
+ else:
+ s += self._args[-1]
+
+ return s
+
+class Line(IRCObject.Object):
+ def __init__(self, send_time: datetime.datetime, hostmask: str,
+ line: ParsedLine):
+ self.send_time = send_time
+ self._hostmask = hostmask
+ self.parsed_line = line
+
+ self._on_send: typing.List[typing.Callable[[], None]] = []
+ self.truncate_marker: typing.Optional[str] = None
def __repr__(self) -> str:
return "IRCLine.Line(%s)" % self.__str__()
@@ -25,25 +92,43 @@ class Line(IRCObject.Object):
return self.decoded_data()
def _char_limit(self) -> int:
- return LINE_CUTOFF-len(":%s " % self.server.hostmask())
+ return LINE_CUTOFF-len(":%s " % self._hostmask)
+ def _encode_truncate(self) -> typing.Tuple[bytes, str]:
+ line = self.parsed_line.format()
+ byte_max = self._char_limit()
+ encoded = b""
+ truncated = ""
+ truncate_marker = b""
+ if not self.truncate_marker == None:
+ truncate_marker = typing.cast(str, self.truncate_marker
+ ).encode("utf8")
+
+ for i, character in enumerate(line):
+ encoded_character = character.encode("utf8")
+ new_len = len(encoded + encoded_character)
+ if truncate_marker and (byte_max-new_len) < len(truncate_marker):
+ encoded += truncate_marker
+ truncated = line[i:]
+ break
+ elif new_len > byte_max:
+ truncated = line[i:]
+ break
+ else:
+ encoded += encoded_character
+ return (encoded, truncated)
+
+ def _data(self) -> bytes:
+ return self._encode_truncate()[0]
def data(self) -> bytes:
- return b"%s\r\n" % self._data
+ return b"%s\r\n" % self._data()
def decoded_data(self) -> str:
- return self._data.decode("utf8")
+ return self._data().decode("utf8")
def truncated(self) -> str:
- return self._truncated
+ return self._encode_truncate()[1]
def on_send(self, func: typing.Callable[[], None]):
self._on_send.append(func)
def sent(self):
for func in self._on_send[:]:
func()
-
- def end_replace(self, s: str):
- s_encoded = s.encode("utf8")
- s_len = len(s_encoded)
-
- removed = self._data[-s_len:].decode("utf8")
- self._truncated = removed+self._truncated
- self._data = self._data[:-s_len]+s_encoded
diff --git a/src/IRCServer.py b/src/IRCServer.py
index 07116484..663f4703 100644
--- a/src/IRCServer.py
+++ b/src/IRCServer.py
@@ -29,7 +29,7 @@ class Server(IRCObject.Object):
self.agreed_capabilities = set([]) # type: typing.Set[str]
self.requested_capabilities = [] # type: typing.List[str]
self.server_capabilities = {} # type: typing.Dict[str, str]
- self.batches = {} # type: typing.Dict[str, utils.irc.IRCParsedLine]
+ self.batches = {} # type: typing.Dict[str, IRCLine.ParsedLine]
self.cap_started = False
self.users = {} # type: typing.Dict[str, IRCUser.User]
@@ -237,23 +237,24 @@ class Server(IRCObject.Object):
self.set_setting("last-read", utils.iso8601_format(now))
return lines
- def send(self, line_parsed: utils.irc.IRCParsedLine):
+ def send(self, line_parsed: IRCLine.ParsedLine):
line = line_parsed.format()
results = self.events.on("preprocess.send").call_unsafe(
server=self, line=line)
- for result in results:
- if result:
- line = result
- break
+ results = list(filter(None, results))
+ if results:
+ line = results[0]
+
line_stripped = line.split("\n", 1)[0].strip("\r")
- line_obj = IRCLine.Line(self, datetime.datetime.utcnow(), line_stripped)
+ line_obj = IRCLine.Line(datetime.datetime.utcnow(), self.hostmask(),
+ line_parsed)
self.socket.send(line_obj)
return line_obj
def _send(self):
lines = self.socket._send()
for line in lines:
- self.bot.log.debug("%s (raw send) | %s", [str(self), line])
+ self.bot.log.debug("%s (raw send) | %s", [str(self), line.format()])
self.events.on("raw.send").call_unsafe(server=self, line=line)
def send_user(self, username: str, realname: str) -> IRCLine.Line:
diff --git a/src/IRCSocket.py b/src/IRCSocket.py
index af1c3a5e..9ba677f4 100644
--- a/src/IRCSocket.py
+++ b/src/IRCSocket.py
@@ -120,15 +120,14 @@ class Socket(IRCObject.Object):
def send(self, line: IRCLine.Line):
self._queued_lines.append(line)
- def _send(self) -> typing.List[str]:
- decoded_sent = []
+ def _send(self) -> typing.List[IRCLine.ParsedLine]:
+ sent_lines = []
throttle_space = self.throttle_space()
if throttle_space:
to_buffer = self._queued_lines[:throttle_space]
self._queued_lines = self._queued_lines[throttle_space:]
for line in to_buffer:
- decoded_data = line.decoded_data()
- decoded_sent.append(decoded_data)
+ sent_lines.append(line.parsed_line)
self._write_buffer += line.data()
self._buffered_lines.append(line)
@@ -147,7 +146,7 @@ class Socket(IRCObject.Object):
self._recent_sends.append(now)
self.last_send = now
- return decoded_sent
+ return sent_lines
def waiting_send(self) -> bool:
return bool(len(self._write_buffer)) or bool(len(self._queued_lines))
diff --git a/src/utils/__init__.py b/src/utils/__init__.py
index 90f3e6ba..609b0eaa 100644
--- a/src/utils/__init__.py
+++ b/src/utils/__init__.py
@@ -199,16 +199,3 @@ def is_ip(s: str) -> bool:
except ValueError:
return False
return True
-
-def encode_truncate(s: str, encoding: str, byte_max: int
- ) -> typing.Tuple[bytes, str]:
- encoded = b""
- truncated = ""
- for i, character in enumerate(s):
- encoded_character = character.encode(encoding)
- if len(encoded + encoded_character) > byte_max:
- truncated = s[i:]
- break
- else:
- encoded += encoded_character
- return encoded, truncated
diff --git a/src/utils/irc/__init__.py b/src/utils/irc/__init__.py
index f39d476a..b38b6802 100644
--- a/src/utils/irc/__init__.py
+++ b/src/utils/irc/__init__.py
@@ -1,5 +1,5 @@
import json, string, re, typing
-from src import utils
+from src import IRCLine, utils
from . import protocol
ASCII_UPPER = string.ascii_uppercase
@@ -30,77 +30,10 @@ def lower(case_mapping: str, s: str) -> str:
def equals(case_mapping: str, s1: str, s2: str) -> bool:
return lower(case_mapping, s1) == lower(case_mapping, s2)
-class IRCHostmask(object):
- def __init__(self, nickname: str, username: str, hostname: str,
- hostmask: str):
- self.nickname = nickname
- self.username = username
- self.hostname = hostname
- self.hostmask = hostmask
- def __repr__(self):
- return "IRCHostmask(%s)" % self.__str__()
- def __str__(self):
- return self.hostmask
-
-def seperate_hostmask(hostmask: str) -> IRCHostmask:
+def seperate_hostmask(hostmask: str) -> IRCLine.Hostmask:
nickname, _, username = hostmask.partition("!")
username, _, hostname = username.partition("@")
- return IRCHostmask(nickname, username, hostname, hostmask)
-
-class IRCArgs(object):
- def __init__(self, args: typing.List[str]):
- self._args = args
-
- def get(self, index: int) -> typing.Optional[str]:
- if len(self._args) > index:
- return self._args[index]
- return None
-
- def __repr__(self):
- return "IRCArgs(%s)" % self._args
- def __len__(self) -> int:
- return len(self._args)
- def __getitem__(self, index) -> str:
- return self._args[index]
-
-def _tag_str(tags: typing.Dict[str, str]) -> str:
- tag_str = ""
- for tag, value in tags.items():
- if tag_str:
- tag_str += ","
- tag_str += tag
- if value:
- tag_str += "=%s" % value
- if tag_str:
- tag_str = "@%s" % tag_str
- return tag_str
-
-class IRCParsedLine(object):
- def __init__(self, command: str, args: typing.List[str],
- prefix: IRCHostmask=None,
- tags: typing.Dict[str, str]={}):
- self.command = command
- self._args = args
- self.args = IRCArgs(args)
- self.prefix = prefix
- self.tags = {} if tags == None else tags
-
- def format(self) -> str:
- s = ""
- if self.tags:
- s += "%s " % _tag_str(self.tags)
-
- if self.prefix:
- s += "%s " % self.prefix
-
- s += self.command.upper()
-
- if self.args:
- if len(self._args) > 1:
- s += " %s" % " ".join(self._args[:-1])
- s += " %s" % trailing(self._args[-1])
-
- return s
+ return IRCLine.Hostmask(nickname, username, hostname, hostmask)
MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
@@ -110,9 +43,9 @@ def message_tag_unescape(s):
unescaped = _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED)
return unescaped.replace("\\", "")
-def parse_line(line: str) -> IRCParsedLine:
+def parse_line(line: str) -> IRCLine.ParsedLine:
tags = {} # type: typing.Dict[str, typing.Any]
- prefix = None # type: typing.Optional[IRCHostmask]
+ prefix = None # type: typing.Optional[IRCLine.Hostmask]
command = None
if line[0] == "@":
@@ -144,7 +77,7 @@ def parse_line(line: str) -> IRCParsedLine:
if not trailing == None:
args.append(typing.cast(str, trailing))
- return IRCParsedLine(command, args, prefix, tags)
+ return IRCLine.ParsedLine(command, args, prefix, tags)
REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR)
@@ -322,20 +255,14 @@ class IRCBatch(object):
self.id = identifier
self.type = batch_type
self.tags = tags
- self.lines = [] # type: typing.List[IRCParsedLine]
+ self.lines = [] # type: typing.List[IRCLine.ParsedLine]
class IRCRecvBatch(IRCBatch):
pass
class IRCSendBatch(IRCBatch):
- def _add_line(self, line: IRCParsedLine):
+ def _add_line(self, line: IRCLine.ParsedLine):
line.tags["batch"] = self.id
self.lines.append(line)
def message(self, target: str, message: str, tags: dict={}):
self._add_line(utils.irc.protocol.message(target, message, tags))
def notice(self, target: str, message: str, tags: dict={}):
self._add_line(utils.irc.protocol.notice(target, message, tags))
-
-def trailing(s: str) -> str:
- if s[0] == ":" or " " in s:
- return ":%s" % s
- else:
- return s
diff --git a/src/utils/irc/protocol.py b/src/utils/irc/protocol.py
index 2c57ad25..0ccf684f 100644
--- a/src/utils/irc/protocol.py
+++ b/src/utils/irc/protocol.py
@@ -1,90 +1,90 @@
import typing
-from src import utils
+from src import IRCLine, utils
-def user(username: str, realname: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("USER", [username, "0", "*", realname])
-def nick(nickname: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NICK", [nickname])
+def user(username: str, realname: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("USER", [username, "0", "*", realname])
+def nick(nickname: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NICK", [nickname])
-def capability_ls() -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["LS", "302"])
-def capability_request(capability: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["REQ", capability])
-def capability_end() -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["END"])
-def authenticate(text: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("AUTHENTICATE", [text])
+def capability_ls() -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["LS", "302"])
+def capability_request(capability: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["REQ", capability])
+def capability_end() -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["END"])
+def authenticate(text: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("AUTHENTICATE", [text])
-def password(password: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PASS", [password])
+def password(password: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PASS", [password])
-def ping(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PING", [nonce])
-def pong(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PONG", [nonce])
+def ping(nonce: str="hello") -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PING", [nonce])
+def pong(nonce: str="hello") -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PONG", [nonce])
def join(channel_name: str, keys: typing.List[str]=None
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("JOIN", [channel_name]+(
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("JOIN", [channel_name]+(
keys if keys else []))
-def part(channel_name: str, reason: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PART", [channel_name]+(
+def part(channel_name: str, reason: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PART", [channel_name]+(
[reason] if reason else []))
-def quit(reason: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("QUIT", [reason] if reason else [])
+def quit(reason: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("QUIT", [reason] if reason else [])
def message(target: str, message: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PRIVMSG", [target, message], tags=tags)
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PRIVMSG", [target, message], tags=tags)
def notice(target: str, message: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NOTICE", [target, message], tags=tags)
-def tagmsg(target, tags: dict) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("TAGMSG", [target], tags=tags)
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NOTICE", [target, message], tags=tags)
+def tagmsg(target, tags: dict) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("TAGMSG", [target], tags=tags)
def mode(target: str, mode: str=None, args: typing.List[str]=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
command_args = [target]
if mode:
command_args.append(mode)
if args:
command_args = command_args+args
- return utils.irc.IRCParsedLine("MODE", command_args)
+ return IRCLine.ParsedLine("MODE", command_args)
-def topic(channel_name: str, topic: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("TOPIC", [channel_name, topic])
+def topic(channel_name: str, topic: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("TOPIC", [channel_name, topic])
def kick(channel_name: str, target: str, reason: str=None
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("KICK", [channel_name, target]+(
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("KICK", [channel_name, target]+(
[reason] if reason else []))
-def names(channel_name: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NAMES", [channel_name])
-def list(search_for: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("LIST", [search_for] if search_for else [])
-def invite(target: str, channel_name: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("INVITE", [target, channel_name])
+def names(channel_name: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NAMES", [channel_name])
+def list(search_for: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("LIST", [search_for] if search_for else [])
+def invite(target: str, channel_name: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("INVITE", [target, channel_name])
-def whois(target: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("WHOIS", [target])
+def whois(target: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("WHOIS", [target])
def whowas(target: str, amount: int=None, server: str=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
command_args = [target]
if amount:
command_args.append(str(amount))
if server:
command_args.append(server)
- return utils.irc.IRCParsedLine("WHOWAS", command_args)
-def who(filter: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("WHO", [filter] if filter else [])
+ return IRCLine.ParsedLine("WHOWAS", command_args)
+def who(filter: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("WHO", [filter] if filter else [])
def whox(mask: str, filter: str, fields: str, label: str=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
flags = "%s%%%s%s" % (filter, fields, ","+label if label else "")
- return utils.irc.IRCParsedLine("WHO", [mask, flags])
+ return IRCLine.ParsedLine("WHO", [mask, flags])
def batch_start(identifier: str, batch_type: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("BATCH", ["+%s" % identifier, batch_type],
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("BATCH", ["+%s" % identifier, batch_type],
tags=tags)
def batch_end(identifier: str, tags: typing.Dict[str, str]={}):
- return utils.irc.IRCParsedLine("BATCH", ["-%s" % identifier], tags=tags)
+ return IRCLine.ParsedLine("BATCH", ["-%s" % identifier], tags=tags)