进行 REST 调用的 Flink 转换(异步、Future、Netty)
Flink transformation which does REST call (async, Future, Netty)
让我们假设 Flink 每秒接收 1000 条推文流,并且在这个过程中的某个地方,它需要将它们分类为垃圾邮件或不分类。我有一个集群,例如20 台机器通过 REST API 提供 "classification" 微服务,它们可以提供每秒 10k 条推文的最大吞吐量,延迟为 3 秒。这意味着在最坏的情况下,我可能有 3 万条动态推文,这没关系。我想从 Flink 中使用这个服务,实现将是这样的:
public class Classifier implements MapFunction<Tweet, TweetWithClass> {
@Override
public TweetWithClass map(Tweet tweet) {
TweetWithClass twc = new TweetWithClass(tweet);
twc.classes = (new Post('http://my.classifier.com', data = tweet.body)).bodyAsStringArrayFromJson();
return twc;
}
}
DataSet<TweetWithClass> outTweets = inTweets.map(new Classifier()).setParallelism(30000);
现在,鉴于此 API,我的猜测是 Flink 除了启动 30k 线程外别无选择,这可能很糟糕。我在源码中看到Flink使用了Netty,我猜它可以通过使用异步调用更高效地支持这个操作......如果虚构漂亮的Netty,Flink和Java API存在,这看起来有点像像这样:
public class Classifier implements MapFunction<Tweet, TweetWithClass> {
@Override
public Future<TweetWithClass> map(Tweet tweet) {
Future<String[]> classes = (new NettyPost('http://my.classifier.com', data = tweet.body)).asyncBodyAsStringArrayFromJson();
return classes.onGet( (String[] classes) -> new TweetWithClass(tweet, twc.classes) );
}
}
DataSet<TweetWithClass> outTweets = inTweets.nettyMap(new Classifier()).setMaxParallelism(30000);
有没有办法在 Flink 中用很少的线程使用异步调用来获得巨大的可扩展性?
我知道这是一个相对较老的问题,但从 Flink 1.2(2017 年 2 月发布)开始,Flink 提供了一个 API 正是为了这个目的。
它被称为异步 I/O.
使用 async I/O,您可以对外部数据库执行异步调用,或者在您的情况下是外部 Web 服务,并通过将来的回调获取结果。
可在此处找到更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
让我们假设 Flink 每秒接收 1000 条推文流,并且在这个过程中的某个地方,它需要将它们分类为垃圾邮件或不分类。我有一个集群,例如20 台机器通过 REST API 提供 "classification" 微服务,它们可以提供每秒 10k 条推文的最大吞吐量,延迟为 3 秒。这意味着在最坏的情况下,我可能有 3 万条动态推文,这没关系。我想从 Flink 中使用这个服务,实现将是这样的:
public class Classifier implements MapFunction<Tweet, TweetWithClass> {
@Override
public TweetWithClass map(Tweet tweet) {
TweetWithClass twc = new TweetWithClass(tweet);
twc.classes = (new Post('http://my.classifier.com', data = tweet.body)).bodyAsStringArrayFromJson();
return twc;
}
}
DataSet<TweetWithClass> outTweets = inTweets.map(new Classifier()).setParallelism(30000);
现在,鉴于此 API,我的猜测是 Flink 除了启动 30k 线程外别无选择,这可能很糟糕。我在源码中看到Flink使用了Netty,我猜它可以通过使用异步调用更高效地支持这个操作......如果虚构漂亮的Netty,Flink和Java API存在,这看起来有点像像这样:
public class Classifier implements MapFunction<Tweet, TweetWithClass> {
@Override
public Future<TweetWithClass> map(Tweet tweet) {
Future<String[]> classes = (new NettyPost('http://my.classifier.com', data = tweet.body)).asyncBodyAsStringArrayFromJson();
return classes.onGet( (String[] classes) -> new TweetWithClass(tweet, twc.classes) );
}
}
DataSet<TweetWithClass> outTweets = inTweets.nettyMap(new Classifier()).setMaxParallelism(30000);
有没有办法在 Flink 中用很少的线程使用异步调用来获得巨大的可扩展性?
我知道这是一个相对较老的问题,但从 Flink 1.2(2017 年 2 月发布)开始,Flink 提供了一个 API 正是为了这个目的。 它被称为异步 I/O.
使用 async I/O,您可以对外部数据库执行异步调用,或者在您的情况下是外部 Web 服务,并通过将来的回调获取结果。
可在此处找到更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html