我遇到了一个可重复的错误,而产生的avro消息与ReactKafka和avro4s。一旦 identityMapCapacity
客户的( CachedSchemaRegistryClient
)已达到,序列化失败
java.lang.IllegalStateException: Too many schema objects created for <myTopic>-value
这是出乎意料的,因为所有消息都应该具有相同的模式—它们是相同case类的序列化。
val avroProducerSettings: ProducerSettings[String, GenericRecord] =
ProducerSettings(system, Serdes.String().serializer(),
avroSerde.serializer())
.withBootstrapServers(settings.bootstrapServer)
val avroProdFlow: Flow[ProducerMessage.Message[String, GenericRecord, String],
ProducerMessage.Result[String, GenericRecord, String],
NotUsed] = Producer.flow(avroProducerSettings)
val avroQueue: SourceQueueWithComplete[Message[String, GenericRecord, String]] =
Source.queue(bufferSize, overflowStrategy)
.via(avroProdFlow)
.map(logResult)
.to(Sink.ignore)
.run()
...
queue.offer(msg)
序列化程序是 KafkaAvroSerializer
,用 new CachedSchemaRegistryClient(settings.schemaRegistry, 1000)
生成 GenericRecord
:
def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
recordFormat.to(a)
val makeEdgeMessage: (Edge, String) => Message[String, GenericRecord, String] = { (edge, topic) =>
val edgeAvro: GenericRecord = toAvro(edge)
val record = new ProducerRecord[String, GenericRecord](topic, edge.id, edgeAvro)
ProducerMessage.Message(record, edge.id)
}
模式是在代码的深处创建的( io.confluent.kafka.serializers.AbstractKafkaAvroSerDe#getSchema
,由调用 io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl
)我对它没有影响,所以我不知道如何修复漏洞。在我看来,这两个合流的项目不能很好地协同工作。
我在这里、这里和这里发现的问题似乎没有涉及到我的用例。
我目前的两个解决方法是:
不使用模式注册表-显然不是一个长期的解决方案
创建自定义 SchemaRegistryClient
不依赖对象标识是可行的,但我希望避免产生比重新实现更多的问题
有没有办法根据消息/记录类型生成或缓存一致的模式,并将其用于我的设置?
1条答案
按热度按时间tpxzln5u1#
编辑2017.11.20
我的问题是
GenericRecord
我的消息已被的另一个示例序列化RecordFormat
,包含Schema
. 这里的隐式解析每次都生成一个新示例。def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord = recordFormat.to(a)
解决办法是用别针固定RecordFormat
示例到val
并明确地重用它。非常感谢https://github.com/heliocentrist 为了解释细节。原始答复:
在等待了一段时间之后(对于github问题也没有答案),我不得不实现我自己的
SchemaRegistryClient
. 90%以上是从原件复制过来的CachedSchemaRegistryClient
,刚刚翻译成scala。使用scalamutable.Map
修复了内存泄漏。我没有进行任何全面的测试,所以使用风险自负。