解析Pyspark/Databricks中的数据加载

ggazkfy8  于 2023-08-03  发布在  Spark
关注(0)|答案(1)|浏览(124)

我有超过5 M的xml文件位于GS桶中,我想在Pyspark中解析。
我想加载XML文件,最大化PySpark的多处理功能。代码如下:
def read_process_files(path):spark = SparkSession.builder.getOrCreate()

# Parsing the XML files
df = spark.read.format('com.databricks.spark.xml').option('rowTag', 'article').load(path)

# Apply the UDF to parse XML
df_parsed = df.withColumn('parsed_data', parse_xml_udf(col('body')))

# Add a boolean column verifying if the article is in the Pathway candidate list
df_parsed = df_parsed.withColumn("criteria", col("parsed_data.issn").isin(candidate_issn_list))

# Write to JSON files
json_dir = f'gs://setup/processed/articles/'
df_parsed.select(
    "parsed_data.article_title",
    "parsed_data.alt_title",
    "parsed_data.journal_title",
    "parsed_data.issn",
    "parsed_data.year",
    "parsed_data.abstract",
    "parsed_data.body",
    "parsed_data.uri",
    "parsed_data.doi",
    "criteria"
).write.mode("overwrite").format("json").option("ignoreNullFields", "false").save(json_dir)

个字符
问题是无法加载SOURCE DIR setup/raw/articles/xml_test,因为它的内存太大。
我想解析加载过程,以便以10为单位运行代码。
我已经尝试了不同的方法,但都没有成功,我不想遍历文件并单独加载它们。任何帮助都是受欢迎的。
谢啦,谢啦

gwbalxhn

gwbalxhn1#

尝试在读取大文件时添加此选项:.option(“fetchsize”,20)

df = spark.read.format('com.databricks.spark.xml').option('rowTag', 'article').option("fetchsize", 20).load(path)

字符串
请让我知道这是否解决了你的问题,如果没有,也让我知道。

相关问题