My Setup
I am using the following components:
- spark-core_2.10
- spark-sql_2.10
My Problem
This is essentially my code:
Dataset<Row> rowsSource = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .csv("source.csv");

Dataset<Row> rowsTarget = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .csv("target.csv");

rowsSource.createOrReplaceTempView("source");
rowsTarget.createOrReplaceTempView("target");

Dataset<Row> result = spark.sql("SELECT source.Id FROM source" +
        " LEFT OUTER JOIN target USING(Id)" +
        " WHERE target.Id IS NULL");

result.show();
Here is some test data:
Source:
"Id";"Status"
"1";"ERROR"
"2";"OK"
Target:
"Id";"Status"
"2";"OK"
I expect the SQL statement to find exactly one Id, namely "1".
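For reference, with the test data above, result.show() should then print something like this (my expectation, not actual output):

+---+
| Id|
+---+
|  1|
+---+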
But when I run it, an exception is thrown on the line that executes the SQL statement:
2017-03-21 17:00:09,693 INFO [main] com.materna.mobility.smart.selenium.Aaaa: starting
Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column `Detail` cannot be resolved on the left side of the join. The left-side columns: ["Detail", Detailp, Detaild, Detailb, Amount - 2016 48 +0100/1, Amount - 2016 49 +0100/1, Amount - 2016 50 +0100/1, Amount - 2016 51 +0100/1, Amount - 2016 52 +0100/1];
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1976)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1975)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$commonNaturalJoinProcessing(Analyzer.scala:1975)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1961)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1958)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1958)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1957)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at MyClass.main(MyClass.java:48)
If I insert an extra delimiter (;) before the Id column, everything works as expected; here is an example:
;"Id";"Status"
I assume Spark then parses three columns, but since the first one is invalid it gets ignored.
2 Answers
nbysray51#
Remove the BOM with Notepad++: right-click the file, choose "Edit with Notepad++", open the "Encoding" menu, select "Convert to UTF-8", and press Ctrl+S to save.
When I read the CSV file with Spark, it prepended a question mark to my first column name: "?[my_column]".
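If Notepad++ is not at hand, the BOM can also be stripped programmatically. A minimal Java sketch (the file name is a placeholder; it rewrites the file in place after checking for the UTF-8 BOM bytes EF BB BF):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class StripBom {
    public static void main(String[] args) throws Exception {
        Path csv = Paths.get("source.csv"); // placeholder file name
        byte[] bytes = Files.readAllBytes(csv);
        // A UTF-8 BOM is the three bytes EF BB BF at the very start of the file
        if (bytes.length >= 3
                && (bytes[0] & 0xFF) == 0xEF
                && (bytes[1] & 0xFF) == 0xBB
                && (bytes[2] & 0xFF) == 0xBF) {
            Files.write(csv, Arrays.copyOfRange(bytes, 3, bytes.length));
        }
    }
}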
e0uiprwp2#
The Problem
My CSV files had a BOM (byte order mark) in them (which I only just found out).
A byte order mark (BOM) is the Unicode character U+FEFF, appearing as a magic number at the start of a text stream, which can signal several things to a program consuming that text.
After some more searching I found this issue: https://github.com/databricks/spark-csv/issues/142
Apparently this has been a known problem since 2015.
The Fix
The easiest fix is to simply remove the BOM from the files.
The other fix I found (see the issue above) is to add an extra delimiter in front of the first column name; Spark then apparently parses one more column, but the first one is invalid and gets ignored. However: I strongly advise against this workaround, since it may be fixed in the future, and the solution above is more reliable.
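If editing the files is not an option either, another approach (my own untested sketch, not from the linked issue) is to strip the BOM from the column name after reading, since it survives parsing as the character U+FEFF glued onto the first header (depending on how it is decoded, it may also show up as a "?"):

Dataset<Row> rows = spark.read()
        .option("header", "true")
        .option("delimiter", ";")
        .csv("source.csv"); // placeholder file name

// The BOM shows up as \uFEFF at the start of the first column name
String firstColumn = rows.columns()[0];
if (firstColumn.startsWith("\uFEFF")) {
    rows = rows.withColumnRenamed(firstColumn, firstColumn.substring(1));
}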
Visualization
Wikipedia states that with UTF-8 (which I use), the file should start with the following bytes (in hex): "EF BB BF".
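You can check for these bytes yourself; a small Java sketch that prints the first three bytes of a file in hex (the file name is a placeholder):

import java.io.FileInputStream;

public class ShowFirstBytes {
    public static void main(String[] args) throws Exception {
        try (FileInputStream in = new FileInputStream("source.csv")) {
            byte[] head = new byte[3];
            int read = in.read(head);
            // A file with a UTF-8 BOM prints "EF BB BF"
            for (int i = 0; i < read; i++) {
                System.out.printf("%02X ", head[i]);
            }
            System.out.println();
        }
    }
}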
Here you can see what I expected my CSV files to look like (since I did not yet know they had a BOM) versus what they actually look like:
Due to my lack of reputation I cannot post images of the content, but here you go: