我的flink程序应该做一个cassandra查找每个输入记录,并根据结果,应该做一些进一步的处理。
但我现在只能从Cassandra那里读取数据。这是我到目前为止提出的代码片段。
ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
.withPort(props.getCassandraPort())
.withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
.build();
}
};
for (int i=1; i<5; i++) {
CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
new CassandraInputFormat<>("select * from test where id=hello" + i, secureCassandraSinkClusterBuilder);
cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);
Tuple2<String, String> out = new Tuple8<>();
cassandraInputFormat.nextRecord(out);
System.out.println(out);
}
但问题是,每次查找都要花费近10秒,换句话说,这 for
执行循环需要50秒。
如何加快这项行动?或者,有没有其他方法可以在Flink找到Cassandra?
1条答案
按热度按时间3hvapo4f1#
我提出了一个解决方案,它在用流数据查询cassandra时相当快。会对有同样问题的人有用。
首先,cassandra可以用尽可能少的代码进行查询,
但问题是
Session
这是一个非常耗时的操作,每个密钥空间应该执行一次。你创造了Session
一次,并将其用于所有读取查询。现在,自从
Session
不是java可序列化的,它不能作为参数传递给flink运算符,如Map
或者ProcessFunction
. 有几种方法可以解决这个问题,您可以使用richfunction并在其Open
方法,或使用单例。我将使用第二种解决方案。创建一个singleton类,如下所示
Session
.然后,您可以将此会话用于将来的所有查询。我用的是
ProcessFunction
以查询为例。注意你可以通过
ClusterBuilder
至ProcessFunction
因为它是可序列化的。现在是cassandraLookUp
方法执行查询。只有在第一次运行查询时才创建singleton对象,然后重复使用同一个对象,因此不会延迟查找。