我使用spark 2.1.1。
我有以下几点 DataSet<Row>
ds1型;
name | ratio | count // column names
"hello" | 1.56 | 34
( ds1.isStreaming
给予 true
)
我正在努力创造 DataSet<String>
ds2。换句话说,当我写KafkaFlume我想写这样的东西
{"name": "hello", "ratio": 1.56, "count": 34}
我试过这样的方法 df2.toJSON().writeStream().foreach(new KafkaSink()).start()
但它给出了以下错误
Queries with streaming sources must be executed with writeStream.start()
有 to_json
以及 json_tuple
然而,我不知道如何利用他们在这里?
我试着用 json_tuple()
功能
Dataset<String> df4 = df3.select(json_tuple(new Column("result"), " name", "ratio", "count")).as(Encoders.STRING());
我得到以下错误:
无法解析' result
'给定的输入列:[name,ratio,count];;
1条答案
按热度按时间vmdwslir1#
热释光;dr使用
struct
函数后跟to_json
(作为toJSON
由于spark-17029(20天前刚刚修复),流式数据集已损坏)。引用struct的scaladoc:
struct(colname:string,colnames:string*):column创建一个由多个输入列组成的新struct列。
如果您使用java api,那么struct function也有4种不同的变体:
公共静态列结构(列。。。cols)创建新的结构列。
使用tojson函数,您的案例包括:
public static column to \u json(column e)将包含structtype的列转换为具有指定模式的json字符串。
以下是scala代码(将其转换为java是您的家庭练习):
我还尝试了一下您的解决方案(使用今天构建的spark2.3.0-snapshot),它似乎工作得非常完美。