我的azure系统分为三个部分:
azure数据湖存储,我有一些csv文件。
azuredatabricks在这里我需要做一些处理-确切地说是将csv文件转换成redis散列格式。
azureredis缓存,我应该把转换后的数据放在那里。
在databricks文件系统中装载存储之后,需要处理一些数据。如何将databricks文件系统中的csv数据转换成redishash格式并正确地放入redis?具体来说,我不知道如何通过下面的代码来进行正确的Map。或者,可能有一些额外的传输到sql表的方法我找不到。
下面是我在scala上编写的代码示例:
import com.redislabs.provider.redis._
val redisServerDnsAddress = "HOST"
val redisPortNumber = 6379
val redisPassword = "Password"
val redisConfig = new RedisConfig(new RedisEndpoint(redisServerDnsAddress, redisPortNumber, redisPassword))
val data = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/mnt/staging/data/file.csv")
// What is the right way of mapping?
val ds = table("data").select("Prop1", "Prop2", "Prop3", "Prop4", "Prop5" ).distinct.na.drop().map{x =>
(x.getString(0), x.getString(1), x.getString(2), x.getString(3), x.getString(4))
}
sc.toRedisHASH(ds, "data")
错误:
error: type mismatch;
found : org.apache.spark.sql.Dataset[(String, String)]
required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisHASH(ds, "data")
如果我这样写最后一串代码:
sc.toRedisHASH(ds.rdd, "data")
错误:
org.apache.spark.sql.AnalysisException: Table or view not found: data;
1条答案
按热度按时间2skhul331#
准备一些示例数据来模拟从csv文件加载的数据。
转型:
将Dataframe写入redis,使用
Prop1
作为一把钥匙data
作为redis表名。见文件检查redis中的数据: