Безопасен ли параллельный поток StreamTransformer?
Предположим, у меня есть кластер воспламенения с несколькими узлами и непустой секционированный IgniteCache с именем "TEST_CACHE". Затем я запускаю следующий код в одном из узлов:
ignite.compute().run(new IgniteRunnable(){
@IgniteInstanceResource
private Ignite ignite;
@Override
public void run() {
IgniteDataStreamer<String,Long> ds = ignite.dataStreamer("TEST_CACHE");
ds.receiver(new StreamTransformer<String,Long>(){
@Override
public Object process(MutableEntry<String, Long> entry, Object... arguments)
throws EntryProcessorException {
Long value = entry.getValue();
entry.setValue(value==null?1L:(value.longValue()+1L));
return null;
}
});
//loop for adding lots of String data
while(...)
ds.addData(...);
}
});
Это похоже на официальный код StreamTransformerExample, но отличается то, что каждый узел получит экземпляр DataStreamer одного и того же кэша и одновременно вызовет метод addData. Другими словами, для одних и тех же строковых данных в разных узлах, возможно, один узел только что получил значение с помощью "Long value = entry.getValue()", но не выполняет код следующей строки для установки значения и обновления в кеш, тогда другой узел выполняет "entry.getValue()". Так возможно ли обновить неправильное значение в этом параллельном сценарии использования StreamTransformer?
1 ответ
StreamReceiver.receive вызывает cache.invoke с вашим процессором ввода, поэтому запись блокируется в этой операции. Так что да, это безопасно одновременно.
Кстати, вы включили allowOverwrite в вашем DataStreamer?