Удержание запросов до тех пор, пока сообщение перед ConcurrentQueue не работает

У меня возникла проблема, связанная с тем, что мой ConcurrentQueue в синглтоне не обрабатывает элементы в правильном порядке. Я знаю, что это FIFO, поэтому я думаю, что, может быть, очередь в памяти не то же самое, или что-то не так с моим Dequeue? Я проверяю это, быстро отправляя 3 запроса почтальона на мою конечную точку API. Если кто-нибудь может помочь мне понять, почему они не идут друг за другом, я буду очень признателен!

На данный момент я склоняюсь к Queue.TryPeek не работает должным образом, так как 2-й и 3-й запросы, как представляется, стоят в очереди до того, как первый будет снят с очереди.

Поэтому, когда я запускаю приведенный ниже код, я вижу следующий вывод в консоли.

Queued message: Test 1
Sending message: Test 1
Queued message: Test 2
Sending message: Test 2
Dequeuing message: Test 2
Returning response: Test 2
Queued message: Test 3
Sending message: Test 3
Dequeuing message: Test 1
Returning response: Test 1
Dequeuing message: Test 3
Returning response: Test 3

Это мой метод контроллера API, который получает сообщение и ставит его в очередь, как только сообщение ставится в очередь, оно будет ждать, пока не увидит сообщение этого запроса спереди, а затем отправит его и затем выведет его из очереди.

контроллер

[HttpPost]
[Route("message")]
public IActionResult SendMessageUser([FromBody]Message request)
{
    Console.WriteLine($"Queued message: {request.Message}");
    _messageQueue.QueueAndWaitForTurn(request);
    Console.WriteLine($"Sending message: {request.Message}");
    var sendMessageResponse = _messageService.SendMessageToUser(request.Name, request.Message);
    Console.WriteLine($"Dequeuing message: {request.Message}");
    _messageQueue.DequeueMessage(request);
    Console.WriteLine($"Returning response: {request.Message}");
    return Ok(sendMessageResponse);
}

Что касается очереди, я связываю ее с IoC следующим образом:

public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton<IMessageQueue, MessageQueue>();
    services.AddScoped<IMessageService, MessageService>();

    services.AddMvc();
}

И это мой синглтон класса Queue, я использую здесь Singleton, потому что мне бы хотелось, чтобы только 1 экземпляр этой очереди за время существования приложения.

public class MessageQueue : IMessageQueue
{
    private Lazy<ConcurrentQueue<Message>> _queue = 
        new Lazy<ConcurrentQueue<Message>>(new ConcurrentQueue<Message>());

    public ConcurrentQueue<Message> Queue
    {
        get
        {
            return _queue.Value;
        }
    }

    public void QueueAndWaitForTurn(Message message)
    {
        Queue.Enqueue(message);

        WaitForTurn();
    }

    public bool DequeueMessage(Message message)
    {
        var messageIsDequeued = Queue.TryDequeue(out message);

        return messageIsDequeued;
    }

    public void WaitForTurn()
    {
        Message myMessage = null;
        var myMessageIsNext = Queue.TryPeek(out myMessage);

        while (!Queue.TryPeek(out myMessage))
        {
            Thread.Sleep(1000);
            WaitForTurn();
        }
    }
}

1 ответ

Решение

Я бы создал вид FifoSemaphore:

public class FifoSemaphore : IDisposable
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
    private readonly Queue<TaskCompletionSource<object>> _taskQueue = new Queue<TaskCompletionSource<object>>(10);
    private readonly object _lockObject = new object();

    public async Task WaitAsync()
    {
        // Enqueue a task
        Task resultTask;
        lock (_lockObject)
        {
            var tcs = new TaskCompletionSource<object>();
            resultTask = tcs.Task;
            _taskQueue.Enqueue(tcs);
        }

        // Wait for the lock
        await _semaphore.WaitAsync();

        // Dequeue the next item and set it to resolved (release back to API call)
        TaskCompletionSource<object> queuedItem;
        lock (_lockObject)
        {
            queuedItem = _taskQueue.Dequeue();
        }
        queuedItem.SetResult(null);

        // Await our own task
        await resultTask;
    }

    public void Release()
    {
        // Release the semaphore so another waiting thread can enter
        _semaphore.Release();
    }

    public void Dispose()
    {
        _semaphore?.Dispose();
    }
}

И затем используйте это так:

[HttpPost]
[Route("message")]
public async Task<IActionResult> SendMessageUser([FromBody]Message request)
{
    try
    {
        await _fifoSemaphore.WaitAsync();
        // process message code here
    }
    finally // important to have a finally to release the semaphore, so that even in the case of an exception, it can continue to process the next message
    {
        _fifoSemaphore.Release();
    }
}

Идея состоит в том, что каждый ожидающий элемент сначала будет поставлен в очередь.

Затем мы ждем, чтобы семафор пустил нас (наш семафор допускает один элемент за раз).

Затем мы удаляем следующий ожидающий элемент и возвращаем его обратно в метод API.

Наконец, мы ждем завершения нашей собственной позиции в очереди и затем возвращаемся к методу API.

В методе API мы асинхронно ждем своей очереди, выполняем нашу задачу и затем возвращаемся. Включена команда try/finally, чтобы гарантировать, что семафор будет освобожден для последующих сообщений даже в случае сбоя.

Другие вопросы по тегам