PySpark randomSplit fails on seemingly valid input

jhkqcmku · asked 2021-05-27 · Spark

Hi data scientists and developers. I am running into a problem with the randomSplit function. I can print out the input DataFrame and it looks perfectly fine. However, when I apply randomSplit to that DataFrame, it throws an error I cannot make sense of. I noticed that if my features column (produced by VectorAssembler) contains only one-hot encoded columns, the code runs fine. But when the assembled vector combines numeric columns with the one-hot encoded columns, randomSplit fails. Thanks for taking a look!

Here is the portion of the code I am referring to:

# IT DOES NOT WORK WHEN I HAVE A COMBINATION OF DOUBLE/INTEGER TYPE WITH THE ONE HOT ENCODER TYPE!!

# feature_columns = ['tenure','MonthlyCharges','TotalCharges']+['onehot_' + c for c in categorical_columns]

feature_columns = ['onehot_' + c for c in categorical_columns] 

# Full code

from pyspark import SparkContext
from pyspark.sql.functions import col
sc = SparkContext(master = 'local')

from pyspark.sql import SparkSession
spark = SparkSession.builder \
          .appName("Python Spark SQL basic example") \
          .config("spark.some.config.option", "some-value") \
          .getOrCreate()

cuse = spark.read.csv("data/datasets_13996_18858_WA_Fn-UseC_-Telco-Customer-Churn.csv", header=True, inferSchema=True)
cuse = cuse.withColumn('TotalCharges', col('TotalCharges').cast('double'))

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# categorical columns

categorical_columns = cuse.columns[1:-3]
print(categorical_columns)
categorical_columns.remove('tenure')
print(categorical_columns)

stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c) for c in categorical_columns]

# encode label column and add it to stringindexer_stages

stringindexer_stages += [StringIndexer(inputCol='Churn', outputCol='label')]

onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]

# IT DOES NOT WORK WHEN I HAVE A COMBINATION OF DOUBLE/INTEGER TYPE WITH THE ONE HOT ENCODER TYPE!!

# feature_columns = ['tenure','MonthlyCharges','TotalCharges']+['onehot_' + c for c in categorical_columns]
feature_columns = ['onehot_' + c for c in categorical_columns]
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')

all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)
pipeline_model = pipeline.fit(cuse)

final_columns = ['features', 'label']
cuse_df = pipeline_model.transform(cuse).\
            select(final_columns)

cuse_df.limit(3).toPandas()
training, test = cuse_df.randomSplit([0.8, 0.2], seed=1234)
training.limit(3).toPandas()
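No error message is included above, so the following is only a guess, but a common cause of exactly this symptom with the public Telco Customer Churn CSV is that TotalCharges contains a few blank entries that become null after the cast to double. The one-hot columns never contain nulls, while VectorAssembler's default handleInvalid='error' makes any job that materializes the assembled features column (such as the randomSplit / toPandas calls above) fail. A minimal sketch of that check, reusing the cuse DataFrame and feature_columns list defined above:

from pyspark.sql.functions import col, count, when
from pyspark.ml.feature import VectorAssembler

# Count the nulls introduced by cast('double'); blank strings become null.
cuse.select(count(when(col('TotalCharges').isNull(), True)).alias('null_TotalCharges')).show()

# Option 1: drop the offending rows before fitting the pipeline.
cuse_clean = cuse.na.drop(subset=['TotalCharges'])

# Option 2 (Spark >= 2.4): let the assembler skip or keep invalid rows
# instead of raising an error.
vectorassembler_stage = VectorAssembler(inputCols=feature_columns,
                                        outputCol='features',
                                        handleInvalid='skip')  # or 'keep'

With either change, randomSplit on the assembled DataFrame should no longer trip over the mixed numeric / one-hot feature vector, assuming nulls are in fact the culprit.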
