我应该先说,我一直在使用aws glue studio来学习如何使用pyspark的胶水,到目前为止进展非常顺利。直到我遇到一个我无法理解(更不用说解决)的错误。下面是数据示例。
上下文
我所做的只是一个简单的数据转换。 Input S3 Bucket --> CustomTransform --> Output S3
. 但程序在导出部分数据后不断崩溃。稍后我也提到了,但我甚至尝试删除customtransformation,但是s3数据导出仍然失败,即使只是从一个bucket导出到另一个bucket。
错误
下面是我得到的错误的python部分(复制自cloudwatch):
2021-03-26 09:03:09,200 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
File "/tmp/GlueTest.py", line 69, in <module>
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "glueparquet", connection_options = {
"path": "s3://example-bucket-here/data/",
"compression": "snappy",
"partitionKeys": []
}, transformation_ctx = "DataSink0")
File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
format, format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
return sink.write(frame_or_dfc)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
return self.writeFrame(dynamic_frame_or_dfc, info)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a,**kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o85.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 76, 172.36.109.34, executor 6): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
真正的难题
最让我困惑的是,这个崩溃发生在它已经将大部分数据导出到s3之后。这会立即表明数据有问题,因为它会导致一些损坏(或格式不好)的数据,然后崩溃。
因此,我查看了成功导出的数据和输入数据之间的差异,找到了所有未导出的行。对我来说,没有什么是奇怪的,也没有什么是出口失败的原因。
当我选择s3 bucket作为输入源时,知道模式是由aws glue推断出来的可能会有所帮助。
我试过的
所以我试着用glue支持的所有不同格式导出数据,但都不起作用。我还尝试跳过所有的数据转换,只获取输入s3 bucket并直接导出到输出s3 bucket,但它仍然崩溃并出现相同的错误(实际上这就是我上面包含的错误消息!)。
同样,这一切都表明数据有问题,但我查看了所有没有通过该过程的数据(只有大约180条记录),所有数据看起来都和通过该过程的数据一样。
为了进行健全性检查,我在一些其他(非常类似的)数据上使用了input s3-->output s3方法,它工作得很好,基本上起到了复制粘贴的作用。
我也看到了这篇文章。但这并没有真正起到帮助,当我试图更改输出格式以获取更多信息时,我遇到了相同的错误-没有额外的信息。
有没有人能帮忙找出这个问题?没有任何迹象表明这会崩溃。我很乐意提供java错误的其余部分,如果这对人们有帮助的话。
数据示例
以下是我的数据:
Date ticker_name currency exchange_name instrument_type first_trade_date Amount
1612229400 0382.HK HKD HKG EQUITY 1563240600 0.049
1613140200 SO USD NYQ EQUITY 378657000 0.64
1613053800 SIGI USD NMS EQUITY 322151400 0.25
1614240000 SIGT.L GBp LSE EQUITY 828601200 1.68
1612249200 SIH.BE EUR BER EQUITY 1252044000 0.038
除日期(long)、首次交易日期(long)和金额(double)外,所有字段都是字符串。
当我打电话的时候 .printSchema()
我得到以下信息:
root
|-- Date: long
|-- currency: string
|-- exchange_name: string
|-- instrument_type: string
|-- first_trade_date: long
|-- Amount: double
1条答案
按热度按时间uttx8gqw1#
解决方案
因此,如果有人有这个问题,这可能是令人沮丧的,因为这个错误似乎没有提供任何信息,说明什么是真正出了问题。我仅有的线索之一就是这篇文章。这说明我的模式有问题。
我不得不非常仔细地查看我的数据,最终注意到只有当我将某些文件与其他文件一起运行时才会出现这个错误。
原来我的一些Parquet文件上有日期
int
在其他时候它是一个float
. 此数据是使用从Dataframe创建的.to_parquet()
在另一个函数中,所以我不确定为什么数据类型不一致。最让我困惑的是,为什么当我尝试将日期类型转换为all时
int
(如图所示)我仍然得到了错误。无论如何,我的解决方案是修复pandas输出数据的方式,并确保在glue处理数据之前,它总是将日期作为整数输出。