apacheflink:将数据流写入postgres表

kmynzznz  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(1018)

我正在尝试编写一个流作业,它将数据流放入postgres表中。为了提供完整的信息,我的工作基于以下文章:https://tech.signavio.com/2017/postgres-flink-sink 建议使用jdbcoutputformat。
我的代码如下所示:

98     ... 
99     String strQuery = "INSERT INTO public.alarm (entity, duration, first, type, windowsize) VALUES (?, ?, ?, 'dur', 6)";
100
101     JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
102      .setDrivername("org.postgresql.Driver")
103      .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
104      .setQuery(strQuery)
105      .setSqlTypes(new int[] { Types.VARCHAR, Types.INTEGER, Types.VARCHAR}) //set the types
106      .finish();
107
108     DataStream<Row> rows = FilterStream
109                 .map((tuple)-> {
110                    Row row = new Row(3);                  // our prepared statement has 3 parameters
111                    row.setField(0, tuple.f0);             // first parameter is case ID
112                    row.setField(1, tuple.f1);             // second paramater is tracehash
113                    row.setField(2, f.format(tuple.f2));   // third paramater is tracehash
114                    return row;
115                 });
116
117     rows.writeUsingOutputFormat(jdbcOutput);
118
119     env.execute();
120
121     }
122 }

我现在的问题是,只有当我的作业停止时(确切地说,当我从apache flink Jmeter 板取消作业时),才会插入值。
所以我的问题是:我错过了什么吗?我应该把插入的行提交到某个地方吗?
祝你好运,伊格纳修斯

polhcujo

polhcujo1#

费边的答案是一种至少实现一次语义的方法;通过将写操作与flink的检查点同步。但是,这样做的缺点是,sink的数据新鲜度现在与检查点间隔周期紧密相关。
另一种方法是,您可以在flink自己的托管状态中存储具有(entity、duration、first)字段的元组或行,以便flink负责检查点(换句话说,使sink的状态具有容错性)。为此,您实现了checkpointedfunction和checkpointedrestoring接口(无需将写操作与检查点同步)。如果不必使用jdbcoutputformat,甚至可以单独执行sql插入。请参见:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-托管操作员状态。另一种解决方案是只实现listcheckpointed接口(可以以与不推荐的checkpointedrestoring接口类似的方式使用,并且支持列表样式的状态重新分布)。

laximzn5

laximzn52#

正如chesnay在评论中所说,您必须调整批处理间隔。
然而,这并不是全部。如果您想获得至少一次结果,就必须将批写入与flink的检查点同步。基本上,你得把衣服包起来 JdbcOutputFormat 在一个 SinkFunction 这也实现了 CheckpointedFunction 接口。当 snapshotState() 调用时,您必须将批处理写入数据库。您可以看看这个pull请求,它将在下一个版本中提供此功能。

相关问题