Source code for obelisk.sync.producer
import asyncio
from typing import List
import httpx
from obelisk.asynchronous.producer import \
Producer as AsyncProducer
from obelisk.strategies.retry import RetryStrategy, \
NoRetryStrategy
from obelisk.types import IngestMode, TimestampPrecision, \
ObeliskKind
[docs]
class Producer:
    """
    Synchronous equivalent of :class:`~obelisk.asynchronous.producer.Producer`,
    to publish data to Obelisk.

    Wraps an asynchronous producer and runs its coroutines to completion
    on an event loop held by this instance, so callers do not need to
    write any ``async`` code themselves.
    """

    # Event loop on which the wrapped producer's coroutines are executed.
    loop: asyncio.AbstractEventLoop
    # Asynchronous producer that every call is delegated to.
    async_producer: AsyncProducer

    # NOTE(review): the default ``NoRetryStrategy()`` is evaluated once, at
    # definition time, so every Producer built without an explicit strategy
    # shares that single instance. Harmless if the strategy is stateless --
    # confirm against obelisk.strategies.retry.
    def __init__(self, client: str, secret: str,
                 retry_strategy: RetryStrategy = NoRetryStrategy(),
                 kind: ObeliskKind = ObeliskKind.CLASSIC):
        """
        Builds the wrapped asynchronous producer and acquires an event loop.

        Parameters
        ----------
        client : str
            Forwarded unchanged to the asynchronous producer.
        secret : str
            Forwarded unchanged to the asynchronous producer.
        retry_strategy : RetryStrategy = NoRetryStrategy()
            Forwarded unchanged to the asynchronous producer.
        kind : ObeliskKind = ObeliskKind.CLASSIC
            Forwarded unchanged to the asynchronous producer.
        """
        self.async_producer = AsyncProducer(client, secret, retry_strategy, kind)
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # No current event loop is available, e.g. when constructed
            # outside the main thread or on Python versions where
            # get_event_loop() no longer creates one implicitly.
            # Create one and register it, mirroring what get_event_loop()
            # historically did in the main thread.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

    def send(self, dataset: str, data: List[dict],
             precision: TimestampPrecision = TimestampPrecision.MILLISECONDS,
             mode: IngestMode = IngestMode.DEFAULT) -> httpx.Response:
        """
        Publishes data to Obelisk

        Blocks until the asynchronous producer's ``send`` has completed.

        Parameters
        ----------
        dataset : str
            ID for the dataset to publish to
        data : List[dict]
            List of Obelisk-acceptable datapoints.
            Exact format varies between Classic or HFS,
            caller is responsible for formatting.
        precision : TimestampPrecision = TimestampPrecision.MILLISECONDS
            Precision used in the numeric timestamps contained in data.
            Ensure it matches to avoid weird errors.
        mode : IngestMode = IngestMode.DEFAULT
            See docs for :class:`~obelisk.types.IngestMode`.

        Returns
        -------
        httpx.Response
            The result of the asynchronous producer's ``send``.

        Raises
        ------
        ObeliskError
            When the resulting status code is not 204, an empty :exc:`~obelisk.exceptions.ObeliskError` is raised.
        """
        # run_until_complete() wraps the coroutine in a task on self.loop
        # itself, so no explicit create_task() call is needed.
        pending = self.async_producer.send(dataset, data, precision, mode)
        return self.loop.run_until_complete(pending)