Data cleanup using Cassandra and Pig

uurv41yg posted on 2021-06-03 in Hadoop

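I have two data sets that I want to compare in Pig. Both contain the same unique IDs, but the names in the second set have been changed at random. The logic is as follows:
Load the EMP1 raw data
Load the EMP2 raw data
Select the rows where the name is not the same and emplno is equal
What I have done so far: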

A1=  LOAD 'cassandra://employees_pig1/employees_cf' USING CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});

B1=  LOAD 'cassandra://employees_pig2/employees_cf' USING CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});

A2 = FOREACH A1 GENERATE key, FLATTEN(columns);

B2 = FOREACH B1 GENERATE key as key2, FLATTEN(columns);

Sorry, I cannot post images on the forum. Here is a link to a screenshot showing A2 and B2: https://picasaweb.google.com/lh/photo/su3qgksba4nmq83cdnhivdmtjnzetymypjy0liipfm0?feat=directlink
Now I need help: is my approach correct?

C1 = join A2 by key, B2 by key2;

D1= filter C1 by A2.key==B2.key2 -- cannot do a A2.first_name!=B2.first_name;

I want to select the rows where the names are not the same and emplno is equal, but I am not sure how to do that. Please advise.
Thank you.
Update: instead of the join, I did a cogroup: C3 = cogroup A2 by key, B2 by key;
https://picasaweb.google.com/lh/photo/_lkeqw4bvigbnzshkdcjgnmtjnzetymypjy0liipfm0?feat=directlink
Next, I would like to do:

D1= FOREACH C3 GENERATE group, A2.first_name as fn1, B2.first_name as fn2

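group returns the desired result (i.e. the empno), but "A2.first_name, B2.first_name" is not correct. I need to know how to access the data inside the A2 and B2 bags/tuples.
Then I could apply a filter on fn1 == fn2.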

rkue9o1l  #1

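By doing a JOIN (at least an inner join, which is what you did above), you have already ensured that emplno from A and from B are equal. All that remains is to filter on whether the name values differ.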

C1 = join A2 by key, B2 by key;
D1 = filter C1 by A2::name != B2::name;
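
To try the join-then-filter logic without a Cassandra cluster, here is a minimal sketch. It assumes two hypothetical tab-separated local files, emp1.tsv and emp2.tsv, each holding key and first_name per line; the file names and the relation E1 are illustrative only and do not come from the question.

```pig
-- Hypothetical local inputs: one record per line, key<TAB>first_name
A2 = LOAD 'emp1.tsv' USING PigStorage('\t') AS (key: chararray, first_name: chararray);
B2 = LOAD 'emp2.tsv' USING PigStorage('\t') AS (key: chararray, first_name: chararray);

-- Inner join: only keys present in both relations survive
C1 = JOIN A2 BY key, B2 BY key;

-- Both sides carry a first_name field, so each one is qualified as relation::field
D1 = FILTER C1 BY A2::first_name != B2::first_name;

-- Project a flat (key, fn1, fn2) result for inspection
E1 = FOREACH D1 GENERATE A2::key AS key, A2::first_name AS fn1, B2::first_name AS fn2;
DUMP E1;
```

Running this in local mode (pig -x local) should list only the keys whose names differ between the two files.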

hivapdat  #2

Solved :)
Steps: download Pygmalion from https://github.com/jeromatron/pygmalion/downloads
Quick test:

register '/usr/share/dse/pygmalion/pygmalion-1.0.0.jar';
define FromCassandraBag org.pygmalion.udf.FromCassandraBag();
define ToCassandraBag org.pygmalion.udf.ToCassandraBag();

A1=  LOAD 'cassandra://employees_pig1/employees_cf' USING CassandraStorage() AS (key,
columns: bag {T: tuple(name, value)});
B1=  LOAD 'cassandra://employees_pig2/employees_cf' USING CassandraStorage() AS (key, 
columns: bag {T: tuple(name, value)});

A2 = foreach A1 generate key,
flatten(org.pygmalion.udf.FromCassandraBag('first_name', columns))
as (first_name: chararray);

B2 = foreach B1 generate key,
flatten(org.pygmalion.udf.FromCassandraBag('first_name', columns))
as (first_name: chararray);

C1 = join A2 by key, B2 by key;
D1= filter C1 BY A2::first_name != B2::first_name;
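
The COGROUP variant from the question's update can probably be made to work in a similar way once each relation exposes a first_name field. The sketch below is an assumption-laden illustration: it reads two hypothetical tab-separated local files (emp1.tsv, emp2.tsv) instead of Cassandra, and the relation names D3 and E3 are made up for this example.

```pig
-- Hypothetical local inputs: one record per line, key<TAB>first_name
A2 = LOAD 'emp1.tsv' USING PigStorage('\t') AS (key: chararray, first_name: chararray);
B2 = LOAD 'emp2.tsv' USING PigStorage('\t') AS (key: chararray, first_name: chararray);

-- COGROUP yields one record per key: (group, bag of A2 rows, bag of B2 rows)
C3 = COGROUP A2 BY key, B2 BY key;

-- A2.first_name projects a bag of names; FLATTEN turns each bag back into rows
D3 = FOREACH C3 GENERATE group AS key,
         FLATTEN(A2.first_name) AS fn1,
         FLATTEN(B2.first_name) AS fn2;

-- Keep the keys whose names differ between the two inputs
E3 = FILTER D3 BY fn1 != fn2;
DUMP E3;
```

Flattening an empty bag drops the record, so keys that appear in only one input disappear here just as they do with the inner join.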
