aboutsummaryrefslogtreecommitdiff
path: root/src/utils
diff options
context:
space:
mode:
Diffstat (limited to 'src/utils')
-rw-r--r--src/utils/datetime.py4
-rw-r--r--src/utils/http.py136
-rw-r--r--src/utils/irc.py24
-rw-r--r--src/utils/parse.py11
4 files changed, 83 insertions, 92 deletions
diff --git a/src/utils/datetime.py b/src/utils/datetime.py
index 0fac2bb3..3ed03088 100644
--- a/src/utils/datetime.py
+++ b/src/utils/datetime.py
@@ -10,7 +10,7 @@ ISO8601_FORMAT_TZ = "%z"
DATETIME_HUMAN = "%Y/%m/%d %H:%M:%S"
DATE_HUMAN = "%Y-%m-%d"
-def datetime_utcnow() -> _datetime.datetime:
+def utcnow() -> _datetime.datetime:
return _datetime.datetime.utcnow().replace(tzinfo=_datetime.timezone.utc)
def datetime_timestamp(seconds: float) -> _datetime.datetime:
return _datetime.datetime.fromtimestamp(seconds).replace(
@@ -26,7 +26,7 @@ def iso8601_format(dt: _datetime.datetime, milliseconds: bool=False) -> str:
return "%s%s%s" % (dt_format, ms_format, tz_format)
def iso8601_format_now(milliseconds: bool=False) -> str:
- return iso8601_format(datetime_utcnow(), milliseconds=milliseconds)
+ return iso8601_format(utcnow(), milliseconds=milliseconds)
def iso8601_parse(s: str, microseconds: bool=False) -> _datetime.datetime:
fmt = ISO8601_PARSE_MICROSECONDS if microseconds else ISO8601_PARSE
return _datetime.datetime.strptime(s, fmt)
diff --git a/src/utils/http.py b/src/utils/http.py
index f31da62c..699c48f1 100644
--- a/src/utils/http.py
+++ b/src/utils/http.py
@@ -1,9 +1,8 @@
-import asyncio, codecs, ipaddress, re, signal, socket, traceback, typing
-import urllib.error, urllib.parse, uuid
+import asyncio, codecs, dataclasses, ipaddress, re, signal, socket, traceback
+import typing, urllib.error, urllib.parse, uuid
import json as _json
-import bs4, netifaces, requests
-import tornado.httpclient
-from src import utils
+import bs4, netifaces, requests, tornado.httpclient
+from src import IRCBot, utils
REGEX_URL = re.compile("https?://\S+", re.I)
@@ -29,8 +28,8 @@ def url_sanitise(url: str):
url = url[:-1]
return url
-DEFAULT_USERAGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
- "(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36")
+USERAGENT = "Mozilla/5.0 (compatible; BitBot/%s; +%s" % (
+ IRCBot.VERSION, IRCBot.URL)
RESPONSE_MAX = (1024*1024)*100
SOUP_CONTENT_TYPES = ["text/html", "text/xml", "application/xml"]
@@ -54,46 +53,33 @@ class HTTPWrongContentTypeException(HTTPException):
def throw_timeout():
raise HTTPTimeoutException()
+@dataclasses.dataclass
class Request(object):
- def __init__(self, url: str,
- get_params: typing.Dict[str, str]={}, post_data: typing.Any=None,
- headers: typing.Dict[str, str]={},
+ url: str
+ id: typing.Optional[str] = None
+ method: str = "GET"
- json: bool=False, json_body: bool=False, allow_redirects: bool=True,
- check_content_type: bool=True, parse: bool=False,
- detect_encoding: bool=True,
+ get_params: typing.Dict[str, str] = dataclasses.field(
+ default_factory=dict)
+ post_data: typing.Any = None
+ headers: typing.Dict[str, str] = dataclasses.field(
+ default_factory=dict)
+ cookies: typing.Dict[str, str] = dataclasses.field(
+ default_factory=dict)
- method: str="GET", parser: str="lxml", id: str=None,
- fallback_encoding: str=None, content_type: str=None,
- proxy: str=None, useragent: str=None,
+ json_body: bool = False
- **kwargs):
- self.id = id or str(uuid.uuid4())
+ allow_redirects: bool = True
+ check_content_type: bool = True
+ fallback_encoding: typing.Optional[str] = None
+ content_type: typing.Optional[str] = None
+ proxy: typing.Optional[str] = None
+ useragent: typing.Optional[str] = None
- self.set_url(url)
- self.method = method.upper()
- self.get_params = get_params
- self.post_data = post_data
- self.headers = headers
-
- self.json = json
- self.json_body = json_body
- self.allow_redirects = allow_redirects
- self.check_content_type = check_content_type
- self.parse = parse
- self.detect_encoding = detect_encoding
-
- self.parser = parser
- self.fallback_encoding = fallback_encoding
- self.content_type = content_type
- self.proxy = proxy
- self.useragent = useragent
-
- if kwargs:
- if method == "POST":
- self.post_data = kwargs
- else:
- self.get_params.update(kwargs)
+ def validate(self):
+ self.id = self.id or str(uuid.uuid4())
+ self.set_url(self.url)
+ self.method = self.method.upper()
def set_url(self, url: str):
parts = urllib.parse.urlparse(url)
@@ -113,7 +99,7 @@ class Request(object):
if not "Accept-Language" in headers:
headers["Accept-Language"] = "en-GB"
if not "User-Agent" in headers:
- headers["User-Agent"] = self.useragent or DEFAULT_USERAGENT
+ headers["User-Agent"] = self.useragent or USERAGENT
if not "Content-Type" in headers and self.content_type:
headers["Content-Type"] = self.content_type
return headers
@@ -128,13 +114,20 @@ class Request(object):
return None
class Response(object):
- def __init__(self, code: int, data: typing.Any,
- headers: typing.Dict[str, str], encoding: str):
+ def __init__(self, code: int, data: bytes, encoding: str,
+ headers: typing.Dict[str, str], cookies: typing.Dict[str, str]):
self.code = code
self.data = data
- self.headers = headers
self.content_type = headers.get("Content-Type", "").split(";", 1)[0]
self.encoding = encoding
+ self.headers = headers
+ self.cookies = cookies
+ def decode(self, encoding="utf8") -> str:
+ return self.data.decode(encoding)
+ def json(self) -> typing.Any:
+ return _json.loads(self.data)
+ def soup(self, parser: str="lxml") -> bs4.BeautifulSoup:
+ return bs4.BeautifulSoup(self.decode(), parser)
def _meta_content(s: str) -> typing.Dict[str, str]:
out = {}
@@ -143,7 +136,8 @@ def _meta_content(s: str) -> typing.Dict[str, str]:
out[key] = value
return out
-def _find_encoding(soup: bs4.BeautifulSoup) -> typing.Optional[str]:
+def _find_encoding(data: bytes) -> typing.Optional[str]:
+ soup = bs4.BeautifulSoup(data, "lxml")
if not soup.meta == None:
meta_charset = soup.meta.get("charset")
if not meta_charset == None:
@@ -167,7 +161,7 @@ def request(request_obj: typing.Union[str, Request], **kwargs) -> Response:
return _request(request_obj)
def _request(request_obj: Request) -> Response:
-
+ request_obj.validate()
def _wrap() -> Response:
headers = request_obj.get_headers()
response = requests.request(
@@ -177,7 +171,8 @@ def _request(request_obj: Request) -> Response:
params=request_obj.get_params,
data=request_obj.get_body(),
allow_redirects=request_obj.allow_redirects,
- stream=True
+ stream=True,
+ cookies=request_obj.cookies
)
response_content = response.raw.read(RESPONSE_MAX,
decode_content=True)
@@ -186,7 +181,8 @@ def _request(request_obj: Request) -> Response:
headers = utils.CaseInsensitiveDict(dict(response.headers))
our_response = Response(response.status_code, response_content,
- headers=headers, encoding=response.encoding)
+ encoding=response.encoding, headers=headers,
+ cookies=response.cookies.get_dict())
return our_response
try:
@@ -202,39 +198,12 @@ def _request(request_obj: Request) -> Response:
else:
encoding = "iso-8859-1"
- if (request_obj.detect_encoding and
- response.content_type and
+ if (response.content_type and
response.content_type in SOUP_CONTENT_TYPES):
- souped = bs4.BeautifulSoup(response.data, request_obj.parser)
- encoding = _find_encoding(souped) or encoding
-
- def _decode_data():
- return response.data.decode(encoding)
-
- if request_obj.parse:
- if (not request_obj.check_content_type or
- response.content_type in SOUP_CONTENT_TYPES):
- souped = bs4.BeautifulSoup(_decode_data(), request_obj.parser)
- response.data = souped
- return response
- else:
- raise HTTPWrongContentTypeException(
- "Tried to soup non-html/non-xml data (%s)" %
- response.content_type)
+ encoding = _find_encoding(response.data) or encoding
+ response.encoding = encoding
- if request_obj.json and response.data:
- data = _decode_data()
- try:
- response.data = _json.loads(data)
- return response
- except _json.decoder.JSONDecodeError as e:
- raise HTTPParsingException(str(e), data)
-
- if response.content_type in DECODE_CONTENT_TYPES:
- response.data = _decode_data()
- return response
- else:
- return response
+ return response
class RequestManyException(Exception):
pass
@@ -242,6 +211,7 @@ def request_many(requests: typing.List[Request]) -> typing.Dict[str, Response]:
responses = {}
async def _request(request):
+ request.validate()
client = tornado.httpclient.AsyncHTTPClient()
url = request.url
if request.get_params:
@@ -263,8 +233,8 @@ def request_many(requests: typing.List[Request]) -> typing.Dict[str, Response]:
"request_many failed for %s" % url)
headers = utils.CaseInsensitiveDict(dict(response.headers))
- data = response.body.decode("utf8")
- responses[request.id] = Response(response.code, data, headers, "utf8")
+ responses[request.id] = Response(response.code, response.body, "utf8",
+ headers, {})
loop = asyncio.new_event_loop()
awaits = []
diff --git a/src/utils/irc.py b/src/utils/irc.py
index 30a3126e..cdaa61eb 100644
--- a/src/utils/irc.py
+++ b/src/utils/irc.py
@@ -38,19 +38,29 @@ def color(s: str, foreground: consts.IRCColor,
if background:
background_s = ",%s" % str(background.irc).zfill(2)
- return "%s%s%s%s%s" % (consts.COLOR, foreground_s, background_s, s,
- consts.COLOR)
+ return f"{consts.COLOR}{foreground_s}{background_s}{s}{consts.COLOR}"
-HASH_COLORS = list(range(2, 16))
+HASH_STOP = ["_", "|", "["]
+HASH_COLORS = [consts.CYAN, consts.PURPLE, consts.GREEN, consts.ORANGE,
+ consts.LIGHTBLUE, consts.TRANSPARENT, consts.LIGHTCYAN, consts.PINK,
+ consts.LIGHTGREEN, consts.BLUE]
def hash_colorize(s: str):
- hash_code = sum(ord(c) for c in s.lower())%len(HASH_COLORS)
- return color(s, consts.COLOR_CODES[HASH_COLORS[hash_code]])
+ hash = 5381
+ non_stop = False
+ for i, char in enumerate(s):
+ if not char in HASH_STOP:
+ non_stop = True
+ elif non_stop:
+ break
+ hash ^= ((hash<<5)+(hash>>2)+ord(char))&0xFFFFFFFFFFFFFFFF
+
+ return color(s, HASH_COLORS[hash%len(HASH_COLORS)])
def bold(s: str) -> str:
- return "%s%s%s" % (consts.BOLD, s, consts.BOLD)
+ return f"{consts.BOLD}{s}{consts.BOLD}"
def underline(s: str) -> str:
- return "%s%s%s" % (consts.UNDERLINE, s, consts.UNDERLINE)
+ return f"{consts.UNDERLINE}{s}{consts.UNDERLINE}"
def strip_font(s: str) -> str:
s = s.replace(consts.BOLD, "")
diff --git a/src/utils/parse.py b/src/utils/parse.py
index d5018441..ce2ee793 100644
--- a/src/utils/parse.py
+++ b/src/utils/parse.py
@@ -1,4 +1,5 @@
import decimal, io, typing
+from . import datetime, errors
COMMENT_TYPES = ["#", "//"]
def hashflags(filename: str
@@ -109,3 +110,13 @@ def parse_number(s: str) -> str:
raise ValueError("Unknown unit '%s' given to parse_number" % unit)
return str(number)
+def timed_args(args, min_args):
+ if args and args[0][0] == "+":
+ if len(args[1:]) < min_args:
+ raise errors.EventError("Not enough arguments")
+ time = datetime.from_pretty_time(args[0][1:])
+ if time == None:
+ raise errors.EventError("Invalid timeframe")
+ return time, args[1:]
+ return None, args
+