ConcurrentHashMap: избежать создания дополнительных объектов с помощью "putIfAbsent"?

Я собираю несколько значений для ключей в многопоточной среде. Ключи не известны заранее. Я думал, что сделаю что-то вроде этого:

class Aggregator {
    protected ConcurrentHashMap<String, List<String>> entries =
                            new ConcurrentHashMap<String, List<String>>();
    public Aggregator() {}

    public void record(String key, String value) {
        List<String> newList =
                    Collections.synchronizedList(new ArrayList<String>());
        List<String> existingList = entries.putIfAbsent(key, newList);
        List<String> values = existingList == null ? newList : existingList;
        values.add(value);
    }
}

Проблема, которую я вижу, заключается в том, что каждый раз, когда этот метод запускается, мне нужно создать новый экземпляр ArrayList, который я потом выбрасываю (в большинстве случаев). Это похоже на неоправданное злоупотребление сборщиком мусора. Есть ли лучший, потокобезопасный способ инициализации такого рода структуры без необходимости synchronize record метод? Я несколько удивлен решением иметь putIfAbsent Метод не возвращает вновь созданный элемент, и из-за отсутствия способа отложить создание экземпляров, если он не требуется (так сказать).

7 ответов

Решение

Java 8 представила API для решения именно этой проблемы, создав решение в 1 строку:

public void record(String key, String value) {
    entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())).add(value);
}

Для Java 7:

public void record(String key, String value) {
    List<String> values = entries.get(key);
    if (values == null) {
        entries.putIfAbsent(key, Collections.synchronizedList(new ArrayList<String>()));
        // At this point, there will definitely be a list for the key.
        // We don't know or care which thread's new object is in there, so:
        values = entries.get(key);
    }
    values.add(value);
}

Это стандартный шаблон кода при заполнении ConcurrentHashMap,

Специальный метод putIfAbsent(K, V)) либо поместит ваш объект значения, либо, если перед вами появился другой поток, он проигнорирует ваш объект значения. В любом случае, после звонка putIfAbsent(K, V)), get(key) гарантированно согласован между потоками, и поэтому приведенный выше код является потокобезопасным.

Затрачиваются только лишние издержки, если какой-то другой поток добавляет новую запись в то же время для того же ключа: вы можете в конечном итоге отбросить вновь созданное значение, но это происходит только в том случае, если запись еще не существует и существует гонка, в которой ваш поток теряет, что обычно бывает редко.

Начиная с Java-8 вы можете создавать Multi Maps, используя следующий шаблон:

public void record(String key, String value) { entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())) .add(value); }

В документации ConcurrentHashMap (а не в общем контракте) указано, что ArrayList будет создаваться только один раз для каждого ключа при незначительной начальной стоимости задержки обновления, пока создается ArrayList для нового ключа:

http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html

В конце концов, я реализовал небольшую модификацию ответа @Bohemian. Его предлагаемое решение перезаписывает values переменная с putIfAbsent вызов, который создает ту же проблему, которую я имел раньше. Код, который, кажется, работает, выглядит так:

    public void record(String key, String value) {
        List<String> values = entries.get(key);
        if (values == null) {
            values = Collections.synchronizedList(new ArrayList<String>());
            List<String> values2 = entries.putIfAbsent(key, values);
            if (values2 != null)
                values = values2;
        }
        values.add(value);
    }

Это не так элегантно, как хотелось бы, но лучше, чем оригинал, который создает новый ArrayList экземпляр на каждый звонок.

Создано две версии на основе ответа Джина

public  static <K,V> void putIfAbsetMultiValue(ConcurrentHashMap<K,List<V>> entries, K key, V value) {
    List<V> values = entries.get(key);
    if (values == null) {
        values = Collections.synchronizedList(new ArrayList<V>());
        List<V> values2 = entries.putIfAbsent(key, values);
        if (values2 != null)
            values = values2;
    }
    values.add(value);
}

public  static <K,V> void putIfAbsetMultiValueSet(ConcurrentMap<K,Set<V>> entries, K key, V value) {
    Set<V> values = entries.get(key);
    if (values == null) {
        values = Collections.synchronizedSet(new HashSet<V>());
        Set<V> values2 = entries.putIfAbsent(key, values);
        if (values2 != null)
            values = values2;
    }
    values.add(value);
}

Это работает хорошо

Это проблема, я тоже искал ответ. Метод putIfAbsent на самом деле не решает проблему создания дополнительного объекта, он просто гарантирует, что один из этих объектов не заменит другой. Но условия состязания между потоками могут вызвать создание нескольких объектов. Я мог бы найти 3 решения для этой проблемы (и я следовал бы этому порядку предпочтения):

1- Если вы работаете на Java 8, лучший способ добиться этого, вероятно, новый computeIfAbsent метод ConcurrentMap, Вам просто нужно дать ему функцию вычисления, которая будет выполняться синхронно (по крайней мере, для ConcurrentHashMap реализация). Пример:

private final ConcurrentMap<String, List<String>> entries =
        new ConcurrentHashMap<String, List<String>>();

