Setting startingPosition for Event Hubs on Databricks

Asked by zfycwa2u on 2021-05-26, in Spark

I am trying to read a stream of events from an Event Hub using PySpark, but I cannot set the starting position to the beginning of the stream. In Scala this is straightforward, but in Python I keep getting:

org.json4s.package$MappingException: No usable value for offset.

Here is my configuration:

conf = {
  "eventhubs.connectionString":
      "Endpoint=sb://XXXX;SharedAccessKeyName=XXX;SharedAccessKey=XXXX;EntityPath=XXXX",
  "eventhubs.consumerGroup": "$Default",
  "eventhubs.startingPosition": "-1"
}

fcipmucu · Answer 1

In Scala

val cs = "YOUR.CONNECTION.STRING"
val ehConf = EventHubsConf(cs)
  .setStartingPosition(EventPosition.fromEndOfStream)
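
(The same connector also provides EventPosition.fromStartOfStream, which can be passed to setStartingPosition in the same way if you want to read from the beginning of the stream rather than the end.)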

Reference: Event Hubs configuration in Scala
In Python via PySpark

import json

ehConf = {'eventhubs.connectionString': connectionString}

startTime = "2020-04-07T01:05:05.662231Z"
endTime = "2020-04-07T01:15:05.662185Z"

startingEventPosition = {
    "offset": None,
    "seqNo": -1,               # not in use
    "enqueuedTime": startTime,
    "isInclusive": True
}

endingEventPosition = {
    "offset": None,            # not in use
    "seqNo": -1,               # not in use
    "enqueuedTime": endTime,
    "isInclusive": True
}

# Put the positions into the Event Hubs config dictionary
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)

df = spark.read.format("eventhubs").options(**ehConf).load()
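
To read from the beginning of the stream, which is what the question asks for, the same pattern applies: eventhubs.startingPosition must be a JSON-serialized EventPosition containing all four keys, which is why passing a bare "-1" fails with the json4s MappingException about offset. Below is a minimal sketch under that assumption, using the azure-event-hubs-spark connector with a streaming read (depending on the connector version, the connection string may additionally need to be wrapped with sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt):

import json

ehConf = {
    "eventhubs.connectionString": connectionString,   # same connection string as above
    "eventhubs.consumerGroup": "$Default",
}

# offset "-1" means the start of the partition; all four keys must be present
# so the connector can deserialize the JSON into an EventPosition.
startingEventPosition = {
    "offset": "-1",
    "seqNo": -1,             # not in use
    "enqueuedTime": None,    # not in use
    "isInclusive": True
}
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)

df = spark.readStream.format("eventhubs").options(**ehConf).load()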

In Python via the SDK
Consume events from an Event Hub asynchronously

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())
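
For the SDK client, starting_position is not limited to "-1": per the azure-eventhub v5 documentation it also accepts "@latest" (only events enqueued after the receiver starts) or a datetime (events enqueued at or after that time). A short sketch reusing the on_event callback and connection variables defined above:

import datetime

# Start from a specific enqueued time instead of the beginning of the partition.
start_time = datetime.datetime(2020, 4, 7, 1, 5, 5, tzinfo=datetime.timezone.utc)

async def receive_from_time():
    client = EventHubConsumerClient.from_connection_string(
        connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position=start_time,   # or "@latest" for only newly arriving events
        )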

Consume events from an Event Hub in batches, asynchronously

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received {} events from partition {}".format(len(events), partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())
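
receive_batch also accepts max_batch_size and max_wait_time keyword arguments, which control how many events are handed to on_event_batch at once and how long to wait before invoking the callback with whatever has arrived; see the azure-eventhub SDK reference for the exact defaults.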

Consume events and save checkpoints using a checkpoint store

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
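
Two practical notes on this last example: the blob checkpoint store comes from the separate azure-eventhub-checkpointstoreblob-aio package, and once checkpoints have been written they take precedence, so starting_position="-1" only applies to partitions that do not yet have a checkpoint.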

Reference: Event Hubs configuration in Python
