Spark complains that a column cannot be found, even though the error message shows it is there

fhg3lkii posted on 2023-08-06 in Apache

I am trying to flatten a nested JSON into individual columns and apply a transformation. In the real scenario the JSON is arbitrarily nested, so I can't predict ahead of time which columns are nested; I am therefore using recursive looping to traverse the JSON and flatten it out. Below is a sample scenario.

INPUT JSON

{
    "order": {      
        "name": "John Doe",
        "address": {
            "state": "NY"
        },
        "orders": [
            {
                "order_id": 1001,
                "quarter": "2023-06-30"
            },
            {
                "order_id": 1002,
                "quarter": "2023-06-29"
            }
        ]
    }
}

After flattening, the result is displayed as below:

+-------------------+----------+---------------------+------------------------+
|order.address.state|order.name|order.orders.order_id|order.orders.quarter    |
+-------------------+----------+---------------------+------------------------+
|NY                 |John Doe  |[1001, 1002]         |[2023-06-30, 2023-06-29]|
|NY                 |John Doe  |[1001, 1002]         |[2023-06-30, 2023-06-29]|
+-------------------+----------+---------------------+------------------------+

But when I try to transform the column, Spark complains that order.address.state is missing, even though the error message lists this exact column as one of the available columns!
Exception in thread "main" org.apache.spark.sql.AnalysisException: Column 'order.address.state' does not exist. Did you mean one of the following? [order.address.state, order.orders.quarter, order.name, order.orders.order_id];

CODE SNIPPET

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Column;

import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JsonFlattener {

    // UDF to transform values based on the config file
    private static UserDefinedFunction udfTransform = functions.udf(
        (UDF1<Object, Object>) value -> {
            // Your transformation logic based on the config file
            // Here, we're simply appending 'transformed_' prefix to the value
            return "transformed_" + value;
        }, DataTypes.StringType);

    public static void main(String[] args) throws IOException {
        SparkSession spark = SparkSession.builder()
            .appName("JsonFlattener")
            .master("local")
            .getOrCreate();

        spark.udf().register("getQuarterOfYearUDF", getQuarterOfYearUDF, DataTypes.IntegerType);

        // Read the JSON file
        Dataset<Row> input = spark.read().option("multiLine","true").option("inferSchema","true").json("/home/akannan/INPUT/orders2.json");

        input.show(false);

        // Flatten the nested JSON recursively based on the level of nesting
        Dataset<Row> flattened = flattenNestedJson(input);
        flattened.printSchema();
        flattened.show(false);

        // Read the config file to determine which columns need transformation
        String configPath = "/home/akannan/INPUT/orders_modify.json"; // Specify the path to your config file
        String configFileContents = new String(Files.readAllBytes(Paths.get(configPath)));
        Gson gson = new Gson();
        Type listType = new TypeToken<List<UDFFullReplace.FieldTransformation>>() {
        }.getType();
        List<UDFFullReplace.FieldTransformation> transformations = gson.fromJson(configFileContents, listType);

        // Apply the UDF transformation to the specified columns
        Dataset<Row> transformed = applyTransformation(flattened, transformations);

        transformed.show(false);
        transformed.printSchema();
        // Unflatten the DataFrame back to the original JSON format
        Dataset<Row> unflattened = unflattenJson(transformed);

        // Show the result
        unflattened.show();
    }

    private static Dataset<Row> flattenNestedJson(Dataset<Row> input) {
        StructType schema = input.schema();
        return flattenNestedJson(input, "", schema);
    }

    private static Dataset<Row> flattenNestedJson(Dataset<Row> input, String prefix, DataType dataType) {
        if (dataType instanceof StructType) {
            StructType structType = (StructType) dataType;
            for (StructField field : structType.fields()) {
                String columnName = field.name();
                String fullColumnName = prefix.isEmpty() ? columnName : prefix + "." + columnName;
                DataType columnDataType = field.dataType();
                input = flattenColumn(input, fullColumnName, columnDataType);
            }
            input = input.drop(prefix);
        } else if (dataType instanceof ArrayType) {
            ArrayType arrayType = (ArrayType) dataType;
            DataType elementType = arrayType.elementType();
//            input = input.withColumn(prefix, functions.explode(functions.col(prefix)))
//                .selectExpr("*", prefix + ".*")
//                .drop(prefix);
            input = input.withColumn(prefix, functions.explode_outer(functions.col(prefix)));

            input = flattenColumn(input, prefix, elementType);
        }
        return input;
    }

    private static Dataset<Row> flattenColumn(Dataset<Row> input, String columnName, DataType columnDataType) {
        if (columnDataType instanceof StructType) {
            input = flattenNestedJson(input, columnName, columnDataType);
        } else if (columnDataType instanceof  ArrayType) {
            input = flattenNestedJson(input, columnName, columnDataType);
        }
        else {
            input = input.withColumn(columnName, functions.col(columnName));
        }
        return input;
    }

    private static Dataset<Row> applyTransformation(Dataset<Row> input, List<UDFFullReplace.FieldTransformation> transformations) {
        input.cache();
        for (UDFFullReplace.FieldTransformation transformation : transformations) {
            String[] fieldNames = transformation.getName().split("\\.");
            String columnName = transformation.getName();
            String udf = transformation.getUdf();

//            input = input.withColumn(columnName, udfTransform.apply(functions.col(columnName)));
            input =  input.withColumn(columnName, functions.concat(functions.col(columnName), functions.lit("_suffix")));
        }
        return input;

    }

    private static Dataset<Row> unflattenJson(Dataset<Row> input) {
        // Group the columns based on the nested structure and create a struct
        StructType schema = input.schema();
        StructField[] fields = schema.fields();
        Column[] columns = new Column[fields.length];
        for (int i = 0; i < fields.length; i++) {
            columns[i] = functions.col(fields[i].name());
        }

        // Create a single column named 'value' with the struct
        //Dataset<Row> withStructColumn = input.withColumn("value", functions.struct(columns));
        Dataset<Row> withStructColumn = input.withColumn("value", functions.struct(
            input.col("order.address.state").alias("state"),
            input.col("order.name").alias("name"),
            input.col("order.orders.order_id").alias("order_id"),
            input.col("order.orders.quarter").alias("quarter")
        ));

        // Convert the DataFrame to a JSON string
        Dataset<Row> jsonString = withStructColumn.select(functions.to_json(functions.col("value")).alias("json"));

        // Parse the JSON string and extract the struct column
        return jsonString.select(functions.from_json(functions.col("json"), schema).alias("value")).select("value.*");
    }

    static UDF1<String, Integer> getQuarterOfYearUDF = (dateString) -> {
        String[] parts = dateString.split("-");
        int month = Integer.parseInt(parts[1]);
        int quarter = (month - 1) / 3 + 1;
        return quarter;
    };
}

ERROR STACK

Exception in thread "main" org.apache.spark.sql.AnalysisException: Column 'order.address.state' does not exist. Did you mean one of the following? [order.address.state, order.orders.quarter, order.name, order.orders.order_id];
'Project [concat('order.address.state, _suffix) AS order.address.state#95, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
+- Project [order.address.state#9, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
   +- Project [order#0, order.address.state#9, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
      +- Project [order#0, order.address.state#9, order.name#15, order.orders#22, order.orders.order_id#27, order#0.orders.quarter AS order.orders.quarter#34]
         +- Project [order#0, order.address.state#9, order.name#15, order.orders#22, order#0.orders.order_id AS order.orders.order_id#27]
            +- Project [order#0, order.address.state#9, order.name#15, order.orders#22]
               +- Generate explode(order#0.orders), true, [order.orders#22]
                  +- Project [order#0, order.address.state#9, order#0.name AS order.name#15]
                     +- Project [order#0, order#0.address.state AS order.address.state#9]
                        +- Relation [order#0] json

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7(CheckAnalysis.scala:199)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7$adapted(CheckAnalysis.scala:192)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6(CheckAnalysis.scala:192)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6$adapted(CheckAnalysis.scala:192)
at scala.collection.immutable.Stream.foreach(Stream.scala:533)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:192)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:101)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:101)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:96)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:187)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:210)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1519)
at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2542)
at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2480)
at org.nypd.nextgen.data.pipeline.JsonFlattener.applyTransformation(JsonFlattener.java:116)
at org.nypd.nextgen.data.pipeline.JsonFlattener.main(JsonFlattener.java:57)

iklwldmw 1#

Your code isn't complete, but what I can see here is a problem with the dots in the column names.
Try to avoid dots; they are normally used to access nested fields. In general, don't use any special characters in column names, only underscores.
If you do want to keep the dots, then use backticks (`) when referencing such a column:

flattened.withColumn("test", functions.col("`order.address.state`"));
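
Applied to the applyTransformation loop from the question, the backticks are only needed where the column is read via functions.col; the first argument of withColumn can stay as the plain dotted name. A minimal sketch, reusing the question's own types:

for (UDFFullReplace.FieldTransformation transformation : transformations) {
    String columnName = transformation.getName();   // e.g. "order.address.state"
    // Backticks make Spark treat the dots as part of the column name
    // instead of as nested-field access.
    String quoted = "`" + columnName + "`";
    input = input.withColumn(columnName,
        functions.concat(functions.col(quoted), functions.lit("_suffix")));
}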



ylamdve6 2#

I see you wrote quite a lot of logic to flatten the array column. You can use the inline function to flatten an array column into rows, as shown below.
Use . to access any column inside a struct or array.

scala> df.printSchema
root
 |-- order: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- orders: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- order_id: long (nullable = true)
 |    |    |    |-- quarter: string (nullable = true)

scala> df.select("order.*").selectExpr("*", "inline(orders)").drop("orders").printSchema
root
 |-- address: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |-- name: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- quarter: string (nullable = true)

scala> df.select("order.*").selectExpr("*", "inline(orders)").drop("orders").show(false)
+-------+--------+--------+----------+
|address|name    |order_id|quarter   |
+-------+--------+--------+----------+
|{NY}   |John Doe|1001    |2023-06-30|
|{NY}   |John Doe|1002    |2023-06-29|
+-------+--------+--------+----------+
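
For reference, a rough Java equivalent of the Scala snippet above, assuming input is the DataFrame read from the JSON file as in the question's code:

// Promote the fields of the top-level struct, explode the array of structs
// into one row per element with inline(), then drop the raw array column.
Dataset<Row> flat = input
    .select("order.*")
    .selectExpr("*", "inline(orders)")
    .drop("orders");

flat.printSchema();
flat.show(false);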

