Source code for obelisk.asynchronous.producer

from typing import List

import httpx

from obelisk.asynchronous.client import Client
from obelisk.exceptions import ObeliskError
from obelisk.types import IngestMode, TimestampPrecision


[docs] class Producer(Client): """ Allows publishing of data to Obelisk. """
[docs] async def send(self, dataset: str, data: List[dict], precision: TimestampPrecision = TimestampPrecision.MILLISECONDS, mode: IngestMode = IngestMode.DEFAULT) -> httpx.Response: """ Publishes data to Obelisk Parameters ---------- dataset : str ID for the dataset to publish to data : List[dict] List of Obelisk-acceptable datapoints. Exact format varies between Classic or HFS, caller is responsible for formatting. precision : :class:`~obelisk.types.TimestampPrecision` = TimestampPrecision.MILLISECONDS Precision used in the numeric timestamps contained in data. Ensure it matches to avoid weird errors. mode : :class:`~obelisk.types.IngestMode` = IngestMode.DEFAULT See docs for :class:`~obelisk.types.IngestMode`. Raises ------ ObeliskError When the resulting status code is not 204, an empty :exc:`~obelisk.exceptions.ObeliskError` is raised. """ params = { 'datasetId': dataset, 'timestampPrecision': precision, 'mode': mode.value } response = await self.http_post(f'{self._ingest_url}/{dataset}', data=data, params=params) if response.status_code != 204: self.log.warning('An error occurred during data ingestion') self.log.warning('[%d]: %s', response.status_code, response.text) raise ObeliskError return response