我使用下面的代码将数据流从vm发送到kafka的测试主题(在192.168.0.12ip的主机操作系统上运行)
public class WriteToKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<JoinedStreamEvent> joinedStreamEventDataStream = env
.addSource(new JoinedStreamGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.0.12:9092");
properties.setProperty("zookeeper.connect", "192.168.0.12:2181");
properties.setProperty("group.id", "test");
DataStreamSource<JoinedStreamEvent> stream = env.addSource(new JoinedStreamGenerator());
stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test", new TypeInformationSerializationSchema<>(stream.getType(),env.getConfig()), properties));
env.execute();
}
``` `JoinedStreamEvent` 属于类型 `DataSream<Tuple3<Integer,Integer,Integer>>` 它基本上连接了两条流 `respirationRateStream` 以及 `heartRateStream` ```
public JoinedStreamEvent(Integer patient_id, Integer heartRate, Integer respirationRate) {
Patient_id = patient_id;
HeartRate = heartRate;
RespirationRate = respirationRate;
另一个flink程序正在主机操作系统上运行,试图从kafka读取数据流。我在这里使用localhost,因为kafka和zookeper在主机操作系统上运行。
public class ReadFromKafka {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> message = env.addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));
/* DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
new , properties));*/
message.print();
env.execute();
} //main
} //ReadFromKafka
我得到这样的输出
我想我需要实现类型为的反序列化程序 JoinedStreamEvent
. 有人能给我一个想法,我应该如何写,反序列化为 JoinedStreamEvent
类型 DataSream<Tuple3<Integer, Integer, Integer>>
如果需要做别的事,请告诉我。
p、 我想写以下反序列化程序,但我不认为这是正确的
DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
new TypeInformationSerializationSchema<JoinedStreamEvent>() , properties));
1条答案
按热度按时间rjzwgtxy1#
通过为vm和主机操作系统程序编写一个定制的序列化程序和反序列化程序,我能够以相同的格式接收事件vm,如下所述
请注意,您可能需要在事件类型方法中编写fromstring()方法,正如我在下面添加的fromstring()joinedstreamevent类一样
使用以下代码从vm发送事件
使用以下代码接收事件