apachespark:kafka以自定义格式编写

ttcibm8c  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(359)

我正在构建一个sparksql应用程序,它使用一个kafka主题,转换一些数据,然后用一个特定的json对象写回一个单独的kafka主题。
我已经完成了大部分工作-我可以消费,转换,并写回Kafka-这是在转换后编写的json对象的形状,我正在努力。
现在我可以查询/转换我想要的内容并编写它:

Dataset<Row> reader = myData.getRecordCount();
reader.select(to_json(struct("record_count")).alias("value"))
    .write()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "new_separate_topic")
    .save();

这会在主题中生成如下记录:

{
  "record_count": 989
}

我需要的是,让这一位json成为更大json对象的有效负载(子)属性,我们将其用作微服务的标准使用者对象。
我想写的主题实际上是这样的:

{
  "id": "ABC123",
  "timestamp": "2018-11-16 20:40:26.108",
  "user": "DEF456",
  "type": "new_entity",
  "data": {
      "record_count": 989
    }
}

此外,“id”、“user”和“type”字段将从外部填充-它们将来自触发整个过程的原始kafka消息。基本上,我需要为要写入kafka的元数据/对象注入一些值,并将“data”字段设置为上面sparksql查询的结果。
这可能吗?怎样?谢谢!

ffvjumwh

ffvjumwh1#

如果要添加新字段,则不能只选择一个字段。
e、 g.双方 write.format("kafka") 以及 .select() ,你需要做一些 withColumn() ```
Dataset reader = myData.getRecordCount();
// Keep your DataSet as Columns
reader = reader.select("record_count"))

// Add more data
reader = reader.withColumn(...)

// Then convert structs to JSON and write the output
reader.select(to_json(...))
.write()
.format("kafka")

“id”、“user”和“type”字段将从外部填充-它们将来自触发整个过程的原始kafka消息
那么你需要包括 `select("id", "user", "type")` 在你的代码里
另一种选择是使用kafka流,而不是被迫生成数据集,您可以使用实际的java类/jsonobjects

相关问题