aboutsummaryrefslogtreecommitdiff
import codecs, datetime, typing, uuid
from src import EventManager, IRCObject, utils

LINE_MAX = 510

class IRCArgs(object):
    def __init__(self, args: typing.List[str]):
        self._args = args

    def get(self, index: int) -> typing.Optional[str]:
        if index < 0:
            if len(self._args) > (abs(index)-1):
                return self._args[index]
        elif len(self._args) > index:
            return self._args[index]
        return None

    def __repr__(self):
        return "IRCArgs(%s)" % self._args
    def __len__(self) -> int:
        return len(self._args)
    def __getitem__(self, index: int) -> str:
        return self._args[index]
    def __setitem__(self, index: int, value: str):
        self._args[index] = value

    def append(self, value: str):
        self._args.append(value)

class Hostmask(object):
    def __init__(self, nickname: str, username: str, hostname: str,
            hostmask: str):
        self.nickname = nickname
        self.username = username
        self.hostname = hostname
        self.hostmask = hostmask
    def __repr__(self):
        return "Hostmask(%s)" % self.__str__()
    def __str__(self):
        return self.hostmask

def parse_hostmask(hostmask: str) -> Hostmask:
    nickname, _, username = hostmask.partition("!")
    username, _, hostname = username.partition("@")
    return Hostmask(nickname, username, hostname, hostmask)

MESSAGE_TAG_ESCAPED = [r"\:", r"\s", r"\\", r"\r", r"\n"]
MESSAGE_TAG_UNESCAPED = [";", " ", "\\", "\r", "\n"]
def message_tag_escape(s):
    return utils.irc.multi_replace(s, MESSAGE_TAG_UNESCAPED,
        MESSAGE_TAG_ESCAPED)
def message_tag_unescape(s):
    unescaped = utils.irc.multi_replace(s, MESSAGE_TAG_ESCAPED,
        MESSAGE_TAG_UNESCAPED)
    return unescaped.replace("\\", "")

class ParsedLine(object):
    def __init__(self, command: str, args: typing.List[str],
            source: Hostmask=None,
            tags: typing.Dict[str, str]=None):
        self.id = str(uuid.uuid4())
        self.command = command
        self._args = args
        self.args = IRCArgs(args)
        self.source = source
        self.tags = tags or {} # type: typing.Dict[str, str]
        self._valid = True
        self._assured = False

    def __repr__(self):
        return "ParsedLine(%s)" % self.__str__()
    def __str__(self):
        return self.format()

    def valid(self) -> bool:
        return self._valid
    def invalidate(self):
        self._valid = False

    def assured(self) -> bool:
        return self._assured
    def assure(self):
        self._assured = True

    def add_tag(self, tag: str, value: str=None):
        self.tags[tag] = value or ""
    def has_tag(self, tag: str) -> bool:
        return "tag" in self.tags
    def get_tag(self, tag: str) -> typing.Optional[str]:
        return self.tags[tag]

    def _tag_str(self, tags: typing.Dict[str, str]) -> str:
        tag_pieces = []
        for tag, value in tags.items():
            if value:
                value_escaped = message_tag_escape(value)
                tag_pieces.append("%s=%s" % (tag, value_escaped))
            else:
                tag_pieces.append(tag)

        if tag_pieces:
            return "@%s" % ";".join(tag_pieces)
        return ""

    def _format(self) -> typing.Tuple[str, str]:
        pieces = []
        tags = ""
        if self.tags:
            tags = self._tag_str(self.tags)

        if self.source:
            pieces.append(":%s" % str(self.source))

        pieces.append(self.command.upper())

        if self.args:
            for i, arg in enumerate(self._args):
                if arg and i == len(self._args)-1 and (
                        " " in arg or arg[0] == ":"):
                    pieces.append(":%s" % arg)
                else:
                    pieces.append(arg)

        return tags, " ".join(pieces).replace("\r", "")
    def format(self) -> str:
        tags, line = self._format()
        if tags:
            return "%s %s" % (tags, line)
        else:
            return line

