aboutsummaryrefslogtreecommitdiff
path: root/src/utils/irc
diff options
context:
space:
mode:
Diffstat (limited to 'src/utils/irc')
-rw-r--r--src/utils/irc/__init__.py89
-rw-r--r--src/utils/irc/protocol.py108
2 files changed, 62 insertions, 135 deletions
diff --git a/src/utils/irc/__init__.py b/src/utils/irc/__init__.py
index f39d476a..b38b6802 100644
--- a/src/utils/irc/__init__.py
+++ b/src/utils/irc/__init__.py
@@ -1,5 +1,5 @@
import json, string, re, typing
-from src import utils
+from src import IRCLine, utils
from . import protocol
ASCII_UPPER = string.ascii_uppercase
@@ -30,77 +30,10 @@ def lower(case_mapping: str, s: str) -> str:
def equals(case_mapping: str, s1: str, s2: str) -> bool:
return lower(case_mapping, s1) == lower(case_mapping, s2)
-class IRCHostmask(object):
- def __init__(self, nickname: str, username: str, hostname: str,
- hostmask: str):
- self.nickname = nickname
- self.username = username
- self.hostname = hostname
- self.hostmask = hostmask
- def __repr__(self):
- return "IRCHostmask(%s)" % self.__str__()
- def __str__(self):
- return self.hostmask
-
-def seperate_hostmask(hostmask: str) -> IRCHostmask:
+def seperate_hostmask(hostmask: str) -> IRCLine.Hostmask:
nickname, _, username = hostmask.partition("!")
username, _, hostname = username.partition("@")
- return IRCHostmask(nickname, username, hostname, hostmask)
-
-class IRCArgs(object):
- def __init__(self, args: typing.List[str]):
- self._args = args
-
- def get(self, index: int) -> typing.Optional[str]:
- if len(self._args) > index:
- return self._args[index]
- return None
-
- def __repr__(self):
- return "IRCArgs(%s)" % self._args
- def __len__(self) -> int:
- return len(self._args)
- def __getitem__(self, index) -> str:
- return self._args[index]
-
-def _tag_str(tags: typing.Dict[str, str]) -> str:
- tag_str = ""
- for tag, value in tags.items():
- if tag_str:
- tag_str += ","
- tag_str += tag
- if value:
- tag_str += "=%s" % value
- if tag_str:
- tag_str = "@%s" % tag_str
- return tag_str
-
-class IRCParsedLine(object):
- def __init__(self, command: str, args: typing.List[str],
- prefix: IRCHostmask=None,
- tags: typing.Dict[str, str]={}):
- self.command = command
- self._args = args
- self.args = IRCArgs(args)
- self.prefix = prefix
- self.tags = {} if tags == None else tags
-
- def format(self) -> str:
- s = ""
- if self.tags:
- s += "%s " % _tag_str(self.tags)
-
- if self.prefix:
- s += "%s " % self.prefix
-
- s += self.command.upper()
-
- if self.args:
- if len(self._args) > 1:
- s += " %s" % " ".join(self._args[:-1])
- s += " %s" % trailing(self._args[-1])
-
- return s
+ return IRCLine.Hostmask(nickname, username, hostname, hostmask)
MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
@@ -110,9 +43,9 @@ def message_tag_unescape(s):
unescaped = _multi_replace(s, MESSAGE_TAG_ESCAPED, MESSAGE_TAG_UNESCAPED)
return unescaped.replace("\\", "")
-def parse_line(line: str) -> IRCParsedLine:
+def parse_line(line: str) -> IRCLine.ParsedLine:
tags = {} # type: typing.Dict[str, typing.Any]
- prefix = None # type: typing.Optional[IRCHostmask]
+ prefix = None # type: typing.Optional[IRCLine.Hostmask]
command = None
if line[0] == "@":
@@ -144,7 +77,7 @@ def parse_line(line: str) -> IRCParsedLine:
if not trailing == None:
args.append(typing.cast(str, trailing))
- return IRCParsedLine(command, args, prefix, tags)
+ return IRCLine.ParsedLine(command, args, prefix, tags)
REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR)
@@ -322,20 +255,14 @@ class IRCBatch(object):
self.id = identifier
self.type = batch_type
self.tags = tags
- self.lines = [] # type: typing.List[IRCParsedLine]
+ self.lines = [] # type: typing.List[IRCLine.ParsedLine]
class IRCRecvBatch(IRCBatch):
pass
class IRCSendBatch(IRCBatch):
- def _add_line(self, line: IRCParsedLine):
+ def _add_line(self, line: IRCLine.ParsedLine):
line.tags["batch"] = self.id
self.lines.append(line)
def message(self, target: str, message: str, tags: dict={}):
self._add_line(utils.irc.protocol.message(target, message, tags))
def notice(self, target: str, message: str, tags: dict={}):
self._add_line(utils.irc.protocol.notice(target, message, tags))
-
-def trailing(s: str) -> str:
- if s[0] == ":" or " " in s:
- return ":%s" % s
- else:
- return s
diff --git a/src/utils/irc/protocol.py b/src/utils/irc/protocol.py
index 2c57ad25..0ccf684f 100644
--- a/src/utils/irc/protocol.py
+++ b/src/utils/irc/protocol.py
@@ -1,90 +1,90 @@
import typing
-from src import utils
+from src import IRCLine, utils
-def user(username: str, realname: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("USER", [username, "0", "*", realname])
-def nick(nickname: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NICK", [nickname])
+def user(username: str, realname: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("USER", [username, "0", "*", realname])
+def nick(nickname: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NICK", [nickname])
-def capability_ls() -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["LS", "302"])
-def capability_request(capability: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["REQ", capability])
-def capability_end() -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("CAP", ["END"])
-def authenticate(text: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("AUTHENTICATE", [text])
+def capability_ls() -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["LS", "302"])
+def capability_request(capability: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["REQ", capability])
+def capability_end() -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("CAP", ["END"])
+def authenticate(text: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("AUTHENTICATE", [text])
-def password(password: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PASS", [password])
+def password(password: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PASS", [password])
-def ping(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PING", [nonce])
-def pong(nonce: str="hello") -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PONG", [nonce])
+def ping(nonce: str="hello") -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PING", [nonce])
+def pong(nonce: str="hello") -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PONG", [nonce])
def join(channel_name: str, keys: typing.List[str]=None
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("JOIN", [channel_name]+(
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("JOIN", [channel_name]+(
keys if keys else []))
-def part(channel_name: str, reason: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PART", [channel_name]+(
+def part(channel_name: str, reason: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PART", [channel_name]+(
[reason] if reason else []))
-def quit(reason: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("QUIT", [reason] if reason else [])
+def quit(reason: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("QUIT", [reason] if reason else [])
def message(target: str, message: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("PRIVMSG", [target, message], tags=tags)
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("PRIVMSG", [target, message], tags=tags)
def notice(target: str, message: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NOTICE", [target, message], tags=tags)
-def tagmsg(target, tags: dict) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("TAGMSG", [target], tags=tags)
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NOTICE", [target, message], tags=tags)
+def tagmsg(target, tags: dict) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("TAGMSG", [target], tags=tags)
def mode(target: str, mode: str=None, args: typing.List[str]=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
command_args = [target]
if mode:
command_args.append(mode)
if args:
command_args = command_args+args
- return utils.irc.IRCParsedLine("MODE", command_args)
+ return IRCLine.ParsedLine("MODE", command_args)
-def topic(channel_name: str, topic: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("TOPIC", [channel_name, topic])
+def topic(channel_name: str, topic: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("TOPIC", [channel_name, topic])
def kick(channel_name: str, target: str, reason: str=None
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("KICK", [channel_name, target]+(
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("KICK", [channel_name, target]+(
[reason] if reason else []))
-def names(channel_name: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("NAMES", [channel_name])
-def list(search_for: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("LIST", [search_for] if search_for else [])
-def invite(target: str, channel_name: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("INVITE", [target, channel_name])
+def names(channel_name: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("NAMES", [channel_name])
+def list(search_for: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("LIST", [search_for] if search_for else [])
+def invite(target: str, channel_name: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("INVITE", [target, channel_name])
-def whois(target: str) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("WHOIS", [target])
+def whois(target: str) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("WHOIS", [target])
def whowas(target: str, amount: int=None, server: str=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
command_args = [target]
if amount:
command_args.append(str(amount))
if server:
command_args.append(server)
- return utils.irc.IRCParsedLine("WHOWAS", command_args)
-def who(filter: str=None) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("WHO", [filter] if filter else [])
+ return IRCLine.ParsedLine("WHOWAS", command_args)
+def who(filter: str=None) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("WHO", [filter] if filter else [])
def whox(mask: str, filter: str, fields: str, label: str=None
- ) -> 'utils.irc.IRCParsedLine':
+ ) -> IRCLine.ParsedLine:
flags = "%s%%%s%s" % (filter, fields, ","+label if label else "")
- return utils.irc.IRCParsedLine("WHO", [mask, flags])
+ return IRCLine.ParsedLine("WHO", [mask, flags])
def batch_start(identifier: str, batch_type: str, tags: typing.Dict[str, str]={}
- ) -> 'utils.irc.IRCParsedLine':
- return utils.irc.IRCParsedLine("BATCH", ["+%s" % identifier, batch_type],
+ ) -> IRCLine.ParsedLine:
+ return IRCLine.ParsedLine("BATCH", ["+%s" % identifier, batch_type],
tags=tags)
def batch_end(identifier: str, tags: typing.Dict[str, str]={}):
- return utils.irc.IRCParsedLine("BATCH", ["-%s" % identifier], tags=tags)
+ return IRCLine.ParsedLine("BATCH", ["-%s" % identifier], tags=tags)