我在flink上使用scala和dataset api。我想跨节点重新划分数据。spark有一个函数,允许用户用给定的numberofpartitions参数(link)重新划分数据,我相信flink不支持这个函数。因此,我想通过实现一个自定义分区函数来实现这一点。
我的数据属于数据集类型(double,sparsevector)数据中的一行示例:
(1.0 SparseVector((2024,1.0), (2025,1.0), (2030,1.0), (2045,1.0), (2046,1.41), (2063,1.0), (2072,1.0), (3031,1.0), (3032,1.0), (4757,1.0), (4790,1.0), (177196,1.0), (177197,0.301), (177199,1.0), (177202,1.0), (1544177,1.0), (1544178,1.0), (1544179,1.0), (1654031,1.0), (1654190,1.0), (1654191,1.0), (1654192,1.0), (1654193,1.0), (1654194,1.0), (1654212,1.0), (1654237,1.0), (1654238,1.0)))
因为我的“double”是二进制的(1或-1),所以我想根据sparcevector的长度来划分数据。我的自定义分区如下:
class myPartitioner extends Partitioner[SparseVector]
{
override def partition(key: SparseVector, numPartitions: Int): Int = {
key.size % numPartitions
}
}
我将此自定义分区器称为:
data.partitionCustom(new myPartitioner(),1)
有人能帮助我理解在scala中调用mypartitioner函数时如何将分区数指定为“numpartitions”参数吗。
谢谢您。
3条答案
按热度按时间xwmevbvl1#
spark使用repartition(n:int)函数将数据重新分配到n个分区中,这些分区将由n个任务处理。在我看来,这包括两个变化:数据再分配和下游任务的数量。
因此,在apache flink中,我认为partitionerMap到数据再分配,parallelismMap到下游任务的数量,这意味着您可以使用setparallelism来确定“numpartitions”。
ds97pgxw2#
我假设你用的是
SparseVector
只是为了得到一些相对随机的值来进行分区。如果这是真的,那么你可以DataSet.rebalance()
. 如果任何操作员(包括Sink
)将平行度设置为numPartitions
,那么您应该可以很好地重新分区数据。但是你的描述
...want to re-partition my data across the nodes
让我觉得你是在运用spark的RDD
这是Flink,这不是真的有效。e、 g.假设你有numPartition
并行运算符处理数据集中的(重新分区的)数据,然后这些运算符将在可用TaskManager提供的插槽中运行,这些插槽可能在不同的物理服务器上,也可能不在不同的物理服务器上。pkbketx93#
在Flink你可以定义
setParallelism
对于单个运算符或使用enviornment.setParallelism
. 我希望这个链接能对你有所帮助。