将事件从Azure Event Hub传输到Azure Service Bus

3pmvbmvn  于 2023-10-22  发布在  其他
关注(0)|答案(3)|浏览(137)

我正在听各种事件的事件中心。
1.每一个事件都是高价值的,不能错过。
1.事件根据设备ID进行分区。
1.来自一个设备ID的事件是稀疏的,并且不是非常频繁(每几天几个事件)。它只发生在响应一个用户的行动是罕见的。
1.设备的数量是巨大的,所以我会有很多事件的各种设备ID。
对于每个事件,我需要对不是超级可靠的系统进行3-4次API调用。由于其中一些是跨地域呼叫,可能需要一些时间。
我计划把事件从事件中心,并把他们到服务总线。我的理由如下。
1.事件中心只能扩展到32个分区,如果一个事件需要时间,整个分区都会被阻塞。
1.另一方面,服务总线更具有水平可扩展性。如果吞吐量下降,我可以向服务总线添加更多的订阅者。
我一直在寻找这样的模式,但我还没有看到我们从基于日志的消息传递系统中获取数据并将其推送到基于队列的系统中的模式。
是否有更好的方法来处理这种情况?

u2nhd7ah

u2nhd7ah1#

我认为你可以使用事件集线器触发器和服务总线输出绑定来实现你想要的。
例如,我想监视Event hub 'test',我正在使用C#库:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace FunctionApp68
{
    public static class Function1
    {
        [FunctionName("Function1")]
        [return: ServiceBus("test1", Connection = "ServiceBusConnection")]
        public static string Run([EventHubTrigger("test", Connection = "str")] EventData[] events, ILogger log)
        {
            var exceptions = new List<Exception>();
            string messageBodyt = "";
            foreach (EventData eventData in events)
            {
                try
                {
                    string messageBody = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
                    messageBodyt = messageBodyt + messageBody;
                    // Replace these two lines with your processing logic.
                    log.LogInformation($"C# Event Hub trigger function processed a message: {messageBody}");
                    //await Task.Yield();
                }
                catch (Exception e)
                {
                    // We need to keep processing the rest of the batch - capture this exception and continue.
                    // Also, consider capturing details of the message that failed processing so it can be processed again later.
                    exceptions.Add(e);
                }
            }

            // Once processing of the batch is complete, if any messages in the batch failed processing throw an exception so that there is a record of the failure.

            if (exceptions.Count > 1)
                throw new AggregateException(exceptions);

            if (exceptions.Count == 1)
                throw exceptions.Single();
            return messageBodyt;
        }
    }
}

上述代码将从事件集线器“test”收集并保存到服务总线队列“test1”。
看看这些文件:
https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-event-hubs-trigger?tabs=csharp
https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-output?tabs=csharp#example

3mpgtkmj

3mpgtkmj2#

您实际上需要的是每个设备ID的私有队列。一旦事件到达事件中心,就从事件中心拉取事件并将其放入设备ID的私有队列,然后串行处理它。
如何按设备ID构建队列:
1.构建队列的简单方法是使用SQL数据库(如果每秒请求不是非常非常高,它通常会工作,对于sql-db 100 req/秒是正常的。
1.另一种水平可伸缩的方式是使用azureappendblob(如果你的事件处理器是无状态的)。
1.您还可以使用高级方法,例如使用Azure Service Fabric Reliable Queue。

oalqel3c

oalqel3c3#

我有点晚了,但还有一个很好的答案是从这个线程失踪。考虑Azure Stream Analytics,它允许您将来自事件中心的事件流作为许多可能的输入之一,您甚至可以混合和匹配在一起,在作为输出放入服务总线主题或存储Blob之前执行您可能需要的任何过滤或转换。
很快的

相关问题