Second Flux never starts doing its work
I want to try out Spring Reactor, so I implemented a small example. Here is the relevant code:
Application:
@SpringBootApplication
public class Application implements CommandLineRunner {
    @Autowired
    private ServiceData data;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        this.data.start();
    }
}
FluxCreatorFunction:
public class FluxCreatorFunction<T extends Function<V, E>, V, E> {
    public ConnectableFlux<E> createFlux(T t, V v) {
        return (ConnectableFlux<E>) Flux.<E>create(flux -> {
            while (true) {
                try {
                    Thread.sleep((ThreadLocalRandom.current().nextInt(1, (3 + 1)) * 1000));
                    flux.next(t.apply(v));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).publish();
    }
}
ServiceData:
@Service
public class ServiceData {
    @Autowired
    @Qualifier("inserter")
    private ConnectableFlux<Todo> fluxInserter;

    @Autowired
    @Qualifier("deleter")
    private ConnectableFlux<Todo> fluxDeleter;

    @Autowired
    private TodoRepository repo;

    public void start() {
        this.startInserter();
        this.startDeleter();
    }

    public void startInserter() {
        this.fluxInserter.subscribe(new ConsumerInserter());
        this.fluxInserter.subscribe((todo) -> {
            this.repo.save(todo);
            this.repo.flush();
        });
        this.fluxInserter.connect();
    }

    public void startDeleter() {
        this.fluxDeleter.subscribe(new ConsumerDeleter());
        this.fluxDeleter.subscribe((todo) -> {
            this.repo.delete(todo);
            this.repo.flush();
        });
        this.fluxDeleter.connect();
    }

    @Bean
    @Qualifier("inserter")
    public ConnectableFlux<Todo> createInserter() {
        return new FluxCreatorFunction<FunctionInserter, Void, Todo>().createFlux(new FunctionInserter(), null);
    }

    @Bean
    @Qualifier("deleter")
    public ConnectableFlux<Todo> createDeleter() {
        return new FluxCreatorFunction<FunctionDeleter, TodoRepository, Todo>().createFlux(new FunctionDeleter(), this.repo);
    }
}

class FunctionInserter implements Function<Void, Todo> {
    private RestTemplate restTemplate = new RestTemplate();

    @Override
    public Todo apply(Void v) {
        String quote = this.restTemplate.getForObject("http://gturnquist-quoters.cfapps.io/api/random", QuoteResource.class).getValue().getQuote();
        return new Todo(quote, false);
    }
}

class FunctionDeleter implements Function<TodoRepository, Todo> {
    @Override
    public Todo apply(TodoRepository repo) {
        return repo.findAll().get(0);
    }
}

class ConsumerInserter implements Consumer<Todo> {
    @Override
    public void accept(Todo todo) {
        System.out.println("New todo: " + todo.getText());
    }
}

class ConsumerDeleter implements Consumer<Todo> {
    @Override
    public void accept(Todo todo) {
        System.out.println("Deleted todo: " + todo.getText());
    }
}
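As you can see, I am creating two different Flux publishers. Both are created as a @Bean and @Autowired into the service.
The problem is: only the first Flux works. If I start the inserter first: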
this.startInserter();
this.startDeleter();
The output is:
New todo: So easy it is to switch container in #springboot.
New todo: Spring has come quite a ways in addressing developer enjoyment and ease of use since the last time I built an application using it.
New todo: Working with Spring Boot is like pair-programming with the Spring developers.
New todo: Spring has come quite a ways in addressing developer enjoyment and ease of use since the last time I built an application using it.
New todo: The real benefit of Boot, however, is that it's just Spring. That means any direction the code takes, regardless of complexity, I know it's a safe bet.
New todo: I have two hours today to build an app from scratch. @springboot to the rescue!
New todo: Really loving Spring Boot, makes stand alone Spring apps easy.
New todo: Really loving Spring Boot, makes stand alone Spring apps easy.
If I do it the other way around:
this.startDeleter();
this.startInserter();
The output is:
Deleted todo: Spring has come quite a ways in addressing developer enjoyment and ease of use since the last time I built an application using it.
Deleted todo: Spring has come quite a ways in addressing developer enjoyment and ease of use since the last time I built an application using it.
Deleted todo: The real benefit of Boot, however, is that it's just Spring. That means any direction the code takes, regardless of complexity, I know it's a safe bet.
Deleted todo: Spring has come quite a ways in addressing developer enjoyment and ease of use since the last time I built an application using it.
Deleted todo: So easy it is to switch container in #springboot.
Deleted todo: I have two hours today to build an app from scratch. @springboot to the rescue!
Deleted todo: Really loving Spring Boot, makes stand alone Spring apps easy.
Deleted todo: So easy it is to switch container in #springboot.
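So it does not matter which Flux I start first. The second Flux never does its work, and I don't know why. Do both Flux instances run on the same thread? Do they need an identifier? Or is something else wrong?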
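The way you produce data in Flux.create has to be asynchronous. You are blocking here, which is not supported. And yes, both Flux end up executing on the same thread: the first one loops forever and blocks that thread, so the first connect() call never returns and the second Flux is never connected.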
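To make the same-thread point concrete, here is a minimal plain-Java sketch with no Reactor involved. The class name and the `connect` helper are hypothetical stand-ins: they mimic a source whose producer loop runs on the caller's thread, and the loop is bounded to a few items only so that the demo terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SameThreadSketch {

    // Hypothetical stand-in for connect(): the producer loop runs on the
    // calling thread and only returns once the loop has finished.
    static void connect(String name, int count, Consumer<String> subscriber) {
        for (int i = 0; i < count; i++) {
            subscriber.accept(name + "-" + i);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        connect("inserter", 3, log::add);
        // This line is reached only after the first loop has ended.
        // With while (true) in the producer, it would never be reached.
        connect("deleter", 3, log::add);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Every "inserter" item is logged before the first "deleter" item. Replace the bounded loop with the question's `while (true)` and the second call is simply never made, which matches the observed output.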
You can use subscribeOn with a scheduler such as Schedulers.parallel() or Schedulers.elastic() to offload that work to a separate thread.
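A rough sketch of the subscribeOn approach, assuming a recent Reactor 3 release on the classpath. The class name, the `blockingSource` helper, and the 50 ms sleep are made up for illustration, and the loop is bounded so the demo ends. It uses Schedulers.boundedElastic(), which newer Reactor versions offer for blocking work, and the two-argument subscribeOn(scheduler, false) overload, which the Reactor Javadoc suggests for blocking create sources so that requests do not queue up behind the emitter.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnSketch {

    // Hypothetical helper: a blocking producer like the one in the question,
    // bounded to `count` items so that the demo terminates.
    static ConnectableFlux<String> blockingSource(String name, int count) {
        return Flux.<String>create(sink -> {
                    for (int i = 0; i < count; i++) {
                        try {
                            Thread.sleep(50); // stands in for the random 1-3 s delay
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            sink.error(e);
                            return;
                        }
                        sink.next(name + "-" + i);
                    }
                    sink.complete();
                })
                // Run the blocking lambda on a scheduler thread instead of
                // the thread that calls connect().
                .subscribeOn(Schedulers.boundedElastic(), false)
                .publish();
    }

    static List<String> runBoth() throws InterruptedException {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);

        ConnectableFlux<String> first = blockingSource("inserter", 3);
        ConnectableFlux<String> second = blockingSource("deleter", 3);
        first.doOnTerminate(done::countDown).subscribe(seen::add);
        second.doOnTerminate(done::countDown).subscribe(seen::add);

        first.connect();  // returns right away, the loop runs elsewhere
        second.connect(); // so this call is reached as well
        done.await(5, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBoth());
    }
}
```

Both sources now make progress because each blocking loop occupies its own scheduler thread. The order in which the two sources interleave is not deterministic.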
You can also try to avoid create/generate and not block at all by using time operators. The latest 3.1.0.M2 milestone has delayUntil, for example. You could also use Flux.range(1, n).concatMap(index -> Mono.delay(generateRandomDelayValue(index))).
Edit: the way I can think of to reproduce the random-delay behavior you are trying to implement is:
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxCreatorFunction<T extends Function<V, E>, V, E> {
    public ConnectableFlux<E> createFlux(T t, V v) {
        // use generate to emit one random value per cycle
        return Flux.<Integer>generate(sink -> sink.next(ThreadLocalRandom
                        .current()
                        .nextInt(1, (3 + 1))))
                // that random value will be used as a delay, so we need to
                // transform each value into a new async sequence, and also
                // ensure that the order is preserved, hence concatMap
                .concatMap(randomDelay ->
                        // we introduce a delay, then...
                        Mono.delay(Duration.ofSeconds(randomDelay))
                                // ... map to the result of the function
                                .map(ignore -> t.apply(v)))
                .publish();
    }
}