spark 2.1.1 structure streaming中读取kafka avro消息的java代码

des4xlb0  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(274)

我正在寻找如何读取avro消息,其中有复杂的结构,从Kafka使用Spark结构流
然后我想解析这些消息并与hbase引用值进行比较,然后将结果保存到hdfs或另一个hbase表中。
我从以下示例代码开始:https://github.com/neuw84/spark-continuous-streaming/blob/master/src/main/java/es/aconde/structured/structureddemo.java
avro消息架构:

struct[mTimeSeries:
  struct[cName:string,
         eIpAddr:string,
         pIpAddr:string,
         pTime:string,
         mtrcs:array[struct[mName:string,
                            xValues:array[bigint],
                            yValues:array[string],
                            rName:string]]]]

我正在努力为此架构使用rowfactory.create创建一行。所以我需要遍历数组字段吗?我知道我们可以使用数据集上的explode函数来反规范化或访问struct数组的内部字段,一旦我们用这个结构创建数据集,就像我在hive中所做的那样。所以我想按原样创建一行,即avro消息的外观,然后使用sql函数进一步转换。

sparkSession.udf().register("deserialize", (byte[] data) -> {
        GenericRecord record = recordInjection.invert(data).get();
        return***RowFactory.create(record.get("machine").toString(), record.get("sensor").toString(), record.get("data"), record.get("eventTime"));***
    }, DataTypes.createStructType(type.fields())

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题