我是flink的新手,需要通过flink将json数据从kafka流到mysql表。无法在外部mysql示例中传输数据。这是我试过的。任何帮助都将不胜感激。该字段包括timestamp:string、merchantid:string、cid:string、uid:string、sessionid:string、event:string、eventtype:string、ip:string、refurl:string、referer:string
val env = StreamExecutionEnvironment.createLocalEnvironment()
val tEnv = TableEnvironment.getTableEnvironment(env)
val prop = new Properties()
prop.setProperty("zookeeper.connect","")
prop.setProperty("bootstrap.servers", "localhost:9092")
prop.setProperty("group.id", "test")
prop.setProperty("auto.offset.reset", "earliest")
val output: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("TQBQTEST_USER_AUG_2018", new serialization.SimpleStringSchema(), prop))
val finalTable = tEnv.registerDataStream("User",output)
env.execute()
1条答案
按热度按时间kwvwclae1#
有两点:
(1) 您可能需要升级到flink 1.6,因为在这个版本中,对将flink的表/SQLAPI连接到外部系统的支持得到了显著增强。
(2) 如果您使用适当的kafka表源和jdbc表接收器,您将有一个更轻松的时间。您可以在这里找到版本1.6的文档,在这里找到版本1.5的文档。