Может ли блок действий потока данных TPL быть сброшен после сбоя?
У меня есть блок действий потока данных TPL, который я использую для получения триггерных сообщений для камеры, а затем выполняю некоторую обработку. Если задача обработки выдает исключение, блок Action Block переходит в состояние ошибки. Я хотел бы отправить сообщение об ошибке в мой пользовательский интерфейс и отправить сообщение сброса в блок Action Block, чтобы он мог продолжить обработку входящих триггерных сообщений. Есть ли способ вернуть Action Block в состояние готовности (сбросить ошибку)?
Код для любопытных:
using System.Threading.Tasks.Dataflow;
namespace Anonymous
{
/// <summary>
/// Provides a messaging system between objects that inherit from Actor
/// </summary>
public abstract class Actor
{
//The Actor uses an ActionBlock from the DataFlow library. An ActionBlock has an input queue you can
// post messages to and an action that will be invoked for each received message.
//The ActionBlock handles all of the threading issues internally so that we don't need to deal with
// threads or tasks. Thread-safety comes from the fact that ActionBlocks are serialized by default.
// If you send two messages to it at the same time it will buffer the second message until the first
// has been processed.
private ActionBlock<Message> _action;
...Properties omitted for brevity...
public Actor(string name, int id)
{
_name = name;
_id = id;
CreateActionBlock();
}
private void CreateActionBlock()
{
// We create an action that will convert the actor and the message to dynamic objects
// and then call the HandleMessage method. This means that the runtime will look up
// a method called ‘HandleMessage’ with a parameter of the message type and call it.
// in TPL Dataflow if an exception goes unhandled during the processing of a message,
// (HandleMessage) the exception will fault the block’s Completion task.
//Dynamic objects expose members such as properties and methods at run time, instead
// of at compile time. This enables you to create objects to work with structures that
// do not match a static type or format.
_action = new ActionBlock<Message>(message =>
{
dynamic self = this;
dynamic msg = message;
self.HandleMessage(msg); //implement HandleMessage in the derived class
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1 // This specifies a maximum degree of parallelism of 1.
// This causes the dataflow block to process messages serially.
});
}
/// <summary>
/// Send a message to an internal ActionBlock for processing
/// </summary>
/// <param name="message"></param>
public async void SendMessage(Message message)
{
if (message.Source == null)
throw new Exception("Message source cannot be null.");
try
{
_action.Post(message);
await _action.Completion;
message = null;
//in TPL Dataflow if an exception goes unhandled during the processing of a message,
// the exception will fault the block’s Completion task.
}
catch(Exception ex)
{
_action.Completion.Dispose();
//throw new Exception("ActionBlock for " + _name + " failed.", ex);
Trace.WriteLine("ActionBlock for " + _name + " failed." + ExceptionExtensions.GetFullMessage(ex));
if (_action.Completion.IsFaulted)
{
_isFaulted = true;
_faultReason = _name + " ActionBlock encountered an exception while processing task: " + ex.ToString();
FaultMessage msg = new FaultMessage { Source = _name, FaultReason = _faultReason, IsFaulted = _isFaulted };
OnFaulted(msg);
CreateActionBlock();
}
}
}
public event EventHandler<FaultMessageEventArgs> Faulted;
public void OnFaulted(FaultMessage message)
{
Faulted?.Invoke(this, new FaultMessageEventArgs { Message = message.Copy() });
message = null;
}
/// <summary>
/// Use to await the message processing result
/// </summary>
public Task Completion
{
get
{
_action.Complete();
return _action.Completion;
}
}
}
}
1 ответ
Необработанное исключение в ActionBlock похоже на необработанное исключение в приложении. Не делай этого. Обработайте исключение соответствующим образом.
В простейшем случае зарегистрируйте это или сделайте что-нибудь внутри делегата блока. В более сложных сценариях вы можете использовать TransformBlock вместо ActionBlock и отправлять сообщения Succes или Failure в нижестоящие блоки.
Код, который вы разместили, имеет некоторые критические проблемы. Блоки потоков данных не являются агентами, а агенты не являются блоками потоков данных. Конечно, вы можете использовать один для построения другого, но они представляют разные парадигмы. В этом случае ваш Actor
эмулирует ActionBlock
собственный API с несколькими ошибками.
Например, вам не нужно создавать SendAsync
У блоков уже есть один. Вы не должны завершать блок после отправки сообщения. Вы не сможете обрабатывать любые другие сообщения. Только звонок Complete()
когда вы действительно не хотите больше использовать ActionBlock. Вам не нужно устанавливать DOP 1, это значение по умолчанию.
Вы можете установить границы для DataflowBlock, чтобы он принимал только, например, 10 сообщений одновременно. В противном случае все сообщения будут буферизироваться, пока блок не найдет возможность их обработать.
Вы можете заменить весь этот код следующим:
void MyMethod(MyMessage message)
{
try
{
//...
}
catch(Exception exc)
{
//ToString logs the *complete exception, no need for anything more
_log.Error(exc.ToString());
}
}
var blockOptions new ExecutionDataflowBlockOptions {
BoundedCapacity=10,
NameFormat="Block for MyMessage {0} {1}"
};
var block=new ActionBlock<MyMessage>(MyMethod,blockOptions);
for(int i=0;i<10000;i++)
{
//Will await if more than 10 messages are waiting
await block.SendAsync(new MyMessage(i);
}
block.Complete();
//Await until all leftover messages are processed
await block.Completion;
Обратите внимание на звонок Exception.ToString()
, Это создаст строку, содержащую всю информацию об исключениях, включая стек вызовов.
NameFormat
позволяет указать шаблон имени для блока, который может быть заполнен во время выполнения внутренним именем блока и идентификатором задачи.