I needed a sink to a Postgres database, so I started building a custom Flink sink function. Since FlinkKafkaProducer implements TwoPhaseCommitSinkFunction, I decided to do the same. As stated in the O'Reilly book "Stream Processing with Apache Flink", you just need to implement the abstract methods, enable checkpointing, and you're good to go. But what actually happens when I run my code is that the commit method is called only once, and it is called before invoke, which is totally unexpected, since there is nothing to commit while the set of ready-to-commit transactions is empty. Worst of all, after the commit, invoke is called for every transaction line present in my file, and then abort is called, which is even more unexpected.
When the sink is initialized, my understanding is that the following should happen:

- beginTransaction is called and sends an identifier to invoke
- invoke adds the rows to the transaction, according to the identifier it received
- preCommit makes all the final modifications to the current transaction's data
- commit handles the finalized transaction of the pre-committed data

So, I don't understand why my program is not showing this behavior.
Here is my sink code:
```scala
package PostgresConnector

import java.sql.{BatchUpdateException, DriverManager, PreparedStatement, SQLException, Timestamp}
import java.text.ParseException
import java.util.{Date, Properties, UUID}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}
import org.apache.flink.streaming.api.scala._
import org.slf4j.{Logger, LoggerFactory}

class PostgreSink(props: Properties, config: ExecutionConfig)
  extends TwoPhaseCommitSinkFunction[(String, String, String, String), String, String](
    createTypeInformation[String].createSerializer(config),
    createTypeInformation[String].createSerializer(config)) {

  // Rows buffered per transaction identifier, filled by invoke and drained in preCommit.
  private var transactionMap: Map[String, Array[(String, String, String, String)]] = Map()

  private var parsedQuery: PreparedStatement = _
  private val insertionString: String = "INSERT INTO mydb (field1,field2,point) values (?,?,point(?,?))"

  override def invoke(transaction: String, value: (String, String, String, String), context: SinkFunction.Context[_]): Unit = {
    val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
    val res = this.transactionMap.get(transaction)
    if (res.isDefined) {
      var array = res.get
      array = array ++ Array(value)
      this.transactionMap += (transaction -> array)
    } else {
      val array = Array(value)
      this.transactionMap += (transaction -> array)
    }
    LOG.info("\n\nPassing through invoke\n\n")
    ()
  }

  override def beginTransaction(): String = {
    val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
    val identifier = UUID.randomUUID.toString
    LOG.info("\n\nPassing through beginTransaction\n\n")
    identifier
  }

  override def preCommit(transaction: String): Unit = {
    val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
    try {
      // Add every buffered row of this transaction to the JDBC batch.
      val tuple: Option[Array[(String, String, String, String)]] = this.transactionMap.get(transaction)
      if (tuple.isDefined) {
        tuple.get.foreach((value: (String, String, String, String)) => {
          LOG.info("\n\n" + value.toString() + "\n\n")
          this.parsedQuery.setString(1, value._1)
          this.parsedQuery.setString(2, value._2)
          this.parsedQuery.setString(3, value._3)
          this.parsedQuery.setString(4, value._4)
          this.parsedQuery.addBatch()
        })
      }
    } catch {
      case e: SQLException =>
        LOG.info("\n\nError when adding transaction to batch: SQLException\n\n")
      case f: ParseException =>
        LOG.info("\n\nError when adding transaction to batch: ParseException\n\n")
      case g: NoSuchElementException =>
        LOG.info("\n\nError when adding transaction to batch: NoSuchElementException\n\n")
      case h: Exception =>
        LOG.info("\n\nError when adding transaction to batch: Exception\n\n")
    }
    this.transactionMap = this.transactionMap.empty
    LOG.info("\n\nPassing through preCommit...\n\n")
  }

  override def commit(transaction: String): Unit = {
    val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
    if (this.parsedQuery != null) {
      LOG.info("\n\n" + this.parsedQuery.toString + "\n\n")
    }
    try {
      // Flush the JDBC batch that was built up in preCommit.
      this.parsedQuery.executeBatch
      val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
      LOG.info("\n\nExecuting batch\n\n")
    } catch {
      case e: SQLException =>
        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        LOG.info("\n\n" + "Error : SQLException" + "\n\n")
    }
    this.transactionMap = this.transactionMap.empty
    LOG.info("\n\nPassing through commit...\n\n")
  }

  override def abort(transaction: String): Unit = {
    val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
    this.transactionMap = this.transactionMap.empty
    LOG.info("\n\nPassing through abort...\n\n")
  }

  override def open(parameters: Configuration): Unit = {
    val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
    val driver = props.getProperty("driver")
    val url = props.getProperty("url")
    val user = props.getProperty("user")
    val password = props.getProperty("password")
    Class.forName(driver)
    // Open the JDBC connection and prepare the insert statement used by preCommit/commit.
    val connection = DriverManager.getConnection(url + "?user=" + user + "&password=" + password)
    this.parsedQuery = connection.prepareStatement(insertionString)
    LOG.info("\n\nConfiguring BD conection parameters\n\n")
  }
}
```
Here is my main program:
```scala
package FlinkCEPClasses

import PostgresConnector.PostgreSink
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import java.util.Properties
import org.apache.flink.api.common.ExecutionConfig
import org.slf4j.{Logger, LoggerFactory}

class FlinkCEPPipeline {

  val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPPipeline])
  LOG.info("\n\nStarting the pipeline...\n\n")

  var env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // Checkpoint interval of 10 ms (relevant for the call order discussed below).
  env.enableCheckpointing(10)
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  env.setParallelism(1)

  //var input : DataStream[String] = env.readFile(new TextInputFormat(new Path("/home/luca/Desktop/lines")),"/home/luca/Desktop/lines",FileProcessingMode.PROCESS_CONTINUOUSLY,1)
  var input: DataStream[String] = env.readTextFile("/home/luca/Desktop/lines").name("Raw stream")
  var tupleStream: DataStream[(String, String, String, String)] = input.map(new S2PMapFunction()).name("Tuple Stream")

  var properties: Properties = new Properties()
  properties.setProperty("driver", "org.postgresql.Driver")
  properties.setProperty("url", "jdbc:postgresql://localhost:5432/mydb")
  properties.setProperty("user", "luca")
  properties.setProperty("password", "root")

  tupleStream.addSink(new PostgreSink(properties, env.getConfig)).name("Postgres Sink").setParallelism(1)
  tupleStream.writeAsText("/home/luca/Desktop/output", FileSystem.WriteMode.OVERWRITE).name("File Sink").setParallelism(1)

  env.execute()
}
```
And here is my S2PMapFunction code:
```scala
package FlinkCEPClasses

import org.apache.flink.api.common.functions.MapFunction

case class S2PMapFunction() extends MapFunction[String, (String, String, String, String)] {

  override def map(value: String): (String, String, String, String) = {
    // Strip the parentheses and split "(field1,field2,pointx,pointy)" into a 4-tuple.
    var tuple = value.replaceAllLiterally("(", "").replaceAllLiterally(")", "").split(',')
    (tuple(0), tuple(1), tuple(2), tuple(3))
  }
}
```
My pipeline works like this: I read lines from a file, map them to a tuple of strings, and use the data in the tuples to save them into a Postgres database.
If you want to simulate the data, just create a file with lines in the following format: (field1,field2,pointx,pointy)
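For instance, a throwaway sketch (the path and the values below are made up for illustration) that generates such a file:

```scala
// Throwaway helper: writes a few sample lines in the (field1,field2,pointx,pointy)
// format to a made-up path; adjust the path to wherever the pipeline reads from.
import java.io.PrintWriter

object SampleLinesGenerator {
  def main(args: Array[String]): Unit = {
    val writer = new PrintWriter("/tmp/lines")
    try {
      (1 to 20).foreach { i =>
        writer.println(s"(name$i,category$i,${i.toDouble},${i * 2.0})")
      }
    } finally {
      writer.close()
    }
  }
}
```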
#### Edit
The TwoPhaseCommitSinkFunction methods were executed in the following order:
```
Starting pipeline...
beginTransaction
preCommit
beginTransaction
commit
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
abort
```
2 Answers

#### Answer 1
So, here comes the "answer" to this question. Just to make it clear: at this moment, the problem regarding TwoPhaseCommitSinkFunction has not been solved. If what you are looking for is the original problem, then you should look for another answer. If you don't care what you use as a sink, then maybe I can help you.

Following David Anderson's suggestion, I started looking into the Table API to see whether it could solve my problem, which is inserting rows into a database table using Flink.

It turned out to be really simple, as you will see.

Keep in mind the version you are using; my Flink version is 1.9.0.

Source code
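The linked source code is not reproduced here. Purely as an illustration of what the flink-jdbc route can look like in Flink 1.9, here is a minimal sketch using `JDBCAppendTableSink` (the JDBC sink the Table API documentation points to), applied directly to a `DataStream` for brevity. The table, columns and credentials mirror the question, and none of this is necessarily what the answer's linked code actually does:

```scala
// Illustrative sketch only, not the answer's linked code. Assumes Flink 1.9
// with the flink-jdbc module and the PostgreSQL driver on the classpath.
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

object JdbcAppendSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the tuple stream of the original pipeline.
    val tuples: DataStream[(String, String, String, String)] =
      env.fromElements(("a", "b", "1.0", "2.0"))

    // The JDBC sink consumes Rows; the last two fields become doubles so that
    // Postgres' point(float8, float8) constructor accepts them.
    val rows: DataStream[Row] = tuples.map { t =>
      Row.of(t._1, t._2, java.lang.Double.valueOf(t._3), java.lang.Double.valueOf(t._4))
    }

    // Append-only JDBC sink: batches plain INSERTs, no two-phase commit involved.
    val sink = JDBCAppendTableSink.builder()
      .setDrivername("org.postgresql.Driver")
      .setDBUrl("jdbc:postgresql://localhost:5432/mydb")
      .setUsername("luca")
      .setPassword("root")
      .setQuery("INSERT INTO mydb (field1,field2,point) VALUES (?,?,point(?,?))")
      .setParameterTypes(Types.STRING, Types.STRING, Types.DOUBLE, Types.DOUBLE)
      .build()

    sink.emitDataStream(rows.javaStream)
    env.execute("jdbc append sink sketch")
  }
}
```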
#### Answer 2
I'm not an expert on this, but a couple of guesses:

preCommit is called whenever Flink begins a checkpoint, and commit is called when the checkpoint completes. These methods are called simply because checkpointing is happening, regardless of whether the sink has received any data.

Checkpointing happens periodically, regardless of whether any data is flowing through the pipeline. Given the very short checkpoint interval (10 ms), it does seem plausible that the first checkpoint barrier reaches the sink before the source has sent it any data.
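Just to illustrate that point (this configuration sketch is my own illustration, not something from the original answer): with a more typical interval, the source has time to emit records before the first barrier reaches the sink.

```scala
// Sketch: the same environment setup as the question's pipeline, but
// checkpointing every 10 seconds instead of every 10 milliseconds.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointIntervalSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000) // checkpoint interval in milliseconds
    // ... build the rest of the pipeline here, then call env.execute()
  }
}
```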
It also looks like you are assuming that only one transaction will be open at a time. I'm not sure that is strictly guaranteed, but as long as maxConcurrentCheckpoints is 1 (which is the default), you should be fine.
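If you prefer to make that explicit rather than relying on the default, the same environment from the sketch above can pin it (again my illustration, not part of the original answer):

```scala
// Continuing from the env in the previous sketch: cap concurrent checkpoints at 1,
// so at most one pre-committed sink transaction is outstanding at any time.
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
```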