How to filter an RDD of tuples by the first element of each tuple in PySpark?

vwkv1x7d asked on 2021-05-24 in Spark

I'm having trouble filtering a list of tuples from an RDD.
Sample business.json

{"business_id":"gnKjwL_1w79qoiV3IC_xQQ","state":"NC","postal_code":"28210","latitude":35.092564,"longitude":-80.859132,"stars":4.0}
{"business_id":"xvX2CttrVhyG2z1dFg_0xw","state":"AZ","postal_code":"85338","latitude":33.4556129678,"longitude":-112.3955963552,"stars":5.0}
import json
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

stars = "4.0"
input_business_lines = sc.textFile('data/business.json') \
    .map(lambda lines: json.loads(lines))

business_ids = input_business_lines \
    .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \
    .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2])).collect()

The code above returns a list of tuples (first element = business_id, second element = state):
[('gnKjwL_1w79qoiV3IC_xQQ', 'NC'),('xvX2CttrVhyG2z1dFg_0xw', 'AZ'),...,('HhyxOkGAM07SRYtlQ4wMFQ', 'NC')]

So far so good. Now I need to join with the reviews data and filter the review RDD down to only the matching business ids. If this were a DataFrame it would be much easier, but with plain tuples I don't know how to do it.
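For comparison, here is a minimal DataFrame sketch of the same filter-and-join, assuming a SparkSession named spark and the same file paths as above:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Businesses with at least 4.0 stars, keeping only the join key and the state.
business_df = spark.read.json('data/business.json') \
    .filter(F.col('stars') >= 4.0) \
    .select('business_id', 'state')

# Reviews reduced to the join key and the reviewer.
review_df = spark.read.json('data/review.json') \
    .select('business_id', 'user_id')

# Inner join keeps only reviews of businesses that passed the star filter.
review_df.join(business_df, on='business_id', how='inner').show()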
Here is my attempt.
- Note: this only works when business_ids is a plain list of business_id values rather than of tuples (one workaround is sketched after the code below).
Sample review.json

{"review_id": "c-6aA9Bd7JxpmMroRoas9A", "user_id": "bK4Y_GZUoAUTXIrmeEUGYw", "business_id": "gnKjwL_1w79qoiV3IC_xQQ", "stars": 4.0, "text": "Went there Saturday noon they open at 12pm but people were waiting outside before 12pm so you can tell it should be a good place. Nice Katsu & Eel with rice. Many Japanese go there.", "date": "2014-07-13 20:28:18"}
{"review_id": "EhvpZ1-MzemK1EMBUf19gQ", "user_id": "p0IderpL5zE4D021CXVxtA", "business_id": "KWywu2tTEPWmR9JnBc0WyQ", "stars": 5.0, "text": "I came here for my 40th .First of they offer free limo service to and from which was really cool. When we first arrived we were greeted by the manager he walked us up to the top floor where the male review was and told us we had the choice of seats wherever we wanted. We were also given two drink coupons per person so we ordered a drink and got seated. The show started off slowly but what I really liked is the guys came out to sit and talk to you, also they were really cool about taking pictures. By the end of the night it was so crowded but it was amazing had the best time I definitely will be back. I hope soon.", "date": "2015-11-20 05:24:45"}

Code

input_review_lines = sc.textFile('data/review.json').map(lambda lines: json.loads(lines))
rew_ids_bus_ids = input_review_lines \
    .map(lambda kv: (kv['user_id'], kv['business_id'])) \
    .filter(lambda kv: kv[1] in business_ids).collect()
rew_ids_bus_ids
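As noted above, the membership test fails because business_ids holds (business_id, state) tuples. One workaround sketch, reusing the RDDs defined above, is to collect just the ids into a set and broadcast it (variable names here are illustrative):

# Keep only the ids, dropping the state, and collect them into a set for fast lookups.
id_set = set(b_id for b_id, _state in business_ids)

# Broadcast the set so it is shipped to each executor once rather than per task.
id_set_bc = sc.broadcast(id_set)

rew_ids_bus_ids = input_review_lines \
    .map(lambda kv: (kv['user_id'], kv['business_id'])) \
    .filter(lambda kv: kv[1] in id_set_bc.value) \
    .collect()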


wd2eg0qa1#

You can join the RDDs.

import json

stars = 4.0
input_business_lines = sc.textFile('test.json') \
    .map(lambda lines: json.loads(lines))

business_ids = input_business_lines \
    .filter(lambda kv: kv['stars'] >= stars) \
    .map(lambda kv: (kv['business_id'], kv['state']))

print(business_ids.collect())

input_review_lines = sc.textFile('test2.json') \
    .map(lambda lines: json.loads(lines))

rew_ids_bus_ids = input_review_lines \
    .map(lambda kv: (kv['business_id'], kv['user_id']))

joined = business_ids \
    .join(rew_ids_bus_ids)

print(joined.collect())

# [('gnKjwL_1w79qoiV3IC_xQQ', 'NC'), ('xvX2CttrVhyG2z1dFg_0xw', 'AZ')]

# [('gnKjwL_1w79qoiV3IC_xQQ', ('NC', 'bK4Y_GZUoAUTXIrmeEUGYw'))]
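The join output is keyed by business_id, with the values paired as (state, user_id). If only (business_id, user_id) pairs are wanted, a final map can drop the state, for example:

# Drop the state from each joined record, keeping (business_id, user_id).
print(joined.map(lambda kv: (kv[0], kv[1][1])).collect())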

rta7y2nd2#

import json
from pyspark import SparkContext

if __name__ == '__main__':
    input_review_json_path = "publicdata/review.json"
    input_business_json_path = "publicdata/business.json"
    output_csv_path = "outputs/user_state.csv"
    stars = "4.0"

    sc = SparkContext.getOrCreate()

    input_business_lines = sc.textFile(input_business_json_path) \
                             .map(lambda lines: json.loads(lines))

    business_ids = input_business_lines \
                        .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \
                        .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2]))

    input_review_lines = sc.textFile(input_review_json_path) \
                            .map(lambda lines: json.loads(lines))

    rew_ids_bus_ids = input_review_lines.map(lambda kv: (kv['business_id'], kv['user_id']))
    finalRdd = business_ids.join(rew_ids_bus_ids).map(lambda kv: (kv[0], kv[1][0]))

    review_rdd = finalRdd.collect()
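The script defines output_csv_path but never writes to it. Assuming the intent was to save the collected pairs, a minimal sketch appended inside the __main__ block could be:

    import csv  # normally placed with the other imports at the top of the file

    # Assumption: review_rdd holds the 2-tuples produced by the map above,
    # and the outputs/ directory already exists.
    with open(output_csv_path, 'w', newline='') as out_file:
        csv.writer(out_file).writerows(review_rdd)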
