Apache Spark 使用toTable在数据块中写入流不会对每个批处理执行

s3fp2yjn  于 2023-02-24  发布在  Apache
关注(0)|答案(1)|浏览(132)

下面的代码工作正常,也就是说,数据被写入输出表,并且可以在10秒内从表中选择。问题是foreachBatch没有执行。
当我用.format(“console”)测试它并调用.start()时,foreachBatch就运行了,所以我觉得.toTable()应该受到责备。
此代码使用Kafka连接器,但事件集线器连接器存在相同的问题。
如果我尝试在toTable()之后添加.start(),则会得到错误
“StreamingQuery”对象没有属性“start”
下面是除foreachBatch之外的其他代码

TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

n = 100
count = 0

def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")

...Omitted code where I transform the data in the value column to strongly typed data...

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .partitionBy("somecolumn") \
    .toTable("myunitycatalog.bronze.mytable")
6yoyoihd

6yoyoihd1#

您可以执行foreachBatchtoTable,但不能两者都执行。您可以在foreachBatch函数内部将写入移动到表-只需确保执行幂等写入,因为批处理可能会重新启动。请将代码更改为:

def run_command(batchDF, epoch_id):
    global count
    batchDF.write.format("delta") \
       .option("txnVersion", epoch_id) \
       .option("txnAppId", "my_app") \
       .partitionBy("somecolumn") \
       .mode("append") \
       .saveAsTable("myunitycatalog.bronze.mytable")
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE myunitycatalog.bronze.mytable ZORDER BY (readtimestamp)")

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .start()

相关问题