我使用impala(jdbc)两次来获取kafka偏移量并将数据保存在foreachrdd中。
但 Impala 和Kudu总是关机。现在我想设置连接池,但对scala来说很少。
这是我的伪代码:
# node-1
val newOffsets = getNewOffset() // JDBC read kafka offset in kudu
val messages = KafkaUtils.createDirectStream(*,newOffsets,)
messages.foreachRDD(rdd => {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
#node-2
Class.forName(jdbcDriver)
val con = DriverManager.getConnection("impala url")
val stmt = con.createStatement()
stmt.executeUpdate(sql)
#node-3
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach { r => {
val rt_upsert = s"UPSert into ${execTable} values('${r.topic}',${r.partition},${r.untilOffset})"
stmt.executeUpdate(rt_upsert)
stmt.close()
conn.close()
}}
}
如何用c3p0或其它编码?我会感谢你的帮助。
1条答案
按热度按时间yhuiod9q1#
下面是从Kafka读取数据并将数据插入kudu的代码。