PySpark exception when writing to parquet

Asked by iqjalb3h on 2021-05-27 in Spark

I am processing a large volume of JSON data and converting it to parquet with PySpark. In general everything works, but at the moment the conversion fails and I cannot debug it or work around it. Most of the data converts fine, so it looks like a single bad record is the problem, but I cannot detect it or skip it.

```
pc_attrs = json.map(js_loads).filter(shrink_).map(
    shrink_attrs).map(convert_time_pc_atts)

s3aRdd = spark_session.createDataFrame(pc_attrs, procAttrsSchema)
s3aRdd.write.mode('overwrite').partitionBy("day", "tenantId").parquet(
    s3_table_parquet)
```
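
For reference, `js_loads`, `shrink_`, `shrink_attrs` and `convert_time_pc_atts` appear to be user-defined helpers that are not shown. Assuming `js_loads` is a thin wrapper around `json.loads` applied to an RDD of raw JSON strings, a minimal sketch of a tolerant variant that drops unparseable records instead of raising (hypothetical names, not the original helpers) could look like this:

```
import json

def js_loads_safe(line):
    # Hypothetical tolerant variant of js_loads: return None instead of
    # raising json.JSONDecodeError on a malformed input line.
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return None

# json_rdd stands in for the RDD of raw JSON strings (called `json` above);
# records that failed to parse are dropped before the rest of the pipeline.
pc_attrs = (json_rdd
            .map(js_loads_safe)
            .filter(lambda rec: rec is not None)
            .filter(shrink_)
            .map(shrink_attrs)
            .map(convert_time_pc_atts))
```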

The exception:

```
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/root/appcache/application_1595414885588_0004/container_1595414885588_0004_01_000228/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/root/appcache/application_1595414885588_0004/container_1595414885588_0004_01_000228/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/root/appcache/application_1595414885588_0004/container_1595414885588_0004_01_000228/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/mnt/yarn/usercache/root/appcache/application_1595414885588_0004/container_1595414885588_0004_01_000228/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args,**kwargs)
  File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python3.6/json/decoder.py", line 355, in raw_decode
    obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Unterminated string starting at: line 1 column 2649 (char 2648)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
```
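
The error only surfaces during the parquet write because the map/filter chain is evaluated lazily: `json.loads` runs on the executors when the write materializes the data, and a single malformed input line is enough to fail the task. A minimal sketch for sampling the offending lines up front, assuming the source RDD holds raw JSON strings (`is_broken` and `json_rdd` are hypothetical names):

```
import json

def is_broken(line):
    # Return True when json.loads cannot parse the line.
    try:
        json.loads(line)
        return False
    except json.JSONDecodeError:
        return True

# Collect a handful of malformed lines for inspection
# (json_rdd stands in for the source RDD of raw JSON strings).
bad_sample = json_rdd.filter(is_broken).take(5)
for line in bad_sample:
    print(repr(line))
```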
