Apache Spark 高效地追加和更新增量表中的行

fjnneemd  于 2023-11-22  发布在  Apache
关注(0)|答案(1)|浏览(194)

我正在使用Azure数据块,需要在增量表中追加和更新记录。
由于我对Spark(和Databricks)还是个新手,我的主要问题是我是否走在正确的道路上。
所以,对于手头的问题:
1.我有一个delta表(源表),其中包含有关文件的信息(例如filepath)。
1.我运行三个并行进程,从源表中过滤出行,使用文件路径加载文件并从中提取数据(文件存储在blob存储中)。
1.一旦一个进程从一个文件中提取了数据,我想把结果写入另一个表(结果表)。
1.当所有三个进程都完成时:多个进程应该旋转并开始分析写入结果表的数据。
1.然后,应将分析结果写入/更新到与数据存储在结果表中相同的行中。

**所以对于第一种情况,将提取数据附加到结果表(步骤2):**我应该使用正常的“写”函数,例如:

spark.createDataFrame([file_data_dict]).write.format("delta").mode("append").saveAsTable(table_path)

字符串
目标是追加一个新行

我应该转向结构化流媒体吗?如果是的话,为什么?我已经用writeStream尝试了一些,但要么我用错了,要么这是错误的用例。因为我没有看到关于时间的改进。我的代码:

# Start stream
strmdf = spark.readStream.format("delta").load(stream_path)
q = strmdf.writeStream.format("delta").outputMode("append").option("checkpointLocation", f"{stream_path}/_checkpoint").start(stream_path)

# Save into stream
spark.createDataFrame([data_dict]).write.format("delta").mode("append").saveAsTable(stream_path)


我一直在研究的另一件事是将UDFforEach与流一起使用,但还没有完全成功。

qv7cva1a

qv7cva1a1#

我尝试过PySpark并行处理一系列文件,比如处理不同格式的文件,包括TXTCSVJSON

from pyspark.sql import SparkSession
import concurrent.futures
def process_txt(row):
    if row["filename"].endswith(".txt"):
        return f"Processing TXT file: {row['filepath']}"
def process_csv(row):
    if row["filename"].endswith(".csv"):
        return f"Processing CSV file: {row['filepath']}"
def process_json(row):
    if row["filename"].endswith(".json"):
        return f"Processing JSON file: {row['filepath']}"
data = [
    ("Hello_world___.txt", "/FileStore/tables/Hello_world___.txt"),
    ("example.csv", "/FileStore/tables/example.csv"),
    ("jsonexample.json", "/FileStore/tables/jsonexample.json")
]
spark = SparkSession.builder \
    .appName("parallel-processing") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()
df = spark.createDataFrame(data, ["filename", "filepath"])
def process_partition(iterator):
    results = []
    for row in iterator:
        if row["filename"].endswith(".txt"):
            results.append(process_txt(row))
        elif row["filename"].endswith(".csv"):
            results.append(process_csv(row))
        elif row["filename"].endswith(".json"):
            results.append(process_json(row))
    return results
results = df.rdd.mapPartitions(process_partition).collect()
result_df = spark.createDataFrame([(result,) for result in results], ["result"])
result_df.write.mode("append").option("mergeSchema", "true").saveAsTable("result_table")
for result in results:
    print(result)

字符串


的数据
从文件数据列表中构造一个DataFrame,并定义一个函数来并行处理DataFrame。最后,它将处理后的结果写入Spark SQL表。

  • 在代码中,我从pyspark.sql模块导入Libariries,并导入concurrent.futures进行并行处理。
  • process_txtprocess_csvprocess_json被定义为处理TXT、CSV和JSON格式的文件
  • 一个名为data的列表,包含文件名和文件路径的元组,表示不同的文件格式
  • 定义一个名为process_partition的函数,它并行处理每个分区。它检查文件扩展名,并根据文件格式应用适当的处理函数。
  • 在DataFrame的RDD(弹性分布式数据集)上使用mapPartitions方法,将process_partition函数并行应用于每个分区。
  • 处理结果中的名为result_df的新DataFrame。
  • 代码使用saveAsTable方法将result_df DataFrame写入名为“result_table”的Spark SQL表。modeoption方法用于指定写入模式和模式合并选项。

使用上述流程可以利用Apache Spark的并行处理能力,以分布式和可扩展的方式处理不同的文件格式。

向Delta表追加数据:

analyzed_data.write.format("delta").option("mergeSchema", "true").mode("append").save(table_path)

保存分析数据为Delta表:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
analyzed_data.write.format("delta").mode("overwrite").saveAsTable("result_table")

相关问题