apachepig:简化程序的扁平化和并行执行

rsaldnfx  于 2021-06-03  发布在  Hadoop
关注(0)|答案(4)|浏览(316)

我已经实现了一个apachepig脚本。当我执行脚本时,它会为一个特定的步骤生成许多Map器,但是该步骤只有一个reducer。由于这种情况(多个Map器,一个reducer),hadoop集群在单个reducer执行时几乎处于空闲状态。为了更好地使用集群的资源,我还希望有许多并行运行的reducer。
即使我在pig脚本中使用set default\u parallel命令设置并行性,我仍然只会得到一个reducer。
出现问题的代码部分如下:

SET DEFAULT_PARALLEL 5;
inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);
inputDataGrouped = GROUP inputData BY (group_name);
-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;
pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);

“inputdata”和“inputdatagrouped”别名在Map器中计算。
减速器中的“成对”和“成对扁平”。
如果我用flatten命令(pairsflat=foreach pairs generate flatten(pairs_bag)as(item1:int,item2:int);)删除行来更改脚本然后执行5个reducer(因此是并行执行)。
似乎扁平命令是问题所在,它避免了创建许多减速器。
我怎样才能达到同样的扁平化结果,但是让脚本并行执行(使用许多缩减器)?
编辑:
有两个foreach时说明计划(如上所述):

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
|   |
|   Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
    |   |
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][0] - scope-23
    |   |
    |   Cast[int] - scope-27
    |   |
    |   |---Project[bytearray][1] - scope-26
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------

Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
    |   |
    |   Project[bag][0] - scope-39
    |
    |---pairs: New For Each(false)[bag] - scope-38
        |   |
        |   POUserFunc(GeneratePairsUDF)[bag] - scope-36
        |   |
        |   |---Project[bag][1] - scope-35
        |       |
        |       |---Project[bag][1] - scope-34
        |
        |---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false

当只有一个foreach和flatten Package udf时,说明计划:

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
|   |
|   Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
    |   |
    |   Cast[chararray] - scope-21
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |
    |   Cast[int] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------

Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
    |   |
    |   POUserFunc(GeneratePairsUDF)[bag] - scope-33
    |   |
    |   |---Project[bag][1] - scope-32
    |       |
    |       |---Project[bag][1] - scope-31
    |
    |---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false
nimxete2

nimxete21#

我认为数据有偏差。只有少数Map器产生指数级的大输出。查看数据中密钥的分布。like数据包含的组很少,记录也很多。

pftdvrlh

pftdvrlh2#

如果pig对pig脚本中的每个步骤都使用配置默认的\u并行值,则没有把握。尝试与您觉得需要时间的特定加入/小组步骤并行(在您的案例小组步骤中)。

inputDataGrouped = GROUP inputData BY (group_name) PARALLEL 67;

如果它仍然不起作用,那么您可能需要查看您的数据以解决偏斜问题。

kknvjkwl

kknvjkwl3#

我尝试了“设置默认并行”和“并行100”,但没有运气。清管器仍使用1个减速机。
结果我必须为每个记录生成一个从1到100的随机数,然后用这个随机数对这些记录进行分组。
我们在分组上浪费时间,但对我来说要快得多,因为现在我可以使用更多的缩减器。
下面是代码(提交者是我自己的自定义项):

tmpRecord = FOREACH record GENERATE (int)(RANDOM()*100.0) as rnd, data;
groupTmpRecord = GROUP tmpRecord BY rnd;
result = FOREACH groupTmpRecord GENERATE FLATTEN(SUBMITTER(tmpRecord));
dm7nw8vv

dm7nw8vv4#

为了回答你的问题,我们必须首先知道pig执行了多少个reducer来完成全局重排过程。因为根据我的理解,生成/投影不需要一个减速机。我不能对扁平说同样的话。然而,我们从常识中知道,在展平过程中,目的是将元组从包中去嵌套,反之亦然。要做到这一点,属于一个包的所有元组都应该在同一个reducer中可用。我可能错了。但是有人能在这里添加一些东西来得到这个用户的答案吗?

相关问题