如何使用pyspark(2.1.0)LdA?

scyqe7ek  于 2023-04-29  发布在  Spark
关注(0)|答案(3)|浏览(113)

我使用pyspark的LDAModel从语料库中获取主题。我的目标是找到与每个文档相关的主题。为此,我尝试按照Docs设置topicDistributionCol。因为我是新手,所以我不知道这个专栏的目的是什么。

from pyspark.ml.clustering import LDA
lda_model = LDA(k=10, optimizer="em").setTopicDistributionCol("topicDistributionCol")
// documents is valid dataset for this lda model
lda_model = lda_model.fit(documents)
transformed = lda_model.transform(documents)

topics = lda_model.describeTopics(maxTermsPerTopic=num_words_per_topic)
print("The topics described by their top-weighted terms:")
print topics.show(truncate=False)

它列出了包含termIndices和termWeights的所有主题。

下面的代码将给予我topicDistributionCol.每一行都是针对每个文档的。

print transformed.select("topicDistributionCol").show(truncate=False)

我想得到这样的文档主题矩阵。pysparks LDA模型可以吗?

doc | topic 
1   |  [2,4]
2   |  [3,4,6]

注意:我已经使用gensims LDA模型通过以下代码完成了这一点。但是我需要使用pysparks LDA模型。

texts = [[word for word in document.lower().split() if word not in stoplist] for document in documents]
dictionary = corpora.Dictionary(texts)

corpus = [dictionary.doc2bow(text) for text in texts]
doc_topics = LdaModel(corpus=corpus, id2word=dictionary, num_topics=10, passes=10)
## to fetch topics for one document
vec_bow = dictionary.doc2bow(text[0])
Topics = doc_topics[vec_bow]
Topic_list = [x[0] for x in Topics]
## topic list is [1,5]
slhcrj9b

slhcrj9b1#

使用toPandas可以帮助:

df_p = transformed.select('topicDistributionCol').toPandas()
df_p1 = df_p.topicDistribution.apply(lambda x:np.array(x))
df_p2 = pd.DataFrame(df_p1.tolist()).apply(lambda x:x.argmax(),axis=1)
df_p3 = df_p2.reset_index()
df_p3.columns = ['doc','topic']
df_p3
qhhrdooz

qhhrdooz2#

**不要把你的数据出Spark!**这违背了使用Spark的目的。如果你实际上不需要它,使用scikit。

加上@kevin提供的答案,一旦你在LDA模型transformed = model.transform(input_data)上运行了transform方法来获得你的主题分布,你应该看到一个包含以下列的数据框:

+--------------------+
|  topicDistribution |
+--------------------+
|[8.11971897779803...|
|[0.00341037397939...|
|[0.00141274604502...|

这是针对n维三角形单形形状中的所有顶点的每行(预测)的测量值列表,该n维三角形单形形状是LDA模型。
您可以使用下面这个方便的小udf,通过在上面的col中的概率分布上运行它来自动Map每行ONEtop topic**。
简而言之,我们正在寻找列表中的最大概率,这是您的首要主题。概率测量的顺序与您的主题**相同。

from pyspark.sql.types import IntegerType
import pyspark.sql.functions as f

@f.udf(returnType=IntegerType())
def top_topic(topic_dist):
    dict = {prob: i for i, prob in enumerate(topic_dist)}
    return dict.get(max(topic_dist))

如果你只想MapONEtop topic为分布中的每一行,你的时间复杂度将是O(n)。
但是如果你试图为每个文档建模多个关系,你可以修改如下的方法,以支持最多****个主题,这将给予你的复杂度为O(n ^ k),其中k是你想要返回的主题的数量。所以保持k合理;)

from pyspark.sql.types import ArrayType, IntegerType
import pyspark.sql.functions as f

@f.udf(returnType=ArrayType(IntegerType()))
def top_3_topics(topic_dist):
    k = 3
    res = []
    mutable = list(topic_dist)
    dict = {prob: i for i, prob in enumerate(topic_dist)}
    for i in range(k):
        current = dict.get(max(mutable))
        res.append(current)
        mutable[current] = 0.0

    del mutable
    return res

完成后,你会得到这个:

+--------------------+-----+
|  topicDistribution |topic|
+--------------------+-----+
|[8.11971897779803...|   19|
|[0.00341037397939...|   12|
|[0.00141274604502...|   19|

或者这样(如果需要多个主题,则使用ArrayType列):

+--------------------+------------+
|  topicDistribution |       topic|
+--------------------+------------+
|[8.11971897779803...|  [19, 1, 7]|
|[0.00341037397939...| [12, 16, 4]|
|[0.00141274604502...|[19, 16, 11]|

这样做的目的是将主题列附加到现有结果中。
命名您的主题并将主题标签自动加入到结果中:

from pyspark.sql.types import StringType, StructType, StructField

schema = StructType([
            StructField("topic", StringType(), True),
            StructField("topic_name", StringType(), True)])

data = [('0', 'politics'),
        ('1', 'global'),
        ('2', 'crime')]

        topics_data = self.spark.createDataFrame(data, schema)
        results = results.join(topics_data, 'topic', 'left')
q7solyqu

q7solyqu3#

我认为这个问题有一个简单的答案。请执行以下操作:

transformed.take(10)

输出的最后一列将是“topicDistribution”,即文档主题分布。

相关问题