跟踪地图中队列之间的进度
Tracking the progress between Queues in a Map
我目前有两个队列和项目在它们之间移动。最初,一个项目被放入 firstQueue
,然后三个专用线程之一将其移动到 secondQueue
,最后另一个专用线程将其删除。这些动作显然包括一些处理。我需要能够获取任何项目的状态(IN_FIRST
、AFTER_FIRST
、IN_SECOND
、AFTER_SECOND
或 ABSENT
),我通过以下方式手动实现了它对 statusMap
进行更新,其中队列被修改为
while (true) {
Item i = firstQueue.take();
statusMap.put(i, AFTER_FIRST);
process(i);
secondQueue.add(i);
statusMap.put(i, IN_SECOND);
}
这行得通,但它很丑,并且会留下状态不一致的时间 window。不一致没什么大不了的,它可以通过同步来解决,但这可能会适得其反,因为队列的容量有限并且可能会阻塞。丑陋更让我烦恼。
效率几乎不重要,因为处理需要几秒钟。使用专用线程来控制并发性。任何项目都不应该处于多个状态(但这不是很重要,我目前的活泼方法也不能保证)。将会有更多的队列(和状态),它们会有不同的种类(DelayQueue
、ArrayBlockingQueue
,也许 PriorityQueue
)。
我想知道是否有一个很好的解决方案可以推广到多个队列?
用逻辑包装队列来管理项目状态是否有意义?
public class QueueWrapper<E> implements BlockingQueue<E> {
private Queue<E> myQueue = new LinkedBlockingQueue<>();
private Map<E, Status> statusMap;
public QueueWrapper(Map<E, Status> statusMap) {
this.statusMap = statusMap;
}
[...]
@Override
public E take() throws InterruptedException {
E result = myQueue.take();
statusMap.put(result, Status.AFTER_FIRST);
return result;
}
这样状态管理总是与队列操作相关(并包含在队列操作中)...
显然 statusMap
需要同步,但这无论如何都是一个问题。
如前所述,包装队列或项目将是可行的解决方案或两者兼而有之。
public class ItemWrapper<E> {
E item;
Status status;
public ItemWrapper(Item i, Status s){ ... }
public setStatus(Status s){ ... }
// not necessary if you use a queue wrapper (see queue wrapper)
public boolean equals(Object obj) {
if ( obj instanceof ItemWrapper)
return item.equals(((ItemWrapper) obj).item)
return false;
}
public int hashCode(){
return item;
}
}
...
process(item) // process update status in the item
...
可能更好的方法(已经回答)是让 QueueWrapper 更新队列状态。为了好玩,我不使用状态图,但我使用以前的 itemwrapper 它看起来更干净(状态图也有效)。
public class QueueWrapper<E> implements Queue<E> {
private Queue<ItemWrapper<E>> myQueue;
static private Status inStatus; // FIRST
static private Status outStatus; // AFTER_FIRST
public QueueWrapper(Queue<E> myQueue, Status inStatus, Status outStatus) {...}
@Override
public boolean add(E e) {
return myQueue.add(new ItemWrapper(e, inStatus));
}
@Override
public E remove(){
ItemWrapper<E> result = myQueue.remove();
result.setStatus(outStatus)
return result.item;
}
...
}
您还可以使用 AOP 在队列中注入状态更新而不更改队列(状态映射应该比 itemwrapper 更合适)。
也许我没有很好地回答你的问题,因为一个简单的方法可以知道你的物品在哪里,可以使用 "contains" 功能检查每个队列。
我发现您的模型可能会在一致性、状态控制和缩放方面得到改进。
实现这一点的一种方法是将项目与您的状态耦合,将这对耦合入队和出队,并创建一种机制来确保状态更改。
我的建议如下图:
根据这个模型和你的例子,我们可以做到:
package Whosebug;
import java.util.concurrent.LinkedBlockingQueue;
import Whosebug.item.ItemState;
import Whosebug.task.CreatingTask;
import Whosebug.task.FirstMovingTask;
import Whosebug.task.SecondMovingTask;
public class Main {
private static void startTask(String name, Runnable r){
Thread t = new Thread(r, name);
t.start();
}
public static void main(String[] args) {
//create queues
LinkedBlockingQueue<ItemState> firstQueue = new LinkedBlockingQueue<ItemState>();
LinkedBlockingQueue<ItemState> secondQueue = new LinkedBlockingQueue<ItemState>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue));
startTask("Thread#2", new FirstMovingTask(firstQueue, secondQueue));
startTask("Thread#3", new SecondMovingTask(secondQueue));
}
}
每个任务在 ItemState:
上运行符合以下确认的操作 op()
one of three dedicated thread moves it to secondQueue and finally
another dedicated thread removes it.
ItemState
是一个包含 Item
和您的 State
的不可变对象。这确保了 Item 和 State 值之间的一致性。
ItemState 已确认创建 self-controled 状态机制的下一个状态:
public class FirstMovingTask {
//others codes
protected void op() {
try {
//dequeue
ItemState is0 = new ItemState(firstQueue.take());
System.out.println("Item " + is0.getItem().getValue() + ": " + is0.getState().getValue());
//process here
//enqueue
ItemState is1 = new ItemState(is0);
secondQueue.add(is1);
System.out.println("Item " + is1.getItem().getValue() + ": " + is1.getState().getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//others codes
}
使用 ItemState 实现:
public class ItemStateImpl implements ItemState {
private final Item item;
private final State state;
public ItemStateImpl(Item i){
this.item = i;
this.state = new State();
}
public ItemStateImpl(ItemState is) {
this.item = is.getItem();
this.state = is.getState().next();
}
// gets attrs
}
所以这种方式可以构建更优雅、灵活和可扩展的解决方案。
可扩展,因为您可以控制更多状态,仅更改 next()
并推广移动任务以增加队列数量。
结果:
Item 0: AFTER_FIRST
Item 0: IN_FIRST
Item 0: IN_SECOND
Item 0: AFTER_SECOND
Item 1: IN_FIRST
Item 1: AFTER_FIRST
Item 1: IN_SECOND
Item 1: AFTER_SECOND
Item 2: IN_FIRST
Item 2: AFTER_FIRST
Item 2: IN_SECOND
... others
更新(06/07/2018):分析地图在搜索中的使用
使用比较器等等于值在地图中搜索可能不起作用,因为通常值和标识 (key/hash) 之间的映射不是 one-to-one(见下图)。通过这种方式,需要为搜索值创建一个排序列表,其结果为 O(n) (worst-case).
与 Item.getValuesHashCode()
:
private int getValuesHashCode(){
return new HashCodeBuilder().append(value).hashCode();
}
在这种情况下,您必须保留 Vector<ItemState>
而不是 Item
并像 getValuesHashCode
的结果一样使用密钥。更改 state-control 的机制以保持项目的第一个引用和当前状态。见下文:
//Main.class
public static void main(String[] args) {
... others code ...
//references repository
ConcurrentHashMap<Integer, Vector<ItemState>> statesMap = new ConcurrentHashMap<Integer, Vector<ItemState>>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue, statesMap));
... others code ...
}
//CreateTask.class
protected void op() throws InterruptedException {
//create item
ItemState is = new ItemStateImpl(new Item(i++, NameGenerator.name()));
//put in monitor and enqueue
int key = is.getHashValue();
Vector<ItemState> items = map.get(key);
if (items == null){
items = new Vector<>();
map.put(key, items);
}
items.add(is);
//enqueue
queue.put(is);
}
//FirstMovingTask.class
protected void op() throws InterruptedException{
//dequeue
ItemState is0 = firstQueue.take();
//process
ItemState is1 = process(is0.next());
//enqueue
secondQueue.put(is1.next());
}
//ItemState.class
public ItemState next() {
//required for consistent change state
synchronized (state) {
state = state.next();
return this;
}
}
要搜索,您必须使用 concurrentMapRef.get(key)。结果将引用更新的 ItemState。
我的测试结果:
# key = hash("a")
# concurrentMapRef.get(key)
...
Item#7#0 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_FIRST
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - IN_SECOND
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_SECOND
Item#12#1 : a - IN_FIRST
代码中有更多详细信息:https://github.com/ag-studies/Whosebug-queue
2018 年 6 月 9 日更新:重新设计
概括这个项目,我可以理解状态机是这样的:
通过这种方式,我解耦了队列的工作人员以改进概念。我使用 MemoryRep 来保持整个过程中项目的唯一引用。
当然,如果您需要将 ItemState 保存在物理存储库中,您可以使用策略 event-based。
这保留了之前的想法,并为概念创造了更多的易读性。看到这个:
我了解到每个作业都会有两个队列(input/output)并且跟一个业务模型有关系! 研究人员总是会找到 Item 的最新和一致的状态。
所以,回答你的问题:
我可以在任何地方使用 MemoryRep(基本上是一个 Map)找到 Item 的一致状态,在 ItemState[= 中包装状态和项目105=],并控制作业的更改状态入队或出队。
除了 next()
的 运行 之外,性能保持不变
状态始终一致(针对您的问题)
在此模型中,可以使用任何队列类型、任意数量的 jobs/queues 和任意数量的状态。
另外这个很漂亮!!
这里有一点和别人说的不一样。从队列服务和系统的世界中,我们有消息确认的概念。这很好,因为它还为您提供了一些内置的重试逻辑。
我将从高层次阐述它的工作原理,如果您需要,我可以添加代码。
基本上,您的每个队列都会有一个 Set
。您会将队列包装在一个对象中,这样当您将一个项目从队列中取出时,就会发生一些事情
- 项目已从队列中删除
- 项目已添加到关联集
- 一项任务(lambda 包含一个原子布尔值(默认为 false))已安排。当 运行 时,它将从集合中删除项目,如果布尔值为 false,则将其放回队列中
- 布尔值周围的项和包装器 returned 给调用者
一旦 process(i);
完成,您的代码将向包装器指示接收确认,并且包装器将从集合中删除项目并使布尔值变为 false。
return 状态的方法将简单地检查哪个队列或设置项目在其中。
请注意,这会提供 "at least once" 交付,这意味着一个项目将至少处理一次,但如果处理时间太接近超时,则可能不止一次。
我目前有两个队列和项目在它们之间移动。最初,一个项目被放入 firstQueue
,然后三个专用线程之一将其移动到 secondQueue
,最后另一个专用线程将其删除。这些动作显然包括一些处理。我需要能够获取任何项目的状态(IN_FIRST
、AFTER_FIRST
、IN_SECOND
、AFTER_SECOND
或 ABSENT
),我通过以下方式手动实现了它对 statusMap
进行更新,其中队列被修改为
while (true) {
Item i = firstQueue.take();
statusMap.put(i, AFTER_FIRST);
process(i);
secondQueue.add(i);
statusMap.put(i, IN_SECOND);
}
这行得通,但它很丑,并且会留下状态不一致的时间 window。不一致没什么大不了的,它可以通过同步来解决,但这可能会适得其反,因为队列的容量有限并且可能会阻塞。丑陋更让我烦恼。
效率几乎不重要,因为处理需要几秒钟。使用专用线程来控制并发性。任何项目都不应该处于多个状态(但这不是很重要,我目前的活泼方法也不能保证)。将会有更多的队列(和状态),它们会有不同的种类(DelayQueue
、ArrayBlockingQueue
,也许 PriorityQueue
)。
我想知道是否有一个很好的解决方案可以推广到多个队列?
用逻辑包装队列来管理项目状态是否有意义?
public class QueueWrapper<E> implements BlockingQueue<E> {
private Queue<E> myQueue = new LinkedBlockingQueue<>();
private Map<E, Status> statusMap;
public QueueWrapper(Map<E, Status> statusMap) {
this.statusMap = statusMap;
}
[...]
@Override
public E take() throws InterruptedException {
E result = myQueue.take();
statusMap.put(result, Status.AFTER_FIRST);
return result;
}
这样状态管理总是与队列操作相关(并包含在队列操作中)...
显然 statusMap
需要同步,但这无论如何都是一个问题。
如前所述,包装队列或项目将是可行的解决方案或两者兼而有之。
public class ItemWrapper<E> {
E item;
Status status;
public ItemWrapper(Item i, Status s){ ... }
public setStatus(Status s){ ... }
// not necessary if you use a queue wrapper (see queue wrapper)
public boolean equals(Object obj) {
if ( obj instanceof ItemWrapper)
return item.equals(((ItemWrapper) obj).item)
return false;
}
public int hashCode(){
return item;
}
}
...
process(item) // process update status in the item
...
可能更好的方法(已经回答)是让 QueueWrapper 更新队列状态。为了好玩,我不使用状态图,但我使用以前的 itemwrapper 它看起来更干净(状态图也有效)。
public class QueueWrapper<E> implements Queue<E> {
private Queue<ItemWrapper<E>> myQueue;
static private Status inStatus; // FIRST
static private Status outStatus; // AFTER_FIRST
public QueueWrapper(Queue<E> myQueue, Status inStatus, Status outStatus) {...}
@Override
public boolean add(E e) {
return myQueue.add(new ItemWrapper(e, inStatus));
}
@Override
public E remove(){
ItemWrapper<E> result = myQueue.remove();
result.setStatus(outStatus)
return result.item;
}
...
}
您还可以使用 AOP 在队列中注入状态更新而不更改队列(状态映射应该比 itemwrapper 更合适)。
也许我没有很好地回答你的问题,因为一个简单的方法可以知道你的物品在哪里,可以使用 "contains" 功能检查每个队列。
我发现您的模型可能会在一致性、状态控制和缩放方面得到改进。
实现这一点的一种方法是将项目与您的状态耦合,将这对耦合入队和出队,并创建一种机制来确保状态更改。
我的建议如下图:
根据这个模型和你的例子,我们可以做到:
package Whosebug;
import java.util.concurrent.LinkedBlockingQueue;
import Whosebug.item.ItemState;
import Whosebug.task.CreatingTask;
import Whosebug.task.FirstMovingTask;
import Whosebug.task.SecondMovingTask;
public class Main {
private static void startTask(String name, Runnable r){
Thread t = new Thread(r, name);
t.start();
}
public static void main(String[] args) {
//create queues
LinkedBlockingQueue<ItemState> firstQueue = new LinkedBlockingQueue<ItemState>();
LinkedBlockingQueue<ItemState> secondQueue = new LinkedBlockingQueue<ItemState>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue));
startTask("Thread#2", new FirstMovingTask(firstQueue, secondQueue));
startTask("Thread#3", new SecondMovingTask(secondQueue));
}
}
每个任务在 ItemState:
上运行符合以下确认的操作op()
one of three dedicated thread moves it to secondQueue and finally another dedicated thread removes it.
ItemState
是一个包含 Item
和您的 State
的不可变对象。这确保了 Item 和 State 值之间的一致性。
ItemState 已确认创建 self-controled 状态机制的下一个状态:
public class FirstMovingTask {
//others codes
protected void op() {
try {
//dequeue
ItemState is0 = new ItemState(firstQueue.take());
System.out.println("Item " + is0.getItem().getValue() + ": " + is0.getState().getValue());
//process here
//enqueue
ItemState is1 = new ItemState(is0);
secondQueue.add(is1);
System.out.println("Item " + is1.getItem().getValue() + ": " + is1.getState().getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//others codes
}
使用 ItemState 实现:
public class ItemStateImpl implements ItemState {
private final Item item;
private final State state;
public ItemStateImpl(Item i){
this.item = i;
this.state = new State();
}
public ItemStateImpl(ItemState is) {
this.item = is.getItem();
this.state = is.getState().next();
}
// gets attrs
}
所以这种方式可以构建更优雅、灵活和可扩展的解决方案。
可扩展,因为您可以控制更多状态,仅更改 next()
并推广移动任务以增加队列数量。
结果:
Item 0: AFTER_FIRST
Item 0: IN_FIRST
Item 0: IN_SECOND
Item 0: AFTER_SECOND
Item 1: IN_FIRST
Item 1: AFTER_FIRST
Item 1: IN_SECOND
Item 1: AFTER_SECOND
Item 2: IN_FIRST
Item 2: AFTER_FIRST
Item 2: IN_SECOND
... others
更新(06/07/2018):分析地图在搜索中的使用 使用比较器等等于值在地图中搜索可能不起作用,因为通常值和标识 (key/hash) 之间的映射不是 one-to-one(见下图)。通过这种方式,需要为搜索值创建一个排序列表,其结果为 O(n) (worst-case).
与 Item.getValuesHashCode()
:
private int getValuesHashCode(){
return new HashCodeBuilder().append(value).hashCode();
}
在这种情况下,您必须保留 Vector<ItemState>
而不是 Item
并像 getValuesHashCode
的结果一样使用密钥。更改 state-control 的机制以保持项目的第一个引用和当前状态。见下文:
//Main.class
public static void main(String[] args) {
... others code ...
//references repository
ConcurrentHashMap<Integer, Vector<ItemState>> statesMap = new ConcurrentHashMap<Integer, Vector<ItemState>>();
//start three threads
startTask("Thread#1", new CreatingTask(firstQueue, statesMap));
... others code ...
}
//CreateTask.class
protected void op() throws InterruptedException {
//create item
ItemState is = new ItemStateImpl(new Item(i++, NameGenerator.name()));
//put in monitor and enqueue
int key = is.getHashValue();
Vector<ItemState> items = map.get(key);
if (items == null){
items = new Vector<>();
map.put(key, items);
}
items.add(is);
//enqueue
queue.put(is);
}
//FirstMovingTask.class
protected void op() throws InterruptedException{
//dequeue
ItemState is0 = firstQueue.take();
//process
ItemState is1 = process(is0.next());
//enqueue
secondQueue.put(is1.next());
}
//ItemState.class
public ItemState next() {
//required for consistent change state
synchronized (state) {
state = state.next();
return this;
}
}
要搜索,您必须使用 concurrentMapRef.get(key)。结果将引用更新的 ItemState。
我的测试结果:
# key = hash("a")
# concurrentMapRef.get(key)
...
Item#7#0 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_FIRST
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - IN_SECOND
Item#12#1 : a - IN_FIRST
... many others lines
Item#7#0 : a - AFTER_SECOND
Item#12#1 : a - IN_FIRST
代码中有更多详细信息:https://github.com/ag-studies/Whosebug-queue
2018 年 6 月 9 日更新:重新设计
概括这个项目,我可以理解状态机是这样的:
通过这种方式,我解耦了队列的工作人员以改进概念。我使用 MemoryRep 来保持整个过程中项目的唯一引用。 当然,如果您需要将 ItemState 保存在物理存储库中,您可以使用策略 event-based。
这保留了之前的想法,并为概念创造了更多的易读性。看到这个:
我了解到每个作业都会有两个队列(input/output)并且跟一个业务模型有关系! 研究人员总是会找到 Item 的最新和一致的状态。
所以,回答你的问题:
我可以在任何地方使用 MemoryRep(基本上是一个 Map)找到 Item 的一致状态,在 ItemState[= 中包装状态和项目105=],并控制作业的更改状态入队或出队。
除了 next()
的 运行 之外,性能保持不变
状态始终一致(针对您的问题)
在此模型中,可以使用任何队列类型、任意数量的 jobs/queues 和任意数量的状态。
另外这个很漂亮!!
这里有一点和别人说的不一样。从队列服务和系统的世界中,我们有消息确认的概念。这很好,因为它还为您提供了一些内置的重试逻辑。
我将从高层次阐述它的工作原理,如果您需要,我可以添加代码。
基本上,您的每个队列都会有一个 Set
。您会将队列包装在一个对象中,这样当您将一个项目从队列中取出时,就会发生一些事情
- 项目已从队列中删除
- 项目已添加到关联集
- 一项任务(lambda 包含一个原子布尔值(默认为 false))已安排。当 运行 时,它将从集合中删除项目,如果布尔值为 false,则将其放回队列中
- 布尔值周围的项和包装器 returned 给调用者
一旦 process(i);
完成,您的代码将向包装器指示接收确认,并且包装器将从集合中删除项目并使布尔值变为 false。
return 状态的方法将简单地检查哪个队列或设置项目在其中。
请注意,这会提供 "at least once" 交付,这意味着一个项目将至少处理一次,但如果处理时间太接近超时,则可能不止一次。