如何在spark.sql.function.from_csv中设置schema?

uqdfh47h  于 2023-08-06  发布在  Apache
关注(0)|答案(1)|浏览(95)

我在windows 11上使用spark-3.4.1-hadoop 3。我尝试生成要传递给from_csv函数参数的模式。下面是我的代码。

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.not;

import java.util.HashMap;
import java.util.Map;

SparkSession spark = SparkSession.builder().appName("FromCsvStructExample").getOrCreate();

Dataset<Row> df = spark.read().format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/path/to/csv/file");

Map<String, String> options = new HashMap<String, String>();

String schemaString = "name string, age int, job string";

Column schema = from_csv(col("csv"), col(schemaString), options);
Dataset<Row> parsed = df.select(schema.as("data"));
parsed.printSchema();
spark.close();

字符串
但代码会抛出以下异常。

Exception in thread "main" org.apache.spark.sql.AnalysisException: [INVALID_SCHEMA.NON_STRING_LITERAL] The input schema "name string, age int, job string" is not a valid schema string. The input expression must be string literal and not null.
        at org.apache.spark.sql.errors.QueryCompilationErrors$.unexpectedSchemaTypeError(QueryCompilationErrors.scala:1055)    
        at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalTypeExpr(ExprUtils.scala:42)
        at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalSchemaExpr(ExprUtils.scala:47)
        at org.apache.spark.sql.catalyst.expressions.CsvToStructs.<init>(csvExpressions.scala:72)
        at org.apache.spark.sql.functions$.from_csv(functions.scala:4955)
        at org.apache.spark.sql.functions.from_csv(functions.scala)
        at com.aaa.etl.processor.Test_CSV.main(Test_CSV.java:43)


我担心schemaString对于org.apache.spark.sql.functions.col函数是不正确的。请告诉我如何用org.apache.spark.sql.functions.col函数生成模式。我知道有一个重载的from_csv函数,其模式参数类型是StructType。但是在使用这个函数时,我必须使scala函数化,我甚至没有scala的基本知识。

==更新部分

我尝试使用Java特定的from_csv方法。

from_csv(Column e, Column schema, java.util.Map<String,String> options)


如你所知,模式的类型不是StructType,而是Column。我在这一部分卡住了。我不知道如何在java中生成列类型模式。如果你有任何的参考产生的java列类型模式,请告诉我如何.

v6ylcynt

v6ylcynt1#

你是对的,你不能直接生成一个Column给定的DDL字符串。一种方法是使用lit或StructType.fromDDL函数。正如您已经提到的,from_csv函数的一个签名接受模式的StructType。然后Scala代码如下所示:

import org.apache.spark.sql.types.StructType

var schema: StructType = StructType.fromDDL("name string, age int, job string")

// StructType(
//   StructField(name,StringType,true),
//   StructField(age,IntegerType,true),
//   StructField(job,StringType,true)
// )

val targetCol = from_csv(col("csv"), schema, options)

字符串
代码应该与Java非常相似。
根据from_csv的另一个签名,它接受Column而不是StructType,它与相应的单元测试中所示的lit函数结合使用。这适用于您希望将模式作为字符串传递的情况。
你的情况是:

val schema = "name string, age int, job string"

val targetCol = from_csv(col("csv"), lit(schema), options)

相关问题