Объединить putIfAbsent и заменить на ConcurrentMap
У меня есть случай, когда я должен
- вставить новое значение, если ключ не существует в ConcurrentHashMap
- замените старое значение новым значением, если ключ уже существует в ConcurrentHashMap, где новое значение получено из старого значения (не дорогая операция)
У меня есть следующий код, чтобы предложить:
public void insertOrReplace(String key, String value) {
boolean updated = false;
do {
String oldValue = concurrentMap.get(key);
if (oldValue == null) {
oldValue = concurrentMap.putIfAbsent(key, value);
if (oldValue == null) {
updated = true;
}
}
if (oldValue != null) {
final String newValue = recalculateNewValue(oldValue, value);
updated = concurrentMap.replace(key, oldValue, newValue);
}
} while (!updated);
}
Как вы думаете, это правильно и потокобезопасно?
Есть ли более простой способ?
4 ответа
Вы можете сделать это немного короче с кодом ниже, который эквивалентен вашему. Я провел небольшое стресс-тестирование с тысячами потоков, обращающихся к нему одновременно: он работает, как и ожидалось, с несколькими повторными попытками (циклами) (очевидно, вы никогда не сможете доказать правильность при тестировании в параллельном мире).
public void insertOrReplace(String key, String value) {
for (;;) {
String oldValue = concurrentMap.putIfAbsent(key, value);
if (oldValue == null)
return;
final String newValue = recalculateNewValue(oldValue, value);
if (concurrentMap.replace(key, oldValue, newValue))
return;
}
}
Я не думаю, что это правильно. Насколько я понимаю, метод merge() будет правильным инструментом для работы. В настоящее время у меня та же проблема, и я написал небольшой тест, чтобы увидеть результаты.
Этот тест начинает 100 рабочих. Каждый из них увеличивает значение на карте в 100 раз. Таким образом, ожидаемый результат будет 10000.
Есть два типа работников. Тот, который использует алгоритм замены, и который использует слияние. Тест запускается два раза с разными реализациями.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ConcurrentMapTest
{
private static ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
private final class ReplaceWorker implements Runnable
{
public void run()
{
for(int i = 0; i<100; i++)
{
Integer putIfAbsent = map.putIfAbsent("key", Integer.valueOf(1));
if(putIfAbsent == null)
return;
map.replace("key", putIfAbsent + 1);
}
}
}
private final class MergeWorker implements Runnable
{
public void run()
{
for(int i = 0; i<100; i++)
{
map.merge("key", Integer.valueOf(1), (ov, nv) -> {
return ov + 1;
});
}
}
}
public MergeWorker newMergeWorker()
{
return new MergeWorker();
}
public ReplaceWorker newReplaceWorker()
{
return new ReplaceWorker();
}
public static void main(String[] args)
{
map.put("key", 1);
ConcurrentMapTest test = new ConcurrentMapTest();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQu
for(int i = 0; i<100; i++)
{
threadPool.submit(test.newMergeWorker());
}
awaitTermination(threadPool);
System.out.println(test.map.get("key"));
map.put("key", 1);
threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));
for(int i = 0; i<100; i++)
{
threadPool.submit(test.newReplaceWorker());
}
awaitTermination(threadPool);
System.out.println(test.map.get("key"));
}
private static void awaitTermination(ExecutorService threadPool)
{
try
{
threadPool.shutdown();
boolean awaitTermination = threadPool.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("terminted successfull: " + awaitTermination);
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
результат: прервано успешно: верно 10000 прервано успешно: верно 1743
Проблема в том, что в вашем случае есть разрыв между get и put, поэтому при одновременном доступе к карте результаты перезаписываются. С merge это атомарная операция, хотя в документации ничего об этом не сказано.
Ваш метод кажется потокобезопасным. Если вам не требуются преимущества производительности ConcurrentHashMap, рассмотрите возможность использования обычного HashMap и синхронизируйте весь доступ к нему. Ваш метод похож на AtomicInteger.getAndSet(int), поэтому он должен быть в порядке. Я сомневаюсь, что есть более простой способ сделать это, если вы не ищете библиотечный вызов, чтобы сделать работу за вас.
Ты можешь использовать MutableMapIterable.updateValueWith(K key, Function0<? extends V> factory, Function2<? super V,? super P,? extends V> function, P parameter)
из коллекции Eclipse.
factory
Аргумент создает начальное значение, если на карте нет ни одного. function
Аргумент применяется к значению карты вместе с дополнительным параметром, чтобы придумать новое значение карты. Тот parameter
передается в качестве последнего аргумента updateValueWith()
, Функция вызывается даже в том случае, если ключа не было на карте. Таким образом, начальное значение действительно function
применяется к выходу factory
а также parameter
, function
не должен изменять значение; он должен вернуть новое значение. В вашем примере значения карты являются строками, которые являются неизменяемыми, поэтому мы в порядке.
В ConcurrentMaps, как org.eclipse.collections.impl.map.mutable.ConcurrentHashMap
, реализация updateValueWith()
также потокобезопасен и атомарен. Важно что function
не изменяет значения карты или не будет поточно-ориентированным. Вместо этого он должен возвращать новые значения. В вашем примере значения карты являются строками, которые являются неизменяемыми, поэтому мы в порядке.
Если ваш метод recalculateNewValue()
просто выполняет конкатенацию строк, вот как вы можете использовать updateValueWith()
,
Function0<String> factory = () -> "initial ";
Function2<String, String, String> recalculateNewValue = String::concat;
MutableMap<String, String> map = new ConcurrentHashMap<>();
map.updateValueWith("test", factory, recalculateNewValue, "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.updateValueWith("test", factory, recalculateNewValue, "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));
Вы можете использовать Java 8 ConcurrentMap.compute(клавиша K, BiFunction remappingFunction), чтобы выполнить то же самое, но у него есть несколько недостатков.
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));
- Нет отдельной фабрики для обработки случая отсутствия ключей, поэтому тело лямбды должно иметь дело со значениями и начальными значениями.
- API не поддается повторному использованию лямбд. Каждый звонок
updateValueWith()
разделяет те же самые лямбды, но каждый звонокcompute()
создает новый мусор в куче.
Примечание: я являюсь коммиттером для коллекций Eclipse