我试图在pyspark中创建一个管道,以便为随机林准备数据。我使用的是spark 2.2(2.2.0.2.6.4.0-91)。
我的数据不包含空值。我确定了分类列和数字列。
我正在对分类列进行编码并定义我的标签(options['vae'])。然后我使用vectorassembler为我的特征得到一个向量列。
在管道中,经过拟合和转换后,我应该得到一个sparkDataframe,其中包含一个label列和一个features(vector)列。
不幸的是,在安装时,spark出现了以下错误:
回溯(最近一次呼叫):
文件“/usr/hdp/current/spark2 client/python/lib/pyspark.zip/pyspark/sql/utils.py”,第63行,deco格式
文件“/usr/hdp/current/spark2 client/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py”,第319行,在get\u return\u value中
py4j.protocol.py4jjavaerror:调用o4563.fit时出错:java.lang.illegalargumentexception:要求失败:输出列t\u q\u dav\u索引已存在*
我在别的地方没有发现这种错误。所以我在这里寻求帮助。
以下是我的管道代码:
print("One Hot Encoding...")
stringIndexer = [StringIndexer(inputCol=col, outputCol=col + "_indexed").setHandleInvalid("keep") for col in categoricalColumns]
encoders = [OneHotEncoder(dropLast=True, inputCol=col + '_indexed', outputCol=col+ "_class") for col in categoricalColumns]
label_stringIdx = StringIndexer(inputCol = options['vae'], outputCol = 'label')
print("Vectorizing...")
assembler = VectorAssembler(inputCols=[col+ "_class" for col in categoricalColumns] + numericCols, outputCol="features")
print("Pipeline...")
pipeline = Pipeline(stages = stringIndexer + encoders + [label_stringIdx] + [assembler])
print('Fit...')
pipelineModel = pipeline.fit(s_df)
print('Transform...')
s_df = pipelineModel.transform(s_df)
selectedCols = ['label', 'features'] + cols
s_df = s_df.select(selectedCols)
暂无答案!
目前还没有任何答案,快来回答吧!