diff options
| author | 2019-02-23 21:33:04 +0000 | |
|---|---|---|
| committer | 2019-02-23 21:33:04 +0000 | |
| commit | 8c94bcf6caf0ae88b3a67d0a73389a7e60810e1c (patch) | |
| tree | 8d72b0760aef71da12403696e9acd387d288e4e0 /src/utils | |
| parent | !raw needs to parse the line it's given in to an IRCParsedLine now (diff) | |
| signature | ||
Move utils.irc.IRCParsedLine to IRCLine.ParsedLine, improve truncation
mechanism, don't convert sent line from ParsedLine to text to ParsedLine for
line_handler handling
Diffstat (limited to 'src/utils')
| -rw-r--r-- | src/utils/__init__.py | 13 | ||||
| -rw-r--r-- | src/utils/irc/__init__.py | 89 | ||||
| -rw-r--r-- | src/utils/irc/protocol.py | 108 |
3 files changed, 62 insertions, 148 deletions
diff --git a/src/utils/__init__.py b/src/utils/__init__.py index 90f3e6ba..609b0eaa 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -199,16 +199,3 @@ def is_ip(s: str) -> bool: except ValueError: return False return True - -def encode_truncate(s: str, encoding: str, byte_max: int - ) -> typing.Tuple[bytes, str]: - encoded = b"" - truncated = "" - for i, character in enumerate(s): - encoded_character = character.encode(encoding) - if len(encoded + encoded_character) > byte_max: - truncated = s[i:] - break - else: - encoded += encoded_character - return encoded, truncated diff --git a/src/utils/irc/__init__.py b/src/utils/irc/__init__.py index f39d476a..b38b6802 100644 --- a/src/utils/irc/__init__.py +++ b/src/utils/irc/__init__.py @@ -1,5 +1,5 @@ import json, string, re, typing -from src import utils +from src import IRCLine, utils from . import protocol ASCII_UPPER = string.ascii_uppercase @@ -30,77 +30,10 @@ def lower(case_mapping: str, s: str) -> str: def equals(case_mapping: str, s1: str, s2: str) -> bool: return lower(case_mapping, s1) == lower(case_mapping, s2) -class IRCHostmask(object): - def __init__(self, nickname: str, username: str, hostname: str, - hostmask: str): - self.nickname = nickname - self.username = username - self.hostname = hostname - self.hostmask = hostmask - def __repr__(self): - return "IRCHostmask(%s)" % self.__str__() - def __str__(self): - return self.hostmask - -def seperate_hostmask(hostmask: str) -> IRCHostmask: +def seperate_hostmask(hostmask: str) -> IRCLine.Hostmask: nickname, _, username = hostmask.partition("!") username, _, hostname = username.partition("@") - return IRCHostmask(nickname, username, hostname, hostmask) - -class IRCArgs(object): - def __init__(self, args: typing.List[str]): - self._args = args - - def get(self, index: int) -> typing.Optional[str]: - if len(self._args) > index: - return self._args[index] - return None - - def __repr__(self): - return "IRCArgs(%s)" % self._args - def __len__(self) -> int: - return len(self._args) - def __getitem__(self, index) -> str: - return self._args[index] - -def _tag_str(tags: typing.Dict[str, str]) -> str: - tag_str = "" - for tag, value in tags.items(): - if tag_str: - tag_str += "," - tag_str += tag - if value: - tag_str += "=%s" % value - if tag_str: - tag_str = "@%s" % tag_str - return tag_str - -class IRCParsedLine(object): - def __init__(self, command: str, args: typing.List[str], - prefix: IRCHostmask=None, - tags: typing.Dict[str, str]={}): - self.command = command - self._args = args - self.args = IRCArgs(args) - self.prefix = prefix - self.tags = {} if tags == None else tags - - def format(self) -> str: - s = "" - if self.tags: - s += "%s " % _tag_str(self.tags) - - if self.prefix: - s += "%s " % self.prefix - - s += self.command.upper() - - if self.args: - if len(self._args) > 1: - s += " %s" % " ".join(self._args[:-1]) - s += " %s" % trailing(self._args[-1]) - - return s + return IRCLine.Hostmask(nickname, username, hostname, hostmask) MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"] MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"] @@ -110,9 +43,9 @@ def message_tag_unescape(s): unescaped = _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED) return unescaped.replace("\\", "") -def parse_line(line: str) -> IRCParsedLine: +def parse_line(line: str) -> IRCLine.ParsedLine: tags = {} # type: typing.Dict[str, typing.Any] - prefix = None # type: typing.Optional[IRCHostmask] + prefix = None # type: typing.Optional[IRCLine.Hostmask] command = None if line[0] == "@": @@ -144,7 +77,7 @@ def parse_line(line: str) -> IRCParsedLine: if not trailing == None: args.append(typing.cast(str, trailing)) - return IRCParsedLine(command, args, prefix, tags) + return IRCLine.ParsedLine(command, args, prefix, tags) REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR) @@ -322,20 +255,14 @@ class IRCBatch(object): self.id = identifier self.type = batch_type self.tags = tags - self.lines = [] # type: typing.List[IRCParsedLine] + self.lines = [] # type: typing.List[IRCLine.ParsedLine] class IRCRecvBatch(IRCBatch): pass class IRCSendBatch(IRCBatch): - def _add_line(self, line: IRCParsedLine): + def _add_line(self, line: IRCLine.ParsedLine): line.tags["batch"] = self.id self.lines.append(line) def message(self, target: str, message: str, tags: dict={}): self._add_line(utils.irc.protocol.message(target, message, tags)) def notice(self, target: str, message: str, tags: dict={}): self._add_line(utils.irc.protocol.notice(target, message, tags)) - -def trailing(s: str) -> str: - if s[0] == ":" or " " in s: - return ":%s" % s - else: - return s diff --git a/src/utils/irc/protocol.py b/src/utils/irc/protocol.py index 2c57ad25..0ccf684f 100644 --- a/src/utils/irc/protocol.py +++ b/src/utils/irc/protocol.py @@ -1,90 +1,90 @@ import typing -from src import utils +from src import IRCLine, utils -def user(username: str, realname: str) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("USER", [username, "0", "*", realname]) -def nick(nickname: str) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("NICK", [nickname]) +def user(username: str, realname: str) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("USER", [username, "0", "*", realname]) +def nick(nickname: str) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("NICK", [nickname]) -def capability_ls() -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("CAP", ["LS", "302"]) -def capability_request(capability: str) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("CAP", ["REQ", capability]) -def capability_end() -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("CAP", ["END"]) -def authenticate(text: str) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("AUTHENTICATE", [text]) +def capability_ls() -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("CAP", ["LS", "302"]) +def capability_request(capability: str) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("CAP", ["REQ", capability]) +def capability_end() -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("CAP", ["END"]) +def authenticate(text: str) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("AUTHENTICATE", [text]) -def password(password: str) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("PASS", [password]) +def password(password: str) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("PASS", [password]) -def ping(nonce: str="hello") -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("PING", [nonce]) -def pong(nonce: str="hello") -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("PONG", [nonce]) +def ping(nonce: str="hello") -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("PING", [nonce]) +def pong(nonce: str="hello") -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("PONG", [nonce]) def join(channel_name: str, keys: typing.List[str]=None - ) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("JOIN", [channel_name]+( + ) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("JOIN", [channel_name]+( keys if keys else [])) -def part(channel_name: str, reason: str=None) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("PART", [channel_name]+( +def part(channel_name: str, reason: str=None) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("PART", [channel_name]+( [reason] if reason else [])) -def quit(reason: str=None) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("QUIT", [reason] if reason else []) +def quit(reason: str=None) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("QUIT", [reason] if reason else []) def message(target: str, message: str, tags: typing.Dict[str, str]={} - ) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("PRIVMSG", [target, message], tags=tags) + ) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("PRIVMSG", [target, message], tags=tags) def notice(target: str, message: str, tags: typing.Dict[str, str]={} - ) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("NOTICE", [target, message], tags=tags) -def tagmsg(target, tags: dict) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("TAGMSG", [target], tags=tags) + ) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("NOTICE", [target, message], tags=tags) +def tagmsg(target, tags: dict) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("TAGMSG", [target], tags=tags) def mode(target: str, mode: str=None, args: typing.List[str]=None - ) -> 'utils.irc.IRCParsedLine': + ) -> IRCLine.ParsedLine: command_args = [target] if mode: command_args.append(mode) if args: command_args = command_args+args - return utils.irc.IRCParsedLine("MODE", command_args) + return IRCLine.ParsedLine("MODE", command_args) -def topic(channel_name: str, topic: str) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("TOPIC", [channel_name, topic]) +def topic(channel_name: str, topic: str) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("TOPIC", [channel_name, topic]) def kick(channel_name: str, target: str, reason: str=None - ) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("KICK", [channel_name, target]+( + ) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("KICK", [channel_name, target]+( [reason] if reason else [])) -def names(channel_name: str) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("NAMES", [channel_name]) -def list(search_for: str=None) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("LIST", [search_for] if search_for else []) -def invite(target: str, channel_name: str) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("INVITE", [target, channel_name]) +def names(channel_name: str) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("NAMES", [channel_name]) +def list(search_for: str=None) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("LIST", [search_for] if search_for else []) +def invite(target: str, channel_name: str) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("INVITE", [target, channel_name]) -def whois(target: str) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("WHOIS", [target]) +def whois(target: str) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("WHOIS", [target]) def whowas(target: str, amount: int=None, server: str=None - ) -> 'utils.irc.IRCParsedLine': + ) -> IRCLine.ParsedLine: command_args = [target] if amount: command_args.append(str(amount)) if server: command_args.append(server) - return utils.irc.IRCParsedLine("WHOWAS", command_args) -def who(filter: str=None) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("WHO", [filter] if filter else []) + return IRCLine.ParsedLine("WHOWAS", command_args) +def who(filter: str=None) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("WHO", [filter] if filter else []) def whox(mask: str, filter: str, fields: str, label: str=None - ) -> 'utils.irc.IRCParsedLine': + ) -> IRCLine.ParsedLine: flags = "%s%%%s%s" % (filter, fields, ","+label if label else "") - return utils.irc.IRCParsedLine("WHO", [mask, flags]) + return IRCLine.ParsedLine("WHO", [mask, flags]) def batch_start(identifier: str, batch_type: str, tags: typing.Dict[str, str]={} - ) -> 'utils.irc.IRCParsedLine': - return utils.irc.IRCParsedLine("BATCH", ["+%s" % identifier, batch_type], + ) -> IRCLine.ParsedLine: + return IRCLine.ParsedLine("BATCH", ["+%s" % identifier, batch_type], tags=tags) def batch_end(identifier: str, tags: typing.Dict[str, str]={}): - return utils.irc.IRCParsedLine("BATCH", ["-%s" % identifier], tags=tags) + return IRCLine.ParsedLine("BATCH", ["-%s" % identifier], tags=tags) |
