无法阅读Kafka在梁Kafka

rekjcdws  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(325)

我已经在apchea beam中编写了一个非常简单的管道,如下所示,从汇合云上的kafka集群读取数据:

Pipeline pipeline = Pipeline.create(options);

        Map<String, Object> propertyBuilder = new HashMap();
        propertyBuilder.put("ssl.endpoint.identification.algorithm", "https");
        propertyBuilder.put("sasl.mechanism","PLAIN");
        propertyBuilder.put("request.timeout.ms","20000");
        propertyBuilder.put("retry.backoff.ms","500");

        pipeline
            .apply(KafkaIO.<byte[], byte[]>readBytes()
               .withBootstrapServers("pkc-epgnk.us-central1.gcp.confluent.cloud:9092")
               .withTopic("gcp-ingestion-1")  
               .withKeyDeserializer(ByteArrayDeserializer.class)
               .withValueDeserializer(ByteArrayDeserializer.class)
               .updateConsumerProperties(propertyBuilder)             
               .withoutMetadata() // PCollection<KV<Long, String>>
            ) .apply(Values.<byte[]>create());

然而,当运行上述代码从kafka集群中读取数据时,我得到了下面的解释
我在上面的DirectJava runner上运行,我使用的是beam 2.8,
我可以阅读和产生消息到我的Kafka合流集群,但不是由上述代码。

mfuanj7w

mfuanj7w1#

如果遵循堆栈跟踪,则代码似乎试图将超时配置属性强制转换为 Integer : https://github.com/apache/beam/blob/2e759fecf63d62d110f29265f9438128e3bdc8ab/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/kafkaunboundedreader.java#l112
但是它得到了一个字符串。我猜这是因为你把它设为字符串: propertyBuilder.put("request.timeout.ms","20000") . 我想正确的做法是把它设为 Integer ,例如 propertyBuilder.put("request.timeout.ms", 20000) (超时值周围没有引号)。
其他配置属性也可能存在类似问题(例如重试退避),您需要仔细检查属性类型。

相关问题