如何枚举spark的Dataframe中的列?如果列是嵌套的呢?

vqlkdk9b  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(452)

表面上它没有 items() 方法。那怎么办?
我正在尝试用以下代码将行发送到数据库:

def write_row(table_name, cur, row):
    data = []
    for key, value in row.items():
        data.append((key, value))
    data = zip(*data)

    columns = ", ".join(data[0])
    values = data[1]
    questionmarks = ", ".join(["?"] * len(columns))

    query = f"INSERT INTO {table_name} ({columns}) VALUES ({questionmarks})"
    cur.execute(query, values)

def write_data_frame(df, epoch1):
    conn = mariadb.connect(**config["mariadb"])
    cur = conn.cursor()

    table_name = "pysparktest"

    rows = df.collect()
    for row in rows:
        write_row(table_name, cur, row)

    conn.commit()

它发誓

AttributeError: items

如果行是嵌套的呢?

root
 |-- track: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- car: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- cnt: long (nullable = false)
 |-- minBestLapTime: double (nullable = true)
guz6ccqo

guz6ccqo1#

就像编译器发誓的那样,row类中没有名为“items()”的方法。
你需要做的是使用“asdict”方法。它以python dict的形式输出行中的键、值。
对于嵌套列,asdict函数中有一个名为recursive的参数,将其设置为true。默认情况下,设置为false。
例如:

row = Row(name="Alice", age=11)
row_as_dict = row.asDict()
row_as_dict

输出:

{'name': 'Alice', 'age': 11}

对于迭代:

for key in row_as_dict:
    print("{} : {}".format(key, row_as_dict[key]))

输出:

name : Alice
age : 11

如果是嵌套列

row = Row(key=1, value=Row(name='a', age=2))
row_as_dict = row.asDict(recursive=True)
row_as_dict

输出:

{'key': 1, 'value': {'name': 'a', 'age': 2}}

相关问题