ActionBlock 可以包含状态吗?
Can an ActionBlock contain a state?
我正在编写一个使用 TPL 数据流的应用程序。我正在尝试配置一个操作块来写入数据库。
但是,我需要这个动作块在它收到的第一条消息上执行初始化步骤(注意我必须等待第一条消息并且不能在动作块创建期间执行初始化)。
因此,我的操作块需要保持某种状态,指示它是否已经收到第一条消息。
ActionBlock 是否可以保持状态?
参考下面的 Microsoft 示例代码,我将如何向 ActionBlock 添加状态变量?它似乎只维护局部变量。
/// <summary>
/// Posts a batch of messages to a dataflow block that simulates work by sleeping,
/// and measures how long the block takes to process the whole batch.
/// </summary>
/// <param name="maxDegreeOfParallelism">Maximum number of messages the block may process concurrently.</param>
/// <param name="messageCount">Number of messages to post; each one asks for a 1000 ms sleep.</param>
/// <returns>Time elapsed from just before the first post until the block's completion task finishes.</returns>
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
    int messageCount)
{
    // Cap how many messages the block may handle at the same time.
    var blockOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };

    // Every message is a delay in milliseconds; the "work" is sleeping for that long.
    var sleeper = new ActionBlock<int>(delayMs => Thread.Sleep(delayMs), blockOptions);

    // Start timing before the first message is posted.
    Stopwatch timer = Stopwatch.StartNew();

    int remaining = messageCount;
    while (remaining > 0)
    {
        sleeper.Post(1000);
        remaining--;
    }

    // Signal that no more input is coming, then block until everything queued has been handled.
    sleeper.Complete();
    sleeper.Completion.Wait();

    timer.Stop();
    return timer.Elapsed;
}
您可以实现自己的 StatefulActionBlock<TInput, TState>,就像下面这样。根据您设置的 MaxDegreeOfParallelism,您可能并不需要锁(即使需要锁,也可能有更好的方式来实现线程安全)。感谢 @TheodorZoulias 帮助我改进这种方法。
/// <summary>
/// A target block that wraps an <see cref="ActionBlock{TInput}"/> and runs a one-time
/// initializer against a caller-supplied state object when the first message is offered,
/// instead of when the block is constructed.
/// </summary>
/// <typeparam name="TInput">Type of the messages the block accepts.</typeparam>
/// <typeparam name="TState">Type of the state object handed to the initializer.</typeparam>
public class StatefulActionBlock<TInput, TState> : IDataflowBlock, ITargetBlock<TInput>
{
    private readonly Action<TState> _initializer;
    private readonly object _lock = new object();
    private readonly ITargetBlock<TInput> _actionBlock;
    private readonly TState _state;

    // Only read and written while holding _lock.
    private bool _initialized;

    /// <summary>Completion task of the wrapped action block.</summary>
    public Task Completion => _actionBlock.Completion;

