Saving a DataFrame as a .txt or .csv file

k4aesqcs · posted 2021-05-27 in Hadoop

I'm working on a machine learning algorithm to predict the price of Ethereum. I already have a small dataset that I'm making predictions on. I can print the predictions in the terminal and see them, but I cannot save them to a text/CSV file. Here is my code:

from pyspark.sql.types import *
from pyspark.sql import Row, SparkSession
from pyspark.mllib.util import MLUtils
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import DateType

from pyspark import SparkContext
import pyspark
import datetime

sc = pyspark.SparkContext()
spark = SparkSession.builder.appName('Ethereum').getOrCreate()
# get the csv file as a DataFrame object
data = spark.read.csv('hdfs://andromeda.student.eecs.qmul.ac.uk/user/cln31/ethereum', header=True,inferSchema=True)

# DataFrame type

data = data.select(data.date.cast("int"),
                    data.PriceBTC.cast("float"),
                    data.PriceUSD.cast("float"),
                    data.TxCnt.cast("float"),
                    data.TxTfrValMedUSD.cast("float"),
                    data.CapMrktCurUSD.cast("float"),
                    data.IssContUSD.cast("float"),
                    data.TxTfrValMeanUSD.cast("float"),
                    data.TxTfrValUSD.cast("float"))

data.printSchema()

featureassembler = VectorAssembler(
    inputCols=["date", "TxTfrValMedUSD", "CapMrktCurUSD", "TxCnt",
               "TxTfrValUSD", "IssContUSD", "TxTfrValMeanUSD"],
    outputCol="Independent Features")
output = featureassembler.setHandleInvalid("skip").transform(data)
output.show()

output.select("Independent Features").show()

finalized_data=output.select("Independent Features","PriceUSD")
finalized_data.show()

train_data,test_data=finalized_data.randomSplit([0.75,0.25])

regressor=LinearRegression(featuresCol='Independent Features', labelCol='PriceUSD')
regressor=regressor.fit(train_data)

test_data1 = output.filter(data.date >= 1455408000) #2016.02.14
test_data1 = test_data1.filter(test_data1.date <= 1561852800) #2019.06.30

test_data1 = test_data1.select("Independent Features","PriceUSD")

test_data1.show()

pred_results=regressor.evaluate(test_data1)
pred_results.predictions.describe().show()

pred_results.predictions.write.csv("partCOut.csv")

I'm basically trying to save the output of pred_results.predictions. This is the error I get:

pyspark.sql.utils.AnalysisException: u'CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.;'
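The exception says the CSV writer cannot serialize the assembled ML vector column ("Independent Features"): CSV files only hold flat scalar values. A common workaround is to drop or flatten the vector column before writing, e.g. pred_results.predictions.drop("Independent Features").write.csv("partCOut.csv"). The idea, sketched below with plain Python and hypothetical sample rows standing in for the Spark output (the real prediction data isn't shown in the question):

```python
import csv
import io

# Hypothetical rows standing in for pred_results.predictions: one nested
# vector column plus scalar columns. CSV cannot store the nested vector,
# so we keep only the scalar columns when writing.
rows = [
    {"features": [1455408000.0, 5.2, 1.1e9], "PriceUSD": 5.61, "prediction": 5.4},
    {"features": [1455494400.0, 5.0, 1.2e9], "PriceUSD": 5.33, "prediction": 5.5},
]

buf = io.StringIO()
writer = csv.writer(buf, lineterminator="\n")
writer.writerow(["PriceUSD", "prediction"])  # header: scalar columns only
for r in rows:
    # the "features" vector is dropped, mirroring df.drop("Independent Features")
    writer.writerow([r["PriceUSD"], r["prediction"]])

print(buf.getvalue())
```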


nzkunb0c1#

That sounds strange. I dealt with something similar recently. I think the problem was that the object I was trying to save wasn't an actual DataFrame. Converting it to a DataFrame solved the problem, and I could then save the file (in my scenario it was saved to a SQL Server table).
Try something like this.


# your code...
import pandas as pd

df = pd.DataFrame(mylist)

display(df)

# convert the pandas DataFrame to a Spark DataFrame
spark_df = spark.createDataFrame(df)

# write the DataFrame out as a CSV file
spark_df.write.csv("/rawdata/AAA.csv")
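Going the other direction can also work here: if the prediction DataFrame is small, pred_results.predictions.toPandas() returns a pandas DataFrame whose to_csv() writes a single ordinary file (drop the vector column first, for the same reason as the original error). A minimal sketch of the pandas side, with made-up rows since the real predictions aren't shown:

```python
import pandas as pd

# Hypothetical rows, roughly what toPandas() might return after
# dropping the vector column.
df = pd.DataFrame(
    [(5.61, 5.4), (5.33, 5.5)],
    columns=["PriceUSD", "prediction"],
)

csv_text = df.to_csv(index=False)  # one CSV string, no Spark part files
print(csv_text)
```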

To check the type of an object, try the following.

>>> z = []
>>> type(z)
<type 'list'>

>>> z = ()
>>> type(z)
<type 'tuple'>
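The same check distinguishes a pandas DataFrame from a plain list; in a script, isinstance() is the more idiomatic test (this sketch assumes pandas is installed):

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2]})

print(type(df).__name__)             # DataFrame
print(isinstance(df, pd.DataFrame))  # True
print(isinstance([], pd.DataFrame))  # False
```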
