Source code for obelisk.asynchronous.core

"""
This module contains the asynchronous API to interface with Obelisk CORE.
The client methods are coroutines (``async def``) and must be awaited;
:meth:`Client.query_time_chunked` is an async generator, consumed with ``async for``.

The main entry point is :class:`Client`.

This API vaguely resembles that of clients to previous Obelisk versions,
but also diverges significantly from it where the underlying Obelisk CORE API does so.
"""
from obelisk.asynchronous.base import BaseClient
from obelisk.exceptions import ObeliskError
from obelisk.types.core import FieldName, Filter

from datetime import datetime, timedelta
import httpx
import json
from pydantic import BaseModel, AwareDatetime, ConfigDict, Field, ValidationError, model_validator
from typing import Annotated, AsyncIterator, Dict, Iterator, List, Literal, Optional, Any, cast, get_args
from typing_extensions import Self
from numbers import Number


# Closed set of value types; ``type_suffix`` validates metric suffixes against this,
# in this exact order (the order also appears in its error message).
DataType = Literal["number", "number[]", "json", "bool", "string"]
"""The possible types of data Obelisk can accept"""


[docs] def type_suffix(metric: str) -> DataType: """ Extracts the :any:`DataType` from a string metric, useful for the dataType field in queries. Throws a :py:exc:`ValueError` if the provided string does not appear to be a typed metric, or the found type suffix is not a known one. """ split = metric.split('::') if len(split) != 2: raise ValueError("Incorrect amount of type qualifiers") suffix = split[1] if suffix not in get_args(DataType): raise ValueError(f"Invalid type suffix, should be one of {', '.join(get_args(DataType))}") return cast(DataType, suffix)
# Aggregation functions accepted by the ``aggregator`` query parameter.
Aggregator = Literal["last", "min", "mean", "max", "count", "stddev"]
"""Type of aggregation Obelisk can process"""

# Query results are kept as plain dicts rather than a fixed model, because the
# set of fields present depends on the query (``fields``, ``groupBy``, ...).
Datapoint = Dict[str, Any]
"""Datapoints resulting from queries are modeled as simple dicts, as fields can come and go depending on query."""
class ObeliskPosition(BaseModel):
    """
    Geographic position attached to a datapoint, in the coordinate format Obelisk expects.

    All three components are required floats.
    """
    lat: float
    """Latitude"""
    lng: float
    """Longitude"""
    elevation: float
    """Elevation"""
    # NOTE(review): the unit/reference of elevation is not stated in this module — confirm.
class IncomingDatapoint(BaseModel):
    """
    A datapoint to be submitted to Obelisk.
    These are validated quite extensively, but not fully.

    .. automethod:: check_metric_type(self)
    """
    timestamp: Optional[AwareDatetime] = None
    metric: str
    value: Any
    labels: Optional[Dict[str, str]] = None
    location: Optional[ObeliskPosition] = None

    @model_validator(mode='after')
    def check_metric_type(self) -> Self:
        """
        Check that ``value`` matches the type suffix of ``metric`` (``<name>::<type>``).

        ``json`` values are not checked, as most things should be serialisable.

        Raises
        ------
        ValueError
            If ``metric`` is not a valid typed metric (via :func:`type_suffix`), or if
            ``value`` does not match its suffix. Pydantic surfaces this as a ValidationError.
        """
        def is_number(x: Any) -> bool:
            # bool subclasses int (and is therefore a Number), but Obelisk has a distinct
            # ``bool`` type; JSON true/false is not a valid number value.
            return isinstance(x, Number) and not isinstance(x, bool)

        suffix = type_suffix(self.metric)

        if suffix == 'number' and not is_number(self.value):
            raise ValueError(f"Type suffix mismatch, expected number, got {type(self.value)}")

        if suffix == 'number[]':
            if type(self.value) is not list or any(not is_number(x) for x in self.value):
                raise ValueError("Type suffix mismatch, expected value of number[]")

        # Do not check json, most things should be serialisable

        if suffix == 'bool' and type(self.value) is not bool:
            raise ValueError(f"Type suffix mismatch, expected bool, got {type(self.value)}")

        if suffix == 'string' and type(self.value) is not str:
            raise ValueError(f"Type suffix mismatch, expected string, got {type(self.value)}")

        return self
