diff options
| author | 2019-09-11 17:44:07 +0100 | |
|---|---|---|
| committer | 2019-09-11 17:44:07 +0100 | |
| commit | 4a97c9eb0dc8279a7382c2fddf51f0ed93a99c1a (patch) | |
| tree | 0b453a54b928ecdc2d6268e109faf20d9d4866b1 /src/utils/http.py | |
| parent | automatically decode certain http content types (diff) | |
| signature | ||
refactor utils.http.requests to support a Request object
Diffstat (limited to 'src/utils/http.py')
| -rw-r--r-- | src/utils/http.py | 104 |
1 files changed, 77 insertions, 27 deletions
diff --git a/src/utils/http.py b/src/utils/http.py index 5b5f36bb..4dfb450d 100644 --- a/src/utils/http.py +++ b/src/utils/http.py @@ -52,6 +52,62 @@ class HTTPWrongContentTypeException(HTTPException): def throw_timeout(): raise HTTPTimeoutException() +class Request(object): + def __init__(self, url: str, method: str="GET", + get_params: typing.Dict[str, str]={}, post_data: typing.Any=None, + headers: typing.Dict[str, str]={}, + + json: bool=False, allow_redirects: bool=True, + check_content_type: bool=True, parse: bool=False, + detect_encoding: bool=True, + + parser: str="lxml", fallback_encoding="iso-8859-1", + content_type: str=None, + + **kwargs): + self.set_url(url) + self.method = method.upper() + self.get_params = get_params + self.post_data = post_data + self.headers = headers + + self.json = json + self.allow_redirects = allow_redirects + self.check_content_type = check_content_type + self.parse = parse + self.detect_encoding = detect_encoding + + self.parser = parser + self.fallback_encoding = fallback_encoding + self.content_type = content_type + + if kwargs: + if method == "POST": + self.post_data = kwargs + else: + self.get_params.update(kwargs) + + def set_url(self, url: str): + if not urllib.parse.urlparse(url).scheme: + url = "http://%s" % url + self.url = url + + def get_headers(self) -> typing.Dict[str, str]: + headers = self.headers.copy() + if not "Accept-Language" in headers: + headers["Accept-Language"] = "en-GB" + if not "User-Agent" in headers: + headers["User-Agent"] = USER_AGENT + if not "Content-Type" in headers and self.content_type: + headers["Content-Type"] = self.content_type + return headers + + def get_body(self) -> typing.Any: + if self.content_type == "application/json": + return _json.dumps(self.post_data) + else: + return self.post_data + class Response(object): def __init__(self, code: int, data: typing.Any, headers: typing.Dict[str, str]): @@ -84,31 +140,23 @@ def _find_encoding(soup: bs4.BeautifulSoup) -> typing.Optional[str]: return None -def request(url: str, method: str="GET", get_params: dict={}, - post_data: typing.Any=None, headers: dict={}, - json_data: typing.Any=None, code: bool=False, json: bool=False, - soup: bool=False, parser: str="lxml", detect_encoding: bool=True, - fallback_encoding: str="utf8", allow_redirects: bool=True, - check_content_type: bool=True) -> Response: - - if not urllib.parse.urlparse(url).scheme: - url = "http://%s" % url +def request(request_obj: typing.Union[str, Request], **kwargs) -> Response: + if type(request_obj) == str: + request_obj = Request(request_obj, **kwargs) + return _request(request_obj) - if not "Accept-Language" in headers: - headers["Accept-Language"] = "en-GB" - if not "User-Agent" in headers: - headers["User-Agent"] = USER_AGENT +def _request(request_obj: Request) -> Response: + headers = request_obj.get_headers() with utils.deadline(seconds=5): try: response = requests.request( - method.upper(), - url, + request_obj.method, + request_obj.url, headers=headers, - params=get_params, - data=post_data, - json=json_data, - allow_redirects=allow_redirects, + params=request_obj.get_params, + data=request_obj.get_body(), + allow_redirects=request_obj.allow_redirects, stream=True ) response_content = response.raw.read(RESPONSE_MAX, @@ -122,23 +170,25 @@ def request(url: str, method: str="GET", get_params: dict={}, response_headers = utils.CaseInsensitiveDict(dict(response.headers)) content_type = response.headers.get("Content-Type", "").split(";", 1)[0] - encoding = response.encoding or "iso-8859-1" - if detect_encoding and content_type and content_type in SOUP_CONTENT_TYPES: - souped = bs4.BeautifulSoup(response_content, parser) + encoding = response.encoding or request_obj.fallback_encoding + if (request_obj.detect_encoding and + content_type and content_type in SOUP_CONTENT_TYPES): + souped = bs4.BeautifulSoup(response_content, request_obj.parser) encoding = _find_encoding(souped) or encoding def _decode_data(): return response_content.decode(encoding) - if soup: - if not check_content_type or content_type in SOUP_CONTENT_TYPES: - soup = bs4.BeautifulSoup(_decode_data(), parser) - return Response(response.status_code, soup, response_headers) + if request_obj.parse: + if (not request_obj.check_content_type or + content_type in SOUP_CONTENT_TYPES): + souped = bs4.BeautifulSoup(_decode_data(), request_obj.parser) + return Response(response.status_code, souped, response_headers) else: raise HTTPWrongContentTypeException( "Tried to soup non-html/non-xml data (%s)" % content_type) - if json and response_content: + if request_obj.json and response_content: data = _decode_data() try: return Response(response.status_code, _json.loads(data), |
