Многопоточность ConcurrentModificationException
Я некоторое время искал в Интернете, пытаясь решить эту проблему, но безуспешно.
В моем приложении у меня есть большой набор сообщений, которые я пытаюсь зашифровать с использованием базовой коммутативной схемы шифрования. Поскольку наборы - это большое количество BigIntegers, я пытаюсь использовать многопоточность шифрования для повышения производительности.
По сути, я беру большой набор сообщений и делю его на подмножества, которые передаются в поток шифрования для выполнения подмножества шифрований. Затем я пытаюсь извлечь каждое подмножество и объединить их в исходный большой набор после того, как все потоки выполнили свои части. Когда я перебираю потоки и извлекаю каждое из их шифрований, возникает ошибка, когда я пытаюсь фактически добавить все шифрования в список всех шифрований, а выдаваемая ошибка - это ошибка java.util.ConcurrentModificationException.
Я пытался использовать синхронизацию, но это не помогает.
Вот вызов функции:
protected Set<BigInteger> multiEncrypt(BigInteger key, HashSet<BigInteger> messageSet) {
ArrayList<BigInteger> messages = new ArrayList<BigInteger>(messageSet);
Set<BigInteger> encryptions = Collections.synchronizedSet(new HashSet<BigInteger>());
int cores = Runtime.getRuntime().availableProcessors();
int numMessages = messages.size();
int stride = numMessages/cores;
//create all the threads and run them
ArrayList<EncryptThread> threads = new ArrayList<EncryptThread>();
for (int thread = 0; thread < cores; thread++) {
int start = thread*stride;
//don't want to go over the end
int stop = ((thread+1)*stride >= messages.size()) ? messages.size()-1 : (thread+1)*stride;
List<BigInteger> subList = messages.subList(start, stop);
EncryptThread t = new EncryptThread(encryptionScheme.getPrime(), key, subList);
t.start();
threads.add(t);
}
//pull out the encryptions
synchronized(encryptions){
for (int i=0; i < threads.size()-1; i++) {
EncryptThread thread = threads.get(i);
ArrayList<BigInteger> these = thread.getEncryptions();
encryptions.addAll(these); //<-- Erroring Here
thread.finish();
}
}
И вот соответствующие части класса EncryptThread, который я написал для шифрования:
/**
* Constructor
*/
public EncryptThread(BigInteger prime, BigInteger key, List<BigInteger> messages) {
//need a new encryption scheme object for each thread
encryptionScheme = new EncryptionScheme(prime);
encryptions = new ArrayList<BigInteger>();
this.key = key;
this.messages = messages;
wait = true;
}
@Override
public void run() {
encryptMessages(key, messages);
while(wait);
}
/**
* Used to encrypt a set of messages
* @param key
* @param messages
* @return
*/
public void encryptMessages(BigInteger key, List<BigInteger> messages) {
System.out.println("Encrypting stuff");
for (BigInteger m : messages) {
BigInteger em = encryptionScheme.encrypt(key, m);
encryptions.add(m);
}
}
public ArrayList<BigInteger> getEncryptions() {
return encryptions;
}
//call this after encryptions have been pulled to let the thread finish
public void finish() {
wait = false;
}
}
Я не новичок в Java, но я новичок в многопоточности в Java, поэтому я буду признателен за любые советы. Заранее спасибо!
РЕДАКТИРОВАТЬ: Согласно предложениям, я добавил простой механизм блокировки в класс EncryptThread, который заставляет поток ждать, пока они не завершат шифрование, и все заработает.
public void encryptMessages(BigInteger key, List<BigInteger> messages) {
System.out.println("Encrypting stuff");
this.lock = true;
for (BigInteger m : messages) {
BigInteger em = encryptionScheme.encrypt(key, m);
//deals with when we have to mark chaff at S2
if (shift) {
em.shiftLeft(1);
if(shiftVal != 0) em.add(BigInteger.ONE);
}
encryptions.add(m);
}
this.lock = false;
}
public ArrayList<BigInteger> getEncryptions() {
while(lock);
return encryptions;
}
РЕДАКТИРОВАТЬ #2 Таким образом, я в конечном итоге использовал решение, которое было предложено мне кем-то из моей лаборатории. Я избавился от логических значений блокировки и ожидания и функции finish() в классе EncryptThread и вместо этого добавил простой цикл thread.join() между циклами start и getEncryption:
//create all the threads
ArrayList<EncryptThread> threads = new ArrayList<EncryptThread>();
for (int thread = 0; thread < cores; thread++) {
int start = thread*stride;
//don't want to go over the end
int stop = ((thread+1)*stride >= messages.size()) ? messages.size()-1 : (thread+1)*stride;
List<BigInteger> subList = messages.subList(start, stop);
EncryptThread t = new EncryptThread(encryptionScheme.getPrime(), key, subList, shiftVal);
t.start();
threads.add(t);
}
//wait for them to finish
for( EncryptThread thread: threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//pull out the encryptions
for (int i=0; i < threads.size()-1; i++) {
EncryptThread thread = threads.get(i);
encryptions.addAll(thread.getEncryptions());
}
Я думаю, что моя главная путаница заключалась в том, что я думал, что класс потока не может вызывать свои методы после его завершения. Но вышесказанное работает отлично.
3 ответа
Если вы посмотрите на документацию по List
Это addAll, он говорит следующее:
Добавляет все элементы в указанной коллекции в конец этого списка в том порядке, в котором они возвращаются итератором указанной коллекции (необязательная операция). Поведение этой операции не определено, если указанная коллекция была изменена во время выполнения операции. (Обратите внимание, что это произойдет, если указанная коллекция является этим списком и она непуста.)
Вы можете увидеть, что ваш список изменяется addAll
использует его итератор в вашем encryptMessages
метод, который в данный момент исполняет один из ваших потоков.
for (BigInteger m : messages) {
BigInteger em = encryptionScheme.encrypt(key, m);
encryptions.add(m); // <-- here
}
Я не просмотрел весь ваш код полностью, но некоторые вещи здесь не являются потокобезопасными. Вы могли бы преуспеть, используя CopyOnWriteArrayList
вместо обычного ArrayList
чтобы избежать ConcurrentModificationException
Если вы не добавили все в список в вызове addAll, если нет, вам также нужно дождаться завершения потоков. Вы, вероятно, хотите вместо этого просто использовать задачи с ExecutorService. Возможно, есть и другие улучшения.
Кроме того, книга переходов, которую все упоминают, чтобы научиться писать поточно-ориентированные программы на Java, называется "Параллелизм на практике". Я бы порекомендовал, если вы новичок в параллелизме на Java.
ConcurrentModificationException возникает, когда вы изменяете коллекцию во время итерации по ней. Это очень мало связано с многопоточностью, так как вы можете легко создать однопоточный пример:
ArrayList<String> myStrings = new ArrayList<>();
myStrings.add("foo");
myStrings.add("bar");
for(String s : myStrings) {
myStrings.add("Hello ConcurrentModificationException!");
Вы начинаете свои темы здесь.
for (int thread = 0; thread < cores; thread++) {
int start = thread*stride;
//don't want to go over the end
int stop = ((thread+1)*stride >= messages.size()) ? messages.size()-1 : (thread+1)*stride;
List<BigInteger> subList = messages.subList(start, stop);
EncryptThread t = new EncryptThread(encryptionScheme.getPrime(), key, subList);
t.start();
threads.add(t);
}
Что ж. Затем вы должны дождаться завершения всех потоков, прежде чем начинать агрегирование в этом блоке.
//pull out the encryptions
synchronized(encryptions){
for (int i=0; i < threads.size()-1; i++) {
EncryptThread thread = threads.get(i);
ArrayList<BigInteger> these = thread.getEncryptions();
encryptions.addAll(these); //<-- Erroring Here
thread.finish();
}
}
вы блокируете темы, которые обращаются encryptions
только. но созданная вами нить не имеет доступа к set
, среднее время, которое он будет продолжать добавлять в свой собственный массив List these
, Поэтому, когда вы звоните encryptions.addAll(these);
доступ к ним осуществляется двумя потоками encryptions
и нить владеющая these
И другие ответы предоставили подробности о том, почему одновременное исключение в addAll.
Вы должны ждать, пока все потоки не завершат свою работу.
Вы можете сделать это используя ExecutorService
Измените начальную тему как
ExecutorService es = Executors.newFixedThreadPool(cores);
for(int i=0;i<5;i++)
es.execute(new Runnable() { /* your task */ }); //EncryptThread instance
es.shutdown();
boolean finshed = es.awaitTermination(1, TimeUnit.MINUTES);
Затем обработайте процесс добавления.
ExecutorService es = Executors.newFixedThreadPool(cores);
for (int thread = 0; thread < cores; thread++) {
int start = thread*stride;
//don't want to go over the end
int stop = ((thread+1)*stride >= messages.size()) ? messages.size()-1 : (thread+1)*stride;
List<BigInteger> subList = messages.subList(start, stop);
EncryptThread t = new EncryptThread(encryptionScheme.getPrime(), key, subList);
es.execute(t);
threads.add(t);
}
es.shutdown();
boolean finshed = es.awaitTermination(1, TimeUnit.MINUTES);
//pull out the encryptions
synchronized(encryptions){
for (int i=0; i < threads.size()-1; i++) {
EncryptThread thread = threads.get(i);
ArrayList<BigInteger> these = thread.getEncryptions();
encryptions.addAll(these); //<-- Erroring Here
thread.finish();
}
}
Предполагается, что ваш EncryptThread является потоком прямо сейчас. Возможно, вам придется перейти на инструменты Runnable. и никаких других изменений в getEncryptions