防止线程在 java 中进行重复处理

Preventing thread from duplicate processing in java

问题陈述

我有一个 JMS 监听器,它作为一个线程运行并监听某个主题。一旦收到消息,我就会创建一个新的 Thread 来处理这条传入的消息。因此,每收到一条消息,我都会创建一个新的 Thread。
在一种场景下,当重复的消息紧接着连续到达时,重复的消息也会被处理,而我需要阻止这种情况。我尝试使用 ConcurrentHashMap 来保存处理状态:在 Thread 创建后立即向其中添加一个条目,并在 Thread 执行完毕后立即将该条目从映射(Map)中删除。但是当我并发地、一条接一条地传入相同的消息时,这种做法并没有起作用。

在您深入实际代码库之前我的问题的概要

// Pseudocode outline of the listener flow; it is a sketch and is not compilable as written.
onMessage(){
    processIncomingMessage(){
        ExecutorService executorService = Executors.newFixedThreadPool(1000);
            // A map records an entry before a new thread is spawned to process the incoming message.
            // Map key: the incoming message; map value: a boolean.
            // The map is consulted here to detect duplicates.
            // The check below is failing and lets duplicate messages be processed in parallel.
        if(entryisPresentInMap){ 
                // Duplicate: return without doing anything.
        }else{
                // Spawn a new thread for each incoming message.
                // A duplicate must also not be processed while an active thread is still handling the same message.
        executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // actual business logic
                    }finally{
                        // Remove the entry from the map once processing of the message is done.
                    }

                }
        }
    }

模拟场景的独立示例

/**
 * Stand-alone demonstration of suppressing a duplicate message while an identical
 * message is still being processed by a worker thread.
 *
 * <p>A name is "claimed" atomically with {@code putIfAbsent} before its worker thread is
 * started and released in the worker's {@code finally} block. A second occurrence of the
 * same name is therefore skipped only while the first one is still in flight; once the
 * first worker has finished, the name may be processed again.
 */
public class DuplicateCheck {

    /** Names currently being processed; the presence of a key marks the name as in flight. */
    private static final Map<String, Boolean> duplicateCheckMap =
            new ConcurrentHashMap<String, Boolean>(1000);

    /** Sample input; slots that are never assigned stay {@code null} and are skipped. */
    private static final String[] nameArray = new String[20];

    /**
     * Simulates the business logic executed for one message.
     *
     * @param message the message being processed
     */
    public static void processMessage(String message) {
        System.out.println("Processed message =" + message);
    }

    /**
     * Feeds the sample names through the duplicate check, starting one worker thread per
     * accepted name.
     *
     * @param args unused
     */
    public static void main(String args[]) {
        nameArray[0] = "Peter";
        nameArray[1] = "Peter";
        nameArray[2] = "Adam";
        // Strict '<': the previous '<=' bound read one element past the end of the array.
        for (int i = 0; i < nameArray.length; i++) {
            // Per-iteration final copy: each worker must see its own name, not a shared
            // static field that the loop keeps overwriting.
            final String name = nameArray[i];
            if (name == null) {
                // ConcurrentHashMap rejects null keys, so unused slots are skipped.
                continue;
            }
            if (!addNameIntoMap(name)) {
                System.out.println("Thread detected for processing your name =" + name);
                // Skip only this duplicate; later messages must still be handled.
                continue;
            }
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        processMessage(name);
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    } finally {
                        freeNameFromMap(name);
                    }
                }
            }).start();
        }
    }

    /**
     * Atomically marks {@code name} as being processed.
     *
     * @param name the name to claim; {@code null} is never claimed
     * @return {@code true} if this call claimed the name, {@code false} if it was already
     *         claimed or {@code name} is {@code null}
     */
    private static boolean addNameIntoMap(String name) {
        if (name == null) {
            return false;
        }
        // putIfAbsent is a single atomic check-and-insert, so two callers can never both win.
        if (duplicateCheckMap.putIfAbsent(name, Boolean.TRUE) != null) {
            return false;
        }
        System.out.println("Thread processing the " + name + " is added to the status map");
        return true;
    }

    /**
     * Releases a name previously claimed through {@link #addNameIntoMap(String)}.
     *
     * @param name the name to release; {@code null} is ignored
     */
    private static void freeNameFromMap(String name) {
        if (name != null) {
            duplicateCheckMap.remove(name);
            System.out.println("Thread processing the " + name + " is released from the status map");
        }
    }
}

代码片段如下

