配置Kafka发送自定义类型数据

cgvd09ve  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(462)

我对Kafka很陌生
我已经使用官方示例创建了一个producer和consumer greoup,尽管我想从producer和consumer发送节俭包,以获取包并存储在包数组中。
我已经写了生产者方面的代码为

KeyedMessage<String, Bundle> data = new KeyedMessage<String, Bundle>("bundles", "Bundle", bundle); 
        producer.send(data);

但在消费者方面,我有

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(NO_OF_THREADS));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> messageStreams = consumerMap.get(topic);

我可以让kafka消费者以bundle类型而不是byte[]数组的形式获取数据吗。

ukxgm1gy

ukxgm1gy1#

您可以使用以下方法将值直接解码为bundle类型:

public interface kafka.javaapi.consumer.ConsumerConnector {
  ...
  public <K,V> Map<String, List<KafkaStream<K,V>>> 
     createMessageStreams(
         Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

在这种情况下,您需要实现自己的 valueDecoder 类型 Decoder<Bundle> 使用相应的thrift api。
请参阅kafka文档中的高级使用者api描述。

相关问题