我正在构建一个sparksql应用程序,它使用一个kafka主题,转换一些数据,然后用一个特定的json对象写回一个单独的kafka主题。
我已经完成了大部分工作-我可以消费,转换,并写回Kafka-这是在转换后编写的json对象的形状,我正在努力。
现在我可以查询/转换我想要的内容并编写它:
Dataset<Row> reader = myData.getRecordCount();
reader.select(to_json(struct("record_count")).alias("value"))
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "new_separate_topic")
.save();
这会在主题中生成如下记录:
{
"record_count": 989
}
我需要的是,让这一位json成为更大json对象的有效负载(子)属性,我们将其用作微服务的标准使用者对象。
我想写的主题实际上是这样的:
{
"id": "ABC123",
"timestamp": "2018-11-16 20:40:26.108",
"user": "DEF456",
"type": "new_entity",
"data": {
"record_count": 989
}
}
此外,“id”、“user”和“type”字段将从外部填充-它们将来自触发整个过程的原始kafka消息。基本上,我需要为要写入kafka的元数据/对象注入一些值,并将“data”字段设置为上面sparksql查询的结果。
这可能吗?怎样?谢谢!
1条答案
按热度按时间ffvjumwh1#
如果要添加新字段,则不能只选择一个字段。
e、 g.双方
write.format("kafka")
以及.select()
,你需要做一些withColumn()
```Dataset reader = myData.getRecordCount();
// Keep your DataSet as Columns
reader = reader.select("record_count"))
// Add more data
reader = reader.withColumn(...)
// Then convert structs to JSON and write the output
reader.select(to_json(...))
.write()
.format("kafka")