使用flink将pojo保存到cassandra

mpgws1up  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(252)

我是Flink的新手,我想把Kafka的流式数据存储到Cassandra。我已经把字符串转换成pojo了。我的工作如下,

@Table(keyspace = "sample", name = "contact")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    @Column(name = "name")
    private String name;

    @Column(name = "timeStamp")
    private LocalDateTime timeStamp;

我的转变过程如下,

stream.flatMap(new FlatMapFunction<String, Person>() {
            public void flatMap(String value, Collector<Person> out) {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }).print(); // I need to use proper method to convert to Datastream.
        env.execute();

我阅读了以下链接上的文档以供参考,
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/cassandra.html
cassandra接收器接受datastream示例。我需要转换我的转换和存储到Kafka他们。
不能创造CassandrapojoFlume也给了我一些想法。
有方法 .forward() 它回来了 DataStream<Reading> forward ,将示例传递给时,

CassandraSink.addSink(forward)
                .setHost("localhost")
                .build();
cannot access org.apache.flink.streaming.api.scala.DataStream

如何将我的pojo转换为cassandra中的store?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题