Удержание запросов до тех пор, пока сообщение перед ConcurrentQueue не работает
У меня возникла проблема, связанная с тем, что мой ConcurrentQueue в синглтоне не обрабатывает элементы в правильном порядке. Я знаю, что это FIFO, поэтому я думаю, что, может быть, очередь в памяти не то же самое, или что-то не так с моим Dequeue? Я проверяю это, быстро отправляя 3 запроса почтальона на мою конечную точку API. Если кто-нибудь может помочь мне понять, почему они не идут друг за другом, я буду очень признателен!
На данный момент я склоняюсь к Queue.TryPeek не работает должным образом, так как 2-й и 3-й запросы, как представляется, стоят в очереди до того, как первый будет снят с очереди.
Поэтому, когда я запускаю приведенный ниже код, я вижу следующий вывод в консоли.
Queued message: Test 1
Sending message: Test 1
Queued message: Test 2
Sending message: Test 2
Dequeuing message: Test 2
Returning response: Test 2
Queued message: Test 3
Sending message: Test 3
Dequeuing message: Test 1
Returning response: Test 1
Dequeuing message: Test 3
Returning response: Test 3
Это мой метод контроллера API, который получает сообщение и ставит его в очередь, как только сообщение ставится в очередь, оно будет ждать, пока не увидит сообщение этого запроса спереди, а затем отправит его и затем выведет его из очереди.
контроллер
[HttpPost]
[Route("message")]
public IActionResult SendMessageUser([FromBody]Message request)
{
Console.WriteLine($"Queued message: {request.Message}");
_messageQueue.QueueAndWaitForTurn(request);
Console.WriteLine($"Sending message: {request.Message}");
var sendMessageResponse = _messageService.SendMessageToUser(request.Name, request.Message);
Console.WriteLine($"Dequeuing message: {request.Message}");
_messageQueue.DequeueMessage(request);
Console.WriteLine($"Returning response: {request.Message}");
return Ok(sendMessageResponse);
}
Что касается очереди, я связываю ее с IoC следующим образом:
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IMessageQueue, MessageQueue>();
services.AddScoped<IMessageService, MessageService>();
services.AddMvc();
}
И это мой синглтон класса Queue, я использую здесь Singleton, потому что мне бы хотелось, чтобы только 1 экземпляр этой очереди за время существования приложения.
public class MessageQueue : IMessageQueue
{
private Lazy<ConcurrentQueue<Message>> _queue =
new Lazy<ConcurrentQueue<Message>>(new ConcurrentQueue<Message>());
public ConcurrentQueue<Message> Queue
{
get
{
return _queue.Value;
}
}
public void QueueAndWaitForTurn(Message message)
{
Queue.Enqueue(message);
WaitForTurn();
}
public bool DequeueMessage(Message message)
{
var messageIsDequeued = Queue.TryDequeue(out message);
return messageIsDequeued;
}
public void WaitForTurn()
{
Message myMessage = null;
var myMessageIsNext = Queue.TryPeek(out myMessage);
while (!Queue.TryPeek(out myMessage))
{
Thread.Sleep(1000);
WaitForTurn();
}
}
}
1 ответ
Я бы создал вид FifoSemaphore:
public class FifoSemaphore : IDisposable
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly Queue<TaskCompletionSource<object>> _taskQueue = new Queue<TaskCompletionSource<object>>(10);
private readonly object _lockObject = new object();
public async Task WaitAsync()
{
// Enqueue a task
Task resultTask;
lock (_lockObject)
{
var tcs = new TaskCompletionSource<object>();
resultTask = tcs.Task;
_taskQueue.Enqueue(tcs);
}
// Wait for the lock
await _semaphore.WaitAsync();
// Dequeue the next item and set it to resolved (release back to API call)
TaskCompletionSource<object> queuedItem;
lock (_lockObject)
{
queuedItem = _taskQueue.Dequeue();
}
queuedItem.SetResult(null);
// Await our own task
await resultTask;
}
public void Release()
{
// Release the semaphore so another waiting thread can enter
_semaphore.Release();
}
public void Dispose()
{
_semaphore?.Dispose();
}
}
И затем используйте это так:
[HttpPost]
[Route("message")]
public async Task<IActionResult> SendMessageUser([FromBody]Message request)
{
try
{
await _fifoSemaphore.WaitAsync();
// process message code here
}
finally // important to have a finally to release the semaphore, so that even in the case of an exception, it can continue to process the next message
{
_fifoSemaphore.Release();
}
}
Идея состоит в том, что каждый ожидающий элемент сначала будет поставлен в очередь.
Затем мы ждем, чтобы семафор пустил нас (наш семафор допускает один элемент за раз).
Затем мы удаляем следующий ожидающий элемент и возвращаем его обратно в метод API.
Наконец, мы ждем завершения нашей собственной позиции в очереди и затем возвращаемся к методу API.
В методе API мы асинхронно ждем своей очереди, выполняем нашу задачу и затем возвращаемся. Включена команда try/finally, чтобы гарантировать, что семафор будет освобожден для последующих сообщений даже в случае сбоя.