from kafka import KafkaConsumer
consumer = KafkaConsumer(
'someTopicName',
bootstrap_servers=['192.168.1.160:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
print("We have a consumer instantiated")
print(consumer)
messageCache = []
for message in consumer:
messageCache.append(message.value)
在本例中,我的kafka代理位于我的专用局域网上,使用默认端口,因此我的引导服务器列表仅为[“192.168.1.160:9092”]。 您可以使用标准计数器和if语句将列表保存到文件或其他任何内容,因为kafka流被假定为永远运行。例如,我有一个进程,它使用kafka消息,并将它们作为每1000000条消息的一个Dataframe保存在parquet to hdfs中。在本例中,我想保存历史消息以开发一个ml模型。Kafka最大的优点是我可以编写另一个进程,对每一条消息进行实时评估并做出潜在的响应。
1条答案
按热度按时间rfbsl7qr1#
下面是在python中将来自kafka的消息放入列表的基本模式。
在本例中,我的kafka代理位于我的专用局域网上,使用默认端口,因此我的引导服务器列表仅为[“192.168.1.160:9092”]。
您可以使用标准计数器和if语句将列表保存到文件或其他任何内容,因为kafka流被假定为永远运行。例如,我有一个进程,它使用kafka消息,并将它们作为每1000000条消息的一个Dataframe保存在parquet to hdfs中。在本例中,我想保存历史消息以开发一个ml模型。Kafka最大的优点是我可以编写另一个进程,对每一条消息进行实时评估并做出潜在的响应。