我对Kafka很陌生
我已经使用官方示例创建了一个producer和consumer greoup,尽管我想从producer和consumer发送节俭包,以获取包并存储在包数组中。
我已经写了生产者方面的代码为
KeyedMessage<String, Bundle> data = new KeyedMessage<String, Bundle>("bundles", "Bundle", bundle);
producer.send(data);
但在消费者方面,我有
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(NO_OF_THREADS));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> messageStreams = consumerMap.get(topic);
我可以让kafka消费者以bundle类型而不是byte[]数组的形式获取数据吗。
1条答案
按热度按时间ukxgm1gy1#
您可以使用以下方法将值直接解码为bundle类型:
在这种情况下,您需要实现自己的
valueDecoder
类型Decoder<Bundle>
使用相应的thrift api。请参阅kafka文档中的高级使用者api描述。