这里有一个要求:数据集太大,我们需要对数据进行分区,在每个分区中计算一个局部结果,然后合并。例如,如果有100万条数据被划分为100个分区,那么每个副本将只有大约10000条数据。由于需要使用分区数进行优化,因此分区数必须是可变的。另外,一个分区的所有数据都必须批量计算,不能逐个计算。
实现如下:在分区阶段之后,每个数据段都有一个键来表示它所属的分区。现在,数据应该是这样的: afterPartitionedData=[(0,data1),(0,data2)…(1,data3),(1,data4),…,(99,datan)]
下一步,用Flink的 partitionCustom
以及 mapPartition
操作员。
dataSet = env. fromCollection(afterPartitionedData)
dataset
.partitionCustom(new myPartitioner(),0)
.mapPartition(new myMapPartitionFunction[(Int,String),Int]())
…
…
class myPartitioner extends Partitioner[Int]{
override def partition(key: Int, numPartitions: Int) = {
println("numPartitions="+numPartitions) // 6 , CPU number
key // just return the partitionID
}
}
但是,报告了一个错误:
...
Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:226)
...
这似乎是因为flink的默认分区数 DataSet
是CPU的数量,在我的计算机上是6,所以它将被报告 java.lang.ArrayIndexOutOfBoundsException : 6
.
所以我的问题是:有没有办法随意改变分区的数量?我在方法中找到了这个参数 Partition (key: int, numpartitions: int)
在api分区器中,但不知道如何更改它。
有没有办法改变 DataSet
游击队?
flink版本为1.6,测试代码为:
object SimpleFlinkFromBlog {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val afterPartitionedData = new mutable.MutableList[(Int, String)]
afterPartitionedData.+=((0, "0"))
afterPartitionedData.+=((1, "1"))
afterPartitionedData.+=((2, "2"))
afterPartitionedData.+=((2, "2"))
afterPartitionedData.+=((3, "3"))
afterPartitionedData.+=((3, "3"))
afterPartitionedData.+=((3, "3"))
afterPartitionedData.+=((4, "4"))
afterPartitionedData.+=((5, "5"))
afterPartitionedData.+=((5, "5"))
afterPartitionedData.+=((5, "5"))
// Comment this line will not report an error.
// java.lang.ArrayIndexOutOfBoundsException : 6
afterPartitionedData.+=((6, "will wrong"))
val dataSet = env.fromCollection( afterPartitionedData )
val localRes = dataSet
.partitionCustom(new myPartitioner(),0)
.mapPartition(new MapPartitionFunction[(Int,String),Int] {
override def mapPartition(values: lang.Iterable[(Int, String)], out: Collector[Int]) = {
var count = 0;
values.forEach(new Consumer[(Int, String)] {
override def accept(t: (Int, String)): Unit = {
count=count+1;
print("current count is " + count + " tuple is " + t + "\n");
}
})
out.collect(count)
}
})
localRes.collect().foreach(println)
}
class myPartitioner extends Partitioner[Int]{
override def partition(key: Int, numPartitions: Int) = {
// println("numPartitions="+numPartitions)
key
}
}
}
谢谢您!
1条答案
按热度按时间5m1hhzi41#
分区数是并行度,提交作业时可以在命令行上或在flink-conf.yaml中设置。