使用来自GooglePubSub的消息并将其发布到kafka

bihw5rsg  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(353)

我正在尝试使用同步pullapi使用googlepubsub消息。这在apache beam google pubio连接器库中提供。我想用kafkaio把消耗的信息写给kafka。我想使用flinkrunner来执行作业,因为我们在gcp之外运行这个应用程序。
我面临的问题是,在gcp pubsub中消耗的消息没有得到确认。我已经确认本地kafka示例有从gcp pubsub消费的消息。gcpdataflow中的文档表明,当管道以数据接收器(在我的例子中是kafka)终止时,数据包就完成了。
但是,由于代码是在apache flink中运行的,而不是gcp数据流,因此我认为与确认提交的消息相关的某种回调不会被触发。
我做错什么了?

pipeline
                    .apply("Read  GCP PubSub Messages", PubsubIO.readStrings()
                            .fromSubscription(subscription)
                    )
                    .apply(ParseJsons.of(User.class))
                    .setCoder(SerializableCoder.of(User.class))
                    .apply("Filter-1", ParDo.of(new FilterTextFn()))
                    .apply(AsJsons.of(User.class).withMapper(new ObjectMapper()))
                    .apply("Write to Local Kafka",
                            KafkaIO.<Void,String>write()
                                    .withBootstrapServers("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094")
                                    .withTopic("test-topic")
                                    .withValueSerializer((StringSerializer.class))
                                    .values()
                    );
jdg4fx2g

jdg4fx2g1#

我解决这个问题的方法是使用纪尧姆·布拉奎尔的方法(https://stackoverflow.com/users/11372593/guillaume-blaquiere)建议查看检查点。即使在管道中添加了window.into()函数,源pubsub订阅终结点也没有收到ack。
问题出在flink服务器配置上,我没有提到检查点配置。如果没有这些参数,将禁用检查点。

state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink-1.9.3/state/checkpoints/

这些配置应该放在flink\u home/conf/flink-conf.yaml中。在添加这些条目并重新启动flink之后。在gcp pubsub监控图表中,所有积压的(未确认的消息)都变为0。

bxfogqkk

bxfogqkk2#

在pubio子类的beam文档中提到:
检查点用于将接收到的消息确认回pubsub(以便它们可以在pubsub端失效),以及在需要恢复检查点时对已使用的消息进行nack(以便pubsub将立即重新发送这些消息)。
如果ack没有链接到数据流,则应该在数据流上具有相同的行为。ack通过检查点发送。通常检查点是在流上设置的窗口。
但是,你没有设置窗口!默认情况下,窗口是全局的,只有在结束时才关闭,如果你优雅地停止你的工作(甚至,我也不确定这一点)。无论如何,一个更好的解决方案是有固定的窗口(例如5分钟)来确认每个窗口上的消息。

相关问题