Source code for clients.base

import collections
import contextlib
import functools
import json
import random
import re
import threading
from typing import Callable, Iterator
from urllib.parse import urljoin
import requests


def content_type(response, **patterns):
    """Return name for response's content-type based on regular expression matches."""
    ct = response.headers.get('content-type', '')
    matches = (name for name, pattern in patterns.items() if re.match(pattern, ct))
    return next(matches, '')


def validate(response):
    """Return validation headers from response translated for modification."""
    headers = response.headers
    validators = {'etag': 'if-match', 'last-modified': 'if-unmodified-since'}
    return {validators[key]: headers[key] for key in validators if key in headers}


[docs]class Client(requests.Session): """A Session which sends requests to a base url. :param url: base url for requests :param trailing: trailing chars (e.g. /) appended to the url :param headers: additional headers to include in requests :param attrs: additional Session attributes """ def __init__(self, url, trailing='', headers=(), **attrs): super().__init__() self.__setstate__(attrs) self.headers.update(headers) self.trailing = trailing self.url = url.rstrip('/') + '/' @classmethod def clone(cls, other, path='', **kwargs): kwargs.update(other.__getstate__()) return cls(urljoin(other.url, path), trailing=other.trailing, **kwargs) def __repr__(self): return f'{type(self).__name__}({self.url}... {self.trailing})'
[docs] def __truediv__(self, path: str) -> 'Client': """Return a cloned client with appended path.""" return type(self).clone(self, path)
[docs] def request(self, method, path, **kwargs): """Send request with relative or absolute path and return response.""" url = urljoin(self.url, path).rstrip('/') + self.trailing return super().request(method, url, **kwargs)
[docs] def get(self, path='', **kwargs): """GET request with optional path.""" return self.request('GET', path, **kwargs)
[docs] def options(self, path='', **kwargs): """OPTIONS request with optional path.""" return self.request('OPTIONS', path, **kwargs)
[docs] def head(self, path='', allow_redirects=False, **kwargs): """HEAD request with optional path.""" return self.request('HEAD', path, allow_redirects=allow_redirects, **kwargs)
[docs] def post(self, path='', json=None, **kwargs): """POST request with optional path and json body.""" return self.request('POST', path, json=json, **kwargs)
[docs] def put(self, path='', json=None, **kwargs): """PUT request with optional path and json body.""" return self.request('PUT', path, json=json, **kwargs)
[docs] def patch(self, path='', json=None, **kwargs): """PATCH request with optional path and json body.""" return self.request('PATCH', path, json=json, **kwargs)
[docs] def delete(self, path='', **kwargs): """DELETE request with optional path.""" return self.request('DELETE', path, **kwargs)
[docs]class Resource(Client): """A `Client`_ which returns json content and has syntactic support for requests.""" client = property(Client.clone, doc="upcasted `Client`_") __getitem__ = Client.get __setitem__ = Client.put __delitem__ = Client.delete __getattr__ = Client.__truediv__ content_type = functools.partial(content_type, text='text/', json=r'application/(\w|\.)*\+?json')
[docs] def request(self, method, path, **kwargs): """Send request with path and return processed content.""" response = super().request(method, path, **kwargs) response.raise_for_status() if self.content_type(response) == 'json': return response.json() return response.text if response.encoding else response.content
[docs] def iter(self, path: str = '', **kwargs) -> Iterator: """Iterate lines or chunks from streamed GET request.""" response = super().request('GET', path, stream=True, **kwargs) response.raise_for_status() content_type = self.content_type(response) if content_type == 'json': response.encoding = response.encoding or 'utf8' return map(json.loads, response.iter_lines(decode_unicode=True)) if response.encoding or content_type == 'text': return response.iter_lines(decode_unicode=response.encoding) return iter(response)
__iter__ = iter
[docs] def __contains__(self, path: str): """Return whether endpoint exists according to HEAD request.""" return super().request('HEAD', path, allow_redirects=False).ok
[docs] def __call__(self, path: str = '', **params): """GET request with params.""" return self.get(path, params=params)
def updater(self, path='', **kwargs): response = super().request('GET', path, **kwargs) response.raise_for_status() kwargs['headers'] = dict(kwargs.get('headers', {}), **validate(response)) yield self.put(path, (yield response.json()), **kwargs)
[docs] @contextlib.contextmanager def updating(self, path: str = '', **kwargs): """Provisional context manager to GET and conditionally PUT json data.""" updater = self.updater(path, **kwargs) json = next(updater) yield json updater.send(json)
[docs] def update(self, path: str = '', callback: Callable = None, **json): """PATCH request with json params. :param callback: optionally update with GET and validated PUT. ``callback`` is called on the json result with keyword params, i.e., ``dict`` correctly implements the simple update case. """ if callback is None: return self.patch(path, json=json) updater = self.updater(path) return updater.send(callback(next(updater), **json))
[docs] def create(self, path: str = '', json=None, **kwargs) -> str: """POST request and return location.""" response = super().request('POST', path, json=json, **kwargs) response.raise_for_status() return response.headers.get('location')
[docs] def download(self, file, path: str = '', **kwargs): """Output streamed GET request to file.""" response = super().request('GET', path, stream=True, **kwargs) response.raise_for_status() for chunk in response: file.write(chunk) return file
[docs] def authorize(self, path: str = '', **kwargs) -> dict: """Acquire oauth access token and set ``Authorization`` header.""" method = 'GET' if {'json', 'data'}.isdisjoint(kwargs) else 'POST' result = self.request(method, path, **kwargs) self.headers['authorization'] = f"{result['token_type']} {result['access_token']}" return result
[docs]class Remote(Client): """A `Client`_ which defaults to posts with json bodies, i.e., RPC. :param url: base url for requests :param json: default json body for all calls :param kwargs: same options as `Client`_ """ client = Resource.client __getattr__ = Resource.__getattr__
[docs] def __init__(self, url: str, json=(), **kwargs): super().__init__(url, **kwargs) self.json = dict(json)
@classmethod def clone(cls, other, path=''): return Client.clone.__func__(cls, other, path, json=other.json)
[docs] def __call__(self, path: str = '', **json): """POST request with json body and :meth:`check` result.""" response = self.post(path, json=dict(self.json, **json)) response.raise_for_status() return self.check(response.json())
[docs] @staticmethod def check(result): """Override to return result or raise error, for APIs which don't use status codes.""" return result
[docs]class Graph(Remote): """A `Remote`_ client which executes GraphQL queries.""" Error = requests.HTTPError
[docs] @classmethod def check(cls, result: dict): """Return ``data`` or raise ``errors``.""" for error in result.get('errors', ()): raise cls.Error(error) return result.get('data')
[docs] def execute(self, query: str, **variables): """Execute query over POST.""" return self(query=query, variables=variables)
class Stats(collections.Counter): """Thread-safe Counter. Context manager tracks number of active connections and errors. """ def __init__(self): self.lock = threading.Lock() def add(self, **kwargs): """Atomically add data.""" with self.lock: self.update(kwargs) def __enter__(self): self.add(connections=1) return self def __exit__(self, *args): self.add(connections=-1, errors=int(any(args)))
[docs]class Proxy(Client): """An extensible embedded proxy client to multiple hosts. The default implementation provides load balancing based on active connections. It does not provide error handling or retrying. :param urls: base urls for requests :param kwargs: same options as `Client`_ """ Stats = Stats def __init__(self, *urls: str, **kwargs): super().__init__('', **kwargs) self.urls = {(url.rstrip('/') + '/'): self.Stats() for url in urls} @classmethod def clone(cls, other, path=''): urls = (urljoin(url, path) for url in other.urls) return cls(*urls, trailing=other.trailing, **other.__getstate__())
[docs] def priority(self, url: str): """Return comparable priority for url. Minimizes errors, failures (500s), and active connections. None may be used to eliminate from consideration. """ stats = self.urls[url] return tuple(stats[key] for key in ('errors', 'failures', 'connections'))
[docs] def choice(self, method: str) -> str: """Return chosen url according to priority. :param method: placeholder for extensions which distinguish read/write requests """ priorities = collections.defaultdict(list) # type: dict for url in self.urls: priorities[self.priority(url)].append(url) priorities.pop(None, None) return random.choice(priorities[min(priorities)])
[docs] def request(self, method, path, **kwargs): """Send request with relative or absolute path and return response.""" url = self.choice(method) with self.urls[url] as stats: response = super().request(method, urljoin(url, path), **kwargs) stats.add(failures=int(response.status_code >= 500)) return response