我的场景是,我想基于另一个流输入调用一个流。两种流类型不同。下面是我的示例代码。当从kafka流接收到消息时,我想触发一个流。
当应用程序启动时,我可以从数据库读取数据。然后,我想再次从数据库中获取基于Kafka消息的数据。当我在流中收到kafka消息时,我想再次从db获取数据。
如何做到这一点?有可能吗?
public class DataStreamCassandraExample implements Serializable{
private static final long serialVersionUID = 1L;
static Logger LOG = LoggerFactory.getLogger(DataStreamCassandraExample.class);
private transient static StreamExecutionEnvironment env;
static DataStream<Tuple4<UUID,String,String,String>> inputRecords;
public static void main(String[] args) throws Exception {
env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool argParameters = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(argParameters);
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("testtopic", new SimpleStringSchema(), kafkaProps);
ClusterBuilder cb = new ClusterBuilder() {
private static final long serialVersionUID = 1L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1")
.withPort(9042)
.withoutJMXReporting()
.build();
}
};
CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat =
new CassandraInputFormat<> ("select * from employee_details", cb);
//While Application is start up , Read data from table and send as stream
inputRecords = getDBData(env,cassandraInputFormat);
// If any data comes from kafka means, again i want to get data from table.
//How to i trigger getDBData() method from inside this stream.
//The below code is not working
DataStream<String> inputRecords1= env.addSource(kafkaConsumer)
.map(new MapFunction<String,String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
inputRecords = getDBData(env,cassandraInputFormat);
return "OK";
}
});
//This is not printed , when i call getDBData() stream from inside the kafka stream.
inputRecords1.print();
DataStream<Employee> empDataStream = inputRecords.map(new MapFunction<Tuple4<UUID,String,String,String>, Tuple2<String,Employee>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Employee> map(Tuple4<UUID,String,String,String> value) throws Exception {
Employee emp = new Employee();
try{
emp.setEmpid(value.f0);
emp.setFirstname(value.f1);
emp.setLastname(value.f2);
emp.setAddress(value.f3);
}
catch(Exception e){
}
return new Tuple2<>(emp.getEmpid().toString(), emp);
}
}).keyBy(0).map(new MapFunction<Tuple2<String,Employee>,Employee>() {
private static final long serialVersionUID = 1L;
@Override
public Employee map(Tuple2<String, Employee> value)
throws Exception {
return value.f1;
}
});
empDataStream.print();
env.execute();
}
private static DataStream<Tuple4<UUID,String,String,String>> getDBData(StreamExecutionEnvironment env,
CassandraInputFormat<Tuple4<UUID,String,String,String>> cassandraInputFormat){
DataStream<Tuple4<UUID,String,String,String>> inputRecords = env
.createInput
(cassandraInputFormat
,TupleTypeInfo.of(new TypeHint<Tuple4<UUID,String,String,String>>() {}));
return inputRecords;
}
}
1条答案
按热度按时间wn9m85ua1#
这将是一个非常冗长的回答。
要正确使用flink作为开发人员,您需要理解它的基本概念。我建议您从架构概述开始(https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html),它包含了所有你需要知道的,以便进入世界的Flink当你从编程。
现在,看看您的代码,它不应该做您期望的事情,因为flink将如何读取它。您需要了解flink在执行代码时至少有两个大步骤:首先,它构建一个只描述它需要做什么的执行图。这发生在工作经理级别。第二个大步骤是让一个或多个工作人员执行图形。这两个步骤是连续的,任何关于图形描述的操作都必须在作业管理器级别完成,而不是在操作内部完成。
在您的情况下,该图有:
卡法克人的消息来源。
一个将在工作级调用getdbdata()的Map(不太好,因为getdbdata()每次调用时都会添加一个新的输入来改变图形)。
线路
inputRecords = getDBData(env,cassandraInputFormat);
将创建图的孤立分支。那条线呢DataStream<Employee> empDataStream = inputRecords.map...
将附加map->keyBy->map
去那个孤儿院。这将构建一部分图,从cassandra读取所有员工记录并应用map->keyBy->map
转变。这不会与Kafka的消息来源有任何联系。现在让我们回到你的需要。我知道当员工的身份证来自Kafka时,您需要为他/她的身份证提取数据并进行一些操作。
最干净的处理方法称为侧输入。这是您在构建图形时声明的数据输入,作业管理器处理数据的读取及其向工作进程的传输。坏消息是,flink中的流式作业还不能使用side输入(https://issues.apache.org/jira/browse/flink-2491 -此错误导致Streaming作业不创建checskpoints,因为侧输入完成得很快,这会使作业处于一种奇怪的状态)。
话说回来,你还有三个选择。正确的选项取决于employee cassandra表的大小。
第二种选择是将所有员工加载到一个静态final变量
employees
并在Map函数中使用它。这种方法的背后是,作业管理器会将此变量的序列化副本发送给所有工作进程,可能会阻塞您的网络,也可能会使ram过载。如果表的大小很小,将来不应该变大,那么这可能是一个可以接受的工作,直到边输入为流式作业工作为止。如果表的大小很大,或者将来会发生变化,那么考虑第三种选择。第三种选择是对第二种选择的改进。它使用flink的广播变量(参见https://flink.apache.org/2019/06/26/broadcast-state.html 以及https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html). 短篇小说:这是与以前一样,更好的转移管理。flink将找到存储变量并将其发送给工人的最佳方法。不过,这种方法要正确实现要复杂一些。
最后一种选择在正常情况下是不可取的。它只需要在map操作中调用cassandra。这不是一个好的做法,因为它会给所有map执行增加重复的延迟(调用的次数与通过kafka的项的次数相同)。一个调用意味着一个连接的创建,实际的查询请求,等待cassandra回复并释放连接。这可能是一个在你的图表步骤很多工作。当你真的找不到任何替代品时,这是一个需要考虑的解决方案。
对于你的情况,我建议第三种选择。我想employee表应该不是很大,使用广播变量是一个不错的选择。