I am following the example at https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html to connect Cassandra and Flink.
My code is as follows:
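import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

public class writeToCassandra {

    // CQL for the keyspace and table; defined here but not executed anywhere in this class.
    private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE test WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
    private static final String createTable = "CREATE TABLE test.cassandraData(id varchar, heart_rate varchar, PRIMARY KEY(id));";

    // Sample input: "element 1" ... "element 50"
    private static final Collection<String> collection = new ArrayList<>(50);

    static {
        for (int i = 1; i <= 50; ++i) {
            collection.add("element " + i);
        }
    }

    public static void main(String[] args) throws Exception {
        // Create a local execution environment with parallelism 1
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1);

        DataStream<Tuple2<String, String>> dataStream = environment
                .fromCollection(collection)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    final String mapped = " mapped ";

                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        // "element 7" -> (random UUID, "element mapped 7")
                        String[] splitted = s.split("\\s+");
                        return Tuple2.of(
                                UUID.randomUUID().toString(),
                                splitted[0] + mapped + splitted[1]
                        );
                    }
                });

        // Write each tuple to Cassandra at 127.0.0.1 (default native port 9042)
        CassandraSink.addSink(dataStream)
                .setQuery("INSERT INTO test.cassandraData(id,heart_rate) values (?,?);")
                .setHost("127.0.0.1")
                .build();

        environment.execute();
    } //main
} //writeToCassandra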
I get the following error:
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
2 answers
9q78igpj #1
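The exception simply means that the example program cannot reach the C* (Cassandra) database.
The Flink Cassandra connector provides a streaming API for connecting to a specified C* database, so you need to have a C* instance running.
Every streaming job is pushed/serialized to the node where the TaskManager (TM) runs. Your example assumes that C* runs on the same node as the TM. The alternative is to change the C* address from 127.0.0.1 to a public address.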
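Before changing the Flink job, it can help to confirm that something is actually listening on the address from the stack trace. The following is only a minimal sketch using plain java.net sockets. The class name CassandraPortCheck and the helper isReachable are made up for illustration and are not part of Flink or the DataStax driver. The defaults 127.0.0.1 and 9042 are taken from the error message above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink or the Cassandra driver.
public class CassandraPortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host, etc.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults match the address in the NoHostAvailableException above.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9042;
        boolean ok = isReachable(host, port, 2000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

Run it on the same machine as the TaskManager. If it reports the port as not reachable, C* is either not running there or is listening on a different address, which matches the NoHostAvailableException. A successful TCP connection only shows that the port is open, not that the keyspace or table exists.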
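kxe2p93d #2
Not sure whether this is always required, but I set up my CassandraSink as follows:
I have annotated the POJO returned by my data stream, so I don't need a query, but you would just need to include ".setQuery(…)" after the ".addSink(…)" line.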