about | summary | refs | log | tree | commit | diff
path: root/src/utils/http.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/utils/http.py')
-rw-r--r-- src/utils/http.py 136
1 files changed, 53 insertions, 83 deletions
diff --git a/src/utils/http.py b/src/utils/http.py
index f31da62c..699c48f1 100644
--- a/src/utils/http.py
+++ b/src/utils/http.py
@@ -1,9 +1,8 @@
-import asyncio, codecs, ipaddress, re, signal, socket, traceback, typing
-import urllib.error, urllib.parse, uuid
+import asyncio, codecs, dataclasses, ipaddress, re, signal, socket, traceback
+import typing, urllib.error, urllib.parse, uuid
import json as _json
-import bs4, netifaces, requests
-import tornado.httpclient
-from src import utils
+import bs4, netifaces, requests, tornado.httpclient
+from src import IRCBot, utils
REGEX_URL = re.compile("https?://\S+", re.I)
@@ -29,8 +28,8 @@ def url_sanitise(url: str):
url = url[:-1]
return url
-DEFAULT_USERAGENT = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
- "(KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36")
+USERAGENT = "Mozilla/5.0 (compatible; BitBot/%s; +%s" % (
+ IRCBot.VERSION, IRCBot.URL)
RESPONSE_MAX = (1024*1024)*100
SOUP_CONTENT_TYPES = ["text/html", "text/xml", "application/xml"]
@@ -54,46 +53,33 @@ class HTTPWrongContentTypeException(HTTPException):
def throw_timeout():
raise HTTPTimeoutException()
+@dataclasses.dataclass
class Request(object):
- def __init__(self, url: str,
- get_params: typing.Dict[str, str]={}, post_data: typing.Any=None,
- headers: typing.Dict[str, str]={},
+ url: str
+ id: typing.Optional[str] = None
+ method: str = "GET"
- json: bool=False, json_body: bool=False, allow_redirects: bool=True,
- check_content_type: bool=True, parse: bool=False,
- detect_encoding: bool=True,
+ get_params: typing.Dict[str, str] = dataclasses.field(
+ default_factory=dict)
+ post_data: typing.Any = None
+ headers: typing.Dict[str, str] = dataclasses.field(
+ default_factory=dict)
+ cookies: typing.Dict[str, str] = dataclasses.field(
+ default_factory=dict)
- method: str="GET", parser: str="lxml", id: str=None,
- fallback_encoding: str=None, content_type: str=None,
- proxy: str=None, useragent: str=None,
+ json_body: bool = False
- **kwargs):
- self.id = id or str(uuid.uuid4())
+ allow_redirects: bool = True
+ check_content_type: bool = True
+ fallback_encoding: typing.Optional[str] = None
+ content_type: typing.Optional[str] = None
+ proxy: typing.Optional[str] = None
+ useragent: typing.Optional[str] = None
- self.set_url(url)
- self.method = method.upper()
- self.get_params = get_params
- self.post_data = post_data
- self.headers = headers
-
- self.json = json
- self.json_body = json_body
- self.allow_redirects = allow_redirects
- self.check_content_type = check_content_type
- self.parse = parse
- self.detect_encoding = detect_encoding
-
- self.parser = parser
- self.fallback_encoding = fallback_encoding
- self.content_type = content_type
- self.proxy = proxy
- self.useragent = useragent
-
- if kwargs:
- if method == "POST":
- self.post_data = kwargs
- else:
- self.get_params.update(kwargs)
+ def validate(self):
+ self.id = self.id or str(uuid.uuid4())
+ self.set_url(self.url)
+ self.method = self.method.upper()
def set_url(self, url: str):
parts = urllib.parse.urlparse(url)
@@ -113,7 +99,7 @@ class Request(object):
if not "Accept-Language" in headers:
headers["Accept-Language"] = "en-GB"
if not "User-Agent" in headers:
- headers["User-Agent"] = self.useragent or DEFAULT_USERAGENT
+ headers["User-Agent"] = self.useragent or USERAGENT
if not "Content-Type" in headers and self.content_type:
headers["Content-Type"] = self.content_type
return headers
@@ -128,13 +114,20 @@ class Request(object):
return None
class Response(object):
- def __init__(self, code: int, data: typing.Any,
- headers: typing.Dict[str, str], encoding: str):
+ def __init__(self, code: int, data: bytes, encoding: str,
+ headers: typing.Dict[str, str], cookies: typing.Dict[str, str]):
self.code = code
self.data = data
- self.headers = headers
self.content_type = headers.get("Content-Type", "").split(";", 1)[0]
self.encoding = encoding
+ self.headers = headers
+ self.cookies = cookies
+ def decode(self, encoding="utf8") -> str:
+ return self.data.decode(encoding)
+ def json(self) -> typing.Any:
+ return _json.loads(self.data)
+ def soup(self, parser: str="lxml") -> bs4.BeautifulSoup:
+ return bs4.BeautifulSoup(self.decode(), parser)
def _meta_content(s: str) -> typing.Dict[str, str]:
out = {}
@@ -143,7 +136,8 @@ def _meta_content(s: str) -> typing.Dict[str, str]:
out[key] = value
return out
-def _find_encoding(soup: bs4.BeautifulSoup) -> typing.Optional[str]:
+def _find_encoding(data: bytes) -> typing.Optional[str]:
+ soup = bs4.BeautifulSoup(data, "lxml")
if not soup.meta == None:
meta_charset = soup.meta.get("charset")
if not meta_charset == None:
@@ -167,7 +161,7 @@ def request(request_obj: typing.Union[str, Request], **kwargs) -> Response:
return _request(request_obj)
def _request(request_obj: Request) -> Response:
-
+ request_obj.validate()
def _wrap() -> Response:
headers = request_obj.get_headers()
response = requests.request(
@@ -177,7 +171,8 @@ def _request(request_obj: Request) -> Response:
params=request_obj.get_params,
data=request_obj.get_body(),
allow_redirects=request_obj.allow_redirects,
- stream=True
+ stream=True,
+ cookies=request_obj.cookies
)
response_content = response.raw.read(RESPONSE_MAX,
decode_content=True)
@@ -186,7 +181,8 @@ def _request(request_obj: Request) -> Response:
headers = utils.CaseInsensitiveDict(dict(response.headers))
our_response = Response(response.status_code, response_content,
- headers=headers, encoding=response.encoding)
+ encoding=response.encoding, headers=headers,
+ cookies=response.cookies.get_dict())
return our_response
try:
@@ -202,39 +198,12 @@ def _request(request_obj: Request) -> Response:
else:
encoding = "iso-8859-1"
- if (request_obj.detect_encoding and
- response.content_type and
+ if (response.content_type and
response.content_type in SOUP_CONTENT_TYPES):
- souped = bs4.BeautifulSoup(response.data, request_obj.parser)
- encoding = _find_encoding(souped) or encoding
-
- def _decode_data():
- return response.data.decode(encoding)
-
- if request_obj.parse:
- if (not request_obj.check_content_type or
- response.content_type in SOUP_CONTENT_TYPES):
- souped = bs4.BeautifulSoup(_decode_data(), request_obj.parser)
- response.data = souped
- return response
- else:
- raise HTTPWrongContentTypeException(
- "Tried to soup non-html/non-xml data (%s)" %
- response.content_type)
+ encoding = _find_encoding(response.data) or encoding
+ response.encoding = encoding
- if request_obj.json and response.data:
- data = _decode_data()
- try:
- response.data = _json.loads(data)
- return response
- except _json.decoder.JSONDecodeError as e:
- raise HTTPParsingException(str(e), data)
-
- if response.content_type in DECODE_CONTENT_TYPES:
- response.data = _decode_data()
- return response
- else:
- return response
+ return response
class RequestManyException(Exception):
pass
@@ -242,6 +211,7 @@ def request_many(requests: typing.List[Request]) -> typing.Dict[str, Response]:
responses = {}
async def _request(request):
+ request.validate()
client = tornado.httpclient.AsyncHTTPClient()
url = request.url
if request.get_params:
@@ -263,8 +233,8 @@ def request_many(requests: typing.List[Request]) -> typing.Dict[str, Response]:
"request_many failed for %s" % url)
headers = utils.CaseInsensitiveDict(dict(response.headers))
- data = response.body.decode("utf8")
- responses[request.id] = Response(response.code, data, headers, "utf8")
+ responses[request.id] = Response(response.code, response.body, "utf8",
+ headers, {})
loop = asyncio.new_event_loop()
awaits = []