Scala streaming job writing data from a Delta table to Event Hubs fails in Databricks with a timeout

wgx48brx · posted 2023-10-18 in Scala

Below is my code, which writes data from a Delta table to Event Hubs (a consumer team will consume the data from there):

    import org.apache.spark.eventhubs._
    import org.apache.spark.sql.streaming.Trigger._  
    import org.apache.spark.sql.types._  
    import org.apache.spark.sql.functions._  
    import java.util.Properties  
    import com.microsoft.azure.eventhubs.{ EventData, PartitionSender }  
    import org.apache.spark.eventhubs.EventHubsConf  
    import io.delta.tables._  
    import org.apache.spark.sql.streaming.Trigger  
    import java.io.PrintWriter  
    import java.time.ZonedDateTime  
    import java.time.format.DateTimeFormatter  
    import scala.concurrent.duration._  
    import java.nio.file.{Paths, Files}    

    // Configure Azure Event Hub details  
    val namespaceNameOut = "des-ent-prod-cus-stream-eventhub-001"  
    val eventHubNameOut = "hvc-prodstats-output"  
    val sasKeyNameOut = "writer"  
    val sasKeyOut = dbutils.secrets.get(scope="deskvscope", key="des-ent-prod-cus-stream-eventhub-001-writer")

    // Configure checkpoint and bad data paths
    val checkpoint_dir_path = "/mnt/hvc-wenco/prodstats/stream/checkpoints"
    val baddata_path = "/mnt/hvc-wenco/prodstats/stream/Bad_data"

    // Define timestamp checkpoint path  
    val tbl_version_timestamp_path = "/mnt/hvc-wenco/equip_status_history_table/checkpoints/checkpoint"

    // Configure other parameters  
    val MaxEvents = 5000  

    // Read the last checkpoint timestamp  
    val last_checkpoint_string = dbutils.fs.head(tbl_version_timestamp_path)  

    // Parse last checkpoint timestamp  
    val time_format = "yyyy-MM-dd HH:mm:ss.SSSz"  
    val formatter = DateTimeFormatter.ofPattern(time_format)  
    val last_checkpoint = ZonedDateTime.parse(last_checkpoint_string, formatter)  

    // Build connection to Event Hub  
    val connStrOut = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()  
            .setNamespaceName(namespaceNameOut)  
            .setEventHubName(eventHubNameOut)  
            .setSasKeyName(sasKeyNameOut)  
            .setSasKey(sasKeyOut)  
    val ehWriteConf = EventHubsConf(connStrOut.toString())  

    // Create a streaming dataframe from the Delta table  
    val InputStreamingDF =   
      spark  
        .readStream  
        .option("maxFilesPerTrigger", 1)  
        .option("startingTimestamp", last_checkpoint_string)  
        .option("readChangeFeed", "true")
        .table("wencohvc.equip_status_history_table")  
    val dropPreTransform = InputStreamingDF.filter(InputStreamingDF("_change_type") =!= "update_preimage")  
    val operationTransform = dropPreTransform.withColumn("operation", when($"_change_type" === "insert", 2).otherwise(when($"_change_type" === "update_postimage", 4)))  

    val transformedDF = operationTransform.withColumn("DeletedIndicator", when($"_change_type" === "delete", "Y").otherwise("N"))  
    val finalDF = transformedDF.drop("_change_type", "_commit_version", "_commit_timestamp")  

    // Write to Event Hubs with retry and checkpointing  
    var retry = true  
    var retryCount = 0  
    val maxRetries = 3  

    while (retry && retryCount < maxRetries) {  
      try {  
        val stream = finalDF  
          .select(to_json(struct(/* column list */)).alias("body"))  
          .writeStream  
          .format("eventhubs")  
          .options(ehWriteConf.toMap)  
          .option("checkpointLocation", checkpoint_dir_path)  
          .trigger(Trigger.AvailableNow)  
          .start()  

        stream.awaitTermination()  
        retry = false  
      } catch {  
        case e: Exception =>  
          retryCount += 1  
          if (retryCount < maxRetries) {  
            val delay = 2.seconds * retryCount  
            println(s"Stream attempt $retryCount failed, retrying in ${delay.toSeconds} seconds...")  
            Thread.sleep(delay.toMillis)  
          }  
      }  
    }  

    // Write checkpoint  
    val emptyDF = Seq((1)).toDF("seq")  
    val checkpoint_timestamp = emptyDF.withColumn("current_timestamp", current_timestamp()).first().getTimestamp(1) + "+00:00"  
    dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), true)

The problem is that it times out before the last command, so the final checkpoint command never runs. I have also tried the retry mechanism, but it still times out. The data volume from the source is huge, and I don't want to stream duplicate data by running the notebook over and over if proper checkpointing isn't happening. How can this be solved? I want the job to run and store the last checkpoint timestamp correctly, so that the next run picks up from there, but it times out before the last command and that command gets skipped as a result. The error I get is:

ERROR: Some streams terminated before this command could finish!
Stream attempt 1 failed, retrying in 2 seconds...
Stream attempt 2 failed, retrying in 4 seconds...
retry: Boolean = true
retryCount: Int = 3
maxRetries: Int = 3
ERROR: Some streams terminated before this command could finish!
Command took 0.04 seconds

xghobddn1#

This is most likely caused by the EventHubs connector not supporting the AvailableNow trigger (see this issue). The workaround is to use the Kafka connector built into Databricks, which works with the Event Hubs Standard tier (and above); you just need to provide the correct options, as described in this answer.
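For reference, here is a minimal sketch (not part of the original answer) of what that Kafka-based workaround could look like, reusing finalDF, connStrOut, checkpoint_dir_path, namespaceNameOut and eventHubNameOut from the question. The port 9093 endpoint and the literal "$ConnectionString" username are how Event Hubs exposes its Kafka-compatible surface; the kafkashaded...PlainLoginModule class name assumes the Databricks runtime's shaded Kafka client (outside Databricks it would be org.apache.kafka.common.security.plain.PlainLoginModule):

    import org.apache.spark.sql.functions.{struct, to_json}
    import org.apache.spark.sql.streaming.Trigger

    // Event Hubs exposes a Kafka-compatible endpoint on port 9093.
    // Authentication is SASL PLAIN with the literal username "$ConnectionString"
    // and the full Event Hubs connection string as the password.
    val bootstrapServers = s"$namespaceNameOut.servicebus.windows.net:9093"
    val connectionString = connStrOut.toString()   // reuse the builder from the question
    val jaasConfig =
      "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required " +
      s"""username="$$ConnectionString" password="$connectionString";"""

    val stream = finalDF
      .select(to_json(struct(/* column list */)).alias("value"))  // Kafka sink expects a "value" column
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "PLAIN")
      .option("kafka.sasl.jaas.config", jaasConfig)
      .option("topic", eventHubNameOut)               // the event hub name acts as the Kafka topic
      .option("checkpointLocation", checkpoint_dir_path)
      .trigger(Trigger.AvailableNow())                // same run-and-stop semantics as the original job
      .start()

    stream.awaitTermination()

The Delta source and the checkpoint location are unchanged, so the query still resumes from where it left off on the next run; note that the Kafka sink gives at-least-once delivery, so the consuming side should tolerate occasional duplicates after a retried batch.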
