aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/EventManager.py4
-rw-r--r--src/IRCBuffer.py6
-rw-r--r--src/IRCChannel.py4
-rw-r--r--src/IRCServer.py18
-rw-r--r--src/IRCUser.py4
-rw-r--r--src/ModuleManager.py6
-rw-r--r--src/Utils.py378
-rw-r--r--src/utils/__init__.py173
-rw-r--r--src/utils/http.py94
-rw-r--r--src/utils/irc.py116
10 files changed, 404 insertions, 399 deletions
diff --git a/src/EventManager.py b/src/EventManager.py
index e322004e..dbcf7449 100644
--- a/src/EventManager.py
+++ b/src/EventManager.py
@@ -1,5 +1,5 @@
import itertools, time, traceback
-from src import Utils
+from src import utils
PRIORITY_URGENT = 0
PRIORITY_HIGH = 1
@@ -30,7 +30,7 @@ class EventCallback(object):
self.function = function
self.priority = priority
self.kwargs = kwargs
- self.docstring = Utils.parse_docstring(function.__doc__)
+ self.docstring = utils.parse_docstring(function.__doc__)
def call(self, event):
return self.function(event)
diff --git a/src/IRCBuffer.py b/src/IRCBuffer.py
index f5e6fa70..06465749 100644
--- a/src/IRCBuffer.py
+++ b/src/IRCBuffer.py
@@ -1,5 +1,5 @@
import re
-from . import Utils
+from src import utils
class BufferLine(object):
def __init__(self, sender, message, action, tags, from_self, method):
@@ -39,7 +39,7 @@ class Buffer(object):
def find(self, pattern, **kwargs):
from_self = kwargs.get("from_self", True)
for_user = kwargs.get("for_user", "")
- for_user = Utils.irc_lower(self.server, for_user
+ for_user = utils.irc.lower(self.server, for_user
) if for_user else None
not_pattern = kwargs.get("not_pattern", None)
for line in self.lines:
@@ -48,7 +48,7 @@ class Buffer(object):
elif re.search(pattern, line.message):
if not_pattern and re.search(not_pattern, line.message):
continue
- if for_user and not Utils.irc_lower(self.server, line.sender
+ if for_user and not utils.irc.lower(self.server, line.sender
) == for_user:
continue
return line
diff --git a/src/IRCChannel.py b/src/IRCChannel.py
index 02622380..ac79b0a4 100644
--- a/src/IRCChannel.py
+++ b/src/IRCChannel.py
@@ -1,9 +1,9 @@
import uuid
-from . import IRCBuffer, IRCObject, Utils
+from src import IRCBuffer, IRCObject, utils
class Channel(IRCObject.Object):
def __init__(self, name, id, server, bot):
- self.name = Utils.irc_lower(server, name)
+ self.name = utils.irc.lower(server, name)
self.id = id
self.server = server
self.bot = bot
diff --git a/src/IRCServer.py b/src/IRCServer.py
index 282d4bc3..cdac78ab 100644
--- a/src/IRCServer.py
+++ b/src/IRCServer.py
@@ -1,5 +1,5 @@
import collections, socket, ssl, sys, time
-from . import IRCChannel, IRCObject, IRCUser, Utils
+from src import IRCChannel, IRCObject, IRCUser, utils
THROTTLE_LINES = 4
THROTTLE_SECONDS = 1
@@ -142,9 +142,9 @@ class Server(IRCObject.Object):
def set_own_nickname(self, nickname):
self.nickname = nickname
- self.nickname_lower = Utils.irc_lower(self, nickname)
+ self.nickname_lower = utils.irc.lower(self, nickname)
def is_own_nickname(self, nickname):
- return Utils.irc_equals(self, nickname, self.nickname)
+ return utils.irc.equals(self, nickname, self.nickname)
def add_own_mode(self, mode, arg=None):
self.own_modes[mode] = arg
@@ -157,7 +157,7 @@ class Server(IRCObject.Object):
self.add_own_mode(mode, arg)
def has_user(self, nickname):
- return Utils.irc_lower(self, nickname) in self.users
+ return utils.irc.lower(self, nickname) in self.users
def get_user(self, nickname, create=True):
if not self.has_user(nickname) and create:
user_id = self.get_user_id(nickname)
@@ -165,7 +165,7 @@ class Server(IRCObject.Object):
self.events.on("new.user").call(user=new_user, server=self)
self.users[new_user.nickname_lower] = new_user
self.new_users.add(new_user)
- return self.users.get(Utils.irc_lower(self, nickname), None)
+ return self.users.get(utils.irc.lower(self, nickname), None)
def get_user_id(self, nickname):
self.bot.database.users.add(self.id, nickname)
return self.bot.database.users.get_id(self.id, nickname)
@@ -175,11 +175,11 @@ class Server(IRCObject.Object):
channel.remove_user(user)
def change_user_nickname(self, old_nickname, new_nickname):
- user = self.users.pop(Utils.irc_lower(self, old_nickname))
+ user = self.users.pop(utils.irc.lower(self, old_nickname))
user._id = self.get_user_id(new_nickname)
- self.users[Utils.irc_lower(self, new_nickname)] = user
+ self.users[utils.irc.lower(self, new_nickname)] = user
def has_channel(self, channel_name):
- return channel_name[0] in self.channel_types and Utils.irc_lower(
+ return channel_name[0] in self.channel_types and utils.irc.lower(
self, channel_name) in self.channels
def get_channel(self, channel_name):
if not self.has_channel(channel_name):
@@ -189,7 +189,7 @@ class Server(IRCObject.Object):
self.events.on("new.channel").call(channel=new_channel,
server=self)
self.channels[new_channel.name] = new_channel
- return self.channels[Utils.irc_lower(self, channel_name)]
+ return self.channels[utils.irc.lower(self, channel_name)]
def get_channel_id(self, channel_name):
self.bot.database.channels.add(self.id, channel_name)
return self.bot.database.channels.get_id(self.id, channel_name)
diff --git a/src/IRCUser.py b/src/IRCUser.py
index 3f2eb260..a26aa591 100644
--- a/src/IRCUser.py
+++ b/src/IRCUser.py
@@ -1,5 +1,5 @@
import uuid
-from . import IRCBuffer, IRCObject, Utils
+from src import IRCBuffer, IRCObject, utils
class User(IRCObject.Object):
def __init__(self, nickname, id, server, bot):
@@ -33,7 +33,7 @@ class User(IRCObject.Object):
def set_nickname(self, nickname):
self.nickname = nickname
- self.nickname_lower = Utils.irc_lower(self.server, nickname)
+ self.nickname_lower = utils.irc.lower(self.server, nickname)
self.name = self.nickname_lower
def join_channel(self, channel):
self.channels.add(channel)
diff --git a/src/ModuleManager.py b/src/ModuleManager.py
index 1f229611..5d997f54 100644
--- a/src/ModuleManager.py
+++ b/src/ModuleManager.py
@@ -1,5 +1,5 @@
import gc, glob, imp, io, inspect, os, sys, uuid
-from src import Utils
+from . import utils
BITBOT_HOOKS_MAGIC = "__bitbot_hooks"
BITBOT_EXPORTS_MAGIC = "__bitbot_exports"
@@ -54,7 +54,7 @@ class ModuleManager(object):
def _load_module(self, bot, name):
path = self._module_path(name)
- for hashflag, value in Utils.get_hashflags(path):
+ for hashflag, value in utils.get_hashflags(path):
if hashflag == "ignore":
# nope, ignore this module.
raise ModuleNotLoadedWarning("module ignored")
@@ -92,7 +92,7 @@ class ModuleManager(object):
attribute = getattr(module_object, attribute_name)
for hook in self._get_magic(attribute, BITBOT_HOOKS_MAGIC, []):
context_events.on(hook["event"]).hook(attribute,
- docstring=attribute.__doc__, **hook["kwargs"])
+ **hook["kwargs"])
for export in self._get_magic(module_object, BITBOT_EXPORTS_MAGIC, []):
context_exports.add(export["setting"], export["value"])
diff --git a/src/Utils.py b/src/Utils.py
deleted file mode 100644
index 072f14e0..00000000
--- a/src/Utils.py
+++ /dev/null
@@ -1,378 +0,0 @@
-import io, json, re, traceback, urllib.request, urllib.parse, urllib.error
-import ssl, string
-import bs4
-from . import ModuleManager
-
-USER_AGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
- "(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36")
-REGEX_HTTP = re.compile("https?://", re.I)
-ASCII_UPPER = string.ascii_uppercase
-ASCII_LOWER = string.ascii_lowercase
-STRICT_RFC1459_UPPER = ASCII_UPPER+r'\[]'
-STRICT_RFC1459_LOWER = ASCII_LOWER+r'|{}'
-RFC1459_UPPER = STRICT_RFC1459_UPPER+"^"
-RFC1459_LOWER = STRICT_RFC1459_LOWER+"~"
-
-def remove_colon(s):
- if s.startswith(":"):
- s = s[1:]
- return s
-
-def arbitrary(s, n):
- return remove_colon(" ".join(s[n:]))
-
-# case mapping lowercase/uppcase logic
-def _multi_replace(s, chars1, chars2):
- for char1, char2 in zip(chars1, chars2):
- s = s.replace(char1, char2)
- return s
-def irc_lower(server, s):
- if server.case_mapping == "ascii":
- return _multi_replace(s, ASCII_UPPER, ASCII_LOWER)
- elif server.case_mapping == "rfc1459":
- return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER)
- elif server.case_mapping == "strict-rfc1459":
- return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER)
- else:
- raise ValueError("unknown casemapping '%s'" % server.case_mapping)
-
-# compare a string while respecting case mapping
-def irc_equals(server, s1, s2):
- return irc_lower(server, s1) == irc_lower(server, s2)
-
-class IRCHostmask(object):
- def __init__(self, nickname, username, hostname, hostmask):
- self.nickname = nickname
- self.username = username
- self.hostname = hostname
- self.hostmask = hostmask
- def __repr__(self):
- return "Utils.IRCHostmask(%s)" % self.__str__()
- def __str__(self):
- return self.hostmask
-
-def seperate_hostmask(hostmask):
- hostmask = remove_colon(hostmask)
- nickname, _, username = hostmask.partition("!")
- username, _, hostname = username.partition("@")
- return IRCHostmask(nickname, username, hostname, hostmask)
-
-def get_url(url, **kwargs):
- if not urllib.parse.urlparse(url).scheme:
- url = "http://%s" % url
- url_parsed = urllib.parse.urlparse(url)
-
- method = kwargs.get("method", "GET")
- get_params = kwargs.get("get_params", "")
- post_params = kwargs.get("post_params", None)
- headers = kwargs.get("headers", {})
- if get_params:
- get_params = "?%s" % urllib.parse.urlencode(get_params)
- if post_params:
- post_params = urllib.parse.urlencode(post_params).encode("utf8")
- url = "%s%s" % (url, get_params)
- try:
- url.encode("latin-1")
- except UnicodeEncodeError:
- if kwargs.get("code"):
- return 0, False
- return False
-
- request = urllib.request.Request(url, post_params)
- request.add_header("Accept-Language", "en-US")
- request.add_header("User-Agent", USER_AGENT)
- for header, value in headers.items():
- request.add_header(header, value)
- request.method = method
-
- try:
- response = urllib.request.urlopen(request, timeout=5)
- except urllib.error.HTTPError as e:
- traceback.print_exc()
- if kwargs.get("code"):
- return e.code, False
- return False
- except urllib.error.URLError as e:
- traceback.print_exc()
- if kwargs.get("code"):
- return -1, False
- return False
- except ssl.CertificateError as e:
- traceback.print_exc()
- if kwargs.get("code"):
- return -1, False,
- return False
-
- response_content = response.read()
- encoding = response.info().get_content_charset()
- if kwargs.get("soup"):
- return bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml"))
- if not encoding:
- soup = bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml"))
- metas = soup.find_all("meta")
- for meta in metas:
- if "charset=" in meta.get("content", ""):
- encoding = meta.get("content").split("charset=", 1)[1
- ].split(";", 1)[0]
- elif meta.get("charset", ""):
- encoding = meta.get("charset")
- else:
- continue
- break
- if not encoding:
- for item in soup.contents:
- if isinstance(item, bs4.Doctype):
- if item == "html":
- encoding = "utf8"
- else:
- encoding = "latin-1"
- break
- response_content = response_content.decode(encoding or "utf8")
- data = response_content
- if kwargs.get("json") and data:
- try:
- data = json.loads(response_content)
- except json.decoder.JSONDecodeError:
- traceback.print_exc()
- return False
- if kwargs.get("code"):
- return response.code, data
- else:
- return data
-
-COLOR_WHITE, COLOR_BLACK, COLOR_BLUE, COLOR_GREEN = 0, 1, 2, 3
-COLOR_RED, COLOR_BROWN, COLOR_PURPLE, COLOR_ORANGE = 4, 5, 6, 7
-COLOR_YELLOW, COLOR_LIGHTGREEN, COLOR_CYAN, COLOR_LIGHTCYAN = (8, 9,
- 10, 11)
-COLOR_LIGHTBLUE, COLOR_PINK, COLOR_GREY, COLOR_LIGHTGREY = (12, 13,
- 14, 15)
-FONT_BOLD, FONT_ITALIC, FONT_UNDERLINE, FONT_INVERT = ("\x02", "\x1D",
- "\x1F", "\x16")
-FONT_COLOR, FONT_RESET = "\x03", "\x0F"
-REGEX_COLOR = re.compile("%s\d\d(?:,\d\d)?" % FONT_COLOR)
-
-def color(s, foreground, background=None):
- foreground = str(foreground).zfill(2)
- if background:
- background = str(background).zfill(2)
- return "%s%s%s%s%s" % (FONT_COLOR, foreground,
- "" if not background else ",%s" % background, s, FONT_COLOR)
-
-def bold(s):
- return "%s%s%s" % (FONT_BOLD, s, FONT_BOLD)
-
-def underline(s):
- return "%s%s%s" % (FONT_UNDERLINE, s, FONT_UNDERLINE)
-
-def strip_font(s):
- s = s.replace(FONT_BOLD, "")
- s = s.replace(FONT_ITALIC, "")
- s = REGEX_COLOR.sub("", s)
- s = s.replace(FONT_COLOR, "")
- return s
-
-TIME_SECOND = 1
-TIME_MINUTE = TIME_SECOND*60
-TIME_HOUR = TIME_MINUTE*60
-TIME_DAY = TIME_HOUR*24
-TIME_WEEK = TIME_DAY*7
-
-def time_unit(seconds):
- since = None
- unit = None
- if seconds >= TIME_WEEK:
- since = seconds/TIME_WEEK
- unit = "week"
- elif seconds >= TIME_DAY:
- since = seconds/TIME_DAY
- unit = "day"
- elif seconds >= TIME_HOUR:
- since = seconds/TIME_HOUR
- unit = "hour"
- elif seconds >= TIME_MINUTE:
- since = seconds/TIME_MINUTE
- unit = "minute"
- else:
- since = seconds
- unit = "second"
- since = int(since)
- if since > 1:
- unit = "%ss" % unit # pluralise the unit
- return [since, unit]
-
-REGEX_PRETTYTIME = re.compile("\d+[wdhms]", re.I)
-
-SECONDS_MINUTES = 60
-SECONDS_HOURS = SECONDS_MINUTES*60
-SECONDS_DAYS = SECONDS_HOURS*24
-SECONDS_WEEKS = SECONDS_DAYS*7
-
-def from_pretty_time(pretty_time):
- seconds = 0
- for match in re.findall(REGEX_PRETTYTIME, pretty_time):
- number, unit = int(match[:-1]), match[-1].lower()
- if unit == "m":
- number = number*SECONDS_MINUTES
- elif unit == "h":
- number = number*SECONDS_HOURS
- elif unit == "d":
- number = number*SECONDS_DAYS
- elif unit == "w":
- number = number*SECONDS_WEEKS
- seconds += number
- if seconds > 0:
- return seconds
-
-UNIT_SECOND = 5
-UNIT_MINUTE = 4
-UNIT_HOUR = 3
-UNIT_DAY = 2
-UNIT_WEEK = 1
-def to_pretty_time(total_seconds, minimum_unit=UNIT_SECOND, max_units=6):
- minutes, seconds = divmod(total_seconds, 60)
- hours, minutes = divmod(minutes, 60)
- days, hours = divmod(hours, 24)
- weeks, days = divmod(days, 7)
- out = ""
-
- units = 0
- if weeks and minimum_unit >= UNIT_WEEK and units < max_units:
- out += "%dw" % weeks
- units += 1
- if days and minimum_unit >= UNIT_DAY and units < max_units:
- out += "%dd" % days
- units += 1
- if hours and minimum_unit >= UNIT_HOUR and units < max_units:
- out += "%dh" % hours
- units += 1
- if minutes and minimum_unit >= UNIT_MINUTE and units < max_units:
- out += "%dm" % minutes
- units += 1
- if seconds and minimum_unit >= UNIT_SECOND and units < max_units:
- out += "%ds" % seconds
- units += 1
- return out
-
-IS_TRUE = ["true", "yes", "on", "y"]
-IS_FALSE = ["false", "no", "off", "n"]
-def bool_or_none(s):
- s = s.lower()
- if s in IS_TRUE:
- return True
- elif s in IS_FALSE:
- return False
-def int_or_none(s):
- stripped_s = s.lstrip("0")
- if stripped_s.isdigit():
- return int(stripped_s)
-
-def get_closest_setting(event, setting, default=None):
- server = event["server"]
- if "channel" in event:
- closest = event["channel"]
- elif "target" in event and "is_channel" in event and event["is_channel"]:
- closest = event["target"]
- else:
- closest = event["user"]
- return closest.get_setting(setting, server.get_setting(setting, default))
-
-def prevent_highlight(nickname):
- return nickname[0]+"\u200c"+nickname[1:]
-
-def _set_get_append(obj, setting, item):
- if not hasattr(obj, setting):
- setattr(obj, setting, [])
- getattr(obj, setting).append(item)
-def hook(event, **kwargs):
- def _hook_func(func):
- _set_get_append(func, ModuleManager.BITBOT_HOOKS_MAGIC,
- {"event": event, "kwargs": kwargs})
- return func
- return _hook_func
-def export(setting, value):
- def _export_func(module):
- _set_get_append(module, ModuleManager.BITBOT_EXPORTS_MAGIC,
- {"setting": setting, "value": value})
- return module
- return _export_func
-
-def strip_html(s):
- return bs4.BeautifulSoup(s, "lxml").get_text()
-
-def get_hashflags(filename):
- hashflags = {}
- with io.open(filename, mode="r", encoding="utf8") as f:
- for line in f:
- line = line.strip("\n")
- if not line.startswith("#"):
- break
- elif line.startswith("#--"):
- line_split = line.split(" ", 1)
- hashflag = line_split[0][3:]
- value = None
-
- if len(line_split) > 1:
- value = line_split[1]
- hashflags[hashflag] = value
- return hashflags.items()
-
-class Docstring(object):
- def __init__(self, description, items):
- self.description = description
- self.items = items
-
-def parse_docstring(s):
- description = ""
- last_item = None
- items = {}
- if s:
- for line in s.split("\n"):
- line = line.strip()
-
- if line:
- if line[0] == ":":
- key, _, value = line.partition(": ")
- last_item = value
- items[value] = value
- else:
- if last_item:
- items[last_item] += " %s" % line
- else:
- if description:
- description += " "
- description += line
- return Docstring(description, items)
-
-class IRCLine(object):
- def __init__(self, tags, prefix, command, args, arbitrary, last, server):
- self.tags = tags
- self.prefix = prefix
- self.command = command
- self.args = args
- self.arbitrary = arbitrary
- self.last = last
- self.server = server
-
-def parse_line(server, line):
- tags = {}
- prefix = None
- command = None
-
- if line[0] == "@":
- tags_prefix, line = line[1:].split(" ", 1)
- for tag in filter(None, tags_prefix.split(";")):
- tag, _, value = tag.partition("=")
- tags[tag] = value
-
- line, _, arbitrary = line.partition(" :")
- arbitrary = arbitrary or None
-
- if line[0] == ":":
- prefix, line = line[1:].split(" ", 1)
- prefix = seperate_hostmask(prefix)
- command, _, line = line.partition(" ")
-
- args = line.split(" ")
- last = arbitrary or args[-1]
-
- return IRCLine(tags, prefix, command, args, arbitrary, last, server)
diff --git a/src/utils/__init__.py b/src/utils/__init__.py
new file mode 100644
index 00000000..d568e517
--- /dev/null
+++ b/src/utils/__init__.py
@@ -0,0 +1,173 @@
+from . import irc, http
+
+import io, re
+from src import ModuleManager
+
+TIME_SECOND = 1
+TIME_MINUTE = TIME_SECOND*60
+TIME_HOUR = TIME_MINUTE*60
+TIME_DAY = TIME_HOUR*24
+TIME_WEEK = TIME_DAY*7
+
+def time_unit(seconds):
+ since = None
+ unit = None
+ if seconds >= TIME_WEEK:
+ since = seconds/TIME_WEEK
+ unit = "week"
+ elif seconds >= TIME_DAY:
+ since = seconds/TIME_DAY
+ unit = "day"
+ elif seconds >= TIME_HOUR:
+ since = seconds/TIME_HOUR
+ unit = "hour"
+ elif seconds >= TIME_MINUTE:
+ since = seconds/TIME_MINUTE
+ unit = "minute"
+ else:
+ since = seconds
+ unit = "second"
+ since = int(since)
+ if since > 1:
+ unit = "%ss" % unit # pluralise the unit
+ return [since, unit]
+
+REGEX_PRETTYTIME = re.compile("\d+[wdhms]", re.I)
+
+SECONDS_MINUTES = 60
+SECONDS_HOURS = SECONDS_MINUTES*60
+SECONDS_DAYS = SECONDS_HOURS*24
+SECONDS_WEEKS = SECONDS_DAYS*7
+
+def from_pretty_time(pretty_time):
+ seconds = 0
+ for match in re.findall(REGEX_PRETTYTIME, pretty_time):
+ number, unit = int(match[:-1]), match[-1].lower()
+ if unit == "m":
+ number = number*SECONDS_MINUTES
+ elif unit == "h":
+ number = number*SECONDS_HOURS
+ elif unit == "d":
+ number = number*SECONDS_DAYS
+ elif unit == "w":
+ number = number*SECONDS_WEEKS
+ seconds += number
+ if seconds > 0:
+ return seconds
+
+UNIT_SECOND = 5
+UNIT_MINUTE = 4
+UNIT_HOUR = 3
+UNIT_DAY = 2
+UNIT_WEEK = 1
+def to_pretty_time(total_seconds, minimum_unit=UNIT_SECOND, max_units=6):
+ minutes, seconds = divmod(total_seconds, 60)
+ hours, minutes = divmod(minutes, 60)
+ days, hours = divmod(hours, 24)
+ weeks, days = divmod(days, 7)
+ out = ""
+
+ units = 0
+ if weeks and minimum_unit >= UNIT_WEEK and units < max_units:
+ out += "%dw" % weeks
+ units += 1
+ if days and minimum_unit >= UNIT_DAY and units < max_units:
+ out += "%dd" % days
+ units += 1
+ if hours and minimum_unit >= UNIT_HOUR and units < max_units:
+ out += "%dh" % hours
+ units += 1
+ if minutes and minimum_unit >= UNIT_MINUTE and units < max_units:
+ out += "%dm" % minutes
+ units += 1
+ if seconds and minimum_unit >= UNIT_SECOND and units < max_units:
+ out += "%ds" % seconds
+ units += 1
+ return out
+
+IS_TRUE = ["true", "yes", "on", "y"]
+IS_FALSE = ["false", "no", "off", "n"]
+def bool_or_none(s):
+ s = s.lower()
+ if s in IS_TRUE:
+ return True
+ elif s in IS_FALSE:
+ return False
+def int_or_none(s):
+ stripped_s = s.lstrip("0")
+ if stripped_s.isdigit():
+ return int(stripped_s)
+
+def get_closest_setting(event, setting, default=None):
+ server = event["server"]
+ if "channel" in event:
+ closest = event["channel"]
+ elif "target" in event and "is_channel" in event and event["is_channel"]:
+ closest = event["target"]
+ else:
+ closest = event["user"]
+ return closest.get_setting(setting, server.get_setting(setting, default))
+
+def prevent_highlight(nickname):
+ return nickname[0]+"\u200c"+nickname[1:]
+
+def _set_get_append(obj, setting, item):
+ if not hasattr(obj, setting):
+ setattr(obj, setting, [])
+ getattr(obj, setting).append(item)
+def hook(event, **kwargs):
+ def _hook_func(func):
+ _set_get_append(func, ModuleManager.BITBOT_HOOKS_MAGIC,
+ {"event": event, "kwargs": kwargs})
+ return func
+ return _hook_func
+def export(setting, value):
+ def _export_func(module):
+ _set_get_append(module, ModuleManager.BITBOT_EXPORTS_MAGIC,
+ {"setting": setting, "value": value})
+ return module
+ return _export_func
+
+def get_hashflags(filename):
+ hashflags = {}
+ with io.open(filename, mode="r", encoding="utf8") as f:
+ for line in f:
+ line = line.strip("\n")
+ if not line.startswith("#"):
+ break
+ elif line.startswith("#--"):
+ line_split = line.split(" ", 1)
+ hashflag = line_split[0][3:]
+ value = None
+
+ if len(line_split) > 1:
+ value = line_split[1]
+ hashflags[hashflag] = value
+ return hashflags.items()
+
+class Docstring(object):
+ def __init__(self, description, items):
+ self.description = description
+ self.items = items
+
+def parse_docstring(s):
+ description = ""
+ last_item = None
+ items = {}
+ if s:
+ for line in s.split("\n"):
+ line = line.strip()
+
+ if line:
+ if line[0] == ":":
+ key, _, value = line.partition(": ")
+ last_item = value
+ items[value] = value
+ else:
+ if last_item:
+ items[last_item] += " %s" % line
+ else:
+ if description:
+ description += " "
+ description += line
+ return Docstring(description, items)
diff --git a/src/utils/http.py b/src/utils/http.py
new file mode 100644
index 00000000..e9cb41f9
--- /dev/null
+++ b/src/utils/http.py
@@ -0,0 +1,94 @@
+import re, traceback, urllib.error, urllib.parse, urllib.request
+import json, ssl
+import bs4
+
+USER_AGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
+ "(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36")
+REGEX_HTTP = re.compile("https?://", re.I)
+
+def get_url(url, **kwargs):
+ if not urllib.parse.urlparse(url).scheme:
+ url = "http://%s" % url
+ url_parsed = urllib.parse.urlparse(url)
+
+ method = kwargs.get("method", "GET")
+ get_params = kwargs.get("get_params", "")
+ post_params = kwargs.get("post_params", None)
+ headers = kwargs.get("headers", {})
+ if get_params:
+ get_params = "?%s" % urllib.parse.urlencode(get_params)
+ if post_params:
+ post_params = urllib.parse.urlencode(post_params).encode("utf8")
+ url = "%s%s" % (url, get_params)
+ try:
+ url.encode("latin-1")
+ except UnicodeEncodeError:
+ if kwargs.get("code"):
+ return 0, False
+ return False
+
+ request = urllib.request.Request(url, post_params)
+ request.add_header("Accept-Language", "en-US")
+ request.add_header("User-Agent", USER_AGENT)
+ for header, value in headers.items():
+ request.add_header(header, value)
+ request.method = method
+
+ try:
+ response = urllib.request.urlopen(request, timeout=5)
+ except urllib.error.HTTPError as e:
+ traceback.print_exc()
+ if kwargs.get("code"):
+ return e.code, False
+ return False
+ except urllib.error.URLError as e:
+ traceback.print_exc()
+ if kwargs.get("code"):
+ return -1, False
+ return False
+ except ssl.CertificateError as e:
+ traceback.print_exc()
+ if kwargs.get("code"):
+ return -1, False,
+ return False
+
+ response_content = response.read()
+ encoding = response.info().get_content_charset()
+ if kwargs.get("soup"):
+ return bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml"))
+ if not encoding:
+ soup = bs4.BeautifulSoup(response_content, kwargs.get("parser", "lxml"))
+ metas = soup.find_all("meta")
+ for meta in metas:
+ if "charset=" in meta.get("content", ""):
+ encoding = meta.get("content").split("charset=", 1)[1
+ ].split(";", 1)[0]
+ elif meta.get("charset", ""):
+ encoding = meta.get("charset")
+ else:
+ continue
+ break
+ if not encoding:
+ for item in soup.contents:
+ if isinstance(item, bs4.Doctype):
+ if item == "html":
+ encoding = "utf8"
+ else:
+ encoding = "latin-1"
+ break
+ response_content = response_content.decode(encoding or "utf8")
+ data = response_content
+ if kwargs.get("json") and data:
+ try:
+ data = json.loads(response_content)
+ except json.decoder.JSONDecodeError:
+ traceback.print_exc()
+ return False
+ if kwargs.get("code"):
+ return response.code, data
+ else:
+ return data
+
+def strip_html(s):
+ return bs4.BeautifulSoup(s, "lxml").get_text()
+
diff --git a/src/utils/irc.py b/src/utils/irc.py
new file mode 100644
index 00000000..792de7f3
--- /dev/null
+++ b/src/utils/irc.py
@@ -0,0 +1,116 @@
+import string, re
+
+ASCII_UPPER = string.ascii_uppercase
+ASCII_LOWER = string.ascii_lowercase
+STRICT_RFC1459_UPPER = ASCII_UPPER+r'\[]'
+STRICT_RFC1459_LOWER = ASCII_LOWER+r'|{}'
+RFC1459_UPPER = STRICT_RFC1459_UPPER+"^"
+RFC1459_LOWER = STRICT_RFC1459_LOWER+"~"
+
+def remove_colon(s):
+ if s.startswith(":"):
+ s = s[1:]
+ return s
+
+# case mapping lowercase/uppcase logic
+def _multi_replace(s, chars1, chars2):
+ for char1, char2 in zip(chars1, chars2):
+ s = s.replace(char1, char2)
+ return s
+def lower(server, s):
+ if server.case_mapping == "ascii":
+ return _multi_replace(s, ASCII_UPPER, ASCII_LOWER)
+ elif server.case_mapping == "rfc1459":
+ return _multi_replace(s, RFC1459_UPPER, RFC1459_LOWER)
+ elif server.case_mapping == "strict-rfc1459":
+ return _multi_replace(s, STRICT_RFC1459_UPPER, STRICT_RFC1459_LOWER)
+ else:
+ raise ValueError("unknown casemapping '%s'" % server.case_mapping)
+
+# compare a string while respecting case mapping
+def equals(server, s1, s2):
+ return lower(server, s1) == lower(server, s2)
+
+class IRCHostmask(object):
+ def __init__(self, nickname, username, hostname, hostmask):
+ self.nickname = nickname
+ self.username = username
+ self.hostname = hostname
+ self.hostmask = hostmask
+ def __repr__(self):
+ return "IRCHostmask(%s)" % self.__str__()
+ def __str__(self):
+ return self.hostmask
+
+def seperate_hostmask(hostmask):
+ hostmask = remove_colon(hostmask)
+ nickname, _, username = hostmask.partition("!")
+ username, _, hostname = username.partition("@")
+ return IRCHostmask(nickname, username, hostname, hostmask)
+
+
+class IRCLine(object):
+ def __init__(self, tags, prefix, command, args, arbitrary, last, server):
+ self.tags = tags
+ self.prefix = prefix
+ self.command = command
+ self.args = args
+ self.arbitrary = arbitrary
+ self.last = last
+ self.server = server
+
+def parse_line(server, line):
+ tags = {}
+ prefix = None
+ command = None
+
+ if line[0] == "@":
+ tags_prefix, line = line[1:].split(" ", 1)
+ for tag in filter(None, tags_prefix.split(";")):
+ tag, _, value = tag.partition("=")
+ tags[tag] = value
+
+ line, _, arbitrary = line.partition(" :")
+ arbitrary = arbitrary or None
+
+ if line[0] == ":":
+ prefix, line = line[1:].split(" ", 1)
+ prefix = seperate_hostmask(prefix)
+ command, _, line = line.partition(" ")
+
+ args = line.split(" ")
+ last = arbitrary or args[-1]
+
+ return IRCLine(tags, prefix, command, args, arbitrary, last, server)
+
+COLOR_WHITE, COLOR_BLACK, COLOR_BLUE, COLOR_GREEN = 0, 1, 2, 3
+COLOR_RED, COLOR_BROWN, COLOR_PURPLE, COLOR_ORANGE = 4, 5, 6, 7
+COLOR_YELLOW, COLOR_LIGHTGREEN, COLOR_CYAN, COLOR_LIGHTCYAN = (8, 9,
+ 10, 11)
+COLOR_LIGHTBLUE, COLOR_PINK, COLOR_GREY, COLOR_LIGHTGREY = (12, 13,
+ 14, 15)
+FONT_BOLD, FONT_ITALIC, FONT_UNDERLINE, FONT_INVERT = ("\x02", "\x1D",
+ "\x1F", "\x16")
+FONT_COLOR, FONT_RESET = "\x03", "\x0F"
+REGEX_COLOR = re.compile("%s\d\d(?:,\d\d)?" % FONT_COLOR)
+
+def color(s, foreground, background=None):
+ foreground = str(foreground).zfill(2)
+ if background:
+ background = str(background).zfill(2)
+ return "%s%s%s%s%s" % (FONT_COLOR, foreground,
+ "" if not background else ",%s" % background, s, FONT_COLOR)
+
+def bold(s):
+ return "%s%s%s" % (FONT_BOLD, s, FONT_BOLD)
+
+def underline(s):
+ return "%s%s%s" % (FONT_UNDERLINE, s, FONT_UNDERLINE)
+
+def strip_font(s):
+ s = s.replace(FONT_BOLD, "")
+ s = s.replace(FONT_ITALIC, "")
+ s = REGEX_COLOR.sub("", s)
+ s = s.replace(FONT_COLOR, "")
+ return s
+