Source code for obelisk.asynchronous.consumer_test

import pytest
from .consumer import Consumer

pytest_plugins = ('pytest_asyncio',)

client_id = "682c6c46604b3b3be35429df"
client_secret = "7136832d-01be-456a-a1fe-25c7f9e130c5"

[docs] @pytest.mark.asyncio async def test_demo_igent(): consumer = Consumer(client=client_id, secret=client_secret) result = await consumer.single_chunk( datasets=["612f6c39cbceda0ea9753d95"], metrics=["org.dyamand.types.common.Temperature::number"], from_timestamp=1740924034000, to_timestamp=1741100614258, limit=2 ) assert len(result.items) == 2