How to filter based on first element in rdd containing array of tuples in pyspark?

I am having trouble filtering a list of tuples from an RDD.

Sample business.json

{"business_id":"gnKjwL_1w79qoiV3IC_xQQ","state":"NC","postal_code":"28210","latitude":35.092564,"longitude":-80.859132,"stars":4.0},
{"business_id":"xvX2CttrVhyG2z1dFg_0xw","state":"AZ","postal_code":"85338","latitude":33.4556129678,"longitude":-112.3955963552,"stars":5.0}

import json

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

stars = "4.0"

# Parse each line of the file as one JSON object.
input_business_lines = sc.textFile('data/business.json') \
    .map(lambda lines: json.loads(lines))

# Keep businesses rated at least `stars`, then reduce to (business_id, state).
business_ids = input_business_lines \
    .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \
    .filter(lambda kv: kv[1] >= float(stars)).map(lambda kv: (kv[0], kv[2])).collect()

The code above returns a list of tuples, where each tuple is (business_id, state):

[('gnKjwL_1w79qoiV3IC_xQQ', 'NC'),('xvX2CttrVhyG2z1dFg_0xw', 'AZ'),...,('HhyxOkGAM07SRYtlQ4wMFQ', 'NC')]

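So far so good. Now I need to join this with the review table and filter the review RDD down to the matching business IDs. If this were a DataFrame it would be much easier, but with tuples I am not sure how to do it.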

Here is my attempt.

- Note: it only works when business_ids is a plain list of business IDs, not a list of tuples.

Sample review.json

{"review_id": "c-6aA9Bd7JxpmMroRoas9A", "user_id": "bK4Y_GZUoAUTXIrmeEUGYw", "business_id": "gnKjwL_1w79qoiV3IC_xQQ", "stars": 4.0, "text": "Went there Saturday noon they open at 12pm but people were waiting outside before 12pm so you can tell it should be a good place. Nice Katsu & Eel with rice. Many Japanese go there.", "date": "2014-07-13 20:28:18"},
{"review_id": "EhvpZ1-MzemK1EMBUf19gQ", "user_id": "p0IderpL5zE4D021CXVxtA", "business_id": "KWywu2tTEPWmR9JnBc0WyQ", "stars": 5.0, "text": "I came here for my 40th .First of they offer free limo service to and from which was really cool.  When we first arrived we were greeted by the manager he walked us up to the top floor where the male review was and told us we had the choice of seats wherever we wanted.  We were also given two drink coupons per person so we ordered a drink and got seated. The show started off slowly but what I really liked is the guys came out to sit and talk to you, also they were really cool about taking pictures. By the end of the night it was so crowded but it was amazing had the best time I definitely will be back. I hope soon.", "date": "2015-11-20 05:24:45"}

Code

input_review_lines = sc.textFile('data/review.json').map(lambda lines: json.loads(lines))
rew_ids_bus_ids = input_review_lines \
                    .map(lambda kv: (kv['user_id'], kv['business_id'])) \
                    .filter(lambda kv: kv[1] in business_ids).collect()
rew_ids_bus_ids
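For reference, the filter above never matches when business_ids holds (business_id, state) tuples, because `in` compares each review's ID string against whole tuples. Below is a minimal plain-Python sketch of the difference, using the sample IDs from this post. The names `wanted_ids` and `keep_review` are hypothetical, and the plain lists stand in for the RDD contents; the same predicate could in principle be passed to an RDD's filter.

```python
# (business_id, state) pairs, as returned by the first snippet.
business_ids = [
    ("gnKjwL_1w79qoiV3IC_xQQ", "NC"),
    ("xvX2CttrVhyG2z1dFg_0xw", "AZ"),
]

# (user_id, business_id) pairs taken from the sample review.json.
reviews = [
    ("bK4Y_GZUoAUTXIrmeEUGYw", "gnKjwL_1w79qoiV3IC_xQQ"),
    ("p0IderpL5zE4D021CXVxtA", "KWywu2tTEPWmR9JnBc0WyQ"),
]

# A bare ID string is never equal to a (business_id, state) tuple,
# so this membership test rejects every review.
naive_matches = [kv for kv in reviews if kv[1] in business_ids]

# Hypothetical fix: keep only the first element of each tuple, in a set.
wanted_ids = {business_id for business_id, _state in business_ids}


def keep_review(kv):
    """Return True when the review's business_id is one of the wanted IDs."""
    return kv[1] in wanted_ids


matches = [kv for kv in reviews if keep_review(kv)]
print(naive_matches)
print(matches)
```

This only suits cases where the set of IDs fits comfortably in memory, and it drops the state, so it is a filter rather than a join.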

You can join those RDDs.

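import json

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

stars = 4.0
input_business_lines = sc.textFile('test.json') \
    .map(lambda lines: json.loads(lines))

# Keep businesses rated at least `stars`, as (business_id, state) pairs.
# Note: no collect() here, so business_ids stays an RDD and can be joined.
business_ids = input_business_lines \
    .filter(lambda kv: kv['stars'] >= stars) \
    .map(lambda kv: (kv['business_id'], kv['state']))

print(business_ids.collect())

input_review_lines = sc.textFile('test2.json') \
    .map(lambda lines: json.loads(lines))

# Key the reviews by business_id so both RDDs share the same key.
rew_ids_bus_ids = input_review_lines \
    .map(lambda kv: (kv['business_id'], kv['user_id']))

# Inner join on business_id: each result is (business_id, (state, user_id)).
joined = business_ids \
    .join(rew_ids_bus_ids)

print(joined.collect())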


# [('gnKjwL_1w79qoiV3IC_xQQ', 'NC'), ('xvX2CttrVhyG2z1dFg_0xw', 'AZ')]
# [('gnKjwL_1w79qoiV3IC_xQQ', ('NC', 'bK4Y_GZUoAUTXIrmeEUGYw'))]
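To make the shape of the joined output easier to read, here is a small plain-Python sketch of inner-join semantics on (key, value) pairs. It is only an illustration on the sample data from this post, not how Spark implements the join; `inner_join` and `user_state` are hypothetical names.

```python
from collections import defaultdict


def inner_join(left, right):
    """Pair up values from two lists of (key, value) tuples that share a key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]


# (business_id, state) pairs that passed the stars filter.
business_ids = [
    ("gnKjwL_1w79qoiV3IC_xQQ", "NC"),
    ("xvX2CttrVhyG2z1dFg_0xw", "AZ"),
]

# (business_id, user_id) pairs from the sample reviews.
rew_ids_bus_ids = [
    ("gnKjwL_1w79qoiV3IC_xQQ", "bK4Y_GZUoAUTXIrmeEUGYw"),
    ("KWywu2tTEPWmR9JnBc0WyQ", "p0IderpL5zE4D021CXVxtA"),
]

# Only keys present on both sides survive the join.
joined = inner_join(business_ids, rew_ids_bus_ids)
print(joined)

# Reshape each joined record into (user_id, state).
user_state = [(user_id, state) for _business_id, (state, user_id) in joined]
print(user_state)
```

The key comes first and the two values are nested in a tuple, so a follow-up map has to index into the second element to pull out the state or the user ID.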
import json
from pyspark import SparkContext

if __name__ == '__main__':
    input_review_json_path = "publicdata/review.json"
    input_business_json_path = "publicdata/business.json"
    output_csv_path = "outputs/user_state.csv"
    stars = "4.0"

    sc = SparkContext.getOrCreate()

    input_business_lines = sc.textFile(input_business_json_path) \
        .map(lambda lines: json.loads(lines))

    business_ids = input_business_lines \
        .map(lambda kv: (kv['business_id'], kv['stars'], kv['state'])) \
        .filter(lambda kv: kv[1] >= float(stars)) \
        .map(lambda kv: (kv[0], kv[2]))

    input_review_lines = sc.textFile(input_review_json_path) \
        .map(lambda lines: json.loads(lines))

    rew_ids_bus_ids = input_review_lines.map(lambda kv: (kv['business_id'], kv['user_id']))
    finalRdd = business_ids.join(rew_ids_bus_ids).map(lambda kv: (kv[0], kv[1][0]))

    review_rdd = finalRdd.collect()
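The script above defines output_csv_path but stops at collect(). As a rough sketch, assuming the collected list is small enough to write from the driver, the pairs could be saved with Python's csv module. The helper `write_pairs_csv`, the header names, and the temporary path are assumptions made for illustration; the one-element list stands in for the real result of finalRdd.collect().

```python
import csv
import os
import tempfile


def write_pairs_csv(pairs, path, header):
    """Write two-element tuples to a CSV file with a header row."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(pairs)


# Stand-in for the collected result; the real list comes from finalRdd.collect().
review_rdd = [("gnKjwL_1w79qoiV3IC_xQQ", "NC")]

# A temporary directory is used here instead of "outputs/user_state.csv".
output_csv_path = os.path.join(tempfile.mkdtemp(), "user_state.csv")
write_pairs_csv(review_rdd, output_csv_path, ["business_id", "state"])

# Read the file back to check what was written.
with open(output_csv_path, newline="") as f:
    rows = list(csv.reader(f))
print(rows)
```

As written, the final map keeps kv[0] and kv[1][0], so each pair is (business_id, state). If (user_id, state) pairs are wanted instead, the map would need kv[1][1] in place of kv[0].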