跟踪地图中队列之间的进度

Tracking the progress between Queues in a Map

我目前有两个队列和项目在它们之间移动。最初,一个项目被放入 firstQueue,然后三个专用线程之一将其移动到 secondQueue,最后另一个专用线程将其删除。这些动作显然包括一些处理。我需要能够获取任何项目的状态(IN_FIRSTAFTER_FIRSTIN_SECONDAFTER_SECONDABSENT),我通过以下方式手动实现了它对 statusMap 进行更新,其中队列被修改为

while (true) {
    Item i = firstQueue.take();
    statusMap.put(i, AFTER_FIRST);
    process(i);
    secondQueue.add(i);
    statusMap.put(i, IN_SECOND);
}

这行得通,但它很丑,并且会留下状态不一致的时间 window。不一致没什么大不了的,它可以通过同步来解决,但这可能会适得其反,因为队列的容量有限并且可能会阻塞。丑陋更让我烦恼。

效率几乎不重要,因为处理需要几秒钟。使用专用线程来控制并发性。任何项目都不应该处于多个状态(但这不是很重要,我目前的活泼方法也不能保证)。将会有更多的队列(和状态),它们会有不同的种类(DelayQueueArrayBlockingQueue,也许 PriorityQueue)。

我想知道是否有一个很好的解决方案可以推广到多个队列?

用逻辑包装队列来管理项目状态是否有意义?

public class QueueWrapper<E> implements BlockingQueue<E> {
    private Queue<E> myQueue = new LinkedBlockingQueue<>();
    private Map<E, Status> statusMap;

    public QueueWrapper(Map<E, Status> statusMap) {
        this.statusMap = statusMap;
    }

    [...]
    @Override
    public E take() throws InterruptedException {
        E result = myQueue.take();
        statusMap.put(result, Status.AFTER_FIRST);
        return result;
    }

这样状态管理总是与队列操作相关(并包含在队列操作中)...

显然 statusMap 需要同步,但这无论如何都是一个问题。

如前所述,包装队列或项目将是可行的解决方案或两者兼而有之。

public class ItemWrapper<E> {
   E item;
   Status status;
   public ItemWrapper(Item i, Status s){ ... }
   public setStatus(Status s){ ... }
   // not necessary if you use a queue wrapper (see queue wrapper)
   public boolean equals(Object obj) {
     if ( obj instanceof ItemWrapper)
       return item.equals(((ItemWrapper) obj).item) 
     return false;
   }
   public int hashCode(){
     return item;
   }
}
...
process(item) // process update status in the item
...

可能更好的方法(已经回答)是让 QueueWrapper 更新队列状态。为了好玩,我不使用状态图,但我使用以前的 itemwrapper 它看起来更干净(状态图也有效)。

public class QueueWrapper<E> implements Queue<E> {
  private Queue<ItemWrapper<E>> myQueue;
  static private Status inStatus; // FIRST
  static private Status outStatus; // AFTER_FIRST
  public QueueWrapper(Queue<E> myQueue, Status inStatus, Status outStatus) {...}
  @Override
  public boolean add(E e) {
    return myQueue.add(new ItemWrapper(e, inStatus));
  }
  @Override
  public E remove(){
    ItemWrapper<E> result = myQueue.remove();
    result.setStatus(outStatus)
    return result.item;
  }
  ...  
  }

您还可以使用 AOP 在队列中注入状态更新而不更改队列(状态映射应该比 itemwrapper 更合适)。

也许我没有很好地回答你的问题,因为一个简单的方法可以知道你的物品在哪里,可以使用 "contains" 功能检查每个队列。

我发现您的模型可能会在一致性、状态控制和缩放方面得到改进。

实现这一点的一种方法是将项目与您的状态耦合,将这对耦合入队和出队,并创建一种机制来确保状态更改。

我的建议如下图:

根据这个模型和你的例子,我们可以做到:

package Whosebug;

import java.util.concurrent.LinkedBlockingQueue;

import Whosebug.item.ItemState;
import Whosebug.task.CreatingTask;
import Whosebug.task.FirstMovingTask;
import Whosebug.task.SecondMovingTask;

public class Main {

    private static void startTask(String name, Runnable r){
        Thread t = new Thread(r, name);
        t.start();
    }

