Как обрабатывать данные, поступающие из нескольких потоков в одном потоке (с ConcurrentQueue)?
У меня есть заказы, поступающие из нескольких потоков, и я хочу обработать эти данные в одном потоке. Если я правильно понял, способ сделать это с ConcurrentQueue.
Я посмотрел на SO вопрос Как работать с многопоточностью с ConcurrentQueue
Я написал небольшое тестовое приложение (с.NET Core 2.1), чтобы посмотреть, смогу ли я заставить его работать.
Это то, что он должен сделать: сделать агрегаты для 100 заказов. Есть 3 агрегата для 3 разных типов заказов: Тип1, Тип2 и Тип3
Вывод должен быть примерно таким:
Type: Type1 Count: 38
Type: Type2 Count: 31
Type: Type3 Count: 31
Total for all types: 100
Я начал писать приложение без ConcurrentQueue. Как и ожидалось, результаты в _агрегатах неверны.
/* Incorrect version, not using ConcurrentQueue, showing incorrect results */
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
namespace ConcurrentQueue
{
class Program
{
private static readonly int OrderCount = 100;
private static IEnumerable<Order> _orders;
private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;
static void Main(string[] args)
{
//Prepare
InitializeAggregates();
_orders = CreateOrders();
//Execute
MainAsync(args).GetAwaiter().GetResult();
}
static async Task MainAsync(string[] args)
{
await Task.Run(() => ProcessOrders());
ShowOutput();
}
public static void ProcessOrders()
{
var aggregator = new Aggregator();
Parallel.ForEach(_orders, order => {
aggregator.Aggregate(order, _aggregates);
});
}
private static IEnumerable<Order> CreateOrders()
{
var orderList = new Collection<Order>();
for (var i = 1; i <= OrderCount; i++)
{
var order = CreateOrder(i);
orderList.Add(order);
}
return orderList;
}
private static void InitializeAggregates()
{
_aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
_aggregates[OrderTypeEnum.Type1] = new Aggregate();
_aggregates[OrderTypeEnum.Type2] = new Aggregate();
_aggregates[OrderTypeEnum.Type3] = new Aggregate();
}
private static Order CreateOrder(int id)
{
var order = new Order() { Id = id, OrderType = GetRandomAggregtationType() };
return order;
}
private static OrderTypeEnum GetRandomAggregtationType()
{
Array values = Enum.GetValues(typeof(OrderTypeEnum));
var random = new Random();
return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
}
private static void ShowOutput()
{
Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
var total =
_aggregates[OrderTypeEnum.Type1].Count +
_aggregates[OrderTypeEnum.Type2].Count +
_aggregates[OrderTypeEnum.Type3].Count;
Console.WriteLine($"Total for all types: {total}");
Console.ReadKey();
}
}
public class Order
{
public int Id { get; set; }
public OrderTypeEnum OrderType { get; set; }
}
public class Aggregator
{
public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
{
aggregates[order.OrderType].Count++;
}
}
public class Aggregate
{
public int Count { get; set; }
}
public enum OrderTypeEnum
{
Type1 = 1,
Type2 = 2,
Type3 = 3
}
}
Поэтому я переписал приложение, используя ConcurrentQueue. Результаты верны сейчас, но у меня есть ощущение, что я делаю это неправильно или это можно сделать более эффективно.
/* improved version using ConcurrentQueue, showing correct results */
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
namespace ConcurrentQueue
{
class Program
{
private static readonly int OrderCount = 100;
private static IEnumerable<Order> _orders;
private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;
static void Main(string[] args)
{
//Prepare
InitializeAggregates();
_orders = CreateOrders();
//Execute
var proxy = new OrderProxy();
var ordersQueue = new ConcurrentQueue<OrderResult>();
Parallel.ForEach(_orders, order => {
var orderResult = proxy.PlaceOrder(order);
ordersQueue.Enqueue(orderResult);
});
foreach (var order in ordersQueue)
{
_aggregates[order.OrderType].Count++;
}
ShowOutput();
}
private static IEnumerable<Order> CreateOrders()
{
var orderList = new Collection<Order>();
for (var i = 1; i <= OrderCount; i++)
{
var order = CreateOrder(i);
orderList.Add(order);
}
return orderList;
}
private static void InitializeAggregates()
{
_aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
_aggregates[OrderTypeEnum.Type1] = new Aggregate();
_aggregates[OrderTypeEnum.Type2] = new Aggregate();
_aggregates[OrderTypeEnum.Type3] = new Aggregate();
}
private static Order CreateOrder(int id)
{
var order = new Order() { Id = id, AggregateType = GetRandomAggregtationType() };
return order;
}
private static OrderTypeEnum GetRandomAggregtationType()
{
Array values = Enum.GetValues(typeof(OrderTypeEnum));
var random = new Random();
return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
}
private static void ShowOutput()
{
Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
var total =
_aggregates[OrderTypeEnum.Type1].Count +
_aggregates[OrderTypeEnum.Type2].Count +
_aggregates[OrderTypeEnum.Type3].Count;
Console.WriteLine($"Total for all types: {total}");
Console.ReadKey();
}
}
public class Order
{
public int Id { get; set; }
public OrderTypeEnum AggregateType { get; set; }
}
public class OrderResult
{
public int Id { get; set; }
public OrderTypeEnum OrderType { get; set; }
}
public class OrderProxy
{
public OrderResult PlaceOrder(Order order)
{
var orderResult = new OrderResult() { Id = order.Id, OrderType = order.AggregateType };
return orderResult;
}
}
public class Aggregate
{
public OrderTypeEnum OrderType { get; set; }
public int Count { get; set; }
}
public enum OrderTypeEnum
{
Type1 = 1,
Type2 = 2,
Type3 = 3
}
}
Как видите, я добавляю объекты типа OrderResult в ConcurrentQueue. Мне не нужно использовать класс OrderResult. Конечно, я мог бы просто добавить порядок в очередь, выполнить итерации по ним и вычислить суммы после того, как я закончу извлечение данных. Это то, что я должен сделать? Я просто хочу обработать поступающие заказы, и просто сразу посчитать различные типы заказов и сохранить их в моей "коллекции агрегатов". Это возможно? Если да, то как?
3 ответа
По предложению самого Дэвида Фаулера я попытался использовать каналы System.Threading.Cannel для решения своей проблемы, и мне удалось найти что-то, что, кажется, работает правильно.
Библиотека System.Threading.Channels плохо документирована, поэтому я надеюсь, что то, что я сделал, было сделано именно так, как и предполагалось.
using System;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Threading;
using System.Collections.Generic;
namespace ConcurrentQueue
{
class Program
{
//Buffer for writing. After the capacity has been reached, a read must take place because the channel is full.
private static readonly int Capacity = 10;
//Number of orders to write by each writer. (Choose 0 for infinitive.)
private static readonly int NumberOfOrdersToWrite = 25;
//Delay in ms used
private static readonly int Delay = 50;
private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;
static void Main(string[] args)
{
//Prepare
InitializeAggregates();
MainAsync(args).GetAwaiter().GetResult();
}
static async Task MainAsync(string[] args)
{
var channel = Channel.CreateBounded<Order>(Capacity);
var readerTask = Task.Run(() => ReadFromChannelAsync(channel.Reader));
var writerTask01 = Task.Run(() => WriteToChannelAsync(channel.Writer, 1, NumberOfOrdersToWrite));
var writerTask02 = Task.Run(() => WriteToChannelAsync(channel.Writer, 2, NumberOfOrdersToWrite));
var writerTask03 = Task.Run(() => WriteToChannelAsync(channel.Writer, 3, NumberOfOrdersToWrite));
var writerTask04 = Task.Run(() => WriteToChannelAsync(channel.Writer, 4, NumberOfOrdersToWrite));
while (!writerTask01.IsCompleted || !writerTask02.IsCompleted || !writerTask03.IsCompleted || !writerTask04.IsCompleted)
{
}
channel.Writer.Complete();
await channel.Reader.Completion;
ShowOutput();
}
public static async Task WriteToChannelAsync(ChannelWriter<Order> writer, int writerNumber, int numberOfOrdersToWrite = 0)
{
int i = 1;
while (numberOfOrdersToWrite == 0 || i <= numberOfOrdersToWrite)
{
var order = CreateOrder(writerNumber, i);
await writer.WriteAsync(order);
Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: writer {writerNumber} just wrote order {order.OrderNumber} with value {order.OrderType}.");
i++;
//await Task.Delay(Delay); //this simulates some work...
}
}
private static async Task ReadFromChannelAsync(ChannelReader<Order> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Order order))
{
Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: reader just read order {order.OrderNumber} with value {order.OrderType}.");
_aggregates[order.OrderType].Count++;
await Task.Delay(Delay); //this simulates some work...
}
}
}
private static void InitializeAggregates()
{
_aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
_aggregates[OrderTypeEnum.Type1] = new Aggregate();
_aggregates[OrderTypeEnum.Type2] = new Aggregate();
_aggregates[OrderTypeEnum.Type3] = new Aggregate();
}
private static Order CreateOrder(int writerNumber, int seq)
{
string orderNumber = $"{writerNumber}-{seq}";
var order = new Order() { OrderNumber = orderNumber, OrderType = GetRandomOrderType() };
return order;
}
private static OrderTypeEnum GetRandomOrderType()
{
Array values = Enum.GetValues(typeof(OrderTypeEnum));
var random = new Random();
return (OrderTypeEnum)values.GetValue(random.Next(values.Length));
}
private static void ShowOutput()
{
var total =
_aggregates[OrderTypeEnum.Type1].Count +
_aggregates[OrderTypeEnum.Type2].Count +
_aggregates[OrderTypeEnum.Type3].Count;
Console.WriteLine();
Console.WriteLine($"Type: {OrderTypeEnum.Type1} Count: {_aggregates[OrderTypeEnum.Type1].Count}");
Console.WriteLine($"Type: {OrderTypeEnum.Type2} Count: {_aggregates[OrderTypeEnum.Type2].Count}");
Console.WriteLine($"Type: {OrderTypeEnum.Type3} Count: {_aggregates[OrderTypeEnum.Type3].Count}");
Console.WriteLine($"Total for all types: {total}");
Console.WriteLine();
Console.WriteLine("Done! Press a key to close the window.");
Console.ReadKey();
}
}
public class Order
{
public string OrderNumber { get; set; }
public OrderTypeEnum OrderType { get; set; }
}
public class Aggregator
{
public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
{
aggregates[order.OrderType].Count++;
}
}
public class Aggregate
{
public OrderTypeEnum OrderType { get; set; }
public int Count { get; set; }
}
public enum OrderTypeEnum
{
Type1 = 1,
Type2 = 2,
Type3 = 3
}
}
Мне не нравится, как я проверяю завершение работы авторов. Как улучшить это?
while (!writerTask01.IsCompleted || !writerTask02.IsCompleted ||
!writerTask03.IsCompleted || !writerTask04.IsCompleted)
{
}
Любая обратная связь высоко ценится.
Как предположил Шейн, использование оператора блокировки (в моем первом примере кода) работает:
public class Aggregator
{
private static readonly Object _lockObj = new Object();
public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
{
lock (_lockObj)
{
aggregates[order.OrderType].Count++;
}
}
}
Я думаю, что DataFlow и System.Threading.Channels являются более гибкими и более элегантными решениями.
Вы второе решение, используя ConcurrentQueue<T>
на самом деле не выполняет агрегацию одновременно. Это только добавление элементов в очередь одновременно и последующая обработка очереди. Для этого конкретного примера кода самым простым решением было бы использовать первое решение, которое вы придумали, за исключением lock
вокруг приращения в Aggregator.Aggregate
метод, как это:
public class Aggregator
{
public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
{
var aggregate = aggregates[order.OrderType];
Interlocked.Increment(ref aggregate.Count);
}
}