class QueryParams(BaseModel):
    """
    Parameters for a single Obelisk CORE data query.

    :meth:`to_dict` turns these into the query-string parameters sent to the API.
    Whenever the ``value`` field is requested (explicitly, or implicitly by leaving
    ``fields`` unset), ``dataType`` must be given.
    """
    dataset: str
    groupBy: Optional[List[FieldName]] = None
    aggregator: Optional[Aggregator] = None
    fields: Optional[List[FieldName]] = None
    # Entries are more than plain field names: a leading '-' inverts the sort order.
    orderBy: Optional[List[str]] = None
    dataType: Optional[DataType] = None
    filter_: Annotated[Optional[str | Filter], Field(serialization_alias='filter')] = None
    """
    Obelisk CORE handles filtering in `RSQL format <https://obelisk.pages.ilabt.imec.be/obelisk-core/query.html#rsql-format>`__ ;
    to make it easier to also write these filters programmatically, we provide the :class:`Filter` option as well.

    The trailing underscore avoids a name collision; the field is serialised as ``filter``.
    """
    cursor: Optional[str] = None
    limit: int = 1000

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode='after')
    def check_datatype_needed(self) -> Self:
        """Reject queries that will return ``value`` without saying which ``dataType`` it has."""
        value_requested = self.fields is None or 'value' in self.fields
        if value_requested and self.dataType is None:
            raise ValueError("Value field requested, must specify datatype")
        return self

    def to_dict(self) -> Dict:
        """Return the set (non-None) parameters keyed by their API names (``filter_`` -> ``filter``)."""
        as_params = self.model_dump(by_alias=True, exclude_none=True)
        return as_params
class ChunkedParams(BaseModel):
    """
    Parameters for a query split into consecutive time windows.

    :meth:`chunks` yields one :class:`QueryParams` per window of length ``jump``,
    covering ``start <= timestamp < end``. Each yielded QueryParams uses the default
    ``limit`` of :class:`QueryParams`.
    """
    dataset: str
    groupBy: Optional[List[FieldName]] = None
    aggregator: Optional[Aggregator] = None
    fields: Optional[List[FieldName]] = None
    orderBy: Optional[List[str]] = None # More complex than just FieldName, can be prefixed with - to invert sort
    dataType: Optional[DataType] = None
    filter_: Optional[str | Filter] = None
    """Underscore suffix to avoid name collisions"""
    start: datetime
    end: datetime
    jump: timedelta = timedelta(hours=1)

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode='after')
    def check_datatype_needed(self) -> Self:
        """Require ``dataType`` whenever the ``value`` field is (implicitly or explicitly) requested."""
        if self.fields is None or 'value' in self.fields:
            if self.dataType is None:
                raise ValueError("Value field requested, must specify datatype")
        return self

    @model_validator(mode='after')
    def check_jump_positive(self) -> Self:
        """Reject a zero or negative ``jump``, which would make :meth:`chunks` never terminate."""
        if self.jump <= timedelta(0):
            raise ValueError("jump must be a positive duration")
        return self

    def chunks(self) -> Iterator[QueryParams]:
        """
        Yield one :class:`QueryParams` per time window between ``start`` and ``end``.

        Each window is ``[window_start, window_end)``, with ``window_end`` clamped to ``end``
        so the final window never reaches past the requested range. A user-supplied
        ``filter_`` is AND-ed with the time restriction.
        """
        current_start = self.start
        while current_start < self.end:
            current_end = min(current_start + self.jump, self.end)
            filter_ = f'timestamp>={current_start.isoformat()};timestamp<{current_end.isoformat()}'
            if self.filter_:
                # Parenthesise: in RSQL ';' (AND) binds tighter than ',' (OR), so an
                # unparenthesised OR in the user filter would escape the time window.
                filter_ += f';({self.filter_})'
            yield QueryParams(
                dataset=self.dataset,
                groupBy=self.groupBy,
                aggregator=self.aggregator,
                fields=self.fields,
                orderBy=self.orderBy,
                dataType=self.dataType,
                filter_=filter_
            )
            current_start = current_end
