Source code for obelisk.asynchronous.client
from datetime import datetime, timedelta
import logging
import base64
from typing import Any, Optional
import httpx
from obelisk.exceptions import AuthenticationError
from obelisk.strategies.retry import RetryStrategy, \
NoRetryStrategy
from obelisk.types import ObeliskKind
[docs]
class Client:
"""
Base class handling Obelisk auth and doing the core HTTP communication.
Only exists in asynchronous variety, as it is not usually directly useful for user code.
"""
_client: str = ""
_secret: str = ""
token: Optional[str] = None
"""Current authentication token"""
token_expires: Optional[datetime] = None
"""Deadline after which token is no longer useable"""
grace_period: timedelta = timedelta(seconds=10)
"""Controls how much before the expiration deadline a token will be refreshed."""
retry_strategy: RetryStrategy
kind: ObeliskKind
log: logging.Logger
_token_url = 'https://obelisk.ilabt.imec.be/api/v3/auth/token'
_root_url = 'https://obelisk.ilabt.imec.be/api/v3'
_metadata_url = 'https://obelisk.ilabt.imec.be/api/v3/catalog/graphql'
_events_url = 'https://obelisk.ilabt.imec.be/api/v3/data/query/events'
_ingest_url = 'https://obelisk.ilabt.imec.be/api/v3/data/ingest'
_streams_url = 'https://obelisk.ilabt.imec.be/api/v3/data/streams'
[docs]
def __init__(self, client: str, secret: str,
retry_strategy: RetryStrategy = NoRetryStrategy(),
kind: ObeliskKind = ObeliskKind.CLASSIC) -> None:
self._client = client
self._secret = secret
self.retry_strategy = retry_strategy
self.kind = kind
self.log = logging.getLogger('obelisk')
if self.kind == ObeliskKind.HFS:
self._token_url = 'https://obelisk-hfs.discover.ilabt.imec.be/auth/realms/obelisk-hfs/protocol/openid-connect/token'
self._root_url = 'https://obelisk-hfs.discover.ilabt.imec.be'
self._events_url = 'https://obelisk-hfs.discover.ilabt.imec.be/data/query/events'
self._ingest_url = 'https://obelisk-hfs.discover.ilabt.imec.be/data/ingest'
else:
self._token_url = 'https://obelisk.ilabt.imec.be/api/v3/auth/token'
self._root_url = 'https://obelisk.ilabt.imec.be/api/v3'
self._metadata_url = 'https://obelisk.ilabt.imec.be/api/v3/catalog/graphql'
self._events_url = 'https://obelisk.ilabt.imec.be/api/v3/data/query/events'
self._ingest_url = 'https://obelisk.ilabt.imec.be/api/v3/data/ingest'
self._streams_url = 'https://obelisk.ilabt.imec.be/api/v3/data/streams'
async def _get_token(self):
auth_string = str(base64.b64encode(
f'{self._client}:{self._secret}'.encode('utf-8')), 'utf-8')
headers = {
'Authorization': f'Basic {auth_string}',
'Content-Type': ('application/x-www-form-urlencoded'
if self.kind == ObeliskKind.HFS else 'application/json')
}
payload = {
'grant_type': 'client_credentials'
}
async with httpx.AsyncClient() as client:
response = None
last_error = None
retry = self.retry_strategy.make()
while not response or await retry.should_retry():
try:
request = await client.post(
self._token_url,
json=payload if self.kind == ObeliskKind.CLASSIC else None,
data=payload if self.kind == ObeliskKind.HFS else None,
headers=headers)
response = request.json()
except Exception as e:
last_error = e
self.log.error(e)
continue
if response is None and last_error is not None:
raise last_error
if request.status_code != 200:
if 'error' in response:
self.log.warning(f"Could not authenticate, {response['error']}")
raise AuthenticationError
self.token = response['access_token']
self.token_expires = (datetime.now()
+ timedelta(seconds=response['expires_in']))
async def _verify_token(self):
if (self.token is None
or self.token_expires < (datetime.now() - self.grace_period)):
retry = self.retry_strategy.make()
first = True
while first or await retry.should_retry():
first = False
try:
await self._get_token()
return
except:
continue
[docs]
async def http_post(self, url: str, data: Any = None,
params: Optional[dict] = None) -> httpx.Response:
"""
Send an HTTP POST request to Obelisk,
with proper auth.
Possibly refreshes the authentication token and performs backoff as per `retry_strategy`.
This method is not of stable latency because of these properties.
No validation is performed on the input data,
callers are responsible for formatting it in a method Obelisk understands.
"""
await self._verify_token()
headers = {
'Authorization': f'Bearer {self.token}',
'Content-Type': 'application/json'
}
if params is None:
params = {}
async with httpx.AsyncClient() as client:
response = None
retry = self.retry_strategy.make()
last_error = None
while not response or await retry.should_retry():
if response is not None:
self.log.debug(f"Retrying, last response: {response.status_code}")
try:
response = await client.post(url,
json=data,
params={k: v for k, v in params.items() if
v is not None},
headers=headers)
if response.status_code // 100 == 2:
return response
except Exception as e:
self.log.error(e)
last_error = e
continue
if not response and last_error:
raise last_error
return response