使用BigDecimal、scale和precision创建spark数据框

p5cysglq  于 2023-08-06  发布在  Apache
关注(0)|答案(1)|浏览(134)

我正在尝试使用Spark Java在Java代码中创建一个parquet文件。我有一个字段是BigDecimal类型的,我想为它指定小数位数(scale)和精度(precision),而不是使用Spark设置的默认值。
当尝试使用自定义模式创建数据框时,出现以下错误:
org.apache.spark.SparkException:[INTERNAL_ERROR] Spark SQL阶段分析失败,出现内部错误。你在Spark或你使用的Spark插件中遇到了一个bug。请将此错误报告给相应的社区或供应商,并提供完整的堆栈跟踪。
我使用的是Java 17,以及spark-core_2.13:3.4.1和spark-sql_2.13:3.4.1(Java 13和spark-*:3.4.0也有同样的问题)。
我如何创建一个自定义模式并为我的浮点字段指定小数位数和精度?
代码示例

// Person class
// Plain data holder used to build the example DataFrame rows.
// NOTE(review): main() below calls Person.builder(), Person.getBalance()
// and Person.getOther(), so the real class presumably also carries a
// Lombok-style @Builder and static factory helpers — not shown here.
public class Person {
  private String name;
  private int age;
  private String address;

  // Declared as BigDecimal so these columns can be mapped to a Spark
  // DecimalType with explicit precision/scale.
  private BigDecimal balance;
  private BigDecimal other;
}

// schema method
/**
 * Builds the explicit schema so the DECIMAL columns carry the desired
 * precision/scale instead of Spark's defaults.
 *
 * <p>Bug fix: {@code StructField}'s metadata argument must never be
 * {@code null}. Passing {@code null} makes
 * {@code AttributeReference.metadata()} return {@code null}, and the
 * analyzer later throws the NullPointerException seen in the stack trace
 * ("Cannot invoke Metadata.contains(String) ... metadata() is null" in
 * DetectAmbiguousSelfJoin). Use {@link Metadata#empty()} when there is no
 * metadata to attach.
 *
 * @return the schema for the Person DataFrame
 */
public static StructType schema() {
        StructType personStruct =
            new StructType(new StructField[] {
                new StructField("name", StringType, true, Metadata.empty()),
                new StructField("age", IntegerType, true, Metadata.empty()),
                new StructField("address", StringType, true, Metadata.empty()),
                // DECIMAL(5,3): up to 5 significant digits, 3 after the point
                new StructField("balance", DecimalType.apply(5, 3), true, Metadata.empty()),
                // DECIMAL(10,5)
                new StructField("other", DecimalType.apply(10, 5), true, Metadata.empty()),
                // Add other fields with appropriate data types and names
            });
        return personStruct;
}

// Reproduction snippet for the question: build a bean-based DataFrame,
// then try to re-create it from its RDD with the hand-built schema().
public static void main() {
// Sample rows built through the (not shown) Person builder; getBalance /
// getOther presumably wrap the doubles into BigDecimal — TODO confirm.
List<Person> people = List.of(Person.builder().name("Alica").age(12).address("haifa")
            .balance(Person.getBalance(12.3)).build(),
        Person.builder().name("Haim").age(12).address("Tel Aviv").balance(Person.getBalance(1000.213)).build(),
        Person.builder().name("Baruch").age(12).address("Ramat Gan").balance(Person.getBalance(0.213))
            .other(Person.getOther(5.0004)).build());

    Dataset<Row> peopleDF = sparkSession.createDataFrame(people, Person.class);
    // NPE per the stack trace below: AttributeReference.metadata() is null
    // because schema() passes null as StructField metadata.
    Dataset<Row> peopleDf2 = sparkSession.createDataFrame(peopleDF.rdd(), schema()); // exception thrown here
}

字符串
Stacktrace:

java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.types.Metadata.contains(String)" because the return value of "org.apache.spark.sql.catalyst.expressions.AttributeReference.metadata()" is null
    at org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$.org$apache$spark$sql$execution$analysis$DetectAmbiguousSelfJoin$$isColumnReference(DetectAmbiguousSelfJoin.scala:48) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$$anonfun$stripColumnReferenceMetadataInPlan$1.applyOrElse(DetectAmbiguousSelfJoin.scala:168) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$$anonfun$stripColumnReferenceMetadataInPlan$1.applyOrElse(DetectAmbiguousSelfJoin.scala:167) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDownWithPruning$1(QueryPlan.scala:166) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:207) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:207) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:218) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:223) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at scala.collection.immutable.List.map(List.scala:246) ~[scala-library-2.13.8.jar:na]
    at scala.collection.immutable.List.map(List.scala:79) ~[scala-library-2.13.8.jar:na]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:223) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:228) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:355) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:228) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDownWithPruning(QueryPlan.scala:166) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsWithPruning(QueryPlan.scala:137) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:115) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$.stripColumnReferenceMetadataInPlan(DetectAmbiguousSelfJoin.scala:167) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$.apply(DetectAmbiguousSelfJoin.scala:81) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$.apply(DetectAmbiguousSelfJoin.scala:44) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169) ~[scala-library-2.13.8.jar:na]
    at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165) ~[scala-library-2.13.8.jar:na]
    at scala.collection.immutable.List.foldLeft(List.scala:79) ~[scala-library-2.13.8.jar:na]
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at scala.collection.immutable.List.foreach(List.scala:333) ~[scala-library-2.13.8.jar:na]
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) ~[spark-catalyst_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:571) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.SparkSession.$anonfun$createDataFrame$3(SparkSession.scala:365) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:358) ~[spark-sql_2.13-3.4.1.jar:3.4.1]
    at com.example.parquet.ParquetService.writeParquet(ParquetService.java:60) ~[main/:na]
    at com.example.parquet.ParquetController.writeParquet(ParquetController.java:14) ~[main/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.22.jar:5.3.22]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.22.jar:5.3.22]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.22.jar:5.3.22]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.22.jar:5.3.22]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.22.jar:5.3.22]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.22.jar:5.3.22]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1070) ~[spring-webmvc-5.3.22.jar:5.3.22]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.22.jar:5.3.22]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.22.jar:5.3.22]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.22.jar:5.3.22]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:497) ~[jakarta.servlet-api-4.0.4.jar:4.0.4]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.22.jar:5.3.22]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:584) ~[jakarta.servlet-api-4.0.4.jar:4.0.4]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.22.jar:5.3.22]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.22.jar:5.3.22]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.22.jar:5.3.22]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.22.jar:5.3.22]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.22.jar:5.3.22]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.22.jar:5.3.22]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:360) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:399) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:890) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1789) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.65.jar:9.0.65]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

mnemlml8

mnemlml81#

// Accepted answer's workaround: create the DataFrame from the bean class,
// then cast the BigDecimal columns to the desired DECIMAL(p,s) with
// selectExpr — avoiding the createDataFrame(rdd, schema) path entirely.
Dataset<Row> peopleDF = sparkSession.createDataFrame(people, Person.class);
Dataset<Row> peopleDf2 = peopleDF.selectExpr("name","address","age", "cast(balance as DECIMAL(5,3)) balance", "cast(other as DECIMAL(10,5)) other");

字符串
当然,你也可以使用/Column DSL函数。

相关问题