如何在PySpark中使用foreach或foreachBatch写数据库?

ecfdbz9o  于 2022-12-27  发布在  Apache
关注(0)|答案(3)|浏览(355)

我想用Python(PySpark)从Kafka源码到MariaDB做Spark结构化流(Spark 2.4.x)。
我想使用流Spark Dataframe ,而不是静态或Pandas Dataframe 。
It seems that one has to use foreach or foreachBatch since there are no possible database sinks for streamed dataframes according to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks.
下面是我的尝试:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql import DataFrameWriter
# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user":"writer", "password":"1234"}
# schema
schema_simple = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
df = spark.readStream \
  .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "mytopic") \
  .load() \
  .selectExpr("Timestamp", "cast (value as string) as json") \
  .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
  .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()
# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)

def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreach(process_row).start()

我得到一个错误:
属性错误:写
为什么?

mi7gmzs6

mi7gmzs61#

tl;drforeach替换为foreachBatch

引用正式文件:
foreachforeachBatch操作允许您在流式查询的输出上应用任意操作和写入逻辑。它们的使用情形略有不同-foreach允许在每行上自定义写入逻辑,foreachBatch允许在每个微批处理的输出上应用任意操作和自定义逻辑。
换句话说,您的writeStream.foreach(process_row)作用于没有write.jdbc可用的单行(数据),因此产生错误。
将行看作一段数据,可以使用任何API将其保存在任何位置。
如果您确实需要Spark的支持(并且确实使用write.jdbc),那么实际上应该使用foreachBatch
foreach允许在每一行上定制写入逻辑,而foreachBatch允许在每一微批处理的输出上进行任意运算和定制逻辑。

chhkpiq4

chhkpiq42#

在Jacek的支持下,我可以修正我的示例:

def process_row(df, epoch_id):
    df2.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreachBatch(process_row).start()

你还必须把epoch_id放入函数参数中,否则你会在spark日志文件中得到jupyter笔记本中没有显示的错误。

soat7uwm

soat7uwm3#

#
here working code spark Structured Streaming  (3.2.1)
from kafka to postgres

   

spark = SparkSession.builder.appName(stg).getOrCreate()                  
jdbcDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<>") \
    .option("subscribe", "<>") \
    .option("startingOffsets", "earliest") \
    .load()


jdbcDF = jdbcDF.withColumn("value",col("value").cast(StringType()))

df = jdbcDF

def foreach_batch_function(df, epoch_id):
    df.printSchema()
    df.show()
    df.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", url) \
    .option("dbtable", stg) \
    .option("user", pg_username) \
    .option("password", pg_password) \
    .option("truncate", True) \
    .save()

writing_sink = df.writeStream \
    .trigger(processingTime='12 seconds') \
    .foreachBatch(foreach_batch_function) \
    .start()
    

writing_sink.awaitTermination()

spark.stop()
  • 需要jar库

通用池2 - 2.11.1. jar、kafka客户端-3.2.1. jar、postgresql-42.5.0. jar、Spark-sql-kafka-0 - 10_2.12 - 3.2.1. jar、Spark-令牌-提供者-kafka-0 - 10_2.12 - 3.2.1. jar

相关问题