如何将数据从cassandra加载到ApacheFlink数据流

qmelpv7a  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(364)

尝试使用apache flink从cassandra获取数据,引用本文,我可以读取数据,但不知道如何将其加载到datastream对象中。代码如下:

ClusterBuilder cb = new ClusterBuilder() {
            @Override
            public Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("localhost")
                        /*.withCredentials("hduser".trim(), "hadoop".trim())*/
                        .build();
            }
        };
CassandraInputFormat<Tuple2<UUID, String>> cassandraInputFormat = new CassandraInputFormat<Tuple2<UUID, String>>(query, cb);

cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);

Tuple2<UUID, String> testOutputTuple = new Tuple2<>();
ByteArrayOutputStream res = new ByteArrayOutputStream();
res.reset();

while (!cassandraInputFormat.reachedEnd()) {
    cassandraInputFormat.nextRecord(testOutputTuple);
    res.write((testOutputTuple.f0.toString() + "," + testOutputTuple.f1).getBytes());
}
DataStream<byte[]> temp = new DataStream<byte[]>(env, new StreamTransformation<byte[]>(res.toByteArray()));

我试过了

DataStream<byte[]> temp = new DataStream<byte[]>(env, new StreamTransformation<byte[]>(res.toByteArray()));

加载数据 res 变为 DataStream<byte[]> 但这不是正确的方法。我该怎么做?我读Cassandra的方法适合流处理吗?

mw3dktmi

mw3dktmi1#

从数据库读取数据是一项有限的任务。使用cassandrainputformat时,应该使用数据集api,而不是数据流。例如:

DataSet<Tuple2<Long, Date>> ds = env.createInput(executeQuery(YOUR_QUERY), TupleTypeInfo.of(new TypeHint<Tuple2<Long, Date>>() {}));

private static CassandraInputFormat<Tuple2<Long, Date>> executeQuery(String YOUR_QUERY) throws IOException {
    return new CassandraInputFormat<>(YOUR_QUERY, new ClusterBuilder() {
        private static final long serialVersionUID = 1;
            @Override
            protected Cluster buildCluster(com.datastax.driver.core.Cluster.Builder builder) {
                return builder.addContactPoints(CASSANDRA_HOST).build();
            }
        });
    }
}
ctrmrzij

ctrmrzij2#

在flink中创建数据流总是从executionenvironment开始。
而不是:

DataStream<byte[]> temp = new DataStream<byte[]>(env, new StreamTransformation<byte[]>(res.toByteArray()));

尝试:

DataStream<Tuple2<UUID, String>> raw = ExecutionEnvironment.createInput(cassandraInputFormat);

然后可以使用map函数将数据类型更改为datastream
我没有使用Cassandra连接器本身,所以我不知道你是否正确使用该部分。

相关问题