我有下面的flink代码来实验水印。我想实现的是A
和B
通过将它们路由到不同的分区来使用不同的水印。
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import java.util.Date
object Test{
def to_milli(str: String) =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(str).getTime
def to_char(milli: Long) = {
val date = if (milli <= 0) new Date(0) else new Date(milli)
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)
}
val data = Seq(
("A", "2020-08-30 10:50:11"),
("B", "2020-08-30 10:50:13"),
("B", "2020-08-30 10:50:04"),
("A", "2020-08-30 10:50:08")
)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(4)
env.fromCollection(data).setParallelism(1).partitionCustom(new Partitioner[String] {
override def partition(key: String, numPartitions: Int): Int = key.hashCode() % numPartitions
}, e => e._1).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[(String, String)]() {
var maxSeen = Long.MinValue
override def checkAndGetNextWatermark(lastElement: (String, String), extractedTimestamp: Long): Watermark = {
val eventTime = to_milli(lastElement._2)
if (eventTime > maxSeen) {
maxSeen = eventTime
}
println(s"key: ${lastElement._1}, event time: ${to_char(eventTime)}, watermark: ${to_char(maxSeen - 4 * 1000)} ")
new Watermark(maxSeen - 4 * 1000)
}
override def extractTimestamp(element: (String, String), previousElementTimestamp: Long): Long = to_milli(element._2)
}).setParallelism(2).keyBy(_._1).process(new KeyedProcessFunction[String, (String, String), (String, String)] {
override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), (String, String)]#Context, out: Collector[(String, String)]): Unit = {
val watermark = ctx.timerService().currentWatermark()
val key = ctx.getCurrentKey
val s = if (watermark == Long.MinValue) s"MIN_VALUE: $watermark" else to_char(watermark)
System.out.println(s"watermark:$key is: $s ")
val eventTime = value._2
if (eventTime > to_char(watermark)) out.collect(value)
}
}).setParallelism(2).print()
env.execute()
}
}
当我运行上面的代码时,它会打印:
key: B, event time: 2020-08-30 10:50:13, watermark: 2020-08-30 10:50:09
key: A, event time: 2020-08-30 10:50:11, watermark: 2020-08-30 10:50:07
key: B, event time: 2020-08-30 10:50:04, watermark: 2020-08-30 10:50:09
key: A, event time: 2020-08-30 10:50:08, watermark: 2020-08-30 10:50:07
watermark:A is: MIN_VALUE: -9223372036854775808
watermark:B is: MIN_VALUE: -9223372036854775808
watermark:A is: MIN_VALUE: -9223372036854775808
watermark:B is: 2020-08-30 10:50:09
3> (B,2020-08-30 10:50:13)
4> (A,2020-08-30 10:50:08)
3> (A,2020-08-30 10:50:11)
我只是不明白为什么我
watermark:B is: MIN_VALUE: -9223372036854775808
watermark:A is: MIN_VALUE: -9223372036854775808
watermark:B is: MIN_VALUE: -9223372036854775808
有没有人能试试这个代码帮我解决一下?我已经在这个问题上纠结了好几个小时了!
谢谢!
2条答案
按热度按时间lvjbypge1#
问题在于,由于您正在并行执行
keyBy
,因此Flink会将Watermark
计算为所有并行流生成的水印的min
。因此,在这种情况下,
A
生成的水印对于B
并不总是可见的,而B
生成的时间戳对于A
也是不可见的,甚至可能出现上面的程序所有行都打印-9223372036854775808
的情况。请注意,
partitionCustom
不执行逻辑分区,因此它与keyBy
不是一回事,它仍然会导致shuffle。ars1skjm2#
以下是导致这一结果的事实:
(1)作业拓扑如下所示:你有一个源任务,接着是一个定制的分区器,它将A和B事件发送到水印生成器的2个示例,一个处理A事件,另一个处理B事件,接着是一个散列分区器,它将密钥处理函数的2个示例混洗。
(2)间断水印生成器的工作方式是,所创建的水印总是跟随产生它们的事件。(还有fwiw,只有当水印大于当前水印时才会发出水印。)这解释了为什么process函数使用-9223372036854775808打印某些行,因为processElement是在操作符处理相应水印之前调用的。
(3)具有多个输入的算子的当前水印是来自其所有输入通道的最近接收的水印的最小值。
对于正好打印MIN_VALUE 3次的方案,请考虑以下顺序:
请注意,确切的行为是不确定的,因为它取决于来自两个不同上游运算符的事件和水印之间的竞争。如果来自A的水印在第二个B事件之后到达例程2,则将打印第四个MIN_VALUE。