Reactive Streams TCK(技术兼容性套件)对简单应用程序的测试 stalls/slows 不会 运行
Reactive Streams TCK (Technology Compatibility Kit) test on simple application stalls/slows will not run
问题:
Need help successfully running test from Reactive Streams TCK (Technology Compatibility Kit)?
SimplePublisher.java(示例)
package aaa.bbb.ccc.jar;
import java.util.Iterator;
import java.util.stream.IntStream;
import org.reactivestreams.FlowAdapters;
import java.util.concurrent.Flow;
public class SimplePublisher implements Flow.Publisher<Integer> {
private final Iterator<Integer> iterator;
SimplePublisher(int count) {
this.iterator = IntStream.rangeClosed(1, count).iterator();
}
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
iterator.forEachRemaining(subscriber::onNext);
subscriber.onComplete();
}
public static void main(String[] args) {
new SimplePublisher(10).subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
}
@Override
public void onNext(Integer item) {
System.out.println("item = [" + item + "]");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
}
}
...运行ning "main()" 方法成功 运行s 产生了预测的输出...
item = [1]
item = [2]
item = [3]
item = [4]
item = [5]
item = [6]
item = [7]
item = [8]
item = [9]
item = [10]
complete
SimplePublisherTest.java(测试 - 未能达到 运行,即 slows/hangs)
package aaa.bbb.ccc.jar;
import java.util.concurrent.Flow;
import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.flow.FlowPublisherVerification;
//FlowPublisherVerification
public class SimplePublisherTest extends FlowPublisherVerification<Integer> {
public SimplePublisherTest() {
super(new TestEnvironment());
}
@Override
public Flow.Publisher<Integer> createFlowPublisher(long elements) {
return new SimplePublisher((int) elements);
}
@Override
public Flow.Publisher<Integer> createFailedFlowPublisher() {
return null;
}
}
运行测试
的输出
--- maven-clean-plugin:2.5:clean (default-clean) @ sp ---
Deleting C:\tools\sp\target
--- maven-resources-plugin:2.6:resources (default-resources) @ sp ---
Using 'UTF-8' encoding to copy filtered resources.
skip non existing resourceDirectory C:\tools\sp\src\main\resources
--- maven-compiler-plugin:3.8.0:compile (default-compile) @ sp ---
Changes detected - recompiling the module!
Compiling 1 source file to C:\tools\sp\target\classes
--- maven-resources-plugin:2.6:testResources (default-testResources) @ sp ---
Using 'UTF-8' encoding to copy filtered resources.
skip non existing resourceDirectory C:\tools\sp\src\test\resources
--- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ sp ---
Changes detected - recompiling the module!
Compiling 1 source file to C:\tools\sp\target\test-classes
--- maven-surefire-plugin:2.12.4:test (default-test) @ sp ---
Surefire report directory: C:\tools\sp\target\surefire-reports
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running aaa.bbb.ccc.jar.SimplePublisherTest
Configuring TestNG with: org.apache.maven.surefire.testng.conf.TestNGMapConfigurator@123a439b
(test hangs at this point!)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>aaa.bbb.ccc</groupId>
<artifactId>sp</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>sp</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.unit.tests>false</skip.unit.tests>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-flow-adapters</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.2</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
</plugins>
</build>
</project>
示例取自...
https://blog.softwaremill.com/how-not-to-use-reactive-streams-in-java-9-7a39ea9c2cb3
环境
java11
反应流 (v1.0.2)
反应流流适配器 (v1.0.2)
reactive-streams-tck-flow (v1.0.2)
似乎答案只是将 maxElementsFromPublisher() 方法添加到 SimplePublisherTest.java class...
例如,
@Override
public long maxElementsFromPublisher() {
return 1000;
}
在此 github 页面找到此信息...
https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck
从readme.md...
为了通知 TCK Publisher 最多只能发送 2 个元素,重写 maxElementsFromPublisher 方法,如下所示:
@Override public long maxElementsFromPublisher() {
return 2;
}
TCK 还支持无法发出完成信号的发布者。想象一个由计时器支持的发布者——这样的发布者没有自然的方式在一些滴答后 "complete"。可以实现 "take n elements from the TickPublisher and then signal completion to the downstream" 的处理器,但这会在 TCK 和最初想要测试的发布者之间增加一层间接。建议以任何一种方式测试此类未绑定的发布者——使用 "TakeNElementsProcessor" 或通过通知 TCK 发布者无法发出完成信号。然后 TCK 将跳过所有需要发出 onComplete 信号的测试。
为了通知 TCK 您的发布者无法发出完成信号,请重写 maxElementsFromPublisher 方法,如下所示:
@Override public long maxElementsFromPublisher() {
return publisherUnableToSignalOnComplete(); // == Long.MAX_VALUE == unbounded
}
问题:
Need help successfully running test from Reactive Streams TCK (Technology Compatibility Kit)?
SimplePublisher.java(示例)
package aaa.bbb.ccc.jar;
import java.util.Iterator;
import java.util.stream.IntStream;
import org.reactivestreams.FlowAdapters;
import java.util.concurrent.Flow;
public class SimplePublisher implements Flow.Publisher<Integer> {
private final Iterator<Integer> iterator;
SimplePublisher(int count) {
this.iterator = IntStream.rangeClosed(1, count).iterator();
}
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
iterator.forEachRemaining(subscriber::onNext);
subscriber.onComplete();
}
public static void main(String[] args) {
new SimplePublisher(10).subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
}
@Override
public void onNext(Integer item) {
System.out.println("item = [" + item + "]");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
}
}
...运行ning "main()" 方法成功 运行s 产生了预测的输出...
item = [1]
item = [2]
item = [3]
item = [4]
item = [5]
item = [6]
item = [7]
item = [8]
item = [9]
item = [10]
complete
SimplePublisherTest.java(测试 - 未能达到 运行,即 slows/hangs)
package aaa.bbb.ccc.jar;
import java.util.concurrent.Flow;
import org.reactivestreams.tck.TestEnvironment;
import org.reactivestreams.tck.flow.FlowPublisherVerification;
//FlowPublisherVerification
public class SimplePublisherTest extends FlowPublisherVerification<Integer> {
public SimplePublisherTest() {
super(new TestEnvironment());
}
@Override
public Flow.Publisher<Integer> createFlowPublisher(long elements) {
return new SimplePublisher((int) elements);
}
@Override
public Flow.Publisher<Integer> createFailedFlowPublisher() {
return null;
}
}
运行测试
的输出--- maven-clean-plugin:2.5:clean (default-clean) @ sp ---
Deleting C:\tools\sp\target
--- maven-resources-plugin:2.6:resources (default-resources) @ sp ---
Using 'UTF-8' encoding to copy filtered resources.
skip non existing resourceDirectory C:\tools\sp\src\main\resources
--- maven-compiler-plugin:3.8.0:compile (default-compile) @ sp ---
Changes detected - recompiling the module!
Compiling 1 source file to C:\tools\sp\target\classes
--- maven-resources-plugin:2.6:testResources (default-testResources) @ sp ---
Using 'UTF-8' encoding to copy filtered resources.
skip non existing resourceDirectory C:\tools\sp\src\test\resources
--- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ sp ---
Changes detected - recompiling the module!
Compiling 1 source file to C:\tools\sp\target\test-classes
--- maven-surefire-plugin:2.12.4:test (default-test) @ sp ---
Surefire report directory: C:\tools\sp\target\surefire-reports
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running aaa.bbb.ccc.jar.SimplePublisherTest
Configuring TestNG with: org.apache.maven.surefire.testng.conf.TestNGMapConfigurator@123a439b
(test hangs at this point!)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>aaa.bbb.ccc</groupId>
<artifactId>sp</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>sp</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.unit.tests>false</skip.unit.tests>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-flow-adapters</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.2</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
</plugins>
</build>
</project>
示例取自...
https://blog.softwaremill.com/how-not-to-use-reactive-streams-in-java-9-7a39ea9c2cb3
环境
java11
反应流 (v1.0.2)
反应流流适配器 (v1.0.2)
reactive-streams-tck-flow (v1.0.2)
似乎答案只是将 maxElementsFromPublisher() 方法添加到 SimplePublisherTest.java class...
例如,
@Override
public long maxElementsFromPublisher() {
return 1000;
}
在此 github 页面找到此信息...
https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck
从readme.md...
为了通知 TCK Publisher 最多只能发送 2 个元素,重写 maxElementsFromPublisher 方法,如下所示:
@Override public long maxElementsFromPublisher() {
return 2;
}
TCK 还支持无法发出完成信号的发布者。想象一个由计时器支持的发布者——这样的发布者没有自然的方式在一些滴答后 "complete"。可以实现 "take n elements from the TickPublisher and then signal completion to the downstream" 的处理器,但这会在 TCK 和最初想要测试的发布者之间增加一层间接。建议以任何一种方式测试此类未绑定的发布者——使用 "TakeNElementsProcessor" 或通过通知 TCK 发布者无法发出完成信号。然后 TCK 将跳过所有需要发出 onComplete 信号的测试。
为了通知 TCK 您的发布者无法发出完成信号,请重写 maxElementsFromPublisher 方法,如下所示:
@Override public long maxElementsFromPublisher() {
return publisherUnableToSignalOnComplete(); // == Long.MAX_VALUE == unbounded
}