消息不会被刷新到kafka

1szpjjfi  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(190)

我有一个包含5个分区(a)的主题,每个分区有5个消费者从一个分区中提取消息。进行一些处理(大约需要30秒)并推送到两个主题,每个主题有一个分区(topic-b1,topicb2)。topic-b1是一个日志压缩主题
Kafkapython==2.0.1
Python3.8

producer = KafkaProducer(bootstrap_servers = cluster_ip, acks = -1, retries = 3)

# acks = -1 is similar to acks = all

我添加了回调,在发送时返回

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)
producer.flush()

消费者代码片段-

while True:
    raw_messages = consumer.poll(timeout_ms=20000, max_records=2)
    for topic_partition, messages in raw_messages.items():
        for msg in messages:
            try:
                #process msg .....
                data1, data2 = some_func(msg.value)
                producer.send(Topic-B1, key = msg.key, value = data1).add_callback(on_send_success).add_errback(on_send_error)
                producer.flush()
                producer.send(Topic-B2, key = msg.key, value = data2).add_callback(on_send_success).add_errback(on_send_error)
                producer.flush()
                consumer.commit()
            except Exception as e:
                log.error('I am an errback', exc_info=str(e))

但是很少有消息没有发送到topic-b1,topic-b2。我没有看到任何日志或错误的消费者。如何确保消息确实发送,或者如何在失败时引发异常。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题