我尝试在我的rdd上做一些转换,为此,我使用map调用一个函数。但是,不会调用此函数。有人请告诉我我做错了什么?
我看得见 test
函数被调用但未被调用 store_past_info
```
def store_past_info(row):
print "------------------- store_past_info ------------------------------"
if row["transactiontype"] == "Return":
global prv_transaction_number
prv_transaction_number = row["transnumber"]
global return_occured
return_occured = True
global group_id
group_id.append(row["transnumber"])
if row["transactiontype"] == "Purchase":
if return_occured:
global group_id
group_id.append(prv_transaction_number)
else:
global group_id
group_id.append(row["transnumber"])
print group_id
def test(rdd):
print "------------------- test ------------------------------"
rdd.map(store_past_info).collect()
print group_id
这就是它在商店里的工作原理:
如果购买了某个项目,则会生成一个id。
如果您想从您购买的商品中退回一些商品,则会有两个条目
所有产品退货的带有新id的退货条目,带有 `org_id` 作为 `id` 您要退回的采购订单
使用相同的 `id` 作为你最后一次购买你想要保留的东西的id
输入
Date Type Id org_id
25-03-2018 Purchase 111
25-03-2018 Purchase 112
26-03-2018 Return 113 111
26-03-2018 Purchase 111
输出我想添加一个新的列组\u id,它将显示退货的相同id和退货后发生的相应购买(客户不做此购买,这是系统为每次退货保留条目的方式)步骤2.1
Date Type Id org_id group_id
25-03-2018 Purchase 111 111
25-03-2018 Purchase 112 112
26-03-2018 Return 113 111 113
26-03-2018 Purchase 111 113
1条答案
按热度按时间6yjfywim1#
iiuc,我相信你可以用
DataFrame
s、 apyspark.sql.Window
功能,以及crossJoin()
首先转换你的rdd
到Dataframe使用然后我们需要添加一个索引列来跟踪行的顺序。我们可以用
pyspark.sql.functions.monotonically_increasing_id()
. 这将保证值将增加(因此可以对它们进行排序),但并不意味着它们将是连续的。排序很重要,因为您希望查找返回之后的行。
下次使用
crossJoin
加入DataFrame
对自己。由于这将返回笛卡尔积,因此我们将仅对满足以下任一条件的行进行过滤:
l.Index = r.Index
(本质上是将一行连接到自身)(l.Id = r.org_id) AND (l.Index > r.Index)
(安)Id
等于org_id
从前面的行-这是索引列有用的地方)然后我们为
group_id
并将其设置为r.Id
如果满足第二个条件。否则,我们将此列设置为None
.我们就快到了,但正如你所看到的,还有两件事需要做。
我们需要消除重复的行
Index = 42949672960
我们需要填写表格group_id
对于它所在的行null
使用中的值Id
.第一步,我们将使用
Window
函数创建一个名为rowNum
. 这将是pyspark.sql.functions.row_number()
对于每个Index
按布尔条件排序group_id IS NULL
.对于有多个行的索引值
group_id
已设置,将首先排序。因此,我们只需要选择rowNum
等于1(row_number()
从1开始,而不是0)。完成后,第二步很简单-只需替换剩余的
null
值的值Id
.