Pig to Cassandra: passing list objects with a Python UDF and CqlStorage

gt0wga4j, posted 2021-06-21 in Pig

I am working on a data flow that includes some aggregation steps in Pig and a storage step into Cassandra. I have been able to pass relatively simple data types such as integer, long, or dates, but cannot find out how to pass some kind of list, set, or tuple from Pig to Cassandra using CqlStorage.
I am using Pig 0.9.2, so I cannot use the FLATTEN method.

Question

How can I fill a Cassandra table containing complex data types such as sets or lists from Pig 0.9.2?

An overview of my specific application:

I created the corresponding Cassandra table following this description:

CREATE TABLE mycassandracf (
my_id int,
date timestamp,
my_count bigint,
grouped_ids list<bigint>,
PRIMARY KEY (my_id, date));

And a STORE instruction carrying a prepared statement:

STORE CassandraAggregate
INTO 'cql://test/mycassandracf?output_query=UPDATE+test.mycassandracf+set+my_count+%3D+%3F%2C+grouped_ids+%3D+%3F'
USING CqlStorage;
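As a side note, the output_query parameter has to be URL-encoded ('=' becomes %3D, '?' becomes %3F, ',' becomes %2C, spaces become '+'). A quick way to produce that encoded string, sketched here with Python 3's standard library (not part of the original post), is:

```python
# Build the URL-encoded output_query for CqlStorage.
# quote_plus encodes '=' as %3D, '?' as %3F, ',' as %2C and spaces as '+'.
from urllib.parse import quote_plus

update = "UPDATE test.mycassandracf set my_count = ?, grouped_ids = ?"
print("cql://test/mycassandracf?output_query=" + quote_plus(update))
```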

From a GROUP BY relation, I GENERATE a relation in a CQL-friendly format (e.g. in tuples) that I want to store into Cassandra.

CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), $1.grouped_id);

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,{(60128490006325819),(62726281032786005)}))
(((my_id,30165),(date,1357084800000)),(1,{(60128411174143024)}))
(((my_id,30376),(date,1357084800000)),(4,{(60128411146211875),(63645100121476995),(60128411146211875),(63645100121476995)}))

As expected, using the STORE instruction on this relation raises an exception:

java.lang.ClassCastException: org.apache.pig.data.DefaultDataBag cannot be cast to org.apache.pig.data.DataByteArray

I therefore added a UDF written in Python to apply some flattening to the grouped_ids bag:

@outputSchema("flat_bag:bag{}")
def flattenBag(bag):
    return tuple([long(item) for tup in bag for item in tup])

I use a tuple because using a Python set as well as a Python list ends up in casting errors.
Adding this to my pipeline, I have:
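Outside Jython, the same flattening can be sketched in plain Python 3 (where int replaces Python 2's long) to check what the UDF returns for a sample bag:

```python
# Plain-Python 3 sketch of the Jython UDF above (int replaces Python 2's long).
def flatten_bag(bag):
    # Each bag element is itself a tuple holding a single grouped_id.
    return tuple(int(item) for tup in bag for item in tup)

sample_bag = [("60128490006325819",), ("62726281032786005",)]
print(flatten_bag(sample_bag))  # -> (60128490006325819, 62726281032786005)
```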

CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), py_f.flattenBag($1.grouped_id));

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,(60128490006325819,62726281032786005)))
(((my_id,31120),(date,1357084800000)),(1,(60128411174143024)))
(((my_id,31120),(date,1357084800000)),(1,(60128411146211875,63645100121476995,6012841114621187563645100121476995)))

Using the STORE instruction on this last relation raises an exception with the following error stack:

java.io.IOException: java.io.IOException: org.apache.thrift.transport.TTransportException
at     org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:428)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:408)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:262)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:652)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
at org.apache.hadoop.mapred.Child$4.run(Child.java:266)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:260)
Caused by: java.io.IOException: org.apache.thrift.transport.TTransportException
at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:248)
Caused by: org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129)
at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
at org.apache.cassandra.thrift.Cassandra$Client.recv_execute_prepared_cql3_query(Cassandra.java:1724)
at org.apache.cassandra.thrift.Cassandra$Client.execute_prepared_cql3_query(Cassandra.java:1709)
at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:232)

I tested the exact same workflow with simple data types and it works perfectly well. What I am really looking for is a way to fill a Cassandra table with complex types such as sets or lists from Pig.
Many thanks.


x7yiwoj4 1#

After further investigation, I found the solution here:
https://issues.apache.org/jira/browse/cassandra-5867
Basically, CqlStorage does support complex types. For that, the type should be represented by a tuple within the tuple, holding the data type as a string as its very first element. For a list, this is how one does it:


# python

@outputSchema("flat_bag:bag{}")
def flattenBag(bag):
    return ('list',) + tuple([long(item) for tup in bag for item in tup])
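The same type-tagged flattening can be checked in plain Python 3 (int replacing Python 2's long), outside the Pig pipeline:

```python
# Plain-Python 3 sketch of the type-tagged UDF (int replaces Python 2's long).
def flatten_bag_as_list(bag):
    # Prefix the flattened values with the CQL type name, per CASSANDRA-5867.
    return ('list',) + tuple(int(item) for tup in bag for item in tup)

sample_bag = [("60128490006325819",), ("62726281032786005",)]
print(flatten_bag_as_list(sample_bag))
# -> ('list', 60128490006325819, 62726281032786005)
```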

Thus, in the grunt shell:


# pig

CassandraAggregate = FOREACH GroupedRelation
    GENERATE TOTUPLE(TOTUPLE('my_id', $0.my_id),
    TOTUPLE('date', ISOToUnix($0.createdAt))),
    TOTUPLE(COUNT($1), py_f.flattenBag($1.grouped_id));

DUMP CassandraAggregate;

(((my_id,30021),(date,1357084800000)),(2,(list, 60128490006325819,62726281032786005)))
(((my_id,31120),(date,1357084800000)),(1,(list, 60128411174143024)))
(((my_id,31120),(date,1357084800000)),(1,(list, 60128411146211875,63645100121476995,6012841114621187563645100121476995)))

This is then stored into Cassandra using the classically encoded prepared statement.
Hope this helps.
