aboutsummaryrefslogtreecommitdiff
path: root/src/utils
diff options
context:
space:
mode:
Diffstat (limited to 'src/utils')
-rw-r--r--src/utils/datetime/format.py2
-rw-r--r--src/utils/http.py69
-rw-r--r--src/utils/irc.py60
-rw-r--r--src/utils/parse/__init__.py7
-rw-r--r--src/utils/parse/sed.py4
-rw-r--r--src/utils/parse/spec.py18
-rw-r--r--src/utils/settings.py8
7 files changed, 105 insertions, 63 deletions
diff --git a/src/utils/datetime/format.py b/src/utils/datetime/format.py
index 889f5906..4b56e7b3 100644
--- a/src/utils/datetime/format.py
+++ b/src/utils/datetime/format.py
@@ -89,7 +89,7 @@ def to_pretty_time(total_seconds: int, max_units: int=UNIT_MINIMUM,
if hours and len(out) < max_units:
out.append("%dh" % hours)
if minutes and len(out) < max_units:
- out.append("%dmi" % minutes)
+ out.append("%dm" % minutes)
if seconds and len(out) < max_units:
out.append("%ds" % seconds)
diff --git a/src/utils/http.py b/src/utils/http.py
index 9f25b315..045d9641 100644
--- a/src/utils/http.py
+++ b/src/utils/http.py
@@ -3,6 +3,7 @@ import typing, urllib.error, urllib.parse, uuid
import json as _json
import bs4, netifaces, requests, tornado.httpclient
from src import IRCBot, utils
+from requests_toolbelt.adapters import source
REGEX_URL = re.compile("https?://\S+", re.I)
@@ -69,6 +70,7 @@ class Request(object):
json_body: bool = False
allow_redirects: bool = True
+ check_hostname: bool = False
check_content_type: bool = True
fallback_encoding: typing.Optional[str] = None
content_type: typing.Optional[str] = None
@@ -77,6 +79,8 @@ class Request(object):
timeout: int=5
+ bindhost: typing.Optional[str] = None
+
def validate(self):
self.id = self.id or str(uuid.uuid4())
self.set_url(self.url)
@@ -169,24 +173,61 @@ def request(request_obj: typing.Union[str, Request], **kwargs) -> Response:
request_obj = Request(request_obj, **kwargs)
return _request(request_obj)
+class HostNameInvalidError(ValueError):
+ pass
+class TooManyRedirectionsError(Exception):
+ pass
+
def _request(request_obj: Request) -> Response:
request_obj.validate()
+
+ def _assert_allowed(url: str):
+ hostname = urllib.parse.urlparse(url).hostname
+ if hostname is None or not host_permitted(hostname):
+ raise HostNameInvalidError(
+ f"hostname {hostname} is not permitted")
+
def _wrap() -> Response:
headers = request_obj.get_headers()
- response = requests.request(
- request_obj.method,
- request_obj.url,
- headers=headers,
- params=request_obj.get_params,
- data=request_obj.get_body(),
- allow_redirects=request_obj.allow_redirects,
- stream=True,
- cookies=request_obj.cookies
- )
- response_content = response.raw.read(RESPONSE_MAX,
- decode_content=True)
- if not response.raw.read(1) == b"":
- raise ValueError("Response too large")
+
+ redirect = 0
+ current_url = request_obj.url
+ session = requests.Session()
+ if not request_obj.bindhost is None:
+ new_source = source.SourceAddressAdapter(request_obj.bindhost)
+ session.mount('http://', new_source)
+ session.mount('https://', new_source)
+
+ while True:
+ if request_obj.check_hostname:
+ _assert_allowed(current_url)
+
+ response = session.request(
+ request_obj.method,
+ current_url,
+ headers=headers,
+ params=request_obj.get_params,
+ data=request_obj.get_body(),
+ allow_redirects=False,
+ stream=True,
+ cookies=request_obj.cookies
+ )
+
+ if response.status_code in [301, 302]:
+ redirect += 1
+ if redirect == 5:
+ raise TooManyRedirectionsError(f"{redirect} redirects")
+ else:
+ current_url = response.headers["location"]
+ continue
+
+ response_content = response.raw.read(RESPONSE_MAX,
+ decode_content=True)
+ if not response.raw.read(1) == b"":
+ raise ValueError("Response too large")
+ break
+
+ session.close()
headers = utils.CaseInsensitiveDict(dict(response.headers))
our_response = Response(response.status_code, response_content,
diff --git a/src/utils/irc.py b/src/utils/irc.py
index 400d5352..f93f61ed 100644
--- a/src/utils/irc.py
+++ b/src/utils/irc.py
@@ -75,48 +75,30 @@ FORMAT_TOKENS = [
FORMAT_STRIP = [
"\x08" # backspace
]
-def _format_tokens(s: str) -> typing.List[str]:
- is_color = False
- foreground: typing.List[str] = []
- background: typing.List[str] = []
- is_background = False
- matches = [] # type: typing.List[str]
-
- for i, char in enumerate(s):
- last_char = i == len(s)-1
- if is_color:
- current_color = background if is_background else foreground
- color_finished = True
-
- if char == "," and not is_background:
- is_background = True
- color_finished = False
- elif char.isdigit() and len(current_color) < 2:
- current_color.append(char)
- color_finished = len(current_color) == 2 and is_background
-
- if color_finished or last_char:
- color = "".join(foreground)
- if background:
- color += "".join([","]+background)
+def _format_tokens(s: str) -> typing.List[str]:
+ tokens: typing.List[str] = []
- matches.append("\x03%s" % color)
- is_color = False
- foreground = []
- background = []
- is_background = False
+ s_copy = list(s)
+ while s_copy:
+ token = s_copy.pop(0)
+ if token == "\x03":
+ for i in range(2):
+ if s_copy and s_copy[0].isdigit():
+ token += s_copy.pop(0)
+ if (len(s_copy) > 1 and
+ s_copy[0] == "," and
+ s_copy[1].isdigit()):
+ token += s_copy.pop(0)
+ token += s_copy.pop(0)
+ if s_copy and s_copy[0].isdigit():
+ token += s_copy.pop(0)
- if char == consts.COLOR:
- if is_color:
- matches.append(char)
- else:
- is_color = True
- elif char in FORMAT_TOKENS:
- matches.append(char)
- elif char in FORMAT_STRIP:
- matches.append(char)
- return matches
+ tokens.append(token)
+ elif (token in FORMAT_TOKENS or
+ token in FORMAT_STRIP):
+ tokens.append(token)
+ return tokens
def _color_match(code: typing.Optional[str], foreground: bool) -> str:
if not code:
diff --git a/src/utils/parse/__init__.py b/src/utils/parse/__init__.py
index 262edf4a..36da8ffb 100644
--- a/src/utils/parse/__init__.py
+++ b/src/utils/parse/__init__.py
@@ -131,7 +131,7 @@ def format_tokens(s: str, sigil: str="$"
return tokens
def format_token_replace(s: str, vars: typing.Dict[str, str],
- sigil: str="$") -> str:
+ sigil: str="$") -> typing.Tuple[typing.List[str], str]:
vars = vars.copy()
vars.update({sigil: sigil})
@@ -140,7 +140,10 @@ def format_token_replace(s: str, vars: typing.Dict[str, str],
tokens.sort(key=lambda x: x[0])
tokens.reverse()
+ not_found: typing.List[str] = []
for start, end, token in tokens:
if token in vars:
s = s[:start] + vars[token] + s[end+1:]
- return s
+ else:
+ not_found += token
+ return not_found, s
diff --git a/src/utils/parse/sed.py b/src/utils/parse/sed.py
index 76b9e567..8a1c895d 100644
--- a/src/utils/parse/sed.py
+++ b/src/utils/parse/sed.py
@@ -76,6 +76,6 @@ def parse(sed_s: str) -> typing.Optional[Sed]:
return SedMatch(type, re.compile(pattern, flags))
return None
-def sed(sed_obj: Sed, s: str) -> typing.Tuple[str, typing.Optional[str]]:
+def sed(sed_obj: Sed, s: str) -> typing.Optional[str]:
out = sed_obj.match(s)
- return sed_obj.type, out
+ return out
diff --git a/src/utils/parse/spec.py b/src/utils/parse/spec.py
index 7d7ffd3a..2a682c3a 100644
--- a/src/utils/parse/spec.py
+++ b/src/utils/parse/spec.py
@@ -15,6 +15,7 @@ class SpecArgumentContext(enum.IntFlag):
class SpecArgumentType(object):
context = SpecArgumentContext.ALL
+ _modifier: typing.Optional[str]
def __init__(self, type_name: str, name: typing.Optional[str],
modifier: typing.Optional[str], exported: typing.Optional[str]):
@@ -24,7 +25,7 @@ class SpecArgumentType(object):
self.exported = exported
def _set_modifier(self, modifier: typing.Optional[str]):
- pass
+ self._modifier = modifier
def name(self) -> typing.Optional[str]:
return self._name
@@ -102,6 +103,18 @@ class SpecArgumentTypeDate(SpecArgumentType):
return date_human(args[0]), 1
return None, 1
+class SpecArgumentTypeFlag(SpecArgumentType):
+ def _str(self):
+ pref = "-" if len(self._modifier) == 1 else "--"
+ return f"{pref}{self._modifier}"
+ def name(self):
+ return self._str()
+ def simple(self, args):
+ print("flag _str()", self._str())
+ if args and args[0] == self._str():
+ return True, 1
+ return None, 1
+
class SpecArgumentPrivateType(SpecArgumentType):
context = SpecArgumentContext.PRIVATE
@@ -115,7 +128,8 @@ SPEC_ARGUMENT_TYPES = {
"int": SpecArgumentTypeInt,
"date": SpecArgumentTypeDate,
"duration": SpecArgumentTypeDuration,
- "pattern": SpecArgumentTypePattern
+ "pattern": SpecArgumentTypePattern,
+ "flag": SpecArgumentTypeFlag
}
class SpecArgument(object):
diff --git a/src/utils/settings.py b/src/utils/settings.py
index 97fe885b..53ee0ebd 100644
--- a/src/utils/settings.py
+++ b/src/utils/settings.py
@@ -49,15 +49,17 @@ class IntSetting(Setting):
class IntRangeSetting(IntSetting):
example: typing.Optional[str] = None
- def __init__(self, n_min: int, n_max: int, name: str, help: str=None,
- example: str=None):
+ def __init__(self, n_min: int, n_max: typing.Optional[int], name: str,
+ help: str=None, example: str=None):
self._n_min = n_min
self._n_max = n_max
Setting.__init__(self, name, help, example)
def parse(self, value: str) -> typing.Any:
out = IntSetting.parse(self, value)
- if not out == None and self._n_min <= out <= self._n_max:
+ if (not out == None and
+ self._n_min <= out and
+ (self._n_max == None or out <= self._n_max)):
return out
return None