Can't understand the TwoPhaseCommitSinkFunction lifecycle

dl5txlt9 posted on 2021-06-24 in Flink

I need a sink to a Postgres database, so I started building a custom Flink SinkFunction. Since FlinkKafkaProducer implements TwoPhaseCommitSinkFunction, I decided to do the same. As stated in the O'Reilly book "Stream Processing with Apache Flink", you just need to implement the abstract methods, enable checkpointing, and you're good to go. But what actually happens when I run my code is that the commit method is called only once, and it is called before invoke, which is totally unexpected, since nothing should be committed while the set of ready-to-commit transactions is empty. Worse still, after the commit, invoke is called for every transaction line present in my file, and then abort is called, which is even more unexpected.
When the sink is initialized, my understanding is that the following should happen:
beginTransaction is called and hands an identifier over to invoke
invoke adds the incoming lines to the transaction identified by that identifier
preCommit makes all the final modifications to the current transaction's data
commit handles the finalized transaction with the pre-committed data
So I can't understand why my program is not showing this behavior.
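To make the call order easier to observe independently of any database logic, something like this logging-only sink could be used (a minimal sketch; the class name and serializers are placeholders of mine, not from my actual job):

import java.util.UUID
import org.apache.flink.api.common.typeutils.base.{StringSerializer, VoidSerializer}
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}
import org.slf4j.LoggerFactory

// Does nothing but log which lifecycle method runs, and for which transaction id.
class LifecycleLoggingSink
    extends TwoPhaseCommitSinkFunction[String, String, Void](StringSerializer.INSTANCE, VoidSerializer.INSTANCE) {

    @transient private lazy val LOG = LoggerFactory.getLogger(classOf[LifecycleLoggingSink])

    override def beginTransaction(): String = {
        val id = UUID.randomUUID().toString
        LOG.info(s"beginTransaction -> $id")
        id
    }

    override def invoke(transaction: String, value: String, context: SinkFunction.Context[_]): Unit =
        LOG.info(s"invoke($transaction, $value)")

    override def preCommit(transaction: String): Unit =
        LOG.info(s"preCommit($transaction)")

    override def commit(transaction: String): Unit =
        LOG.info(s"commit($transaction)")

    override def abort(transaction: String): Unit =
        LOG.info(s"abort($transaction)")
}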
Here is my sink code:

package PostgresConnector

import java.sql.{BatchUpdateException, DriverManager, PreparedStatement, SQLException, Timestamp}
import java.text.ParseException
import java.util.{Date, Properties, UUID}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}
import org.apache.flink.streaming.api.scala._
import org.slf4j.{Logger, LoggerFactory}

// Two-phase-commit sink: invoke() buffers tuples per transaction id, preCommit() adds them to a JDBC batch, commit() executes the batch.
class PostgreSink(props : Properties, config : ExecutionConfig) extends TwoPhaseCommitSinkFunction[(String,String,String,String),String,String](createTypeInformation[String].createSerializer(config),createTypeInformation[String].createSerializer(config)){

    private var transactionMap : Map[String,Array[(String,String,String,String)]] = Map()

    private var parsedQuery : PreparedStatement = _

    private val insertionString : String = "INSERT INTO mydb (field1,field2,point) values (?,?,point(?,?))"

    override def invoke(transaction: String, value: (String,String,String,String), context: SinkFunction.Context[_]): Unit = {

        val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        val res = this.transactionMap.get(transaction)

        if(res.isDefined){

            var array = res.get

            array = array ++ Array(value)

            this.transactionMap += (transaction -> array)

        }else{

            val array = Array(value)

            this.transactionMap += (transaction -> array)

        }

        LOG.info("\n\nPassing through invoke\n\n")

        ()

    }

    override def beginTransaction(): String = {

        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        val identifier = UUID.randomUUID.toString

        LOG.info("\n\nPassing through beginTransaction\n\n")

        identifier

    }

    override def preCommit(transaction: String): Unit = {

        val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        try{

            val tuple : Option[Array[(String,String,String,String)]]= this.transactionMap.get(transaction)

            if(tuple.isDefined){

                tuple.get.foreach( (value : (String,String,String,String)) => {

                    LOG.info("\n\n"+value.toString()+"\n\n")

                    this.parsedQuery.setString(1,value._1)
                    this.parsedQuery.setString(2,value._2)
                    this.parsedQuery.setString(3,value._3)
                    this.parsedQuery.setString(4,value._4)
                    this.parsedQuery.addBatch()

                })

            }

        }catch{

            case e : SQLException =>
                LOG.info("\n\nError when adding transaction to batch: SQLException\n\n")

            case f : ParseException =>
                LOG.info("\n\nError when adding transaction to batch: ParseException\n\n")

            case g : NoSuchElementException =>
                LOG.info("\n\nError when adding transaction to batch: NoSuchElementException\n\n")

            case h : Exception =>
                LOG.info("\n\nError when adding transaction to batch: Exception\n\n")

        }

        this.transactionMap = this.transactionMap.empty

        LOG.info("\n\nPassing through preCommit...\n\n")
    }

    override def commit(transaction: String): Unit = {

        val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        if(this.parsedQuery != null) {
            LOG.info("\n\n" + this.parsedQuery.toString+ "\n\n")
        }

        try{

            this.parsedQuery.executeBatch
            val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
            LOG.info("\n\nExecuting batch\n\n")

        }catch{

            case e : SQLException =>
                val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
                LOG.info("\n\n"+"Error : SQLException"+"\n\n")

        }

        this.transactionMap = this.transactionMap.empty

        LOG.info("\n\nPassing through commit...\n\n")

    }

    override def abort(transaction: String): Unit = {

        val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        this.transactionMap = this.transactionMap.empty

        LOG.info("\n\nPassing through abort...\n\n")

    }

    override def open(parameters: Configuration): Unit = {

        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])

        val driver = props.getProperty("driver")
        val url = props.getProperty("url")
        val user = props.getProperty("user")
        val password = props.getProperty("password")
        Class.forName(driver)
        val connection = DriverManager.getConnection(url + "?user=" + user + "&password=" + password)
        this.parsedQuery = connection.prepareStatement(insertionString)

        LOG.info("\n\nConfiguring BD conection parameters\n\n")
    }
}

And here is my main program:

package FlinkCEPClasses

import PostgresConnector.PostgreSink
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import java.util.Properties

import org.apache.flink.api.common.ExecutionConfig
import org.slf4j.{Logger, LoggerFactory}

class FlinkCEPPipeline {

  val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPPipeline])
  LOG.info("\n\nStarting the pipeline...\n\n")

  var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.enableCheckpointing(10)
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  env.setParallelism(1)

  //var input : DataStream[String] = env.readFile(new TextInputFormat(new Path("/home/luca/Desktop/lines")),"/home/luca/Desktop/lines",FileProcessingMode.PROCESS_CONTINUOUSLY,1)

  var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines").name("Raw stream")

  var tupleStream : DataStream[(String,String,String,String)] = input.map(new S2PMapFunction()).name("Tuple Stream")

  var properties : Properties = new Properties()

  properties.setProperty("driver","org.postgresql.Driver")
  properties.setProperty("url","jdbc:postgresql://localhost:5432/mydb")
  properties.setProperty("user","luca")
  properties.setProperty("password","root")

  tupleStream.addSink(new PostgreSink(properties,env.getConfig)).name("Postgres Sink").setParallelism(1)
  tupleStream.writeAsText("/home/luca/Desktop/output",FileSystem.WriteMode.OVERWRITE).name("File Sink").setParallelism(1)

  env.execute()

}

My S2PMapFunction code:

package FlinkCEPClasses

import org.apache.flink.api.common.functions.MapFunction

case class S2PMapFunction() extends MapFunction[String,(String,String,String,String)] {

    override def map(value: String): (String, String, String,String) = {

            var tuple = value.replaceAllLiterally("(","").replaceAllLiterally(")","").split(',')

            (tuple(0),tuple(1),tuple(2),tuple(3))

    }
}

My pipeline works like this: I read lines from a file, map them to a tuple of strings, and then use the data in the tuple to save it in a Postgres database.
If you want to simulate the data, just create a file with lines in the following format: (field1,field2,pointx,pointy). A small helper for generating such a file is sketched below.
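For example, a throwaway generator like this (a hypothetical helper; the path and values are just placeholders) produces input the pipeline can consume:

import java.io.PrintWriter

// Writes a few sample lines in the (field1,field2,pointx,pointy) format that S2PMapFunction expects.
object SampleLinesWriter {

    def main(args: Array[String]): Unit = {
        val out = new PrintWriter("/home/luca/Desktop/lines")
        try {
            for (i <- 1 to 20) {
                out.println(s"(name$i,description$i,${i * 0.5},${i * 1.5})")
            }
        } finally {
            out.close()
        }
    }
}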
EDIT: the TwoPhaseCommitSinkFunction methods were executed in the following order:

Starting pipeline...
beginTransaction
preCommit
beginTransaction
commit
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
abort

o2g1uqev1#

So, here comes the "answer" to this question. Just to be clear: at this moment, the problem with TwoPhaseCommitSinkFunction is still not solved. If what you are looking for is the original question, then you should look at the other answer. If you don't care about what you use as a sink, then maybe I can help you.
Following @DavidAnderson's suggestion, I started studying the Table API to see whether it could solve my problem, which was to use Flink to insert rows into a database table.
It turned out to be really simple, as you will see.
Be aware of the version you are using; my Flink version is 1.9.0.
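For reference, these are roughly the dependencies this approach needs (a build.sbt sketch; the artifact names are my best guess for Flink 1.9, so double-check them on Maven Central for your Scala version):

// build.sbt (sketch)
val flinkVersion = "1.9.0"

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala"                  % flinkVersion,
    "org.apache.flink" %% "flink-streaming-scala"        % flinkVersion,
    "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
    "org.apache.flink" %% "flink-table-planner-blink"    % flinkVersion,
    "org.apache.flink" %% "flink-jdbc"                   % flinkVersion, // provides JDBCAppendTableSink
    "org.postgresql"    % "postgresql"                   % "42.2.5"      // PostgreSQL JDBC driver
)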
Source code:

package FlinkCEPClasses

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sinks.TableSink
import org.postgresql.Driver

class TableAPIPipeline {

    // --- normal pipeline initialization in this block ---

    var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(10)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)

    var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines").name("Original stream")

    var tupleStream : DataStream[(String,Timestamp,Double,Double)] = input.map(new S2PlacaMapFunction()).name("Tuple Stream")

    var properties : Properties = new Properties()

    properties.setProperty("driver","org.postgresql.Driver")
    properties.setProperty("url","jdbc:postgresql://localhost:5432/mydb")
    properties.setProperty("user","myuser")
    properties.setProperty("password","mypassword")

    // --- normal pipeline initialization in this block END ---

    // These two lines create what Flink calls StreamTableEnvironment. 
    // It seems pretty similar to a normal stream initialization.
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env,settings)

    //Since I wanted to sink data into a database, I used the JDBC TableSink,
    //because it is very intuitive and an exact match for my need. You may
    //look for other TableSink classes that fit better in your solution.
    var tableSink : JDBCAppendTableSink = JDBCAppendTableSink.builder()
    .setBatchSize(1)
    .setDBUrl("jdbc:postgresql://localhost:5432/mydb")
    .setDrivername("org.postgresql.Driver")
    .setPassword("mypassword")
    .setUsername("myuser")
    .setQuery("INSERT INTO mytable (data1,data2,data3) VALUES (?,?,point(?,?))")
    .setParameterTypes(Types.STRING,Types.SQL_TIMESTAMP,Types.DOUBLE,Types.DOUBLE)
    .build()

    val fieldNames = Array("data1","data2","data3","data4")
    val fieldTypes = Array[TypeInformation[_]](Types.STRING,Types.SQL_TIMESTAMP,Types.DOUBLE, Types.DOUBLE)

    // This is the crucial part of the code: first, you need to register
    // your table sink, informing the name, the field names, field types and
    // the TableSink object.

    tableEnv.registerTableSink("postgres-table-sink",
        fieldNames,
        fieldTypes,
        tableSink
    )

    // Then, you transform your DataStream into a Table object.
    var table = tableEnv.fromDataStream(tupleStream)

    // Finally, you insert your stream data into the registered sink.
    table.insertInto("postgres-table-sink")

    env.execute()

}

l0oc07j22#

I am no expert on this, but a couple of guesses: preCommit is called whenever Flink starts a checkpoint, and commit is called when the checkpoint completes. These methods are called simply because checkpointing is happening, regardless of whether the sink has received any data.
Checkpointing happens periodically, regardless of whether any data is flowing through your pipeline. Given your very short checkpoint interval (10 ms), it does seem plausible that the first checkpoint barrier reaches the sink before the source has had a chance to send any data.
It also looks like you are assuming that only one transaction will be open at a time. I am not sure that is strictly guaranteed, but as long as maxConcurrentCheckpoints is 1 (which is the default), you should be okay.
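For instance, a less aggressive checkpoint interval together with an explicit cap on concurrent checkpoints (a sketch; the values are only illustrative) should make the call order much easier to follow:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 10 ms between checkpoints is extremely aggressive; something like 10 s gives the
// source time to emit data before the first checkpoint barrier reaches the sink.
env.enableCheckpointing(10000)

// Keep at most one checkpoint (and therefore one pending sink transaction) in flight at a time.
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)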
