Below is my code, which writes data from a Delta table to Event Hubs (a consumer team will consume the data from there):
import org.apache.spark.eventhubs._
import org.apache.spark.sql.streaming.Trigger._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.util.Properties
import com.microsoft.azure.eventhubs.{ EventData, PartitionSender }
import org.apache.spark.eventhubs.EventHubsConf
import io.delta.tables._
import org.apache.spark.sql.streaming.Trigger
import java.io.PrintWriter
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import scala.concurrent.duration._
import java.nio.file.{Paths, Files}
// Configure Azure Event Hub details
val namespaceNameOut = "des-ent-prod-cus-stream-eventhub-001"
val eventHubNameOut = "hvc-prodstats-output"
val sasKeyNameOut = "writer"
val sasKeyOut = dbutils.secrets.get(scope="deskvscope", key="des-ent-prod-cus-stream-eventhub-001-writer")
// Configure checkpoint and bad data paths
val checkpoint_dir_path = "/mnt/hvc-wenco/prodstats/stream/checkpoints"
val baddata_path = "/mnt/hvc-wenco/prodstats/stream/Bad_data"
// Define timestamp checkpoint path
val tbl_version_timestamp_path = "/mnt/hvc-wenco/equip_status_history_table/checkpoints/checkpoint"
// Configure other parameters
val MaxEvents = 5000
// Read the last checkpoint timestamp
val last_checkpoint_string = dbutils.fs.head(tbl_version_timestamp_path)
// Parse last checkpoint timestamp
val time_format = "yyyy-MM-dd HH:mm:ss.SSSz"
val formatter = DateTimeFormatter.ofPattern(time_format)
val last_checkpoint = ZonedDateTime.parse(last_checkpoint_string, formatter)
// Build connection to Event Hub
val connStrOut = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
.setNamespaceName(namespaceNameOut)
.setEventHubName(eventHubNameOut)
.setSasKeyName(sasKeyNameOut)
.setSasKey(sasKeyOut)
val ehWriteConf = EventHubsConf(connStrOut.toString())
// Create a streaming dataframe from the Delta table
val InputStreamingDF =
spark
.readStream
.option("maxFilesPerTrigger", 1)
.option("startingTimestamp", last_checkpoint_string)
.option("readChangeFeed", "true")
.table("wencohvc.equip_status_history_table")
val dropPreTransform = InputStreamingDF.filter(InputStreamingDF("_change_type") =!= "update_preimage")
val operationTransform = dropPreTransform.withColumn("operation", when($"_change_type" === "insert", 2).otherwise(when($"_change_type" === "update_postimage", 4)))
val transformedDF = operationTransform.withColumn("DeletedIndicator", when($"_change_type" === "delete", "Y").otherwise("N"))
val finalDF = transformedDF.drop("_change_type", "_commit_version", "_commit_timestamp")
// Write to Event Hubs with retry and checkpointing
var retry = true
var retryCount = 0
val maxRetries = 3
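// Retry the streaming write up to maxRetries times; if every attempt fails,
// the loop exits with retry still true and execution falls through to the
// checkpoint write below.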
while (retry && retryCount < maxRetries) {
try {
val stream = finalDF
.select(to_json(struct(/* column list */)).alias("body"))
.writeStream
.format("eventhubs")
.options(ehWriteConf.toMap)
.option("checkpointLocation", checkpoint_dir_path)
.trigger(Trigger.AvailableNow)
.start()
stream.awaitTermination()
retry = false
} catch {
case e: Exception =>
retryCount += 1
if (retryCount < maxRetries) {
val delay = 2.seconds * retryCount
println(s"Stream attempt $retryCount failed, retrying in ${delay.toSeconds} seconds...")
Thread.sleep(delay.toMillis)
}
}
}
// Write checkpoint
val emptyDF = Seq(1).toDF("seq")
val checkpoint_timestamp = emptyDF.withColumn("current_timestamp", current_timestamp()).first().getTimestamp(1) + "+00:00"
dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), true)
The problem is that it times out before the last command, so the final checkpoint command never runs. I have also tried the retry mechanism, but it still times out. The volume of data from the source is huge, and I don't want to stream duplicate data by running the notebook again and again when proper checkpointing has not happened. How can this be solved? I want the job to run and store the last checkpoint timestamp correctly, so that the next run starts from there, but it times out before the last command, and the last command is skipped as a result. The error I get is:
ERROR: Some streams terminated before this command could finish!
Stream attempt 1 failed, retrying in 2 seconds...
Stream attempt 2 failed, retrying in 4 seconds...
retry: Boolean = true
retryCount: Int = 3
maxRetries: Int = 3
ERROR: Some streams terminated before this command could finish!
Command took 0.04 seconds
1 Answer
This is most likely related to EventHubs not supporting the AvailableNow trigger (see this issue). The workaround is to use the Kafka connector built into Databricks: it works with the EventHubs Standard tier (and above), and you only need to provide the right options, as described in this answer.
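For reference, here is a minimal sketch of what that Kafka-based write could look like, reusing the variables defined in the question (namespaceNameOut, connStrOut, finalDF, eventHubNameOut, checkpoint_dir_path). It assumes the Event Hubs namespace exposes its Kafka endpoint on port 9093 (Standard tier and above) and that the full connection string is passed as the SASL password; on Databricks the Kafka client classes are shaded, hence the kafkashaded prefix in the JAAS config.

// Hypothetical sketch: write via the Event Hubs Kafka endpoint instead of the eventhubs connector.
val bootstrapServers = s"$namespaceNameOut.servicebus.windows.net:9093"
val connectionString = connStrOut.toString()
// Event Hubs Kafka auth: SASL PLAIN with the literal username "$ConnectionString"
// and the full connection string as the password.
val jaasConfig =
  s"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$$ConnectionString" password="$connectionString";"""
val stream = finalDF
  .select(to_json(struct(/* column list */)).alias("value")) // the Kafka sink expects a "value" column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", jaasConfig)
  .option("topic", eventHubNameOut) // the Event Hub name doubles as the Kafka topic
  .option("checkpointLocation", checkpoint_dir_path)
  .trigger(Trigger.AvailableNow()) // AvailableNow is supported by the Kafka connector
  .start()
stream.awaitTermination()

Note that switching the sink format generally requires a fresh checkpointLocation, since the state written by the eventhubs sink is not compatible with the kafka sink.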