使用pyspark和aws glue进行数据交换

iyfjxgzm  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(427)

我是pyspark的新手,在数据转换方面面临一些挑战。我正在用aws胶水来做这项工作。当前数据如下所示:

+-----------------+-----+------+-----+
|  Country        |Code |1969  |1979 |
+-----------------+------------------+
|  United States  | USA | 1234 | 4569|
--------------------------------------

我需要把数据转换成:

+-----------------+-----+-------+----------+
|Country          |Code | Year | Population| 
+-----------------+-------------------------
|United States.   |USA  | 1969 | 1234.     |
--------------------------------------------
|United States.   |USA  | 1970 | 4569.     |
--------------------------------------------

我试图使用胶水Map功能,但这比那复杂得多。任何帮助都将不胜感激。

eit6fx6z

eit6fx6z1#

我想你需要的是一个相当于Pandas融化的Pypark:

from typing import Iterable

from pyspark.sql import functions as F
from pyspark.sql import DataFrame
def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

然后

melt(df, id_vars=['Country', 'Code'], value_vars=['1969', '1979']
    var_name=['Year'], value_name=['Population'] ).show()

相关问题