使用python将json消息导入even hub

oprakyz7  于 2023-08-08  发布在  Python
关注(0)|答案(1)|浏览(78)

我正在尝试将json消息摄取到AZURE Event Hub中我的问题是json消息的大小,因为Event Hub的限制为1 MB我有一个包含多个json消息的大json消息

DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]

字符串
此数据是一个示例。DATA已采用json格式,但DATA包含10000多个具有相同格式json事件我希望将此json消息接收到事件中心
有人能帮我一下吗?我该如何将这一条重要消息消化到事件中心中?通过切片或其他方式
多谢了!
允植
我试着将其切片,但一条json消息中的事件数量总是不同,而且非常大...

91zkwejq

91zkwejq1#

最有效的方法是将庞大的消息列表拆分为多个批次,并将批次逐个发送到Eventhub。批处理的大小取决于每个单独消息的大小,请记住,在单个批处理中只能发送1 MB。假设单个消息的平均大小为100字节,则每批消息大约有10 K条。您可以将其减少到5000-8000以确保安全。
下面是一段代码,它将原始消息JSON数组(DATA)分解为单个的JSON数组批次,并将它们一次发送给eventhub。你可以将batch_limit调整为5000- 8000之间的任意值。因为你说过数组中的消息数量可以是10 K+,所以你只需要2-3批发送这个消息。

import time
import asyncio
import os
import json

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub.exceptions import EventHubError
from azure.eventhub import EventData

CONNECTION_STR = 'Endpoint=sb://xxxxxxxxxxxx='
EVENTHUB_NAME = 'xxxxxxxxxxxxxx'

DATA = [{"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}, {"Id": "393092", "UID": "7f0034ee", "date": "2023-01-06", "f_id": "430", "origin": "CN"}]
batch_limit = 2
async def run():
    print('started')
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        eventhub_name=EVENTHUB_NAME
    )
    batch_cnt = 0
    msg_array = []
    for DATA_msg in DATA:
      msg_array.append(DATA_msg)
      batch_cnt += 1
      if batch_cnt > batch_limit:
        async with producer:
            event_data_batch = await producer.create_batch()
            for msg in msg_array:
                event_data_batch.add(EventData(json.dumps(msg)))
            await producer.send_batch(event_data_batch)
            print('sent a batch of messages')
        batch_cnt = 0
        msg_array = []
    if batch_cnt > 0:
       async with producer:
            event_data_batch = await producer.create_batch()
            for msg in msg_array:
                event_data_batch.add(EventData(json.dumps(msg)))
            await producer.send_batch(event_data_batch)
            print('sent remaining messages as last batch')
      
start_time = time.time()
asyncio.run(run())
print("Send messages in {} seconds.".format(time.time() - start_time))

字符串

相关问题