在parquet中写入时修改时间戳

bvhaajcl  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(398)

我有一个spark应用程序,它加载csv文件,将其转换为Parquet文件,将Parquet文件存储在我的数据湖存储器中,然后将数据加载到bigquery表中。
问题是当csv有太多的旧时间戳值时,转换会发生,但是时间戳列不能显示在bigquery表中。
当我设置配置时 spark.sql.parquet.outputTimestampTypeTIMESTAMP_MICROS ,我在bigquery上遇到以下错误:

Cannot return an invalid timestamp value of -62135607600000000 microseconds relative to the Unix epoch. The range of valid timestamp values is [0001-01-1 00:00:00, 9999-12-31 23:59:59.999999]; error in writing field reference_date

当我设置配置时 spark.sql.parquet.outputTimestampTypeTIMESTAMP_MILLIS ,我在气流上有个错误:

Error while reading data, error message: Invalid timestamp value -62135607600000 for field 'reference_date' of type 'INT64' (logical type 'TIMESTAMP_MILLIS'): generic::out_of_range: Invalid timestamp value: -62135607600000

csv文件:

id,reference_date
"6829baef-bcd9-412a-a2f3-abdfed02jsd","0001-01-02 21:00:00"

读取csv文件(并转换 reference_date 到时间戳列):

def castDFColumn(
  df: DataFrame,
  column: String,
  dataType: DataType
): DataFrame = df.withColumn(column, df(column).cast(dataType))

...
var df = spark
  .read
  .format("csv")
  .option("header", true)
  .load("myfile.csv")

df = castDFColumn(df, "reference_date", TimestampType)

转换为文件Parquet文件:

df
  .write
  .mode("overwrite")
  .parquet("path/to/save")

spark应用程序运行时配置:

val conf = new SparkConf().setAppName("Load CSV")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS/TIMESTAMP_MICROS")
conf.set("spark.sql.session.timeZone", "UTC")

时间戳好像改成了 0000-12-31 21:00:00 ,或者类似的,超出了可接受的范围 INT64 时间戳。
有人经历过吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题