下面的代码工作正常,也就是说,数据被写入输出表,并且可以在10秒内从表中选择。问题是foreachBatch没有执行。
当我用.format(“console”)测试它并调用.start()时,foreachBatch就运行了,所以我觉得.toTable()应该受到责备。
此代码使用Kafka连接器,但事件集线器连接器存在相同的问题。
如果我尝试在toTable()之后添加.start(),则会得到错误
“StreamingQuery”对象没有属性“start”
下面是除foreachBatch之外的其他代码
TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"
df = spark.readStream \
.format("kafka") \
.option("subscribe", TOPIC) \
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.request.timeout.ms", "60000") \
.option("kafka.session.timeout.ms", "60000") \
.option("failOnDataLoss", "false") \
.option("startingOffsets", "earliest") \
.load()
n = 100
count = 0
def run_command(batchDF, epoch_id):
global count
count += 1
if count % n == 0:
spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")
...Omitted code where I transform the data in the value column to strongly typed data...
myTypedDF.writeStream \
.foreachBatch(run_command) \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
.partitionBy("somecolumn") \
.toTable("myunitycatalog.bronze.mytable")
1条答案
按热度按时间6yoyoihd1#
您可以执行
foreachBatch
或toTable
,但不能两者都执行。您可以在foreachBatch函数内部将写入移动到表-只需确保执行幂等写入,因为批处理可能会重新启动。请将代码更改为: