在Python中获取融合Kafka主题的最新消息

zbdgwd5y  于 2023-02-03  发布在  Apache
关注(0)|答案(2)|浏览(157)

以下是我目前所做的尝试:

from confluent_kafka import Consumer

c = Consumer({... several security/server settings skipped...
              'auto.offset.reset': 'beginning',
              'group.id': 'my-group'})

c.subscribe(['my.topic'])
msg = poll(30.0)  # msg is of None type.

msg几乎总是以None结束,我认为问题可能是'my-group'已经使用了'my.topic'的所有消息......但是我不关心消息是否已经被使用--我仍然需要最新的消息,具体来说,我需要最新消息的时间戳。
我又试了一点,从这里看,主题中大概有25条信息,但我不知道如何获取它们:

a = c.assignment()
print(a)  # Outputs [TopicPartition{topic=my.topic,partition=0,offset=-1001,error=None}]
offsets = c.get_watermark_offsets(a[0])
print(offsets)  # Outputs: (25, 25)

如果没有任何消息是因为主题从来没有写过任何东西,我该如何确定呢?如果是这样,我该如何确定主题已经存在了多久呢?我希望写一个脚本,自动删除任何主题,没有写过在过去的X天(最初14-可能会调整它随着时间的推移)。

iqjalb3h

iqjalb3h1#

我遇到了同样的问题,没有关于这个的例子。在我的例子中有一个分区,我需要阅读最后一条消息,从该消息中了解一些信息,以设置我拥有的消费者/生产者组件。
逻辑是启动Consumer,订阅主题,轮询消息-〉这将触发on_assign,在那里通过分配回修改的分区来发生倒带。在on_assign完成后,对msg的轮询将继续,并从主题读取最后一条消息。

settings = {
    "bootstrap.servers": "my.kafka.server",
    "group.id": "my-work-group",
    "client.id": "my-work-client-1",
    "enable.auto.commit": False,
    "session.timeout.ms": 6000,
    "default.topic.config": {"auto.offset.reset": "largest"},
}
consumer = Consumer(settings)

def on_assign(a_consumer, partitions):
    # get offset tuple from the first partition
    last_offset = a_consumer.get_watermark_offsets(partitions[0])
    # position [1] being the last index
    partitions[0].offset = last_offset[1] - 1
    consumer.assign(partitions)

consumer.subscribe(["test-topic"], on_assign=on_assign)

msg = consumer.poll(6.0)

现在msg里面有最后一条消息。

ny6fqffe

ny6fqffe2#

如果任何人仍然需要一个例子的情况下,多个分区;我是这样做的:

from confluent_kafka import OFFSET_END, Consumer

settings = {
    'bootstrap.servers': "my.kafka.server",
    'group.id': "my-work-group",
    'auto.offset.reset': "latest"
}

def on_assign(consumer, partitions):
    for partition in partitions:
        partition.offset = OFFSET_END
    consumer.assign(partitions)

consumer = Consumer(settings)

consumer.subscribe(["test-topic"], on_assign=on_assign)

msg = consumer.poll(1.0)

相关问题