我在观察一个奇怪的问题。我在Java8中构建了一个lambda函数;它使用kafka consumer api轮询msk主题,即consumer.poll(5000)(我尝试过各种超时)。还有第二个lambda函数,它是生产者并向同一主题发送消息。这两个功能都连接到msk所在的vpc。
制片人工作得很好。我可以从ec2看到运行kafka console consumer的消息b。但是消费者lambda不起作用,它只是给出了超时。
只有当我同时运行lambda producer、ec2上的kafka console consumer和lambda consumer时,consumer才会收到一些消息!!准确地说,生产者在一个循环中发送5条消息,ec2控制台消费者显示所有5条消息,但是lambda消费者显示第3条或第4条消息。
为什么会这样?这里可能存在什么问题,如何在lambda consumer中获得一致的消息?
如果有人有一个工作的代码样本,我将非常感谢。
谢谢您。
进一步更新:我已经安排了consumer函数,然后它将获得所有事件。我仍然有以下问题-1>为什么当我手动触发函数时它没有收到消息?2> 我测试了一个用python编写的consumer函数;也没有收到任何信息。下面是python代码:
def lambda_handler(event, context):
bootstrap_servers = ["<msk bootstrap>"]
topicName = '<mp-topic-name>'
consumer = KafkaConsumer (topicName, group_id = 'test',bootstrap_servers = bootstrap_servers,auto_offset_reset = 'earliest', consumer_timeout_ms=5000)
for message in consumer:
consumer.commit()
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
KafkaConsumer.close()
return ("Processed")
java用户代码;这是一个普通的Kafka客户。通过lambda,当以预定方式运行时,它能够读取消息;但不是手动测试时。
try {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
context.getLogger().log("Subscribed to topic " + topicName);
int i = 0;
ConsumerRecords<String, String> records = consumer.poll(5000);
for (ConsumerRecord<String, String> record : records) {
context.getLogger().log("Message::: offset = "+record.offset()+", key = "+record.key()+", value = "+record.value()+"\n");
}
context.getLogger().log("After messages");
} catch (Exception e) {
e.printStackTrace();
context.getLogger().log("Exception: "+e.getMessage());
}
暂无答案!
目前还没有任何答案,快来回答吧!