关于团簇上flink平行度的scala问题

9lowa7mx  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(360)

对于apache flink和流处理框架,我都是新手,我对它有几个问题,特别是并行性。
首先这是我的代码:

object KafkaConsuming {

  def main(args: Array[String]) {

    //****CONFIGURATION & PARAMETERS****
    val params: ParameterTool = ParameterTool.fromArgs(args)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(8)
        env.getConfig.setGlobalJobParameters(params)

    //****Kafka CONNECTION****
    val properties = new Properties();
    properties.setProperty("bootstrap.servers", params.get("server"));

    //****Get KAFKA source****
    val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer010[String](params.get("topic"), new SimpleStringSchema(), properties))

    //****PROCESSING****
    val logs: DataStream[MinifiedLog] = stream.map(x => LogParser2.parse(x))

    val sessions = logs.map { x => (x.timestamp, x.bytesSent, 1l, 1)}

    val sessionCnt: DataStream[(Long, Long, Long, Int)] = sessions
      .keyBy(3).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .reduce( (x: (Long, Long, Long, Int), y: (Long, Long, Long, Int)) => (x._1, x._2 + y._2, x._3 + y._3, x._4))
      .map { z => (z._1, z._2 / 10, z._3 / 10, z._4)}

    //****OUTPUT****
    val output: DataStream[String] = sessionCnt.map(x => (x.toString() + "\n"))
    output.writeToSocket("X.X.X.X", 3333, new SimpleStringSchema)

    env.execute("Kafka consuming")

  }
}

当我想在我的集群上运行它时,我运行以下命令:

./bin/flink run -m yarn-cluster -yn 8 /directories/myjar.jar --server X.X.X.X --topic mytopic

这很有效。下面是我的问题:
我在flink的网页用户界面上看到:

1.为什么接收的记录总是发送的记录的一半,而数据量是相同的?
如果我进入Windows的细节:

显然所有的过程都是在我的slave 4上完成的,而且只有一个线程!同样的情况也发生在源头上。只有一个线程用于接收数据。
2.为什么Flink没有为这一步使用所有可能的线程?
我注意到源、窗口和接收器是由不同的从机处理的,但我仍然希望这个过程在集群上并行完成。
我在这篇文章上读到:https://stackoverflow.com/a/32329010/5035392 如果kafka源只有一个分区(这是我的情况),flink就不能在不同的节点上共享任务。不过,我的窗口处理应该能做到吗?
如果这些都是无关紧要的问题,我很抱歉。我不确定我做错的是flink还是我的集群配置。谢谢您。

66bbxpm5

66bbxpm51#

同一个键的所有值都在一个 TaskManager . 在你的例子中 sessions.keyBy(3) 流具有相同的密钥-> 1 因此,所有计算都在单个任务槽中执行。

相关问题