kafka java simpleconsumer奇怪的编码

sz81bmfz  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(289)

我正试图使用kafka 9中的simpleconsumer来允许用户从时间偏移量重播事件-但是我从kafka收到的消息采用了一种非常奇怪的编码:

7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p=
                            ������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username"

使用kafkaconsumer可以解析这些消息。下面是我使用simpleconsumer检索消息的代码:

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
            log.debug("Found an old offset - skip");
            continue;
        }

        readOffset = messageAndOffset.nextOffset();

        int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
        byte[] data = messageAndOffset.message().payload().array();
        byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
        log.debug("Read " + new String(realData, "UTF-8"));
}

我添加了跳过第一个x字节的代码,在我不断得到关于字节太高的utf-32错误之后,我假设这是因为kafka在有效负载中预先准备了消息大小之类的信息。这是avro神器吗?

ie3xauqp

ie3xauqp1#

我从来没有找到一个好的答案-但我改用 SimpleConsumer 要查询Kafka的偏移量,我需要(每个分区。尽管实现很差),然后使用本机kafkaconsumer使用 seek(TopicPartition, offset) 或者 seekToBeginning(TopicPartition) 检索消息。希望在下一个版本中,它们能为本机客户端添加从给定时间戳检索消息的功能。

um6iljoc

um6iljoc2#

你在找这个吗?

readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();

    if(payload == null) {
        System.err.println("Message is null : " + readOffset);
        continue;
    }

final byte[] realData = new byte[payload.limit()];
payload.get(realData);
System.out.println("Read " + new String(realData, "UTF-8"));
xuo3flqw

xuo3flqw3#

您可以使用消息的时间戳(可能不是每次提交)周期性地将您正在提交的偏移量记录到分区中,然后您可以在将来进行一些度量来设置使用者偏移量。我想这是为了生产调试。
我怀疑他们会增加这样一个功能,这似乎是不可行的考虑Kafka如何工作,虽然我可能是错的,总有天才的东西在进行。我会做记录的事。

相关问题