尝试使用apache flink从cassandra获取数据,引用本文,我可以读取数据,但不知道如何将其加载到datastream对象中。代码如下:
ClusterBuilder cb = new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("localhost")
/*.withCredentials("hduser".trim(), "hadoop".trim())*/
.build();
}
};
CassandraInputFormat<Tuple2<UUID, String>> cassandraInputFormat = new CassandraInputFormat<Tuple2<UUID, String>>(query, cb);
cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);
Tuple2<UUID, String> testOutputTuple = new Tuple2<>();
ByteArrayOutputStream res = new ByteArrayOutputStream();
res.reset();
while (!cassandraInputFormat.reachedEnd()) {
cassandraInputFormat.nextRecord(testOutputTuple);
res.write((testOutputTuple.f0.toString() + "," + testOutputTuple.f1).getBytes());
}
DataStream<byte[]> temp = new DataStream<byte[]>(env, new StreamTransformation<byte[]>(res.toByteArray()));
我试过了
DataStream<byte[]> temp = new DataStream<byte[]>(env, new StreamTransformation<byte[]>(res.toByteArray()));
加载数据 res
变为 DataStream<byte[]>
但这不是正确的方法。我该怎么做?我读Cassandra的方法适合流处理吗?
2条答案
按热度按时间mw3dktmi1#
从数据库读取数据是一项有限的任务。使用cassandrainputformat时,应该使用数据集api,而不是数据流。例如:
ctrmrzij2#
在flink中创建数据流总是从executionenvironment开始。
而不是:
尝试:
然后可以使用map函数将数据类型更改为datastream
我没有使用Cassandra连接器本身,所以我不知道你是否正确使用该部分。