为什么在我将“mapvalues()”应用到rdd之后属性就消失了?

6yt4nkrj  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(371)

我正在使用stack overflow公共数据转储,试图找出给定的问题是否具有前10个常见标记之一。数据是这样的 <row Body="..." Id="1740" Tags="<machine-learning><spark><regression>" ... /> .
首先,我从训练数据中找出了10个最常见的标签。

import os
from pyspark import SparkContext
sc = SparkContext("local[*]", "temp")

def localpath(path):
    return 'file://' + str(os.path.abspath(os.path.curdir)) + '/' + path

class Record(object):
    def __init__(self, attributes):
        self.attr = attributes

    @classmethod
    def parse(cls, line):
        attributes = xmlparser(line)
        return cls(attributes)

def isRow(line):
    return "<row" in line

tags_10 = sc.textFile(localpath('spark-stats-data/allPosts/*')) \
            .filter(lambda x: isRow(x)) \
            .map(Record.parse) \
            .filter(lambda x: x.attr is not None and x.attr.get('Tags')) \
            .flatMap(lambda x: (x.attr['Tags'].strip('<>').split('><'))) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(lambda x, y: x + y) \
            .map(lambda x: (x[1], x[0])) \
            .sortByKey(ascending = False) \
            .take(10)

tags_10_words = [v for k, v in tags_10]
topwords_BV = sc.broadcast(tags_10_words)

当我试图解析 Body , Tags ,和 Id 从数据来看,我遇到了一个问题。

import mwparserfromhell as mwp

def bodyParser(body):
    try:
        return mwp.parse(body).strip_code().replace('\n',' ')
    except:
        return ''

train = sc.textFile(localpath("spark-stats-data/train/*")) \
          .filter(lambda x: isRow(x)) \
          .map(Record.parse) \
          .filter(lambda x: x.attr is not None and x.attr.get('Tags') and x.attr.get('Body') and x.attr.get('Id')) \
          .map(lambda x: (bodyParser(x.attr['Body']), x.attr['Id'], x.attr['Tags'].strip('<>').split('><'))) \
          .filter(lambda x: x[0]) \
          .mapValues(lambda x: [int(word in x) for word in topwords_BV.value]) \
          .map(lambda x: [x[0]] + x[1])

问题是我只看到了书中的文字 Body 以及关于 Tags 但不是那个 Id 属性(例如,参见 train.take(2)[1] 下面)。为什么会发生这样的事?我怎么能 Id 数据之外?

('I am carrying out an analysis using a 4$\\times$2 crosstab. I found an overall significant difference but I would like to find if there are significant differences among the 4 groups.  Is there a way to carry out these multiple comparisons?',
 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

我怀疑 .mapValues() 是罪魁祸首因为如果我把它移开 Id :

test_out = sc.textFile(localpath("spark-stats-data/train/*")) \
             .filter(lambda x: isRow(x)) \
             .map(Record.parse) \
             .filter(lambda x: x.attr is not None and x.attr.get('Tags') and x.attr.get('Body') and x.attr.get('Id')) \
             .map(lambda x: (bodyParser(x.attr['Body']), x.attr['Tags'].strip('<>').split('><'), x.attr['Id'])) \
             .filter(lambda x: x[0]) \
             .take(2)

这是从 test_out[1] :

('I am carrying out an analysis using a 4$\\times$2 crosstab. I found an overall significant difference but I would like to find if there are significant differences among the 4 groups.  Is there a way to carry out these multiple comparisons?',
 ['chi-squared', 'multiple-comparisons'],
 '114743')

所以我的问题是,我怎样才能留住 Id 在应用 .mapValues 台阶?我非常感谢你的帮助!
附加问题:如果我想按 Id (升序),什么地方最好加这个?谢谢!!

8ftvxx2r

8ftvxx2r1#

我解决了我自己的问题!
基本思想是先分组 Body 以及 Id 所以每一行(( Body , Id ), Tags )然后申请 .mapValues() . 在得到的Map之后 Tags ,我做了一些额外的解包以 [Body, Id, 0, 1, ...., 0] (12个元素:正文、问题id、是否有10个最常见的标签)。

train = sc.textFile(localpath("spark-stats-data/train/*")) \
          .filter(lambda x: isRow(x)) \
          .map(Record.parse) \
          .filter(lambda x: x.attr is not None and x.attr.get('Tags') and x.attr.get('Body') and x.attr.get('Id')) \
          .map(lambda x: (bodyParser(x.attr['Body']), x.attr['Tags'].strip('<>').split('><'), x.attr['Id'])) \
          .filter(lambda x: x[0]) \
          .map(lambda x: ((x[0], x[2]), x[1])) \
          .mapValues(lambda x: [int(word in x) for word in topwords_BV.value]) \
          .map(lambda x: [x[0][0]] + [x[0][1]] + x[1])

相关问题