optorsim создает исключение ConcurrentModificationException

Эта программа (optoesim) бросает java.util.ConcurrentModificationException на линии для stats.put("jobTimesWithQueue", new LinkedHashMap(_jobTimesWithQueue));,

Это программа с открытым исходным кодом, и я ничего не изменил. Может кто-нибудь, пожалуйста, объясните мне, что это означает, что вызывает это, и как я могу избежать этого.

Exception in thread ConcurrentModificationException

java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)** 
at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:744)

at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:742)
at java.util.HashMap.putMapEntries(HashMap.java:511)
at java.util.LinkedHashMap.<init>(LinkedHashMap.java:384)
at org.edg.data.replication.optorsim.SimpleComputingElement.getStatistics(SimpleComputingElement.java:164)
at org.edg.data.replication.optorsim.GridDataThread.run(GridDataThread.java:95)


public Statistics getStatistics() {
    Map stats = new HashMap();
      // After remove see the result here.
    OptorSimParameters params = OptorSimParameters.getInstance();
    float _usage = _time.getTimeMillis() - _startRunning == 0 ? 0 : 100 *_workingTime/(_time.getTimeMillis() - _startRunning);
    stats.put("usage",  new Float(_usage));
    stats.put("remoteReads",  new Long(_remoteReads));
    stats.put("localReads",  new Long(_localReads));
    if( params.outputStatistics() ==3) {

        stats.put("jobTimes",  new LinkedHashMap( _jobTimes));
        stats.put("jobTimesWithQueue", new LinkedHashMap(_jobTimesWithQueue));
        stats.put("jobFiles",  new LinkedHashMap(_jobFiles));
        stats.put("numberOfJobs", new Integer(_jobsCompleted));
        stats.put("workerNodes",  new Integer(_workerNodes));
        stats.put("status",  new Boolean(_active));
        stats.put("queueLength",  new Integer(_inputJobHandler.getQueueSize()));
        stats.put("runnableStatus",  new Boolean(_runnable));
    }
    stats.put("totalJobTime",  new Float(_totalJobTime/(float)1000));


    long meanJobTime = 0;
    if (_jobsCompleted!=0)
       meanJobTime = _workingTime/_jobsCompleted;
      /////////////////////////////////////////
    stats.put("meanJobTime",  new Long(meanJobTime));

    return new Statistics(this, stats);
}
**Edit:**

Я хотел бы показать вам все детали ошибки

Exception in thread "Thread-72" java.util.ConcurrentModificationException
at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:744)
at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:742)
at java.util.HashMap.putMapEntries(HashMap.java:511)
at java.util.LinkedHashMap.<init>(LinkedHashMap.java:384)
at org.edg.data.replication.optorsim.SimpleComputingElement.getStatistics(SimpleComputingElement.java:174)
at org.edg.data.replication.optorsim.SiteDataThread.run(SiteDataThread.java:112)

когда я нажимаю на первую связанную хэш-карту

    final LinkedHashMap.Entry<K,V> nextNode() {
        LinkedHashMap.Entry<K,V> e = next;
        if (modCount != expectedModCount)
            throw new ConcurrentModificationException();
        if (e == null)
            throw new NoSuchElementException();
        current = e;
        next = e.after;
        return e;
    }

вторая связанная карта

 final class LinkedEntryIterator extends LinkedHashIterator
    implements Iterator<Map.Entry<K,V>> {
    public final Map.Entry<K,V> next() { return nextNode(); }
}

и ошибка находится в 3-й строке 3-й связанной хэш-карты

final class LinkedEntryIterator extends LinkedHashIterator
    implements Iterator<Map.Entry<K,V>> {
    public final Map.Entry<K,V> next() { return nextNode(); }
}

ошибка находится в первой строке hashmap

  for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
            K key = e.getKey();
            V value = e.getValue();
            putVal(hash(key), key, value, false, evict);

LinkedHashMap

 public LinkedHashMap(Map<? extends K, ? extends V> m) {
    super();
    accessOrder = false;
    putMapEntries(m, false);
}

ошибка в линии

     putMapEntries(m, false);
at org.edg.data.replication.optorsim.SimpleComputingElement.getStatistics(SimpleComputingElement.java:174)

это весь код простого элемента вычисления

package org.edg.data.replication.optorsim;

/ * ComputingElement запускает поток, который выполняет GridJobs *, переданные ему через его {@link JobHandler}. Для каждого требуемого файла * ComputingElement вызывает метод getBestFile (), который возвращает * местоположение лучшей реплики файла в соответствии с * выбранным алгоритмом оптимизации, который может выполнять или не выполнять * репликацию. ComputingElement считывает файл из этого местоположения * и обрабатывает его. Время обработки файла рассчитывается как * время, указанное в файле параметров, деленное на количество рабочих * узлов в ComputingElement. *

