Java 中的循环调度算法使用 AtomicBoolean
Round robin scheduling algorithm in Java using AtomicBoolean
我想在向外部系统发送请求时实施严格的循环调度。有两个外部系统服务器。第一个请求应该转到 'System1',第二个请求必须转到 'System2',下一个请求必须转到 'System1',依此类推。
因为我只有两个服务器可以向其发送请求,并且因为我希望在没有任何阻塞和上下文切换的情况下获得最大性能,所以我选择了 AtomicBoolean,因为它使用 CAS 操作。
我的实现类
1. RoundRobinTest.java
package com.concurrency;
import java.util.Iterator;
public class RoundRobinTest
{
public static void main(String[] args)
{
for (int i = 0; i < 500; i++)
{
new Thread(new RoundRobinLogic()).start();
}
try
{
// Giving a few seconds for the threads to complete
Thread.currentThread().sleep(2000);
Iterator<String> output = RoundRobinLogic.output.iterator();
int i=0;
while (output.hasNext())
{
System.out.println(i+++":"+output.next());
// Sleeping after each out.print
Thread.currentThread().sleep(20);
}
}
catch (Exception ex)
{
// do nothing
}
}
}
2.RoundRobinLogic.java(Class 静态 AtomicBoolean 对象)
package com.concurrency;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
public class RoundRobinLogic implements Runnable
{
private static AtomicBoolean bool = new AtomicBoolean(true);
public static Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
if(bool.getAndSet(false))
{
// Sending the request to first system
output.add("Request to System1");
}
else if(!bool.getAndSet(true))
{
// Sending the request to first system
output.add("Request to System2");
}
}
}
输出:
......................
314:Request to System1
315:Request to System2
316:Request to System1
317:Request to System2
318:Request to System1
319:Request to System1
320:Request to System2
321:Request to System2
322:Request to System1
323:Request to System2
324:Request to System1
325:Request to System2
......................
请求 318 和 319 已发送到同一台服务器,并且 AtomicBoolean 在这种情况下失败。对于我的应用程序,可能有 1000-2000 个线程同时访问共享对象。
从 Java 并发实践中,我看到了下面的内容。
在高争用级别上,锁定往往优于原子变量,但在更现实的情况下
争用级别原子变量胜过锁。这是因为锁通过挂起线程来响应争用,
减少 CPU 共享内存总线上的使用和同步流量。
对于低到中等的竞争,原子提供更好的可扩展性;竞争激烈,锁报价
更好地避免争用。 (基于 CAS 的算法在单个 CPU 系统上也优于基于锁的算法,因为
CAS 总是在单个 CPU 系统上成功,除非线程在系统中间被抢占这种不太可能的情况
读取修改写入操作。)
现在我有以下问题。
- 有没有其他高效的非阻塞方式,实现轮询请求发送
- 在竞争激烈的情况下,AtomicBoolean 是否有可能失败?我的理解是 performance/throughput 可能会由于激烈的争用而下降。但在上面的示例中,AtomicBoolean 失败了。为什么 ?
假设您在实际系统中使用的不是 Queue
,而是 api。我看到的问题与:
if(bool.getAndSet(false))
{
// Sending the request to first system
output.add("Request to System1");
}
else if(!bool.getAndSet(true))
{
// Sending the request to second system
output.add("Request to System2");
}
如果两个条件都失败怎么办?这怎么可能?想象一下,在输入第一个 if
时,布尔值是 true
。然后您尝试将其设置为 false,但另一个线程抢在您之前,所以您看到了 false
。然后你试试else if
。现在,如果到达那里时 else if
是 false,但设置为 true 又会怎样?在那种情况下,两次尝试都会失败。
我会将其重构为:
while(true){
boolean current = bool.get();
if(bool.compareAndSet(current, !current){
if(current){
//send request to first system
} else {
//send request to second system
}
return;
}
}
正如 Sean Bright 提到的,因为您正在添加到队列中,即使您像我上面那样实现它,您仍然可能会看到一些乱序的值,因为队列本身不是与 AtomicBoolean 同步的一部分.
除了 John 的回答之外,RoundRobinLogic
的更简洁且可能稍微更有效的实施将使用 AtomicInteger
或 AtomicLong
。这消除了将 AtomicBoolean
的当前值与新值进行比较的需要:
class RoundRobinLogic implements Runnable
{
private static final AtomicInteger systemIndex = new AtomicInteger(1);
public static final Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
if (systemIndex.incrementAndGet() % 2 == 0) {
// Sending the request to first system
output.add("Request to System1");
} else {
// Sending the request to second system
output.add("Request to System2");
}
}
}
这将使您能够相当轻松地将其扩展到其他系统:
class RemoteSystem
{
private final String name;
RemoteSystem(String name)
{
this.name = name;
}
public String name()
{
return name;
}
}
class RoundRobinLogic implements Runnable
{
private static final AtomicInteger systemIndex = new AtomicInteger(1);
private static final RemoteSystem[] systems = new RemoteSystem[] {
new RemoteSystem("System1"),
new RemoteSystem("System2"),
new RemoteSystem("System3"),
new RemoteSystem("System4"),
};
public static final Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
RemoteSystem system = systems[systemIndex.incrementAndGet() % systems.length];
// Sending the request to right system
output.add("Request to " + system.name());
}
}
因为您的要求基本上是:实现一个原子操作,
- 评估并翻转一个布尔值(或在通用
n
服务器案例中评估一个模数并递增一个计数器)和
- 根据步骤 1 的结果将条目插入队列,
您无法通过使步骤 1 和步骤 2 分别成为线程安全的来真正实现这一目标;您必须 同步 步骤 1 和步骤 2。
这是一个应该有效的简单实现:
import java.util.LinkedList;
import java.util.Queue;
public class RoundRobinLogic implements Runnable
{
private static boolean bool = true;
public static final Queue<String> OUTPUT = new LinkedList<String>();
private static final Object LOCK = new Object();
@Override
public void run() {
synchronized (LOCK) {
OUTPUT.add(bool ? "Request to System1" : "Request to System2");
bool = !bool;
}
}
}
关于您的问题:
- 如果需要同时同步两个高于处理器级别的操作,则无法避免阻塞。
java.util.concurrent.atomic
中的 类 使用机器级原子指令,这就是为什么使用这些 类 的代码(通常,取决于平台)不需要阻塞。
- 在您的实施中,
AtomicBoolean
没有失败。相反,在读取布尔值和将元素添加到队列之间存在竞争条件。
我想在向外部系统发送请求时实施严格的循环调度。有两个外部系统服务器。第一个请求应该转到 'System1',第二个请求必须转到 'System2',下一个请求必须转到 'System1',依此类推。
因为我只有两个服务器可以向其发送请求,并且因为我希望在没有任何阻塞和上下文切换的情况下获得最大性能,所以我选择了 AtomicBoolean,因为它使用 CAS 操作。
我的实现类
1. RoundRobinTest.java
package com.concurrency;
import java.util.Iterator;
public class RoundRobinTest
{
public static void main(String[] args)
{
for (int i = 0; i < 500; i++)
{
new Thread(new RoundRobinLogic()).start();
}
try
{
// Giving a few seconds for the threads to complete
Thread.currentThread().sleep(2000);
Iterator<String> output = RoundRobinLogic.output.iterator();
int i=0;
while (output.hasNext())
{
System.out.println(i+++":"+output.next());
// Sleeping after each out.print
Thread.currentThread().sleep(20);
}
}
catch (Exception ex)
{
// do nothing
}
}
}
2.RoundRobinLogic.java(Class 静态 AtomicBoolean 对象)
package com.concurrency;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
public class RoundRobinLogic implements Runnable
{
private static AtomicBoolean bool = new AtomicBoolean(true);
public static Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
if(bool.getAndSet(false))
{
// Sending the request to first system
output.add("Request to System1");
}
else if(!bool.getAndSet(true))
{
// Sending the request to first system
output.add("Request to System2");
}
}
}
输出:
......................
314:Request to System1
315:Request to System2
316:Request to System1
317:Request to System2
318:Request to System1
319:Request to System1
320:Request to System2
321:Request to System2
322:Request to System1
323:Request to System2
324:Request to System1
325:Request to System2
......................
请求 318 和 319 已发送到同一台服务器,并且 AtomicBoolean 在这种情况下失败。对于我的应用程序,可能有 1000-2000 个线程同时访问共享对象。 从 Java 并发实践中,我看到了下面的内容。
在高争用级别上,锁定往往优于原子变量,但在更现实的情况下 争用级别原子变量胜过锁。这是因为锁通过挂起线程来响应争用, 减少 CPU 共享内存总线上的使用和同步流量。 对于低到中等的竞争,原子提供更好的可扩展性;竞争激烈,锁报价 更好地避免争用。 (基于 CAS 的算法在单个 CPU 系统上也优于基于锁的算法,因为 CAS 总是在单个 CPU 系统上成功,除非线程在系统中间被抢占这种不太可能的情况 读取修改写入操作。)
现在我有以下问题。
- 有没有其他高效的非阻塞方式,实现轮询请求发送
- 在竞争激烈的情况下,AtomicBoolean 是否有可能失败?我的理解是 performance/throughput 可能会由于激烈的争用而下降。但在上面的示例中,AtomicBoolean 失败了。为什么 ?
假设您在实际系统中使用的不是 Queue
,而是 api。我看到的问题与:
if(bool.getAndSet(false))
{
// Sending the request to first system
output.add("Request to System1");
}
else if(!bool.getAndSet(true))
{
// Sending the request to second system
output.add("Request to System2");
}
如果两个条件都失败怎么办?这怎么可能?想象一下,在输入第一个 if
时,布尔值是 true
。然后您尝试将其设置为 false,但另一个线程抢在您之前,所以您看到了 false
。然后你试试else if
。现在,如果到达那里时 else if
是 false,但设置为 true 又会怎样?在那种情况下,两次尝试都会失败。
我会将其重构为:
while(true){
boolean current = bool.get();
if(bool.compareAndSet(current, !current){
if(current){
//send request to first system
} else {
//send request to second system
}
return;
}
}
正如 Sean Bright 提到的,因为您正在添加到队列中,即使您像我上面那样实现它,您仍然可能会看到一些乱序的值,因为队列本身不是与 AtomicBoolean 同步的一部分.
除了 John 的回答之外,RoundRobinLogic
的更简洁且可能稍微更有效的实施将使用 AtomicInteger
或 AtomicLong
。这消除了将 AtomicBoolean
的当前值与新值进行比较的需要:
class RoundRobinLogic implements Runnable
{
private static final AtomicInteger systemIndex = new AtomicInteger(1);
public static final Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
if (systemIndex.incrementAndGet() % 2 == 0) {
// Sending the request to first system
output.add("Request to System1");
} else {
// Sending the request to second system
output.add("Request to System2");
}
}
}
这将使您能够相当轻松地将其扩展到其他系统:
class RemoteSystem
{
private final String name;
RemoteSystem(String name)
{
this.name = name;
}
public String name()
{
return name;
}
}
class RoundRobinLogic implements Runnable
{
private static final AtomicInteger systemIndex = new AtomicInteger(1);
private static final RemoteSystem[] systems = new RemoteSystem[] {
new RemoteSystem("System1"),
new RemoteSystem("System2"),
new RemoteSystem("System3"),
new RemoteSystem("System4"),
};
public static final Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
RemoteSystem system = systems[systemIndex.incrementAndGet() % systems.length];
// Sending the request to right system
output.add("Request to " + system.name());
}
}
因为您的要求基本上是:实现一个原子操作,
- 评估并翻转一个布尔值(或在通用
n
服务器案例中评估一个模数并递增一个计数器)和 - 根据步骤 1 的结果将条目插入队列,
您无法通过使步骤 1 和步骤 2 分别成为线程安全的来真正实现这一目标;您必须 同步 步骤 1 和步骤 2。
这是一个应该有效的简单实现:
import java.util.LinkedList;
import java.util.Queue;
public class RoundRobinLogic implements Runnable
{
private static boolean bool = true;
public static final Queue<String> OUTPUT = new LinkedList<String>();
private static final Object LOCK = new Object();
@Override
public void run() {
synchronized (LOCK) {
OUTPUT.add(bool ? "Request to System1" : "Request to System2");
bool = !bool;
}
}
}
关于您的问题:
- 如果需要同时同步两个高于处理器级别的操作,则无法避免阻塞。
java.util.concurrent.atomic
中的 类 使用机器级原子指令,这就是为什么使用这些 类 的代码(通常,取决于平台)不需要阻塞。 - 在您的实施中,
AtomicBoolean
没有失败。相反,在读取布尔值和将元素添加到队列之间存在竞争条件。