diff options
Diffstat (limited to 'src/utils')
| -rw-r--r-- | src/utils/http.py | 7 | ||||
| -rw-r--r-- | src/utils/irc/__init__.py | 42 |
2 files changed, 24 insertions, 25 deletions
diff --git a/src/utils/http.py b/src/utils/http.py index 0488b0af..2d7c512a 100644 --- a/src/utils/http.py +++ b/src/utils/http.py @@ -64,18 +64,17 @@ def request(url: str, method: str="GET", get_params: dict={}, signal.signal(signal.SIGALRM, signal.SIG_IGN) response_headers = utils.CaseInsensitiveDict(dict(response.headers)) - + data = response_content.decode(response.encoding or fallback_encoding) content_type = response.headers["Content-Type"].split(";", 1)[0] + if soup: if content_type in SOUP_CONTENT_TYPES: - soup = bs4.BeautifulSoup(response_content, parser) + soup = bs4.BeautifulSoup(data, parser) return Response(response.status_code, soup, response_headers) else: raise HTTPWrongContentTypeException( "Tried to soup non-html/non-xml data") - - data = response_content.decode(response.encoding or fallback_encoding) if json and data: try: return Response(response.status_code, _json.loads(data), diff --git a/src/utils/irc/__init__.py b/src/utils/irc/__init__.py index 14bbb5b0..9577bcc1 100644 --- a/src/utils/irc/__init__.py +++ b/src/utils/irc/__init__.py @@ -45,7 +45,7 @@ def message_tag_unescape(s): def parse_line(line: str) -> IRCLine.ParsedLine: tags = {} # type: typing.Dict[str, typing.Any] - prefix = None # type: typing.Optional[IRCLine.Hostmask] + source = None # type: typing.Optional[IRCLine.Hostmask] command = None if line[0] == "@": @@ -65,8 +65,8 @@ def parse_line(line: str) -> IRCLine.ParsedLine: trailing = trailing_split if line[0] == ":": - prefix_str, line = line[1:].split(" ", 1) - prefix = seperate_hostmask(prefix_str) + source_str, line = line[1:].split(" ", 1) + source = seperate_hostmask(source_str) command, sep, line = line.partition(" ") args = [] # type: typing.List[str] @@ -77,7 +77,7 @@ def parse_line(line: str) -> IRCLine.ParsedLine: if not trailing == None: args.append(typing.cast(str, trailing)) - return IRCLine.ParsedLine(command, args, prefix, tags) + return IRCLine.ParsedLine(command, args, source, tags) REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR) @@ -258,28 +258,25 @@ def parse_ctcp(s: str) -> typing.Optional[CTCPMessage]: return None class IRCBatch(object): - def __init__(self, identifier: str, batch_type: str, tags: - typing.Dict[str, str]={}): + def __init__(self, identifier: str, batch_type: str, args: typing.List[str], + tags: typing.Dict[str, str]={}): self.id = identifier self.type = batch_type + self.args = args self.tags = tags - self.lines = [] # type: typing.List[IRCLine.ParsedLine] -class IRCRecvBatch(IRCBatch): - pass -class IRCSendBatch(IRCBatch): - def _add_line(self, line: IRCLine.ParsedLine): - line.tags["batch"] = self.id - self.lines.append(line) - def message(self, target: str, message: str, tags: dict={}): - self._add_line(utils.irc.protocol.message(target, message, tags)) - def notice(self, target: str, message: str, tags: dict={}): - self._add_line(utils.irc.protocol.notice(target, message, tags)) + self._lines = [] # type: typing.List[IRCLine.ParsedLine] + def add_line(self, line: IRCLine.ParsedLine): + self._lines.append(line) + def get_lines(self) -> typing.List[IRCLine.ParsedLine]: + return self._lines class Capability(object): - def __init__(self, name, draft_name=None): + def __init__(self, name: typing.Optional[str], draft_name: str=None): self._caps = set([name, draft_name]) - self._on_ack_callbacks = [] - def available(self, capabilities: typing.Iterable[str]) -> str: + self._on_ack_callbacks = [ + ] # type: typing.List[typing.Callable[[], None]] + def available(self, capabilities: typing.Iterable[str] + ) -> typing.Optional[str]: match = list(set(capabilities)&self._caps) return match[0] if match else None @@ -295,11 +292,14 @@ class Capability(object): pass class MessageTag(object): - def __init__(self, name: str, draft_name: str=None): + def __init__(self, name: typing.Optional[str], draft_name: str=None): self._names = set([name, draft_name]) def get_value(self, tags: typing.Dict[str, str]) -> typing.Optional[str]: key = list(set(tags.keys())&self._names) return tags[key[0]] if key else None + def match(self, s: str) -> typing.Optional[str]: + key = list(set([s])&self._names) + return key[0] if key else None def hostmask_match(hostmask: str, pattern: str) -> bool: return fnmatch.fnmatchcase(hostmask, pattern) |