* Каждый ComputingElement в настоящее время может запускать только одно задание за раз. * Информация о времени, затрачиваемом на каждое задание, может быть найдена в * выводе статистики в конце моделирования, если в файле параметров выбран уровень статистики 3 *, или в гистограммах времени задания *, если используется графический интерфейс пользователя. *

* Copyright (c) 2002 CERN, ITC-irst, PPARC, от имени ЕС DataGrid. * Условия лицензии см. В файле LICENSE или * http://www.edg.org/license.html *.

* @since JDK1.4 /*

public class SimpleComputingElement implements ComputingElement {

private static int _LastCEId = 0;

private GridSite _site; 
private String _ceName;  
private boolean _imAlive;
private boolean _paused = false;
private int _CEId;
private long _workingTime = 0;
private long _startRunning;
private long _totalJobTime = 0;
private Map _jobTimes = new LinkedHashMap();
private Map _jobTimesWithQueue = new LinkedHashMap();
private Map _jobFiles = new LinkedHashMap();

private int _jobsCompleted = 0;

protected JobHandler _inputJobHandler;
protected boolean _runnable = false;
protected boolean _active=false;
protected long _remoteReads = 0;
protected long _localReads = 0;
protected int _workerNodes = 0;
protected float _workerCapacity = 0;
protected GridTime _time;

public SimpleComputingElement( GridSite site,  int workerNodes, float capacity) {

    OptorSimParameters params = OptorSimParameters.getInstance();

    _time = GridTimeFactory.getGridTime();
    _site = site;
    _workerNodes = workerNodes;
    _workerCapacity = capacity;
    _CEId = ++_LastCEId;
    _ceName = "CE"+_CEId+"@"+_site;
    _inputJobHandler = new JobHandler( params.getMaxQueueSize());
    _imAlive = true;
    _site.registerCE( this);
    _startRunning = _time.getTimeMillis();
}

/**
 * Return a more meaningful name.
 * @return the CE's name
 */
public String toString() {
    return _ceName;
}

/**
 * Check whether this CE is active (processing jobs) or idle.
 */
public boolean active() {
    return _active;
}

/**
 * Check whether this CE is still running or has been shut down.
 */
public boolean imAlive() {
      return _imAlive;
}

/**
 * A method to return the input sandbox for this computing element.
 */
public JobHandler getJobHandler() {
    return _inputJobHandler;
}

/**
 * Method to get the site that this CE is on.
 * @return The site this CE is on.
 */
public GridSite getSite() {
    return _site;
}


/**
 * Method to give the name of this CE.
 * @return The name of this CE.
 */
public String getCeName() {
    return _ceName;
}

public int getWorkerNodes() {
    return _workerNodes;
}

/**
 * Method to check against our ID 
 */
public boolean iAm( int id) {
    return _CEId == id;
}

/**
 * Method to collate and return information relevant 
 * to this CE as a {@link Statistics} object.
 * @return The statistics of this CE
 */



