I am processing a DataFrame with the following Scala code on Hadoop 3.0.0 and Spark 2.2.0. BAQ is a column of IDs, and AAA is a string column holding dates formatted as yyyyMMdd.
scala> val dtfAbnoFirs=dtfAbno.filter("AAA>='20201201' and BAQ<>'0'").
| groupBy("BAQ").agg("AAA"->"min");
dtfAbnoFirs: org.apache.spark.sql.DataFrame = [BAQ: string, min(AAA): string]
scala> val dtfBase = dtfCons.select("BAQ","AAA").distinct().filter("BAQ<>'0'").
| join(dtfAbnoFirs,Seq("BAQ"),"inner");
dtfBase: org.apache.spark.sql.DataFrame = [BAQ: string, AAA: string ... 1 more field]
scala> val dtfBaseEsti=dtfBase.filter("AAA<min(AAA)");
dtfBaseEsti: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [BAQ: string, AAA: string ... 1 more field]
scala> dtfBaseEsti.show(10,false);
The filter("AAA<min(AAA)") itself went through, but when I try to display data rows after this filter, it fails with the following error:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(BAQ#12, 200)
+- *HashAggregate(keys=[BAQ#12, AAA#13], functions=[], output=[BAQ#12, AAA#13])
....
Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: min(input[1, string, false])
That is part of the error output. I also recreated the DataFrame with AAA as a double and got the same error.
1 Answer
In your last statement, min(AAA) is a column name, so you have to quote it with backticks (`). Without the backticks, Spark tries to parse min(AAA) as a call to the aggregate function min, which cannot be evaluated inside a filter, hence the UnsupportedOperationException.
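A minimal sketch of the fix, reusing the DataFrame names from your session (the backticks tell Spark's SQL parser that min(AAA) is a literal column name, not a function call):

val dtfBaseEsti = dtfBase.filter("AAA < `min(AAA)`")
dtfBaseEsti.show(10, false)

A cleaner alternative is to alias the aggregate when it is created, so the awkward column name never appears downstream. Here minAAA is just an illustrative name:

import org.apache.spark.sql.functions.min

// Alias the aggregate so later expressions can use a plain column name.
val dtfAbnoFirs = dtfAbno.filter("AAA >= '20201201' and BAQ <> '0'").
  groupBy("BAQ").agg(min("AAA").as("minAAA"))

// The join is unchanged; the filter now needs no backticks.
val dtfBase = dtfCons.select("BAQ", "AAA").distinct().filter("BAQ <> '0'").
  join(dtfAbnoFirs, Seq("BAQ"), "inner")
val dtfBaseEsti = dtfBase.filter("AAA < minAAA")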