如何使用来自ApacheKafka的数据

klsxnrf1  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(487)

参与chalenge时,它会这样说:您的第一步-使用来自apachekafka的数据样本。所以他们给了我主题名,api\u密钥和api\u秘密。哦,还有引导服务器。然后他们声称好像你不熟悉Kafka,有全面的文件提供的合流。那么好吧,登录到confluent,创建一个集群,然后。。消费数据的下一步是什么?

rfbsl7qr

rfbsl7qr1#

下面是在python中将来自kafka的消息放入列表的基本模式。

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'someTopicName',
     bootstrap_servers=['192.168.1.160:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')))
print("We have a consumer instantiated")
print(consumer)

messageCache = []

for message in consumer:
    messageCache.append(message.value)

在本例中,我的kafka代理位于我的专用局域网上,使用默认端口,因此我的引导服务器列表仅为[“192.168.1.160:9092”]。
您可以使用标准计数器和if语句将列表保存到文件或其他任何内容,因为kafka流被假定为永远运行。例如,我有一个进程,它使用kafka消息,并将它们作为每1000000条消息的一个Dataframe保存在parquet to hdfs中。在本例中,我想保存历史消息以开发一个ml模型。Kafka最大的优点是我可以编写另一个进程,对每一条消息进行实时评估并做出潜在的响应。

相关问题