我为KinesisStreamsSink
实现了SerializationSchema
,并在'Open'方法中初始化了SerializationSchema。但是我发现'Open'方法实际上没有被Flink调用。有人知道这个问题吗?
因为我的SerializationSchema实现依赖于某个不可Serializable的类,所以我必须在运行时的open方法中初始化它。
我使用KinesisStreamSink
遵循flink文档的指南,如:
inesisStreamsSink<String> kdsSink =
KinesisStreamsSink.<String>builder()
.setKinesisClientProperties(sinkProperties) // Required
.setSerializationSchema(new CustomizedSchema()) // Required
.setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required
.setStreamName("your-stream-name") // Required
.setFailOnError(false) // Optional
.setMaxBatchSize(500) // Optional
.setMaxInFlightRequests(50) // Optional
.setMaxBufferedRequests(10_000) // Optional
.setMaxBatchSizeInBytes(5 * 1024 * 1024) // Optional
.setMaxTimeInBufferMS(5000) // Optional
.setMaxRecordSizeInBytes(1 * 1024 * 1024) // Optional
.build();
在代码中,由于未调用open方法,CustomizedSchema
未正确初始化。
1条答案
按热度按时间jmo0nnb31#
这是Flink 1.15中的一个错误。在1.16中通过PR解决了这个问题。
建议的Flink 1.15版本的解决方案是空值检查和延迟初始化,请参见Flink用户邮件列表中的discussion。