/**
 * Handles a workflow control message by refreshing the cached data of the named workflow
 * on a separate thread, unless a refresh for the same workflow is already running.
 *
 * <p>The "already in progress" check and the insertion of the in-progress mark happen in
 * one {@code synchronized (this)} section <em>before</em> the worker is started. Setting
 * the mark inside the worker (as before) left a window in which a duplicate message
 * passed the check because the first worker had not yet recorded itself.
 *
 * @param message the incoming JMS message; messages that cannot be unmarshalled are
 *                logged and ignored
 */
public void processControlMessage(final Message message) {
    final RDPWorkflowControlMessage rdpWorkflowControlMessage = unmarshallControlMessage(message);
    if (rdpWorkflowControlMessage == null) {
        // unmarshallControlMessage returns null on failure; dereferencing it would throw.
        log.error("Ignoring control message that could not be unmarshalled " + message);
        return;
    }
    final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
    final String controlMessageEvent = rdpWorkflowControlMessage.getControlMessage().value();

    // Same monitor as enable/disableControlMessageStatus, so check + mark is one atomic step.
    synchronized (this) {
        if (controlMessageStateMap.get(workflowName) != null && controlMessageStateMap.get(workflowName)) {
            log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
            return;
        }
        enableControlMessageStatus(workflowName);
    }

    final Thread worker = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // Acquire outside the inner try so unlock() only runs when lock() succeeded.
                lock.lock();
                try {
                    log.info("Processing Workflow Control Message for the workflow :"+workflowName);
                    if (message instanceof TextMessage && "REFRESH".equalsIgnoreCase(controlMessageEvent)) {
                        clearControlMessageBuffer();
                        List<String> matchingValues = new ArrayList<String>();
                        matchingValues.add(workflowName);
                        ConcreteSetDAO tasksSetDAO = taskEventListener.getConcreteSetDAO();
                        ConcreteSetDAO workflowSetDAO = workflowEventListener.getConcreteSetDAO();
                        tasksSetDAO.deleteMatchingRecords(matchingValues);
                        workflowSetDAO.deleteMatchingRecords(matchingValues);
                        fetchNewWorkflowItems();
                        addShutdownHook(workflowName);
                    }
                } finally {
                    lock.unlock();
                }
            } catch (Exception e) {
                // Pass the exception so the cause is not lost.
                log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                        + message, e);
            } finally {
                // Always clear the mark, otherwise the workflow could never be refreshed again.
                disableControlMessageStatus(workflowName);
            }
        }
    });

    boolean started = false;
    try {
        worker.start();
        started = true;
    } finally {
        if (!started) {
            // The worker never ran, so its finally block will not clear the mark.
            disableControlMessageStatus(workflowName);
        }
    }
}

/**
 * Registers a JVM shutdown hook that calls
 * {@code disableControlMessageStatus(workflowName)} when the JVM exits, then logs that
 * the hook was attached.
 *
 * @param workflowName the workflow name passed to the hook
 */
private void addShutdownHook(final String workflowName) {
    final Thread releaseOnExit = new Thread() {
        @Override
        public void run() {
            disableControlMessageStatus(workflowName);
        }
    };
    Runtime.getRuntime().addShutdownHook(releaseOnExit);
    log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}

/**
 * Converts the text payload of a JMS message into an {@code RDPWorkflowControlMessage}.
 *
 * @param message the incoming message; only {@code TextMessage} instances can be converted
 * @return the unmarshalled control message, or {@code null} if the message is not a
 *         {@code TextMessage} or unmarshalling fails (the failure is logged)
 */
private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {
    if (!(message instanceof TextMessage)) {
        // Explicit type check instead of relying on a caught ClassCastException.
        log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                + message);
        return null;
    }
    try {
        TextMessage textMessage = (TextMessage) message;
        return marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);
    } catch (Exception e) {
        // The message names the type actually being extracted and keeps the cause.
        log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                + message, e);
        return null;
    }
}

/**
 * Calls {@code initSSL()}, loads the task list through {@code initAllTasks()}, and hands
 * that list first to the task listener and then to the workflow listener.
 */
private void fetchNewWorkflowItems() {
    initSSL();

    final List<RDPWorkflowTask> freshTasks = initAllTasks();

    taskEventListener.addRDPWorkflowTasks(freshTasks);
    workflowEventListener.updateWorkflowStatus(freshTasks);
}

/**
 * Clears the collection returned by {@code getRecordsForUpdate()} on the task listener
 * and then on the workflow listener.
 */
private void clearControlMessageBuffer() {
    taskEventListener.getRecordsForUpdate().clear();
    workflowEventListener.getRecordsForUpdate().clear();
}

/**
 * Records {@code workflowName} in the status map with the value {@code true} and logs it.
 * Runs while holding this object's monitor.
 *
 * @param workflowName the workflow to mark; {@code null} is ignored
 */
