spark-dataframe-cast列用于kudu兼容性

xnifntxz  于 2021-06-26  发布在  Impala
关注(0)|答案(1)|浏览(328)

(我是spark、impala和kudu的新手。)我正在尝试通过kudu将一个表从oracle数据库复制到spark中具有相同结构的impala表。代码尝试Maporacle时出错 NUMBER 到kudu数据类型。如何更改spark的数据类型 DataFrame 让它和Kudu兼容?
这是一个从oracle到impala的1对1数据拷贝。我提取了源表的oracle模式,并创建了具有相同结构(相同的列名和合理的数据类型Map)的目标impala表。我希望spark+kudu能自动Map所有内容,并复制数据。相反,kudu抱怨说它无法绘制Map DecimalType(38,0) .
我想指定“column#1,name some#col,这是一个 NUMBER 在oracle中,应Map到 LongType ,这在kudu中得到了支持。
我该怎么做?

// This works
val df: DataFrame = spark.read
  .option("fetchsize", 10000)
  .option("driver", "oracle.jdbc.driver.OracleDriver")
  .jdbc("jdbc:oracle:thin:@(DESCRIPTION=...)", "SCHEMA.TABLE_NAME", partitions, props)

// This does not work  
kuduContext.insertRows(df.toDF(colNamesLower: _*), "impala::schema.table_name")
// Error: No support for Spark SQL type DecimalType(38,0)
// See https://github.com/cloudera/kudu/blob/master/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala

// So let's see the Spark data types
df.dtypes.foreach{case (colName, colType) => println(s"$colName: $colType")}
// Spark  data type: SOME_COL DecimalType(38,0)
// Oracle data type: SOME_COL NUMBER -- no precision specifier; values are int/long
// Kudu   data type: SOME_COL BIGINT
44u64gxh

44u64gxh1#

显然,我们可以在从jdbc数据源读取时指定一个自定义模式。

connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

成功了。我可以指定一个 customSchema 像这样:

col1 Long, col2 Timestamp, col3 Double, col4 String

这样,代码就起作用了:

import spark.implicits._
val df: Dataset[case_class_for_table] = spark.read
  .option("fetchsize", 10000)
  .option("driver", "oracle.jdbc.driver.OracleDriver")
  .jdbc("jdbc:oracle:thin:@(DESCRIPTION=...)", "SCHEMA.TABLE_NAME", partitions, props)
  .as[case_class_for_table]
kuduContext.insertRows(df.toDF(colNamesLower: _*), "impala::schema.table_name")

相关问题