ApacheFlink:如何从另一个流调用一个流

2g32fytz  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(779)

我的场景是,我想基于另一个流输入调用一个流。两种流类型不同。下面是我的示例代码。当从kafka流接收到消息时,我想触发一个流。
当应用程序启动时,我可以从数据库读取数据。然后,我想再次从数据库中获取基于Kafka消息的数据。当我在流中收到kafka消息时,我想再次从db获取数据。
如何做到这一点?有可能吗?

public class DataStreamCassandraExample implements Serializable{

   private static final long serialVersionUID = 1L;

   static Logger LOG = LoggerFactory.getLogger(DataStreamCassandraExample.class);

   private transient static StreamExecutionEnvironment env;
    static DataStream<Tuple4<UUID,String,String,String>> inputRecords;

        public static void main(String[] args) throws Exception {
             env = StreamExecutionEnvironment.getExecutionEnvironment();

            ParameterTool argParameters = ParameterTool.fromArgs(args);
            env.getConfig().setGlobalJobParameters(argParameters);

               Properties kafkaProps = new Properties();
               kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
               kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1");

               FlinkKafkaConsumer<String> kafkaConsumer =  new FlinkKafkaConsumer<>("testtopic", new SimpleStringSchema(), kafkaProps);

               ClusterBuilder cb = new ClusterBuilder() {

               private static final long serialVersionUID = 1L;

                   @Override
                   public Cluster buildCluster(Cluster.Builder builder) {
                       return builder.addContactPoint("127.0.0.1")
                               .withPort(9042)
                               .withoutJMXReporting()
                               .build();
                   }
               };

               CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat =
                       new CassandraInputFormat<> ("select * from employee_details", cb);

               //While Application is start up , Read data from table and send as stream
               inputRecords = getDBData(env,cassandraInputFormat);

               // If any data comes from kafka means, again i want to get data from table.
               //How to i trigger getDBData() method from inside this stream.
               //The below code is not working
               DataStream<String> inputRecords1= env.addSource(kafkaConsumer)
                           .map(new MapFunction<String,String>() {
                               private static final long serialVersionUID = 1L;

                               @Override
                               public String map(String value) throws Exception {
                                   inputRecords =  getDBData(env,cassandraInputFormat);
                                   return "OK";
                               }
                           });

               //This is not printed , when i call getDBData() stream from inside the kafka stream.
               inputRecords1.print();

                DataStream<Employee> empDataStream = inputRecords.map(new MapFunction<Tuple4<UUID,String,String,String>, Tuple2<String,Employee>>() {
                       private static final long serialVersionUID = 1L;

                       @Override
                       public Tuple2<String, Employee> map(Tuple4<UUID,String,String,String> value) throws Exception {
                           Employee emp = new Employee();
                           try{
                           emp.setEmpid(value.f0);
                           emp.setFirstname(value.f1);
                           emp.setLastname(value.f2);
                           emp.setAddress(value.f3);

                           }
                           catch(Exception e){
                           }

                           return new Tuple2<>(emp.getEmpid().toString(), emp);
                       }
                   }).keyBy(0).map(new MapFunction<Tuple2<String,Employee>,Employee>() {

                       private static final long serialVersionUID = 1L;

                       @Override
                       public Employee map(Tuple2<String, Employee> value)
                               throws Exception {
                           return value.f1;
                       }   

                   });

             empDataStream.print();

                env.execute();
        }

        private static  DataStream<Tuple4<UUID,String,String,String>> getDBData(StreamExecutionEnvironment env,
                                                                   CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat){

            DataStream<Tuple4<UUID,String,String,String>> inputRecords = env
                    .createInput
                    (cassandraInputFormat   
                    ,TupleTypeInfo.of(new TypeHint<Tuple4<UUID,String,String,String>>() {}));
           return inputRecords;

        }          
}
wn9m85ua

wn9m85ua1#

这将是一个非常冗长的回答。
要正确使用flink作为开发人员,您需要理解它的基本概念。我建议您从架构概述开始(https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html),它包含了所有你需要知道的,以便进入世界的Flink当你从编程。
现在,看看您的代码,它不应该做您期望的事情,因为flink将如何读取它。您需要了解flink在执行代码时至少有两个大步骤:首先,它构建一个只描述它需要做什么的执行图。这发生在工作经理级别。第二个大步骤是让一个或多个工作人员执行图形。这两个步骤是连续的,任何关于图形描述的操作都必须在作业管理器级别完成,而不是在操作内部完成。
在您的情况下,该图有:
卡法克人的消息来源。
一个将在工作级调用getdbdata()的Map(不太好,因为getdbdata()每次调用时都会添加一个新的输入来改变图形)。
线路 inputRecords = getDBData(env,cassandraInputFormat); 将创建图的孤立分支。那条线呢 DataStream<Employee> empDataStream = inputRecords.map... 将附加 map->keyBy->map 去那个孤儿院。这将构建一部分图,从cassandra读取所有员工记录并应用 map->keyBy->map 转变。这不会与Kafka的消息来源有任何联系。
现在让我们回到你的需要。我知道当员工的身份证来自Kafka时,您需要为他/她的身份证提取数据并进行一些操作。
最干净的处理方法称为侧输入。这是您在构建图形时声明的数据输入,作业管理器处理数据的读取及其向工作进程的传输。坏消息是,flink中的流式作业还不能使用side输入(https://issues.apache.org/jira/browse/flink-2491 -此错误导致Streaming作业不创建checskpoints,因为侧输入完成得很快,这会使作业处于一种奇怪的状态)。
话说回来,你还有三个选择。正确的选项取决于employee cassandra表的大小。
第二种选择是将所有员工加载到一个静态final变量 employees 并在Map函数中使用它。这种方法的背后是,作业管理器会将此变量的序列化副本发送给所有工作进程,可能会阻塞您的网络,也可能会使ram过载。如果表的大小很小,将来不应该变大,那么这可能是一个可以接受的工作,直到边输入为流式作业工作为止。如果表的大小很大,或者将来会发生变化,那么考虑第三种选择。
第三种选择是对第二种选择的改进。它使用flink的广播变量(参见https://flink.apache.org/2019/06/26/broadcast-state.html 以及https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html). 短篇小说:这是与以前一样,更好的转移管理。flink将找到存储变量并将其发送给工人的最佳方法。不过,这种方法要正确实现要复杂一些。
最后一种选择在正常情况下是不可取的。它只需要在map操作中调用cassandra。这不是一个好的做法,因为它会给所有map执行增加重复的延迟(调用的次数与通过kafka的项的次数相同)。一个调用意味着一个连接的创建,实际的查询请求,等待cassandra回复并释放连接。这可能是一个在你的图表步骤很多工作。当你真的找不到任何替代品时,这是一个需要考虑的解决方案。
对于你的情况,我建议第三种选择。我想employee表应该不是很大,使用广播变量是一个不错的选择。

相关问题