使用spark流在hbase/hdfs中保存protobuf

m3eecexj  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(531)

我正在寻找存储在hbase/hdfs使用Spark流的protobuf消息。我有两个问题
存储大量protobuf消息的有效方法是什么?检索它们进行分析的有效方法是什么?例如,它们应该在hbase中存储为strings/byte[],还是应该在hdfs中存储为parquet文件等。
protobuf消息的层次结构应该如何存储?我的意思是,嵌套的元素应该在存储之前被展平,还是有任何机制可以按原样存储它们?如果嵌套元素是集合或Map,是否应将它们分解并存储为多行?
protobuf消息的示例结构如下所示

>     +--MsgNode-1
>       +--Attribute1 - String
>       +--Attribute2 - Int
>       +--MsgNode-2
>         +--Attribute1 - String
>         +--Attribute2 - Double
>         +--MsgNode-3 - List of MsgNode-3's
>           +--Attribute1 - Int

我计划使用spark streaming收集protobuf消息作为字节,并将它们存储在hbase/hdfs中。

gpnt7bae

gpnt7bae1#

问题1:
存储大量protobuf消息的有效方法是什么?检索它们进行分析的有效方法是什么?例如,它们应该在hbase中存储为strings/byte[],还是应该在hdfs中存储为parquet文件等。
我建议将proto buf存储为parquet avro文件(使用avro模式分割成有意义的消息)。
这可以通过使用DataFramesAPI spark 1.5及更高版本来实现( PartiotionBySaveMode.Append )
看到这个强大的大数据三重奏了吗
如果存储为字符串或字节数组,则无法直接进行数据分析(查询原始数据)。
如果您使用的是cloudera,那么可以使用impala(支持parquet avro)来查询rawdata。
问题2:
protobuf消息的层次结构应该如何存储?我的意思是,嵌套的元素应该在存储之前被展平,还是有任何机制可以按原样存储它们?如果嵌套元素是集合或Map,是否应将它们分解并存储为多行?
如果您以spark streaming的原始格式存储数据,您将如何查询业务部门是否希望查询并知道他们接收到的数据类型(这一要求非常常见)。
首先,您必须了解您的数据(即不同消息与in protobuf之间的关系,以便您可以决定单行或多行),然后开发protobuf解析器来解析protobuf的消息结构。根据您的数据,将其转换为avro通用记录以另存为parquet文件。

提示:

protobuf解析器可以根据您的需求以不同的方式开发。其中一种通用方法如下例所示。

public SortedMap<String, Object> convertProtoMessageToMap(GeneratedMessage src) {

    final SortedMap<String, Object> finalMap = new TreeMap<String, Object>();
    final Map<FieldDescriptor, Object> fields = src.getAllFields();

    for (final Map.Entry<FieldDescriptor, Object> fieldPair : fields.entrySet()) {

        final FieldDescriptor desc = fieldPair.getKey();

        if (desc.isRepeated()) {
            final List<?> fieldList = (List<?>) fieldPair.getValue();
            if (fieldList.size() != 0) {
                final List<String> arrayListOfElements = new ArrayList<String>();
                for (final Object o : fieldList) {
                    arrayListOfElements.add(o.toString());
                }
                finalMap.put(desc.getName(), arrayListOfElements);
            }
        } else {

            finalMap.put(desc.getName(), fieldPair.getValue().toString());

        }

    }
    return finalMap;
}

相关问题