Предотвращение потока от дублирования обработки в Java

Постановка задачи

у меня есть JMS слушатель работает как нить, слушая тему. Как только приходит сообщение, я создаю новый Thread обрабатывать ограниченное сообщение. Так что для каждого входящего сообщения я порождаю новый Thread,
У меня есть сценарий, когда дублирующее сообщение также обрабатывается, когда оно вводится немедленно в последовательном порядке. Мне нужно, чтобы это не обрабатывалось. Я пытался с помощью ConcurrentHashMap провести время процесса, где я добавляю в запись, как только Thread икру и удали его с карты как только Thread завершает его выполнение. Но это не помогло, когда я попробовал сценарий, в котором я проходил одно и то же одно за другим одновременно.

Общие сведения о моей проблеме, прежде чем погрузиться в реальную базу кода

onMessage(){
    processIncomingMessage(){
        ExecutorService executorService = Executors.newFixedThreadPool(1000);
            //Map is used to make an entry before i spawn a new thread to process incoming message
            //Map contains "Key as the incoming message" and "value as boolean"
            //check map for duplicate check
            //The below check is failing and allowing duplicate messages to be processed in parallel
        if(entryisPresentInMap){ 
                //return doing nothing
        }else{
                //spawn a new thread for each incoming message
                //also ensure a duplicate message being processed when it in process by an active thread
        executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //actuall business logic
                    }finally{
                        //remove entry from the map so after processing is done with the message
                    }

                }
        }
    }

Автономный пример для имитации сценария

public class DuplicateCheck {

private static Map<String,Boolean> duplicateCheckMap =
        new ConcurrentHashMap<String,Boolean>(1000);

private static String name=null;
private static String[] nameArray = new String[20];
public static void processMessage(String message){
    System.out.println("Processed message =" +message);

}

public static void main(String args[]){
    nameArray[0] = "Peter";
    nameArray[1] = "Peter";
    nameArray[2] = "Adam";
    for(int i=0;i<=nameArray.length;i++){
    name=nameArray[i];
    if(duplicateCheckMap.get(name)!=null  && duplicateCheckMap.get(name)){
        System.out.println("Thread detected for processing your name ="+name);
        return;
    }
    addNameIntoMap(name);
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                processMessage(name);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                freeNameFromMap(name);
            }
        }
    }).start();
    }
}

private static synchronized void addNameIntoMap(String name) {
    if (name != null) {
        duplicateCheckMap.put(name, true);
        System.out.println("Thread processing the "+name+" is added to the status map");
    }
}

private static synchronized void freeNameFromMap(String name) {
    if (name != null) {
        duplicateCheckMap.remove(name);
        System.out.println("Thread processing the "+name+" is released from the status map");
    }
}

Фрагмент кода ниже

public void processControlMessage(final Message message) {
    RDPWorkflowControlMessage rdpWorkflowControlMessage=    unmarshallControlMessage(message);
    final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
    final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
    if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
        log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
        return;
    }else {
        log.info("doing nothing");
    }
    Semaphore controlMessageLock = new Semaphore(1); 
    try{
    controlMessageLock.acquire();
    synchronized(this){
        new Thread(new Runnable(){
            @Override
            public void run() {
                try {
                    lock.lock();
                    log.info("Processing Workflow Control Message for the workflow :"+workflowName);
                    if (message instanceof TextMessage) {
                    if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
                        clearControlMessageBuffer();
                        enableControlMessageStatus(workflowName);
                        List<String> matchingValues=new ArrayList<String>();
                        matchingValues.add(workflowName);
                        ConcreteSetDAO tasksSetDAO=taskEventListener.getConcreteSetDAO();
                        ConcreteSetDAO workflowSetDAO=workflowEventListener.getConcreteSetDAO();
                        tasksSetDAO.deleteMatchingRecords(matchingValues);
                        workflowSetDAO.deleteMatchingRecords(matchingValues);
                        fetchNewWorkflowItems();
                        addShutdownHook(workflowName);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                } finally {
                    disableControlMessageStatus(workflowName);
                    lock.unlock();
                }
            }
        }).start();
    }
    } catch (InterruptedException ie) {
        log.info("Interrupted Exception during control message lock acquisition"+ie);
    }finally{
        controlMessageLock.release();
    }
}

private void addShutdownHook(final String workflowName) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            disableControlMessageStatus(workflowName);
        }
    });
    log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}

private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {
    RDPWorkflowControlMessage rdpWorkflowControlMessage = null;
    try {
        TextMessage textMessage = (TextMessage) message;
        rdpWorkflowControlMessage = marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);
    } catch (Exception e) {
        log.error("Error extracting item of type RDPWorkflowTask from message "
                + message);
    }
    return rdpWorkflowControlMessage;
}

private void fetchNewWorkflowItems() {
    initSSL();
    List<RDPWorkflowTask> allTasks=initAllTasks();
    taskEventListener.addRDPWorkflowTasks(allTasks);
    workflowEventListener.updateWorkflowStatus(allTasks);
}

