pyspark rdd map未调用函数

jjjwad0x  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(553)

我尝试在我的rdd上做一些转换,为此,我使用map调用一个函数。但是,不会调用此函数。有人请告诉我我做错了什么?
我看得见 test 函数被调用但未被调用 store_past_info ```
def store_past_info(row):
print "------------------- store_past_info ------------------------------"

if row["transactiontype"] == "Return":
    global prv_transaction_number
    prv_transaction_number = row["transnumber"]
    global return_occured
    return_occured = True
    global group_id
    group_id.append(row["transnumber"])

if row["transactiontype"] == "Purchase":
        if return_occured:
            global group_id
            group_id.append(prv_transaction_number)
        else:
            global group_id
            group_id.append(row["transnumber"])

print group_id

def test(rdd):
print "------------------- test ------------------------------"
rdd.map(store_past_info).collect()
print group_id

这就是它在商店里的工作原理:
如果购买了某个项目,则会生成一个id。
如果您想从您购买的商品中退回一些商品,则会有两个条目
所有产品退货的带有新id的退货条目,带有 `org_id` 作为 `id` 您要退回的采购订单
使用相同的 `id` 作为你最后一次购买你想要保留的东西的id
输入

Date Type Id org_id
25-03-2018 Purchase 111
25-03-2018 Purchase 112
26-03-2018 Return 113 111
26-03-2018 Purchase 111

输出我想添加一个新的列组\u id,它将显示退货的相同id和退货后发生的相应购买(客户不做此购买,这是系统为每次退货保留条目的方式)步骤2.1

Date Type Id org_id group_id
25-03-2018 Purchase 111 111
25-03-2018 Purchase 112 112
26-03-2018 Return 113 111 113
26-03-2018 Purchase 111 113

6yjfywim

6yjfywim1#

iiuc,我相信你可以用 DataFrame s、 a pyspark.sql.Window 功能,以及 crossJoin() 首先转换你的 rdd 到Dataframe使用

df = rdd.toDF()  # you may have to specify the column names
df.show()

# +----------+--------+---+------+

# |      Date|    Type| Id|org_id|

# +----------+--------+---+------+

# |25-03-2018|Purchase|111|  null|

# |25-03-2018|Purchase|112|  null|

# |26-03-2018|  Return|113|   111|

# |26-03-2018|Purchase|111|  null|

# +----------+--------+---+------+

然后我们需要添加一个索引列来跟踪行的顺序。我们可以用 pyspark.sql.functions.monotonically_increasing_id() . 这将保证值将增加(因此可以对它们进行排序),但并不意味着它们将是连续的。

import pyspark.sql.functions as f
df = df.withColumn('Index', f.monotonically_increasing_id())
df.show()

# +----------+--------+---+------+-----------+

# |      Date|    Type| Id|org_id|      Index|

# +----------+--------+---+------+-----------+

# |25-03-2018|Purchase|111|  null| 8589934592|

# |25-03-2018|Purchase|112|  null|17179869184|

# |26-03-2018|  Return|113|   111|34359738368|

# |26-03-2018|Purchase|111|  null|42949672960|

# +----------+--------+---+------+-----------+

排序很重要,因为您希望查找返回之后的行。
下次使用 crossJoin 加入 DataFrame 对自己。
由于这将返回笛卡尔积,因此我们将仅对满足以下任一条件的行进行过滤: l.Index = r.Index (本质上是将一行连接到自身) (l.Id = r.org_id) AND (l.Index > r.Index) (安) Id 等于 org_id 从前面的行-这是索引列有用的地方)
然后我们为 group_id 并将其设置为 r.Id 如果满足第二个条件。否则,我们将此列设置为 None .

df1 = df.alias('l').crossJoin(df.alias('r'))\
    .where('(l.Index = r.Index) OR ((l.Id = r.org_id) AND (l.Index > r.Index))')\
    .select(
        'l.Index',
        'l.Date',
        'l.Type',
        'l.Id',
        'l.org_id',
        f.when(
            (f.col('l.Id') == f.col('r.org_id')) & (f.col('l.Index') > f.col('r.Index')),
            f.col('r.Id')
        ).otherwise(f.lit(None)).alias('group_id')
    )
df1.show()

# +-----------+----------+--------+---+------+--------+

# |      Index|      Date|    Type| Id|org_id|group_id|

# +-----------+----------+--------+---+------+--------+

# | 8589934592|25-03-2018|Purchase|111|  null|    null|

# |17179869184|25-03-2018|Purchase|112|  null|    null|

# |34359738368|26-03-2018|  Return|113|   111|    null|

# |42949672960|26-03-2018|Purchase|111|  null|     113|

# |42949672960|26-03-2018|Purchase|111|  null|    null|

# +-----------+----------+--------+---+------+--------+

我们就快到了,但正如你所看到的,还有两件事需要做。
我们需要消除重复的行 Index = 42949672960 我们需要填写表格 group_id 对于它所在的行 null 使用中的值 Id .
第一步,我们将使用 Window 函数创建一个名为 rowNum . 这将是 pyspark.sql.functions.row_number() 对于每个 Index 按布尔条件排序 group_id IS NULL .
对于有多个行的索引值 group_id 已设置,将首先排序。因此,我们只需要选择 rowNum 等于1( row_number() 从1开始,而不是0)。
完成后,第二步很简单-只需替换剩余的 null 值的值 Id .

from pyspark.sql import Window
w = Window.partitionBy(f.col('Index')).orderBy(f.isnull('group_id'))
df2 = df1.withColumn('rowNum', f.row_number().over(w))\
    .where(f.col('rowNum')==1)\
    .sort('Index')\
    .select(
        'Date',
        'Type',
        'Id',
        'org_id',
        f.when(
            f.isnull('group_id'),
            f.col('Id')
        ).otherwise(f.col('group_id')).alias('group_id')
    )

df2.show()

# +----------+--------+---+------+--------+

# |      Date|    Type| Id|org_id|group_id|

# +----------+--------+---+------+--------+

# |25-03-2018|Purchase|111|  null|     111|

# |25-03-2018|Purchase|112|  null|     112|

# |26-03-2018|  Return|113|   111|     113|

# |26-03-2018|Purchase|111|  null|     113|

# +----------+--------+---+------+--------+

相关问题