Using the same sink for two message streams in Apache Flink

oprakyz7 · asked 2021-06-26 · Flink

Flink receives two kinds of messages:
Control messages -> only roll the file
Data messages -> stored in S3 using the sink
We have separate source streams for the two message types and attach the same sink to both of them. What we want is to broadcast the control messages so that all parallel running sink instances receive them.
Here is the code for this:

package com.ranjit.com.flinkdemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;

import org.apache.flink.streaming.connectors.fs.StringWriter;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);

        ctrl_message_stream.broadcast(); // broadcast() returns a new stream; the result of this call is not used

        DataStream<String> message_stream = env.socketTextStream("localhost", 8087);

        RollingSink<String> sink = new RollingSink<String>("/base/path");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>() );
        sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,

        ctrl_message_stream.broadcast().addSink(sink);
        message_stream.addSink(sink);

        env.execute("stream");
    }

}

But what I observe is that it creates 4 sink instances, and the control messages are broadcast only to 2 of them (the ones created by the control-message stream). So my understanding is that both streams would have to go through the same chain of operators for this to work, which we do not want, since there will be multiple transformations on the data messages. We have written our own sink which reads each message and, if it is a control message, only rolls the file.
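For illustration, a sink along those lines might look roughly like the sketch below. This is not our actual implementation: the ControlAwareSink name, the check on a "TYPE" field to recognize control messages, and the local part files standing in for S3 are assumptions.

import java.io.BufferedWriter;
import java.io.FileWriter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch of a sink that writes data records and rolls its file on control messages.
public class ControlAwareSink extends RichSinkFunction<GenericRecord> {

    private final String basePath;
    private transient BufferedWriter writer;
    private transient int partCounter;

    public ControlAwareSink(String basePath) {
        this.basePath = basePath;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        rollFile(); // open the first part file for this parallel subtask
    }

    @Override
    public void invoke(GenericRecord record) throws Exception {
        if (record.get("TYPE") != null) {
            // control message: close the current part file and start a new one
            rollFile();
        } else {
            // data message: append it to the current part file
            writer.write(record.toString());
            writer.newLine();
        }
    }

    private void rollFile() throws Exception {
        if (writer != null) {
            writer.close();
        }
        // one file per subtask and roll, written locally here as a stand-in for S3
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        writer = new BufferedWriter(
                new FileWriter(basePath + "/part-" + subtask + "-" + partCounter++));
    }

    @Override
    public void close() throws Exception {
        if (writer != null) {
            writer.close();
        }
    }
}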
Sample code:

package com.gslab.com.dataSets;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<String> controlMessageList = new ArrayList<String>();
        controlMessageList.add("controlMessage1");
        controlMessageList.add("controlMessage2");

        List<String> dataMessageList = new ArrayList<String>();
        dataMessageList.add("Person1");
        dataMessageList.add("Person2");
        dataMessageList.add("Person3");
        dataMessageList.add("Person4");

        DataStream<String> controlMessageStream  = env.fromCollection(controlMessageList);
        DataStream<String> dataMessageStream  = env.fromCollection(dataMessageList);

        DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                 Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc")));
                 gr.put("TYPE", value);
                 return gr;
            }
        });

        DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                 Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc")));
                 gr.put("FIRSTNAME", value);
                 gr.put("LASTNAME", value+": lastname");
                 return gr;
            }
        });

        //Displaying Generic records
        dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data before union: "+ value);
                return value;
            }
        });

        controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data after union: " + value);
                return value;
            }
        });
        env.execute("stream");
    }
}

Output:

05/09/2016 13:02:12 Source: Collection Source(1/1) switched to FINISHED 
05/09/2016 13:02:12 Source: Collection Source(1/1) switched to FINISHED 
05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"}
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"}
data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"}
data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"}
data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"}
data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"}
05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED 
data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"}
05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
05/09/2016 13:02:13 Job execution switched to status FINISHED.

As we can see, the LASTNAME values are incorrect: they get replaced by the FIRSTNAME value of each record.


wi3ka0sx1#

Your code essentially terminates both streams with their own copy of the sink you defined. What you want is something like this:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);

DataStream<String> message_stream = env.socketTextStream("localhost", 8087);

RollingSink<String> sink = new RollingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
sink.setWriter(new StringWriter<String>() );
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,

ctrl_message_stream.broadcast().union(message_stream).addSink(sink);

env.execute("stream");
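This way the unioned stream terminates in a single sink operator: each parallel sink instance receives every broadcast control message plus its partition of the data messages, instead of two independent copies of the sink being created.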
