当连接到cassandra失败时如何处理异常?

omqzjyyz  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(332)

我的cassandraFlume配置如下:

ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withPort(props.getCassandraPort())
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .build();
    };

    CassandraSink
            .addSink(cassandraObjectStream)
            .setClusterBuilder(secureCassandraSinkClusterBuilder)
            .build()
            .name("Cassandra-Sink");

现在,当与cassandra的连接配置不正确时,我会得到一个nohostavailableexception,或者当连接意外断开时,我会得到一个connectiontimeoutexception,或者有时是writetimeoutexception。这最终会触发jobexecutionexception,整个flink作业将终止。
我在哪里发现这些cassandra异常?这些扔在哪里?我试着在Cassandra辛周围放一个试抓块,但没用。我想捕获这些异常,并在连接超时时重试连接到cassandra,或者在写入超时时重试写入cassandra。

mbyulnm0

mbyulnm01#

好吧,你不能用 CassandraSink .
捕捉timeoutexception等异常的一种方法是为cassandra实现自己的接收器,但这可能需要很多时间。。。
另一种方法是,如果运行流式处理作业,可以将任务重试次数设置为1到2次以上 StreamingExecutionEnvironment.setRestartStrategy ,并启用检查点,以便流作业可以基于上一个检查点继续工作。 CassandraSink 支持沃尔,所以 EXACTLY_ONCE 可以在启用检查点的情况下实现。

相关问题