如何用hadoop将cql集合对象保存到cassandra?

disho6za  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(411)

我使用spark hadoop api从cassandra获取数据并将结果保存到cassandra。对于行值,如果列类型为long,则这是使用用于hadoop的cqloutputformat适配器向cassandra发送数据的方法:

val outVal = new java.util.ArrayList[ByteBuffer](1)
outVal.add(ByteBufferUtil.bytes(count.longValue()))

但是,当列类型为 set<text> ,我无法让它工作。我试图用objectoutputstream序列化java.util.set对象,但thrift客户端抛出 InvalidRequestException(why:string didn't validate.) ```
val outVal = new java.util.ArrayListByteBuffer
val byteOut = new ByteArrayOutputStream()
val out = new ObjectOutputStream(byteOut)
out.writeObject(data)
byteOut.close()
outVal.add(ByteBuffer.wrap(byteOut.toByteArray))
(outKey, outVal)

似乎它希望outval是一个字符串值。我查看了cassandra中setserializer和collectionserializer类的源代码,似乎cassandra对集合对象使用自定义序列化。hadoop cql3api提供了一种序列化集合对象的方法,还是我必须找到一种从外部使用cassandra内部类的方法?
i34xakig

i34xakig1#

现在唯一的解决方案似乎是从cassandra源代码复制序列化代码。以下是cassandra如何在内部处理集合对象:

List<ByteBuffer> bbs = new ArrayList(list.size());
    int size = 0;
    for (String elt : list)
    {
        ByteBuffer bb = ByteBufferUtil.bytes(elt);
        bbs.add(bb);
        size += 2 + bb.remaining();
    }

    ByteBuffer result = ByteBuffer.allocate(2 + size);
    result.putShort((short)list.size());
    for (ByteBuffer bb : bbs)
    {
        result.putShort((short)bb.remaining());
        result.put(bb.duplicate());
    }
    return (ByteBuffer)result.flip();

相关问题