在apacheflink中有没有内置函数可以用来运行源代码同步?

nbnkbykc  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(341)

我们是ApacheFlink和scala的新手。这里是我们的用例,就像我们从amps服务器(crankuptheamps)开始使用两种类型的主题来传播数据一样。源1和2从主题1和2中提取数据。
用例
我们的要求只是最初的数据源#1将在第二个数据源启动之前播送(世界现状)数据。因为我们只是将源1数据存储到Map状态。那么我们只需要启动source#2sow操作。最终我们需要逐个运行源代码。那么是否有任何选项可以逐个运行源代码。

def sourceConnect(environment: StreamExecutionEnvironment,topic: String, subscriptionType: SubscriptionType): DataStream[Map[String, String]] = {

val dataStream : DataStream[Map[String, String]] = environment.addSource(new RichSourceFunction[Map[String,String]]()  {

  var sourceClient: Client = null

  override def open(parameters: Configuration): Unit = {
    // .... Code Here
  }

  override def run(sourceContext: SourceFunction.SourceContext[Map[String, String]]): Unit = {
    subscriptionType match {

      case SubscriptionType.sow =>
          //.... Code Here

    }
  }

  override def getRuntimeContext: RuntimeContext = super.getRuntimeContext

  override def cancel(): Unit = {
    sourceClient.close()
  }

  override def close(): Unit = try cancel()
  finally super.close()

})
dataStream }
private var environment: StreamExecutionEnvironment = null

     // .... Code Here

     val source1 = environment.addSource(....)
     val source2 = environment.addSource(....)

     val conn = source1.connect(source2)

     conn.print()

     environment.execute()

最终我们的用例只需要先运行source1,然后运行source2,这意味着同步

j2cgzkjk

j2cgzkjk1#

我以前入侵过一个联合消息源 SourceFunction 这使我可以首先发送来自一个源的所有数据,然后再发送来自后续源的数据。我用这个来引导某个州。也许这对你的用例有用。
--肯

luaexgnf

luaexgnf2#

在Flink,没有什么特别奇妙的方法可以在开始阅读另一条流之前先吞下一条流。这个一般性的主题通常被称为边输入,在这个主题上有一个flip(flink改进建议)。
到目前为止,在这个主题上最好的资源是在FLink转发旧金山2018,由LyFT的格雷戈瑞费在Apache Fink上的自举状态进行了讨论,探索了几种可能的方法。哪一种可能是最好的取决于您特定的应用程序需求。

相关问题