public void method1(String key, String value) {
    entries.computeIfAbsent(key, s -> new ArrayList<String>())
            .add(value);
}

Это из Javadoc ConcurrentHashMap.computeIfAbsent:

Если указанный ключ еще не связан со значением, пытается вычислить его значение с использованием заданной функции отображения и вводит его в эту карту, если не указано значение NULL. Весь вызов метода выполняется атомарно, поэтому функция применяется не более одного раза для каждой клавиши. Некоторые попытки обновления этой карты другими потоками могут быть заблокированы во время вычислений, поэтому вычисления должны быть короткими и простыми и не должны пытаться обновить какие-либо другие сопоставления этой карты.

2- Если вы не можете использовать Java 8, вы можете использовать Guava"s LoadingCache, который является потокобезопасным. Вы определяете функцию загрузки для него (так же, как compute выше), и вы можете быть уверены, что он будет вызываться синхронно. Пример:

private final LoadingCache<String, List<String>> entries = CacheBuilder.newBuilder()
        .build(new CacheLoader<String, List<String>>() {
            @Override
            public List<String> load(String s) throws Exception {
                return new ArrayList<String>();
            }
        });

public void method2(String key, String value) {
    entries.getUnchecked(key).add(value);
}

3. Если вы не можете использовать Guava, вы всегда можете синхронизироваться вручную и выполнить двойную проверку блокировки. Пример:

private final ConcurrentMap<String, List<String>> entries =
        new ConcurrentHashMap<String, List<String>>();

public void method3(String key, String value) {
    List<String> existing = entries.get(key);
    if (existing != null) {
        existing.add(value);
    } else {
        synchronized (entries) {
            List<String> existingSynchronized = entries.get(key);
            if (existingSynchronized != null) {
                existingSynchronized.add(value);
            } else {
                List<String> newList = new ArrayList<>();
                newList.add(value);
                entries.put(key, newList);
            }
        }
    }
}

Я сделал пример реализации всех этих трех методов и, кроме того, несинхронизированного метода, который вызывает создание дополнительного объекта: http://pastebin.com/qZ4DUjTr

Трата памяти (также GC и т. Д.), Которая связана с проблемой создания списка пустых массивов в Java 1.7.40. Не беспокойтесь о создании пустого массива. Ссылка: http://javarevisited.blogspot.com.tr/2014/07/java-optimization-empty-arraylist-and-Hashmap-cost-less-memory-jdk-17040-update.html

Подход с putIfAbsent имеет самое быстрое время выполнения, оно от 2 до 50 раз быстрее, чем "лямбда" подход в средах с высокой конкуренцией. Лямбда не является причиной этой "потери мощности", проблема заключается в принудительной синхронизации внутри computeIfAbsent до оптимизации Java-9.

эталон:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentHashMapTest {
    private final static int numberOfRuns = 1000000;
    private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();
    private final static int keysSize = 10;
    private final static String[] strings = new String[keysSize];
    static {
        for (int n = 0; n < keysSize; n++) {
            strings[n] = "" + (char) ('A' + n);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n = 0; n < 20; n++) {
            testPutIfAbsent();
            testComputeIfAbsentLamda();
        }
    }

    private static void testPutIfAbsent() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.get(s);
                        if (count == null) {
                            count = new AtomicInteger(0);
                            AtomicInteger prevCount = map.putIfAbsent(s, count);
                            if (prevCount != null) {
                                count = prevCount;
                            }
                        }
                        count.incrementAndGet();
                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

    private static void testComputeIfAbsentLamda() throws InterruptedException {
        final AtomicLong totalTime = new AtomicLong();
        final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
        final Random random = new Random();
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        for (int i = 0; i < numberOfThreads; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    long start, end;
                    for (int n = 0; n < numberOfRuns; n++) {
                        String s = strings[random.nextInt(strings.length)];
                        start = System.nanoTime();

                        AtomicInteger count = map.computeIfAbsent(s, (k) -> new AtomicInteger(0));
                        count.incrementAndGet();

                        end = System.nanoTime();
                        totalTime.addAndGet(end - start);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
                + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
    }

}

Результаты, достижения:

Test testPutIfAbsent average time per run: 115.756501 ns
Test testComputeIfAbsentLamda average time per run: 276.9667055 ns
Test testPutIfAbsent average time per run: 134.2332435 ns
Test testComputeIfAbsentLamda average time per run: 223.222063625 ns
Test testPutIfAbsent average time per run: 119.968893625 ns
Test testComputeIfAbsentLamda average time per run: 216.707419875 ns
Test testPutIfAbsent average time per run: 116.173902375 ns
Test testComputeIfAbsentLamda average time per run: 215.632467375 ns
Test testPutIfAbsent average time per run: 112.21422775 ns
Test testComputeIfAbsentLamda average time per run: 210.29563725 ns
Test testPutIfAbsent average time per run: 120.50643475 ns
Test testComputeIfAbsentLamda average time per run: 200.79536475 ns
Другие вопросы по тегам