我有一个用pyspark编写的代码,它基本上做了一些转换。
df2=df1.select('*',F.explode(F.col('UseData')).alias('UseData1'))\
.select('*',F.explode(F.col('UseData1')).alias('UseData2'))\
.drop('UseData','UseData1','value')\
.select('*',F.explode(F.col('point'))).drop('point')\
.withColumn('label',F.col('UseData2.label')).filter(F.col('label')=='jrnyCount')\
.withColumn('value',F.col('UseData2.value'))\
.withColumn('datetime',F.col('UseData2.datetime'))\
.withColumn('latitude',F.col('col.latitude')).withColumn('longitude',F.col('col.longitude'))\
.drop('col','UseData2')\
.where(F.col('latitude').isNotNull() | F.col('longitude').isNotNull())
如果由于输入Dataframedf1中的坏数据而发生任何异常,是否有任何方法可以捕获?由于作业是在不同节点上的多个执行器中执行的,如何确保如果上面的任何一行中有任何错误,代码不会失败,并且忽略了错误数据?任何帮助都将受到高度赞赏。
1条答案
按热度按时间xe55xuns1#
异常处理是用python异常处理方法来完成的。但我认为您需要编写一些逻辑来忽略异常值/垃圾数据,这应该作为预处理的一部分手动完成;可能需要编写自定义项以根据条件过滤或更新数据。
如果在执行作业时出现错误或警告,您将在yarn日志中获得它,这样就不必在节点级别专门处理它们。