    public static void main(String[] args) {
        //create queues
        LinkedBlockingQueue<ItemState> firstQueue = new LinkedBlockingQueue<ItemState>();
        LinkedBlockingQueue<ItemState> secondQueue = new LinkedBlockingQueue<ItemState>();
        //start three threads
        startTask("Thread#1", new CreatingTask(firstQueue));
        startTask("Thread#2", new FirstMovingTask(firstQueue, secondQueue));
        startTask("Thread#3", new SecondMovingTask(secondQueue));
    }
}

每个任务在 ItemState:

上运行符合以下确认的操作 op()

one of three dedicated thread moves it to secondQueue and finally another dedicated thread removes it.

ItemState 是一个包含 Item 和您的 State 的不可变对象。这确保了 Item 和 State 值之间的一致性。

ItemState 已确认创建 self-controled 状态机制的下一个状态:

public class FirstMovingTask {
    //others codes
    protected void op() {
            try {
                //dequeue
                ItemState is0 = new ItemState(firstQueue.take());
                System.out.println("Item " + is0.getItem().getValue() + ": " + is0.getState().getValue());
                //process here
                //enqueue
                ItemState is1 = new ItemState(is0);
                secondQueue.add(is1);
                System.out.println("Item " + is1.getItem().getValue() + ": " + is1.getState().getValue());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    //others codes
}

使用 ItemState 实现:

public class ItemStateImpl implements ItemState {
    private final Item item;
    private final State state;

    public ItemStateImpl(Item i){
        this.item = i;
        this.state = new State();
    }

    public ItemStateImpl(ItemState is) {
        this.item = is.getItem();
        this.state = is.getState().next();
    }

    // gets attrs
}

所以这种方式可以构建更优雅、灵活和可扩展的解决方案。 可扩展,因为您可以控制更多状态,仅更改 next() 并推广移动任务以增加队列数量。

结果:

Item 0: AFTER_FIRST
Item 0: IN_FIRST
Item 0: IN_SECOND
Item 0: AFTER_SECOND
Item 1: IN_FIRST
Item 1: AFTER_FIRST
Item 1: IN_SECOND
Item 1: AFTER_SECOND
Item 2: IN_FIRST
Item 2: AFTER_FIRST
Item 2: IN_SECOND
... others

更新(06/07/2018):分析地图在搜索中的使用 使用比较器等等于值在地图中搜索可能不起作用,因为通常值和标识 (key/hash) 之间的映射不是 one-to-one(见下图)。通过这种方式,需要为搜索值创建一个排序列表,其结果为 O(n) (worst-case).

Item.getValuesHashCode():

private int getValuesHashCode(){
  return new HashCodeBuilder().append(value).hashCode();
}

在这种情况下,您必须保留 Vector<ItemState> 而不是 Item 并像 getValuesHashCode 的结果一样使用密钥。更改 state-control 的机制以保持项目的第一个引用和当前状态。见下文:

//Main.class
public static void main(String[] args) {
    ... others code ...

    //references repository
    ConcurrentHashMap<Integer, Vector<ItemState>> statesMap = new ConcurrentHashMap<Integer, Vector<ItemState>>();
    //start three threads
    startTask("Thread#1", new CreatingTask(firstQueue, statesMap));

    ... others code ...
}

//CreateTask.class
protected void op() throws InterruptedException {
    //create item
    ItemState is = new ItemStateImpl(new Item(i++, NameGenerator.name()));
    //put in monitor and enqueue
    int key = is.getHashValue();
    Vector<ItemState> items = map.get(key);
    if (items == null){
        items = new Vector<>();
        map.put(key, items);
    }
    items.add(is);
    //enqueue
    queue.put(is);
}

//FirstMovingTask.class
protected void op() throws InterruptedException{
    //dequeue
    ItemState is0 = firstQueue.take();
    //process
    ItemState is1 = process(is0.next());
    //enqueue 
    secondQueue.put(is1.next());
}

//ItemState.class
public ItemState next() {
    //required for consistent change state
    synchronized (state) {
        state = state.next();
        return this;
    }
}

要搜索,您必须使用 concurrentMapRef.get(key)。结果将引用更新的 ItemState。

我的测试结果:

# key = hash("a")
# concurrentMapRef.get(key)
...
Item#7#0    : a - IN_FIRST 
... many others lines
Item#7#0    : a - AFTER_FIRST 
Item#12#1   : a - IN_FIRST 
... many others lines
Item#7#0    : a - IN_SECOND 
Item#12#1   : a - IN_FIRST 
... many others lines
Item#7#0    : a - AFTER_SECOND 
Item#12#1   : a - IN_FIRST 

代码中有更多详细信息:https://github.com/ag-studies/Whosebug-queue

2018 年 6 月 9 日更新:重新设计

概括这个项目,我可以理解状态机是这样的:

通过这种方式,我解耦了队列的工作人员以改进概念。我使用 MemoryRep 来保持整个过程中项目的唯一引用。 当然,如果您需要将 ItemState 保存在物理存储库中,您可以使用策略 event-based。

这保留了之前的想法,并为概念创造了更多的易读性。看到这个:

我了解到每个作业都会有两个队列(input/output)并且跟一个业务模型有关系! 研究人员总是会找到 Item 的最新和一致的状态。

所以,回答你的问题:

  • 我可以在任何地方使用 MemoryRep(基本上是一个 Map)找到 Item 的一致状态,在 ItemState[= 中包装状态和项目105=],并控制作业的更改状态入队或出队。

  • 除了 next()

  • 的 运行 之外,性能保持不变
  • 状态始终一致(针对您的问题)

  • 在此模型中,可以使用任何队列类型、任意数量的 jobs/queues 和任意数量的状态。

  • 另外这个很漂亮!!

这里有一点和别人说的不一样。从队列服务和系统的世界中,我们有消息确认的概念。这很好,因为它还为您提供了一些内置的重试逻辑。

我将从高层次阐述它的工作原理,如果您需要,我可以添加代码。

基本上,您的每个队列都会有一个 Set。您会将队列包装在一个对象中,这样当您将一个项目从队列中取出时,就会发生一些事情

  1. 项目已从队列中删除
  2. 项目已添加到关联集
  3. 一项任务(lambda 包含一个原子布尔值(默认为 false))已安排。当 运行 时,它将从集合中删除项目,如果布尔值为 false,则将其放回队列中
  4. 布尔值周围的项和包装器 returned 给调用者

一旦 process(i); 完成,您的代码将向包装器指示接收确认,并且包装器将从集合中删除项目并使布尔值变为 false。

return 状态的方法将简单地检查哪个队列或设置项目在其中。

请注意,这会提供 "at least once" 交付,这意味着一个项目将至少处理一次,但如果处理时间太接近超时,则可能不止一次。