Замыкания застряли в 2.0 при попытке добавить элемент в очередь

У нас есть случай использования, как показано ниже

1-) Запустите 2, запустите экземпляр как узлы данных и вставьте данные в кеш.

2-) Создайте очередь и зарегистрируйте удаленного слушателя, используя remoteListen, как показано ниже

//Queue creation         
CollectionConfiguration colCfg = new CollectionConfiguration(); 
colCfg.setCacheMode(PARTITIONED); 
IgniteQueue<BinaryObject> queue = Ignition.ignite().queue(queueName, 0, colCfg);



//Remote Listener Closure 
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() { 
                        @Override public boolean apply(CacheEvent evt) { 
                                System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']'); 
                Ignite ignite = Ignition.ignite(); 
                IgniteQueue<String> queue = ignite.queue(queueName, 0, null); 
                String key = evt.key(); 
                BinaryObject profile = (BinaryObject) evt.newValue(); 
                System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() + 
                        ", oldVal=" + evt.oldValue().toString() + ", newVal=" + evt.newValue().toString()); 

                if (profile.<Double>field("usage") > start && profile.<Double>field("usage") < end 
                        && ignite.affinity("profileCache").isPrimary(ignite.cluster().localNode(), key)){ 
                    queue.add(profile.field("number")); 
                } 
                                return false; 
                        } 
                };       

Ignition.ignite().events(ignite.cluster().forCacheNodes("profileCache")).remoteListen(1,1l,false,null, rmtLsnr, 
                                EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED); 

3-) Выполните некоторые обновления в экземплярах кэша, как показано ниже, чтобы получить обновления в remotelistener.

void updateAnyProfile(Double newUsage){         
SqlQuery qry = new SqlQuery(Profile.class,"select * from Profile where usage < 30 limit 10"); 
    List<CacheEntryImpl<String, profile>> res = profileCache.query(qry).getAll(); 
    Profile profile = res.iterator().next().getValue(); 
    profile.setUsage(newUsage); 
    profileCache.put(profile.getCtn(), profile); 
    profile.setUsage(newUsage+1); 
    profileCache.put(profile.getCtn(), profile); 

} 

4-) Возьмите элементы из очереди.

 public void readFromQueue (String queueName) { 
    // Initialize new FIFO queue. 
    IgniteQueue<String> queue = Ignition.ignite().queue(queueName, 0, null); 
    while (true) { 
        String profile = queue.take(); 
        System.out.println("Profile from queue: " + profile.toString()); 
    } 
} 

Шаг 2,3,4 запускается из разных экземпляров JVM с клиентским узлом TRUE. Проблема в том, что приложение зависает при выполнении любой операции после выполнения вышеуказанного сценария. Не могли бы вы помочь нам? Будем очень признательны, если вы расскажете нам, что мы делаем не так?

Ниже дамп потока висящей датододы, и та же самая датодода висит ниже кода

 IgniteQueue<String> queue = ignite.queue(queueName, 0, null);

Иногда вы можете успешно обновить записи и после следующего обновления он начинает зависать или даже не может сделать операцию в кеше.

"sys-stripe-5-#6%null%" #25 prio=5 os_prio=31 tid=0x00007fd88d031800 nid=0x14c07 waiting on condition [0x00007000036e7000] 
   java.lang.Thread.State: WAITING (parking) 
        at sun.misc.Unsafe.park(Native Method) 
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304) 
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:176) 
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:139) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get0(GridCacheAdapter.java:4482) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:4463) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:1405) 
        at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue0(CacheDataStructuresManager.java:270) 
        at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue(CacheDataStructuresManager.java:231) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor$12.applyx(DataStructuresProcessor.java:952) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor$12.applyx(DataStructuresProcessor.java:950) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.getCollection(DataStructuresProcessor.java:1078) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.queue(DataStructuresProcessor.java:950) 
        at org.apache.ignite.internal.IgniteKernal.queue(IgniteKernal.java:3560) 
        at com.ignite.trial.roaming.ProfileService$4.apply(ProfileService.java:303) 
        at com.ignite.trial.roaming.ProfileService$4.apply(ProfileService.java:297) 
        at org.apache.ignite.internal.GridEventConsumeHandler$2.onEvent(GridEventConsumeHandler.java:170) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager$LocalListenerWrapper.onEvent(GridEventStorageManager.java:1311) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.notifyListeners(GridEventStorageManager.java:892) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record0(GridEventStorageManager.java:340) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record(GridEventStorageManager.java:297) 
        at org.apache.ignite.internal.processors.cache.GridCacheEventManager.addEvent(GridCacheEventManager.java:297) 
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.innerUpdate(GridCacheMapEntry.java:1806) 
        - locked <0x00000007b6d01f10> (a org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCacheEntry) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateSingle(GridDhtAtomicCache.java:2386) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1792) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1630) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.processNearAtomicUpdateRequest(GridDhtAtomicCache.java:3016) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access$400(GridDhtAtomicCache.java:127) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$6.apply(GridDhtAtomicCache.java:282) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$6.apply(GridDhtAtomicCache.java:277) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:863) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:386) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:308) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$000(GridCacheIoManager.java:100) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:253) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1257) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:885) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$2100(GridIoManager.java:114) 
        at org.apache.ignite.internal.managers.communication.GridIoManager$7.run(GridIoManager.java:802) 
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:483) 
        at java.lang.Thread.run(Thread.java:748) 

1 ответ

Решение

В EventListener нельзя вызывать методы ignite.queue и ignite.affinity, потому что это может привести к тупику.

Все операции с кешем, включая EventListener, выполняются в системном пуле, поэтому не рекомендуется вызывать внутри операций EventListener, которые тоже используют системный пул.

Вы можете прочитать больше здесь на "Выполнение замыканий и пулы потоков": https://apacheignite.readme.io/docs/async-support

И здесь https://apacheignite.readme.io/docs/thread-pools

Другие вопросы по тегам