我是Flink的新手,我想把Kafka的流式数据存储到Cassandra。我已经把字符串转换成pojo了。我的工作如下,
@Table(keyspace = "sample", name = "contact")
public class Person implements Serializable {
private static final long serialVersionUID = 1L;
@Column(name = "name")
private String name;
@Column(name = "timeStamp")
private LocalDateTime timeStamp;
我的转变过程如下,
stream.flatMap(new FlatMapFunction<String, Person>() {
public void flatMap(String value, Collector<Person> out) {
try {
out.collect(objectMapper.readValue(value, Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}).print(); // I need to use proper method to convert to Datastream.
env.execute();
我阅读了以下链接上的文档以供参考,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/cassandra.html
cassandra接收器接受datastream示例。我需要转换我的转换和存储到Kafka他们。
不能创造CassandrapojoFlume也给了我一些想法。
有方法 .forward()
它回来了 DataStream<Reading> forward
,将示例传递给时,
CassandraSink.addSink(forward)
.setHost("localhost")
.build();
cannot access org.apache.flink.streaming.api.scala.DataStream
如何将我的pojo转换为cassandra中的store?
暂无答案!
目前还没有任何答案,快来回答吧!