PySpark -将RDD转换为对键值RDD

1l5u6lss  于 2022-11-25  发布在  Apache
关注(0)|答案(2)|浏览(215)

我从CSV创建了rdd lines = sc.textFile(data)现在我需要将行转换为键值rdd,其中value将是字符串(拆分后),key将是csv的列数,例如CSV
| 第1栏|列2|
| - -|- -|
| 七十三|小行星230666|
| 五十五|小行星149610|
我想得到rdd.take(1):【(1,73),(2,230666)】

I create rdd  of lists 
lines_of_list = lines_data.map(lambda line  : line.split(','))

I create function that get list and return list of tuples (key, value)
def list_of_tuple (l):
  list_tup = []
  for i in range(len(l[0])):
    list_tup.append((l[0][i],i))
  return(list_tup)

But I can’t  get the correct result when I try to map this function on RDD
lpwwtiir

lpwwtiir1#

你可以使用PySpark的create_map函数来实现,如下所示:

from pyspark.sql.functions import create_map, col, lit

df = spark.createDataFrame([(73, 230666), (55, 149610)], "Col1: int, Col2: int")
mapped_df = df.select(create_map(lit(1), col("Col1")).alias("mappedCol1"), create_map(lit(2), col("Col2")).alias("mappedCol2"))
mapped_df.show()

+----------+-------------+
|mappedCol1|   mappedCol2|
+----------+-------------+
| {1 -> 73}|{2 -> 230666}|
| {1 -> 55}|{2 -> 149610}|
+----------+-------------+

如果你仍然想使用RDD API,那么它是DataFrame的一个属性,所以你可以这样使用它:

mapped_df.rdd.take(1)

Out[32]: [Row(mappedCol1={1: 73}, mappedCol2={2: 230666})]
tjjdgumg

tjjdgumg2#

我以下列方式修正问题:

def list_of_tuple (line_rdd):
  l = line_rdd.split(',')
  list_tup = []
  for i in range(len(l)):
    list_tup.append((l[i],i))
  return(list_tup)

pairs_rdd = lines_data.map(lambda line: list_of_tuple(line))

相关问题