import collections
import contextlib
import functools
import json
import random
import re
import threading
from typing import Callable, Iterator
from urllib.parse import urljoin
import requests

def content_type(response, **patterns):
    """Classify a response by its ``content-type`` header.

    :param response: object exposing a ``headers`` mapping
    :param patterns: ``name=regex`` pairs, tried in the order given
    :return: the first name whose regex matches the start of the header,
        or an empty string when the header is missing or nothing matches
    """
    header = response.headers.get('content-type', '')
    for name, pattern in patterns.items():
        # re.match anchors at the beginning, so patterns act as prefixes.
        if re.match(pattern, header):
            return name
    return ''

def validate(response):
    """Translate a response's validators into conditional request headers.

    ``etag`` becomes ``if-match`` and ``last-modified`` becomes
    ``if-unmodified-since``; validators absent from the response are skipped.

    :param response: object exposing a ``headers`` mapping
    :return: dict of conditional headers suitable for a modifying request
    """
    headers = response.headers
    translations = (('etag', 'if-match'), ('last-modified', 'if-unmodified-since'))
    conditions = {}
    for validator, condition in translations:
        if validator in headers:
            conditions[condition] = headers[validator]
    return conditions

class TokenAuth(dict):
    """Auth callable built from a ``{token_type: access_token}`` mapping.

    Calling the instance on a request sets its ``Authorization`` header to
    ``"<token_type> <access_token>"`` and returns the request.
    """

    def __call__(self, req):
        # The items are unpacked as the positional arguments of ``str.join``,
        # so the mapping is expected to hold exactly one (type, token) pair.
        pairs = self.items()
        req.headers['Authorization'] = ' '.join(*pairs)
        return req

class Client(requests.Session):
    """A Session which sends requests to a base url.

    :param url: base url for requests
    :param trailing: trailing chars (e.g. /) appended to the url
    :param headers: additional headers to include in requests
    :param auth: additional authorization support for ``{token_type: access_token}``,
        available per request as well
    :param attrs: additional Session attributes
    """

    def __init__(self, url, trailing='', headers=(), auth=None, **attrs):
        super().__init__()
        # Extra Session attributes go first; the explicit arguments below
        # are applied on top of them.
        self.__setstate__(attrs)
        if isinstance(auth, dict):
            auth = TokenAuth(auth)
        self.auth = auth
        self.headers.update(headers)
        self.trailing = trailing
        # Store the base url with exactly one trailing slash.
        self.url = url.rstrip('/') + '/'

    @classmethod
    def clone(cls, other, path='', **kwargs):
        """Return a new ``cls`` instance based on ``other`` with ``path`` joined to its url.

        The Session state of ``other`` takes precedence over ``kwargs``.
        """
        options = kwargs
        options.update(other.__getstate__())
        base = urljoin(other.url, path)
        return cls(base, trailing=other.trailing, **options)

    def __repr__(self):
        name = type(self).__name__
        return '{}({}... {})'.format(name, self.url, self.trailing)

    def __truediv__(self, path: str) -> 'Client':
        """Return a cloned client with appended path."""
        return type(self).clone(self, path)

    def request(self, method, path, auth=None, **kwargs):
        """Send request with relative or absolute path and return response."""
        if isinstance(auth, dict):
            auth = TokenAuth(auth)
        kwargs['auth'] = auth
        # Trailing slashes are stripped before the configured suffix is added.
        url = urljoin(self.url, path).rstrip('/') + self.trailing
        return super().request(method, url, **kwargs)

    def get(self, path='', **kwargs):
        """GET request with optional path."""
        return self.request('GET', path, **kwargs)

    def options(self, path='', **kwargs):
        """OPTIONS request with optional path."""
        return self.request('OPTIONS', path, **kwargs)

    def head(self, path='', allow_redirects=False, **kwargs):
        """HEAD request with optional path."""
        return self.request('HEAD', path, allow_redirects=allow_redirects, **kwargs)

    def post(self, path='', json=None, **kwargs):
        """POST request with optional path and json body."""
        return self.request('POST', path, json=json, **kwargs)

    def put(self, path='', json=None, **kwargs):
        """PUT request with optional path and json body."""
        return self.request('PUT', path, json=json, **kwargs)

    def patch(self, path='', json=None, **kwargs):
        """PATCH request with optional path and json body."""
        return self.request('PATCH', path, json=json, **kwargs)

    def delete(self, path='', **kwargs):
        """DELETE request with optional path."""
        return self.request('DELETE', path, **kwargs)
