flink自定义配分函数

7vhp5slm  于 2021-06-21  发布在  Flink
关注(0)|答案(3)|浏览(481)

我在flink上使用scala和dataset api。我想跨节点重新划分数据。spark有一个函数,允许用户用给定的numberofpartitions参数(link)重新划分数据,我相信flink不支持这个函数。因此,我想通过实现一个自定义分区函数来实现这一点。
我的数据属于数据集类型(double,sparsevector)数据中的一行示例:

(1.0 SparseVector((2024,1.0), (2025,1.0), (2030,1.0), (2045,1.0), (2046,1.41), (2063,1.0), (2072,1.0), (3031,1.0), (3032,1.0), (4757,1.0), (4790,1.0), (177196,1.0), (177197,0.301), (177199,1.0), (177202,1.0), (1544177,1.0), (1544178,1.0), (1544179,1.0), (1654031,1.0), (1654190,1.0), (1654191,1.0), (1654192,1.0), (1654193,1.0), (1654194,1.0), (1654212,1.0), (1654237,1.0), (1654238,1.0)))

因为我的“double”是二进制的(1或-1),所以我想根据sparcevector的长度来划分数据。我的自定义分区如下:

class myPartitioner extends Partitioner[SparseVector]
{ 
    override def partition(key: SparseVector, numPartitions: Int): Int = {
         key.size % numPartitions
    } 
}

我将此自定义分区器称为:

data.partitionCustom(new myPartitioner(),1)

有人能帮助我理解在scala中调用mypartitioner函数时如何将分区数指定为“numpartitions”参数吗。
谢谢您。

xwmevbvl

xwmevbvl1#

spark使用repartition(n:int)函数将数据重新分配到n个分区中,这些分区将由n个任务处理。在我看来,这包括两个变化:数据再分配和下游任务的数量。
因此,在apache flink中,我认为partitionerMap到数据再分配,parallelismMap到下游任务的数量,这意味着您可以使用setparallelism来确定“numpartitions”。

ds97pgxw

ds97pgxw2#

我假设你用的是 SparseVector 只是为了得到一些相对随机的值来进行分区。如果这是真的,那么你可以 DataSet.rebalance() . 如果任何操作员(包括 Sink )将平行度设置为 numPartitions ,那么您应该可以很好地重新分区数据。
但是你的描述 ...want to re-partition my data across the nodes 让我觉得你是在运用spark的 RDD 这是Flink,这不是真的有效。e、 g.假设你有 numPartition 并行运算符处理数据集中的(重新分区的)数据,然后这些运算符将在可用TaskManager提供的插槽中运行,这些插槽可能在不同的物理服务器上,也可能不在不同的物理服务器上。

pkbketx9

pkbketx93#

在Flink你可以定义 setParallelism 对于单个运算符或使用 enviornment.setParallelism . 我希望这个链接能对你有所帮助。

相关问题