Creating an Apache Spark RDD of a class in PySpark

Asked by efzxgjgh on 2021-05-29, in Spark

I have to convert some Scala code to Python. The Scala code turns an RDD of String into an RDD of a case class. Here is the code:

case class Stock(
                  stockName: String,
                  dt: String,
                  openPrice: Double,
                  highPrice: Double,
                  lowPrice: Double,
                  closePrice: Double,
                  adjClosePrice: Double,
                  volume: Double
                )

  def parseStock(inputRecord: String, stockName: String): Stock = {
    val coloumn = inputRecord.split(",")
    Stock(
      stockName,
      coloumn(0),
      coloumn(1).toDouble,
      coloumn(2).toDouble,
      coloumn(3).toDouble,
      coloumn(4).toDouble,
      coloumn(5).toDouble,
      coloumn(6).toDouble)
  }

  def parseRDD(rdd: RDD[String], stockName: String): RDD[Stock] = {
    val header = rdd.first
    rdd.filter((data) => {
      data(0) != header(0) && !data.contains("null")
    })
      .map(data => parseStock(data, stockName))
  }

Is it possible to do the same in PySpark? I tried the code below, and it gives the following error:

from dataclasses import dataclass

@dataclass(eq=True,frozen=True)
class Stock:
    stockName : str
    dt: str
    openPrice: float
    highPrice: float
    lowPrice: float
    closePrice: float
    adjClosePrice: float
    volume: float

def parseStock(inputRecord, stockName):
  coloumn = inputRecord.split(",")
  return Stock(stockName,
               coloumn[0],
               coloumn[1],
               coloumn[2],
               coloumn[3],
               coloumn[4],
               coloumn[5],
               coloumn[6])

def parseRDD(rdd, stockName):
  header = rdd.first()
  res = rdd.filter(lambda data : data != header).map(lambda data : parseStock(data, stockName))
  return res

Error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 31, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 587, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'main' on ...


Answer 1 (by nfs0ujit):

The Dataset API is not available in Python.
"A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python's dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar."
https://spark.apache.org/docs/latest/sql-programming-guide.html
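
Even without the Dataset API, the same conversion can still be expressed in PySpark with plain RDD transformations. Below is a minimal sketch of one way to do it, not taken from the original post: the names parse_stock and parse_rdd, the SparkSession setup, and the placeholder path "AAPL.csv" are all illustrative assumptions. Each record is built as a pyspark.sql.Row instead of a case class, so fields can be read as row.columnName, as the quoted documentation notes.

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("stocks").getOrCreate()
sc = spark.sparkContext

def parse_stock(input_record, stock_name):
    # Split the CSV line and convert the numeric columns explicitly,
    # mirroring the .toDouble calls in the Scala version.
    column = input_record.split(",")
    return Row(
        stockName=stock_name,
        dt=column[0],
        openPrice=float(column[1]),
        highPrice=float(column[2]),
        lowPrice=float(column[3]),
        closePrice=float(column[4]),
        adjClosePrice=float(column[5]),
        volume=float(column[6]),
    )

def parse_rdd(rdd, stock_name):
    # Drop the header line and any record containing "null",
    # then map every remaining line to a Row.
    header = rdd.first()
    return (rdd
            .filter(lambda line: line != header and "null" not in line)
            .map(lambda line: parse_stock(line, stock_name)))

stocks = parse_rdd(sc.textFile("AAPL.csv"), "AAPL")  # "AAPL.csv" is a placeholder path
print(stocks.first().closePrice)                     # field access by name, like a case class

If the optimized Spark SQL engine is wanted on top of this, the RDD of Rows can be turned into a DataFrame with spark.createDataFrame(stocks).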
