如何使用pyspark管道并行化Dataframe转换?

vs3odd8k  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(354)

我的方法如下。我认为,由于我使用for循环并分别处理每一列,所以这段代码不会分发,因为完成它需要很多小时。我在pysparkDataframe中有8000列。你能告诉我一个正确的方法来实现这个并行/分布式的方式吗。谢谢

indexers = [ 
 StringIndexer(inputCol=c, outputCol="{}_idx".format(c)) for c in cat_string]

encoders1 = [
  OneHotEncoder(
    inputCol=idx.getOutputCol(),
    outputCol="{0}_enc".format(idx.getOutputCol())) for idx in indexers]

encoders2 = [
    OneHotEncoder(
     inputCol=c,
      outputCol="{0}_enc".format(c)) for c in cat_numeric]

label_indexer = StringIndexer(inputCol = "dep", outputCol = "label")

assembler = VectorAssembler(
 inputCols=[enc.getOutputCol() for enc in encoders1] +[enc.getOutputCol()    for enc in encoders2]+ con_columns,
outputCol="features")

pipeline = Pipeline(stages=indexers + encoders1 +encoders2+[label_indexer]+[assembler])

model = pipeline.fit(final_df)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题