Hazelcast JCache CacheEntryListener срабатывает слишком часто

Мне кажется, что у меня есть проблема с механизмом обратного вызова JCache EntryListener. Я пытаюсь настроить несколько участников jcache (реализация hazelcast), каждый из которых работает на своем локальном компьютере как отдельное java-приложение (сейчас в Ecipse запускаем main вручную для каждого узла).

Узлы содержат только один пустой кеш в начале <Long, SpecificDataType>, Используя файл по умолчанию hazelcast-default.xml, я самостоятельно подготовил минимальную конфигурацию программным способом, в основном, зарегистрировав CacheEntryListener, который создает простой sysout для созданной / обновленной / удаленной / просроченной записи. Когда я запускаю несколько (три или более) элементов, я ожидаю, что каждый элемент напечатает измененную пару ключ / значение ровно один раз, где содержится запись (в качестве рабочей или резервной записи). Проблема в том, что в некоторых случаях sysout появляется несколько раз, и мне кажется, что слушатель срабатывает слишком часто (более одного раза).

Например, при создании некоторой записи с помощью простого клиента (который просто получает кэш и помещает запись с ключом 123689 в кеш), системный вывод "CREATE EVENT RECEIVED" обнаруживается 4 раза в третьем элементе и даже чаще в четвертом и тд...

CREATE EVENT RECEIVED:
Key: 123689, Value:SpecificDataType[...]

CREATE EVENT RECEIVED:
Key: 123689, Value:SpecificDataType[...]

CREATE EVENT RECEIVED: 
Key: 123689, Value:SpecificDataType[...]

Кажется, что слушатель как-то возводит в степень... Что я делаю не так? Я что-то упускаю в отношении конфигурации?

Код:

public static void main(String[] args) {
    Member member = Member.getInstance();

    try {
        Cache<Long, SpecificDataType> cache = member.getCache("SpecificDataTypeCache", SpecificDataType.class);
        System.out.println(cache);
    } catch (Exception e) {
        e.printStackTrace();

    }

    // shutdown loop
    boolean shutdown = false;
    while (!shutdown) {
        if (new Scanner(System.in).nextLine().equals("shutdown")) {
            shutdown = true;
        }
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // cache.clear();
    member.shutdown();

}

public <T> Cache<Long, T> getCache(String name, Class<T> clazz) {

    // configure the cache
    MutableConfiguration<Long, T> config = new MutableConfiguration<Long, T>();
    config.setStoreByValue(true).setTypes(Long.class, clazz)
            .setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(TEN_SEC))
            .setStatisticsEnabled(false);

    // create / get cache
    Cache<Long, T> cache = cacheManager.getCache(name, Long.class, clazz);
    if (cache == null) {
        System.out.println("create cache");
        cache = cacheManager.createCache(name, config);
        // create the EntryListener
        MyCacheEntryListener<Long, T> clientListener = new MyCacheEntryListener<Long, T>();

        // using out listener, lets create a configuration
        CacheEntryListenerConfiguration<Long, T> conf = new MutableCacheEntryListenerConfiguration<Long, T>(
                FactoryBuilder.factoryOf(clientListener), null, false, true);

        // register to cache
        cache.registerCacheEntryListener(conf);
    } else {
        System.out.println("get cache");
    }

    return cache;
}

Слушатель настолько прост, насколько это возможно:

public class MyCacheEntryListener<K, V> implements CacheEntryCreatedListener<K, V>,
    CacheEntryUpdatedListener<K, V>, CacheEntryExpiredListener<K, V>,
    CacheEntryRemovedListener<K, V>, Serializable {


@Override
public void onCreated(Iterable<CacheEntryEvent<? extends K, ? extends V>> cacheEntryEvents)
        throws CacheEntryListenerException {
    System.out.println("CREATE EVENT RECEIVED: ");
    System.out.println(printEvent(cacheEntryEvents));
}


@Override
public void onExpired(Iterable<CacheEntryEvent<? extends K, ? extends V>> cacheEntryEvents)
        throws CacheEntryListenerException {
    System.out.println("EXPIRE EVENT RECEIVED: ");
    System.out.println(printEvent(cacheEntryEvents));
}


@Override
public void onRemoved(Iterable<CacheEntryEvent<? extends K, ? extends V>> cacheEntryEvents)
        throws CacheEntryListenerException {
    System.out.println("REMOVE EVENT RECEIVED: ");
    System.out.println(printEvent(cacheEntryEvents));
}


@Override
public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> cacheEntryEvents)
        throws CacheEntryListenerException {
    System.out.println("UPDATE EVENT RECEIVED: ");
    System.out.println(printEvent(cacheEntryEvents));
}


private String printEvent(Iterable<CacheEntryEvent<? extends K, ? extends V>> cacheEntryEvents) {
    StringBuilder sb = new StringBuilder();
    final Iterator<CacheEntryEvent<? extends K, ? extends V>> iterator = cacheEntryEvents
            .iterator();
    while (iterator.hasNext()) {
        final CacheEntryEvent<? extends K, ? extends V> next = iterator.next();
        sb.append("Key: ");
        sb.append(next.getKey());
        sb.append(", Value:");
        sb.append(next.getValue());
        sb.append("\n");
    }
    return sb.toString();
}

}

Любая помощь будет оценена!

1 ответ

Я попробовал ваш пример с 4 экземплярами и 3 счетчиками резервных копий, и слушатель получает уведомление только один раз, как и ожидалось

  • Вы уверены, что не регистрируете одного и того же слушателя несколько раз?
  • Можете ли вы распечатать MyCacheEntryListener сам также после CREATE EVENT RECEIVED сообщение?
  • Какую версию Hazelcast вы используете?

С уважением.

Другие вопросы по тегам