private synchronized void enableControlMessageStatus(String workflowName) {
    if (workflowName == null) {
        return;
    }
    controlMessageStateMap.put(workflowName, Boolean.TRUE);
    log.info("Thread processing the "+workflowName+" is added to the status map");
}

/**
 * Removes {@code workflowName} from the status map and logs the release.
 * Runs while holding this object's monitor.
 *
 * @param workflowName the workflow to release; {@code null} is ignored
 */
private synchronized void disableControlMessageStatus(String workflowName) {
    if (workflowName == null) {
        return;
    }
    controlMessageStateMap.remove(workflowName);
    log.info("Thread processing the "+workflowName+" is released from the status map");
}

我已修改我的代码以纳入下面提供的建议,但它仍然无法正常工作

public void processControlMessage(final Message message) {
    ExecutorService executorService = Executors.newFixedThreadPool(1000);
    try{
        lock.lock();
        RDPWorkflowControlMessage rdpWorkflowControlMessage=    unmarshallControlMessage(message);
        final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
        final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
        if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
            log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
            return;
        }else {
            log.info("doing nothing");
        }
        enableControlMessageStatus(workflowName);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //actual code
                        fetchNewWorkflowItems();
                        addShutdownHook(workflowName);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                } finally {
                    disableControlMessageStatus(workflowName);
                }
            }
        });
    } finally {
        executorService.shutdown();
        lock.unlock();
    }
}

/**
 * Registers a JVM shutdown hook that calls
 * {@code disableControlMessageStatus(workflowName)} when the JVM exits, then logs that
 * the hook was attached.
 *
 * @param workflowName the workflow name passed to the hook
 */
private void addShutdownHook(final String workflowName) {
    final Thread releaseOnExit = new Thread() {
        @Override
        public void run() {
            disableControlMessageStatus(workflowName);
        }
    };
    Runtime.getRuntime().addShutdownHook(releaseOnExit);
    log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}

/**
 * Records {@code workflowName} in the status map with the value {@code true} and logs it.
 * Runs while holding this object's monitor.
 *
 * @param workflowName the workflow to mark; {@code null} is ignored
 */
private synchronized void enableControlMessageStatus(String workflowName) {
    if (workflowName == null) {
        return;
    }
    controlMessageStateMap.put(workflowName, Boolean.TRUE);
    log.info("Thread processing the "+workflowName+" is added to the status map");
}

/**
 * Removes {@code workflowName} from the status map and logs the release.
 * Runs while holding this object's monitor.
 *
 * @param workflowName the workflow to release; {@code null} is ignored
 */
private synchronized void disableControlMessageStatus(String workflowName) {
    if (workflowName == null) {
        return;
    }
    controlMessageStateMap.remove(workflowName);
    log.info("Thread processing the "+workflowName+" is released from the status map");
}

这是向映射(Map)中添加值的方式。这种双重检查可确保在任何时刻只有一个线程向映射中添加值,之后您就可以控制对它的访问。然后删除其余所有加锁逻辑。就这么简单

/**
 * Processes the given workflow once no other caller is processing the same workflow.
 *
 * <p>While another caller holds the workflow name in the processing map, this method
 * waits one second and tries again. The wait is a loop rather than a recursive call so a
 * long wait cannot exhaust the stack. If the waiting thread is interrupted, the interrupt
 * flag is restored and the method returns without processing.
 *
 * @param workflowName the workflow to process
 */
public void processControlMessage(final String workflowName) {
    while (!tryAddingMessageInProcessingMap(workflowName)) {
        try {
            Thread.sleep(1000); // sleep 1 sec and try again
        } catch (InterruptedException ie) {
            // Thread.sleep throws a checked exception; preserve the interrupt for callers.
            Thread.currentThread().interrupt();
            return;
        }
    }
    System.out.println(workflowName);
    try {
        // your code goes here
    } finally {
        // Always release the name so later messages for this workflow can proceed.
        controlMessageStateMap.remove(workflowName);
    }
}

/**
 * Tries to register {@code workflowName} in the processing map.
 *
 * <p>The map is consulted once without holding the monitor and, if no entry was seen,
 * once more inside {@code synchronized (this)} before the entry is inserted.
 *
 * @param workflowName the workflow to register
 * @return {@code true} if this call inserted the entry, {@code false} if an entry was
 *         already present at either check
 */
