aboutsummaryrefslogtreecommitdiff
path: root/src/utils
diff options
context:
space:
mode:
Diffstat (limited to 'src/utils')
-rw-r--r--src/utils/http.py7
-rw-r--r--src/utils/irc/__init__.py42
2 files changed, 24 insertions, 25 deletions
diff --git a/src/utils/http.py b/src/utils/http.py
index 0488b0af..2d7c512a 100644
--- a/src/utils/http.py
+++ b/src/utils/http.py
@@ -64,18 +64,17 @@ def request(url: str, method: str="GET", get_params: dict={},
signal.signal(signal.SIGALRM, signal.SIG_IGN)
response_headers = utils.CaseInsensitiveDict(dict(response.headers))
-
+ data = response_content.decode(response.encoding or fallback_encoding)
content_type = response.headers["Content-Type"].split(";", 1)[0]
+
if soup:
if content_type in SOUP_CONTENT_TYPES:
- soup = bs4.BeautifulSoup(response_content, parser)
+ soup = bs4.BeautifulSoup(data, parser)
return Response(response.status_code, soup, response_headers)
else:
raise HTTPWrongContentTypeException(
"Tried to soup non-html/non-xml data")
-
- data = response_content.decode(response.encoding or fallback_encoding)
if json and data:
try:
return Response(response.status_code, _json.loads(data),
diff --git a/src/utils/irc/__init__.py b/src/utils/irc/__init__.py
index 14bbb5b0..9577bcc1 100644
--- a/src/utils/irc/__init__.py
+++ b/src/utils/irc/__init__.py
@@ -45,7 +45,7 @@ def message_tag_unescape(s):
def parse_line(line: str) -> IRCLine.ParsedLine:
tags = {} # type: typing.Dict[str, typing.Any]
- prefix = None # type: typing.Optional[IRCLine.Hostmask]
+ source = None # type: typing.Optional[IRCLine.Hostmask]
command = None
if line[0] == "@":
@@ -65,8 +65,8 @@ def parse_line(line: str) -> IRCLine.ParsedLine:
trailing = trailing_split
if line[0] == ":":
- prefix_str, line = line[1:].split(" ", 1)
- prefix = seperate_hostmask(prefix_str)
+ source_str, line = line[1:].split(" ", 1)
+ source = seperate_hostmask(source_str)
command, sep, line = line.partition(" ")
args = [] # type: typing.List[str]
@@ -77,7 +77,7 @@ def parse_line(line: str) -> IRCLine.ParsedLine:
if not trailing == None:
args.append(typing.cast(str, trailing))
- return IRCLine.ParsedLine(command, args, prefix, tags)
+ return IRCLine.ParsedLine(command, args, source, tags)
REGEX_COLOR = re.compile("%s(?:(\d{1,2})(?:,(\d{1,2}))?)?" % utils.consts.COLOR)
@@ -258,28 +258,25 @@ def parse_ctcp(s: str) -> typing.Optional[CTCPMessage]:
return None
class IRCBatch(object):
- def __init__(self, identifier: str, batch_type: str, tags:
- typing.Dict[str, str]={}):
+ def __init__(self, identifier: str, batch_type: str, args: typing.List[str],
+ tags: typing.Dict[str, str]={}):
self.id = identifier
self.type = batch_type
+ self.args = args
self.tags = tags
- self.lines = [] # type: typing.List[IRCLine.ParsedLine]
-class IRCRecvBatch(IRCBatch):
- pass
-class IRCSendBatch(IRCBatch):
- def _add_line(self, line: IRCLine.ParsedLine):
- line.tags["batch"] = self.id
- self.lines.append(line)
- def message(self, target: str, message: str, tags: dict={}):
- self._add_line(utils.irc.protocol.message(target, message, tags))
- def notice(self, target: str, message: str, tags: dict={}):
- self._add_line(utils.irc.protocol.notice(target, message, tags))
+ self._lines = [] # type: typing.List[IRCLine.ParsedLine]
+ def add_line(self, line: IRCLine.ParsedLine):
+ self._lines.append(line)
+ def get_lines(self) -> typing.List[IRCLine.ParsedLine]:
+ return self._lines
class Capability(object):
- def __init__(self, name, draft_name=None):
+ def __init__(self, name: typing.Optional[str], draft_name: str=None):
self._caps = set([name, draft_name])
- self._on_ack_callbacks = []
- def available(self, capabilities: typing.Iterable[str]) -> str:
+ self._on_ack_callbacks = [
+ ] # type: typing.List[typing.Callable[[], None]]
+ def available(self, capabilities: typing.Iterable[str]
+ ) -> typing.Optional[str]:
match = list(set(capabilities)&self._caps)
return match[0] if match else None
@@ -295,11 +292,14 @@ class Capability(object):
pass
class MessageTag(object):
- def __init__(self, name: str, draft_name: str=None):
+ def __init__(self, name: typing.Optional[str], draft_name: str=None):
self._names = set([name, draft_name])
def get_value(self, tags: typing.Dict[str, str]) -> typing.Optional[str]:
key = list(set(tags.keys())&self._names)
return tags[key[0]] if key else None
+ def match(self, s: str) -> typing.Optional[str]:
+ key = list(set([s])&self._names)
+ return key[0] if key else None
def hostmask_match(hostmask: str, pattern: str) -> bool:
return fnmatch.fnmatchcase(hostmask, pattern)