public Statistics getStatistics() {
    Map stats = new HashMap();




    OptorSimParameters params = OptorSimParameters.getInstance();
    float _usage = _time.getTimeMillis() - _startRunning == 0 ? 0 : 100 *_workingTime/(_time.getTimeMillis() - _startRunning);
    stats.put("usage",  new Float(_usage));
    stats.put("remoteReads",  new Long(_remoteReads));
    stats.put("localReads",  new Long(_localReads));
    if( params.outputStatistics() ==3) {
                    /*  LinkedHashSet<String> lhs = new LinkedHashSet<String>();




        stats.put("jobTimes",  new LinkedHashMap( _jobTimes));
        stats.put("jobTimesWithQueue", new LinkedHashMap(_jobTimesWithQueue));
        stats.put("jobFiles",  new LinkedHashMap(_jobFiles));


        stats.put("numberOfJobs", new Integer(_jobsCompleted));
        stats.put("workerNodes",  new Integer(_workerNodes));
        stats.put("status",  new Boolean(_active));
        stats.put("queueLength",  new Integer(_inputJobHandler.getQueueSize()));
        stats.put("runnableStatus",  new Boolean(_runnable));
    }
    stats.put("totalJobTime",  new Float(_totalJobTime/(float)1000));


    long meanJobTime = 0;
    if (_jobsCompleted!=0)
       meanJobTime = _workingTime/_jobsCompleted;
      /////////////////////////////////////////
    stats.put("meanJobTime",  new Long(meanJobTime));

    return new Statistics(this, stats);
}

/**
 * When running, the ComputingElement processes all the jobs
 * submitted to it through the JobHandler, sleeping while the
 * JobHandler is empty. It is notified to shut down by the 
 * ResourceBroker.
 */
public void run() {

    // Boost our priority
    Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
    Double execTime;
    OptorSimParameters params = OptorSimParameters.getInstance();

    _runnable = true;

    // to keep thread running
    for( GridJob job=null; job != null || _imAlive; ) {

        _active=false;
        job=_inputJobHandler.get();  // This potentially blocks

        // We might get a null job from JobHandler, if so, skip any further activity
        if( job == null)
        continue;

        job.started();

        OptorSimOut.println(_ceName+"> starting to process "+job+" (queue length now "+
                   _inputJobHandler.getQueueSize()+")");
        _active=true;       

        // Install our optimiser
        Optimisable replicaOptimiser = OptimiserFactory.getOptimisable( _site);

        AccessPatternGenerator accessPatternGenerator 
        = AccessPatternGeneratorFactory.getAPGenerator(job);

        String[] logicalfilenames = new String[1];

        List filesAccessed  = new LinkedList();     

        for( String lfn = accessPatternGenerator.getNextFile();
         lfn != null; 
         lfn = accessPatternGenerator.getNextFile()) {

            filesAccessed.add(lfn);

            // Pack the logical file name into the expected structure:      
            logicalfilenames[0] = lfn;
            float[] fileFractions = new float[1];
            fileFractions[0] = (float)1.0;

                // Use optimiser to locate best replica of this file
            DataFile[] files = replicaOptimiser.getBestFile(logicalfilenames, 
                                            fileFractions);
            if( files.length != 1) {
                System.out.println( "ASSERT FAILED: CE, getBestFile return array with wrong number of entries: "+  files.length  +" != 1");
                continue; // skip to next file
            }

            if(files[0] == null) {
                System.out.println( _ceName + "> ERROR getBestFile returned"+
                    " null for "+logicalfilenames[0]);
                continue; // skip to next file
            }

            StorageElement fileSE = files[0].se();
            GridSite fileSite = fileSE.getGridSite();

            // Special case.  If file is remote, then simulate the remoteIO, unPin and move on to next file.
            if( _site != fileSite) {
                simulateRemoteIO( files[0], fileFractions[0]);

                // log this as an access on the close SE (if it exists!)
                if(_site.hasSEs())
                    _site.getCloseSE().accessFile(files[0]);

                if(_workerNodes != 0) {
                    execTime = new Double((job.getLatency() + job.getLinearFactor()*files[0].size())/(_workerNodes*_workerCapacity));
                    _time.gtSleep(execTime.longValue());
                }
                files[0].releasePin();
                _remoteReads++;
                continue;
            }
            else {
                fileSE.accessFile(files[0]);
                _localReads++;
            }

            // process the file
            if(_workerNodes != 0) {
                execTime = new Double((job.getLatency() + job.getLinearFactor()*files[0].size())/(_workerNodes*_workerCapacity));
//              System.out.println(this.toString()+"> processing file...");
                _time.gtSleep(execTime.longValue());
            }

            files[0].releasePin();

            //A while loop the ce enters when paused by gui
            while(_paused){
                _time.gtWait(this);
            }

        } // for each datafile in job

        // statistics logging
        long duration = _time.getTimeMillis() - job.timeStarted();
        long durationWithQueue = _time.getTimeMillis() - job.timeScheduled();
        if( duration < 0) {
            OptorSimOut.println("BUG> Duration < 0!!");
        }
        _totalJobTime += durationWithQueue;
        _workingTime += duration;
        _jobsCompleted++;


        if( params.outputStatistics() == 3 || params.useGui()) {
            _jobTimes.put(job.toString(), new Long(duration));
            _jobTimesWithQueue.put(job.toString(), new Long(durationWithQueue));
            _jobFiles.put( job.toString(), filesAccessed);
        }

    } // while there are jobs left to run        
    _runnable = false;
} // run


/**
 * A routine used by the CE to simulate remote IO. The GridContainer's copy() method is
 * used to block the equivalent amount of time.
 */
protected void simulateRemoteIO( DataFile remoteFile, float fraction) 
{
    GridContainer gc = GridContainer.getInstance();
    gc.copy( remoteFile, _site, fraction);
}


/**
 * GUI calls this method to pause the ComputingElement
 * threads when pause button is pressed.
 */
public void pauseCE() {
    _paused = true;
}

/**
 * GUI calls this method to unpause the ComputingElement
 * threads when continue button is pressed.
 */
public void unpauseCE() {
    _paused = false;
    _time.gtNotify(this);
}

/**
 * The ResourceBroker calls this method when it has
 * distributed all the jobs to shut down the ComputingElement
 * threads.
 */
public void shutDownCE(){
    _imAlive = false;
}

}

at org.edg.data.replication.optorsim.SiteDataThread.run(SiteDataThread.java:112)

ошибка в линии

 st = ce.getStatistics();






{
              //get the statistics object for this comp. element
              ce = site.getCE();
              st = ce.getStatistics();

              //sample mean job time
              Object r1 = st.getStatistic("meanJobTime");
              String stat1 = r1.toString();
              int stat1Int = Integer.parseInt(stat1);
              seriesSMJTVTime.add(timeSecs, stat1Int);

              //sample job times
              Object r2 = st.getStatistic("jobTimes");
              Map m = (Map)r2;
              int pairs = m.size();
              //if (number of previous key-value pairs != pairs)
              //    instantiate new histarray and fill with job time values
              if (prevNoOfPairs!=pairs)
              {
                 histarray = new double[pairs];
                 int i=0;
                 prevNoOfPairs++;
                 Set keySet = m.keySet();
                 Iterator iter = keySet.iterator();
                 while (iter.hasNext())
                 {
                    Object key = iter.next();
                    Object value = m.get(key);
                    String duration = value.toString();
                    float jobTime = Float.parseFloat(duration);
                    histarray[i] = jobTime;
                    i++;
                 }
              }

              //sample usage
              Object r3 = st.getStatistic("usage");
              String stat3 = r3.toString();
              float coUsage = Float.parseFloat(stat3);
              /* if (range values identical for last three readings)
               *    remove intermediate statistic
               */
              if (coUsage==prevCoUsage&&coUsage==prevPrevCoUsage)
              {
                 int itemCount = seriesSSEUVTime.getItemCount();
                 if (itemCount>2)
                    seriesSSEUVTime.remove(itemCount-1);
              }
              prevPrevCoUsage = prevCoUsage;
              prevCoUsage = coUsage;
              seriesSCEUVTime.add(timeSecs, coUsage);
           }
        }