class SendableLine(ParsedLine):
    def __init__(self, command: str, args: typing.List[str],
            margin: int=0, tags: typing.Dict[str, str]=None):
        ParsedLine.__init__(self, command, args, None, tags)
        self._margin = margin

    def push_last(self, arg: str, extra_margin: int=0,
            human_trunc: bool=False) -> typing.Optional[str]:
        last_arg = self.args[-1]
        tags, line = self._format()
        n = len(line.encode("utf8")) # get length of current line
        n += self._margin            # margin used for :hostmask
        if " " in arg and not " " in last_arg:
            n += 1                   # +1 for colon on new arg
        n += extra_margin            # used for things like (more ...)

        overflow: typing.Optional[str] = None

        if (n+len(arg.encode("utf8"))) > LINE_MAX:
            for i, char in enumerate(codecs.iterencode(arg, "utf8")):
                n += len(char)
                if n > LINE_MAX:
                    arg, overflow = arg[:i], arg[i:]
                    if human_trunc and not overflow[0] == " ":
                        new_arg, sep, new_overflow = arg.rpartition(" ")
                        if sep:
                            arg = new_arg
                            overflow = new_overflow+overflow
                    break
        if arg:
            self.args[-1] = last_arg+arg
        return overflow

def parse_line(line: str) -> ParsedLine:
    tags = {} # type: typing.Dict[str, typing.Any]
    source = None # type: typing.Optional[Hostmask]
    command = None

    if line[0] == "@":
        tags_prefix, line = line[1:].split(" ", 1)

        for tag in filter(None, tags_prefix.split(";")):
            tag, sep, value = tag.partition("=")
            if value:
                tags[tag] = message_tag_unescape(value)
            else:
                tags[tag] = None

    line, trailing_separator, trailing_split = line.partition(" :")

    trailing = None # type: typing.Optional[str]
    if trailing_separator:
        trailing = trailing_split

    if line[0] == ":":
        source_str, line = line[1:].split(" ", 1)
        source = parse_hostmask(source_str)

    command, sep, line = line.partition(" ")
    args = [] # type: typing.List[str]
    if line:
        # this is so that `args` is empty if `line` is empty
        args = line.split(" ")

    if not trailing == None:
        args.append(typing.cast(str, trailing))
    return ParsedLine(command, args, source, tags)

def is_human(line: str):
    return len(line) > 1 and line[0] == "/"
def parse_human(line: str) -> typing.Optional[ParsedLine]:
    command, _, args = line[1:].partition(" ")
    if command == "msg":
        target, _, message = args.partition(" ")
        return ParsedLine("PRIVMSG", [target, message])
    return None

class SentLine(IRCObject.Object):
    def __init__(self, events: "EventManager.Events",
            send_time: datetime.datetime, hostmask: str, line: ParsedLine):
        self.events = events
        self.send_time = send_time
        self._hostmask = hostmask
        self.parsed_line = line

    def __repr__(self) -> str:
        return "IRCLine.SentLine(%s)" % self.__str__()
    def __str__(self) -> str:
        return self._for_wire()

    def _for_wire(self) -> str:
        return str(self.parsed_line)
    def for_wire(self) -> bytes:
        return b"%s\r\n" % self._for_wire().encode("utf8")

class IRCBatch(object):
    def __init__(self, identifier: str, batch_type: str, args: typing.List[str],
            tags: typing.Dict[str, str]=None, source: Hostmask=None):
        self.identifier = identifier
        self.type = batch_type
        self.args = args
        self.tags = tags or {}
        self.source = source
        self._lines = [] # type: typing.List[ParsedLine]
    def add_line(self, line: ParsedLine):
        self._lines.append(line)
    def get_lines(self) -> typing.List[ParsedLine]:
        return self._lines

class IRCSendBatch(IRCBatch):
    def __init__(self, batch_type: str, args: typing.List[str],
            tags: typing.Dict[str, str]=None):
        IRCBatch.__init__(self, str(uuid.uuid4()), batch_type, args, tags)
    def get_lines(self) -> typing.List[ParsedLine]:
        lines = []
        for line in self._lines:
            line.add_tag("batch", self.identifier)
            lines.append(line)

        lines.insert(0, ParsedLine("BATCH",
            ["+%s" % self.identifier, self.type]))
        lines.append(ParsedLine("BATCH", ["-%s" % self.identifier]))
        return lines