Flink 处理时间特征上下文返回时间戳

toe95027  于 2023-03-11  发布在  Apache
关注(0)|答案(1)|浏览(131)

总结

当配置flink使用处理时间时,我希望context.timestamp()在一个键控处理函数中返回null,而在测试时,它似乎返回了Kafka源主题中的摄取时间戳。

详情

我在Datastream环境中使用pyflink 1.16.1,我将应用程序配置为使用处理时间,如下所示。

env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

我还在源上配置水印,如下所示:

properties = get_kafka_properties(args) # Security config
kafka_source = KafkaSource.builder()\
        .set_properties(properties)\
        .set_topics(topic)\
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())\
        .set_value_only_deserializer(SimpleStringSchema())\
        .build()
source = env.from_source(
        source=kafka_source,
        watermark_strategy=WatermarkStrategy.no_watermarks()
        source_name="kafka_source")

当我使用这个简单的测试处理函数时,我看到使用了消息到达Kafka队列的时间,而我期望返回null。

class TestProcessFunction(KeyedProcessFunction):
    def __init__(self):
        pass

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        yield ctx.timestamp()

这让我怀疑处理时间是否确实被使用了,或者我的配置中是否有错误。

mccptt67

mccptt671#

这并没有按预期工作,因为根据发行说明,set_stream_time_characteristic的使用在Flink 1.12中已被弃用:
弃用流执行环境。设置流时间特征()和时间特征
以及:
在Flink 1.12中,默认的流时间特性已经被更改为EventTime,因此您不再需要调用这个方法来启用事件时间支持。显式使用处理时间窗口和计时器在事件时间模式下工作。如果您需要禁用水印,请使用ExecutionConfig.setAutoWatermarkInterval(长整型)。如果使用的是摄取时间,请手动设置适当的WatermarkStrategy。如果您正在使用常规“时间窗口”操作(例如KeyedStream.timeWindow()),请使用显式指定处理时间或事件时间的等效操作。
这可以解释为什么您会看到当前遇到的行为。使用WatermarkStrategy.no_watermarks()应该足以满足您的情况。您还可以考虑查看上下文公开的可用TimerService,以根据需要访问其他与计时相关的信息(例如,当前处理时间、当前水位线等):

def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
    # The timer service can expose some additional timing info based on your use-case 
    yield ctx.timer_service().current_processing_time()

相关问题