我能够通过使用下面的代码在python中发送数据,而不是在消费者端接收消息。我已经启用了aws msk集群中的所有身份验证。我能用Kafka命令发送和接收。我需要同样的东西通过一个Python。我已经尝试了未经授权的模式和SASL_SSL协议。都能发送但不能接收请找到下面的代码,并建议我解决这个问题。
生产商代码:
from confluent_kafka import Producer
from datetime import datetime
from time import strftime
import json
bootstrap_servers = 'b-1.xxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096' producer = Producer({ 'bootstrap.servers': bootstrap_servers, 'security.protocol': 'SASL_SSL', 'sasl.username': 'cccccc', 'sasl.password': 'ccccccccccccccc', 'sasl.mechanism': 'SCRAM-SHA-512' })
data = { 'message': 'hello world', 'timestamp': datetime.now().strftime("%m/%d/%Y %H:%M:%S") } #print(producer.bootstrap_connected()) producer.produce('testTopic1', json.dumps(data).encode('utf-8')) print('message sent') producer.flush()
消费者代码:
from confluent_kafka import Consumer
from datetime import datetime
from time import strftime
import json
bootstrap_servers = 'b-1.xxxxxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096'
consumer = Consumer({ 'bootstrap.servers': bootstrap_servers, 'security.protocol': 'SASL_SSL', 'sasl.username': 'cccccccccc', 'sasl.password': 'cccccccccccccc', 'sasl.mechanism': 'SCRAM-SHA-512' })
print('start reading')
consumer.subscribe(['testTopic1'])
while True:
msg = consumer.poll(timeout=1.0)
print(msg)
if msg is None:
continue
1条答案
按热度按时间ttcibm8c1#
消费者总是需要设置一个消费者组,因为这是分区的消费者偏移量存储的地方。
您需要将
'group.id': 'myconsumergroup'
添加到消费者配置中。当您向主题添加更多消费者时,他们应该使用相同的group.id,因为这是Kafka在同一组内跨消费者负载平衡流量的方式。
建议您也设置
'auto.offset.reset': 'earliest'
或latest来确定默认的消费者行为(第一次运行时);这决定了你的消费者应该从开头还是从最后一条消息开始。