我可以使用下面的代码片段连接到aws管理的cassandra服务。
CassandraSink.addSink(cassandraEntityStream)
.setClusterBuilder(
new ClusterBuilder() {
private static final long serialVersionUID = 2793938419775311824L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder
.addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
.withPort(9142)
.withSSL()
.withCredentials(
"username",
"password")
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy
.builder()
.withLocalDc("ap-northeast-1")
.build())
//.withQueryOptions(option)
.build();
}
})
.setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
.build()
.name("Write to Cassandra")
.uid("cassandra_sink");
我在向cassandra写入stream pojo时遇到以下异常。
com.datastax.driver.core.exceptions.invalidqueryexception:此操作不支持一致性级别本地\u one。支持的一致性级别为:本地\u仲裁
我能够在另一个项目中解决这个问题(不使用flink),通过使用下面的代码片段设置consistentylevel=local\u quorum。
QueryOptions option = new QueryOptions();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
final Cluster cluster =
Cluster.builder()
.addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
.withPort(9142)
.withSSL()
.withQueryOptions(option) // NOTE
.withAuthProvider(
new PlainTextAuthProvider(
"username",
"password"))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy.builder().withLocalDc("ap-northeast-1").build())
.build();
final Session session = cluster.connect("test");
当我在flink中尝试同样的方法时,我得到了以下错误:
线程“main”org.apache.flink.api.common.invalidProgrameException中出现异常:com.datastax.driver.core。queryoptions@130161f7 不可序列化。对象可能包含或引用不可序列化的字段。
有什么,我错过了吗?请详细说明如何使用flink cassandra连接器连接/写入mcs。
附言:
我使用了下面的命令来创建keyspace。
CREATE KEYSPACE "test"
WITH
REPLICATION = {'class': 'SingleRegionStrategy'}
我的代码中没有使用amazonrootca1.pem。
我没有在我的代码或环境中使用cassandra\u truststore.jks。
我已经安装了证书 temp_file.der
证书,它是通过以下步骤创建的。
我使用的是flink 1.8.2,因为这是kinesis数据分析中可用的环境版本。
更新07-04-2020
我可以通过为queryoptions创建一个可序列化的 Package 来修复序列化问题。请在下面查找代码段:
import com.datastax.driver.core.QueryOptions;
import java.io.Serializable;
public class QueryOptionsSerializable extends QueryOptions implements Serializable {
private static final long serialVersionUID = 2793938419775311824L;
}
有了这个解决方案,我可以在代码中将一致性级别设置为local\u quorum,并且可以在没有任何异常的情况下运行。
// Setting consistency level
QueryOptionsSerializable option = new QueryOptionsSerializable();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
CassandraSink.addSink(entityStream)
.setClusterBuilder(
new ClusterBuilder() {
private static final long serialVersionUID = 2793938419775311824L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
Cluster.Builder tempBuilder = builder.addContactPoint(host).withPort(port);
if (isSSLEnabled) {
// enable SSL config if isSSLEnabled flag is ON.
tempBuilder.withSSL();
}
if (username != null && password != null) {
// if username & password is provided, use it for connection.
tempBuilder.withCredentials(username, password);
}
tempBuilder.withQueryOptions(option);
return tempBuilder.build();
}
})
.setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
.setDefaultKeyspace(keyspace)
.build()
.name("Write to Cassandra")
.uid("cassandra_sink");
但在给mcs写信的时候,我也遇到了同样的错误:
com.datastax.driver.core.exceptions.invalidqueryexception:此操作不支持一致性级别本地\u one。支持的一致性级别为:本地\u仲裁
任何帮助都将不胜感激!
1条答案
按热度按时间pbgvytdp1#
终于明白了。这是关于使用一致性的设置
@Table
注解。下面提供了代码段: