Объединить putIfAbsent и заменить на ConcurrentMap

У меня есть случай, когда я должен

  • вставить новое значение, если ключ не существует в ConcurrentHashMap
  • замените старое значение новым значением, если ключ уже существует в ConcurrentHashMap, где новое значение получено из старого значения (не дорогая операция)

У меня есть следующий код, чтобы предложить:

public void insertOrReplace(String key, String value) {
        boolean updated = false;
        do {
            String oldValue = concurrentMap.get(key);
            if (oldValue == null) {
                oldValue = concurrentMap.putIfAbsent(key, value);
                if (oldValue == null) {
                    updated = true;
                }
            }
            if (oldValue != null) {
                final String newValue = recalculateNewValue(oldValue, value);
                updated = concurrentMap.replace(key, oldValue, newValue);
            }
        } while (!updated);
    }

Как вы думаете, это правильно и потокобезопасно?

Есть ли более простой способ?

4 ответа

Решение

Вы можете сделать это немного короче с кодом ниже, который эквивалентен вашему. Я провел небольшое стресс-тестирование с тысячами потоков, обращающихся к нему одновременно: он работает, как и ожидалось, с несколькими повторными попытками (циклами) (очевидно, вы никогда не сможете доказать правильность при тестировании в параллельном мире).

public void insertOrReplace(String key, String value) {
    for (;;) {
        String oldValue = concurrentMap.putIfAbsent(key, value);
        if (oldValue == null)
            return;

        final String newValue = recalculateNewValue(oldValue, value);
        if (concurrentMap.replace(key, oldValue, newValue))
            return;
    }
}

Я не думаю, что это правильно. Насколько я понимаю, метод merge() будет правильным инструментом для работы. В настоящее время у меня та же проблема, и я написал небольшой тест, чтобы увидеть результаты.

Этот тест начинает 100 рабочих. Каждый из них увеличивает значение на карте в 100 раз. Таким образом, ожидаемый результат будет 10000.

Есть два типа работников. Тот, который использует алгоритм замены, и который использует слияние. Тест запускается два раза с разными реализациями.

import java.util.concurrent.ArrayBlockingQueue;                                                                     
import java.util.concurrent.ConcurrentHashMap;                                                                      
import java.util.concurrent.ConcurrentMap;                                                                          
import java.util.concurrent.ExecutorService;                                                                        
import java.util.concurrent.ThreadPoolExecutor;                                                                     
import java.util.concurrent.TimeUnit;                                                                               

public class ConcurrentMapTest                                                                                      
{                                                                                                                   

   private static ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();                                   

   private final class ReplaceWorker implements Runnable                                                            
   {                                                                                                                
      public void run()                                                                                             
      {                                                                                                             
         for(int i = 0; i<100; i++)                                                                                 
         {                                                                                                          
            Integer putIfAbsent = map.putIfAbsent("key", Integer.valueOf(1));                                       
            if(putIfAbsent == null)                                                                                 
               return;                                                                                              
            map.replace("key", putIfAbsent + 1);                                                                    
         }                                                                                                          
      }                                                                                                             
   }                                                                                                                

   private final class MergeWorker implements Runnable                                                              
   {                                                                                                                
      public void run()                                                                                             
      {                                                                                                             
         for(int i = 0; i<100; i++)                                                                                 
         {                                                                                                          
            map.merge("key", Integer.valueOf(1), (ov, nv) -> {                                                      
               return ov + 1;                                                                                       
            });                                                                                                     
         }                                                                                                          
      }                                                                                                             
   }                                                                                                                

   public MergeWorker newMergeWorker()                                                                              
   {                                                                                                                
      return new MergeWorker();                                                                                     
   }                                                                                                                

   public ReplaceWorker newReplaceWorker()                                                                          
   {                                                                                                                
      return new ReplaceWorker();                                                                                   
   }                                                                                                                

