我正在尝试从mqtt代理接收xml:
val df = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option...
.load()
.selectExpr("CAST(payload AS STRING)")
然后消耗它,例如:
val query = df.writeStream
.outputMode("append")
.format("console")
.start()
重点是,我想解析它(列 $payload
包含一个字符串),然后在输出/保存它之前将其转换(如json)。
我尝试了以下方法:
追加 .format("com.databricks.spark.xml")
但它不支持结构化流媒体。
自定义函数
使用 scala.xml.XML.loadString
给我一个 scala.MatchError: scala.xml.Elem (of class scala.reflect.internal.Types$ClassNoArgsTypeRef)
(似乎不是udf函数的有效类型)
使用 import com.databricks.spark.xml.functions.from_xml
是有问题的,因为它需要一个先前的模式(我没有,我试着接受它) import com.databricks.spark.xml.functions.from_xml.schema_of_xml
有两种方法: .withColumn("xml", from_xml(col("payload"), schema_of_xml(Seq(col("payload").toString()).toDS)))
给我一个空栏。
使用 udf((s: String) => from_xml_string(s, schema_of_xml(Seq(s).toDS)))
给我一个 java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported
,但我的数据没有预定义的schhema,所以。。。
暂无答案!
目前还没有任何答案,快来回答吧!