无法理解使用partitionCustom和keyBy的flink代码的结果

vaqhlq81  于 2023-02-27  发布在  Apache
关注(0)|答案(2)|浏览(206)

我有下面的flink代码来实验水印。我想实现的是AB通过将它们路由到不同的分区来使用不同的水印。

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.util.Collector

import java.text.SimpleDateFormat
import java.util.Date

object Test{
  def to_milli(str: String) =
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(str).getTime

  def to_char(milli: Long) = {
    val date = if (milli <= 0) new Date(0) else new Date(milli)
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)
  }

  val data = Seq(
    ("A", "2020-08-30 10:50:11"),
    ("B", "2020-08-30 10:50:13"),
    ("B", "2020-08-30 10:50:04"),
    ("A", "2020-08-30 10:50:08")
  )

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(4)

    env.fromCollection(data).setParallelism(1).partitionCustom(new Partitioner[String] {
      override def partition(key: String, numPartitions: Int): Int = key.hashCode() % numPartitions
    }, e => e._1).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[(String, String)]() {
      var maxSeen = Long.MinValue

      override def checkAndGetNextWatermark(lastElement: (String, String), extractedTimestamp: Long): Watermark = {
        val eventTime = to_milli(lastElement._2)
        if (eventTime > maxSeen) {
          maxSeen = eventTime
        }
        println(s"key: ${lastElement._1}, event time: ${to_char(eventTime)}, watermark: ${to_char(maxSeen - 4 * 1000)} ")
        new Watermark(maxSeen - 4 * 1000)
      }

      override def extractTimestamp(element: (String, String), previousElementTimestamp: Long): Long = to_milli(element._2)

    }).setParallelism(2).keyBy(_._1).process(new KeyedProcessFunction[String, (String, String), (String, String)] {
      override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), (String, String)]#Context, out: Collector[(String, String)]): Unit = {
        val watermark = ctx.timerService().currentWatermark()
        val key = ctx.getCurrentKey
        val s = if (watermark == Long.MinValue) s"MIN_VALUE: $watermark" else to_char(watermark)
        System.out.println(s"watermark:$key is: $s ")
        val eventTime = value._2

        if (eventTime > to_char(watermark)) out.collect(value)
      }
    }).setParallelism(2).print()

    env.execute()

  }

}

当我运行上面的代码时,它会打印:

key: B, event time: 2020-08-30 10:50:13, watermark: 2020-08-30 10:50:09 
key: A, event time: 2020-08-30 10:50:11, watermark: 2020-08-30 10:50:07 
key: B, event time: 2020-08-30 10:50:04, watermark: 2020-08-30 10:50:09 
key: A, event time: 2020-08-30 10:50:08, watermark: 2020-08-30 10:50:07 

watermark:A is: MIN_VALUE: -9223372036854775808 
watermark:B is: MIN_VALUE: -9223372036854775808 
watermark:A is: MIN_VALUE: -9223372036854775808 
watermark:B is: 2020-08-30 10:50:09 

3> (B,2020-08-30 10:50:13)
4> (A,2020-08-30 10:50:08)
3> (A,2020-08-30 10:50:11)

我只是不明白为什么我

watermark:B is: MIN_VALUE: -9223372036854775808 
watermark:A is: MIN_VALUE: -9223372036854775808 
watermark:B is: MIN_VALUE: -9223372036854775808

有没有人能试试这个代码帮我解决一下?我已经在这个问题上纠结了好几个小时了!
谢谢!

lvjbypge

lvjbypge1#

问题在于,由于您正在并行执行keyBy,因此Flink会将Watermark计算为所有并行流生成的水印的min
因此,在这种情况下,A生成的水印对于B并不总是可见的,而B生成的时间戳对于A也是不可见的,甚至可能出现上面的程序所有行都打印-9223372036854775808的情况。
请注意,partitionCustom不执行逻辑分区,因此它与keyBy不是一回事,它仍然会导致shuffle。

ars1skjm

ars1skjm2#

以下是导致这一结果的事实:
(1)作业拓扑如下所示:你有一个源任务,接着是一个定制的分区器,它将A和B事件发送到水印生成器的2个示例,一个处理A事件,另一个处理B事件,接着是一个散列分区器,它将密钥处理函数的2个示例混洗。
(2)间断水印生成器的工作方式是,所创建的水印总是跟随产生它们的事件。(还有fwiw,只有当水印大于当前水印时才会发出水印。)这解释了为什么process函数使用-9223372036854775808打印某些行,因为processElement是在操作符处理相应水印之前调用的。
(3)具有多个输入的算子的当前水印是来自其所有输入通道的最近接收的水印的最小值。
对于正好打印MIN_VALUE 3次的方案,请考虑以下顺序:

  • process函数的示例1处理事件A,生成MIN_VALUE行
  • process函数的示例2处理事件B,生成另一MIN_VALUE行
  • 示例1处理A之后的水印,并更新通道A的水印,同时将通道B的水印保留在MIN_VALUE
  • 示例1处理另一个A事件。它的当前水位线仍然是MIN_VALUE。
  • 示例2处理B事件之后的水印。在某个时刻,它还处理来自通道A的水印,之后其当前水印变为2020-08-30 10:50:09
  • 示例2处理另一个B事件并打印出其水印,现在水印为2020-08-30 10:50:09。

请注意,确切的行为是不确定的,因为它取决于来自两个不同上游运算符的事件和水印之间的竞争。如果来自A的水印在第二个B事件之后到达例程2,则将打印第四个MIN_VALUE。

相关问题