lambda中的问题,用于java消费者收听msk主题

wpcxdonn  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(267)

我在观察一个奇怪的问题。我在Java8中构建了一个lambda函数;它使用kafka consumer api轮询msk主题,即consumer.poll(5000)(我尝试过各种超时)。还有第二个lambda函数,它是生产者并向同一主题发送消息。这两个功能都连接到msk所在的vpc。
制片人工作得很好。我可以从ec2看到运行kafka console consumer的消息b。但是消费者lambda不起作用,它只是给出了超时。
只有当我同时运行lambda producer、ec2上的kafka console consumer和lambda consumer时,consumer才会收到一些消息!!准确地说,生产者在一个循环中发送5条消息,ec2控制台消费者显示所有5条消息,但是lambda消费者显示第3条或第4条消息。
为什么会这样?这里可能存在什么问题,如何在lambda consumer中获得一致的消息?
如果有人有一个工作的代码样本,我将非常感谢。
谢谢您。
进一步更新:我已经安排了consumer函数,然后它将获得所有事件。我仍然有以下问题-1>为什么当我手动触发函数时它没有收到消息?2> 我测试了一个用python编写的consumer函数;也没有收到任何信息。下面是python代码:

def lambda_handler(event, context):
bootstrap_servers = ["<msk bootstrap>"]
topicName = '<mp-topic-name>'

consumer = KafkaConsumer (topicName, group_id = 'test',bootstrap_servers = bootstrap_servers,auto_offset_reset = 'earliest', consumer_timeout_ms=5000)

for message in consumer:
      consumer.commit()
      print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

KafkaConsumer.close()         
return ("Processed")

java用户代码;这是一个普通的Kafka客户。通过lambda,当以预定方式运行时,它能够读取消息;但不是手动测试时。

try {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(topicName));

        context.getLogger().log("Subscribed to topic " + topicName);

        int i = 0;
        ConsumerRecords<String, String> records = consumer.poll(5000);          

        for (ConsumerRecord<String, String> record : records) {
            context.getLogger().log("Message:::  offset = "+record.offset()+", key = "+record.key()+", value = "+record.value()+"\n");
        }
        context.getLogger().log("After messages");
    } catch (Exception e) {
        e.printStackTrace();
        context.getLogger().log("Exception: "+e.getMessage());
    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题