Spring embedded Kafka + mock schema registry: state store changelog schema not registered

Posted by huwehgph on 2021-06-08 in Kafka

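I am building an integration test for our Kafka system using the Spring embedded Kafka broker together with MockSchemaRegistryClient. The test covers one of our stream topologies, which is built with the Streams API (KStreamBuilder). This particular topology has a KStream (stream1) feeding into a KTable (table1).
When I feed input into stream1, I get the following error, which originates from the KTableProcessor of table1: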

Exception in thread "mortgage-kafka-consumers-it-c1dd9185-ce16-415c-ad82-293c1281c897-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=streaming.mortgage.application_party, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:202)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 6

**Caused by: java.io.IOException: Cannot get schema from schema registry!**

    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
    at com.sofi.kafka.serialization.AvroDeserializer.deserialize(AvroDeserializer.java:35)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:151)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:135)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:62)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:45)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:131)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:188)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:342)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:334)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)

The KTableProcessor is attempting to deserialize an entry from the RocksDB state store, however the schema does not exist in the mock schema registry. The topic whose schema is being requested is: **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog**

As the exception states, the schema has not been registered. However, the topic **appname-KTABLE-SOURCE-STATE-STORE-0000000000-changelog-key** does have a registered schema (registered when the entry's key is serialized for the query).

Since this is an internal topic, I don't expect to have to register this schema myself, however I'm failing because of the schema's absence in the registry. Is there a way to have changelog schemas registered prior to data ingestion? Is there a way to disable state store changelogging with the KStreamBuilder?

Thanks in advance!

amrnrhlw (answer #1)

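I resolved the problem, which I will now sheepishly recount: when using a KTable (via the Streams API) with an embedded Kafka broker, you will want to configure the KafkaStreams object with a state store directory that is unique to each run of the embedded Kafka broker (in my case, each run of the test).
You can control the state store directory via the StreamsConfig.STATE_DIR_CONFIG setting. I made it unique by appending a timestamp to the default state store directory: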

properties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kraken-streams/" + LocalDateTime.now().toString());
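
A variation on the same idea, offered as a minimal sketch rather than the approach actually used above: instead of a timestamp, let the JDK create a fresh directory for each run with `Files.createTempDirectory`. The class name `UniqueStateDir`, the method `streamsProperties`, and the `"it-streams-"` prefix are hypothetical. The literal key `"state.dir"` is the value of `StreamsConfig.STATE_DIR_CONFIG`; it is inlined only so the snippet compiles without Kafka on the classpath.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class UniqueStateDir {
    // Same string as StreamsConfig.STATE_DIR_CONFIG, inlined to avoid a Kafka dependency here.
    public static final String STATE_DIR_CONFIG = "state.dir";

    /** Builds properties whose state directory is newly created on every call. */
    public static Properties streamsProperties(String prefix) {
        try {
            // createTempDirectory picks a name that does not exist yet, so no old store can be there.
            Path stateDir = Files.createTempDirectory(prefix);
            Properties properties = new Properties();
            properties.put(STATE_DIR_CONFIG, stateDir.toAbsolutePath().toString());
            return properties;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two calls stand in for two test runs; each gets its own directory.
        Properties firstRun = streamsProperties("it-streams-");
        Properties secondRun = streamsProperties("it-streams-");
        System.out.println(firstRun.getProperty(STATE_DIR_CONFIG));
        System.out.println(secondRun.getProperty(STATE_DIR_CONFIG));
    }
}
```

One reason to consider this over the timestamp: `LocalDateTime.now().toString()` contains colons, which some filesystems (Windows, for example) do not accept in directory names. In a real test the returned properties would still need the usual application id, bootstrap servers, and serde settings.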

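The problem was that an old state store existed at the same location every time the embedded Kafka broker was initialized. When the very first record was fed into the KTable's topic, the state store was able to return a previous value. That triggered an attempt to deserialize a state store record which, as far as the current schema registry instance was concerned, had never been serialized. Schemas are only registered on serialization, so the deserialization failed because no schema had been registered.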
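
The failure mode described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyRegistry` is a hypothetical in-memory stand-in, not Confluent's `MockSchemaRegistryClient`, and the schema name is a placeholder. It shows that a schema id persisted by one run cannot be resolved by a fresh, empty registry in the next run.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleStoreDemo {
    /** Toy in-memory registry: a schema id is known only after something registers it. */
    public static class ToyRegistry {
        private final Map<Integer, String> schemasById = new HashMap<>();
        private int nextId = 1;

        /** Mimics registration on serialization: stores the schema and returns its id. */
        public int register(String schema) {
            int id = nextId++;
            schemasById.put(id, schema);
            return id;
        }

        /** Mimics lookup on deserialization: fails if the id was never registered here. */
        public String getById(int id) {
            String schema = schemasById.get(id);
            if (schema == null) {
                throw new IllegalStateException("Cannot get schema for id " + id);
            }
            return schema;
        }
    }

    /** Returns true if a record written under schemaId can be resolved by this registry. */
    public static boolean canDeserialize(ToyRegistry registry, int schemaId) {
        try {
            registry.getById(schemaId);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Run 1: a value is serialized, so its schema is registered; the id ends up on disk.
        ToyRegistry firstRun = new ToyRegistry();
        int idLeftOnDisk = firstRun.register("placeholder-value-schema");

        // Run 2: the registry starts empty, but the old state directory is reused.
        ToyRegistry secondRun = new ToyRegistry();

        System.out.println(canDeserialize(firstRun, idLeftOnDisk));  // prints true
        System.out.println(canDeserialize(secondRun, idLeftOnDisk)); // prints false
    }
}
```

Giving each run its own state directory removes the leftover record, so the second registry is never asked for an id it has not issued.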
