我正在使用spark-sql-2.4.1v和java8。我有一个场景,其中我将被传递列名为list/seq,对于那些列,我只需要执行某些操作,如sum、avg、percentages等。
在我的场景中,假设我有第1列、第2列、第3列。我将第一次传递column1 name。
将拉取/选择“column1”数据并基于“column1”执行一些操作。第二次我将传递column2 name,但这次没有提取较早的column1,因此我的数据集不包含“column1”,因此较早的条件将中断,并出现错误“analysisexception:cannot resolve” column1
'给定输入列'。
因此,我需要检查列,如果某个列存在,则只执行与该列相关的操作,否则忽略这些操作。
如何在spark中执行此操作?
数据库中的示例数据。
val data = List(
("20", "score", "school", "2018-03-31", 14 , 12 , 20),
("21", "score", "school", "2018-03-31", 13 , 13 , 21),
("22", "rate", "school", "2018-03-31", 11 , 14, 22),
("21", "rate", "school", "2018-03-31", 13 , 12, 23)
)
val df = data.toDF("id", "code", "entity", "date", "column1", "column2" ,"column3")
.select("id", "code", "entity", "date", "column2") /// these are passed for each run....this set will keep changing.
Dataset<Row> enrichedDs = df
.withColumn("column1_org",col("column1"))
.withColumn("column1",
when(col("column1").isNotNull() , functions.callUDF("lookUpData",col("column1").cast(DataTypes.StringType)))
);
上述逻辑仅在选择列中的“column1”可用时适用。这在第二个集合中是失败的,因为“column1”不是select,所以我需要理解为什么这只适用于作为“column1”的selected列。我需要一些逻辑来实现这一点。
2条答案
按热度按时间xv8emn3q1#
检查这是否有用-
you can filter out columns, and process only valid columns
```df.show(false)
/**
* +---+-----+------+----------+-------+-------+-------+
* |id |code |entity|date |column1|column2|column3|
* +---+-----+------+----------+-------+-------+-------+
* |20 |score|school|2018-03-31|14 |12 |20 |
* |21 |score|school|2018-03-31|13 |13 |21 |
* |22 |rate |school|2018-03-31|11 |14 |22 |
* |21 |rate |school|2018-03-31|13 |12 |23 |
* +---+-----+------+----------+-------+-------+-------+
/
// list of columns
val cols = Seq("column1", "column2" ,"column3", "column4")
val processColumns = cols.filter(df.columns.contains).map(sqrt)
df.select(processColumns: _).show(false)
clj7thdc2#
不确定我是否完全理解您的需求,但您是否只是尝试执行一些条件操作,具体取决于Dataframe中哪些列在执行之前不知道?
如果是这样,dataframe.columns将返回一个列列表,您可以对这些列进行相应的分析和选择
即