我的cassandraFlume配置如下:
ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
.withPort(props.getCassandraPort())
.withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
.build();
};
CassandraSink
.addSink(cassandraObjectStream)
.setClusterBuilder(secureCassandraSinkClusterBuilder)
.build()
.name("Cassandra-Sink");
现在,当与cassandra的连接配置不正确时,我会得到一个nohostavailableexception,或者当连接意外断开时,我会得到一个connectiontimeoutexception,或者有时是writetimeoutexception。这最终会触发jobexecutionexception,整个flink作业将终止。
我在哪里发现这些cassandra异常?这些扔在哪里?我试着在Cassandra辛周围放一个试抓块,但没用。我想捕获这些异常,并在连接超时时重试连接到cassandra,或者在写入超时时重试写入cassandra。
1条答案
按热度按时间mbyulnm01#
好吧,你不能用
CassandraSink
.捕捉timeoutexception等异常的一种方法是为cassandra实现自己的接收器,但这可能需要很多时间。。。
另一种方法是,如果运行流式处理作业,可以将任务重试次数设置为1到2次以上
StreamingExecutionEnvironment.setRestartStrategy
,并启用检查点,以便流作业可以基于上一个检查点继续工作。CassandraSink
支持沃尔,所以EXACTLY_ONCE
可以在启用检查点的情况下实现。