class QueryResult(BaseModel):
    """
    One page of query results as returned by Obelisk CORE.

    ``cursor`` can be passed back as :attr:`QueryParams.cursor` to request the next page.
    """
    cursor: Optional[str] = None
    items: List[Datapoint]
class Client(BaseClient):
    """
    Asynchronous client for the Obelisk CORE data API (ingest and query).
    """
    page_limit: int = 250
    """How many datapoints to request per page in a cursored fetch"""

    async def send(
        self,
        dataset: str,
        data: List[IncomingDatapoint],
    ) -> httpx.Response:
        """
        Publishes data to Obelisk

        Parameters
        ----------
        dataset : str
            ID for the dataset to publish to
        data : List[IncomingDatapoint]
            Datapoints to publish; each is serialised with ``model_dump(mode='json')``
            before being posted to the ingest endpoint.

        Returns
        -------
        httpx.Response
            The ingest response (status 204 on success).

        Raises
        ------
        ObeliskError
            When the resulting status code is not 204, an
            :exc:`~obelisk.exceptions.ObeliskError` is raised.
        """
        response = await self.http_post(
            f"{self.kind.root_url}/{dataset}/data/ingest",
            data=[x.model_dump(mode='json') for x in data]
        )
        if response.status_code != 204:
            msg = f"An error occured during data ingest. Status {response.status_code}, message: {response.text}"
            self.log.warning(msg)
            raise ObeliskError(msg)
        return response

    async def fetch_single_chunk(
        self,
        params: QueryParams
    ) -> QueryResult:
        """
        Perform one query request and return a single page of results.

        Raises
        ------
        ObeliskError
            If the status code is not 200, the body is not JSON, or the body
            does not validate as a :class:`QueryResult`.
        """
        response = await self.http_get(
            f"{self.kind.root_url}/{params.dataset}/data/query",
            params=params.to_dict()
        )
        if response.status_code != 200:
            # Include the body in the log: it usually explains why the query was rejected.
            self.log.warning(f"Unexpected status code: {response.status_code}, message: {response.text}")
            raise ObeliskError(response.status_code, response.reason_phrase)

        try:
            js = response.json()
        except json.JSONDecodeError as e:
            msg = f"Obelisk response is not a JSON object: {e}"
            self.log.warning(msg)
            raise ObeliskError(msg) from e

        try:
            return QueryResult.model_validate(js)
        except ValidationError as e:
            msg = f"Response cannot be validated: {e}"
            self.log.warning(msg)
            raise ObeliskError(msg) from e

    async def query(
        self,
        params: QueryParams
    ) -> List[Datapoint]:
        """
        Run a query, following cursors until ``params.limit`` datapoints have been
        collected or the server returns no more data.

        Pages are requested ``page_limit`` datapoints at a time. The caller's ``params``
        is not modified.
        """
        # Work on a copy: we rewrite cursor/limit per page and must not leak that to the caller.
        params = params.model_copy()
        params.cursor = None
        result_limit = params.limit
        # Obelisk CORE does not actually stop emitting a cursor when done, so the request
        # limit serves as page size and the overall limit is enforced here.
        params.limit = self.page_limit

        result_set: List[Datapoint] = []
        while len(result_set) < result_limit:
            result: QueryResult = await self.fetch_single_chunk(params)
            result_set.extend(result.items)
            # An empty page (or no cursor) means the data is exhausted; since the server keeps
            # returning a cursor, relying on the limit alone would loop forever.
            if not result.items or result.cursor is None:
                break
            params.cursor = result.cursor

        # The last page may overshoot the requested limit.
        return result_set[:result_limit]

    async def query_time_chunked(
        self,
        params: ChunkedParams
    ) -> AsyncIterator[List[Datapoint]]:
        """
        Run :meth:`query` for each time window of ``params`` (see :meth:`ChunkedParams.chunks`),
        yielding each window's datapoints as they arrive.
        """
        for chunk in params.chunks():
            yield await self.query(
                chunk
            )