Use case: I have a topic containing messages of the form (null, metadata). I need to build a KTable from this topic, keyed by metadata.entity_id with the metadata as the value. This table will later be joined with a stream that has the same key.
private final static String KAFKA_BROKERS = "localhost:9092";
private final static String APPLICATION_ID = "TestMetadataTable";
private final static String AUTO_OFFSET_RESET_CONFIG = "earliest";
private final static String METADATA_TOPIC = "test-metadata-topic";

public static void main(String[] args) {
    // Set the stream configuration params.
    final Properties kafkaStreamConfiguration = new Properties();
    kafkaStreamConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
    kafkaStreamConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, APPLICATION_ID);
    kafkaStreamConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
    kafkaStreamConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);

    // Create Serdes for MetricMetadata.
    GenericJsonSerializer<MetricMetadata> metadataJsonSerializer = new GenericJsonSerializer<>();
    GenericJsonDeserializer<MetricMetadata> metadataJsonDeserializer = new GenericJsonDeserializer<>(MetricMetadata.class);
    Serde<MetricMetadata> metadataSerde = Serdes.serdeFrom(metadataJsonSerializer, metadataJsonDeserializer);

    // Build the topology.
    final StreamsBuilder builder = new StreamsBuilder();
    KTable<String, MetricMetadata> metaTable = builder.table(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
            .groupBy((key, value) -> KeyValue.pair(value.getEntity_id(), value))
            .aggregate(() -> null,
                       (key, value, aggValue) -> value,
                       (key, value, aggValue) -> value);

    final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaStreamConfiguration);
    streams.start();
}
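(For context: MetricMetadata and the GenericJson Serde classes are not shown in the question. Judging from the getEntity_id() call and the MetricMetadata.class argument to the deserializer, it is presumably a plain POJO along these hypothetical lines:)

// Hypothetical shape of MetricMetadata, inferred from the question;
// the real class is not shown.
public class MetricMetadata {
    private String entity_id;
    // ... other metadata fields elided in the question ...

    public String getEntity_id() {
        return entity_id;
    }
}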
As soon as I push a message to the metadata topic, it fails with the error below. Am I missing something? This is Kafka Streams 2.2.0.
Exception in thread "TestMetadataTable-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store test-metadata-topic-STATE-STORE-0000000000
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:519)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:471)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:95)
at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:72)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:102)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:79)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:127)
at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:72)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:224)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
... 10 more
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 28 more
1 Answer
drkbr07n1#
In this case, you need to provide the Serdes to the KTable.groupBy() operation via Grouped, because calling groupBy triggers a repartition. You also need to provide the same Serdes to the aggregate operation for its state store. Moreover, since the key is null, I think you should use a KStream in the first place. Then call groupByKey (you still need to provide the Serdes via Grouped), and the aggregation will give you the KTable you want. Off the top of my head, something like this should work:
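(The answer's code block did not survive; a minimal sketch of the KStream approach it describes follows. It reuses metadataSerde and METADATA_TOPIC from the question, assumes getEntity_id() returns a String, and uses the standard Kafka Streams 2.2 Grouped and Materialized classes to supply the Serdes.)

final StreamsBuilder builder = new StreamsBuilder();

KTable<String, MetricMetadata> metaTable = builder
        // Read the topic as a KStream, because the incoming keys are null.
        .stream(METADATA_TOPIC, Consumed.with(Serdes.String(), metadataSerde))
        // Re-key each record by the entity id carried in the value.
        .selectKey((key, value) -> value.getEntity_id())
        // groupByKey triggers a repartition, so the Serdes must be passed via Grouped.
        .groupByKey(Grouped.with(Serdes.String(), metadataSerde))
        // Keep the latest metadata per entity id; Materialized supplies the
        // Serdes for the state store backing the aggregation.
        .aggregate(
                () -> null,
                (key, value, aggValue) -> value,
                Materialized.with(Serdes.String(), metadataSerde));

With a null initializer and an aggregator that simply returns the newest value, each entity id maps to its latest metadata, which is exactly the "table" semantics the question is after; the resulting KTable can then be joined against a stream with the same String key.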