我有很多工作看起来像这个草图
Source<GenericRecord> kafkaSource;
kafkaSource
.map(function that takes generic record)
.map( ... )
...
.sink(kafka sink that takes in generic records)
我们将数据表示为genericrecords的原因是,正在使用的avro模式在运行时会发生变化。我们为一个单独的主题编写模式描述。我们知道向/从操作符/kafka传递genericords会带来性能损失,因此我们为genericords编写了自己的kafka序列化模式和kryo序列化程序。棘手的部分是我们的定制序列化程序需要知道当前的模式列表是什么,这样它就可以知道如何在消息通过图时对其进行ser/de处理。
我一辈子都搞不懂如何用理智的方式把这些信息传递给我们的序列化程序。我知道的方法有:
一种静态字段,用于在外部系统中轮询记录列表。我们已经这样做了,但我们认为这会导致类加载器泄漏,因为轮询线程是在自定义序列化程序中创建的,不清楚应该在哪里取消它。
广播状态。我们可以尝试使用广播状态来流式处理图形周围的模式,但这意味着编写这些作业的工效学要低得多;图中的每个操作符都必须接收广播状态,并且必须在内部处理序列化,而不是使用我们的自定义序列化程序
由Kafka流填充的静态字段。这避免了(1)的线程泄漏,但我不认为flink保证我们可以在每个任务槽中填充这个静态字段。很难控制Kafka流的处理位置。
我知道这是一个复杂的情况,所以我希望它明白。我觉得很困惑,因为我考虑过的所有解决方案似乎都不够。有没有其他我没想到的选择?有没有更好的方法来管理avro模式的动态集合而不重新启动?希望得到任何建议!谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!