我使用PySpark进行LDA。对于每一行,我计算主题(LDA结果的主题词),并将结果的单词列表附加到主列表中。问题是主题词的列表是pyspark.sql.column.Column
类型。* * 我知道如何将这个主题词列表转换为普通的Python字符串列表。**
下面是一段代码:
def map_ID_to_Word(termIndices):
words = []
for termID in termIndices:
words.append(vocab_broadcast.value[termID])
return words
udf_map_ID_to_Word = udf(map_ID_to_Word , ArrayType(StringType()))
tf_result = tf_model.transform(sdf)
tfidf_res = idf_model.transform(tf_res)
model = lda_model.fit(tfidf_res)
topics_df = model.describeTopics()
topic_words = udf_map_ID_to_Word(topics_df.termIndices)
# topic_words is of type <class 'pyspark.sql.column.Column'>
任何关于如何将<class 'pyspark.sql.column.Column'>的列表项转换为普通字符串列表的建议都非常感谢。
我的代码有点类似于这里:https://www.databricks.com/blog/2021/07/29/an-experimentation-pipeline-for-extracting-topics-from-text-data-using-pyspark.html
`print(topic_words)` gives this: `Column<'map_termID_to_Word(termIndices)'>`
和/或
print(type(topic_words)) gives this:
<class 'pyspark.sql.column.Column'>
我计划在主列表中收集所有这些主题词(每行),然后将该主列表附加到包含该文档的现有数据框架中。我有一段代码可以将字符串列表附加到现有的pyspark Dataframe 中。但是,问题是,我不知道如何得到每一行的单词列表。
- 更新**以下是我的代码的大图片:
vocab_read_list = vocab
vocab_broadcast = sc.broadcast(vocab_read_list)
master_topics_list = []
for iter in cdi_grouped_df.collect():
date = iter["date"]
brand = iter["brand"]
t_id = iter["t_id"]
word_list = iter["final_word_list"]
pdf = pd.DataFrame(data=[[word_list]], columns=["final_word_list"])
sdf = spark.createDataFrame(pdf)
print(f'SDF with final_word_list')
sdf.show()
# ------------
# We must make sure if the list is not empty to proceed.
# Word List: []
# SDF with final_word_list
# +---------------+
# |final_word_list|
# +---------------+
# | []|
# +---------------+
#-------------
if sdf.count() > 0:
first_row = sdf.collect()[0]
fwl = first_row["final_word_list"]
if len(fwl) > 0:
tf_result = tf_model.transform(sdf)
tfidf_result = idf_model.transform(tf_result)
model = lda_model.fit(tfidf_result)
topics_df = model.describeTopics()
topics_df.show()
topic_words = udf_map_termID_to_Word(topics_df.termIndices)
print(f"\nTopic Words:")
print(topic_words)
# Now let's add the topic_words column to your topics_df
topics_df = topics_df.withColumn("topic_desc", topic_words)
# And extract your wanted values into a Python list
topic_list = [x[0] for x in topics_df.select("topic_desc").collect()]
master_topics_list.append(topic_list)
在最后一行(topic_list =[x [0]...]),我得到了这个错误:
PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
'NameError: name 'vocab_broadcast' is not defined', from , line 4. Full traceback below:
Traceback (most recent call last):
File "", line 4, in map_termID_to_Word
NameError: name 'vocab_broadcast' is not defined
1条答案
按热度按时间toiithl61#
(Py)Spark中的
Column
对象与Pandas中的列对象不同,例如它并不真正包含数据本身,而是包含数据的DataFrame
中的列的表示。因此,为了能够访问实际数据,您需要将此列添加到DataFrame
中,然后从中提取所需的值。你可以做一些与你链接的例子类似的事情:
一个小小的警告:如果此列中的数据量非常大,则在执行此操作时应小心。在这种情况下执行
.collect()
操作可能会导致OutOfMemory异常。如果最后你想把它附加到一个Pyspark数据框中,你最好不要转换成一个列表,而只是使用union
操作符将一个数据框附加到另一个数据框中。当然,这取决于具体情况。