我已经创建了一个如下所示的Dataframe,在这里我使用了\u json()方法来创建json数组值。
+----------------------------------------------------------------------------------------------------
|json_data |
+-----------------------------------------------------------------------------------------------------------+
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|
+-----------------------------------------------------------------------------------------------------------+
我使用下面的方法将Dataframe发送到Kafka主题。但是当我使用发送到kafka主题的数据时,我可以看到json数据被字符串化了。
将数据推送到Kafka的代码:
outgoingDF.selectExpr("CAST(Key as STRING) as key", "to_json(struct(*)) AS value")
.write
.format("kafka")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9093")
.option("checkpointLocation", checkpointPath)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("truncate", false)
.save()
Kafka接收到的字符串化数据:
{
"name": "sensor1",
"value-array": "[{\"time\":\"2020-11-27T01:01:00.000Z\",\"sensorvalue\":11.0,\"tag1\":\"tagvalue\"}]"
}
我们如何将数据发送到kafka主题,以便不将字符串化的json视为输出?
1条答案
按热度按时间z2acfund1#
json_data
属于类型string
&你又路过了json_data
至to_json(struct("*"))
功能。检查
value
去Kafka的专栏。试试下面的代码。