class Resource(Client):
    """A `Client`_ which returns json content and has syntactic support for requests."""

    client = property(Client.clone, doc="upcasted `Client`_")
    __getitem__ = Client.get
    __setitem__ = Client.put
    __delitem__ = Client.delete
    # Wrapped in staticmethod so the partial is never bound as a method:
    # Python 3.13 warns about partials used as methods and later versions
    # change the behavior, which would pass ``self`` as the response.
    content_type = staticmethod(
        functools.partial(content_type, text='text/', json=r'application/(\w|\.)*\+?json')
    )

    def __getattr__(self, name: str) -> Client:
        # Names listed in ``__attrs__`` (Session state) are never treated as paths.
        if name in type(self).__attrs__:
            raise AttributeError(name)
        return self / name

    __getattr__.__doc__ = Client.__truediv__.__doc__

    def request(self, method, path, **kwargs):
        """Send request with path and return processed content.

        :return: decoded json for json content types, otherwise text when the
            response has an encoding, otherwise raw bytes
        :raises requests.HTTPError: for error status codes
        """
        response = super().request(method, path, **kwargs)
        response.raise_for_status()
        if self.content_type(response) == 'json':
            return response.json()
        return response.text if response.encoding else response.content

    def iter(self, path: str = '', **kwargs) -> Iterator:
        """Iterate lines or chunks from streamed GET request.

        Json content yields one decoded value per line, text content yields
        lines, and anything else yields the response's raw chunks.
        """
        response = super().request('GET', path, stream=True, **kwargs)
        response.raise_for_status()
        content_type = self.content_type(response)
        if content_type == 'json':
            # Lines must be decoded before json parsing; default to utf8.
            response.encoding = response.encoding or 'utf8'
            return map(json.loads, response.iter_lines(decode_unicode=True))
        if response.encoding or content_type == 'text':
            return response.iter_lines(decode_unicode=response.encoding)
        return iter(response)

    __iter__ = iter

    def __contains__(self, path: str):
        """Return whether endpoint exists according to HEAD request."""
        return super().request('HEAD', path, allow_redirects=False).ok

    def __call__(self, path: str = '', **params):
        """GET request with params."""
        return self.get(path, params=params)

    def updater(self, path='', **kwargs):
        """Generator which GETs json, then PUTs the value sent back to it.

        First yields the decoded json of the GET response. The value sent into
        the generator is PUT with the response's validators translated into
        conditional headers, and the PUT result is yielded in turn.
        """
        response = super().request('GET', path, **kwargs)
        response.raise_for_status()
        kwargs['headers'] = dict(kwargs.get('headers', {}), **validate(response))
        yield self.put(path, (yield response.json()), **kwargs)

    @contextlib.contextmanager
    def updating(self, path: str = '', **kwargs):
        """Provisional context manager to GET and conditionally PUT json data.

        The object yielded to the ``with`` body is the one that gets PUT on
        normal exit, so it should be modified in place.
        """
        updater = self.updater(path, **kwargs)
        json = next(updater)
        yield json
        updater.send(json)

    def update(self, path: str = '', callback: Callable = None, **json):
        """PATCH request with json params.

        :param callback: optionally update with GET and validated PUT.
            ``callback`` is called on the json result with keyword params, i.e.,
            ``dict`` correctly implements the simple update case.
        """
        if callback is None:
            return self.patch(path, json=json)
        updater = self.updater(path)
        return updater.send(callback(next(updater), **json))

    def create(self, path: str = '', json=None, **kwargs) -> str:
        """POST request and return location."""
        response = super().request('POST', path, json=json, **kwargs)
        response.raise_for_status()
        return response.headers.get('location')

    def download(self, file, path: str = '', **kwargs):
        """Output streamed GET request to file.

        :param file: writable object receiving each chunk of the response
        :return: the same ``file`` object
        """
        response = super().request('GET', path, stream=True, **kwargs)
        response.raise_for_status()
        for chunk in response:
            file.write(chunk)
        return file

    def authorize(self, path: str = '', **kwargs) -> dict:
        """Acquire oauth access token and set ``auth``.

        Uses POST when a ``json`` or ``data`` body is supplied, otherwise GET.
        """
        method = 'GET' if {'json', 'data'}.isdisjoint(kwargs) else 'POST'
        result = self.request(method, path, **kwargs)
        self.auth = TokenAuth({result['token_type']: result['access_token']})
        return result
