I am trying to flatten a nested JSON into individual columns and apply a transformation. In the real scenario the JSON is deeply nested and I cannot predict ahead of time which columns are nested, so I use recursive looping to traverse the JSON and flatten it out. Below is a sample scenario.
INPUT JSON
{
  "order": {
    "name": "John Doe",
    "address": {
      "state": "NY"
    },
    "orders": [
      {
        "order_id": 1001,
        "quarter": "2023-06-30"
      },
      {
        "order_id": 1002,
        "quarter": "2023-06-29"
      }
    ]
  }
}
After flattening, it displays as below:
+-------------------+----------+---------------------+------------------------+
|order.address.state|order.name|order.orders.order_id|order.orders.quarter |
+-------------------+----------+---------------------+------------------------+
|NY |John Doe |[1001, 1002] |[2023-06-30, 2023-06-29]|
|NY |John Doe |[1001, 1002] |[2023-06-30, 2023-06-29]|
+-------------------+----------+---------------------+------------------------+
But when I try to transform the column, Spark complains that 'order.address.state' does not exist, even though the error message lists this exact column as one of the available columns!
Exception in thread "main" org.apache.spark.sql.AnalysisException: Column 'order.address.state' does not exist. Did you mean one of the following? [order.address.state, order.orders.quarter, order.name, order.orders.order_id];
CODE SNIPPET
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Column;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class JsonFlattener {
    // UDF to transform values based on the config file
    private static UserDefinedFunction udfTransform = functions.udf(
            (UDF1<Object, Object>) value -> {
                // Your transformation logic based on the config file
                // Here, we're simply appending 'transformed_' prefix to the value
                return "transformed_" + value;
            }, DataTypes.StringType);
    public static void main(String[] args) throws IOException {
        SparkSession spark = SparkSession.builder()
                .appName("JsonFlattener")
                .master("local")
                .getOrCreate();
        spark.udf().register("getQuarterOfYearUDF", getQuarterOfYearUDF, DataTypes.IntegerType);
        // Read the JSON file
        Dataset<Row> input = spark.read().option("multiLine","true").option("inferSchema","true").json("/home/akannan/INPUT/orders2.json");
        input.show(false);
        // Flatten the nested JSON recursively based on the level of nesting
        Dataset<Row> flattened = flattenNestedJson(input);
        flattened.printSchema();
        flattened.show(false);
        // Read the config file to determine which columns need transformation
        String configPath = "/home/akannan/INPUT/orders_modify.json"; // Specify the path to your config file
        String configFileContents = new String(Files.readAllBytes(Paths.get(configPath)));
        Gson gson = new Gson();
        Type listType = new TypeToken<List<UDFFullReplace.FieldTransformation>>() {
        }.getType();
        List<UDFFullReplace.FieldTransformation> transformations = gson.fromJson(configFileContents, listType);
        // Apply the UDF transformation to the specified columns
        Dataset<Row> transformed = applyTransformation(flattened, transformations);
        transformed.show(false);
        transformed.printSchema();
        // Unflatten the DataFrame back to the original JSON format
        Dataset<Row> unflattened = unflattenJson(transformed);
        // Show the result
        unflattened.show();
    }
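    // Entry point for flattening: starts the recursion from the root schema with an empty prefix.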
    private static Dataset<Row> flattenNestedJson(Dataset<Row> input) {
        StructType schema = input.schema();
        return flattenNestedJson(input, "", schema);
    }
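    // Recursively flattens struct fields into dotted top-level columns and explodes array columns into rows.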
    private static Dataset<Row> flattenNestedJson(Dataset<Row> input, String prefix, DataType dataType) {
        if (dataType instanceof StructType) {
            StructType structType = (StructType) dataType;
            for (StructField field : structType.fields()) {
                String columnName = field.name();
                String fullColumnName = prefix.isEmpty() ? columnName : prefix + "." + columnName;
                DataType columnDataType = field.dataType();
                input = flattenColumn(input, fullColumnName, columnDataType);
            }
            input = input.drop(prefix);
        } else if (dataType instanceof ArrayType) {
            ArrayType arrayType = (ArrayType) dataType;
            DataType elementType = arrayType.elementType();
            // input = input.withColumn(prefix, functions.explode(functions.col(prefix)))
            //         .selectExpr("*", prefix + ".*")
            //         .drop(prefix);
            input = input.withColumn(prefix, functions.explode_outer(functions.col(prefix)));
            input = flattenColumn(input, prefix, elementType);
        }
        return input;
    }
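    // Dispatches on the column's data type: recurse into structs and arrays, leave leaf columns unchanged.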
    private static Dataset<Row> flattenColumn(Dataset<Row> input, String columnName, DataType columnDataType) {
        if (columnDataType instanceof StructType) {
            input = flattenNestedJson(input, columnName, columnDataType);
        } else if (columnDataType instanceof ArrayType) {
            input = flattenNestedJson(input, columnName, columnDataType);
        } else {
            input = input.withColumn(columnName, functions.col(columnName));
        }
        return input;
    }
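    // Applies the configured transformation to each column named in the config file (currently a simple "_suffix" concat).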
    private static Dataset<Row> applyTransformation(Dataset<Row> input, List<UDFFullReplace.FieldTransformation> transformations) {
        input.cache();
        for (UDFFullReplace.FieldTransformation transformation : transformations) {
            String[] fieldNames = transformation.getName().split("\\.");
            String columnName = transformation.getName();
            String udf = transformation.getUdf();
            // input = input.withColumn(columnName, udfTransform.apply(functions.col(columnName)));
            input = input.withColumn(columnName, functions.concat(functions.col(columnName), functions.lit("_suffix")));
        }
        return input;
    }
    private static Dataset<Row> unflattenJson(Dataset<Row> input) {
        // Group the columns based on the nested structure and create a struct
        StructType schema = input.schema();
        StructField[] fields = schema.fields();
        Column[] columns = new Column[fields.length];
        for (int i = 0; i < fields.length; i++) {
            columns[i] = functions.col(fields[i].name());
        }
        // Create a single column named 'value' with the struct
        // Dataset<Row> withStructColumn = input.withColumn("value", functions.struct(columns));
        Dataset<Row> withStructColumn = input.withColumn("value", functions.struct(
                input.col("order.address.state").alias("state"),
                input.col("order.name").alias("name"),
                input.col("order.orders.order_id").alias("order_id"),
                input.col("order.orders.quarter").alias("quarter")
        ));
        // Convert the DataFrame to a JSON string
        Dataset<Row> jsonString = withStructColumn.select(functions.to_json(functions.col("value")).alias("json"));
        // Parse the JSON string and extract the struct column
        return jsonString.select(functions.from_json(functions.col("json"), schema).alias("value")).select("value.*");
    }
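    // UDF that derives the calendar quarter (1-4) from a yyyy-MM-dd date string.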
    static UDF1<String, Integer> getQuarterOfYearUDF = (dateString) -> {
        String[] parts = dateString.split("-");
        int month = Integer.parseInt(parts[1]);
        int quarter = (month - 1) / 3 + 1;
        return quarter;
    };
}
ERROR STACK
Exception in thread "main" org.apache.spark.sql.AnalysisException: Column 'order.address.state' does not exist. Did you mean one of the following? [order.address.state, order.orders.quarter, order.name, order.orders.order_id];
'Project [concat('order.address.state, _suffix) AS order.address.state#95, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
+- Project [order.address.state#9, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
   +- Project [order#0, order.address.state#9, order.name#15, order.orders.order_id#27, order.orders.quarter#34]
      +- Project [order#0, order.address.state#9, order.name#15, order.orders#22, order.orders.order_id#27, order#0.orders.quarter AS order.orders.quarter#34]
         +- Project [order#0, order.address.state#9, order.name#15, order.orders#22, order#0.orders.order_id AS order.orders.order_id#27]
            +- Project [order#0, order.address.state#9, order.name#15, order.orders#22]
               +- Generate explode(order#0.orders), true, [order.orders#22]
                  +- Project [order#0, order.address.state#9, order#0.name AS order.name#15]
                     +- Project [order#0, order#0.address.state AS order.address.state#9]
                        +- Relation [order#0] json
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7(CheckAnalysis.scala:199)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$7$adapted(CheckAnalysis.scala:192)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:366)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:366)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6(CheckAnalysis.scala:192)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$6$adapted(CheckAnalysis.scala:192)
at scala.collection.immutable.Stream.foreach(Stream.scala:533)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:192)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:101)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:367)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:101)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:96)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:187)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:210)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3887)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1519)
at org.apache.spark.sql.Dataset.withColumns(Dataset.scala:2542)
at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:2480)
at org.nypd.nextgen.data.pipeline.JsonFlattener.applyTransformation(JsonFlattener.java:116)
at org.nypd.nextgen.data.pipeline.JsonFlattener.main(JsonFlattener.java:57)
2 Answers
iklwldmw 1#
Your code is not complete, but what I can see here is a problem with the dots in your column names.
Try to avoid dots; they are normally used to access nested fields. In general, avoid special characters altogether and stick to underscores.
If you really want to keep the dots, then wrap the column name in backticks (`) whenever you reference it.
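A minimal sketch of both options, assuming the flattened column names and the flattened Dataset from the question (the surrounding calls are illustrative, not the answer's original code):

// Option 1: quote the dotted name with backticks so Spark treats it as a single
// top-level column rather than as a struct field path.
Dataset<Row> transformed = flattened.withColumn(
        "order.address.state",
        functions.concat(functions.col("`order.address.state`"), functions.lit("_suffix")));

// Option 2: rename the flattened columns up front so no dots remain.
Dataset<Row> renamed = flattened;
for (String c : renamed.columns()) {
    renamed = renamed.withColumnRenamed(c, c.replace(".", "_"));
}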
ylamdve6 2#
I see you have written a lot of logic to flatten the array column. You can use the inline function to flatten the array column into rows, as shown below, and use . to access any field inside a struct or array.
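A minimal sketch of that approach against the question's input DataFrame (the aliases and the show() output are illustrative assumptions):

// inline() explodes an array of structs into one row per element, with one
// output column per struct field; dotted paths drill into nested structs.
Dataset<Row> flat = input.selectExpr(
        "order.address.state as state",
        "order.name as name",
        "inline(order.orders)");   // produces order_id and quarter columns
flat.show(false);
// Expected output (roughly):
// +-----+--------+--------+----------+
// |state|name    |order_id|quarter   |
// +-----+--------+--------+----------+
// |NY   |John Doe|1001    |2023-06-30|
// |NY   |John Doe|1002    |2023-06-29|
// +-----+--------+--------+----------+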