控制异步网络消息设计速率的通信管理器
Communication Manager that controls rate of asynchronous network messaging design
我有一个系统需要访问,这个系统的访问速率是每秒 1 API 调用。但是,我希望通过异步的 Web 界面提供对它的访问。我有一个设计,它需要一个专用的通信管理器线程,该线程将消息收集到一个队列中,一次传输一个,然后用消息的结果回调给发送者。
这是一个好方法吗?你在我的代码中看到任何明显的陷阱吗?
public class CommunicationManager implements Runnable
{
private BlockingQueue<Message> message = new LinkedList<> ();
private boolean shutdown = false;
private Messenger messenger = new Messenger();
public CommunicationManager() {}
public void run()
{
long elapsed, start, diff;
start = 0;
while (!shutdown)
{
elapsed = System.currentTimeMillis();
diff = elapsed - start;
if (diff < 1000)
{
Thread.sleep(1000 - diff);
}
if (!message.isEmpty())
{
Message next = message.remove();
next.getSender().recieve(messenger.send(next.getMessage()));
}
start = elapsed;
}
}
public synchronized void addMessage(Sender sender, String message)
{
this.message.add(new Message(sender, message));
}
public synchronized void shutdown()
{
this.shutdown = true;
}
}
这个管理器的希望结果是每个循环,如果自上一个循环开始以来还没有经过一秒钟,它将在剩余的时间内休眠。然后它将检查队列是否为空。如果队列不为空,它将检索队列中的下一条消息,将消息和 return 结果发送给发送者的回调。然后循环结束并再次开始循环。
我使用 BlockingQueue 来避免在我删除队列中的最后一条消息时有人向队列添加消息的问题。我不认为默认的 Queue 结构是线程安全的,所以我需要一些方法来防止这种情况发生。
事实证明我在这里有点重新发明轮子。使用 Java ScheduledExecutorService 可以更轻松地完成我真正想做的事情。
class CommunicationControl
{
private final ScheduledExecutorService scheduluer = Executors.newScheduledThreadPool(1);
public void startManager()
{
final CommunicationManager manager = new CommunicationManager();
scheduler.scheduleAtFixedRate(manager, 10, 1, SECONDS);
}
public void stopManager()
{
while (manager.getMessageCount() > 0)
{
try
{
Thread.sleep(manager.getMessageCount() * 1000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
scheduler.shutdown();
}
}
原始 class 此处的返工:
class CommunicationManager implements Runnable
{
private BlockingQueue<Message> message = new ArrayBlockingQueue<Message> (1000);
private Messenger messenger = new Messenger();
public CommunicationManager() {}
public void run()
{
Message next = message.poll();
if (next != null)
{
next.getSender().recieve(messenger.send(next.getMessage()));
}
}
public void addMessage(Sender sender, String message)
{
try
{
while (!this.message.offer(new Message(sender, message), 1, TimeUnit.SECONDS)) {}
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
这将首先创建一个执行程序来处理我的 Communication Manager 运行 的频率,在本例中是在延迟 10 秒后每秒一次。然后,一旦这种情况每秒发生一次,CommuncationManager 的 运行 方法将执行并执行一个工作单元(如果可用)。
其他方法可以自由地向 CommunicationManager 添加工作,只要有队列 space,否则它们将阻塞直到 space 可用。
这也让我能够将其构建到框架中,例如 Spring。
我有一个系统需要访问,这个系统的访问速率是每秒 1 API 调用。但是,我希望通过异步的 Web 界面提供对它的访问。我有一个设计,它需要一个专用的通信管理器线程,该线程将消息收集到一个队列中,一次传输一个,然后用消息的结果回调给发送者。
这是一个好方法吗?你在我的代码中看到任何明显的陷阱吗?
public class CommunicationManager implements Runnable
{
private BlockingQueue<Message> message = new LinkedList<> ();
private boolean shutdown = false;
private Messenger messenger = new Messenger();
public CommunicationManager() {}
public void run()
{
long elapsed, start, diff;
start = 0;
while (!shutdown)
{
elapsed = System.currentTimeMillis();
diff = elapsed - start;
if (diff < 1000)
{
Thread.sleep(1000 - diff);
}
if (!message.isEmpty())
{
Message next = message.remove();
next.getSender().recieve(messenger.send(next.getMessage()));
}
start = elapsed;
}
}
public synchronized void addMessage(Sender sender, String message)
{
this.message.add(new Message(sender, message));
}
public synchronized void shutdown()
{
this.shutdown = true;
}
}
这个管理器的希望结果是每个循环,如果自上一个循环开始以来还没有经过一秒钟,它将在剩余的时间内休眠。然后它将检查队列是否为空。如果队列不为空,它将检索队列中的下一条消息,将消息和 return 结果发送给发送者的回调。然后循环结束并再次开始循环。
我使用 BlockingQueue 来避免在我删除队列中的最后一条消息时有人向队列添加消息的问题。我不认为默认的 Queue 结构是线程安全的,所以我需要一些方法来防止这种情况发生。
事实证明我在这里有点重新发明轮子。使用 Java ScheduledExecutorService 可以更轻松地完成我真正想做的事情。
class CommunicationControl
{
private final ScheduledExecutorService scheduluer = Executors.newScheduledThreadPool(1);
public void startManager()
{
final CommunicationManager manager = new CommunicationManager();
scheduler.scheduleAtFixedRate(manager, 10, 1, SECONDS);
}
public void stopManager()
{
while (manager.getMessageCount() > 0)
{
try
{
Thread.sleep(manager.getMessageCount() * 1000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
scheduler.shutdown();
}
}
原始 class 此处的返工:
class CommunicationManager implements Runnable
{
private BlockingQueue<Message> message = new ArrayBlockingQueue<Message> (1000);
private Messenger messenger = new Messenger();
public CommunicationManager() {}
public void run()
{
Message next = message.poll();
if (next != null)
{
next.getSender().recieve(messenger.send(next.getMessage()));
}
}
public void addMessage(Sender sender, String message)
{
try
{
while (!this.message.offer(new Message(sender, message), 1, TimeUnit.SECONDS)) {}
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
这将首先创建一个执行程序来处理我的 Communication Manager 运行 的频率,在本例中是在延迟 10 秒后每秒一次。然后,一旦这种情况每秒发生一次,CommuncationManager 的 运行 方法将执行并执行一个工作单元(如果可用)。
其他方法可以自由地向 CommunicationManager 添加工作,只要有队列 space,否则它们将阻塞直到 space 可用。
这也让我能够将其构建到框架中,例如 Spring。