Предотвращение потока от дублирования обработки в Java
Постановка задачи
у меня есть JMS
слушатель работает как нить, слушая тему. Как только приходит сообщение, я создаю новый Thread
обрабатывать ограниченное сообщение. Так что для каждого входящего сообщения я порождаю новый Thread
,
У меня есть сценарий, когда дублирующее сообщение также обрабатывается, когда оно вводится немедленно в последовательном порядке. Мне нужно, чтобы это не обрабатывалось. Я пытался с помощью ConcurrentHashMap
провести время процесса, где я добавляю в запись, как только Thread
икру и удали его с карты как только Thread
завершает его выполнение. Но это не помогло, когда я попробовал сценарий, в котором я проходил одно и то же одно за другим одновременно.
Общие сведения о моей проблеме, прежде чем погрузиться в реальную базу кода
onMessage(){
processIncomingMessage(){
ExecutorService executorService = Executors.newFixedThreadPool(1000);
//Map is used to make an entry before i spawn a new thread to process incoming message
//Map contains "Key as the incoming message" and "value as boolean"
//check map for duplicate check
//The below check is failing and allowing duplicate messages to be processed in parallel
if(entryisPresentInMap){
//return doing nothing
}else{
//spawn a new thread for each incoming message
//also ensure a duplicate message being processed when it in process by an active thread
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//actuall business logic
}finally{
//remove entry from the map so after processing is done with the message
}
}
}
}
Автономный пример для имитации сценария
public class DuplicateCheck {
private static Map<String,Boolean> duplicateCheckMap =
new ConcurrentHashMap<String,Boolean>(1000);
private static String name=null;
private static String[] nameArray = new String[20];
public static void processMessage(String message){
System.out.println("Processed message =" +message);
}
public static void main(String args[]){
nameArray[0] = "Peter";
nameArray[1] = "Peter";
nameArray[2] = "Adam";
for(int i=0;i<=nameArray.length;i++){
name=nameArray[i];
if(duplicateCheckMap.get(name)!=null && duplicateCheckMap.get(name)){
System.out.println("Thread detected for processing your name ="+name);
return;
}
addNameIntoMap(name);
new Thread(new Runnable() {
@Override
public void run() {
try {
processMessage(name);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
freeNameFromMap(name);
}
}
}).start();
}
}
private static synchronized void addNameIntoMap(String name) {
if (name != null) {
duplicateCheckMap.put(name, true);
System.out.println("Thread processing the "+name+" is added to the status map");
}
}
private static synchronized void freeNameFromMap(String name) {
if (name != null) {
duplicateCheckMap.remove(name);
System.out.println("Thread processing the "+name+" is released from the status map");
}
}
Фрагмент кода ниже
public void processControlMessage(final Message message) {
RDPWorkflowControlMessage rdpWorkflowControlMessage= unmarshallControlMessage(message);
final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
return;
}else {
log.info("doing nothing");
}
Semaphore controlMessageLock = new Semaphore(1);
try{
controlMessageLock.acquire();
synchronized(this){
new Thread(new Runnable(){
@Override
public void run() {
try {
lock.lock();
log.info("Processing Workflow Control Message for the workflow :"+workflowName);
if (message instanceof TextMessage) {
if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
clearControlMessageBuffer();
enableControlMessageStatus(workflowName);
List<String> matchingValues=new ArrayList<String>();
matchingValues.add(workflowName);
ConcreteSetDAO tasksSetDAO=taskEventListener.getConcreteSetDAO();
ConcreteSetDAO workflowSetDAO=workflowEventListener.getConcreteSetDAO();
tasksSetDAO.deleteMatchingRecords(matchingValues);
workflowSetDAO.deleteMatchingRecords(matchingValues);
fetchNewWorkflowItems();
addShutdownHook(workflowName);
}
}
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowControlMessage from message "
+ message);
} finally {
disableControlMessageStatus(workflowName);
lock.unlock();
}
}
}).start();
}
} catch (InterruptedException ie) {
log.info("Interrupted Exception during control message lock acquisition"+ie);
}finally{
controlMessageLock.release();
}
}
private void addShutdownHook(final String workflowName) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
disableControlMessageStatus(workflowName);
}
});
log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}
private RDPWorkflowControlMessage unmarshallControlMessage(Message message) {
RDPWorkflowControlMessage rdpWorkflowControlMessage = null;
try {
TextMessage textMessage = (TextMessage) message;
rdpWorkflowControlMessage = marshaller.unmarshalItem(textMessage.getText(), RDPWorkflowControlMessage.class);
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowTask from message "
+ message);
}
return rdpWorkflowControlMessage;
}
private void fetchNewWorkflowItems() {
initSSL();
List<RDPWorkflowTask> allTasks=initAllTasks();
taskEventListener.addRDPWorkflowTasks(allTasks);
workflowEventListener.updateWorkflowStatus(allTasks);
}
private void clearControlMessageBuffer() {
taskEventListener.getRecordsForUpdate().clear();
workflowEventListener.getRecordsForUpdate().clear();
}
private synchronized void enableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.put(workflowName, true);
log.info("Thread processing the "+workflowName+" is added to the status map");
}
}
private synchronized void disableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.remove(workflowName);
log.info("Thread processing the "+workflowName+" is released from the status map");
}
}
Я изменил свой код, чтобы включить предложения, представленные ниже, но все же он не работает
public void processControlMessage(final Message message) {
ExecutorService executorService = Executors.newFixedThreadPool(1000);
try{
lock.lock();
RDPWorkflowControlMessage rdpWorkflowControlMessage= unmarshallControlMessage(message);
final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
final String controlMessageEvent=rdpWorkflowControlMessage.getControlMessage().value();
if(controlMessageStateMap.get(workflowName)!=null && controlMessageStateMap.get(workflowName)){
log.info("Cache cleanup for the workflow :"+workflowName+" is already in progress");
return;
}else {
log.info("doing nothing");
}
enableControlMessageStatus(workflowName);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//actual code
fetchNewWorkflowItems();
addShutdownHook(workflowName);
}
}
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowControlMessage from message "
+ message);
} finally {
disableControlMessageStatus(workflowName);
}
}
});
} finally {
executorService.shutdown();
lock.unlock();
}
}
private void addShutdownHook(final String workflowName) {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
disableControlMessageStatus(workflowName);
}
});
log.info("Shut Down Hook Attached for the thread processing the workflow :"+workflowName);
}
private synchronized void enableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.put(workflowName, true);
log.info("Thread processing the "+workflowName+" is added to the status map");
}
}
private synchronized void disableControlMessageStatus(String workflowName) {
if (workflowName != null) {
controlMessageStateMap.remove(workflowName);
log.info("Thread processing the "+workflowName+" is released from the status map");
}
}
2 ответа
Проблема устранена. Большое спасибо @awsome за подход. Это позволяет избежать дубликатов, когда поток уже обрабатывает входящее дублированное сообщение. Если ни один поток не обрабатывается, то он берется
public void processControlMessage(final Message message) {
try {
lock.lock();
RDPWorkflowControlMessage rdpWorkflowControlMessage = unmarshallControlMessage(message);
final String workflowName = rdpWorkflowControlMessage.getWorkflowName();
final String controlMessageEvent = rdpWorkflowControlMessage.getControlMessage().value();
new Thread(new Runnable() {
@Override
public void run() {
try {
if (message instanceof TextMessage) {
if ("REFRESH".equalsIgnoreCase(controlMessageEvent)) {
if (tryAddingWorkflowNameInStatusMap(workflowName)) {
log.info("Processing Workflow Control Message for the workflow :"+ workflowName);
addShutdownHook(workflowName);
clearControlMessageBuffer();
List<String> matchingValues = new ArrayList<String>();
matchingValues.add(workflowName);
ConcreteSetDAO tasksSetDAO = taskEventListener.getConcreteSetDAO();
ConcreteSetDAO workflowSetDAO = workflowEventListener.getConcreteSetDAO();
tasksSetDAO.deleteMatchingRecords(matchingValues);
workflowSetDAO.deleteMatchingRecords(matchingValues);
List<RDPWorkflowTask> allTasks=fetchNewWorkflowItems(workflowName);
updateTasksAndWorkflowSet(allTasks);
removeWorkflowNameFromProcessingMap(workflowName);
} else {
log.info("Cache clean up is already in progress for the workflow ="+ workflowName);
return;
}
}
}
} catch (Exception e) {
log.error("Error extracting item of type RDPWorkflowControlMessage from message "
+ message);
}
}
}).start();
} finally {
lock.unlock();
}
}
private boolean tryAddingWorkflowNameInStatusMap(final String workflowName) {
if(controlMessageStateMap.get(workflowName)==null){
synchronized (this) {
if(controlMessageStateMap.get(workflowName)==null){
log.info("Adding an entry in to the map for the workflow ="+workflowName);
controlMessageStateMap.put(workflowName, true);
return true;
}
}
}
return false;
}
private synchronized void removeWorkflowNameFromProcessingMap(String workflowName) {
if (workflowName != null
&& (controlMessageStateMap.get(workflowName) != null && controlMessageStateMap
.get(workflowName))) {
controlMessageStateMap.remove(workflowName);
log.info("Thread processing the " + workflowName+ " is released from the status map");
}
}
Вот как вы должны добавить значение на карту. Эта двойная проверка гарантирует, что только один поток добавляет значение на карту в любой конкретный момент времени, и вы можете контролировать доступ впоследствии. Удалите всю логику блокировки впоследствии. Это так просто
public void processControlMessage(final String workflowName) {
if(!tryAddingMessageInProcessingMap(workflowName)){
Thread.sleep(1000); // sleep 1 sec and try again
processControlMessage(workflowName);
return ;
}
System.out.println(workflowName);
try{
// your code goes here
} finally{
controlMessageStateMap.remove(workflowName);
}
}
private boolean tryAddingMessageInProcessingMap(final String workflowName) {
if(controlMessageStateMap .get(workflowName)==null){
synchronized (this) {
if(controlMessageStateMap .get(workflowName)==null){
controlMessageStateMap.put(workflowName, true);
return true;
}
}
}
return false;
}
Читайте здесь больше для https://en.wikipedia.org/wiki/Double-checked_locking