I need to convert some Scala code to Python. The Scala code turns an RDD of String into an RDD of a case class. The code is as follows:
case class Stock(
  stockName: String,
  dt: String,
  openPrice: Double,
  highPrice: Double,
  lowPrice: Double,
  closePrice: Double,
  adjClosePrice: Double,
  volume: Double
)

def parseStock(inputRecord: String, stockName: String): Stock = {
  val column = inputRecord.split(",")
  Stock(
    stockName,
    column(0),
    column(1).toDouble,
    column(2).toDouble,
    column(3).toDouble,
    column(4).toDouble,
    column(5).toDouble,
    column(6).toDouble)
}

def parseRDD(rdd: RDD[String], stockName: String): RDD[Stock] = {
  val header = rdd.first
  rdd.filter((data) => {
    data(0) != header(0) && !data.contains("null")
  })
    .map(data => parseStock(data, stockName))
}
Is it possible to do this in PySpark? I tried the code below, but it gives an error:
from dataclasses import dataclass

@dataclass(eq=True, frozen=True)
class Stock:
    stockName: str
    dt: str
    openPrice: float
    highPrice: float
    lowPrice: float
    closePrice: float
    adjClosePrice: float
    volume: float

def parseStock(inputRecord, stockName):
    column = inputRecord.split(",")
    return Stock(stockName,
                 column[0],
                 column[1],
                 column[2],
                 column[3],
                 column[4],
                 column[5],
                 column[6])

def parseRDD(rdd, stockName):
    header = rdd.first()
    res = rdd.filter(lambda data: data != header).map(lambda data: parseStock(data, stockName))
    return res
Error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 31, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 587, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute ... on <module '__main__'>
1 Answer
The Dataset API is not available in Python.
"A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python's dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar."
https://spark.apache.org/docs/latest/sql-programming-guide.html
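As the quote suggests, the usual PySpark counterpart is the DataFrame API with an explicit schema rather than a typed Dataset. Below is a minimal sketch of how the same parsing could look; the file path "stocks.csv", the stock name "AAPL", and the SparkSession variable spark are assumptions for illustration, not part of the original question:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()

# Schema mirroring the Stock case class (stockName is added afterwards as a constant column).
schema = StructType([
    StructField("dt", StringType()),
    StructField("openPrice", DoubleType()),
    StructField("highPrice", DoubleType()),
    StructField("lowPrice", DoubleType()),
    StructField("closePrice", DoubleType()),
    StructField("adjClosePrice", DoubleType()),
    StructField("volume", DoubleType()),
])

# "stocks.csv" and "AAPL" are placeholder values.
df = (spark.read
      .option("header", "true")              # skips the header row, like the rdd.first filter
      .schema(schema)
      .csv("stocks.csv")
      .withColumn("stockName", lit("AAPL"))
      .na.drop())                            # drops rows containing nulls, roughly like !data.contains("null")

# Fields of each Row are accessible by name, which is the dynamic access the documentation mentions.
first = df.first()
print(first.dt, first.closePrice)

This keeps the strongly typed columns of the Scala version (via the schema) and avoids shipping a driver-defined class to the executors, which is what the pickling error in the question is about.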