我在这里建立了一个最小的例子,其中有n个kakfa主题的n个流(在下面的例子中是100个)。
我想在每个流看到“endofstream”消息时完成它。当所有流都完成时,我希望flink程序能够优雅地完成。
当parallelism设置为1时,这是正确的,但通常不会发生。
从另一个问题来看,似乎Kafka消费群体的所有线索都没有终结。
其他人建议抛出一个异常。但是,程序将在第一个异常时终止,并且不会等待所有流完成。
我还添加了一个最小的python程序,将消息添加到kafka主题中,以实现可复制性。请填写表格 <IP>:<PORT>
在每个程序中。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String outputPath = "file://" + System.getProperty("user.dir") + "/out/output";
Properties kafkaProps = null;
kafkaProps = new Properties();
String brokers = "<IP>:<PORT>";
kafkaProps.setProperty("bootstrap.servers", brokers);
kafkaProps.setProperty("auto.offset.reset", "earliest");
ArrayList<FlinkKafkaConsumer<String>> consumersList = new ArrayList<FlinkKafkaConsumer<String>>();
ArrayList<DataStream<String>> streamList = new ArrayList<DataStream<String>>();
for (int i = 0; i < 100; i++) {
consumersList.add(new FlinkKafkaConsumer<String>(Integer.toString(i),
new SimpleStringSchema() {
@Override
public boolean isEndOfStream(String nextElement) {
if (nextElement.contains("EndofStream")) {
// throw new RuntimeException("End of Stream");
return true;
} else {
return false;
}
}
}
, kafkaProps));
consumersList.get(i).setStartFromEarliest();
streamList.add(env.addSource(consumersList.get(i)));
streamList.get(i).writeAsText(outputPath + Integer.toString(i), WriteMode.OVERWRITE);
}
// execute program
env.execute("Flink Streaming Java API Skeleton");
python 3程序
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='<IP>:<PORT>')
for i in range(100): # Channel Number
for j in range(100): # Message Number
message = "Message: " + str(j) + " going on channel: " + str(i)
producer.send(str(i), str.encode(message))
message = "EndofStream on channel: " + str(i)
producer.send(str(i), str.encode(message))
producer.flush()
更改此行: streamList.add(env.addSource(consumersList.get(i)));
至 streamList.add(env.addSource(consumersList.get(i)).setParallelism(1));
也可以完成这项工作,但是flink将所有的消费者放在同一台物理机器上。
我希望消费者也能得到分配。
flink-conf.yaml公司
parallelism.default: 2
cluster.evenly-spread-out-slots: true
最后一种方法是将每个主题写在单独的文件中,并将文件作为源文件而不是Kafka使用者。
最终目标是测试flink为某些程序处理某些工作负载所需的时间。
1条答案
按热度按时间fruv7luv1#
使用
cancel
方法的父类flinkkafkaconsumerbaseFlinkKafkaConsumer
.public void cancel()说明从接口复制:sourcefunction取消源。大多数源在sourcefunction.run(sourcecontext)方法中都有while循环。实现需要确保在调用此方法后源将跳出该循环。典型的模式是在这个方法中将“volatile boolean isrunning”标志设置为false。在循环条件中检查该标志。
当源被取消时,执行线程也会被中断(通过thread.interrupt())。中断严格地发生在调用此方法之后,因此任何中断处理程序都可以依赖于此方法已完成的事实。很好的做法是将此方法更改的任何标志设置为“volatile”,以保证此方法的效果对任何中断处理程序的可见性。
指定者:在接口sourcefunction中取消
你是对的。有必要使用
SimpleStringSchema
. 这是基于这个答案https://stackoverflow.com/a/44247452/2096986. 看看这个例子。我先发了线Flink code we saw also works in a cluster
Kafka的消费者消费了这个信息。然后我派SHUTDOWNDDDDDDD
这对完成这条流也没有影响。最后,我把SHUTDOWN
流作业完成。查看程序下面的日志。日志: