How do I write a test for a Flink pipeline that uses ContinuousProcessingTimeTrigger and TumblingProcessingTimeWindows? It never emits any output in the test

wljmcqd8, posted 2022-12-20 in Apache

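I'm trying to write a simple test, but I can't figure out why it doesn't work with my code or what I'm missing. The trigger never seems to fire, so nothing reaches the output in the test. The pipeline simply emits the aggregated string for each 1-second window, every second. What am I missing?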

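import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import scala.collection.mutable

// BaseSuite is the asker's own ScalaTest base class (not shown); it is assumed to mix in Matchers.

object CollectSink {
  // Static so that all sink instances (parallelism 2) write to the same buffer.
  val collectedResults = mutable.ListBuffer[(Int, String)]()
}

class CollectSink extends SinkFunction[(Int, String)] {
  override def invoke(value: (Int, String), context: SinkFunction.Context): Unit = {
    // Two sink instances may append concurrently, so guard the shared buffer.
    CollectSink.collectedResults.synchronized {
      CollectSink.collectedResults += value
    }
  }
}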

class MyTestSuite extends BaseSuite with BeforeAndAfterAll with BeforeAndAfterEach {
  val flinkCluster = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder()
      .setNumberSlotsPerTaskManager(2)
      .setNumberTaskManagers(1)
      .build
  )

  override def beforeAll() = {
    flinkCluster.before()
  }

  override def beforeEach() = {
    CollectSink.collectedResults.clear()
  }

  override def afterAll() = {
    flinkCluster.after()
  }

  it should "Simple pipeline emits the expected output" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    val sourceDataStream = env.fromCollection(
      Seq(
        (1, "a"),
        (1, "b"),
        (2, "c"),
        (1, "d"),
        (2, "e")
      )
    )
    val collectSink = new CollectSink()

    // Sample pipeline that emits aggregated strings
    // It emits the result every second for every window of 1 second
    sourceDataStream
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](Time.seconds(1)))
      .reduce(new ReduceFunction[(Int, String)] {
        override def reduce(value1: (Int, String), value2: (Int, String)): (Int, String) = (value1._1, value1._2 + value2._2)
      })
      .addSink(collectSink)

    env.execute()

    // Problem: nothing is ever collected; the size is always 0.
    // The pipeline never seems to emit anything in this test.
    CollectSink.collectedResults should have size 1
  }
}

Reference (testing documentation): https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/testing/#testing-flink-jobs

Answer 1, by bvjxkvbb

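To be guaranteed to produce results, this test would have to run for at least one second. With only 5 events to process, that won't happen.
A job with a bounded source stops as soon as all of its input has been processed, and any pending processing-time timers are never fired.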
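To make the timing argument concrete, here is a toy model in plain Scala. It does not use Flink at all, and `TimerModel`/`TimerModelDemo` are hypothetical names. It assumes 1-second tumbling windows aligned at offset 0, records arriving a few milliseconds after a window boundary (times are hypothetical millisecond offsets), and a job that shuts down right after its last record. The window's timer lands at 999 ms, which the wall clock never reaches before shutdown, whereas the final watermark of `Long.MaxValue` emitted at end of input makes the same timer due in event time.

```scala
// Toy model, NOT Flink code: it only mimics the timer arithmetic described above.
object TimerModel {
  val WindowSizeMs: Long = 1000L

  // Tumbling windows (offset 0): the window containing ts is [start, start + size).
  def windowStart(ts: Long): Long = ts - (ts % WindowSizeMs)

  // In this model the window's trigger timer is set for its last millisecond (end - 1).
  def windowTimer(ts: Long): Long = windowStart(ts) + WindowSizeMs - 1

  // A processing-time timer fires only if the wall clock reaches it before the job shuts down.
  def processingTimeFires(timer: Long, jobEndWallClockMs: Long): Boolean =
    timer <= jobEndWallClockMs

  // At the end of bounded input a final watermark of Long.MaxValue is emitted,
  // so every pending event-time timer becomes due.
  def eventTimeFires(timer: Long): Boolean = timer <= Long.MaxValue
}

object TimerModelDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical arrival times (ms) of the 5 records; the job ends at 15 ms.
    val arrivals = Seq(10L, 11L, 12L, 13L, 14L)
    val jobEndMs = 15L
    val timers = arrivals.map(TimerModel.windowTimer).distinct // a single timer, at 999 ms
    println(s"processing-time fires: ${timers.map(TimerModel.processingTimeFires(_, jobEndMs))}")
    println(s"event-time fires:      ${timers.map(TimerModel.eventTimeFires)}")
  }
}
```

In real Flink the processing-time window boundaries are aligned to the epoch, so the gap between the last record and the window end can be anywhere up to one second; that is why only a run of at least one second guarantees a firing.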
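A couple of possible workarounds:

  • Switch to event time. When a job with a bounded source reaches the end of its input, Flink fires all pending event-time timers.
  • Use an unbounded source, and keep the test job running long enough for it to produce results.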
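A minimal sketch of the first workaround, assuming the Flink 1.16 Scala DataStream API used in the question (plus `flink-clients` on the classpath for local execution). The per-record timestamps (100 to 500 ms) are made up for illustration, and `EventTimePipeline`/`EventTimeCollectSink` are hypothetical names. It swaps `TumblingProcessingTimeWindows` for `TumblingEventTimeWindows` and drops the `ContinuousProcessingTimeTrigger`, relying on the windows' default event-time trigger, which fires once the watermark passes the end of the window.

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.mutable

object EventTimeCollectSink {
  // Shared by all sink instances in this JVM; guarded because parallelism is 2.
  val results = mutable.ListBuffer[(Int, String)]()
}

class EventTimeCollectSink extends SinkFunction[(Int, String)] {
  override def invoke(value: (Int, String), context: SinkFunction.Context): Unit =
    EventTimeCollectSink.results.synchronized { EventTimeCollectSink.results += value }
}

object EventTimePipeline {
  def run(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // (key, payload, event timestamp in ms); the timestamps are hypothetical test data.
    val source = env.fromCollection(Seq(
      (1, "a", 100L), (1, "b", 200L), (2, "c", 300L), (1, "d", 400L), (2, "e", 500L)))

    source
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forMonotonousTimestamps[(Int, String, Long)]()
          .withTimestampAssigner(new SerializableTimestampAssigner[(Int, String, Long)] {
            override def extractTimestamp(e: (Int, String, Long), recordTs: Long): Long = e._3
          }))
      .map(e => (e._1, e._2))
      .keyBy(_._1)
      // The default event-time trigger fires at the window end; the final watermark
      // emitted at the end of the bounded input guarantees that point is reached.
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .addSink(new EventTimeCollectSink())

    env.execute("event-time-window-test")
  }
}
```

Because the stream is keyed, this pipeline emits one result per key, so a run should yield two records (for keys 1 and 2), not one. With parallelism 2, the records of one key can reach the window operator through different upstream subtasks, so the concatenation order is not guaranteed; comparing each string's characters order-insensitively (for example after `.sorted`) keeps the assertion stable.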
