从Spark数据集中检索String类型的列作为String变量,并将其作为Redis缓存的“key”传递

b91juud3  于 2023-02-28  发布在  Redis
关注(0)|答案(1)|浏览(100)

我正在尝试使用spark streaming来读取Kafka主题中的数据。
来自kafka的消息是一个JSON,我将它作为String存储在下面数据集的value列中。

    • 示例消息:只是一个示例,实际的json很复杂**
{
    "Name": "Bauddhik",
    "Profession": "Developer"
  }
Dataset<Row> df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(value AS STRING)");

现在我的数据集有一个包含整个JSON的值列,我需要选择一个字段作为关键字存储在Redis中。假设该字段是JSON中的"Name"。
因此,首先我在下面选择取出"名称"字段作为我的数据框中的新列。

Dataset<Row> df1 = df.select(functions.col("value"), functions.get_json_object(functions.col("value"), "$['name']").as("name");

这工作正常,现在我的df2看起来像

Value         |       name
<Json>        |     Bauddhik

现在我想把它插入到Redis缓存中,键为'Bauddhik',值为整个Json。所以我使用下面的foreachbatch选项来持久化Redis。

df1.writeStream().foreachbatch (
 new VoidFunction2<Dataset<Row>, Long>()
{
public void call (Dataset<Row> dataset, Long batchId) {
  dataset.write()
.format("org.apache.spark.sql.redis")
.option("key.coloum", **<hereistheissue>**)
.option("table","test")
.mode(SaveMode.Overwrite)
.save();
}
}).start()

如果你看上面的代码(这里是问题),我需要把键作为Bauddhik,它是我之前在数据框架中作为一个单独的列导出的。
我无法以字符串的形式检索name列,因此我无法将其作为键传递到Redis缓存。我尝试使用map和df. head(). getString(1),但似乎没有任何效果。
有人能指导我如何从数据集中读取一个列作为一个字符串,并传递到关键选项,同时写入Redis缓存。

enyaitl3

enyaitl31#

您在代码中拼错了column ...
.option(“键.列”,)
应该是..
.option(“键.列”,)

相关问题