Алгоритм циклического планирования в Java с использованием AtomicBoolean
Я хочу реализовать строгое циклическое планирование при отправке запросов во внешнюю систему. Есть два внешних системных сервера. Первый запрос должен идти к "System1", а второй - к "System2", а следующий к "System1" и так далее.
Поскольку у меня есть только два сервера для отправки запроса, и поскольку я хочу максимальной производительности без каких-либо блокировок и переключения контекста, я выбрал AtomicBoolean, поскольку он использует операцию CAS.
Мои классы реализации
1. RoundRobinTest.java
package com.concurrency;
import java.util.Iterator;
public class RoundRobinTest
{
public static void main(String[] args)
{
for (int i = 0; i < 500; i++)
{
new Thread(new RoundRobinLogic()).start();
}
try
{
// Giving a few seconds for the threads to complete
Thread.currentThread().sleep(2000);
Iterator<String> output = RoundRobinLogic.output.iterator();
int i=0;
while (output.hasNext())
{
System.out.println(i+++":"+output.next());
// Sleeping after each out.print
Thread.currentThread().sleep(20);
}
}
catch (Exception ex)
{
// do nothing
}
}
}
2.RoundRobinLogic.java (Класс со статическим объектом AtomicBoolean)
package com.concurrency;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
public class RoundRobinLogic implements Runnable
{
private static AtomicBoolean bool = new AtomicBoolean(true);
public static Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
if(bool.getAndSet(false))
{
// Sending the request to first system
output.add("Request to System1");
}
else if(!bool.getAndSet(true))
{
// Sending the request to first system
output.add("Request to System2");
}
}
}
Выход:
......................
314:Request to System1
315:Request to System2
316:Request to System1
317:Request to System2
318:Request to System1
319:Request to System1
320:Request to System2
321:Request to System2
322:Request to System1
323:Request to System2
324:Request to System1
325:Request to System2
......................
Запросы 318 и 319 были отправлены на один и тот же сервер, и в этом сценарии происходит сбой AtomicBoolean. Для моего приложения 1000-2000 потоков могут одновременно обращаться к общему объекту. Из практики параллелизма Java на практике я увидел следующее.
при высоких уровнях конкуренции блокировка имеет тенденцию превосходить атомные переменные, но при более реалистичных уровнях конкуренции атомные переменные превосходят блокировки. Это связано с тем, что блокировка реагирует на конфликты путем приостановки потоков, сокращения использования процессора и синхронизации трафика на шине общей памяти. При низком или умеренном конфликте атомы обеспечивают лучшую масштабируемость; с высокой конкуренцией, замки предлагают лучшее предотвращение конкуренции. (Алгоритмы на основе CAS также превосходят алгоритмы на основе блокировок в системах с одним ЦП, поскольку CAS всегда успешно работает в системе с одним ЦП, за исключением маловероятного случая, когда поток прерывается в середине операции записи с изменением чтения.)
Теперь у меня есть следующие вопросы.
- Существует ли какой-либо другой эффективный неблокирующий способ, позволяющий добиться циклического пересылки запроса.
- Может ли AtomicBoolean потерпеть неудачу в условиях сильной конкуренции? Я понимаю, что производительность / пропускная способность могут снизиться из-за сильной конкуренции. Но в приведенном выше примере AtomicBoolean не удается. Зачем?
3 ответа
В дополнение к ответу Джона, более чистая и, возможно, чуть более эффективная реализация RoundRobinLogic
будет использовать AtomicInteger
или же AtomicLong
, Это устраняет необходимость сравнивать текущее значение AtomicBoolean
с новым значением:
class RoundRobinLogic implements Runnable
{
private static final AtomicInteger systemIndex = new AtomicInteger(1);
public static final Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
if (systemIndex.incrementAndGet() % 2 == 0) {
// Sending the request to first system
output.add("Request to System1");
} else {
// Sending the request to second system
output.add("Request to System2");
}
}
}
И это позволит вам довольно легко распространить это на дополнительные системы:
class RemoteSystem
{
private final String name;
RemoteSystem(String name)
{
this.name = name;
}
public String name()
{
return name;
}
}
class RoundRobinLogic implements Runnable
{
private static final AtomicInteger systemIndex = new AtomicInteger(1);
private static final RemoteSystem[] systems = new RemoteSystem[] {
new RemoteSystem("System1"),
new RemoteSystem("System2"),
new RemoteSystem("System3"),
new RemoteSystem("System4"),
};
public static final Queue<String> output = new ConcurrentLinkedDeque<>();
@Override
public void run()
{
RemoteSystem system = systems[systemIndex.incrementAndGet() % systems.length];
// Sending the request to right system
output.add("Request to " + system.name());
}
}
Давайте предположим, что вы не используете Queue
но API для реальной системы. Проблема, которую я вижу, связана с:
if(bool.getAndSet(false))
{
// Sending the request to first system
output.add("Request to System1");
}
else if(!bool.getAndSet(true))
{
// Sending the request to second system
output.add("Request to System2");
}
Что если оба условия не сработают? Как это возможно? Представь, что войдя в первый if
бул это true
, Затем вы пытаетесь установить его в ложь, но другой поток побеждает вас, чтобы вы видели false
, Затем вы попробуйте else if
, А что если else if
когда вы получаете ложный, но установлен в истинный купить другой поток? В этом случае обе попытки потерпят неудачу.
Я бы рефакторинг это выглядит так:
while(true){
boolean current = bool.get();
if(bool.compareAndSet(current, !current){
if(current){
//send request to first system
} else {
//send request to second system
}
return;
}
}
Как упоминал Шон Брайт, поскольку вы добавляете в очередь, даже если вы реализуете ее, как я описал выше, вы все равно можете видеть некоторые значения не по порядку, потому что сама очередь не является частью синхронизации с AtomicBoolean.
Поскольку ваше требование в основном: реализовать атомарную операцию, которая
- вычисляет и переворачивает логическое значение (или оценивает по модулю и увеличивает счетчик в общем
n
дело серверов) и - вставляет запись в очередь на основе результата шага 1,
вы не можете сделать это, сделав шаги 1 и 2 индивидуально поточно-ориентированными; Вы должны синхронизировать шаги 1 и 2 вместе.
Вот простая реализация, которая должна работать:
import java.util.LinkedList;
import java.util.Queue;
public class RoundRobinLogic implements Runnable
{
private static boolean bool = true;
public static final Queue<String> OUTPUT = new LinkedList<String>();
private static final Object LOCK = new Object();
@Override
public void run() {
synchronized (LOCK) {
OUTPUT.add(bool ? "Request to System1" : "Request to System2");
bool = !bool;
}
}
}
По поводу ваших вопросов:
- Вы не можете избежать блокировки, если вам нужно синхронизировать две операции выше уровня процессора вместе. Занятия в
java.util.concurrent.atomic
использовать элементарные инструкции на уровне машины, поэтому код, использующий эти классы (обычно в зависимости от платформы), не нужно блокировать. - В вашей реализации,
AtomicBoolean
не подвела. Вместо этого было условие гонки между чтением логического значения и добавлением элемента в очередь.