Source code for obelisk.asynchronous.consumer_test

import pytest
from .consumer import Consumer

pytest_plugins = ('pytest_asyncio',)

[docs] @pytest.mark.asyncio async def test_demo_igent(): consumer = Consumer(client="67c716e616c11421cfe2faf6", secret="08dafe89-0389-45b4-9832-cc565fb8c2eb") result = await consumer.single_chunk( datasets=["612f6c39cbceda0ea9753d95"], metrics=["org.dyamand.types.common.Temperature::number"], from_timestamp=1740924034000, to_timestamp=1741100614258, limit=2 ) assert len(result.items) == 2