aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/IRCLine.py133
-rw-r--r--src/IRCServer.py17
-rw-r--r--src/IRCSocket.py9
-rw-r--r--src/utils/__init__.py13
-rw-r--r--src/utils/irc/__init__.py89
-rw-r--r--src/utils/irc/protocol.py108
6 files changed, 184 insertions, 185 deletions
diff --git a/src/IRCLine.py b/src/IRCLine.py
index 3651d9f7..64aa573e 100644
--- a/src/IRCLine.py
+++ b/src/IRCLine.py
@@ -1,23 +1,90 @@
import datetime, typing
-from src import IRCObject, IRCServer, utils
+from src import IRCObject
# this should be 510 (RFC1459, 512 with \r\n) but a server BitBot uses is broken
LINE_CUTOFF = 470
-class Line(IRCObject.Object):
- def __init__(self, server: "IRCServer.Server", send_time: datetime.datetime,
- line: str):
- self.server = server
- self._line = line
- self.send_time = send_time
+class IRCArgs(object):
+ def __init__(self, args: typing.List[str]):
+ self._args = args
+
+ def get(self, index: int) -> typing.Optional[str]:
+ if len(self._args) > index:
+ return self._args[index]
+ return None
+
+ def __repr__(self):
+ return "IRCArgs(%s)" % self._args
+ def __len__(self) -> int:
+ return len(self._args)
+ def __getitem__(self, index) -> str:
+ return self._args[index]
+
+class Hostmask(object):
+ def __init__(self, nickname: str, username: str, hostname: str,
+ hostmask: str):
+ self.nickname = nickname
+ self.username = username
+ self.hostname = hostname
+ self.hostmask = hostmask
+ def __repr__(self):
+ return "Hostmask(%s)" % self.__str__()
+ def __str__(self):
+ return self.hostmask
+
+class ParsedLine(object):
+ def __init__(self, command: str, args: typing.List[str],
+ prefix: Hostmask=None,
+ tags: typing.Dict[str, str]={}):
+ self.command = command
+ self._args = args
+ self.args = IRCArgs(args)
+ self.prefix = prefix
+ self.tags = {} if tags == None else tags
+
+ def _tag_str(self, tags: typing.Dict[str, str]) -> str:
+ tag_str = ""
+ for tag, value in tags.items():
+ if tag_str:
+ tag_str += ","
+ tag_str += tag
+ if value:
+ tag_str += "=%s" % value
+ if tag_str:
+ tag_str = "@%s" % tag_str
+ return tag_str
+
+ def format(self) -> str:
+ s = ""
+ if self.tags:
+ s += "%s " % self._tag_str(self.tags)
+
+ if self.prefix:
+ s += "%s " % self.prefix
- data, truncated = utils.encode_truncate(line, "utf8",
- self._char_limit())
+ s += self.command.upper()
- self._data = data
- self._truncated = truncated
+ if self.args:
+ if len(self._args) > 1:
+ s += " %s" % " ".join(self._args[:-1])
- self._on_send = [] # type: typing.List[typing.Callable[[], None]]
+ s += " "
+ if " " in self._args[-1] or self._args[-1][0] == ":":
+ s += ":%s" % self._args[-1]
+ else:
+ s += self._args[-1]
+
+ return s
+
+class Line(IRCObject.Object):
+ def __init__(self, send_time: datetime.datetime, hostmask: str,
+ line: ParsedLine):
+ self.send_time = send_time
+ self._hostmask = hostmask
+ self.parsed_line = line
+
+ self._on_send: typing.List[typing.Callable[[], None]] = []
+ self.truncate_marker: typing.Optional[str] = None
def __repr__(self) -> str:
return "IRCLine.Line(%s)" % self.__str__()
@@ -25,25 +92,43 @@ class Line(IRCObject.Object):
return self.decoded_data()
def _char_limit(self) -> int:
- return LINE_CUTOFF-len(":%s " % self.server.hostmask())
+ return LINE_CUTOFF-len(":%s " % self._hostmask)
+ def _encode_truncate(self) -> typing.Tuple[bytes, str]:
+ line = self.parsed_line.format()
+ byte_max = self._char_limit()
+ encoded = b""
+ truncated = ""
+ truncate_marker = b""
+ if not self.truncate_marker == None:
+ truncate_marker = typing.cast(str, self.truncate_marker
+ ).encode("utf8")
+
+ for i, character in enumerate(line):
+ encoded_character = character.encode("utf8")
+ new_len = len(encoded + encoded_character)
+ if truncate_marker and (byte_max-new_len) < len(truncate_marker):
+ encoded += truncate_marker
+ truncated = line[i:]
+ break
+ elif new_len > byte_max:
+ truncated = line[i:]
+ break
+ else:
+ encoded += encoded_character
+ return (encoded, truncated)
+
+ def _data(self) -> bytes:
+ return self._encode_truncate()[0]
def data(self) -> bytes:
- return b"%s\r\n" % self._data
+ return b"%s\r\n" % self._data()
def decoded_data(self) -> str:
- return self._data.decode("utf8")
+ return self._data().decode("utf8")
def truncated(self) -> str:
- return self._truncated
+ return self._encode_truncate()[1]
def on_send(self, func: typing.Callable[[], None]):
self._on_send.append(func)
def sent(self):
for func in self._on_send[:]:
func()
-
- def end_replace(self, s: str):
- s_encoded = s.encode("utf8")
- s_len = len(s_encoded)
-
- removed = self._data[-s_len:].decode("utf8")
- self._truncated = removed+self._truncated
- self._data = self._data[:-s_len]+s_encoded
diff --git a/src/IRCServer.py b/src/IRCServer.py
index 07116484..663f4703 100644
--- a/src/IRCServer.py
+++ b/src/IRCServer.py
@@ -29,7 +29,7 @@ class Server(IRCObject.Object):
self.agreed_capabilities = set([]) # type: typing.Set[str]
self.requested_capabilities = [] # type: typing.List[str]
self.server_capabilities = {} # type: typing.Dict[str, str]
- self.batches = {} # type: typing.Dict[str, utils.irc.IRCParsedLine]
+ self.batches = {} # type: typing.Dict[str, IRCLine.ParsedLine]
self.cap_started = False
self.users = {} # type: typing.Dict[str, IRCUser.User]
@@ -237,23 +237,24 @@ class Server(IRCObject.Object):
self.set_setting("last-read", utils.iso8601_format(now))
return lines
- def send(self, line_parsed: utils.irc.IRCParsedLine):
+ def send(self, line_parsed: IRCLine.ParsedLine):
line = line_parsed.format()
results = self.events.on("preprocess.send").call_unsafe(
server=self, line=line)
- for result in results:
- if result:
- line = result
- break
+ results = list(filter(None, results))
+ if results:
+ line = results[0]
+
line_stripped = line.split("\n", 1)[0].strip("\r")
- line_obj = IRCLine.Line(self, datetime.datetime.utcnow(), line_stripped)
+ line_obj = IRCLine.Line(datetime.datetime.utcnow(), self.hostmask(),
+ line_parsed)
self.socket.send(line_obj)
return line_obj
def _send(self):
lines = self.socket._send()
for line in lines:
- self.bot.log.debug("%s (raw send) | %s", [str(self), line])
+ self.bot.log.debug("%s (raw send) | %s", [str(self), line.format()])
self.events.on("raw.send").call_unsafe(server=self, line=line)
def send_user(self, username: str, realname: str) -> IRCLine.Line:
diff --git a/src/IRCSocket.py b/src/IRCSocket.py
index af1c3a5e..9ba677f4 100644
--- a/src/IRCSocket.py
+++ b/src/IRCSocket.py
@@ -120,15 +120,14 @@ class Socket(IRCObject.Object):
def send(self, line: IRCLine.Line):
self._queued_lines.append(line)
- def _send(self) -> typing.List[str]:
- decoded_sent = []
+ def _send(self) -> typing.List[IRCLine.ParsedLine]:
+ sent_lines = []
throttle_space = self.throttle_space()
if throttle_space:
to_buffer = self._queued_lines[:throttle_space]
self._queued_lines = self._queued_lines[throttle_space:]
for line in to_buffer:
- decoded_data = line.decoded_data()
- decoded_sent.append(decoded_data)
+ sent_lines.append(line.parsed_line)
self._write_buffer += line.data()
self._buffered_lines.append(line)
@@ -147,7 +146,7 @@ class Socket(IRCObject.Object):
self._recent_sends.append(now)
self.last_send = now
- return decoded_sent
+ return sent_lines
def waiting_send(self) -> bool:
return bool(len(self._write_buffer)) or bool(len(self._queued_lines))
diff --git a/src/utils/__init__.py b/src/utils/__init__.py
index 90f3e6ba..609b0eaa 100644
--- a/src/utils/__init__.py
+++ b/src/utils/__init__.py
@@ -199,16 +199,3 @@ def is_ip(s: str) -> bool:
except ValueError:
return False
return True
-
-def encode_truncate(s: str, encoding: str, byte_max: int
- ) -> typing.Tuple[bytes, str]:
- encoded = b""
- truncated = ""
- for i, character in enumerate(s):
- encoded_character = character.encode(encoding)
- if len(encoded + encoded_character) > byte_max:
- truncated = s[i:]
- break
- else:
- encoded += encoded_character
- return encoded, truncated
diff --git a/src/utils/irc/__init__.py b/src/utils/irc/__init__.py
index f39d476a..b38b6802 100644
--- a/src/utils/irc/__init__.py
+++ b/src/utils/irc/__init__.py
@@ -1,5 +1,5 @@
import json, string, re, typing
-from src import utils
+from src import IRCLine, utils
from . import protocol
ASCII_UPPER = string.ascii_uppercase
@@ -30,77 +30,10 @@ def lower(case_mapping: str, s: str) -> str:
def equals(case_mapping: str, s1: str, s2: str) -> bool:
return lower(case_mapping, s1) == lower(case_mapping, s2)
-class IRCHostmask(object):
- def __init__(self, nickname: str, username: str, hostname: str,
- hostmask: str):
- self.nickname = nickname
- self.username = username
- self.hostname = hostname
- self.hostmask = hostmask
- def __repr__(self):
- return "IRCHostmask(%s)" % self.__str__()
- def __str__(self):
- return self.hostmask
-
-def seperate_hostmask(hostmask: str) -> IRCHostmask:
+def seperate_hostmask(hostmask: str) -> IRCLine.Hostmask:
nickname, _, username = hostmask.partition("!")
username, _, hostname = username.partition("@")
- return IRCHostmask(nickname, username, hostname, hostmask)
-
-class IRCArgs(object):
- def __init__(self, args: typing.List[str]):
- self._args = args
-
- def get(self, index: int) -> typing.Optional[str]:
- if len(self._args) > index:
- return self._args[index]
- return None
-
- def __repr__(self):
- return "IRCArgs(%s)" % self._args
- def __len__(self) -> int:
- return len(self._args)
- def __getitem__(self, index) -> str:
- return self._args[index]
-
-def _tag_str(tags: typing.Dict[str, str]) -> str:
- tag_str = ""
- for tag, value in tags.items():
- if tag_str:
- tag_str += ","
- tag_str += tag
- if value:
- tag_str += "=%s" % value
- if tag_str:
- tag_str = "@%s" % tag_str
- return tag_str
-
-class IRCParsedLine(object):
- def __init__(self, command: str, args: typing.List[str],
- prefix: IRCHostmask=None,
- tags: typing.Dict[str, str]={}):
- self.command = command
- self._args = args
- self.args = IRCArgs(args)
- self.prefix = prefix
- self.tags = {} if tags == None else tags
-
- def format(self) -> str:
- s = ""
- if self.tags:
- s += "%s " % _tag_str(self.tags)
-
- if self.prefix:
- s += "%s " % self.prefix
-
- s += self.command.upper()
-
- if self.args:
- if len(self._args) > 1:
- s += " %s" % " ".join(self._args[:-1])
- s += " %s" % trailing(self._args[-1])
-
- return s
+ return IRCLine.Hostmask(nickname, username, hostname, hostmask)
MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
@@ -110,9 +43,9 @@ def message_tag_unescape(s):
unescaped = _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED)
return unescaped.replace("\\", "")
-def parse_line(line: str) -> IRCParsedLine:
+def parse_line(line: str) -> IRCLine.ParsedLine:
tags = {} # type: typing.Dict[str, typing.Any]
- prefix = None # type: typing.Optional[IRCHostmask]
+ prefix = None # type: typing.Optional[IRCLine.Hostmask]
command = None
if line[0] == "@":
@@ -144,7 +77,7 @@ def parse_line(line: str) -> IRCParsedLine:
if not trailing == None:
args.append(typing.cast(str, trailing))
- return IRCParsedLine(command, args, prefix, tags)
+ return IRCLine.ParsedLine(command, args, prefix, tags)
REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR)
@@ -322,20 +255,14 @@ class IRCBatch(object):
self.id = identifier
self.type = batch_type
self.tags = tags
- self.lines = [] # type: typing.List[IRCParsedLine]
+ self.lines = [] # type: typing.List[IRCLine.ParsedLine]
class IRCRecvBatch(IRCBatch):
pass
class IRCSendBatch(IRCBatch):
- def _add_line(self, line: IRCParsedLine):
+ def _add_line(self, line: IRCLine.ParsedLine):
line.tags["batch"] = self.id
self.lines.append(line)
def message(self, target: str, message: str, tags: dict={}):
self._add_line(utils.irc.protocol.message(target, message, tags))
def notice(self, target: str, message: str, tags: dict={}):
self._add_line(utils.irc.protocol.notice(target, message, tags))
-
-def trailing(s: str) -> str:
- if s[0] == ":" or " " in s:
- return ":%s" % s
- else:
- return s
diff --git a/src/utils/irc/protocol.py b/src/utils/irc/protocol.py
index 2c57ad25..0ccf684f 100644
--- a/src/utils/irc/protocol.py
+++ b/src/utils/irc/protocol.py
@@ -1,90 +1,90 @@
import typing
-from src import utils
+from src import IRCLine, utils
-def user(username: str, realname: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("USER", [username, "0", "*", realname])
-def nick(nickname: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NICK", [nickname])
+def user(username: str, realname: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("USER", [username, "0", "*", realname])
+def nick(nickname: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NICK", [nickname])
-def capability_ls() -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["LS", "302"])
-def capability_request(capability: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["REQ", capability])
-def capability_end() -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["END"])
-def authenticate(text: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("AUTHENTICATE", [text])
+def capability_ls() -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["LS", "302"])
+def capability_request(capability: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["REQ", capability])
+def capability_end() -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["END"])
+def authenticate(text: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("AUTHENTICATE", [text])
-def password(password: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PASS", [password])
+def password(password: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PASS", [password])
-def ping(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PING", [nonce])
-def pong(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PONG", [nonce])
+def ping(nonce: str="hello") -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PING", [nonce])
+def pong(nonce: str="hello") -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PONG", [nonce])
def join(channel_name: str, keys: typing.List[str]=None
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("JOIN", [channel_name]+(
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("JOIN", [channel_name]+(
keys if keys else []))
-def part(channel_name: str, reason: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PART", [channel_name]+(
+def part(channel_name: str, reason: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PART", [channel_name]+(
[reason] if reason else []))
-def quit(reason: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("QUIT", [reason] if reason else [])
+def quit(reason: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("QUIT", [reason] if reason else [])
def message(target: str, message: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PRIVMSG", [target, message], tags=tags)
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PRIVMSG", [target, message], tags=tags)
def notice(target: str, message: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NOTICE", [target, message], tags=tags)
-def tagmsg(target, tags: dict) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("TAGMSG", [target], tags=tags)
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NOTICE", [target, message], tags=tags)
+def tagmsg(target, tags: dict) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("TAGMSG", [target], tags=tags)
def mode(target: str, mode: str=None, args: typing.List[str]=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
command_args = [target]
if mode:
command_args.append(mode)
if args:
command_args = command_args+args
- return utils.irc.IRCParsedLine("MODE", command_args)
+ return IRCLine.ParsedLine("MODE", command_args)
-def topic(channel_name: str, topic: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("TOPIC", [channel_name, topic])
+def topic(channel_name: str, topic: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("TOPIC", [channel_name, topic])
def kick(channel_name: str, target: str, reason: str=None
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("KICK", [channel_name, target]+(
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("KICK", [channel_name, target]+(
[reason] if reason else []))
-def names(channel_name: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NAMES", [channel_name])
-def list(search_for: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("LIST", [search_for] if search_for else [])
-def invite(target: str, channel_name: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("INVITE", [target, channel_name])
+def names(channel_name: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NAMES", [channel_name])
+def list(search_for: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("LIST", [search_for] if search_for else [])
+def invite(target: str, channel_name: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("INVITE", [target, channel_name])
-def whois(target: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("WHOIS", [target])
+def whois(target: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("WHOIS", [target])
def whowas(target: str, amount: int=None, server: str=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
command_args = [target]
if amount:
command_args.append(str(amount))
if server:
command_args.append(server)
- return utils.irc.IRCParsedLine("WHOWAS", command_args)
-def who(filter: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("WHO", [filter] if filter else [])
+ return IRCLine.ParsedLine("WHOWAS", command_args)
+def who(filter: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("WHO", [filter] if filter else [])
def whox(mask: str, filter: str, fields: str, label: str=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
flags = "%s%%%s%s" % (filter, fields, ","+label if label else "")
- return utils.irc.IRCParsedLine("WHO", [mask, flags])
+ return IRCLine.ParsedLine("WHO", [mask, flags])
def batch_start(identifier: str, batch_type: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("BATCH", ["+%s" % identifier, batch_type],
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("BATCH", ["+%s" % identifier, batch_type],
tags=tags)
def batch_end(identifier: str, tags: typing.Dict[str, str]={}):
- return utils.irc.IRCParsedLine("BATCH", ["-%s" % identifier], tags=tags)
+ return IRCLine.ParsedLine("BATCH", ["-%s" % identifier], tags=tags)