执行rest调用的flink转换(async、future、netty)

ctzwtxfj  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(340)

假设flink每秒接收1000条tweet流,在这个过程中的某个地方,它需要将它们归类为垃圾邮件或非垃圾邮件。我有一个集群,例如20台机器,通过restapi提供“分类”微服务,它们可以提供每秒10k tweets的最大吞吐量,延迟为3秒。这意味着在最坏的情况下,我可能会有3万条推特在飞,这没关系。我猜要使用flink提供的服务,实现应该是这样的:

public class Classifier implements MapFunction<Tweet, TweetWithClass> {
  @Override
  public TweetWithClass map(Tweet tweet) {
    TweetWithClass twc = new TweetWithClass(tweet);
    twc.classes = (new Post('http://my.classifier.com', data = tweet.body)).bodyAsStringArrayFromJson();
    return twc;
  }
}

DataSet<TweetWithClass> outTweets = inTweets.map(new Classifier()).setParallelism(30000);

现在,考虑到这个api,我猜flink除了启动30k线程之外别无选择,这可能会很糟糕。我在flink使用netty的源代码中看到,我猜它可以通过使用异步调用更有效地支持此操作。。。如果存在虚构的美丽的netty、flink和java api,则看起来像这样:

public class Classifier implements MapFunction<Tweet, TweetWithClass> {
  @Override
  public Future<TweetWithClass> map(Tweet tweet) {
    Future<String[]> classes = (new NettyPost('http://my.classifier.com', data = tweet.body)).asyncBodyAsStringArrayFromJson();
    return classes.onGet( (String[] classes) -> new TweetWithClass(tweet, twc.classes) );
  }
}

DataSet<TweetWithClass> outTweets = inTweets.nettyMap(new Classifier()).setMaxParallelism(30000);

有没有一种方法可以在flink中使用很少的线程来使用异步调用来获得巨大的可伸缩性?

jw5wzhpr

jw5wzhpr1#

我知道这是一个相对古老的问题,但从flink 1.2(2017年2月发布)开始,flink就提供了一个用于此目的的api。它被称为异步i/o。
使用异步i/o,您可以对外部数据库或外部web服务执行异步调用,并在将来通过回调获得结果。
更多信息请参见:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

相关问题