我们正在考虑使用FlinkSQL对过去5-10分钟内的实时Kafka数据进行特别分析。为了实现这一点,我们似乎需要扩展kafka连接器,使其在给定的时间段内只读取消息,并使用它来生成有限的输入源。我想知道在这方面是否有其他的方法。欢迎提出任何建议。
2nc8po8w1#
flink-kafka连接器支持以各种方式设置开始位置,包括 myConsumer.setStartFromTimestamp(...) . kafka表格连接器似乎支持这些相同的选项。如果您想使用flink的sql客户机,可能需要编写一个瘦 Package 器来计算10分钟前的时间戳,并相应地设置起始kafka偏移量。
myConsumer.setStartFromTimestamp(...)
1条答案
按热度按时间2nc8po8w1#
flink-kafka连接器支持以各种方式设置开始位置,包括
myConsumer.setStartFromTimestamp(...)
. kafka表格连接器似乎支持这些相同的选项。如果您想使用flink的sql客户机,可能需要编写一个瘦 Package 器来计算10分钟前的时间戳,并相应地设置起始kafka偏移量。