scala—如何更改flink数据集的默认分区数?

mznpcxlj  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(557)

这里有一个要求:数据集太大,我们需要对数据进行分区,在每个分区中计算一个局部结果,然后合并。例如,如果有100万条数据被划分为100个分区,那么每个副本将只有大约10000条数据。由于需要使用分区数进行优化,因此分区数必须是可变的。另外,一个分区的所有数据都必须批量计算,不能逐个计算。
实现如下:在分区阶段之后,每个数据段都有一个键来表示它所属的分区。现在,数据应该是这样的: afterPartitionedData=[(0,data1),(0,data2)…(1,data3),(1,data4),…,(99,datan)] 下一步,用Flink的 partitionCustom 以及 mapPartition 操作员。

dataSet = env. fromCollection(afterPartitionedData)
  dataset
      .partitionCustom(new myPartitioner(),0)
      .mapPartition(new myMapPartitionFunction[(Int,String),Int]())
…
…
  class myPartitioner extends  Partitioner[Int]{
    override def partition(key: Int, numPartitions: Int) = {
      println("numPartitions="+numPartitions) // 6 , CPU number
      key // just return the partitionID
    }
  }

但是,报告了一个错误:

...
Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:226)
...

这似乎是因为flink的默认分区数 DataSet 是CPU的数量,在我的计算机上是6,所以它将被报告 java.lang.ArrayIndexOutOfBoundsException : 6 .
所以我的问题是:有没有办法随意改变分区的数量?我在方法中找到了这个参数 Partition (key: int, numpartitions: int) 在api分区器中,但不知道如何更改它。
有没有办法改变 DataSet 游击队?
flink版本为1.6,测试代码为:

object SimpleFlinkFromBlog {

  def main(args: Array[String]): Unit = {
    val  env  =  ExecutionEnvironment.getExecutionEnvironment
    val afterPartitionedData = new mutable.MutableList[(Int,  String)]
    afterPartitionedData.+=((0,  "0"))

    afterPartitionedData.+=((1,  "1"))

    afterPartitionedData.+=((2, "2"))
    afterPartitionedData.+=((2, "2"))

    afterPartitionedData.+=((3,  "3"))
    afterPartitionedData.+=((3,  "3"))
    afterPartitionedData.+=((3,  "3"))

    afterPartitionedData.+=((4,  "4"))

    afterPartitionedData.+=((5,  "5"))
    afterPartitionedData.+=((5,  "5"))
    afterPartitionedData.+=((5,  "5"))

    // Comment this line will not report an error.
    // java.lang.ArrayIndexOutOfBoundsException : 6
    afterPartitionedData.+=((6,  "will wrong"))

    val dataSet = env.fromCollection( afterPartitionedData )
    val localRes = dataSet
      .partitionCustom(new myPartitioner(),0)
      .mapPartition(new MapPartitionFunction[(Int,String),Int] {
        override def mapPartition(values: lang.Iterable[(Int, String)], out: Collector[Int]) = {
          var count = 0;
          values.forEach(new Consumer[(Int, String)] {
            override def accept(t: (Int, String)): Unit = {
              count=count+1;
              print("current count is " + count + "   tuple is " + t + "\n");
            }
          })
          out.collect(count)
        }
      })

    localRes.collect().foreach(println)
  }

  class myPartitioner extends  Partitioner[Int]{
    override def partition(key: Int, numPartitions: Int) = {
//      println("numPartitions="+numPartitions)
      key
    }
  }
}

谢谢您!

5m1hhzi4

5m1hhzi41#

分区数是并行度,提交作业时可以在命令行上或在flink-conf.yaml中设置。

相关问题