如何同步执行主题中的功能?
How do I execute functions in subject synchronously?
我要解决的问题是,我想从多个线程中启动一个异步操作,但一次只允许执行一个操作。
动作是与硬件设备通信,设备一次只能处理一个请求。
我的一个想法是将其与 System.Reactive.Subjects.Subject
同步。多个线程可以调用 OnNext
并且主体应该一个接一个地执行请求。我写了这个(可能非常幼稚)代码:
static void Main(string[] args)
{
var source = new System.Reactive.Subjects.Subject<Func<Task<int>>>();
source
// from http://code.fitness/post/2016/11/rx-selectmany-deep-dive.html
.SelectMany(async x => await x.Invoke())
.Subscribe(result => Console.WriteLine($"Work of index {result} completed"));
var noOfThreads = 3;
for (var i = 0; i < noOfThreads; i++)
{
var i1 = i;
var t = new Thread(() => source.OnNext(() => doWork(i1)));
t.Start();
}
Console.ReadKey();
}
static async Task<int> doWork(int index)
{
Console.WriteLine($"Start work {index}");
await Task.Delay(1000);
Console.WriteLine($"Stop work {index}");
return index;
}
我希望是这样的输出:
Start work 2
Stop work 2
Work of index 2 completed
Start work 0
Stop work 0
Work of index 0 completed
相反,我得到:
Start work 0
Start work 1
Start work 2
Stop work 1
Stop work 0
Work of index 1 completed
Work of index 0 completed
Stop work 2
Work of index 2 completed
这表明所有操作都是从一开始就开始的,不需要等待其他操作完成。我想知道 Reactive
是否是正确的方法,或者是否有其他聪明的方法可以完成我的任务。
编辑:提供更多背景信息为什么我需要这个:应用程序与设备通信。该设备具有串行接口,一次只能处理一个命令。所以我有一个线程不断获取状态更新,例如:
while (true)
{
ReadPosition();
ReadTempereatures();
ReadErrors();
}
然后是 ui,用户可以在其中启动设备上的某些操作。
我当前的解决方案是一个队列,我在其中排队我的命令。这行得通,但我想知道事件方法是否也行得通。
您正在混合使用 Rx、任务和线程。难怪它要离开 rails。选择一种方法 - Rx 是最好的,恕我直言 - 你应该没问题。
这是否足够:
static void Main(string[] args)
{
var source = new Subject<Func<int>>();
source
.Synchronize()
.Select(x => x())
.Subscribe(result => Console.WriteLine($"Work of index {result} completed"));
var noOfThreads = 3;
for (var i = 0; i < noOfThreads; i++)
{
var i1 = i;
var t = new Thread(() => source.OnNext(() => doWork(i1)));
t.Start();
}
Console.ReadLine();
}
static int doWork(int index)
{
Console.WriteLine($"Start work {index}");
Thread.Sleep(1000);
Console.WriteLine($"Stop work {index}");
return index;
}
这给出:
Start work 0
Stop work 0
Work of index 0 completed
Start work 2
Stop work 2
Work of index 2 completed
Start work 1
Stop work 1
Work of index 1 completed
关键是调用 .Synchronize()
将所有调用线程置于 Rx 契约之下。
我要解决的问题是,我想从多个线程中启动一个异步操作,但一次只允许执行一个操作。
动作是与硬件设备通信,设备一次只能处理一个请求。
我的一个想法是将其与 System.Reactive.Subjects.Subject
同步。多个线程可以调用 OnNext
并且主体应该一个接一个地执行请求。我写了这个(可能非常幼稚)代码:
static void Main(string[] args)
{
var source = new System.Reactive.Subjects.Subject<Func<Task<int>>>();
source
// from http://code.fitness/post/2016/11/rx-selectmany-deep-dive.html
.SelectMany(async x => await x.Invoke())
.Subscribe(result => Console.WriteLine($"Work of index {result} completed"));
var noOfThreads = 3;
for (var i = 0; i < noOfThreads; i++)
{
var i1 = i;
var t = new Thread(() => source.OnNext(() => doWork(i1)));
t.Start();
}
Console.ReadKey();
}
static async Task<int> doWork(int index)
{
Console.WriteLine($"Start work {index}");
await Task.Delay(1000);
Console.WriteLine($"Stop work {index}");
return index;
}
我希望是这样的输出:
Start work 2
Stop work 2
Work of index 2 completed
Start work 0
Stop work 0
Work of index 0 completed
相反,我得到:
Start work 0
Start work 1
Start work 2
Stop work 1
Stop work 0
Work of index 1 completed
Work of index 0 completed
Stop work 2
Work of index 2 completed
这表明所有操作都是从一开始就开始的,不需要等待其他操作完成。我想知道 Reactive
是否是正确的方法,或者是否有其他聪明的方法可以完成我的任务。
编辑:提供更多背景信息为什么我需要这个:应用程序与设备通信。该设备具有串行接口,一次只能处理一个命令。所以我有一个线程不断获取状态更新,例如:
while (true)
{
ReadPosition();
ReadTempereatures();
ReadErrors();
}
然后是 ui,用户可以在其中启动设备上的某些操作。 我当前的解决方案是一个队列,我在其中排队我的命令。这行得通,但我想知道事件方法是否也行得通。
您正在混合使用 Rx、任务和线程。难怪它要离开 rails。选择一种方法 - Rx 是最好的,恕我直言 - 你应该没问题。
这是否足够:
static void Main(string[] args)
{
var source = new Subject<Func<int>>();
source
.Synchronize()
.Select(x => x())
.Subscribe(result => Console.WriteLine($"Work of index {result} completed"));
var noOfThreads = 3;
for (var i = 0; i < noOfThreads; i++)
{
var i1 = i;
var t = new Thread(() => source.OnNext(() => doWork(i1)));
t.Start();
}
Console.ReadLine();
}
static int doWork(int index)
{
Console.WriteLine($"Start work {index}");
Thread.Sleep(1000);
Console.WriteLine($"Stop work {index}");
return index;
}
这给出:
Start work 0 Stop work 0 Work of index 0 completed Start work 2 Stop work 2 Work of index 2 completed Start work 1 Stop work 1 Work of index 1 completed
关键是调用 .Synchronize()
将所有调用线程置于 Rx 契约之下。