   public static void main(String[] args)                                                                           
   {                                                                                                                
      map.put("key", 1);                                                                                            
      ConcurrentMapTest test = new ConcurrentMapTest();                                                             
      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQu
      for(int i = 0; i<100; i++)                                                                                    
      {                                                                                                             
         threadPool.submit(test.newMergeWorker());                                                                  
      }                                                                                                             
      awaitTermination(threadPool);                                                                                 
      System.out.println(test.map.get("key"));                                                                      

      map.put("key", 1);                                                                                            
      threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));      
      for(int i = 0; i<100; i++)                                                                                    
      {                                                                                                             
         threadPool.submit(test.newReplaceWorker());                                                                
      }                                                                                                             
      awaitTermination(threadPool);                                                                                 
      System.out.println(test.map.get("key"));                                                                      
   }                                                                                                                

   private static void awaitTermination(ExecutorService threadPool)                                                 
   {                                                                                                                
      try                                                                                                           
      {                                                                                                             
         threadPool.shutdown();                                                                                     
         boolean awaitTermination = threadPool.awaitTermination(1, TimeUnit.SECONDS);                               
         System.out.println("terminted successfull: " + awaitTermination);                                          
      }                                                                                                             
      catch (InterruptedException e)                                                                                
      {                                                                                                             
         // TODO Auto-generated catch block                                                                         
         e.printStackTrace();                                                                                       
      }                                                                                                             
   }                                                                                                                
}                                                                                          
результат:
прервано успешно: верно
10000
прервано успешно: верно
1743

Проблема в том, что в вашем случае есть разрыв между get и put, поэтому при одновременном доступе к карте результаты перезаписываются. С merge это атомарная операция, хотя в документации ничего об этом не сказано.

Ваш метод кажется потокобезопасным. Если вам не требуются преимущества производительности ConcurrentHashMap, рассмотрите возможность использования обычного HashMap и синхронизируйте весь доступ к нему. Ваш метод похож на AtomicInteger.getAndSet(int), поэтому он должен быть в порядке. Я сомневаюсь, что есть более простой способ сделать это, если вы не ищете библиотечный вызов, чтобы сделать работу за вас.

Ты можешь использовать MutableMapIterable.updateValueWith(K key, Function0<? extends V> factory, Function2<? super V,? super P,? extends V> function, P parameter) из коллекции Eclipse.

factory Аргумент создает начальное значение, если на карте нет ни одного. function Аргумент применяется к значению карты вместе с дополнительным параметром, чтобы придумать новое значение карты. Тот parameter передается в качестве последнего аргумента updateValueWith(), Функция вызывается даже в том случае, если ключа не было на карте. Таким образом, начальное значение действительно function применяется к выходу factory а также parameter, function не должен изменять значение; он должен вернуть новое значение. В вашем примере значения карты являются строками, которые являются неизменяемыми, поэтому мы в порядке.

В ConcurrentMaps, как org.eclipse.collections.impl.map.mutable.ConcurrentHashMap, реализация updateValueWith() также потокобезопасен и атомарен. Важно что function не изменяет значения карты или не будет поточно-ориентированным. Вместо этого он должен возвращать новые значения. В вашем примере значения карты являются строками, которые являются неизменяемыми, поэтому мы в порядке.

Если ваш метод recalculateNewValue() просто выполняет конкатенацию строк, вот как вы можете использовать updateValueWith(),

Function0<String> factory = () -> "initial ";
Function2<String, String, String> recalculateNewValue = String::concat;

MutableMap<String, String> map = new ConcurrentHashMap<>();
map.updateValueWith("test", factory, recalculateNewValue, "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.updateValueWith("test", factory, recalculateNewValue, "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));

Вы можете использовать Java 8 ConcurrentMap.compute(клавиша K, BiFunction remappingFunction), чтобы выполнить то же самое, но у него есть несколько недостатков.

ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));
  • Нет отдельной фабрики для обработки случая отсутствия ключей, поэтому тело лямбды должно иметь дело со значениями и начальными значениями.
  • API не поддается повторному использованию лямбд. Каждый звонок updateValueWith() разделяет те же самые лямбды, но каждый звонок compute() создает новый мусор в куче.

Примечание: я являюсь коммиттером для коллекций Eclipse

Другие вопросы по тегам