diff options
Diffstat (limited to 'src/utils')
| -rw-r--r-- | src/utils/datetime/format.py | 2 | ||||
| -rw-r--r-- | src/utils/http.py | 69 | ||||
| -rw-r--r-- | src/utils/irc.py | 60 | ||||
| -rw-r--r-- | src/utils/parse/__init__.py | 7 | ||||
| -rw-r--r-- | src/utils/parse/sed.py | 4 | ||||
| -rw-r--r-- | src/utils/parse/spec.py | 18 | ||||
| -rw-r--r-- | src/utils/settings.py | 8 |
7 files changed, 105 insertions, 63 deletions
diff --git a/src/utils/datetime/format.py b/src/utils/datetime/format.py index 889f5906..4b56e7b3 100644 --- a/src/utils/datetime/format.py +++ b/src/utils/datetime/format.py @@ -89,7 +89,7 @@ def to_pretty_time(total_seconds: int, max_units: int=UNIT_MINIMUM, if hours and len(out) < max_units: out.append("%dh" % hours) if minutes and len(out) < max_units: - out.append("%dmi" % minutes) + out.append("%dm" % minutes) if seconds and len(out) < max_units: out.append("%ds" % seconds) diff --git a/src/utils/http.py b/src/utils/http.py index 9f25b315..045d9641 100644 --- a/src/utils/http.py +++ b/src/utils/http.py @@ -3,6 +3,7 @@ import typing, urllib.error, urllib.parse, uuid import json as _json import bs4, netifaces, requests, tornado.httpclient from src import IRCBot, utils +from requests_toolbelt.adapters import source REGEX_URL = re.compile("https?://\S+", re.I) @@ -69,6 +70,7 @@ class Request(object): json_body: bool = False allow_redirects: bool = True + check_hostname: bool = False check_content_type: bool = True fallback_encoding: typing.Optional[str] = None content_type: typing.Optional[str] = None @@ -77,6 +79,8 @@ class Request(object): timeout: int=5 + bindhost: typing.Optional[str] = None + def validate(self): self.id = self.id or str(uuid.uuid4()) self.set_url(self.url) @@ -169,24 +173,61 @@ def request(request_obj: typing.Union[str, Request], **kwargs) -> Response: request_obj = Request(request_obj, **kwargs) return _request(request_obj) +class HostNameInvalidError(ValueError): + pass +class TooManyRedirectionsError(Exception): + pass + def _request(request_obj: Request) -> Response: request_obj.validate() + + def _assert_allowed(url: str): + hostname = urllib.parse.urlparse(url).hostname + if hostname is None or not host_permitted(hostname): + raise HostNameInvalidError( + f"hostname {hostname} is not permitted") + def _wrap() -> Response: headers = request_obj.get_headers() - response = requests.request( - request_obj.method, - request_obj.url, - headers=headers, - params=request_obj.get_params, - data=request_obj.get_body(), - allow_redirects=request_obj.allow_redirects, - stream=True, - cookies=request_obj.cookies - ) - response_content = response.raw.read(RESPONSE_MAX, - decode_content=True) - if not response.raw.read(1) == b"": - raise ValueError("Response too large") + + redirect = 0 + current_url = request_obj.url + session = requests.Session() + if not request_obj.bindhost is None: + new_source = source.SourceAddressAdapter(request_obj.bindhost) + session.mount('http://', new_source) + session.mount('https://', new_source) + + while True: + if request_obj.check_hostname: + _assert_allowed(current_url) + + response = session.request( + request_obj.method, + current_url, + headers=headers, + params=request_obj.get_params, + data=request_obj.get_body(), + allow_redirects=False, + stream=True, + cookies=request_obj.cookies + ) + + if response.status_code in [301, 302]: + redirect += 1 + if redirect == 5: + raise TooManyRedirectionsError(f"{redirect} redirects") + else: + current_url = response.headers["location"] + continue + + response_content = response.raw.read(RESPONSE_MAX, + decode_content=True) + if not response.raw.read(1) == b"": + raise ValueError("Response too large") + break + + session.close() headers = utils.CaseInsensitiveDict(dict(response.headers)) our_response = Response(response.status_code, response_content, diff --git a/src/utils/irc.py b/src/utils/irc.py index 400d5352..f93f61ed 100644 --- a/src/utils/irc.py +++ b/src/utils/irc.py @@ -75,48 +75,30 @@ FORMAT_TOKENS = [ FORMAT_STRIP = [ "\x08" # backspace ] -def _format_tokens(s: str) -> typing.List[str]: - is_color = False - foreground: typing.List[str] = [] - background: typing.List[str] = [] - is_background = False - matches = [] # type: typing.List[str] - - for i, char in enumerate(s): - last_char = i == len(s)-1 - if is_color: - current_color = background if is_background else foreground - color_finished = True - - if char == "," and not is_background: - is_background = True - color_finished = False - elif char.isdigit() and len(current_color) < 2: - current_color.append(char) - color_finished = len(current_color) == 2 and is_background - - if color_finished or last_char: - color = "".join(foreground) - if background: - color += "".join([","]+background) +def _format_tokens(s: str) -> typing.List[str]: + tokens: typing.List[str] = [] - matches.append("\x03%s" % color) - is_color = False - foreground = [] - background = [] - is_background = False + s_copy = list(s) + while s_copy: + token = s_copy.pop(0) + if token == "\x03": + for i in range(2): + if s_copy and s_copy[0].isdigit(): + token += s_copy.pop(0) + if (len(s_copy) > 1 and + s_copy[0] == "," and + s_copy[1].isdigit()): + token += s_copy.pop(0) + token += s_copy.pop(0) + if s_copy and s_copy[0].isdigit(): + token += s_copy.pop(0) - if char == consts.COLOR: - if is_color: - matches.append(char) - else: - is_color = True - elif char in FORMAT_TOKENS: - matches.append(char) - elif char in FORMAT_STRIP: - matches.append(char) - return matches + tokens.append(token) + elif (token in FORMAT_TOKENS or + token in FORMAT_STRIP): + tokens.append(token) + return tokens def _color_match(code: typing.Optional[str], foreground: bool) -> str: if not code: diff --git a/src/utils/parse/__init__.py b/src/utils/parse/__init__.py index 262edf4a..36da8ffb 100644 --- a/src/utils/parse/__init__.py +++ b/src/utils/parse/__init__.py @@ -131,7 +131,7 @@ def format_tokens(s: str, sigil: str="$" return tokens def format_token_replace(s: str, vars: typing.Dict[str, str], - sigil: str="$") -> str: + sigil: str="$") -> typing.Tuple[typing.List[str], str]: vars = vars.copy() vars.update({sigil: sigil}) @@ -140,7 +140,10 @@ def format_token_replace(s: str, vars: typing.Dict[str, str], tokens.sort(key=lambda x: x[0]) tokens.reverse() + not_found: typing.List[str] = [] for start, end, token in tokens: if token in vars: s = s[:start] + vars[token] + s[end+1:] - return s + else: + not_found += token + return not_found, s diff --git a/src/utils/parse/sed.py b/src/utils/parse/sed.py index 76b9e567..8a1c895d 100644 --- a/src/utils/parse/sed.py +++ b/src/utils/parse/sed.py @@ -76,6 +76,6 @@ def parse(sed_s: str) -> typing.Optional[Sed]: return SedMatch(type, re.compile(pattern, flags)) return None -def sed(sed_obj: Sed, s: str) -> typing.Tuple[str, typing.Optional[str]]: +def sed(sed_obj: Sed, s: str) -> typing.Optional[str]: out = sed_obj.match(s) - return sed_obj.type, out + return out diff --git a/src/utils/parse/spec.py b/src/utils/parse/spec.py index 7d7ffd3a..2a682c3a 100644 --- a/src/utils/parse/spec.py +++ b/src/utils/parse/spec.py @@ -15,6 +15,7 @@ class SpecArgumentContext(enum.IntFlag): class SpecArgumentType(object): context = SpecArgumentContext.ALL + _modifier: typing.Optional[str] def __init__(self, type_name: str, name: typing.Optional[str], modifier: typing.Optional[str], exported: typing.Optional[str]): @@ -24,7 +25,7 @@ class SpecArgumentType(object): self.exported = exported def _set_modifier(self, modifier: typing.Optional[str]): - pass + self._modifier = modifier def name(self) -> typing.Optional[str]: return self._name @@ -102,6 +103,18 @@ class SpecArgumentTypeDate(SpecArgumentType): return date_human(args[0]), 1 return None, 1 +class SpecArgumentTypeFlag(SpecArgumentType): + def _str(self): + pref = "-" if len(self._modifier) == 1 else "--" + return f"{pref}{self._modifier}" + def name(self): + return self._str() + def simple(self, args): + print("flag _str()", self._str()) + if args and args[0] == self._str(): + return True, 1 + return None, 1 + class SpecArgumentPrivateType(SpecArgumentType): context = SpecArgumentContext.PRIVATE @@ -115,7 +128,8 @@ SPEC_ARGUMENT_TYPES = { "int": SpecArgumentTypeInt, "date": SpecArgumentTypeDate, "duration": SpecArgumentTypeDuration, - "pattern": SpecArgumentTypePattern + "pattern": SpecArgumentTypePattern, + "flag": SpecArgumentTypeFlag } class SpecArgument(object): diff --git a/src/utils/settings.py b/src/utils/settings.py index 97fe885b..53ee0ebd 100644 --- a/src/utils/settings.py +++ b/src/utils/settings.py @@ -49,15 +49,17 @@ class IntSetting(Setting): class IntRangeSetting(IntSetting): example: typing.Optional[str] = None - def __init__(self, n_min: int, n_max: int, name: str, help: str=None, - example: str=None): + def __init__(self, n_min: int, n_max: typing.Optional[int], name: str, + help: str=None, example: str=None): self._n_min = n_min self._n_max = n_max Setting.__init__(self, name, help, example) def parse(self, value: str) -> typing.Any: out = IntSetting.parse(self, value) - if not out == None and self._n_min <= out <= self._n_max: + if (not out == None and + self._n_min <= out and + (self._n_max == None or out <= self._n_max)): return out return None |
