将Pandas Dataframe 行发布为PubSub消息

qni6mghb  于 2023-03-16  发布在  其他
关注(0)|答案(1)|浏览(131)

我的任务是将pandasdataframe的行发布为pubsub消息。最好的方法是什么?我的pandasdataframe可能包含大约一百万条记录。
这是我发现要发表的一个例子。

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    print(future.result())

我可以循环遍历每一行,将行转换为字符串,编码并发布吗?

for row_dict in df.to_dict(orient="records"):
    data = str(row_dict).encode("utf-8")
    future = publisher.publish(topic_path, data)
    print(future.result())

或者我错过了一些更简单的东西?我希望pubsub消息是在一个dict格式与每一行的消息,如下图所示。

{'col1': '123','col2': 'abc'}
{'col1': '124','col2': 'def'}
{'col1': '125','col2': 'ghi'}
{'col1': '126','col2': 'jkl'}
aij0ehis

aij0ehis1#

要将Pandas行发布为pubSubMessage,可以考虑以下示例:

import pandas as pd
import json
from google.cloud import pubsub_v1

df = pd.DataFrame({'col1': [123, 124, 125], 'col2': ['abc', 'abcd', 'abce']})

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('demo_project', 'topic')

for row_index, row in df.iterrows():
    message = json.dumps(row.to_dict()).encode('utf-8')
    future = publisher.publish(topic_path, data=message)
    print(future.result())
print("Completed")

结果:

相关问题