Пример многопоточной роли рабочего очереди Azure
У нас есть 4 очереди Azure, которые заполняются либо прямым REST API, либо предоставляемой нами службой WCF.
- Мы хотели бы иметь ОДНУ рабочую роль, чтобы контролировать все эти 4 очереди.
- Я имею в виду использование многопоточности, которая читает имя очереди и т. Д. Из конфигурации и вращает метод процесса (который читает сообщение из очереди и выполняет обработку)
Может ли кто-нибудь предоставить мне пример или руководство о том, как добиться этого в роли рабочего, пожалуйста?
Не слишком уверен, что вышеуказанного можно достичь без многопоточности, так как я новичок в многопоточности.
Спасибо
3 ответа
Вы можете запускать разные потоки для разных задач, но также следует учитывать и многопоточный подход (который может работать лучше или хуже в зависимости от того, что вы делаете с сообщениями):
while (true)
{
var msg = queue1.GetMessage();
if (msg != null)
{
didSomething = true;
// do something with it
queue1.DeleteMessage(msg);
}
msg = queue2.GetMessage();
if (msg != null)
{
didSomething = true;
// do something with it
queue2.DeleteMessage(msg);
}
// ...
if (!didSomething) Thread.Sleep(TimeSpan.FromSeconds(1)); // so I don't enter a tight loop with nothing to do
}
Вот наша текущая реализация, чтобы сделать именно то, что вы запрашиваете, но в лучшем виде (или так мы думаем). Тем не менее, этот код еще нуждается в серьезной очистке. Это функциональная версия 0.1 этого, хотя.
public class WorkerRole : RoleEntryPoint
{
public override void Run()
{
var logic = new WorkerAgent();
logic.Go(false);
}
public override bool OnStart()
{
// Initialize our Cloud Storage Configuration.
AzureStorageObject.Initialize(AzureConfigurationLocation.AzureProjectConfiguration);
return base.OnStart();
}
}
public class WorkerAgent
{
private const int _resistance_to_scaling_larger_queues = 9;
private Dictionary<Type, int> _queueWeights = new Dictionary<Type, int>
{
{typeof (Queue1.Processor), 1},
{typeof (Queue2.Processor), 1},
{typeof (Queue3.Processor), 1},
{typeof (Queue4.Processor), 1},
};
private readonly TimeSpan _minDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MinDelay")));
private readonly TimeSpan _maxDelay = TimeSpan.FromMinutes(Convert.ToDouble(RoleEnvironment.GetConfigurationSettingValue("MaxDelay")));
protected TimeSpan CurrentDelay { get; set; }
public Func<string> GetSpecificQueueTypeToProcess { get; set; }
/// <summary>
/// This is a superset collection of all Queues that this WorkerAgent knows how to process, and the weight of focus it should receive.
/// </summary>
public Dictionary<Type, int> QueueWeights
{
get
{
return _queueWeights;
}
set
{
_queueWeights = value;
}
}
public static TimeSpan QueueWeightCalibrationDelay
{
get { return TimeSpan.FromMinutes(15); }
}
protected Dictionary<Type, DateTime> QueueDelays = new Dictionary<Type, DateTime>();
protected Dictionary<Type, AzureQueueMetaData> QueueMetaData { get; set; }
public WorkerAgent(Func<string> getSpecificQueueTypeToProcess = null)
{
CurrentDelay = _minDelay;
GetSpecificQueueTypeToProcess = getSpecificQueueTypeToProcess;
}
protected IProcessQueues CurrentProcessor { get; set; }
/// <summary>
/// Processes queue request(s).
/// </summary>
/// <param name="onlyProcessOnce">True to only process one time. False to process infinitely.</param>
public void Go(bool onlyProcessOnce)
{
if (onlyProcessOnce)
{
ProcessOnce(false);
}
else
{
ProcessContinuously();
}
}
public void ProcessContinuously()
{
while (true)
{
// temporary hack to get this started.
ProcessOnce(true);
}
}
/// <summary>
/// Attempts to fetch and process a single queued request.
/// </summary>
public void ProcessOnce(bool shouldDelay)
{
PopulateQueueMetaData(QueueWeightCalibrationDelay);
if (shouldDelay)
{
Thread.Sleep(CurrentDelay);
}
var typesToPickFrom = new List<Type>();
foreach(var item in QueueWeights)
{
for (var i = 0; i < item.Value; i++)
{
typesToPickFrom.Add(item.Key);
}
}
var randomIndex = (new Random()).Next()%typesToPickFrom.Count;
var typeToTryAndProcess = typesToPickFrom[randomIndex];
CurrentProcessor = ObjectFactory.GetInstance(typeToTryAndProcess) as IProcessQueues;
CleanQueueDelays();
if (CurrentProcessor != null && !QueueDelays.ContainsKey(typeToTryAndProcess))
{
var errors = CurrentProcessor.Go();
var amountToDelay = CurrentProcessor.NumberProcessed == 0 && !errors.Any()
? _maxDelay // the queue was empty
: _minDelay; // else
QueueDelays[CurrentProcessor.GetType()] = DateTime.Now + amountToDelay;
}
else
{
ProcessOnce(true);
}
}
/// <summary>
/// This method populates/refreshes the QueueMetaData collection.
/// </summary>
/// <param name="queueMetaDataCacheLimit">Specifies the length of time to cache the MetaData before refreshing it.</param>
private void PopulateQueueMetaData(TimeSpan queueMetaDataCacheLimit)
{
if (QueueMetaData == null)
{
QueueMetaData = new Dictionary<Type, AzureQueueMetaData>();
}
var queuesWithoutMetaData = QueueWeights.Keys.Except(QueueMetaData.Keys).ToList();
var expiredQueueMetaData = QueueMetaData.Where(qmd => qmd.Value.TimeMetaDataWasPopulated < (DateTime.Now - queueMetaDataCacheLimit)).Select(qmd => qmd.Key).ToList();
var validQueueData = QueueMetaData.Where(x => !expiredQueueMetaData.Contains(x.Key)).ToList();
var results = new Dictionary<Type, AzureQueueMetaData>();
foreach (var queueProcessorType in queuesWithoutMetaData)
{
if (!results.ContainsKey(queueProcessorType))
{
var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
if (queueProcessor != null)
{
var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
var metaData = queue.GetMetaData();
results.Add(queueProcessorType, metaData);
QueueWeights[queueProcessorType] = (metaData.ApproximateMessageCount) == 0
? 1
: (int)Math.Log(metaData.ApproximateMessageCount, _resistance_to_scaling_larger_queues) + 1;
}
}
}
foreach (var queueProcessorType in expiredQueueMetaData)
{
if (!results.ContainsKey(queueProcessorType))
{
var queueProcessor = ObjectFactory.GetInstance(queueProcessorType) as IProcessQueues;
if (queueProcessor != null)
{
var queue = new AzureQueue(queueProcessor.PrimaryQueueName);
var metaData = queue.GetMetaData();
results.Add(queueProcessorType, metaData);
}
}
}
QueueMetaData = results.Union(validQueueData).ToDictionary(data => data.Key, data => data.Value);
}
private void CleanQueueDelays()
{
QueueDelays = QueueDelays.Except(QueueDelays.Where(x => x.Value < DateTime.Now)).ToDictionary(x => x.Key, x => x.Value);
}
}
Благодаря этому у нас есть отдельный класс, который знает, как обрабатывать каждую очередь, и он реализует IProcessQueues. Мы загружаем _queueWeights
Коллекция с каждым из тех типов, которые мы хотим обработать. Мы устанавливаем _resistance_to_scaling_larger_queues
постоянно контролировать, как мы хотим, чтобы это масштабироваться. Обратите внимание, что это масштабируется в логарифмической форме (см. PopulateQueueMetaData
метод). Ни одна очередь не имеет веса менее 1, даже если она содержит 0 элементов. Если вы установите PopulateQueueMetaData
в 10
затем при каждом увеличении величины на порядок "вес" этого типа увеличивается на 1. Например, если у вас есть QueueA с 0 элементами, QueueB с 0 элементами и QueueC с 10 элементами, то ваши соответствующие веса равны 1, 1 и 2. Это означает, что QueueC с вероятностью 50% будет обработан следующим, в то время как QueueA и QueueB имеют только 25% -ную вероятность обработки. Если в QueueC 100 предметов, то ваш вес равен 1, 1, 3, а ваши шансы на обработку составляют 20%, 20%, 60%. Это гарантирует, что ваши пустые очереди не будут забыты.
Другое дело, что это имеет _minDelay
а также _maxDelay
, Если этот код считает, что в очереди есть хотя бы 1 элемент, он продолжит обрабатывать его так же быстро, как и при _minDelay
темп. Однако, если в нем в последний раз было 0 элементов, он не будет обрабатываться быстрее, чем _maxDelay
темп. Таким образом, это означает, что если генератор случайных чисел поднимает очередь (независимо от веса), в которой есть 0 элементов, он просто пропустит попытку обработки и перейдет к следующей итерации. (В эту часть можно внести некоторую дополнительную оптимизацию для повышения эффективности транзакций с хранилищем, но это небольшое аккуратное дополнение.)
У нас есть пара пользовательских классов здесь (таких как AzureQueue
а также AzureQueueMetaData
) - один по сути является оберткой для CloudQueue
а другой хранит некоторую информацию, такую как приблизительное количество очередей - там ничего интересного (просто способ упростить код).
Опять же, я не называю этот "красивый" код, но некоторые довольно умные концепции реализованы и функциональны в этом коде. Используйте его по любой причине.:)
Наконец, написание такого кода позволяет нам иметь один проект, который может обрабатывать МНОГО больше очередей. Если мы обнаружим, что это просто не идет в ногу, мы можем легко масштабировать его до большего числа экземпляров, и это масштабируется для ВСЕХ очередей. В минимальном сценарии вы можете развернуть один экземпляр этого для мониторинга 3 очередей. Однако, если четвертая очередь начинает влиять на производительность (или вам нужна более высокая доступность), увеличьте это значение до 2 раз. Как только вы попали в 15 очередей, добавьте третью. 25 очередей, добавить 4-й экземпляр. Заведите нового клиента, и вам нужно обрабатывать МНОГО запросов очереди во всех системах, это нормально. Вращайте эту одну роль до 20 мгновений, пока не закончите, а затем вращайте их обратно вниз. Есть особенно неприятная очередь? Прокомментируйте эту очередь из _queueWeights
сбора, разверните, чтобы управлять остальными вашими очередями, а затем снова разверните его со всеми другими очередями, кроме этой, закомментированной из _queueWeights
сбора, а затем разверните его снова в другом наборе экземпляров и выполните отладку без а) вмешательства других процессоров Queue в вашу отладку и б) вмешательства в ваши другие процессоры QueueProcessors. В конечном счете, это обеспечивает МНОГО гибкости и эффективности.
Внутри цикла while рабочей роли запустите 4 потока, как будто вы пишете многопоточное приложение на C#. Конечно, вам нужно определить четыре разные функции потока, и эти функции должны иметь отдельные циклы while для опроса очередей. В конце цикла while работника просто дождитесь завершения потоков.