private boolean tryAddingMessageInProcessingMap(final String workflowName) {
    // Unsynchronized pre-check: bail out early when an entry is already visible.
    if (controlMessageStateMap.get(workflowName) != null) {
        return false;
    }
    synchronized (this) {
        // Re-check under the monitor; another caller may have inserted in the meantime.
        if (controlMessageStateMap.get(workflowName) != null) {
            return false;
        }
        controlMessageStateMap.put(workflowName, true);
        return true;
    }
}

阅读此处了解更多 https://en.wikipedia.org/wiki/Double-checked_locking

问题现已解决。非常感谢 @awsome 提供的方法。当已有线程正在处理同一条消息时,传入的重复消息会被跳过;如果没有线程正在处理,该消息就会被正常处理

/**
 * Handles a workflow control message: for a {@code REFRESH} event carried by a
 * {@code TextMessage}, a new thread clears and reloads the cached data of the named
 * workflow, unless a refresh for the same workflow is already in progress.
 *
 * <p>The in-progress entry is removed in a {@code finally} block. Previously it was
 * removed only after a fully successful refresh, so any exception left the entry behind
 * and every later refresh of that workflow was rejected as "already in progress".
 *
 * @param message the incoming JMS message; messages that cannot be unmarshalled are
 *                logged and ignored
 */
public void processControlMessage(final Message message) {
    // Acquire before the try block so unlock() only runs when lock() actually succeeded.
    lock.lock();
    try {
        RDPWorkflowControlMessage rdpWorkflowControlMessage = unmarshallControlMessage(message);
        if (rdpWorkflowControlMessage == null) {
            // unmarshallControlMessage returns null on failure; dereferencing it would throw.
            log.error("Ignoring control message that could not be unmarshalled " + message);
            return;
        }
        final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
        final String controlMessageEvent = rdpWorkflowControlMessage.getControlMessage().value();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if (!(message instanceof TextMessage)
                            || !"REFRESH".equalsIgnoreCase(controlMessageEvent)) {
                        return;
                    }
                    if (!tryAddingWorkflowNameInStatusMap(workflowName)) {
                        log.info("Cache clean up is already in progress for the workflow ="+ workflowName);
                        return;
                    }
                    try {
                        log.info("Processing Workflow Control Message for the workflow :"+ workflowName);
                        addShutdownHook(workflowName);
                        clearControlMessageBuffer();
                        List<String> matchingValues = new ArrayList<String>();
                        matchingValues.add(workflowName);
                        ConcreteSetDAO tasksSetDAO = taskEventListener.getConcreteSetDAO();
                        ConcreteSetDAO workflowSetDAO = workflowEventListener.getConcreteSetDAO();
                        tasksSetDAO.deleteMatchingRecords(matchingValues);
                        workflowSetDAO.deleteMatchingRecords(matchingValues);
                        List<RDPWorkflowTask> allTasks = fetchNewWorkflowItems(workflowName);
                        updateTasksAndWorkflowSet(allTasks);
                    } finally {
                        // Release the entry on success and on failure alike.
                        removeWorkflowNameFromProcessingMap(workflowName);
                    }
                } catch (Exception e) {
                    // Pass the exception so the cause is not lost.
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message, e);
                }
            }
        }).start();
    } finally {
        lock.unlock();
    }
}

/**
 * Tries to register {@code workflowName} in the status map.
 *
 * <p>The map is consulted once without holding the monitor and, if no entry was seen,
 * once more inside {@code synchronized (this)} before the entry is inserted.
 *
 * @param workflowName the workflow to register
 * @return {@code true} if this call inserted the entry, {@code false} if an entry was
 *         already present at either check
 */
private boolean tryAddingWorkflowNameInStatusMap(final String workflowName) {
    // Unsynchronized pre-check: bail out early when an entry is already visible.
    if (controlMessageStateMap.get(workflowName) != null) {
        return false;
    }
    synchronized (this) {
        // Re-check under the monitor; another caller may have inserted in the meantime.
        if (controlMessageStateMap.get(workflowName) != null) {
            return false;
        }
        log.info("Adding an entry in to the map for the workflow ="+workflowName);
        controlMessageStateMap.put(workflowName, true);
        return true;
    }
}

/**
 * Removes {@code workflowName} from the status map and logs the release, but only when
 * the name is non-null and currently mapped to {@code true}.
 * Runs while holding this object's monitor.
 *
 * @param workflowName the workflow to release
 */
private synchronized void removeWorkflowNameFromProcessingMap(String workflowName) {
    if (workflowName == null) {
        return;
    }
    final Boolean inProgress = controlMessageStateMap.get(workflowName);
    if (inProgress == null || !inProgress) {
        return;
    }
    controlMessageStateMap.remove(workflowName);
    log.info("Thread processing the " + workflowName+ " is released from the status map");
}