aboutsummaryrefslogtreecommitdiff
path: root/src/utils
diff options
context:
space:
mode:
authorGravatar jesopo2019-02-23 21:33:04 +0000
committerGravatar jesopo2019-02-23 21:33:04 +0000
commit8c94bcf6caf0ae88b3a67d0a73389a7e60810e1c (patch)
tree8d72b0760aef71da12403696e9acd387d288e4e0 /src/utils
parent!raw needs to parse the line it's given in to an IRCParsedLine now (diff)
signature
Move utils.irc.IRCParsedLine to IRCLine.ParsedLine, improve truncation
mechanism, don't convert sent line from ParsedLine to text to ParsedLine for line_handler handling
Diffstat (limited to 'src/utils')
-rw-r--r--src/utils/__init__.py13
-rw-r--r--src/utils/irc/__init__.py89
-rw-r--r--src/utils/irc/protocol.py108
3 files changed, 62 insertions, 148 deletions
diff --git a/src/utils/__init__.py b/src/utils/__init__.py
index 90f3e6ba..609b0eaa 100644
--- a/src/utils/__init__.py
+++ b/src/utils/__init__.py
@@ -199,16 +199,3 @@ def is_ip(s: str) -> bool:
except ValueError:
return False
return True
-
-def encode_truncate(s: str, encoding: str, byte_max: int
- ) -> typing.Tuple[bytes, str]:
- encoded = b""
- truncated = ""
- for i, character in enumerate(s):
- encoded_character = character.encode(encoding)
- if len(encoded + encoded_character) > byte_max:
- truncated = s[i:]
- break
- else:
- encoded += encoded_character
- return encoded, truncated
diff --git a/src/utils/irc/__init__.py b/src/utils/irc/__init__.py
index f39d476a..b38b6802 100644
--- a/src/utils/irc/__init__.py
+++ b/src/utils/irc/__init__.py
@@ -1,5 +1,5 @@
import json, string, re, typing
-from src import utils
+from src import IRCLine, utils
from . import protocol
ASCII_UPPER = string.ascii_uppercase
@@ -30,77 +30,10 @@ def lower(case_mapping: str, s: str) -> str:
def equals(case_mapping: str, s1: str, s2: str) -> bool:
return lower(case_mapping, s1) == lower(case_mapping, s2)
-class IRCHostmask(object):
- def __init__(self, nickname: str, username: str, hostname: str,
- hostmask: str):
- self.nickname = nickname
- self.username = username
- self.hostname = hostname
- self.hostmask = hostmask
- def __repr__(self):
- return "IRCHostmask(%s)" % self.__str__()
- def __str__(self):
- return self.hostmask
-
-def seperate_hostmask(hostmask: str) -> IRCHostmask:
+def seperate_hostmask(hostmask: str) -> IRCLine.Hostmask:
nickname, _, username = hostmask.partition("!")
username, _, hostname = username.partition("@")
- return IRCHostmask(nickname, username, hostname, hostmask)
-
-class IRCArgs(object):
- def __init__(self, args: typing.List[str]):
- self._args = args
-
- def get(self, index: int) -> typing.Optional[str]:
- if len(self._args) > index:
- return self._args[index]
- return None
-
- def __repr__(self):
- return "IRCArgs(%s)" % self._args
- def __len__(self) -> int:
- return len(self._args)
- def __getitem__(self, index) -> str:
- return self._args[index]
-
-def _tag_str(tags: typing.Dict[str, str]) -> str:
- tag_str = ""
- for tag, value in tags.items():
- if tag_str:
- tag_str += ","
- tag_str += tag
- if value:
- tag_str += "=%s" % value
- if tag_str:
- tag_str = "@%s" % tag_str
- return tag_str
-
-class IRCParsedLine(object):
- def __init__(self, command: str, args: typing.List[str],
- prefix: IRCHostmask=None,
- tags: typing.Dict[str, str]={}):
- self.command = command
- self._args = args
- self.args = IRCArgs(args)
- self.prefix = prefix
- self.tags = {} if tags == None else tags
-
- def format(self) -> str:
- s = ""
- if self.tags:
- s += "%s " % _tag_str(self.tags)
-
- if self.prefix:
- s += "%s " % self.prefix
-
- s += self.command.upper()
-
- if self.args:
- if len(self._args) > 1:
- s += " %s" % " ".join(self._args[:-1])
- s += " %s" % trailing(self._args[-1])
-
- return s
+ return IRCLine.Hostmask(nickname, username, hostname, hostmask)
MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
@@ -110,9 +43,9 @@ def message_tag_unescape(s):
unescaped = _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED)
return unescaped.replace("\\", "")
-def parse_line(line: str) -> IRCParsedLine:
+def parse_line(line: str) -> IRCLine.ParsedLine:
tags = {} # type: typing.Dict[str, typing.Any]
- prefix = None # type: typing.Optional[IRCHostmask]
+ prefix = None # type: typing.Optional[IRCLine.Hostmask]
command = None
if line[0] == "@":
@@ -144,7 +77,7 @@ def parse_line(line: str) -> IRCParsedLine:
if not trailing == None:
args.append(typing.cast(str, trailing))
- return IRCParsedLine(command, args, prefix, tags)
+ return IRCLine.ParsedLine(command, args, prefix, tags)
REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR)
@@ -322,20 +255,14 @@ class IRCBatch(object):
self.id = identifier
self.type = batch_type
self.tags = tags
- self.lines = [] # type: typing.List[IRCParsedLine]
+ self.lines = [] # type: typing.List[IRCLine.ParsedLine]
class IRCRecvBatch(IRCBatch):
pass
class IRCSendBatch(IRCBatch):
- def _add_line(self, line: IRCParsedLine):
+ def _add_line(self, line: IRCLine.ParsedLine):
line.tags["batch"] = self.id
self.lines.append(line)
def message(self, target: str, message: str, tags: dict={}):
self._add_line(utils.irc.protocol.message(target, message, tags))
def notice(self, target: str, message: str, tags: dict={}):
self._add_line(utils.irc.protocol.notice(target, message, tags))
-
-def trailing(s: str) -> str:
- if s[0] == ":" or " " in s:
- return ":%s" % s
- else:
- return s
diff --git a/src/utils/irc/protocol.py b/src/utils/irc/protocol.py
index 2c57ad25..0ccf684f 100644
--- a/src/utils/irc/protocol.py
+++ b/src/utils/irc/protocol.py
@@ -1,90 +1,90 @@
import typing
-from src import utils
+from src import IRCLine, utils
-def user(username: str, realname: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("USER", [username, "0", "*", realname])
-def nick(nickname: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NICK", [nickname])
+def user(username: str, realname: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("USER", [username, "0", "*", realname])
+def nick(nickname: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NICK", [nickname])
-def capability_ls() -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["LS", "302"])
-def capability_request(capability: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["REQ", capability])
-def capability_end() -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["END"])
-def authenticate(text: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("AUTHENTICATE", [text])
+def capability_ls() -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["LS", "302"])
+def capability_request(capability: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["REQ", capability])
+def capability_end() -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["END"])
+def authenticate(text: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("AUTHENTICATE", [text])
-def password(password: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PASS", [password])
+def password(password: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PASS", [password])
-def ping(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PING", [nonce])
-def pong(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PONG", [nonce])
+def ping(nonce: str="hello") -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PING", [nonce])
+def pong(nonce: str="hello") -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PONG", [nonce])
def join(channel_name: str, keys: typing.List[str]=None
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("JOIN", [channel_name]+(
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("JOIN", [channel_name]+(
keys if keys else []))
-def part(channel_name: str, reason: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PART", [channel_name]+(
+def part(channel_name: str, reason: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PART", [channel_name]+(
[reason] if reason else []))
-def quit(reason: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("QUIT", [reason] if reason else [])
+def quit(reason: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("QUIT", [reason] if reason else [])
def message(target: str, message: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PRIVMSG", [target, message], tags=tags)
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PRIVMSG", [target, message], tags=tags)
def notice(target: str, message: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NOTICE", [target, message], tags=tags)
-def tagmsg(target, tags: dict) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("TAGMSG", [target], tags=tags)
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NOTICE", [target, message], tags=tags)
+def tagmsg(target, tags: dict) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("TAGMSG", [target], tags=tags)
def mode(target: str, mode: str=None, args: typing.List[str]=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
command_args = [target]
if mode:
command_args.append(mode)
if args:
command_args = command_args+args
- return utils.irc.IRCParsedLine("MODE", command_args)
+ return IRCLine.ParsedLine("MODE", command_args)
-def topic(channel_name: str, topic: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("TOPIC", [channel_name, topic])
+def topic(channel_name: str, topic: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("TOPIC", [channel_name, topic])
def kick(channel_name: str, target: str, reason: str=None
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("KICK", [channel_name, target]+(
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("KICK", [channel_name, target]+(
[reason] if reason else []))
-def names(channel_name: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NAMES", [channel_name])
-def list(search_for: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("LIST", [search_for] if search_for else [])
-def invite(target: str, channel_name: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("INVITE", [target, channel_name])
+def names(channel_name: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NAMES", [channel_name])
+def list(search_for: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("LIST", [search_for] if search_for else [])
+def invite(target: str, channel_name: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("INVITE", [target, channel_name])
-def whois(target: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("WHOIS", [target])
+def whois(target: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("WHOIS", [target])
def whowas(target: str, amount: int=None, server: str=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
command_args = [target]
if amount:
command_args.append(str(amount))
if server:
command_args.append(server)
- return utils.irc.IRCParsedLine("WHOWAS", command_args)
-def who(filter: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("WHO", [filter] if filter else [])
+ return IRCLine.ParsedLine("WHOWAS", command_args)
+def who(filter: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("WHO", [filter] if filter else [])
def whox(mask: str, filter: str, fields: str, label: str=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
flags = "%s%%%s%s" % (filter, fields, ","+label if label else "")
- return utils.irc.IRCParsedLine("WHO", [mask, flags])
+ return IRCLine.ParsedLine("WHO", [mask, flags])
def batch_start(identifier: str, batch_type: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("BATCH", ["+%s" % identifier, batch_type],
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("BATCH", ["+%s" % identifier, batch_type],
tags=tags)
def batch_end(identifier: str, tags: typing.Dict[str, str]={}):
- return utils.irc.IRCParsedLine("BATCH", ["-%s" % identifier], tags=tags)
+ return IRCLine.ParsedLine("BATCH", ["-%s" % identifier], tags=tags)