如何使用ApacheFlink阅读cassandra?

50few1ms  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(258)

我的flink程序应该做一个cassandra查找每个输入记录,并根据结果,应该做一些进一步的处理。
但我现在只能从Cassandra那里读取数据。这是我到目前为止提出的代码片段。

ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withPort(props.getCassandraPort())
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
                    .build();
        }
    };

    for (int i=1; i<5; i++) {
        CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
                new CassandraInputFormat<>("select * from test where id=hello" + i, secureCassandraSinkClusterBuilder);
        cassandraInputFormat.configure(null);
        cassandraInputFormat.open(null);
        Tuple2<String, String> out = new Tuple8<>();
        cassandraInputFormat.nextRecord(out);
        System.out.println(out);
    }

但问题是,每次查找都要花费近10秒,换句话说,这 for 执行循环需要50秒。
如何加快这项行动?或者,有没有其他方法可以在Flink找到Cassandra?

3hvapo4f

3hvapo4f1#

我提出了一个解决方案,它在用流数据查询cassandra时相当快。会对有同样问题的人有用。
首先,cassandra可以用尽可能少的代码进行查询,

Session session = secureCassandraSinkClusterBuilder.getCluster().connect();
ResultSet resultSet = session.execute("SELECT * FROM TABLE");

但问题是 Session 这是一个非常耗时的操作,每个密钥空间应该执行一次。你创造了 Session 一次,并将其用于所有读取查询。
现在,自从 Session 不是java可序列化的,它不能作为参数传递给flink运算符,如 Map 或者 ProcessFunction . 有几种方法可以解决这个问题,您可以使用richfunction并在其 Open 方法,或使用单例。我将使用第二种解决方案。
创建一个singleton类,如下所示 Session .

public class CassandraSessionSingleton {
    private static CassandraSessionSingleton cassandraSessionSingleton = null;

    public Session session;

    private CassandraSessionSingleton(ClusterBuilder clusterBuilder) {
        Cluster cluster = clusterBuilder.getCluster();
        session = cluster.connect();
    }

    public static CassandraSessionSingleton getInstance(ClusterBuilder clusterBuilder) {
        if (cassandraSessionSingleton == null)
            cassandraSessionSingleton = new CassandraSessionSingleton(clusterBuilder);
        return cassandraSessionSingleton;
    }

}

然后,您可以将此会话用于将来的所有查询。我用的是 ProcessFunction 以查询为例。

public class SomeProcessFunction implements ProcessFunction <Object, ResultSet> {
    ClusterBuilder secureCassandraSinkClusterBuilder;

    // Constructor
    public SomeProcessFunction (ClusterBuilder secureCassandraSinkClusterBuilder) {
        this.secureCassandraSinkClusterBuilder = secureCassandraSinkClusterBuilder;
    }

    @Override
    public void  ProcessElement (Object obj) throws Exception {
        ResultSet resultSet = CassandraLookUp.cassandraLookUp("SELECT * FROM TEST", secureCassandraSinkClusterBuilder);
        return resultSet;
    }
}

注意你可以通过 ClusterBuilderProcessFunction 因为它是可序列化的。现在是 cassandraLookUp 方法执行查询。

public class CassandraLookUp {
    public static ResultSet cassandraLookUp(String query, ClusterBuilder clusterBuilder) {
        CassandraSessionSingleton cassandraSessionSingleton = CassandraSessionSingleton.getInstance(clusterBuilder);
        Session session = cassandraSessionSingleton.session;
        ResultSet resultSet = session.execute(query);
        return resultSet;
    }
}

只有在第一次运行查询时才创建singleton对象,然后重复使用同一个对象,因此不会延迟查找。

相关问题