使用spark流解析xml字符串

a64a0gku  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(521)

我正在尝试从mqtt代理接收xml:

val df = spark.readStream
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option...
    .load()
    .selectExpr("CAST(payload AS STRING)")

然后消耗它,例如:

val query = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

重点是,我想解析它(列 $payload 包含一个字符串),然后在输出/保存它之前将其转换(如json)。
我尝试了以下方法:
追加 .format("com.databricks.spark.xml") 但它不支持结构化流媒体。
自定义函数
使用 scala.xml.XML.loadString 给我一个 scala.MatchError: scala.xml.Elem (of class scala.reflect.internal.Types$ClassNoArgsTypeRef) (似乎不是udf函数的有效类型)
使用 import com.databricks.spark.xml.functions.from_xml 是有问题的,因为它需要一个先前的模式(我没有,我试着接受它) import com.databricks.spark.xml.functions.from_xml.schema_of_xml 有两种方法: .withColumn("xml", from_xml(col("payload"), schema_of_xml(Seq(col("payload").toString()).toDS))) 给我一个空栏。
使用 udf((s: String) => from_xml_string(s, schema_of_xml(Seq(s).toDS))) 给我一个 java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported ,但我的数据没有预定义的schhema,所以。。。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题