我正在尝试加入flink中两种类型的事件(比如事件a和事件b)。我想确认我的理解是否正确。事件的一些属性-
事件a立即流入flink,延迟几分钟(5-10分钟)
事件b稍微延迟15-30分钟
事件a和事件b之间有1:1的连接
我将事件a的数据流配置为10分钟的BoundedAutoFordernessTimestampExtractor,将事件b的数据流配置为30分钟。稍后,我使用表api执行时间窗口联接。
我对以下内容的理解是否正确-
只要事件在延迟窗口内(对于事件a为10分钟,对于事件b为30分钟),则在收到事件后立即对其进行处理和加入。由于flink的任何配置,端到端延迟没有最小限制。
该表将保留事件最多30分钟,直到两条流的水印到达。之后根据水印事件被清除
下面代码中的查询配置是多余的,实际上并不需要
对下面的代码还有什么建议吗?
queryConfig.withIdleStateRetentionTime(
org.apache.flink.api.common.time.Time.seconds(1),
org.apache.flink.api.common.time.Time.minutes(30))
val stream: DataStream[Any] = textStream.flatMap(json => convert(json))
val aStream: DataStream[ClassA] =
stream
.filter(obj => obj.isInstanceOf[ClassA])
.rebalance
.map(obj => obj.asInstanceOf[ClassA])
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[ClassA](
Time.minutes(10)){
override def extractTimestamp(element: ClassA): Long =
element.serviceTimestamp.toInstant.toEpochMilli
})
val bStream: DataStream[ClassB] =
stream
.filter(obj => obj.isInstanceOf[ClassB])
.rebalance
.map(obj => obj.asInstanceOf[ClassB])
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[ClassB](
Time.minutes(30)){
override def extractTimestamp(element: ClassB): Long =
element.timestamp.toInstant.toEpochMilli
})
val aTable: Table = tableEnv.fromDataStream[ClassA](aStream,
// The .rowtime is for setting event time attributes
'aTimestamp.rowtime as 'aTimestamp, 'aUniqueId, 'aItem)
val bTable: Table = tableEnv.fromDataStream[ClassB](bStream,
// The .rowtime is for setting event time attributes
// https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
'bTimestamp.rowtime as 'bTimestamp, 'uniqueId, 'bItem)
val result: Table = aTable
.join(aTable)
.where('aUniqueId === 'uniqueId
// Give ClassB events 30 minutes lateness.
// Use a time window join as optimization - https://stackoverflow.com/a/51620821
// & https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#time-windowed-joins
// Both time clauses are need to qualify as time window join
&& 'bTimestamp >= 'aTimestamp
&& 'bTimestamp <= 'aTimestamp + 30.minutes)
// DO NOT change order without changing order in later parsing code
.select('uniqueId, 'aItem, 'bItem, 'bTimestamp, 'aTimestamp.cast(createTypeInformation[Timestamp]))
val outputStream: DataStream[ClassC] = tableEnv
.toAppendStream[(String, String, String, Timestamp, Timestamp)](result)
// TODO find better way to map to a POJO
.map(row => ClassCUtils.toClassC(row))
1条答案
按热度按时间zvokhttg1#
只要事件在延迟窗口内(对于事件a为10分钟,对于事件b为30分钟),则在收到事件后立即对其进行处理和加入。由于flink的任何配置,端到端延迟没有最小限制。
没错。事件将在接收时被Map和过滤,并放入缓冲区以满足连接窗口的要求。
该表将保留事件最多30分钟,直到两条流的水印到达。之后根据水印事件被清除
没错。这个
IntervalJoinOperator
将从连接的右侧和左侧接收事件,检查它们是否在时间范围内,如果在时间范围内,则向下游发射它们:下面代码中的查询配置是多余的,实际上并不需要
没错。
withIdleStateRetentionTime
在使用无界运算符(例如GROUP BY
sql中没有windows属性的子句。