Алгоритм циклического планирования в Java с использованием AtomicBoolean

Я хочу реализовать строгое циклическое планирование при отправке запросов во внешнюю систему. Есть два внешних системных сервера. Первый запрос должен идти к "System1", а второй - к "System2", а следующий к "System1" и так далее.

Поскольку у меня есть только два сервера для отправки запроса, и поскольку я хочу максимальной производительности без каких-либо блокировок и переключения контекста, я выбрал AtomicBoolean, поскольку он использует операцию CAS.

Мои классы реализации

1. RoundRobinTest.java

package com.concurrency;

import java.util.Iterator;

public class RoundRobinTest 
{
    public static void main(String[] args) 
    {
        for (int i = 0; i < 500; i++) 
        {
            new Thread(new RoundRobinLogic()).start();
        }
        try 
        {
            // Giving a few seconds for the threads to complete
            Thread.currentThread().sleep(2000);
            Iterator<String> output = RoundRobinLogic.output.iterator();
            int i=0;
            while (output.hasNext()) 
            {
                System.out.println(i+++":"+output.next());
                // Sleeping after each out.print 
                Thread.currentThread().sleep(20);
            }
        } 
        catch (Exception ex) 
        {
            // do nothing
        }
    }

}

2.RoundRobinLogic.java (Класс со статическим объектом AtomicBoolean)

package com.concurrency;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;

public class RoundRobinLogic implements Runnable 
{
    private static AtomicBoolean bool = new AtomicBoolean(true);

    public static Queue<String> output = new ConcurrentLinkedDeque<>();

    @Override
    public void run() 
    {
        if(bool.getAndSet(false))
        {
            // Sending the request to first system
            output.add("Request to System1");
        }
        else if(!bool.getAndSet(true))
        {
            // Sending the request to first system
            output.add("Request to System2");
        }       
    }

}

Выход:


......................
314:Request to System1
315:Request to System2
316:Request to System1
317:Request to System2
318:Request to System1
319:Request to System1
320:Request to System2
321:Request to System2
322:Request to System1
323:Request to System2
324:Request to System1
325:Request to System2
......................

Запросы 318 и 319 были отправлены на один и тот же сервер, и в этом сценарии происходит сбой AtomicBoolean. Для моего приложения 1000-2000 потоков могут одновременно обращаться к общему объекту. Из практики параллелизма Java на практике я увидел следующее.

при высоких уровнях конкуренции блокировка имеет тенденцию превосходить атомные переменные, но при более реалистичных уровнях конкуренции атомные переменные превосходят блокировки. Это связано с тем, что блокировка реагирует на конфликты путем приостановки потоков, сокращения использования процессора и синхронизации трафика на шине общей памяти. При низком или умеренном конфликте атомы обеспечивают лучшую масштабируемость; с высокой конкуренцией, замки предлагают лучшее предотвращение конкуренции. (Алгоритмы на основе CAS также превосходят алгоритмы на основе блокировок в системах с одним ЦП, поскольку CAS всегда успешно работает в системе с одним ЦП, за исключением маловероятного случая, когда поток прерывается в середине операции записи с изменением чтения.)

Теперь у меня есть следующие вопросы.

  1. Существует ли какой-либо другой эффективный неблокирующий способ, позволяющий добиться циклического пересылки запроса.
  2. Может ли AtomicBoolean потерпеть неудачу в условиях сильной конкуренции? Я понимаю, что производительность / пропускная способность могут снизиться из-за сильной конкуренции. Но в приведенном выше примере AtomicBoolean не удается. Зачем?

3 ответа

Решение

В дополнение к ответу Джона, более чистая и, возможно, чуть более эффективная реализация RoundRobinLogic будет использовать AtomicInteger или же AtomicLong, Это устраняет необходимость сравнивать текущее значение AtomicBoolean с новым значением:

class RoundRobinLogic implements Runnable
{
    private static final AtomicInteger systemIndex = new AtomicInteger(1);

    public static final Queue<String> output = new ConcurrentLinkedDeque<>();

    @Override
    public void run()
    {
        if (systemIndex.incrementAndGet() % 2 == 0) {
            // Sending the request to first system
            output.add("Request to System1");
        } else {
            // Sending the request to second system
            output.add("Request to System2");
        }
    }
}

И это позволит вам довольно легко распространить это на дополнительные системы:

class RemoteSystem
{
    private final String name;

    RemoteSystem(String name)
    {
        this.name = name;
    }

    public String name()
    {
        return name;
    }
}

class RoundRobinLogic implements Runnable
{
    private static final AtomicInteger systemIndex = new AtomicInteger(1);

    private static final RemoteSystem[] systems = new RemoteSystem[] {
        new RemoteSystem("System1"),
        new RemoteSystem("System2"),
        new RemoteSystem("System3"),
        new RemoteSystem("System4"),
    };

    public static final Queue<String> output = new ConcurrentLinkedDeque<>();

    @Override
    public void run()
    {
        RemoteSystem system = systems[systemIndex.incrementAndGet() % systems.length];

        // Sending the request to right system
        output.add("Request to " + system.name());
    }
}

Давайте предположим, что вы не используете Queue но API для реальной системы. Проблема, которую я вижу, связана с:

    if(bool.getAndSet(false))
    {
        // Sending the request to first system
        output.add("Request to System1");
    }
    else if(!bool.getAndSet(true))
    {
        // Sending the request to second system
        output.add("Request to System2");
    }     

Что если оба условия не сработают? Как это возможно? Представь, что войдя в первый if бул это true, Затем вы пытаетесь установить его в ложь, но другой поток побеждает вас, чтобы вы видели false, Затем вы попробуйте else if, А что если else if когда вы получаете ложный, но установлен в истинный купить другой поток? В этом случае обе попытки потерпят неудачу.

Я бы рефакторинг это выглядит так:

while(true){
  boolean current = bool.get();
  if(bool.compareAndSet(current, !current){
     if(current){ 
        //send request to first system
     } else {
        //send request to second system
     }
     return;
  }
}

Как упоминал Шон Брайт, поскольку вы добавляете в очередь, даже если вы реализуете ее, как я описал выше, вы все равно можете видеть некоторые значения не по порядку, потому что сама очередь не является частью синхронизации с AtomicBoolean.

Поскольку ваше требование в основном: реализовать атомарную операцию, которая

  1. вычисляет и переворачивает логическое значение (или оценивает по модулю и увеличивает счетчик в общем n дело серверов) и
  2. вставляет запись в очередь на основе результата шага 1,

вы не можете сделать это, сделав шаги 1 и 2 индивидуально поточно-ориентированными; Вы должны синхронизировать шаги 1 и 2 вместе.

Вот простая реализация, которая должна работать:

import java.util.LinkedList;
import java.util.Queue;

public class RoundRobinLogic implements Runnable 
{
    private static boolean bool = true;
    public static final Queue<String> OUTPUT = new LinkedList<String>();
    private static final Object LOCK = new Object();

    @Override
    public void run() {
        synchronized (LOCK) {
            OUTPUT.add(bool ? "Request to System1" : "Request to System2");
            bool = !bool;
        }
    }
}

По поводу ваших вопросов:

  1. Вы не можете избежать блокировки, если вам нужно синхронизировать две операции выше уровня процессора вместе. Занятия в java.util.concurrent.atomic использовать элементарные инструкции на уровне машины, поэтому код, использующий эти классы (обычно в зависимости от платформы), не нужно блокировать.
  2. В вашей реализации, AtomicBoolean не подвела. Вместо этого было условие гонки между чтением логического значения и добавлением элемента в очередь.
Другие вопросы по тегам