基于日期的pig数据清理

x6yk4ghg  于 2021-06-24  发布在  Pig
关注(0)|答案(1)|浏览(335)

我有两个数据集,如下所示:
1id和位置

{ID, beginning year, ending year, location}.

样品:

(1001, 2010, 2012, CA)
(1001, 2013, 2015, WA)
(1002, 2009, 2015, AZ)
(1003, 2014, 2015, FL)

2id和连接

{ID1, ID2, connection creating date}

样品:

(1001, 1002, 2013)
(1001, 1003, 2014)

我想根据地点和年份计算连接数。我假设一旦建立了连接,它就永远不会过期。我想要的结果如下

{Location 1, Location2, year, number of connections}

在上面的例子中,应该是:

(WA, AZ,2013,1)
(WA, AZ,2014,1)
(WA, AZ,2015,1)
(WA, FL,2014,1)
(WA, FL,2015,1)

有人知道如何在ApachePig中实现这一点吗?

scyqe7ek

scyqe7ek1#

正如您在评论中提到的,我们将在某个时候需要转移到年度信息。为了将数据大小膨胀的影响降到最低,我们需要在pig脚本中尽可能地将其向下移动。我们首先需要做的是以下数据转换:

{ID1, ID2, connection creating date} -> {Location1, Location2, start_year, end_year}

这可以通过以下pig脚本语句实现:

locationData = LOAD 'path1' USING PigStorage('\t') AS (ID:chararray, beginning_year:long, ending_year:long, location:chararray);
connectionData = LOAD 'path2' USING PigStorage('\t') AS (ID1:chararray, ID2:chararray, connection_year:long);

partialJoin = JOIN connectionData USING ID1, locationData USING ID;
partialExtracted = FOREACH partialJoin GENERATE
                           ID2,
                           connection_year,
                           location AS location1,
                           (beginning_year > connection_year ? beginning_year : connection_year) AS start_year,
                           ending_year AS end_year;

fullJoin = JOIN partialExtracted USING ID2, locationData USING ID;
fullExtracted = FOREACH fullJoin GENERATE,
                           location1,
                           location AS location2,
                           (beginning_year > start_year ? beginning_year : start_year) AS start_year,
                           (ending_year < end_year ? ending_year : end_year ) AS end_year;

fullFiltered = FILTER fullExtracted BY (end_year < start_year);

我们现在准备爆炸的数据,以获得每年的信息。基本上,需要进行以下数据转换:

{Location1, Location2, start_year, end_year} -> {Location1, Location2, year}
e.g.
WA, AZ, 2013, 2015
->
WA, AZ, 2013
WA, AZ, 2014
WA, AZ, 2015

在这里,自定义项是不可避免的。我们将需要一个自定义项,其中采取开始年和结束年,并返回一个包的范围内的年。你应该能够按照在线教程来编写你的自定义项。假设这个自定义项被称为getyearrange()。您的脚本如下所示:

fullExploded = FOREACH fullFiltered GENERATE
                       location1, location2,
                       FLATTEN(getYearRange(start_year, end_year)) AS year;

剩下的就是一组人来做最后的计数:

fullGrouped = GROUP fullExploded BY (location1, location2, year);
finalOutput = FOREACH fullGrouped GENERATE 
              FLATTEN(group) AS (location1, location2, year),
              COUNT(fullExploded) AS count;

上面描述了数据流。您可能需要添加额外的步骤来处理边缘情况并确保数据的完整性。

相关问题