Azure事件中心:接收作为单个消息发送到集线器的批处理事件?

7nbnzgx9  于 2023-05-01  发布在  其他
关注(0)|答案(2)|浏览(133)

我有以下模式,
Http触发函数App -〉Event Hub -〉Event Hub监听函数App
不幸的是,HTTP Trigger应用程序的发送方发送了许多单独的请求,这意味着发送到事件中心的执行也是每个消息的少量数据。
这导致侦听器在每个小消息上触发,在数据加载结束时创建许多许多文件。
我的侦听器已经配置为接收批处理事件,我在过去看到,当侦听器功能被禁用一段时间,然后重新启用时,数据将以大批量接收。
如何确保多个事件在到达侦听器后被批量处理,即使它们作为单独的请求/消息发送到事件中心?
有没有一些延迟设置为听众,也许?

r1zhe5dt

r1zhe5dt1#

没有足够的上下文来理解maxEventBatchSize是如何在host.json中配置的,但我假设它被设置为大于1的值。在这种情况下,听起来像是分区没有积压的事件,并且在它们到达时正在读取。由于没有足够的事件来保持预取队列为满,因此事件在流入时被分派。
我建议利用事件中心触发器的v5.3.0中添加的minEventBatchSize。这与maxWaitTime一起工作,要求触发器等待一段时间并构建一个批处理,而不是其首选最大化吞吐量的默认行为。
一个最小的例子看起来像这样:

{
    "version": "2.0",
    "extensions": {
        "eventHubs": {
            "maxEventBatchSize" : 100,
            "minEventBatchSize" : 25,
            "maxWaitTime" : "00:05:00",            
            "batchCheckpointFrequency" : 1,
            "prefetchCount" : 300,
        }
    }
}

主机中提供了更多详细信息。文档的json设置部分。

yhxst69z

yhxst69z2#

EventProcessorOptions对象的MaxBatchSize参数可用于批量接收事件。
要批量接收事件,可以使用EventProcessorOptions对象的MaxBatchSize属性。
参考文献:
感谢@ciaranodonnell

发送消息

EventHubProducerClient producer = new EventHubProducerClient(namespaceConnectionString, eventHubName);
var batch = await producer.CreateBatchAsync(new CreateBatchOptions { PartitionKey = "this is another string" });
for (int i = 0; i < 10; i++)
{
batch.TryAdd(new EventData($"This is event {i}"));
}
await producer.SendAsync(batch);
}

接收消息

EventProcessorOptions options = new EventProcessorOptions
            {
                MaxBatchSize = 100,
                PrefetchCount = 100,
                InvokeProcessorAfterReceiveTimeout = true
            };

            await processorHost.RegisterEventProcessorAsync<MyEventProcessor>(options);
  • 另一种可能的方法是消息内容包含特定项。在这两种方式接收批。对于多个事件,使用消费者组发送和接收消息进行批处理。
    输出:

参考文献:

相关问题