Flink 未调用SerializationSchema打开方法

5jvtdoz2  于 2023-01-28  发布在  Apache
关注(0)|答案(1)|浏览(114)

我为KinesisStreamsSink实现了SerializationSchema,并在'Open'方法中初始化了SerializationSchema。但是我发现'Open'方法实际上没有被Flink调用。有人知道这个问题吗?
因为我的SerializationSchema实现依赖于某个不可Serializable的类,所以我必须在运行时的open方法中初始化它。
我使用KinesisStreamSink遵循flink文档的指南,如:

inesisStreamsSink<String> kdsSink =
    KinesisStreamsSink.<String>builder()
        .setKinesisClientProperties(sinkProperties)                               // Required
        .setSerializationSchema(new CustomizedSchema())                         // Required
        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))  // Required
        .setStreamName("your-stream-name")                                        // Required
        .setFailOnError(false)                                                    // Optional
        .setMaxBatchSize(500)                                                     // Optional
        .setMaxInFlightRequests(50)                                               // Optional
        .setMaxBufferedRequests(10_000)                                           // Optional
        .setMaxBatchSizeInBytes(5 * 1024 * 1024)                                  // Optional
        .setMaxTimeInBufferMS(5000)                                               // Optional
        .setMaxRecordSizeInBytes(1 * 1024 * 1024)                                 // Optional
        .build();

在代码中,由于未调用open方法,CustomizedSchema未正确初始化。

jmo0nnb3

jmo0nnb31#

这是Flink 1.15中的一个错误。在1.16中通过PR解决了这个问题。
建议的Flink 1.15版本的解决方案是空值检查和延迟初始化,请参见Flink用户邮件列表中的discussion

相关问题