领英解析如何 运行 个任务接一个任务?
Linkedin parseq. How to run task after task?
我正在使用 parseq 框架进行异步计算。
考虑以下代码。它首先查询 google.com 的内容,然后将内容映射到它的长度。最后打印长度。
问题是只有第一个任务是运行。为什么?
public class Main {
public static void main(String[] args) throws Exception {
OkHttpClient okHttpClient = new OkHttpClient();
final int numCores = Runtime.getRuntime().availableProcessors();
final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
final ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(numCores + 1);
final Engine engine = new EngineBuilder()
.setTaskExecutor(taskScheduler)
.setTimerScheduler(timerScheduler)
.build();
Task<Integer> task = Task.async(() -> {
SettablePromise<String> promise = Promises.settable();
Request request = new Request.Builder()
.url("http://google.com")
.build();
okHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
System.out.println("error");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
promise.done(response.body().string());
}
});
return promise;
}).map("map content to length", content -> content.length())
.andThen(System.out::println);
engine.blockingRun(task);
engine.blockingRun(task);
}
}
我使用 HttpClient
而不是 OkHttp
解决了您的问题。
以下是我用于此代码的总体 Maven 依赖项:
<dependency>
<groupId>com.linkedin.parseq</groupId>
<artifactId>parseq</artifactId>
<version>3.0.11</version>
</dependency>
<dependency>
<groupId>com.linkedin.parseq</groupId>
<artifactId>parseq-http-client</artifactId>
<version>3.0.11</version>
</dependency>
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.httpclient.HttpClient;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class Main {
private static Task<Integer> fetchBody(String url) {
Task<Integer> map = HttpClient.get(url).task().map("map content to length", content -> content.getResponseBody().length());
return map;
}
public static void main(String[] args) {
final int numCores = Runtime.getRuntime().availableProcessors();
final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
final ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(numCores + 1);
final Engine engine = new EngineBuilder()
.setTaskExecutor(taskScheduler)
.setTimerScheduler(timerScheduler)
.build();
final Task<Integer> Whosebug = fetchBody("http://www.whosebug.com");
final Task<Integer> google = fetchBody("http://www.google.com");
final Task<Integer> ethereum = fetchBody("http://ethereum.stackexchange.com");
final Task<String> plan = Task.par(Whosebug, google, ethereum)
.map((s, g, e) -> "Whosebug Page: " + s + " \n" +
"Google Page: " + g + "\n" +
"Ethereum Page: " + e + "\n")
.andThen(System.out::println);
engine.run(plan);
}
}
输出:
Whosebug Page: 149
Google Page: 13097
Ethereum Page: 152
This example is fully asynchronous. The home pages for Whosebug ,
Google, and Ethereum are all fetched in parallel while the original
thread has returned to the calling code. We used Tasks.par to tell the
engine to parallelize these HTTP requests. Once all of the responses
have been retrieved they are transformed into a int
(string
length)that is finally printed out.
要点: https://gist.github.com/vishwaratna/26417f7467a4e827eadeee6923ddf3ae
因为您使用 与 运行 相同的任务。
任务是接口,抽象class是包含字段“_stateRef”的BaseTask,这个字段维护任务状态。
第一个运行处于INIT状态的任务,当第一个执行时。状态更改为 运行。
在此代码中阻止任务执行。
com.linkedin.parseq.BaseTask#contextRun
有一个判断:transitionRun(traceBuilder).
所以,正确的代码执行方式如下:
private void replayOkHttpNotExecuteSecondTask() {
try {
log.info("begin task");
engine.blockingRun(okHttpTask());
engine.blockingRun(okHttpTask());
} catch (Exception e) {
e.printStackTrace();
}
}
private Task okHttpTask() {
OkHttpClient okHttpClient = new OkHttpClient();
return Task.async(() -> {
SettablePromise<String> settablePromise = Promises.settable();
Request request = new Request.Builder().url("http://baidu.com").build();
okHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
System.out.println("error");
}
@Override
public void onResponse(Call call, okhttp3.Response response) throws IOException {
settablePromise.done(response.body().string());
}
});
return settablePromise;
}).map("map to length", content -> content.length())
.andThen(System.out::println);
}
我正在使用 parseq 框架进行异步计算。
考虑以下代码。它首先查询 google.com 的内容,然后将内容映射到它的长度。最后打印长度。
问题是只有第一个任务是运行。为什么?
public class Main {
public static void main(String[] args) throws Exception {
OkHttpClient okHttpClient = new OkHttpClient();
final int numCores = Runtime.getRuntime().availableProcessors();
final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
final ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(numCores + 1);
final Engine engine = new EngineBuilder()
.setTaskExecutor(taskScheduler)
.setTimerScheduler(timerScheduler)
.build();
Task<Integer> task = Task.async(() -> {
SettablePromise<String> promise = Promises.settable();
Request request = new Request.Builder()
.url("http://google.com")
.build();
okHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
System.out.println("error");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
promise.done(response.body().string());
}
});
return promise;
}).map("map content to length", content -> content.length())
.andThen(System.out::println);
engine.blockingRun(task);
engine.blockingRun(task);
}
}
我使用 HttpClient
而不是 OkHttp
解决了您的问题。
以下是我用于此代码的总体 Maven 依赖项:
<dependency>
<groupId>com.linkedin.parseq</groupId>
<artifactId>parseq</artifactId>
<version>3.0.11</version>
</dependency>
<dependency>
<groupId>com.linkedin.parseq</groupId>
<artifactId>parseq-http-client</artifactId>
<version>3.0.11</version>
</dependency>
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.httpclient.HttpClient;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class Main {
private static Task<Integer> fetchBody(String url) {
Task<Integer> map = HttpClient.get(url).task().map("map content to length", content -> content.getResponseBody().length());
return map;
}
public static void main(String[] args) {
final int numCores = Runtime.getRuntime().availableProcessors();
final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
final ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(numCores + 1);
final Engine engine = new EngineBuilder()
.setTaskExecutor(taskScheduler)
.setTimerScheduler(timerScheduler)
.build();
final Task<Integer> Whosebug = fetchBody("http://www.whosebug.com");
final Task<Integer> google = fetchBody("http://www.google.com");
final Task<Integer> ethereum = fetchBody("http://ethereum.stackexchange.com");
final Task<String> plan = Task.par(Whosebug, google, ethereum)
.map((s, g, e) -> "Whosebug Page: " + s + " \n" +
"Google Page: " + g + "\n" +
"Ethereum Page: " + e + "\n")
.andThen(System.out::println);
engine.run(plan);
}
}
输出:
Whosebug Page: 149
Google Page: 13097
Ethereum Page: 152
This example is fully asynchronous. The home pages for Whosebug , Google, and Ethereum are all fetched in parallel while the original thread has returned to the calling code. We used Tasks.par to tell the engine to parallelize these HTTP requests. Once all of the responses have been retrieved they are transformed into a
int
(string length)that is finally printed out.
要点: https://gist.github.com/vishwaratna/26417f7467a4e827eadeee6923ddf3ae
因为您使用 与 运行 相同的任务。
任务是接口,抽象class是包含字段“_stateRef”的BaseTask,这个字段维护任务状态。
第一个运行处于INIT状态的任务,当第一个执行时。状态更改为 运行。 在此代码中阻止任务执行。 com.linkedin.parseq.BaseTask#contextRun
有一个判断:transitionRun(traceBuilder).
所以,正确的代码执行方式如下:
private void replayOkHttpNotExecuteSecondTask() {
try {
log.info("begin task");
engine.blockingRun(okHttpTask());
engine.blockingRun(okHttpTask());
} catch (Exception e) {
e.printStackTrace();
}
}
private Task okHttpTask() {
OkHttpClient okHttpClient = new OkHttpClient();
return Task.async(() -> {
SettablePromise<String> settablePromise = Promises.settable();
Request request = new Request.Builder().url("http://baidu.com").build();
okHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
System.out.println("error");
}
@Override
public void onResponse(Call call, okhttp3.Response response) throws IOException {
settablePromise.done(response.body().string());
}
});
return settablePromise;
}).map("map to length", content -> content.length())
.andThen(System.out::println);
}