有没有可能在akka流之间使用flink流处理器作为源和汇?

91zkwejq  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(298)

我想用flink替换我的基于akka流的流处理器的一部分。当前是否可以使用akka流作为flink的源,然后使用flink作为同一代码库中akka流的源?
akka溪流的水流如下:

// Kafka Source -> Read Flow involving Azure cosmosDB -> Evaluate Flow -> Write Flow -> Logger Flow -> Sink
  lazy private val converterGraph: Graph[ClosedShape.type, Future[Done]] =
    GraphDSL.create(sink) { implicit builder => out =>
      source.watchTermination()(Keep.none) ~> prepareFlow ~> readFlow ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out
      ClosedShape
  }

上述流程定义如下:

def prepareFlow: Flow[FromSource, ToRead, NotUsed]

def readFlow: Flow[ToRead, ToEvaluate, NotUsed]

而不是现在 readFlow 作为一个akka流,我想用一个flink流处理器来代替它。所以 prepareFlow 将是flink的一个输入 readFlow ,其输出将被输入到 evaluateFlow .
基本上,有没有可能这样做:

prepareFlow ~> [Flink source ->read -> result] ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out

我看到apachebahir中有一个flink-akka连接器(sink),但不确定它是否可以仅用于akka参与者或流。

hm2xizp9

hm2xizp91#

你可以把衣服包起来 prepareFlow 作为一个定制的flink从cosmosdb阅读 Source (通过延长 SourceFunction ),并将整个evaluate write logger流 Package 为自定义流 SinkFunction .
由于flink本身是分布式的,所以您将把akka流集成到flink作业中,但反之亦然。我看到这种方法的主要问题是,阿克卡流有反压力的盒子,但Flink本身主要是阻止。例如,sourcefunction.run()方法需要有一个内部无限循环在每次迭代中生成消息,所以您必须在那里阻塞,等待akka stream在那里生成下一条消息。

相关问题