private void clearControlMessageBuffer() {
    taskEventListener.getRecordsForUpdate().clear();
    workflowEventListener.getRecordsForUpdate().clear();
}

private synchronized void enableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.put(workflowName, true);
        log.info("Thread processing the "+workflowName+" is added to the status map");
    }
}

private synchronized void disableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the "+workflowName+" is released from the status map");
    }
}

Я изменил свой код, чтобы включить предложения, представленные ниже, но все же он не работает

public void processControlMessage(final Message message) {
    ExecutorService executorService = Executors.newFixedThreadPool(1000);
    try{
        lock.lock();
        RDPWorkflowControlMessage rdpWorkflowControlMessage=    unmarshallControlMessage(message);
        final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
        final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
        if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
            log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
            return;
        }else {
            log.info("doing nothing");
        }
        enableControlMessageStatus(workflowName);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //actual code
                        fetchNewWorkflowItems();
                        addShutdownHook(workflowName);
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                } finally {
                    disableControlMessageStatus(workflowName);
                }
            }
        });
    } finally {
        executorService.shutdown();
        lock.unlock();
    }
}

private void addShutdownHook(final String workflowName) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            disableControlMessageStatus(workflowName);
        }
    });
    log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}

private synchronized void enableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.put(workflowName, true);
        log.info("Thread processing the "+workflowName+" is added to the status map");
    }
}

private synchronized void disableControlMessageStatus(String workflowName) {
    if (workflowName != null) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the "+workflowName+" is released from the status map");
    }
}

2 ответа

Решение

Проблема устранена. Большое спасибо @awsome за подход. Это позволяет избежать дубликатов, когда поток уже обрабатывает входящее дублированное сообщение. Если ни один поток не обрабатывается, то он берется

public void processControlMessage(final Message message) {
    try {
        lock.lock();
        RDPWorkflowControlMessage rdpWorkflowControlMessage = unmarshallControlMessage(message);
        final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
        final String controlMessageEvent = rdpWorkflowControlMessage.getControlMessage().value();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if (message instanceof TextMessage) {
                        if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
                            if (tryAddingWorkflowNameInStatusMap(workflowName)) {
                                log.info("Processing Workflow Control Message for the workflow :"+ workflowName);
                                addShutdownHook(workflowName);
                                clearControlMessageBuffer();
                                List<String> matchingValues = new ArrayList<String>();
                                matchingValues.add(workflowName);
                                ConcreteSetDAO tasksSetDAO = taskEventListener.getConcreteSetDAO();
                                ConcreteSetDAO workflowSetDAO = workflowEventListener.getConcreteSetDAO();
                                tasksSetDAO.deleteMatchingRecords(matchingValues);
                                workflowSetDAO.deleteMatchingRecords(matchingValues);
                                List<RDPWorkflowTask> allTasks=fetchNewWorkflowItems(workflowName);
                                updateTasksAndWorkflowSet(allTasks);
                                removeWorkflowNameFromProcessingMap(workflowName);

                            } else {
                                log.info("Cache clean up is already in progress for the workflow ="+ workflowName);
                                return;
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("Error extracting item of type RDPWorkflowControlMessage from message "
                            + message);
                }
            }
        }).start();
    } finally {
        lock.unlock();
    }
}

private boolean tryAddingWorkflowNameInStatusMap(final String workflowName) {
    if(controlMessageStateMap.get(workflowName)==null){
        synchronized (this) {
             if(controlMessageStateMap.get(workflowName)==null){
                 log.info("Adding an entry in to the map for the workflow ="+workflowName);
                 controlMessageStateMap.put(workflowName, true);
                 return true;
             }
        }
    }
    return false;
}

private synchronized void removeWorkflowNameFromProcessingMap(String workflowName) {
    if (workflowName != null
            && (controlMessageStateMap.get(workflowName) != null && controlMessageStateMap
                    .get(workflowName))) {
        controlMessageStateMap.remove(workflowName);
        log.info("Thread processing the " + workflowName+ " is released from the status map");
    }
}

Вот как вы должны добавить значение на карту. Эта двойная проверка гарантирует, что только один поток добавляет значение на карту в любой конкретный момент времени, и вы можете контролировать доступ впоследствии. Удалите всю логику блокировки впоследствии. Это так просто

public void processControlMessage(final  String workflowName) {
    if(!tryAddingMessageInProcessingMap(workflowName)){
           Thread.sleep(1000); // sleep 1 sec and try again
            processControlMessage(workflowName);
        return ;
    }
     System.out.println(workflowName);
     try{
         // your code goes here
     } finally{
         controlMessageStateMap.remove(workflowName);
     }
}

private boolean tryAddingMessageInProcessingMap(final String workflowName) {
    if(controlMessageStateMap .get(workflowName)==null){
        synchronized (this) {
             if(controlMessageStateMap .get(workflowName)==null){
                 controlMessageStateMap.put(workflowName, true);
                 return true;
             }
        }
    }
    return false;
}

Читайте здесь больше для https://en.wikipedia.org/wiki/Double-checked_locking

Другие вопросы по тегам