diff options
Diffstat (limited to 'src/utils')
| -rw-r--r-- | src/utils/datetime.py | 4 | ||||
| -rw-r--r-- | src/utils/http.py | 136 | ||||
| -rw-r--r-- | src/utils/irc.py | 24 | ||||
| -rw-r--r-- | src/utils/parse.py | 11 |
4 files changed, 83 insertions, 92 deletions
diff --git a/src/utils/datetime.py b/src/utils/datetime.py index 0fac2bb3..3ed03088 100644 --- a/src/utils/datetime.py +++ b/src/utils/datetime.py @@ -10,7 +10,7 @@ ISO8601_FORMAT_TZ = "%z" DATETIME_HUMAN = "%Y/%m/%d %H:%M:%S" DATE_HUMAN = "%Y-%m-%d" -def datetime_utcnow() -> _datetime.datetime: +def utcnow() -> _datetime.datetime: return _datetime.datetime.utcnow().replace(tzinfo=_datetime.timezone.utc) def datetime_timestamp(seconds: float) -> _datetime.datetime: return _datetime.datetime.fromtimestamp(seconds).replace( @@ -26,7 +26,7 @@ def iso8601_format(dt: _datetime.datetime, milliseconds: bool=False) -> str: return "%s%s%s" % (dt_format, ms_format, tz_format) def iso8601_format_now(milliseconds: bool=False) -> str: - return iso8601_format(datetime_utcnow(), milliseconds=milliseconds) + return iso8601_format(utcnow(), milliseconds=milliseconds) def iso8601_parse(s: str, microseconds: bool=False) -> _datetime.datetime: fmt = ISO8601_PARSE_MICROSECONDS if microseconds else ISO8601_PARSE return _datetime.datetime.strptime(s, fmt) diff --git a/src/utils/http.py b/src/utils/http.py index f31da62c..699c48f1 100644 --- a/src/utils/http.py +++ b/src/utils/http.py @@ -1,9 +1,8 @@ -import asyncio, codecs, ipaddress, re, signal, socket, traceback, typing -import urllib.error, urllib.parse, uuid +import asyncio, codecs, dataclasses, ipaddress, re, signal, socket, traceback +import typing, urllib.error, urllib.parse, uuid import json as _json -import bs4, netifaces, requests -import tornado.httpclient -from src import utils +import bs4, netifaces, requests, tornado.httpclient +from src import IRCBot, utils REGEX_URL = re.compile("https?://\S+", re.I) @@ -29,8 +28,8 @@ def url_sanitise(url: str): url = url[:-1] return url -DEFAULT_USERAGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 " - "(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36") +USERAGENT = "Mozilla/5.0 (compatible; BitBot/%s; +%s" % ( + IRCBot.VERSION, IRCBot.URL) RESPONSE_MAX = (1024*1024)*100 SOUP_CONTENT_TYPES = ["text/html", "text/xml", "application/xml"] @@ -54,46 +53,33 @@ class HTTPWrongContentTypeException(HTTPException): def throw_timeout(): raise HTTPTimeoutException() +@dataclasses.dataclass class Request(object): - def __init__(self, url: str, - get_params: typing.Dict[str, str]={}, post_data: typing.Any=None, - headers: typing.Dict[str, str]={}, + url: str + id: typing.Optional[str] = None + method: str = "GET" - json: bool=False, json_body: bool=False, allow_redirects: bool=True, - check_content_type: bool=True, parse: bool=False, - detect_encoding: bool=True, + get_params: typing.Dict[str, str] = dataclasses.field( + default_factory=dict) + post_data: typing.Any = None + headers: typing.Dict[str, str] = dataclasses.field( + default_factory=dict) + cookies: typing.Dict[str, str] = dataclasses.field( + default_factory=dict) - method: str="GET", parser: str="lxml", id: str=None, - fallback_encoding: str=None, content_type: str=None, - proxy: str=None, useragent: str=None, + json_body: bool = False - **kwargs): - self.id = id or str(uuid.uuid4()) + allow_redirects: bool = True + check_content_type: bool = True + fallback_encoding: typing.Optional[str] = None + content_type: typing.Optional[str] = None + proxy: typing.Optional[str] = None + useragent: typing.Optional[str] = None - self.set_url(url) - self.method = method.upper() - self.get_params = get_params - self.post_data = post_data - self.headers = headers - - self.json = json - self.json_body = json_body - self.allow_redirects = allow_redirects - self.check_content_type = check_content_type - self.parse = parse - self.detect_encoding = detect_encoding - - self.parser = parser - self.fallback_encoding = fallback_encoding - self.content_type = content_type - self.proxy = proxy - self.useragent = useragent - - if kwargs: - if method == "POST": - self.post_data = kwargs - else: - self.get_params.update(kwargs) + def validate(self): + self.id = self.id or str(uuid.uuid4()) + self.set_url(self.url) + self.method = self.method.upper() def set_url(self, url: str): parts = urllib.parse.urlparse(url) @@ -113,7 +99,7 @@ class Request(object): if not "Accept-Language" in headers: headers["Accept-Language"] = "en-GB" if not "User-Agent" in headers: - headers["User-Agent"] = self.useragent or DEFAULT_USERAGENT + headers["User-Agent"] = self.useragent or USERAGENT if not "Content-Type" in headers and self.content_type: headers["Content-Type"] = self.content_type return headers @@ -128,13 +114,20 @@ class Request(object): return None class Response(object): - def __init__(self, code: int, data: typing.Any, - headers: typing.Dict[str, str], encoding: str): + def __init__(self, code: int, data: bytes, encoding: str, + headers: typing.Dict[str, str], cookies: typing.Dict[str, str]): self.code = code self.data = data - self.headers = headers self.content_type = headers.get("Content-Type", "").split(";", 1)[0] self.encoding = encoding + self.headers = headers + self.cookies = cookies + def decode(self, encoding="utf8") -> str: + return self.data.decode(encoding) + def json(self) -> typing.Any: + return _json.loads(self.data) + def soup(self, parser: str="lxml") -> bs4.BeautifulSoup: + return bs4.BeautifulSoup(self.decode(), parser) def _meta_content(s: str) -> typing.Dict[str, str]: out = {} @@ -143,7 +136,8 @@ def _meta_content(s: str) -> typing.Dict[str, str]: out[key] = value return out -def _find_encoding(soup: bs4.BeautifulSoup) -> typing.Optional[str]: +def _find_encoding(data: bytes) -> typing.Optional[str]: + soup = bs4.BeautifulSoup(data, "lxml") if not soup.meta == None: meta_charset = soup.meta.get("charset") if not meta_charset == None: @@ -167,7 +161,7 @@ def request(request_obj: typing.Union[str, Request], **kwargs) -> Response: return _request(request_obj) def _request(request_obj: Request) -> Response: - + request_obj.validate() def _wrap() -> Response: headers = request_obj.get_headers() response = requests.request( @@ -177,7 +171,8 @@ def _request(request_obj: Request) -> Response: params=request_obj.get_params, data=request_obj.get_body(), allow_redirects=request_obj.allow_redirects, - stream=True + stream=True, + cookies=request_obj.cookies ) response_content = response.raw.read(RESPONSE_MAX, decode_content=True) @@ -186,7 +181,8 @@ def _request(request_obj: Request) -> Response: headers = utils.CaseInsensitiveDict(dict(response.headers)) our_response = Response(response.status_code, response_content, - headers=headers, encoding=response.encoding) + encoding=response.encoding, headers=headers, + cookies=response.cookies.get_dict()) return our_response try: @@ -202,39 +198,12 @@ def _request(request_obj: Request) -> Response: else: encoding = "iso-8859-1" - if (request_obj.detect_encoding and - response.content_type and + if (response.content_type and response.content_type in SOUP_CONTENT_TYPES): - souped = bs4.BeautifulSoup(response.data, request_obj.parser) - encoding = _find_encoding(souped) or encoding - - def _decode_data(): - return response.data.decode(encoding) - - if request_obj.parse: - if (not request_obj.check_content_type or - response.content_type in SOUP_CONTENT_TYPES): - souped = bs4.BeautifulSoup(_decode_data(), request_obj.parser) - response.data = souped - return response - else: - raise HTTPWrongContentTypeException( - "Tried to soup non-html/non-xml data (%s)" % - response.content_type) + encoding = _find_encoding(response.data) or encoding + response.encoding = encoding - if request_obj.json and response.data: - data = _decode_data() - try: - response.data = _json.loads(data) - return response - except _json.decoder.JSONDecodeError as e: - raise HTTPParsingException(str(e), data) - - if response.content_type in DECODE_CONTENT_TYPES: - response.data = _decode_data() - return response - else: - return response + return response class RequestManyException(Exception): pass @@ -242,6 +211,7 @@ def request_many(requests: typing.List[Request]) -> typing.Dict[str, Response]: responses = {} async def _request(request): + request.validate() client = tornado.httpclient.AsyncHTTPClient() url = request.url if request.get_params: @@ -263,8 +233,8 @@ def request_many(requests: typing.List[Request]) -> typing.Dict[str, Response]: "request_many failed for %s" % url) headers = utils.CaseInsensitiveDict(dict(response.headers)) - data = response.body.decode("utf8") - responses[request.id] = Response(response.code, data, headers, "utf8") + responses[request.id] = Response(response.code, response.body, "utf8", + headers, {}) loop = asyncio.new_event_loop() awaits = [] diff --git a/src/utils/irc.py b/src/utils/irc.py index 30a3126e..cdaa61eb 100644 --- a/src/utils/irc.py +++ b/src/utils/irc.py @@ -38,19 +38,29 @@ def color(s: str, foreground: consts.IRCColor, if background: background_s = ",%s" % str(background.irc).zfill(2) - return "%s%s%s%s%s" % (consts.COLOR, foreground_s, background_s, s, - consts.COLOR) + return f"{consts.COLOR}{foreground_s}{background_s}{s}{consts.COLOR}" -HASH_COLORS = list(range(2, 16)) +HASH_STOP = ["_", "|", "["] +HASH_COLORS = [consts.CYAN, consts.PURPLE, consts.GREEN, consts.ORANGE, + consts.LIGHTBLUE, consts.TRANSPARENT, consts.LIGHTCYAN, consts.PINK, + consts.LIGHTGREEN, consts.BLUE] def hash_colorize(s: str): - hash_code = sum(ord(c) for c in s.lower())%len(HASH_COLORS) - return color(s, consts.COLOR_CODES[HASH_COLORS[hash_code]]) + hash = 5381 + non_stop = False + for i, char in enumerate(s): + if not char in HASH_STOP: + non_stop = True + elif non_stop: + break + hash ^= ((hash<<5)+(hash>>2)+ord(char))&0xFFFFFFFFFFFFFFFF + + return color(s, HASH_COLORS[hash%len(HASH_COLORS)]) def bold(s: str) -> str: - return "%s%s%s" % (consts.BOLD, s, consts.BOLD) + return f"{consts.BOLD}{s}{consts.BOLD}" def underline(s: str) -> str: - return "%s%s%s" % (consts.UNDERLINE, s, consts.UNDERLINE) + return f"{consts.UNDERLINE}{s}{consts.UNDERLINE}" def strip_font(s: str) -> str: s = s.replace(consts.BOLD, "") diff --git a/src/utils/parse.py b/src/utils/parse.py index d5018441..ce2ee793 100644 --- a/src/utils/parse.py +++ b/src/utils/parse.py @@ -1,4 +1,5 @@ import decimal, io, typing +from . import datetime, errors COMMENT_TYPES = ["#", "//"] def hashflags(filename: str @@ -109,3 +110,13 @@ def parse_number(s: str) -> str: raise ValueError("Unknown unit '%s' given to parse_number" % unit) return str(number) +def timed_args(args, min_args): + if args and args[0][0] == "+": + if len(args[1:]) < min_args: + raise errors.EventError("Not enough arguments") + time = datetime.from_pretty_time(args[0][1:]) + if time == None: + raise errors.EventError("Invalid timeframe") + return time, args[1:] + return None, args + |
