我正在尝试使用spark streaming来读取Kafka主题中的数据。
来自kafka的消息是一个JSON,我将它作为String存储在下面数据集的value列中。
- 示例消息:只是一个示例,实际的json很复杂**
{
"Name": "Bauddhik",
"Profession": "Developer"
}
Dataset<Row> df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(value AS STRING)");
现在我的数据集有一个包含整个JSON的值列,我需要选择一个字段作为关键字存储在Redis中。假设该字段是JSON中的"Name"。
因此,首先我在下面选择取出"名称"字段作为我的数据框中的新列。
Dataset<Row> df1 = df.select(functions.col("value"), functions.get_json_object(functions.col("value"), "$['name']").as("name");
这工作正常,现在我的df2看起来像
Value | name
<Json> | Bauddhik
现在我想把它插入到Redis缓存中,键为'Bauddhik',值为整个Json。所以我使用下面的foreachbatch选项来持久化Redis。
df1.writeStream().foreachbatch (
new VoidFunction2<Dataset<Row>, Long>()
{
public void call (Dataset<Row> dataset, Long batchId) {
dataset.write()
.format("org.apache.spark.sql.redis")
.option("key.coloum", **<hereistheissue>**)
.option("table","test")
.mode(SaveMode.Overwrite)
.save();
}
}).start()
如果你看上面的代码(这里是问题),我需要把键作为Bauddhik,它是我之前在数据框架中作为一个单独的列导出的。
我无法以字符串的形式检索name列,因此我无法将其作为键传递到Redis缓存。我尝试使用map和df. head(). getString(1),但似乎没有任何效果。
有人能指导我如何从数据集中读取一个列作为一个字符串,并传递到关键选项,同时写入Redis缓存。
1条答案
按热度按时间enyaitl31#
您在代码中拼错了column ...
.option(“键.列”,)
应该是..
.option(“键.列”,)