Kafka 在aws MSK消费者端没有收到消息

oxiaedzo  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(108)

我能够通过使用下面的代码在python中发送数据,而不是在消费者端接收消息。我已经启用了aws msk集群中的所有身份验证。我能用Kafka命令发送和接收。我需要同样的东西通过一个Python。我已经尝试了未经授权的模式和SASL_SSL协议。都能发送但不能接收请找到下面的代码,并建议我解决这个问题。
生产商代码:

from confluent_kafka import Producer 
from datetime import datetime 
from time import strftime 
import json

bootstrap_servers = 'b-1.xxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096' producer = Producer({ 'bootstrap.servers': bootstrap_servers, 'security.protocol': 'SASL_SSL', 'sasl.username': 'cccccc', 'sasl.password': 'ccccccccccccccc', 'sasl.mechanism': 'SCRAM-SHA-512' })

data = { 'message': 'hello world', 'timestamp': datetime.now().strftime("%m/%d/%Y %H:%M:%S") } #print(producer.bootstrap_connected()) producer.produce('testTopic1', json.dumps(data).encode('utf-8')) print('message sent') producer.flush()

消费者代码:

from confluent_kafka import Consumer 
from datetime import datetime 
from time import strftime 
import json

bootstrap_servers = 'b-1.xxxxxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096' 
consumer = Consumer({ 'bootstrap.servers': bootstrap_servers, 'security.protocol': 'SASL_SSL', 'sasl.username': 'cccccccccc', 'sasl.password': 'cccccccccccccc', 'sasl.mechanism': 'SCRAM-SHA-512' })

print('start reading') 
consumer.subscribe(['testTopic1'])

while True: 
   msg = consumer.poll(timeout=1.0) 
     print(msg)
     if msg is None: 
        continue
ttcibm8c

ttcibm8c1#

消费者总是需要设置一个消费者组,因为这是分区的消费者偏移量存储的地方。
您需要将'group.id': 'myconsumergroup'添加到消费者配置中。
当您向主题添加更多消费者时,他们应该使用相同的group.id,因为这是Kafka在同一组内跨消费者负载平衡流量的方式。
建议您也设置'auto.offset.reset': 'earliest'或latest来确定默认的消费者行为(第一次运行时);这决定了你的消费者应该从开头还是从最后一条消息开始。

相关问题