    /// <summary>
    /// Creates the block. The initializer is NOT invoked here; it runs on the first offered message.
    /// </summary>
    /// <param name="action">Delegate executed for every accepted message.</param>
    /// <param name="initializer">Delegate executed once, with <paramref name="state"/>, before the first message is forwarded.</param>
    /// <param name="state">State object passed to <paramref name="initializer"/>; may be null.</param>
    /// <param name="options">Execution options forwarded to the wrapped action block.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="action"/>, <paramref name="initializer"/> or <paramref name="options"/> is null.
    /// </exception>
    public StatefulActionBlock(Action<TInput> action, Action<TState> initializer, TState state, ExecutionDataflowBlockOptions options)
    {
        // Fail at construction time rather than with a NullReferenceException on the first message.
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }
        if (initializer == null)
        {
            throw new ArgumentNullException(nameof(initializer));
        }
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        _initializer = initializer;
        _actionBlock = new ActionBlock<TInput>(action, options);
        _state = state;
    }

    // Runs the initializer and records success. The flag is set only after the initializer
    // returns, so an initializer that throws is attempted again on the next offered message.
    private void Initialize()
    {
        _initializer(_state);
        _initialized = true;
    }

    /// <summary>
    /// Runs the one-time initializer if it has not completed yet, then forwards the offer
    /// to the wrapped action block and returns its answer.
    /// </summary>
    /// <remarks>
    /// The initializer executes on the thread that offers the message, before the wrapped
    /// block decides whether to accept it. The lock keeps concurrent senders from running
    /// the initializer more than once.
    /// </remarks>
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept)
    {
        lock (_lock)
        {
            if (!_initialized)
            {
                Initialize();
            }
        }

        return _actionBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <summary>Signals the wrapped block that no more messages will be offered.</summary>
    public void Complete() =>
        _actionBlock.Complete();

    /// <summary>Faults the wrapped block with the given exception.</summary>
    public void Fault(Exception exception) =>
        _actionBlock.Fault(exception);
}
您也可以在 Action 内部加锁,并在其中检查是否已经完成初始化。
// Guards the one-time initialization performed inside DoStuff. Readonly so the lock
// object can never be swapped out while another thread is holding it.
private static readonly object _lock = new object();
// Set to true (while holding _lock) once Initialize() has run. It is named _initialized
// because that is the identifier DoStuff reads and writes; the previous name,
// _isInitialized, left DoStuff referring to a field that did not exist.
private static bool _initialized;
/// <summary>
/// Posts a batch of messages to a dataflow block whose action is <c>DoStuff</c>,
/// and measures how long the block takes to process the whole batch.
/// </summary>
/// <param name="maxDegreeOfParallelism">Maximum number of messages the block may process concurrently.</param>
/// <param name="messageCount">Number of messages to post; each one carries the value 1000.</param>
/// <returns>Time elapsed from just before the first post until the block's completion task finishes.</returns>
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism, int messageCount)
{
    // Cap how many messages the block may handle at the same time.
    var blockOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };

    // DoStuff is invoked once per message.
    var worker = new ActionBlock<int>(DoStuff, blockOptions);

    // Start timing before the first message is posted.
    Stopwatch timer = Stopwatch.StartNew();

    int remaining = messageCount;
    while (remaining > 0)
    {
        worker.Post(1000);
        remaining--;
    }

    // Signal that no more input is coming, then block until everything queued has been handled.
    worker.Complete();
    worker.Completion.Wait();

    timer.Stop();
    return timer.Elapsed;
}
/// <summary>
/// Per-message action: makes sure the one-time initialization has happened, then
/// simulates work by sleeping.
/// </summary>
/// <param name="i">Number of milliseconds to sleep.</param>
private static void DoStuff(int i)
{
    EnsureInitialized();
    Thread.Sleep(i); // simulated work
}

// Calls Initialize() unless the flag says it has already run. The check and the flag
// update both happen under the lock.
private static void EnsureInitialized()
{
    lock (_lock)
    {
        if (_initialized)
        {
            return;
        }

        Initialize();
        _initialized = true;
    }
}
// One-time setup hook. DoStuff calls it from inside its lock while the initialized
// flag is still false. The body is a placeholder in this example.
private static void Initialize()
{
//...
}
我正在编写一个使用 TPL 数据流的应用程序。我正在尝试配置一个操作块来写入数据库。
但是,我需要这个动作块在它收到的第一条消息上执行初始化步骤(注意我必须等待第一条消息并且不能在动作块创建期间执行初始化)。
因此,我的操作块需要保持某种状态,指示它是否已经收到第一条消息。
ActionBlock 是否可以保持状态?
参考下面的 Microsoft 示例代码,我将如何向 ActionBlock 添加状态变量?它似乎只维护局部变量。
/// <summary>
/// Posts a batch of messages to a dataflow block that simulates work by sleeping,
/// and measures how long the block takes to process the whole batch.
/// </summary>
/// <param name="maxDegreeOfParallelism">Maximum number of messages the block may process concurrently.</param>
/// <param name="messageCount">Number of messages to post; each one asks for a 1000 ms sleep.</param>
/// <returns>Time elapsed from just before the first post until the block's completion task finishes.</returns>
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
    int messageCount)
{
    // Cap how many messages the block may handle at the same time.
    var blockOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };

    // Every message is a delay in milliseconds; the "work" is sleeping for that long.
    var sleeper = new ActionBlock<int>(delayMs => Thread.Sleep(delayMs), blockOptions);

    // Start timing before the first message is posted.
    Stopwatch timer = Stopwatch.StartNew();

    int remaining = messageCount;
    while (remaining > 0)
    {
        sleeper.Post(1000);
        remaining--;
    }

    // Signal that no more input is coming, then block until everything queued has been handled.
    sleeper.Complete();
    sleeper.Completion.Wait();

    timer.Stop();
    return timer.Elapsed;
}
您可以实现自己的 StatefulActionBlock<TInput, TState>,就像下面这样。根据您设置的 MaxDegreeOfParallelism,您可能并不需要锁(即使需要锁,也可能有更好的方式来实现线程安全)。感谢 @TheodorZoulias 帮助我改进这种方法。
/// <summary>
/// A target block that wraps an <see cref="ActionBlock{TInput}"/> and runs a one-time
/// initializer against a caller-supplied state object when the first message is offered,
/// instead of when the block is constructed.
/// </summary>
/// <typeparam name="TInput">Type of the messages the block accepts.</typeparam>
/// <typeparam name="TState">Type of the state object handed to the initializer.</typeparam>
public class StatefulActionBlock<TInput, TState> : IDataflowBlock, ITargetBlock<TInput>
{
    private readonly Action<TState> _initializer;
    private readonly object _lock = new object();
    private readonly ITargetBlock<TInput> _actionBlock;
    private readonly TState _state;

