如何读取spark中Kafka主题的二进制数据

erhoui1w  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(518)

我需要读Kafka主题的加密信息。我当前从主题中读取字符串的代码如下所示:

JavaPairReceiverInputDStream<String, String> pairrdd = 
            KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);

我应该怎么做才能从kafka队列中更改此代码,以确保读取的字节数组、加密的数据没有损坏

qv7cva1a

qv7cva1a1#

用于读取Kafka在 <byte[], byte[]> 窗体,您可以使用 KafkaUtils . 像这样-

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<byte[], byte[]>> pairrdd =
  KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<byte[], byte[]>Subscribe(topics, kafkaParams)
  );

我希望这有帮助!

相关问题