我有一个流数据集,从Kafka读取,并试图写入csv
case class Event(map: Map[String,String])
def decodeEvent(arrByte: Array[Byte]): Event = ...//some implementation
val eventDataset: Dataset[Event] = spark
.readStream
.format("kafka")
.load()
.select("value")
.as[Array[Byte]]
.map(decodeEvent)
``` `Event` 持有 `Map[String,String]` 在内部和写入csv我需要一些模式。
假设所有字段都是 `String` 所以我尝试了spark回购的例子
val columns = List("year","month","date","topic","field1","field2")
val schema = new StructType() //Prepare schema programmatically
columns.foreach { field => schema.add(field, "string") }
val rowRdd = eventDataset.rdd.map { event => Row.fromSeq(
columns.map(c => event.getOrElse(c, "")
)}
val df = spark.sqlContext.createDataFrame(rowRdd, schema)
这会在运行时的“eventdataset.rdd”行上出现错误:
原因:org.apache.spark.sql.analysisexception:流源查询必须使用writestream.start()执行;;
下面不起作用,因为“.map”有一个列表[string]而不是元组
eventDataset.map(event => columns.map(c => event.getOrElse(c,""))
.toDF(columns:_*)
有没有一种方法可以通过编程模式和结构化流数据集来实现这一点?
1条答案
按热度按时间xesrikrc1#
我会使用更简单的方法:
但是如果您想要更接近当前解决方案,请跳过rdd转换