I am processing a large amount of JSON data and converting it to Parquet with PySpark. In general everything works fine, but at the moment some records break the conversion, and I cannot debug the problem or figure out how to get past it. I can convert large volumes of data without issues, but there seems to be a single malformed record that I cannot detect or skip. The relevant part of the job:
```
pc_attrs = json.map(js_loads).filter(shrink_).map(
    shrink_attrs).map(convert_time_pc_atts)
s3aRdd = spark_session.createDataFrame(pc_attrs, procAttrsSchema)
s3aRdd.write.mode('overwrite').partitionBy("day", "tenantId").parquet(
    s3_table_parquet)
```
The exception:
```
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/root/appcache/application_1595414885588_0004/container_1595414885588_0004_01_000228/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt/yarn/usercache/root/appcache/application_1595414885588_0004/container_1595414885588_0004_01_000228/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt/yarn/usercache/root/appcache/application_1595414885588_0004/container_1595414885588_0004_01_000228/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/mnt/yarn/usercache/root/appcache/application_1595414885588_0004/container_1595414885588_0004_01_000228/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args,**kwargs)
File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python3.6/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python3.6/json/decoder.py", line 355, in raw_decode
obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Unterminated string starting at: line 1 column 2649 (char 2648)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
```
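What I am looking for is a way to either detect the offending record or skip it. A minimal sketch of what I mean by "skipping", assuming the input is one JSON document per line (`js_loads_safe` and `json_rdd` are hypothetical stand-ins for my `js_loads` and the raw-strings RDD I call `json` above, not code I currently run):

```
import json


def js_loads_safe(line):
    # Hypothetical wrapper around json.loads: instead of letting a malformed
    # record (e.g. an unterminated string) fail the whole task, return None.
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return None


# Same pipeline as above, but dropping records that failed to parse.
pc_attrs = (
    json_rdd.map(js_loads_safe)
            .filter(lambda rec: rec is not None)  # skip unparseable lines
            .filter(shrink_)
            .map(shrink_attrs)
            .map(convert_time_pc_atts)
)
```

I am not sure whether silently dropping records like this is the right approach, or whether there is a better way to find the record that fails at line 1 column 2649.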