Source code for clients.aio

import asyncio
import contextlib
from urllib.parse import urljoin
import httpx
from .base import validate, Client, Graph, Proxy, Remote, Resource


class AsyncClient(httpx.AsyncClient):
    """An asynchronous Client which sends requests to a base url.

    :param url: base url for requests
    :param trailing: trailing chars (e.g. /) appended to the url
    :param attrs: additional AsyncClient options
    """

    # Operators and request helpers are taken directly from ``Client``
    # (imported from ``.base``) rather than re-implemented here.
    __truediv__ = Client.__truediv__
    __repr__ = Client.__repr__  # type: ignore
    get = Client.get  # type: ignore
    options = Client.options  # type: ignore
    head = Client.head  # type: ignore
    post = Client.post  # type: ignore
    put = Client.put  # type: ignore
    patch = Client.patch  # type: ignore
    delete = Client.delete  # type: ignore

    def __init__(self, url: str, *, trailing: str = '', **attrs):
        # The base url always ends with exactly one slash.
        super().__init__(base_url=url.rstrip('/') + '/', **attrs)
        # Kept so that ``clone`` can build new clients with the same options.
        self._attrs = attrs
        self.trailing = trailing

    def __del__(self):
        # Deliberate no-op which replaces any inherited finalizer.
        pass

    @property
    def url(self):
        """The base url as a string."""
        return str(self.base_url)

    @classmethod
    def clone(cls, other, path='', **kwargs):
        """Return a new ``cls`` instance based on another client.

        :param other: client whose base url, trailing chars and options are copied
        :param path: path joined onto the other client's base url
        :param kwargs: extra constructor options; ``other``'s stored options
            take precedence on conflicting keys
        """
        url = str(other.base_url.join(path))
        kwargs.update(other._attrs)
        return cls(url, trailing=other.trailing, **kwargs)

    def request(self, method, path, **kwargs):
        """Send request with relative or absolute path and return response.

        The path is joined onto the base url, trailing slashes are stripped,
        and ``trailing`` is appended. The parent's ``request`` result is
        returned as is, without being awaited here.
        """
        url = str(self.base_url.join(path)).rstrip('/') + self.trailing
        return super().request(method, url, **kwargs)

    def run(self, name: str, *args, **kwargs):
        """Synchronously call method and run coroutine.

        :param name: name of the method to look up on this client
        :param args: positional arguments forwarded to the method
        :param kwargs: keyword arguments forwarded to the method
        :return: the result of running the coroutine to completion
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # There is no current event loop (e.g. in a non-main thread, or on
            # Python versions which no longer create one implicitly). Create
            # one and register it so that later calls reuse the same loop.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(getattr(self, name)(*args, **kwargs))
class AsyncResource(AsyncClient):
    """An `AsyncClient`_ which returns processed content and has syntactic support for requests."""

    client = property(AsyncClient.clone, doc="upcasted `AsyncClient`_")
    __getattr__ = AsyncClient.__truediv__
    __getitem__ = AsyncClient.get  # type: ignore
    content_type = Resource.content_type
    __call__ = Resource.__call__

    async def request(self, method, path, **kwargs):
        """Send request with path and return the processed content.

        Raises for error statuses. Json responses are decoded; otherwise the
        text is returned when the response has an encoding, else the raw content.
        """
        response = await super().request(method, path, **kwargs)
        response.raise_for_status()
        if self.content_type(response) == 'json':
            return response.json()
        if response.encoding:
            return response.text
        return response.content

    async def updater(self, path='', **kwargs):
        """Async generator which GETs json, receives its replacement, and PUTs it.

        First yields the decoded json of the GET response. The value sent back
        in is PUT to the same path, with the request headers extended by
        whatever ``validate`` returns for the GET response; the PUT result is
        then yielded.
        """
        response = await super().request('GET', path, **kwargs)
        response.raise_for_status()
        base_headers = kwargs.get('headers', {})
        kwargs['headers'] = dict(base_headers, **validate(response))
        replacement = yield response.json()
        yield await self.put(path, replacement, **kwargs)

    async def updating(self, path: str = '', **kwargs):
        """Provisional context manager to GET json data and PUT it back on exit.

        The yielded object is the one sent to the PUT, so changes made to it
        in place inside the block are what gets uploaded.
        """
        gen = self.updater(path, **kwargs)
        data = await gen.__anext__()
        yield data
        await gen.asend(data)

    if hasattr(contextlib, 'asynccontextmanager'):  # pragma: no branch
        updating = contextlib.asynccontextmanager(updating)

    async def update(self, path='', callback=None, **json):
        """PATCH request with json params.

        :param callback: optionally update with GET and validated PUT.
            ``callback`` is called on the json result with keyword params, i.e.,
            ``dict`` correctly implements the simple update case.
        """
        if callback is not None:
            gen = self.updater(path)
            current = await gen.__anext__()
            return await gen.asend(callback(current, **json))
        return await self.patch(path, json)

    async def authorize(self, path: str = '', **kwargs) -> dict:
        """Acquire oauth access token and set ``Authorization`` header.

        A POST is sent when ``json`` or ``data`` is supplied, otherwise a GET.
        The updated headers are also stored in the saved client options.
        """
        has_body = 'json' in kwargs or 'data' in kwargs
        method = 'POST' if has_body else 'GET'
        result = await self.request(method, path, **kwargs)
        self.headers['authorization'] = f"{result['token_type']} {result['access_token']}"
        self._attrs['headers'] = self.headers
        return result
class AsyncRemote(AsyncClient):
    """An `AsyncClient`_ which defaults to posts with json bodies, i.e., RPC.

    :param url: base url for requests
    :param json: default json body for all calls
    :param kwargs: same options as `AsyncClient`_
    """

    client = AsyncResource.client
    __getattr__ = AsyncResource.__getattr__
    check = staticmethod(Remote.check)

    def __init__(self, url: str, json=(), **kwargs):
        super().__init__(url, **kwargs)
        # Copied into a fresh dict so the caller's object is never shared.
        self.json = dict(json)

    @classmethod
    def clone(cls, other, path=''):
        """Return a new ``cls`` instance based on ``other``, keeping its default json body."""
        # Call the underlying function of the parent's classmethod so that
        # ``cls`` (not ``AsyncClient``) is the class being instantiated.
        base_clone = AsyncClient.clone.__func__
        return base_clone(cls, other, path, json=other.json)

    async def __call__(self, path='', **json):
        """POST request with json body and check result.

        The keyword params are layered over the default json body.
        """
        body = dict(self.json, **json)
        response = await self.post(path, json=body)
        response.raise_for_status()
        content = response.json()
        return self.check(content)
class AsyncGraph(AsyncRemote):
    """An `AsyncRemote`_ client which executes GraphQL queries."""

    Error = httpx.HTTPError
    execute = Graph.execute

    @classmethod
    def check(cls, result: dict):  # type: ignore
        """Return ``data`` or raise ``errors``.

        :param result: decoded response mapping
        :return: the ``data`` entry, or None if it is absent
        :raises Error: built from the first item of ``errors``, if there is one
        """
        # ``or ()`` also covers an explicit ``"errors": null`` entry, which
        # would otherwise fail with a TypeError when iterated.
        errors = result.get('errors') or ()
        # Only the first error is ever raised.
        for error in errors:
            raise cls.Error(error, request=None)
        return result.get('data')
class AsyncProxy(AsyncClient):
    """An extensible embedded proxy client to multiple hosts.

    The default implementation provides load balancing based on active connections.
    It does not provide error handling or retrying.

    :param urls: base urls for requests
    :param kwargs: same options as `AsyncClient`_
    """

    Stats = Proxy.Stats
    priority = Proxy.priority
    choice = Proxy.choice

    def __init__(self, *urls: str, **kwargs):
        # The parent gets a placeholder base url; the real hosts live in ``urls``.
        super().__init__('https://proxies', **kwargs)
        # Each host url is normalized to end with one slash and gets its own stats.
        self.urls = {url.rstrip('/') + '/': self.Stats() for url in urls}

    @classmethod
    def clone(cls, other, path=''):
        """Return a new ``cls`` instance with ``path`` joined onto each of ``other``'s urls."""
        joined = [urljoin(base, path) for base in other.urls]
        return cls(*joined, trailing=other.trailing, **other._attrs)

    async def request(self, method, path, **kwargs):
        """Send request with relative or absolute path and return response.

        The host is selected with ``choice``; its stats object wraps the
        request as a context manager and afterwards records whether the
        status code was 500 or above as a failure.
        """
        url = self.choice(method)
        with self.urls[url] as stats:
            response = await super().request(method, urljoin(url, path), **kwargs)
        server_error = response.status_code >= 500
        stats.add(failures=int(server_error))
        return response