将pyspark.sql. datrame. dataframe类型转换为字典

wkyowqbh  于 11个月前  发布在  Spark
关注(0)|答案(5)|浏览(151)

我有一个pyspark Dataframe,我需要将其转换为python字典。
下面的代码是可复制的:

from pyspark.sql import Row
rdd = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)])
df = rdd.toDF()

字符串
一旦我有了这个框架,我需要把它转换成字典。
我试过这样

df.set_index('name').to_dict()


但它会产生误差。我怎样才能做到这一点呢

dpiehjr4

dpiehjr41#

请看下面的例子:

>>> from pyspark.sql.functions import col
>>> df = (sc.textFile('data.txt')
            .map(lambda line: line.split(","))
            .toDF(['name','age','height'])
            .select(col('name'), col('age').cast('int'), col('height').cast('int')))

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|  Bob|  5|    80|
|Alice| 10|    80|
+-----+---+------+

>>> list_persons = map(lambda row: row.asDict(), df.collect())
>>> list_persons
[
    {'age': 5, 'name': u'Alice', 'height': 80}, 
    {'age': 5, 'name': u'Bob', 'height': 80}, 
    {'age': 10, 'name': u'Alice', 'height': 80}
]

>>> dict_persons = {person['name']: person for person in list_persons}
>>> dict_persons
{u'Bob': {'age': 5, 'name': u'Bob', 'height': 80}, u'Alice': {'age': 10, 'name': u'Alice', 'height': 80}}

字符串
我用来测试data.txt的输入:

Alice,5,80
Bob,5,80
Alice,10,80


首先我们使用pyspark通过阅读行来进行加载。然后我们通过逗号分割将行转换为列。然后我们将原生RDD转换为DF并将名称添加到冒号。最后我们将列转换为适当的格式。
然后我们将所有数据收集到驱动程序中,并使用一些python列表解析将数据转换为首选的形式。我们使用asDict()方法将Row对象转换为字典。在输出中,我们可以观察到Alice只出现一次,但这当然是因为Alice的键被覆盖了。
请记住,您希望在将结果返回给驱动程序之前,在pypspark中完成所有的处理和过滤。
希望有帮助,干杯。

mpbci0fu

mpbci0fu2#

首先需要使用toPandas()转换为pandas.DataFrame,然后可以使用orient='list'在转置的矩阵上使用to_dict()方法:

df.toPandas().set_index('name').T.to_dict('list')
# Out[1]: {u'Alice': [10, 80]}

字符串

iswrvxsc

iswrvxsc3#

RDD内置了asDict()函数,它允许将每一行表示为一个dict。
如果你有一个framedf,那么你需要将它转换为一个rdd并应用asDict()。

new_rdd = df.rdd.map(lambda row: row.asDict(True))

字符串
然后可以使用new_rdd来执行正常的python map操作,如:

# You can define normal python functions like below and plug them when needed
def transform(row):
    # Add a new key to each row
    row["new_key"] = "my_new_value"
    return row

new_rdd = new_rdd.map(lambda row: transform(row))

9bfwbjaz

9bfwbjaz4#

一个简单的方法是收集行RDD并使用字典理解对其进行遍历。这里我将尝试演示类似的东西:
让我们假设一个电影框架:
电影_DF
| movieId|平均额定值|
| --|--|
| 1 |三点九二|
| 10 |3.5|
| 100 |2.79|
| 100044 |4.0|
| 100068 |3.5|
| 100083 |3.5|
| 100106 |3.5|
| 100159 |4.5|
| 100163 |2.9|
| 100194 |4.5|
我们可以像下面这样对行RDD使用字典理解和解析:

movie_dict = {int(row.asDict()['movieId']) : row.asDict()['avg_rating'] for row in movie_avg_rating.collect()}
print(movie_dict)
{1: 3.92,
 10: 3.5,
 100: 2.79,
 100044: 4.0,
 100068: 3.5,
 100083: 3.5,
 100106: 3.5,
 100159: 4.5,
 100163: 2.9,
 100194: 4.5}

字符串

dhxwm5r4

dhxwm5r45#

这里有一个两行代码,用于非常简单的情况。一些更通用的东西可能使用lambda和asDict来生成值。
假设DataFrame df带有keyval包含列名的字符串:

list = df.select(key, val).collect()
dict = {row[key]: row[val] for row in list }

字符串

相关问题