Sending a data stream from a VM socket to Kafka and receiving it in a Flink program on the host OS: deserialization problem

gg58donl  posted on 2021-06-08 in Kafka

I am using the following code to send a data stream from the VM to the Kafka topic test (Kafka is running on the host OS at 192.168.0.12):

public class WriteToKafka {

    public  static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<JoinedStreamEvent> joinedStreamEventDataStream = env
                .addSource(new JoinedStreamGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "192.168.0.12:9092");
        properties.setProperty("zookeeper.connect", "192.168.0.12:2181");
        properties.setProperty("group.id", "test");

        // publish the joined stream to the Kafka topic "test" using Flink's TypeInformation-based binary schema
        DataStream<JoinedStreamEvent> stream = joinedStreamEventDataStream;
        stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test",
                new TypeInformationSerializationSchema<>(stream.getType(), env.getConfig()), properties));

        env.execute();
    } //main
} //WriteToKafka

`JoinedStreamEvent` is of type `DataStream<Tuple3<Integer,Integer,Integer>>`; it basically joins the two streams `respirationRateStream` and `heartRateStream`:
public JoinedStreamEvent(Integer patient_id, Integer heartRate, Integer respirationRate) {
        Patient_id = patient_id;
        HeartRate = heartRate;
        RespirationRate = respirationRate;
    }

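Only the constructor of JoinedStreamEvent is shown in the question. For context, here is a minimal sketch of what the rest of the class might look like; the field declarations follow the constructor above, while the getters are assumptions added purely for illustration:

```
public class JoinedStreamEvent {

    // field names taken from the constructor shown in the question
    private Integer Patient_id;
    private Integer HeartRate;
    private Integer RespirationRate;

    public JoinedStreamEvent(Integer patient_id, Integer heartRate, Integer respirationRate) {
        Patient_id = patient_id;
        HeartRate = heartRate;
        RespirationRate = respirationRate;
    }

    // hypothetical getters, not part of the original question
    public Integer getPatientId() { return Patient_id; }
    public Integer getHeartRate() { return HeartRate; }
    public Integer getRespirationRate() { return RespirationRate; }
}
```
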
Another Flink program runs on the host OS and tries to read the data stream from Kafka. I use localhost here because Kafka and ZooKeeper are running on the host OS.

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

       DataStream<String> message = env.addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));

       /* DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
                new , properties));*/

        message.print();

        env.execute();

    } //main
} //ReadFromKafka

I get output like this:

I think I need to implement a deserializer for the `JoinedStreamEvent` type. Could someone give me an idea of how to write a deserializer for `JoinedStreamEvent`, whose stream is of type `DataStream<Tuple3<Integer, Integer, Integer>>`? Please let me know if something else needs to be done instead.

P.S. I thought of writing the following deserializer, but I don't think it is correct:

DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
                new TypeInformationSerializationSchema<JoinedStreamEvent>() , properties));
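
For reference, `TypeInformationSerializationSchema` needs the event's TypeInformation and the ExecutionConfig, i.e. the same two arguments the producer in the VM program already passes. A hedged sketch of the consumer-side counterpart (not the approach taken in the accepted answer; `typeInfo` is just a local helper variable introduced here):

```
// consumer-side counterpart of the schema the producer already uses:
// the TypeInformation for JoinedStreamEvent plus the job's ExecutionConfig
TypeInformation<JoinedStreamEvent> typeInfo = TypeExtractor.getForClass(JoinedStreamEvent.class);

DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
        new TypeInformationSerializationSchema<JoinedStreamEvent>(typeInfo, env.getConfig()), properties));
```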

rjzwgtxy1#

By writing a custom serialization/deserialization schema and using it in both the VM program and the host-OS program, I was able to receive the events on the host in the same format in which the VM sent them, as described below:

public class JoinSchema implements DeserializationSchema<JoinedStreamEvent> , SerializationSchema<JoinedStreamEvent> {

    @Override
    public JoinedStreamEvent deserialize(byte[] bytes) throws IOException {
        return JoinedStreamEvent.fromstring(new String(bytes));
    }

    @Override
    public boolean isEndOfStream(JoinedStreamEvent joinedStreamEvent) {
        return false;
    }

    @Override
    public TypeInformation<JoinedStreamEvent> getProducedType() {
        return TypeExtractor.getForClass(JoinedStreamEvent.class);
    }

    @Override
    public byte[] serialize(JoinedStreamEvent joinedStreamEvent) {
        return joinedStreamEvent.toString().getBytes();
    }
} //JoinSchema

Note that you may need to write a fromstring() method in your event-type class, just as I added the following fromstring() method to the JoinedStreamEvent class:

public static JoinedStreamEvent fromstring(String line) {

        // expects a comma-separated line in constructor order (patient id, heart rate, respiration rate),
        // which is what serialize() produces via toString()
        String[] token = line.split(",");

        Integer val1 = Integer.valueOf(token[0]);
        Integer val2 = Integer.valueOf(token[1]);
        Integer val3 = Integer.valueOf(token[2]);

        return new JoinedStreamEvent(val1, val2, val3);

    } //fromstring
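
For this to round-trip, JoinedStreamEvent.toString() must emit the same comma-separated format that fromstring() parses, because serialize() simply calls toString(). A minimal sketch of such a toString(), assuming the field names from the constructor in the question:

```
@Override
public String toString() {
    // must match what fromstring() splits on: "patient_id,heartRate,respirationRate"
    return Patient_id + "," + HeartRate + "," + RespirationRate;
}
```

A Tuple3-style toString() such as "(1,80,20)" would break the parsing, since Integer.valueOf("(1") throws a NumberFormatException.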

The events are sent from the VM with the following code:

stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test", new JoinSchema(), properties));

and are received on the host OS with the following code:

public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");
    properties.setProperty("group.id", "test");

    DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
            new JoinSchema(), properties));

    message.print();

    env.execute();

} //main
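
A quick way to sanity-check the schema without Kafka in the loop is to round-trip one event through serialize() and deserialize(). A minimal sketch with made-up sample values (the class name and the values 1, 80, 20 are purely illustrative):

```
public class JoinSchemaRoundTripCheck {

    public static void main(String[] args) throws Exception {
        JoinSchema schema = new JoinSchema();

        // hypothetical sample event: patient id 1, heart rate 80, respiration rate 20
        JoinedStreamEvent original = new JoinedStreamEvent(1, 80, 20);

        byte[] bytes = schema.serialize(original);          // toString() -> bytes
        JoinedStreamEvent copy = schema.deserialize(bytes); // bytes -> fromstring() -> event

        // if toString() and fromstring() agree, both lines print the same text
        System.out.println(original);
        System.out.println(copy);
    }
}
```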
