在我之前的问题(使用redis的spark结构化流式动态查找)中,由于https://stackoverflow.com/users/689676/fe2s
我尝试使用mappartitions,但我无法解决一个问题,即如何在迭代时达到下面代码部分中的每行每列。因为我想根据redis中的查找字段来丰富我的每行。我发现了类似的东西,但是我如何能够访问dataframe列并添加新的列来查找redis。我非常感谢你的帮助,谢谢。
import org.apache.spark.sql.types._
def transformRow(row: Row): Row = {
Row.fromSeq(row.toSeq ++ Array[Any]("val1", "val2"))
}
def transformRows(iter: Iterator[Row]): Iterator[Row] =
{
val redisConn =new RedisClient("xxx.xxx.xx.xxx",6379,1,Option("Secret123"))
println(redisConn.get("ModelValidityPeriodName").getOrElse(""))
//want to reach DataFrame column here
redisConn.close()
iter.map(transformRow)
}
val newSchema = StructType(raw_customer_df.schema.fields ++
Array(
StructField("ModelValidityPeriod", StringType, false),
StructField("ModelValidityPeriod2", StringType, false)
)
)
spark.sqlContext.createDataFrame(raw_customer_df.rdd.mapPartitions(transformRows), newSchema).show
1条答案
按热度按时间dxpyg8gm1#
迭代器
iter
表示Dataframe行上的迭代器。如果我答对了你的问题,你可以通过迭代访问列值iter
打电话来像这样的