    // Only read and written while holding _lock.
    private bool _initialized;

    /// <summary>Completion task of the wrapped action block.</summary>
    public Task Completion => _actionBlock.Completion;

    /// <summary>
    /// Creates the block. The initializer is NOT invoked here; it runs on the first offered message.
    /// </summary>
    /// <param name="action">Delegate executed for every accepted message.</param>
    /// <param name="initializer">Delegate executed once, with <paramref name="state"/>, before the first message is forwarded.</param>
    /// <param name="state">State object passed to <paramref name="initializer"/>; may be null.</param>
    /// <param name="options">Execution options forwarded to the wrapped action block.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="action"/>, <paramref name="initializer"/> or <paramref name="options"/> is null.
    /// </exception>
    public StatefulActionBlock(Action<TInput> action, Action<TState> initializer, TState state, ExecutionDataflowBlockOptions options)
    {
        // Fail at construction time rather than with a NullReferenceException on the first message.
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }
        if (initializer == null)
        {
            throw new ArgumentNullException(nameof(initializer));
        }
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        _initializer = initializer;
        _actionBlock = new ActionBlock<TInput>(action, options);
        _state = state;
    }

    // Runs the initializer and records success. The flag is set only after the initializer
    // returns, so an initializer that throws is attempted again on the next offered message.
    private void Initialize()
    {
        _initializer(_state);
        _initialized = true;
    }

    /// <summary>
    /// Runs the one-time initializer if it has not completed yet, then forwards the offer
    /// to the wrapped action block and returns its answer.
    /// </summary>
    /// <remarks>
    /// The initializer executes on the thread that offers the message, before the wrapped
    /// block decides whether to accept it. The lock keeps concurrent senders from running
    /// the initializer more than once.
    /// </remarks>
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept)
    {
        lock (_lock)
        {
            if (!_initialized)
            {
                Initialize();
            }
        }

        return _actionBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <summary>Signals the wrapped block that no more messages will be offered.</summary>
    public void Complete() =>
        _actionBlock.Complete();

    /// <summary>Faults the wrapped block with the given exception.</summary>
    public void Fault(Exception exception) =>
        _actionBlock.Fault(exception);
}
您也可以在 Action 内部加锁,并在其中检查是否已经完成初始化。
// Guards the one-time initialization performed inside DoStuff. Readonly so the lock
// object can never be swapped out while another thread is holding it.
private static readonly object _lock = new object();
// Set to true (while holding _lock) once Initialize() has run. It is named _initialized
// because that is the identifier DoStuff reads and writes; the previous name,
// _isInitialized, left DoStuff referring to a field that did not exist.
private static bool _initialized;
/// <summary>
/// Posts a batch of messages to a dataflow block whose action is <c>DoStuff</c>,
/// and measures how long the block takes to process the whole batch.
/// </summary>
/// <param name="maxDegreeOfParallelism">Maximum number of messages the block may process concurrently.</param>
/// <param name="messageCount">Number of messages to post; each one carries the value 1000.</param>
/// <returns>Time elapsed from just before the first post until the block's completion task finishes.</returns>
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism, int messageCount)
{
    // Cap how many messages the block may handle at the same time.
    var blockOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };

    // DoStuff is invoked once per message.
    var worker = new ActionBlock<int>(DoStuff, blockOptions);

    // Start timing before the first message is posted.
    Stopwatch timer = Stopwatch.StartNew();

    int remaining = messageCount;
    while (remaining > 0)
    {
        worker.Post(1000);
        remaining--;
    }

    // Signal that no more input is coming, then block until everything queued has been handled.
    worker.Complete();
    worker.Completion.Wait();

    timer.Stop();
    return timer.Elapsed;
}
/// <summary>
/// Per-message action: makes sure the one-time initialization has happened, then
/// simulates work by sleeping.
/// </summary>
/// <param name="i">Number of milliseconds to sleep.</param>
private static void DoStuff(int i)
{
    EnsureInitialized();
    Thread.Sleep(i); // simulated work
}

// Calls Initialize() unless the flag says it has already run. The check and the flag
// update both happen under the lock.
private static void EnsureInitialized()
{
    lock (_lock)
    {
        if (_initialized)
        {
            return;
        }

        Initialize();
        _initialized = true;
    }
}
// One-time setup hook. DoStuff calls it from inside its lock while the initialized
// flag is still false. The body is a placeholder in this example.
private static void Initialize()
{
//...
}