obelisk.asynchronous.consumer.Consumer

class obelisk.asynchronous.consumer.Consumer(client: str, secret: str, retry_strategy: ~obelisk.strategies.retry.RetryStrategy = <obelisk.strategies.retry.NoRetryStrategy object>, kind: ~obelisk.types.ObeliskKind = ObeliskKind.CLASSIC)[source]

Bases: Client

Component that contains all the logic to consume data from the Obelisk API (e.g. historical data, sse).

Obelisk API Documentation: https://obelisk.docs.apiary.io/

Attributes:
token
token_expires

Methods

http_post(url[, data, params])

Send an HTTP POST request to Obelisk, with proper auth.

query(datasets[, metrics, fields, ...])

Queries data from obelisk, automatically iterating when a cursor is returned.

query_time_chunked(datasets, metrics, ...[, ...])

Fetches all data matching the provided filters, yielding one chunk at a time.

single_chunk(datasets[, metrics, fields, ...])

Queries one chunk of events from Obelisk for given parameters, does not handle paging over Cursors.

__init__(client: str, secret: str, retry_strategy: ~obelisk.strategies.retry.RetryStrategy = <obelisk.strategies.retry.NoRetryStrategy object>, kind: ~obelisk.types.ObeliskKind = ObeliskKind.CLASSIC) None

Methods

__init__(client, secret[, retry_strategy, kind])

http_post(url[, data, params])

Send an HTTP POST request to Obelisk, with proper auth.

query(datasets[, metrics, fields, ...])

Queries data from obelisk, automatically iterating when a cursor is returned.

query_time_chunked(datasets, metrics, ...[, ...])

Fetches all data matching the provided filters, yielding one chunk at a time.

single_chunk(datasets[, metrics, fields, ...])

Queries one chunk of events from Obelisk for given parameters, does not handle paging over Cursors.

Attributes

grace_period

Controls how much before the expiration deadline a token will be refreshed.

token

Current authentication token

token_expires

Deadline after which token is no longer useable

retry_strategy

kind

log

grace_period: timedelta = datetime.timedelta(seconds=10)

Controls how much before the expiration deadline a token will be refreshed.

async http_post(url: str, data: Any = None, params: dict | None = None) Response

Send an HTTP POST request to Obelisk, with proper auth.

Possibly refreshes the authentication token and performs backoff as per retry_strategy. This method is not of stable latency because of these properties.

No validation is performed on the input data, callers are responsible for formatting it in a method Obelisk understands.

async query(datasets: List[str], metrics: List[str] | None = None, fields: List[str] | None = None, from_timestamp: int | None = None, to_timestamp: int | None = None, order_by: dict | None = None, filter_: dict | None = None, limit: int | None = None, limit_by: dict | None = None) List[Datapoint][source]

Queries data from obelisk, automatically iterating when a cursor is returned.

Parameters:
datasetsList[str]

List of Dataset IDs.

metricsOptional[List[str]] = None

List of Metric IDs or wildcards (e.g. *::number), defaults to all metrics.

fieldsOptional[List[str]] = None

List of fields to return in the result set. Defaults to [metric, source, value]

from_timestampOptional[int] = None

Limit output to events after (and including) this UTC millisecond timestamp, if present.

to_timestampOptional[int] = None

Limit output to events before (and excluding) this UTC millisecond timestamp, if present.

order_byOptional[dict] = None

Specifies the ordering of the output, defaults to ascending by timestamp. See Obelisk docs for format. Caller is responsible for validity.

filter_Optional[dict] = None

Limit output to events matching the specified Filter expression. See Obelisk docs, caller is responsible for validity.

limitOptional[int] = None

Limit output to a maximum number of events. Also determines the page size. Default is server-determined, usually 2500.

limit_byOptional[dict] = None

Limit the combination of a specific set of Index fields to a specified maximum number.

async query_time_chunked(datasets: List[str], metrics: List[str], from_time: datetime, to_time: datetime, jump: timedelta, filter_: dict | None = None, direction: Literal['asc', 'desc'] = 'asc') Generator[List[Datapoint], None, None][source]

Fetches all data matching the provided filters, yielding one chunk at a time. One “chunk” may require several Obelisk calls to resolve cursors. By necessity iterates over time, no other ordering is supported.

Parameters:
datasetsList[str]

Dataset IDs to query from

metricsList[str]

IDs of metrics to query

from_timedatetime.datetime

Start time to fetch from

to_timedatetime.datetime

End time to fetch until.

jumpdatetime.timedelta

Size of one yielded chunk

filter_Optional[dict] = None

Obelisk filter, caller is responsible for correct format

directionLiteral[‘asc’, ‘desc’] = ‘asc’

Yield older data or newer data first, defaults to older first.

async single_chunk(datasets: List[str], metrics: List[str] | None = None, fields: List[str] | None = None, from_timestamp: int | None = None, to_timestamp: int | None = None, order_by: dict | None = None, filter_: dict | None = None, limit: int | None = None, limit_by: dict | None = None, cursor: str | None = None) QueryResult[source]

Queries one chunk of events from Obelisk for given parameters, does not handle paging over Cursors.

Parameters:
datasetsList[str]

List of Dataset IDs.

metricsOptional[List[str]] = None

List of Metric IDs or wildcards (e.g. *::number), defaults to all metrics.

fieldsOptional[List[str]] = None

List of fields to return in the result set. Defaults to [metric, source, value]

from_timestampOptional[int] = None

Limit output to events after (and including) this UTC millisecond timestamp, if present.

to_timestampOptional[int] = None

Limit output to events before (and excluding) this UTC millisecond timestamp, if present.

order_byOptional[dict] = None

Specifies the ordering of the output, defaults to ascending by timestamp. See Obelisk docs for format. Caller is responsible for validity.

filter_Optional[dict] = None

Limit output to events matching the specified Filter expression. See Obelisk docs, caller is responsible for validity.

limitOptional[int] = None

Limit output to a maximum number of events. Also determines the page size. Default is server-determined, usually 2500.

limit_byOptional[dict] = None

Limit the combination of a specific set of Index fields to a specified maximum number.

cursorOptional[str] = None

Specifies the next cursor, used when paging through large result sets.

token: str | None = None

Current authentication token

token_expires: datetime | None = None

Deadline after which token is no longer useable