Java 8 ForkJoin 无法解释的输出
Java 8 ForkJoin Unexplained Output
我正在玩 ForkJoin
框架和 Java 8 accumulateAndGet
函数。以下程序产生了我无法解释的输出。可以吗?
P.S:这不是作业。这是 Cay S. Horstmann 在 "Java SE 8 for the Really Impatient" 一书中的练习。是本好书。
/**
* Q1: Write a program that keeps track of the longest string that is observed by a number of threads. Use an
* {@code AtomicReference} and an appropriate accumulator.
*
* @param observed
* Longest string.
* @param x
* String value to be compared to the longest value.
* @return Longest string.
*/
public static String updateLongestString(final AtomicReference<String> observed, final String x) {
LOGGER.info("Received observed: {}, x: {}", observed, x);
final String longestString = observed.accumulateAndGet(x,
maxBy((str1, str2) -> observed.get().length() - x.length()));
LOGGER.info("New observed: {}.", longestString);
return longestString;
}
单元测试:
@Test
public void testUpdateLongestString() {
final String[] words = new String[] { "Java 8", "Java 8 is Awesome!",
"Java 8 is the Best thing Since Sliced Bread!", "Java 8 Changes Everything!" };
final int len = words.length;
final int stopAfter = 100;
final AtomicReference<String> longestString = new AtomicReference<>(words[0]);
final AtomicInteger count = new AtomicInteger(1);
class UpdateLongestStringTask extends RecursiveAction {
private static final long serialVersionUID = -2288401002001447054L;
private int id = -1;
private UpdateLongestStringTask(final int id) {
this.id = id;
}
@Override
protected void compute() {
LOGGER.info("Executing task #: {}.", id);
if (count.get() >= stopAfter) {
return;
}
final ForkJoinTask<Void> task = new UpdateLongestStringTask(count.incrementAndGet()).fork();
updateLongestString(longestString, words[randomIndex()]);
task.join();
}
private int randomIndex() {
return ThreadLocalRandom.current().nextInt(len);
}
}
/* Just because we can. */
final int parallelism = min(getRuntime().availableProcessors(), 4);
new ForkJoinPool(parallelism).invoke(new UpdateLongestStringTask(count.get()));
}
输出(标记为 -->> 的行无法解释;它为什么要打印一个它显然从未收到过的值):
2015-01-05 23:20:00.974 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 1.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-2] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 2.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 3.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 4.
2015-01-05 23:20:00.981 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 Changes Everything!
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 is Awesome!
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-2] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 is the Best thing Since Sliced Bread!
2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 Changes Everything!.
-->> 2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 Changes Everything!.
2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 is Awesome!.
如果您提供一个函数,该函数应该对其参数进行操作,即
final String longestString = observed.accumulateAndGet(x,
maxBy((str1, str2) -> observed.get().length() - x.length()));
严重违反了函数 API 的契约。 accumulateAndGet
将提供有关指定操作的原子更新,但不会提供有关函数内另一个 get
操作的原子更新。
不清楚你为什么这样做,因为实现正确的功能是直截了当的:
final String longestString = observed.accumulateAndGet(x,
maxBy((str1, str2) -> str1.length() - str2.length()));
或
final String longestString =
observed.accumulateAndGet(x, maxBy(comparingInt(String::length)));
请注意,在修复代码后,您仍然可能观察到与之前记录的两个值不同的结果,因为 accumulateAndGet
为您提供了原子更新,但这种原子性不会扩展到您执行的记录操作 在 调用accumulateAndGet
之前。 AtomicReference
的记录内容在执行原子更新时可能已过时。但由于更新合同和您提供的运算符,结果 String
的大小至少与您之前看到的最大值相同。
此外,请记住,日志操作的感知顺序并不能说明更新操作的实际执行顺序。
您可以更好地了解更改代码时发生的情况,如下所示:
public static String updateLongestString(AtomicReference<String> observed, String x) {
final String longestString = observed.accumulateAndGet(x, maxBy((str1, str2) -> {
LOGGER.info("Received str1: {}, str2: {}", str1, str2);
return str1.length() - str2.length();
}));
LOGGER.info("New observed: {}.", longestString);
return longestString;
}
我正在玩 ForkJoin
框架和 Java 8 accumulateAndGet
函数。以下程序产生了我无法解释的输出。可以吗?
P.S:这不是作业。这是 Cay S. Horstmann 在 "Java SE 8 for the Really Impatient" 一书中的练习。是本好书。
/**
* Q1: Write a program that keeps track of the longest string that is observed by a number of threads. Use an
* {@code AtomicReference} and an appropriate accumulator.
*
* @param observed
* Longest string.
* @param x
* String value to be compared to the longest value.
* @return Longest string.
*/
public static String updateLongestString(final AtomicReference<String> observed, final String x) {
LOGGER.info("Received observed: {}, x: {}", observed, x);
final String longestString = observed.accumulateAndGet(x,
maxBy((str1, str2) -> observed.get().length() - x.length()));
LOGGER.info("New observed: {}.", longestString);
return longestString;
}
单元测试:
@Test
public void testUpdateLongestString() {
final String[] words = new String[] { "Java 8", "Java 8 is Awesome!",
"Java 8 is the Best thing Since Sliced Bread!", "Java 8 Changes Everything!" };
final int len = words.length;
final int stopAfter = 100;
final AtomicReference<String> longestString = new AtomicReference<>(words[0]);
final AtomicInteger count = new AtomicInteger(1);
class UpdateLongestStringTask extends RecursiveAction {
private static final long serialVersionUID = -2288401002001447054L;
private int id = -1;
private UpdateLongestStringTask(final int id) {
this.id = id;
}
@Override
protected void compute() {
LOGGER.info("Executing task #: {}.", id);
if (count.get() >= stopAfter) {
return;
}
final ForkJoinTask<Void> task = new UpdateLongestStringTask(count.incrementAndGet()).fork();
updateLongestString(longestString, words[randomIndex()]);
task.join();
}
private int randomIndex() {
return ThreadLocalRandom.current().nextInt(len);
}
}
/* Just because we can. */
final int parallelism = min(getRuntime().availableProcessors(), 4);
new ForkJoinPool(parallelism).invoke(new UpdateLongestStringTask(count.get()));
}
输出(标记为 -->> 的行无法解释;它为什么要打印一个它显然从未收到过的值):
2015-01-05 23:20:00.974 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 1.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-2] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 2.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 3.
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6Test - Executing task #: 4.
2015-01-05 23:20:00.981 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 Changes Everything!
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 is Awesome!
2015-01-05 23:20:00.980 [ForkJoinPool-1-worker-2] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - Received observed: Java 8, x: Java 8 is the Best thing Since Sliced Bread!
2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-0] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 Changes Everything!.
-->> 2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-3] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 Changes Everything!.
2015-01-05 23:20:01.028 [ForkJoinPool-1-worker-1] [INFO ] n.a.j.j.c.PracticeQuestionsCh6 - New observed: Java 8 is Awesome!.
如果您提供一个函数,该函数应该对其参数进行操作,即
final String longestString = observed.accumulateAndGet(x,
maxBy((str1, str2) -> observed.get().length() - x.length()));
严重违反了函数 API 的契约。 accumulateAndGet
将提供有关指定操作的原子更新,但不会提供有关函数内另一个 get
操作的原子更新。
不清楚你为什么这样做,因为实现正确的功能是直截了当的:
final String longestString = observed.accumulateAndGet(x,
maxBy((str1, str2) -> str1.length() - str2.length()));
或
final String longestString =
observed.accumulateAndGet(x, maxBy(comparingInt(String::length)));
请注意,在修复代码后,您仍然可能观察到与之前记录的两个值不同的结果,因为 accumulateAndGet
为您提供了原子更新,但这种原子性不会扩展到您执行的记录操作 在 调用accumulateAndGet
之前。 AtomicReference
的记录内容在执行原子更新时可能已过时。但由于更新合同和您提供的运算符,结果 String
的大小至少与您之前看到的最大值相同。
此外,请记住,日志操作的感知顺序并不能说明更新操作的实际执行顺序。
您可以更好地了解更改代码时发生的情况,如下所示:
public static String updateLongestString(AtomicReference<String> observed, String x) {
final String longestString = observed.accumulateAndGet(x, maxBy((str1, str2) -> {
LOGGER.info("Received str1: {}, str2: {}", str1, str2);
return str1.length() - str2.length();
}));
LOGGER.info("New observed: {}.", longestString);
return longestString;
}