Using Kafka Streams to write messages from an input topic to an output topic

Posted by 3hvapo4f on 2021-06-04 in Kafka

Below is the code I am using to try to write data from one topic to another.

//Computational logic
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> kStream = builder.stream(topicName_In);
kStream.foreach((k, v) -> System.out.println("Key = " + k + " Value = " + v));
//kStream.peek((k, v) -> System.out.println("Key = " + k + " Value = " + v));
kStream.to(topicName_Out);
Topology topology = builder.build();

Input topic data format: simple message-1
Error:

Exception in thread "HelloStreams-564343a1-1709-4bae-8fe5-514b37eee595-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

Answer 1, by h79rfbju

Given this line:

KStream<String, String> kStream = builder.stream(topicName_In);

I assume your input data consists of strings.
The error message says:

Size of data received by IntegerDeserializer is not 4

This indicates that you have actually configured IntegerSerde (for the key and/or the value), but you need to configure StringSerde for this to work.
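
As a rough illustration of that fix, here is a minimal sketch of a configuration that sets both default serdes to StringSerde. The question does not show its configuration, so the whole block is an assumption: the class name StringSerdeConfigSketch and the method buildConfig are hypothetical, the application id "HelloStreams" is only guessed from the thread name in the stack trace, and "localhost:9092" is a placeholder broker address. Plain string keys are used instead of the StreamsConfig constants so the snippet compiles without the Kafka jars.

```java
import java.util.Properties;

// Sketch only: builds the Properties that would be passed to
// new KafkaStreams(topology, props) in the question's application.
class StringSerdeConfigSketch {

    // Binary name of the nested class Serdes.StringSerde.
    static final String STRING_SERDE =
            "org.apache.kafka.common.serialization.Serdes$StringSerde";

    // applicationId and bootstrapServers are assumptions; the question
    // does not show the values it actually uses.
    static Properties buildConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Use String serdes for keys and values instead of IntegerSerde.
        props.put("default.key.serde", STRING_SERDE);
        props.put("default.value.serde", STRING_SERDE);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig("HelloStreams", "localhost:9092");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

If only this one stream should use strings, the serdes can instead be passed per operation, for example builder.stream(topicName_In, Consumed.with(Serdes.String(), Serdes.String())) and kStream.to(topicName_Out, Produced.with(Serdes.String(), Serdes.String())), which leaves the application-wide defaults unchanged.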
