Kafka如何从消费者的主题中解读

qqrboqgw  于 2021-06-08  发布在  Kafka
关注(0)|答案(5)|浏览(440)

我正试图找出我目前的高级消费者正在努力抵消哪些影响。我使用kafka 0.8.2.1,在kafka的server.properties中没有设置“offset.storage”,我认为这意味着偏移量存储在kafka中(我还通过检查zk shell中的路径验证了zookeeper中没有存储偏移: /consumers/consumer_group_name/offsets/topic_name/partition_number )
我试着听听 __consumer_offsets 看看哪个消费者节省了什么价值的补偿,但它没有工作。。。
我尝试了以下方法:
为控制台使用者创建了配置文件,如下所示:

=> more kafka_offset_consumer.config 

 exclude.internal.topics=false

并尝试了两个版本的控制台使用者脚本:


# 1:

bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181

# 2

./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config

两者都不起作用-它只是坐在那里,但不打印任何东西,即使消费者积极消费/储蓄抵消。
我是否缺少其他配置/属性?
谢谢!
玛丽娜

blpfk2vs

blpfk2vs1#

从kafka0.11开始,(scala)源代码可以在这里找到
对于那些需要java翻译的用户,假设您得到了一个 ConsumerRecord<byte[], byte[]> consumerRecord ,您可以使用
获取密钥(首先检查密钥是否为空)并使用 GroupMetadataManager.readMessageKey(consumerRecord.key) . 它可以返回不同的类型,所以请检查 if ( ... instanceof OffsetKey) ,然后将其强制转换,可以从中获得各种值。
要获取偏移量的kafka记录值,可以使用 String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value)) 一个从scala代码翻译过来的java示例。。。

byte[] key = consumerRecord.key;
if (key != null) {
    Object o = GroupMetadataManager.readMessageKey(key);
    if (o != null && o instanceOf OffsetKey) {
        OffsetKey offsetKey = (OffsetKey) o;
        Object groupTopicPartition = offsetKey.key;
        byte[] value = consumerRecord.value;
        String formattedValue = String.valueOf(GroupMetadataManager.readOffsetMessageValue(value);
       // TODO: Print, store, or compute results with the new key and value 
    }
}

注意,也可以使用adminclientapi来描述组,而不是使用这些原始消息
listconsumergroupoffsets():查找特定组的所有偏移
descripbeconsumergroups():查找组成员的详细信息
scala源代码提取

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
  Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
    // Only print if the message is an offset record.
    // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
    case offsetKey: OffsetKey =>
      val groupTopicPartition = offsetKey.key
      val value = consumerRecord.value
      val formattedValue =
        if (value == null) "NULL"
        else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
      output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
      output.write("::".getBytes(StandardCharsets.UTF_8))
      output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
      output.write("\n".getBytes(StandardCharsets.UTF_8))
    case _ => // no-op
  }
epfja78i

epfja78i2#

如果你加上 --from-beginning 它很可能会给你一些结果,至少在我尝试的时候是这样。或者,如果您没有提供该参数,但在让消费者侦听时读取了更多消息(并触发偏移量提交),那么也应该在那里显示消息。

0s7z1bwu

0s7z1bwu3#

对于kafka-2.x,使用以下命令 kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"

tjjdgumg

tjjdgumg4#

我在尝试从消费补偿主题中消费时遇到了这个问题。我设法找到了不同版本的Kafka,并认为我会分享我的发现
对于Kafka0.8.2.x
注意:这使用zookeeper连接


# Create consumer config

echo "exclude.internal.topics=false" > /tmp/consumer.config

# Consume all offsets

./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
--zookeeper localhost:2181 --topic __consumer_offsets --from-beginning

对于Kafka0.9.x.x和0.10.x.x


# Create consumer config

echo "exclude.internal.topics=false" > /tmp/consumer.config

# Consume all offsets

./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning

对于0.11.x.x-2.x


# Create consumer config

echo "exclude.internal.topics=false" > /tmp/consumer.config

# Consume all offsets

./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
mbzjlibv

mbzjlibv5#

好的,我已经知道问题出在哪里了。我的Kafka实际上是使用zookeeper作为偏移存储,而不是Kafka。。。。我没有立即检测到的原因是我检查zk内容不正确:
我在做什么

ls  /consumers/consumer_group_name/offsets/topic_name/partition_number

什么也看不见。相反,我必须“获取”内容,这确实为我的消费者显示了正确的补偿,如下所示:

get /consumers/consumer_group_name/offsets/topic_name/partition_number 
185530404
cZxid = 0x70789ad05
ctime = Mon Nov 23 17:49:46 GMT 2015
mZxid = 0x7216cdc5c
mtime = Thu Dec 03 20:18:57 GMT 2015
pZxid = 0x70789ad05
cversion = 0
dataVersion = 3537384
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0

相关问题