Безопасен ли параллельный поток StreamTransformer?

Предположим, у меня есть кластер воспламенения с несколькими узлами и непустой секционированный IgniteCache с именем "TEST_CACHE". Затем я запускаю следующий код в одном из узлов:

ignite.compute().run(new IgniteRunnable(){
    @IgniteInstanceResource
    private Ignite ignite;

    @Override
    public void run() {
        IgniteDataStreamer<String,Long> ds = ignite.dataStreamer("TEST_CACHE");
        ds.receiver(new StreamTransformer<String,Long>(){
            @Override
            public Object process(MutableEntry<String, Long> entry, Object... arguments)
                    throws EntryProcessorException {
                Long value = entry.getValue();
                entry.setValue(value==null?1L:(value.longValue()+1L));
                return null;
            }
        });

        //loop for adding lots of String data
        while(...)
            ds.addData(...);
    }

});

Это похоже на официальный код StreamTransformerExample, но отличается то, что каждый узел получит экземпляр DataStreamer одного и того же кэша и одновременно вызовет метод addData. Другими словами, для одних и тех же строковых данных в разных узлах, возможно, один узел только что получил значение с помощью "Long value = entry.getValue()", но не выполняет код следующей строки для установки значения и обновления в кеш, тогда другой узел выполняет "entry.getValue()". Так возможно ли обновить неправильное значение в этом параллельном сценарии использования StreamTransformer?

1 ответ

Решение

StreamReceiver.receive вызывает cache.invoke с вашим процессором ввода, поэтому запись блокируется в этой операции. Так что да, это безопасно одновременно.

Кстати, вы включили allowOverwrite в вашем DataStreamer?

Другие вопросы по тегам