1 ответ

ConcurrentModificationException означает, что кто-то изменил элемент, который не является текущим потоком в то время, когда текущий поток используется в нем.

Так что какой-то другой поток может перебирать карту. Это вероятно один объект или одноэлементный объект из-за этой строки: OptorSimParameters.getInstance(); что используется в целом с дизайном скороговоркой Singelton.

Таким образом, это означает, что один и тот же объект изменился из 2 мест. Из кода, который вы показали, и другой части кода.

Если это с открытым исходным кодом, возможно, это никогда не работало, или это ошибка, которую вы обнаружили.. Вы действительно не имеете ничего общего с этим, кроме, возможно, чтобы найти, кто вызывает OptorSimParameters.getInstance(); и попытаться понять, кто еще вызывает этот метод и изменить или использовать эту карту

Надеюсь, что имеет смысл

РЕДАКТИРОВАТЬ:

Это исключение случается, когда кто-то как это происходит

Iterator<Point> iter = points.iterator();
while (iter.hasNext()) {
Point p = iter.next();

//Antoher loop inside this one that will just iterate the itmes
Iterator<Point> iter2 = points.iterator();
while (iter2.hasNext()) {
    Point p = iter2.next();
    if (p.equals(pointWeWantToRemove)) {
        iter2.remove();
    }
}

iter.remove();

Этот случай выше просто делает цикл внутри цикла. Какой внешний цикл удаляет элементы один за другим, а внутренний удаляет конкретную точку. Исключение будет выброшено в этом случае, потому что итераторы работают в одном и том же списке и удаляют элементы друг для друга. Как только итератор создан points.iterator() Это сохранит исходный размер коллекции и после этого он его проверяет:

if (initSize != collection.size()) { 
     throw new ConcurrentModificationException ();
}

Так что, если вы создали как-то 2 итератора, которые одновременно изменяют одну и ту же коллекцию, это произойдет... Вы показали одну часть кода, но если вы можете найти другой итератор, нам нужно подумать, как этого избежать. Если другой метод, который использовал коллекцию, просто читает ее, вы можете создать клон перед ее чтением, и перебрать клон коллекции, и это решит вашу проблему

Надеюсь, что имеет смысл

Другие вопросы по тегам