总结
当配置flink使用处理时间时,我希望context.timestamp()在一个键控处理函数中返回null,而在测试时,它似乎返回了Kafka源主题中的摄取时间戳。
详情
我在Datastream环境中使用pyflink 1.16.1,我将应用程序配置为使用处理时间,如下所示。
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
我还在源上配置水印,如下所示:
properties = get_kafka_properties(args) # Security config
kafka_source = KafkaSource.builder()\
.set_properties(properties)\
.set_topics(topic)\
.set_starting_offsets(KafkaOffsetsInitializer.earliest())\
.set_value_only_deserializer(SimpleStringSchema())\
.build()
source = env.from_source(
source=kafka_source,
watermark_strategy=WatermarkStrategy.no_watermarks()
source_name="kafka_source")
当我使用这个简单的测试处理函数时,我看到使用了消息到达Kafka队列的时间,而我期望返回null。
class TestProcessFunction(KeyedProcessFunction):
def __init__(self):
pass
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
yield ctx.timestamp()
这让我怀疑处理时间是否确实被使用了,或者我的配置中是否有错误。
1条答案
按热度按时间mccptt671#
这并没有按预期工作,因为根据发行说明,
set_stream_time_characteristic
的使用在Flink 1.12中已被弃用:弃用流执行环境。设置流时间特征()和时间特征
以及:
在Flink 1.12中,默认的流时间特性已经被更改为EventTime,因此您不再需要调用这个方法来启用事件时间支持。显式使用处理时间窗口和计时器在事件时间模式下工作。如果您需要禁用水印,请使用ExecutionConfig.setAutoWatermarkInterval(长整型)。如果使用的是摄取时间,请手动设置适当的WatermarkStrategy。如果您正在使用常规“时间窗口”操作(例如KeyedStream.timeWindow()),请使用显式指定处理时间或事件时间的等效操作。
这可以解释为什么您会看到当前遇到的行为。使用
WatermarkStrategy.no_watermarks()
应该足以满足您的情况。您还可以考虑查看上下文公开的可用TimerService
,以根据需要访问其他与计时相关的信息(例如,当前处理时间、当前水位线等):