diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/EventManager.py | 4 | ||||
| -rw-r--r-- | src/IRCBuffer.py | 6 | ||||
| -rw-r--r-- | src/IRCChannel.py | 4 | ||||
| -rw-r--r-- | src/IRCServer.py | 18 | ||||
| -rw-r--r-- | src/IRCUser.py | 4 | ||||
| -rw-r--r-- | src/ModuleManager.py | 6 | ||||
| -rw-r--r-- | src/Utils.py | 378 | ||||
| -rw-r--r-- | src/utils/__init__.py | 173 | ||||
| -rw-r--r-- | src/utils/http.py | 94 | ||||
| -rw-r--r-- | src/utils/irc.py | 116 |
10 files changed, 404 insertions, 399 deletions
diff --git a/src/EventManager.py b/src/EventManager.py index e322004e..dbcf7449 100644 --- a/src/EventManager.py +++ b/src/EventManager.py @@ -1,5 +1,5 @@ import itertools, time, traceback -from src import Utils +from src import utils PRIORITY_URGENT = 0 PRIORITY_HIGH = 1 @@ -30,7 +30,7 @@ class EventCallback(object): self.function = function self.priority = priority self.kwargs = kwargs - self.docstring = Utils.parse_docstring(function.__doc__) + self.docstring = utils.parse_docstring(function.__doc__) def call(self, event): return self.function(event) diff --git a/src/IRCBuffer.py b/src/IRCBuffer.py index f5e6fa70..06465749 100644 --- a/src/IRCBuffer.py +++ b/src/IRCBuffer.py @@ -1,5 +1,5 @@ import re -from . import Utils +from src import utils class BufferLine(object): def __init__(self, sender, message, action, tags, from_self, method): @@ -39,7 +39,7 @@ class Buffer(object): def find(self, pattern, **kwargs): from_self = kwargs.get("from_self", True) for_user = kwargs.get("for_user", "") - for_user = Utils.irc_lower(self.server, for_user + for_user = utils.irc.lower(self.server, for_user ) if for_user else None not_pattern = kwargs.get("not_pattern", None) for line in self.lines: @@ -48,7 +48,7 @@ class Buffer(object): elif re.search(pattern, line.message): if not_pattern and re.search(not_pattern, line.message): continue - if for_user and not Utils.irc_lower(self.server, line.sender + if for_user and not utils.irc.lower(self.server, line.sender ) == for_user: continue return line diff --git a/src/IRCChannel.py b/src/IRCChannel.py index 02622380..ac79b0a4 100644 --- a/src/IRCChannel.py +++ b/src/IRCChannel.py @@ -1,9 +1,9 @@ import uuid -from . import IRCBuffer, IRCObject, Utils +from src import IRCBuffer, IRCObject, utils class Channel(IRCObject.Object): def __init__(self, name, id, server, bot): - self.name = Utils.irc_lower(server, name) + self.name = utils.irc.lower(server, name) self.id = id self.server = server self.bot = bot diff --git a/src/IRCServer.py b/src/IRCServer.py index 282d4bc3..cdac78ab 100644 --- a/src/IRCServer.py +++ b/src/IRCServer.py @@ -1,5 +1,5 @@ import collections, socket, ssl, sys, time -from . import IRCChannel, IRCObject, IRCUser, Utils +from src import IRCChannel, IRCObject, IRCUser, utils THROTTLE_LINES = 4 THROTTLE_SECONDS = 1 @@ -142,9 +142,9 @@ class Server(IRCObject.Object): def set_own_nickname(self, nickname): self.nickname = nickname - self.nickname_lower = Utils.irc_lower(self, nickname) + self.nickname_lower = utils.irc.lower(self, nickname) def is_own_nickname(self, nickname): - return Utils.irc_equals(self, nickname, self.nickname) + return utils.irc.equals(self, nickname, self.nickname) def add_own_mode(self, mode, arg=None): self.own_modes[mode] = arg @@ -157,7 +157,7 @@ class Server(IRCObject.Object): self.add_own_mode(mode, arg) def has_user(self, nickname): - return Utils.irc_lower(self, nickname) in self.users + return utils.irc.lower(self, nickname) in self.users def get_user(self, nickname, create=True): if not self.has_user(nickname) and create: user_id = self.get_user_id(nickname) @@ -165,7 +165,7 @@ class Server(IRCObject.Object): self.events.on("new.user").call(user=new_user, server=self) self.users[new_user.nickname_lower] = new_user self.new_users.add(new_user) - return self.users.get(Utils.irc_lower(self, nickname), None) + return self.users.get(utils.irc.lower(self, nickname), None) def get_user_id(self, nickname): self.bot.database.users.add(self.id, nickname) return self.bot.database.users.get_id(self.id, nickname) @@ -175,11 +175,11 @@ class Server(IRCObject.Object): channel.remove_user(user) def change_user_nickname(self, old_nickname, new_nickname): - user = self.users.pop(Utils.irc_lower(self, old_nickname)) + user = self.users.pop(utils.irc.lower(self, old_nickname)) user._id = self.get_user_id(new_nickname) - self.users[Utils.irc_lower(self, new_nickname)] = user + self.users[utils.irc.lower(self, new_nickname)] = user def has_channel(self, channel_name): - return channel_name[0] in self.channel_types and Utils.irc_lower( + return channel_name[0] in self.channel_types and utils.irc.lower( self, channel_name) in self.channels def get_channel(self, channel_name): if not self.has_channel(channel_name): @@ -189,7 +189,7 @@ class Server(IRCObject.Object): self.events.on("new.channel").call(channel=new_channel, server=self) self.channels[new_channel.name] = new_channel - return self.channels[Utils.irc_lower(self, channel_name)] + return self.channels[utils.irc.lower(self, channel_name)] def get_channel_id(self, channel_name): self.bot.database.channels.add(self.id, channel_name) return self.bot.database.channels.get_id(self.id, channel_name) diff --git a/src/IRCUser.py b/src/IRCUser.py index 3f2eb260..a26aa591 100644 --- a/src/IRCUser.py +++ b/src/IRCUser.py @@ -1,5 +1,5 @@ import uuid -from . import IRCBuffer, IRCObject, Utils +from src import IRCBuffer, IRCObject, utils class User(IRCObject.Object): def __init__(self, nickname, id, server, bot): @@ -33,7 +33,7 @@ class User(IRCObject.Object): def set_nickname(self, nickname): self.nickname = nickname - self.nickname_lower = Utils.irc_lower(self.server, nickname) + self.nickname_lower = utils.irc.lower(self.server, nickname) self.name = self.nickname_lower def join_channel(self, channel): self.channels.add(channel) diff --git a/src/ModuleManager.py b/src/ModuleManager.py index 1f229611..5d997f54 100644 --- a/src/ModuleManager.py +++ b/src/ModuleManager.py @@ -1,5 +1,5 @@ import gc, glob, imp, io, inspect, os, sys, uuid -from src import Utils +from . import utils BITBOT_HOOKS_MAGIC = "__bitbot_hooks" BITBOT_EXPORTS_MAGIC = "__bitbot_exports" @@ -54,7 +54,7 @@ class ModuleManager(object): def _load_module(self, bot, name): path = self._module_path(name) - for hashflag, value in Utils.get_hashflags(path): + for hashflag, value in utils.get_hashflags(path): if hashflag == "ignore": # nope, ignore this module. raise ModuleNotLoadedWarning("module ignored") @@ -92,7 +92,7 @@ class ModuleManager(object): attribute = getattr(module_object, attribute_name) for hook in self._get_magic(attribute, BITBOT_HOOKS_MAGIC, []): context_events.on(hook["event"]).hook(attribute, - docstring=attribute.__doc__, **hook["kwargs"]) + **hook["kwargs"]) for export in self._get_magic(module_object, BITBOT_EXPORTS_MAGIC, []): context_exports.add(export["setting"], export["value"]) diff --git a/src/Utils.py b/src/Utils.py deleted file mode 100644 index 072f14e0..00000000 --- a/src/Utils.py +++ /dev/null @@ -1,378 +0,0 @@ -import io, json, re, traceback, urllib.request, urllib.parse, urllib.error -import ssl, string -import bs4 -from . import ModuleManager - -USER_AGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 " - "(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36") -REGEX_HTTP = re.compile("https?://", re.I) -ASCII_UPPER = string.ascii_uppercase -ASCII_LOWER = string.ascii_lowercase -STRICT_RFC1459_UPPER = ASCII_UPPER+r'\[]' -STRICT_RFC1459_LOWER = ASCII_LOWER+r'|{}' -RFC1459_UPPER = STRICT_RFC1459_UPPER+"^" -RFC1459_LOWER = STRICT_RFC1459_LOWER+"~" - -def remove_colon(s): - if s.startswith(":"): - s = s[1:] - return s - -def arbitrary(s, n): - return remove_colon(" ".join(s[n:])) - -# case mapping lowercase/uppcase logic -def _multi_replace(s, chars1, chars2): - for char1, char2 in zip(chars1, chars2): - s = s.replace(char1, char2) - return s -def irc_lower(server, s): - if server.case_mapping == "ascii": - return _multi_replace(s, ASCII_UPPER, ASCII_LOWER) - elif server.case_mapping == "rfc1459": - return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER) - elif server.case_mapping == "strict-rfc1459": - return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER) - else: - raise ValueError("unknown casemapping '%s'" % server.case_mapping) - -# compare a string while respecting case mapping -def irc_equals(server, s1, s2): - return irc_lower(server, s1) == irc_lower(server, s2) - -class IRCHostmask(object): - def __init__(self, nickname, username, hostname, hostmask): - self.nickname = nickname - self.username = username - self.hostname = hostname - self.hostmask = hostmask - def __repr__(self): - return "Utils.IRCHostmask(%s)" % self.__str__() - def __str__(self): - return self.hostmask - -def seperate_hostmask(hostmask): - hostmask = remove_colon(hostmask) - nickname, _, username = hostmask.partition("!") - username, _, hostname = username.partition("@") - return IRCHostmask(nickname, username, hostname, hostmask) - -def get_url(url, **kwargs): - if not urllib.parse.urlparse(url).scheme: - url = "http://%s" % url - url_parsed = urllib.parse.urlparse(url) - - method = kwargs.get("method", "GET") - get_params = kwargs.get("get_params", "") - post_params = kwargs.get("post_params", None) - headers = kwargs.get("headers", {}) - if get_params: - get_params = "?%s" % urllib.parse.urlencode(get_params) - if post_params: - post_params = urllib.parse.urlencode(post_params).encode("utf8") - url = "%s%s" % (url, get_params) - try: - url.encode("latin-1") - except UnicodeEncodeError: - if kwargs.get("code"): - return 0, False - return False - - request = urllib.request.Request(url, post_params) - request.add_header("Accept-Language", "en-US") - request.add_header("User-Agent", USER_AGENT) - for header, value in headers.items(): - request.add_header(header, value) - request.method = method - - try: - response = urllib.request.urlopen(request, timeout=5) - except urllib.error.HTTPError as e: - traceback.print_exc() - if kwargs.get("code"): - return e.code, False - return False - except urllib.error.URLError as e: - traceback.print_exc() - if kwargs.get("code"): - return -1, False - return False - except ssl.CertificateError as e: - traceback.print_exc() - if kwargs.get("code"): - return -1, False, - return False - - response_content = response.read() - encoding = response.info().get_content_charset() - if kwargs.get("soup"): - return bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml")) - if not encoding: - soup = bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml")) - metas = soup.find_all("meta") - for meta in metas: - if "charset=" in meta.get("content", ""): - encoding = meta.get("content").split("charset=", 1)[1 - ].split(";", 1)[0] - elif meta.get("charset", ""): - encoding = meta.get("charset") - else: - continue - break - if not encoding: - for item in soup.contents: - if isinstance(item, bs4.Doctype): - if item == "html": - encoding = "utf8" - else: - encoding = "latin-1" - break - response_content = response_content.decode(encoding or "utf8") - data = response_content - if kwargs.get("json") and data: - try: - data = json.loads(response_content) - except json.decoder.JSONDecodeError: - traceback.print_exc() - return False - if kwargs.get("code"): - return response.code, data - else: - return data - -COLOR_WHITE, COLOR_BLACK, COLOR_BLUE, COLOR_GREEN = 0, 1, 2, 3 -COLOR_RED, COLOR_BROWN, COLOR_PURPLE, COLOR_ORANGE = 4, 5, 6, 7 -COLOR_YELLOW, COLOR_LIGHTGREEN, COLOR_CYAN, COLOR_LIGHTCYAN = (8, 9, - 10, 11) -COLOR_LIGHTBLUE, COLOR_PINK, COLOR_GREY, COLOR_LIGHTGREY = (12, 13, - 14, 15) -FONT_BOLD, FONT_ITALIC, FONT_UNDERLINE, FONT_INVERT = ("\x02", "\x1D", - "\x1F", "\x16") -FONT_COLOR, FONT_RESET = "\x03", "\x0F" -REGEX_COLOR = re.compile("%s\d\d(?:,\d\d)?" % FONT_COLOR) - -def color(s, foreground, background=None): - foreground = str(foreground).zfill(2) - if background: - background = str(background).zfill(2) - return "%s%s%s%s%s" % (FONT_COLOR, foreground, - "" if not background else ",%s" % background, s, FONT_COLOR) - -def bold(s): - return "%s%s%s" % (FONT_BOLD, s, FONT_BOLD) - -def underline(s): - return "%s%s%s" % (FONT_UNDERLINE, s, FONT_UNDERLINE) - -def strip_font(s): - s = s.replace(FONT_BOLD, "") - s = s.replace(FONT_ITALIC, "") - s = REGEX_COLOR.sub("", s) - s = s.replace(FONT_COLOR, "") - return s - -TIME_SECOND = 1 -TIME_MINUTE = TIME_SECOND*60 -TIME_HOUR = TIME_MINUTE*60 -TIME_DAY = TIME_HOUR*24 -TIME_WEEK = TIME_DAY*7 - -def time_unit(seconds): - since = None - unit = None - if seconds >= TIME_WEEK: - since = seconds/TIME_WEEK - unit = "week" - elif seconds >= TIME_DAY: - since = seconds/TIME_DAY - unit = "day" - elif seconds >= TIME_HOUR: - since = seconds/TIME_HOUR - unit = "hour" - elif seconds >= TIME_MINUTE: - since = seconds/TIME_MINUTE - unit = "minute" - else: - since = seconds - unit = "second" - since = int(since) - if since > 1: - unit = "%ss" % unit # pluralise the unit - return [since, unit] - -REGEX_PRETTYTIME = re.compile("\d+[wdhms]", re.I) - -SECONDS_MINUTES = 60 -SECONDS_HOURS = SECONDS_MINUTES*60 -SECONDS_DAYS = SECONDS_HOURS*24 -SECONDS_WEEKS = SECONDS_DAYS*7 - -def from_pretty_time(pretty_time): - seconds = 0 - for match in re.findall(REGEX_PRETTYTIME, pretty_time): - number, unit = int(match[:-1]), match[-1].lower() - if unit == "m": - number = number*SECONDS_MINUTES - elif unit == "h": - number = number*SECONDS_HOURS - elif unit == "d": - number = number*SECONDS_DAYS - elif unit == "w": - number = number*SECONDS_WEEKS - seconds += number - if seconds > 0: - return seconds - -UNIT_SECOND = 5 -UNIT_MINUTE = 4 -UNIT_HOUR = 3 -UNIT_DAY = 2 -UNIT_WEEK = 1 -def to_pretty_time(total_seconds, minimum_unit=UNIT_SECOND, max_units=6): - minutes, seconds = divmod(total_seconds, 60) - hours, minutes = divmod(minutes, 60) - days, hours = divmod(hours, 24) - weeks, days = divmod(days, 7) - out = "" - - units = 0 - if weeks and minimum_unit >= UNIT_WEEK and units < max_units: - out += "%dw" % weeks - units += 1 - if days and minimum_unit >= UNIT_DAY and units < max_units: - out += "%dd" % days - units += 1 - if hours and minimum_unit >= UNIT_HOUR and units < max_units: - out += "%dh" % hours - units += 1 - if minutes and minimum_unit >= UNIT_MINUTE and units < max_units: - out += "%dm" % minutes - units += 1 - if seconds and minimum_unit >= UNIT_SECOND and units < max_units: - out += "%ds" % seconds - units += 1 - return out - -IS_TRUE = ["true", "yes", "on", "y"] -IS_FALSE = ["false", "no", "off", "n"] -def bool_or_none(s): - s = s.lower() - if s in IS_TRUE: - return True - elif s in IS_FALSE: - return False -def int_or_none(s): - stripped_s = s.lstrip("0") - if stripped_s.isdigit(): - return int(stripped_s) - -def get_closest_setting(event, setting, default=None): - server = event["server"] - if "channel" in event: - closest = event["channel"] - elif "target" in event and "is_channel" in event and event["is_channel"]: - closest = event["target"] - else: - closest = event["user"] - return closest.get_setting(setting, server.get_setting(setting, default)) - -def prevent_highlight(nickname): - return nickname[0]+"\u200c"+nickname[1:] - -def _set_get_append(obj, setting, item): - if not hasattr(obj, setting): - setattr(obj, setting, []) - getattr(obj, setting).append(item) -def hook(event, **kwargs): - def _hook_func(func): - _set_get_append(func, ModuleManager.BITBOT_HOOKS_MAGIC, - {"event": event, "kwargs": kwargs}) - return func - return _hook_func -def export(setting, value): - def _export_func(module): - _set_get_append(module, ModuleManager.BITBOT_EXPORTS_MAGIC, - {"setting": setting, "value": value}) - return module - return _export_func - -def strip_html(s): - return bs4.BeautifulSoup(s, "lxml").get_text() - -def get_hashflags(filename): - hashflags = {} - with io.open(filename, mode="r", encoding="utf8") as f: - for line in f: - line = line.strip("\n") - if not line.startswith("#"): - break - elif line.startswith("#--"): - line_split = line.split(" ", 1) - hashflag = line_split[0][3:] - value = None - - if len(line_split) > 1: - value = line_split[1] - hashflags[hashflag] = value - return hashflags.items() - -class Docstring(object): - def __init__(self, description, items): - self.description = description - self.items = items - -def parse_docstring(s): - description = "" - last_item = None - items = {} - if s: - for line in s.split("\n"): - line = line.strip() - - if line: - if line[0] == ":": - key, _, value = line.partition(": ") - last_item = value - items[value] = value - else: - if last_item: - items[last_item] += " %s" % line - else: - if description: - description += " " - description += line - return Docstring(description, items) - -class IRCLine(object): - def __init__(self, tags, prefix, command, args, arbitrary, last, server): - self.tags = tags - self.prefix = prefix - self.command = command - self.args = args - self.arbitrary = arbitrary - self.last = last - self.server = server - -def parse_line(server, line): - tags = {} - prefix = None - command = None - - if line[0] == "@": - tags_prefix, line = line[1:].split(" ", 1) - for tag in filter(None, tags_prefix.split(";")): - tag, _, value = tag.partition("=") - tags[tag] = value - - line, _, arbitrary = line.partition(" :") - arbitrary = arbitrary or None - - if line[0] == ":": - prefix, line = line[1:].split(" ", 1) - prefix = seperate_hostmask(prefix) - command, _, line = line.partition(" ") - - args = line.split(" ") - last = arbitrary or args[-1] - - return IRCLine(tags, prefix, command, args, arbitrary, last, server) diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 00000000..d568e517 --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1,173 @@ +from . import irc, http + +import io, re +from src import ModuleManager + +TIME_SECOND = 1 +TIME_MINUTE = TIME_SECOND*60 +TIME_HOUR = TIME_MINUTE*60 +TIME_DAY = TIME_HOUR*24 +TIME_WEEK = TIME_DAY*7 + +def time_unit(seconds): + since = None + unit = None + if seconds >= TIME_WEEK: + since = seconds/TIME_WEEK + unit = "week" + elif seconds >= TIME_DAY: + since = seconds/TIME_DAY + unit = "day" + elif seconds >= TIME_HOUR: + since = seconds/TIME_HOUR + unit = "hour" + elif seconds >= TIME_MINUTE: + since = seconds/TIME_MINUTE + unit = "minute" + else: + since = seconds + unit = "second" + since = int(since) + if since > 1: + unit = "%ss" % unit # pluralise the unit + return [since, unit] + +REGEX_PRETTYTIME = re.compile("\d+[wdhms]", re.I) + +SECONDS_MINUTES = 60 +SECONDS_HOURS = SECONDS_MINUTES*60 +SECONDS_DAYS = SECONDS_HOURS*24 +SECONDS_WEEKS = SECONDS_DAYS*7 + +def from_pretty_time(pretty_time): + seconds = 0 + for match in re.findall(REGEX_PRETTYTIME, pretty_time): + number, unit = int(match[:-1]), match[-1].lower() + if unit == "m": + number = number*SECONDS_MINUTES + elif unit == "h": + number = number*SECONDS_HOURS + elif unit == "d": + number = number*SECONDS_DAYS + elif unit == "w": + number = number*SECONDS_WEEKS + seconds += number + if seconds > 0: + return seconds + +UNIT_SECOND = 5 +UNIT_MINUTE = 4 +UNIT_HOUR = 3 +UNIT_DAY = 2 +UNIT_WEEK = 1 +def to_pretty_time(total_seconds, minimum_unit=UNIT_SECOND, max_units=6): + minutes, seconds = divmod(total_seconds, 60) + hours, minutes = divmod(minutes, 60) + days, hours = divmod(hours, 24) + weeks, days = divmod(days, 7) + out = "" + + units = 0 + if weeks and minimum_unit >= UNIT_WEEK and units < max_units: + out += "%dw" % weeks + units += 1 + if days and minimum_unit >= UNIT_DAY and units < max_units: + out += "%dd" % days + units += 1 + if hours and minimum_unit >= UNIT_HOUR and units < max_units: + out += "%dh" % hours + units += 1 + if minutes and minimum_unit >= UNIT_MINUTE and units < max_units: + out += "%dm" % minutes + units += 1 + if seconds and minimum_unit >= UNIT_SECOND and units < max_units: + out += "%ds" % seconds + units += 1 + return out + +IS_TRUE = ["true", "yes", "on", "y"] +IS_FALSE = ["false", "no", "off", "n"] +def bool_or_none(s): + s = s.lower() + if s in IS_TRUE: + return True + elif s in IS_FALSE: + return False +def int_or_none(s): + stripped_s = s.lstrip("0") + if stripped_s.isdigit(): + return int(stripped_s) + +def get_closest_setting(event, setting, default=None): + server = event["server"] + if "channel" in event: + closest = event["channel"] + elif "target" in event and "is_channel" in event and event["is_channel"]: + closest = event["target"] + else: + closest = event["user"] + return closest.get_setting(setting, server.get_setting(setting, default)) + +def prevent_highlight(nickname): + return nickname[0]+"\u200c"+nickname[1:] + +def _set_get_append(obj, setting, item): + if not hasattr(obj, setting): + setattr(obj, setting, []) + getattr(obj, setting).append(item) +def hook(event, **kwargs): + def _hook_func(func): + _set_get_append(func, ModuleManager.BITBOT_HOOKS_MAGIC, + {"event": event, "kwargs": kwargs}) + return func + return _hook_func +def export(setting, value): + def _export_func(module): + _set_get_append(module, ModuleManager.BITBOT_EXPORTS_MAGIC, + {"setting": setting, "value": value}) + return module + return _export_func + +def get_hashflags(filename): + hashflags = {} + with io.open(filename, mode="r", encoding="utf8") as f: + for line in f: + line = line.strip("\n") + if not line.startswith("#"): + break + elif line.startswith("#--"): + line_split = line.split(" ", 1) + hashflag = line_split[0][3:] + value = None + + if len(line_split) > 1: + value = line_split[1] + hashflags[hashflag] = value + return hashflags.items() + +class Docstring(object): + def __init__(self, description, items): + self.description = description + self.items = items + +def parse_docstring(s): + description = "" + last_item = None + items = {} + if s: + for line in s.split("\n"): + line = line.strip() + + if line: + if line[0] == ":": + key, _, value = line.partition(": ") + last_item = value + items[value] = value + else: + if last_item: + items[last_item] += " %s" % line + else: + if description: + description += " " + description += line + return Docstring(description, items) diff --git a/src/utils/http.py b/src/utils/http.py new file mode 100644 index 00000000..e9cb41f9 --- /dev/null +++ b/src/utils/http.py @@ -0,0 +1,94 @@ +import re, traceback, urllib.error, urllib.parse, urllib.request +import json, ssl +import bs4 + +USER_AGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36") +REGEX_HTTP = re.compile("https?://", re.I) + +def get_url(url, **kwargs): + if not urllib.parse.urlparse(url).scheme: + url = "http://%s" % url + url_parsed = urllib.parse.urlparse(url) + + method = kwargs.get("method", "GET") + get_params = kwargs.get("get_params", "") + post_params = kwargs.get("post_params", None) + headers = kwargs.get("headers", {}) + if get_params: + get_params = "?%s" % urllib.parse.urlencode(get_params) + if post_params: + post_params = urllib.parse.urlencode(post_params).encode("utf8") + url = "%s%s" % (url, get_params) + try: + url.encode("latin-1") + except UnicodeEncodeError: + if kwargs.get("code"): + return 0, False + return False + + request = urllib.request.Request(url, post_params) + request.add_header("Accept-Language", "en-US") + request.add_header("User-Agent", USER_AGENT) + for header, value in headers.items(): + request.add_header(header, value) + request.method = method + + try: + response = urllib.request.urlopen(request, timeout=5) + except urllib.error.HTTPError as e: + traceback.print_exc() + if kwargs.get("code"): + return e.code, False + return False + except urllib.error.URLError as e: + traceback.print_exc() + if kwargs.get("code"): + return -1, False + return False + except ssl.CertificateError as e: + traceback.print_exc() + if kwargs.get("code"): + return -1, False, + return False + + response_content = response.read() + encoding = response.info().get_content_charset() + if kwargs.get("soup"): + return bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml")) + if not encoding: + soup = bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml")) + metas = soup.find_all("meta") + for meta in metas: + if "charset=" in meta.get("content", ""): + encoding = meta.get("content").split("charset=", 1)[1 + ].split(";", 1)[0] + elif meta.get("charset", ""): + encoding = meta.get("charset") + else: + continue + break + if not encoding: + for item in soup.contents: + if isinstance(item, bs4.Doctype): + if item == "html": + encoding = "utf8" + else: + encoding = "latin-1" + break + response_content = response_content.decode(encoding or "utf8") + data = response_content + if kwargs.get("json") and data: + try: + data = json.loads(response_content) + except json.decoder.JSONDecodeError: + traceback.print_exc() + return False + if kwargs.get("code"): + return response.code, data + else: + return data + +def strip_html(s): + return bs4.BeautifulSoup(s, "lxml").get_text() + diff --git a/src/utils/irc.py b/src/utils/irc.py new file mode 100644 index 00000000..792de7f3 --- /dev/null +++ b/src/utils/irc.py @@ -0,0 +1,116 @@ +import string, re + +ASCII_UPPER = string.ascii_uppercase +ASCII_LOWER = string.ascii_lowercase +STRICT_RFC1459_UPPER = ASCII_UPPER+r'\[]' +STRICT_RFC1459_LOWER = ASCII_LOWER+r'|{}' +RFC1459_UPPER = STRICT_RFC1459_UPPER+"^" +RFC1459_LOWER = STRICT_RFC1459_LOWER+"~" + +def remove_colon(s): + if s.startswith(":"): + s = s[1:] + return s + +# case mapping lowercase/uppcase logic +def _multi_replace(s, chars1, chars2): + for char1, char2 in zip(chars1, chars2): + s = s.replace(char1, char2) + return s +def lower(server, s): + if server.case_mapping == "ascii": + return _multi_replace(s, ASCII_UPPER, ASCII_LOWER) + elif server.case_mapping == "rfc1459": + return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER) + elif server.case_mapping == "strict-rfc1459": + return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER) + else: + raise ValueError("unknown casemapping '%s'" % server.case_mapping) + +# compare a string while respecting case mapping +def equals(server, s1, s2): + return lower(server, s1) == lower(server, s2) + +class IRCHostmask(object): + def __init__(self, nickname, username, hostname, hostmask): + self.nickname = nickname + self.username = username + self.hostname = hostname + self.hostmask = hostmask + def __repr__(self): + return "IRCHostmask(%s)" % self.__str__() + def __str__(self): + return self.hostmask + +def seperate_hostmask(hostmask): + hostmask = remove_colon(hostmask) + nickname, _, username = hostmask.partition("!") + username, _, hostname = username.partition("@") + return IRCHostmask(nickname, username, hostname, hostmask) + + +class IRCLine(object): + def __init__(self, tags, prefix, command, args, arbitrary, last, server): + self.tags = tags + self.prefix = prefix + self.command = command + self.args = args + self.arbitrary = arbitrary + self.last = last + self.server = server + +def parse_line(server, line): + tags = {} + prefix = None + command = None + + if line[0] == "@": + tags_prefix, line = line[1:].split(" ", 1) + for tag in filter(None, tags_prefix.split(";")): + tag, _, value = tag.partition("=") + tags[tag] = value + + line, _, arbitrary = line.partition(" :") + arbitrary = arbitrary or None + + if line[0] == ":": + prefix, line = line[1:].split(" ", 1) + prefix = seperate_hostmask(prefix) + command, _, line = line.partition(" ") + + args = line.split(" ") + last = arbitrary or args[-1] + + return IRCLine(tags, prefix, command, args, arbitrary, last, server) + +COLOR_WHITE, COLOR_BLACK, COLOR_BLUE, COLOR_GREEN = 0, 1, 2, 3 +COLOR_RED, COLOR_BROWN, COLOR_PURPLE, COLOR_ORANGE = 4, 5, 6, 7 +COLOR_YELLOW, COLOR_LIGHTGREEN, COLOR_CYAN, COLOR_LIGHTCYAN = (8, 9, + 10, 11) +COLOR_LIGHTBLUE, COLOR_PINK, COLOR_GREY, COLOR_LIGHTGREY = (12, 13, + 14, 15) +FONT_BOLD, FONT_ITALIC, FONT_UNDERLINE, FONT_INVERT = ("\x02", "\x1D", + "\x1F", "\x16") +FONT_COLOR, FONT_RESET = "\x03", "\x0F" +REGEX_COLOR = re.compile("%s\d\d(?:,\d\d)?" % FONT_COLOR) + +def color(s, foreground, background=None): + foreground = str(foreground).zfill(2) + if background: + background = str(background).zfill(2) + return "%s%s%s%s%s" % (FONT_COLOR, foreground, + "" if not background else ",%s" % background, s, FONT_COLOR) + +def bold(s): + return "%s%s%s" % (FONT_BOLD, s, FONT_BOLD) + +def underline(s): + return "%s%s%s" % (FONT_UNDERLINE, s, FONT_UNDERLINE) + +def strip_font(s): + s = s.replace(FONT_BOLD, "") + s = s.replace(FONT_ITALIC, "") + s = REGEX_COLOR.sub("", s) + s = s.replace(FONT_COLOR, "") + return s + |