class Remote(Client):
    """A `Client`_ which defaults to posts with json bodies, i.e., RPC.

    :param url: base url for requests
    :param json: default json body for all calls
    :param kwargs: same options as `Client`_
    """

    client = Resource.client
    # Reuse the attribute-as-path lookup; taken from ``__dict__`` to get the plain function.
    __getattr__ = Resource.__dict__['__getattr__']

    def __init__(self, url: str, json=(), **kwargs):
        super().__init__(url, **kwargs)
        self.json = dict(json)

    @classmethod
    def clone(cls, other, path=''):
        """Return a new ``cls`` instance based on ``other``, carrying over its default json body."""
        return Client.clone.__func__(cls, other, path, json=other.json)

    def __call__(self, path: str = '', **json):
        """POST request with json body and :meth:`check` result.

        Keyword arguments are merged over the default json body.
        """
        response = self.post(path, json=dict(self.json, **json))
        response.raise_for_status()
        return self.check(response.json())

    @staticmethod
    def check(result):
        """Override to return result or raise error, for APIs which don't use status codes."""
        return result
class Graph(Remote):
    """A `Remote`_ client which executes GraphQL queries."""

    Error = requests.HTTPError

    @classmethod
    def check(cls, result: dict):
        """Return ``data`` or raise ``errors``.

        The first entry under ``errors``, if any, is raised as ``cls.Error``.
        """
        errors = result.get('errors', ())
        for first in errors:
            raise cls.Error(first)
        return result.get('data')

    def execute(self, query: str, **variables):
        """Execute query over POST."""
        body = {'query': query, 'variables': variables}
        return self(**body)
class Stats(collections.Counter):
    """Thread-safe Counter.

    Context manager tracks number of active connections and errors.

    Accepts the same construction arguments as ``collections.Counter``, so
    inherited helpers that rebuild the instance (e.g. ``copy``) keep working.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lock = threading.Lock()

    def add(self, **kwargs):
        """Atomically add data.

        :param kwargs: ``name=amount`` increments applied under the lock
        """
        with self.lock:
            self.update(kwargs)

    def __enter__(self):
        self.add(connections=1)
        return self

    def __exit__(self, *args):
        # ``args`` is the exception triple; any truthy entry means the body raised.
        self.add(connections=-1, errors=int(any(args)))
class Proxy(Client):
    """An extensible embedded proxy client to multiple hosts.

    The default implementation provides load balancing based on active connections.
    It does not provide error handling or retrying.

    :param urls: base urls for requests
    :param kwargs: same options as `Client`_
    """

    Stats = Stats

    def __init__(self, *urls: str, **kwargs):
        super().__init__('', **kwargs)
        # Each base url, stored with one trailing slash, gets its own stats counter.
        tracked = {}
        for url in urls:
            tracked[url.rstrip('/') + '/'] = self.Stats()
        self.urls = tracked

    @classmethod
    def clone(cls, other, path=''):
        """Return a new ``cls`` instance with ``path`` joined to every url of ``other``."""
        joined = [urljoin(url, path) for url in other.urls]
        return cls(*joined, trailing=other.trailing, **other.__getstate__())

    def priority(self, url: str):
        """Return comparable priority for url.

        Minimizes errors, failures (500s), and active connections.
        None may be used to eliminate from consideration.
        """
        stats = self.urls[url]
        return (stats['errors'], stats['failures'], stats['connections'])

    def choice(self, method: str) -> str:
        """Return chosen url according to priority.

        :param method: placeholder for extensions which distinguish read/write requests
        """
        # Group urls by priority so ties are broken at random.
        priorities = {}  # type: dict
        for url in self.urls:
            priorities.setdefault(self.priority(url), []).append(url)
        # A priority of None removes those urls from consideration.
        priorities.pop(None, None)
        best = min(priorities)
        return random.choice(priorities[best])

    def request(self, method, path, **kwargs):
        """Send request with relative or absolute path and return response."""
        url = self.choice(method)
        target = urljoin(url, path)
        with self.urls[url] as stats:
            response = super().request(method, target, **kwargs)
        stats.add(failures=int(response.status_code >= 500))
        return response