IObservable ObserveOn 正在锁定线程,这是可以预防的吗?
IObservable ObserveOn is locking the Thread, is this preventable?
我正在设计一个服务器,将客户端请求转移到一个专用于处理数据的线程。我这样做是为了防止正在处理的数据出现任何竞争条件或并发问题。因为服务器被设计成反应式的,每当服务器收到请求时,我都会使用 Observables 来通知程序的其余部分该请求。现在因为服务器套接字正在监听并从多个线程发出信号,所以我想确保可观察对象,无论服务器在哪个线程上发出,总是被观察到 在专用数据处理线程上。我选择使用 ObserveOn
方法,但这立即适得其反。我立即注意到,在一次可观察到的开火后,none 其他人也在开火。
不仅如此,发送到专用线程的其他操作也没有触发。
从本质上讲,observable 似乎是在为自己“声明”线程。该线程完全被 observable 阻塞,除了该 observable 的发射之外根本不能用于任何其他事情。我不希望发生这种情况,因为该线程专用于所有数据处理操作,这使我无法将该线程用于任何其他可观察对象或未来的数据处理任务。那么,我在这里有什么选择可以防止 observable 将线程锁定到自身,或者将 observable 的观察强制到我的专用线程而不阻塞其他 observable。
此示例代码演示了问题。在这里,我们使用单线程任务调度程序并注意到它运行良好,直到第一个主题(已设置为 ObserveOn
调度程序)发出它的字符串。发生这种情况后,不会触发进一步的主题或动作。第一个主题有效地为自己锁定了线程。
public static class Program
{
static void Main(string[] args)
{
//Within the Tester class we setup a single threaded task scheduler that will be handling all of these methods
var _t = new Tester();
string _string = "Hello World";
//These three will print their string to the console
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
//Only subject 1 will emit and print it's string, the other two fail
_t.PrintThroughSubject1(_string);//Succeeds
_t.PrintThroughSubject2(_string);//Fails
_t.PrintThroughSubject3(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
//We essentially can't do anything with the thread after subject 1 observed on it
Console.ReadLine();
}
public class Tester
{
TaskFactory tf;
TaskPoolScheduler pool;
int _actionCount = 0;
Subject<string> s1 = new Subject<string>();
Subject<string> s2 = new Subject<string>();
Subject<string> s3 = new Subject<string>();
public Tester()
{
//We're create a task pool that uses a single threaded concurrent task scheduler
var _scheduler = new ConcurrentExclusiveSchedulerPair();
tf = new TaskFactory(_scheduler.ExclusiveScheduler);
pool = new TaskPoolScheduler(tf);
//And then we set the subjects to each be observed on the single threaded scheduler
s1.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
$"Subject (1) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s2.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
$"Subject (2) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s3.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
$"Subject (3) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
}
public void PrintThroughSubject1(string _string)
{
s1.OnNext(_string);
}
public void PrintThroughSubject2(string _string)
{
s2.OnNext(_string);
}
public void PrintThroughSubject3(string _string)
{
s3.OnNext(_string);
}
public void PrintDirectlyWithAction(string _string)
{
//This is here to demonstrate that the single threaded task scheduler accepts actions just fine
//and can handle them in sequence
tf.StartNew(() =>
{
Console.WriteLine(
$"Direct action ({_actionCount++}) says \"{_string}\" - on thread {Thread.CurrentThread.ManagedThreadId}");
});
}
}
}
TL;DR:我需要能够强制在特定线程上观察多个可观察对象发射,但 RxNet 似乎只允许在一个线程上观察单个主题,而其他任何东西都不能。我怎样才能避免这种情况以在同一线程上观察多个可观察对象?
我可能把它复杂化了。 EventLoopScheduler
可能就是您所需要的。
试试这个:
public static class Program
{
static void Main(string[] args)
{
//Within the Tester class we setup a single threaded task scheduler that will be handling all of these methods
var _t = new Tester();
string _string = "Hello World";
//These three will print their string to the console
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
//Only subject 1 will emit and print it's string, the other two fail
_t.PrintThroughSubject1(_string);//Succeeds
_t.PrintThroughSubject2(_string);//Fails
_t.PrintThroughSubject3(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
//We essentially can't do anything with the thread after subject 1 observed on it
Console.ReadLine();
}
public class Tester
{
private EventLoopScheduler els = new EventLoopScheduler();
int _actionCount = 0;
Subject<string> s1 = new Subject<string>();
Subject<string> s2 = new Subject<string>();
Subject<string> s3 = new Subject<string>();
public Tester()
{
//We're create a task pool that uses a single threaded concurrent task scheduler
//And then we set the subjects to each be observed on the single threaded scheduler
s1.ObserveOn(els).Subscribe(_s => Console.WriteLine(
$"Subject (1) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s2.ObserveOn(els).Subscribe(_s => Console.WriteLine(
$"Subject (2) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s3.ObserveOn(els).Subscribe(_s => Console.WriteLine(
$"Subject (3) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
}
public void PrintThroughSubject1(string _string)
{
s1.OnNext(_string);
}
public void PrintThroughSubject2(string _string)
{
s2.OnNext(_string);
}
public void PrintThroughSubject3(string _string)
{
s3.OnNext(_string);
}
public void PrintDirectlyWithAction(string _string)
{
//This is here to demonstrate that the single threaded task scheduler accepts actions just fine
//and can handle them in sequence
els.Schedule(() =>
{
Console.WriteLine(
$"Direct action ({_actionCount++}) says \"{_string}\" - on thread {Thread.CurrentThread.ManagedThreadId}");
});
}
}
}
我得到这个结果:
Direct action (0) says "Hello World" - on thread 17
Direct action (1) says "Hello World" - on thread 17
Direct action (2) says "Hello World" - on thread 17
Subject (1) says "Hello World" - on thread 17
Subject (2) says "Hello World" - on thread 17
Subject (3) says "Hello World" - on thread 17
Direct action (3) says "Hello World" - on thread 17
Direct action (4) says "Hello World" - on thread 17
Direct action (5) says "Hello World" - on thread 17
完成后别忘了.Dispose()
你的EventLoopScheduler
。
我正在设计一个服务器,将客户端请求转移到一个专用于处理数据的线程。我这样做是为了防止正在处理的数据出现任何竞争条件或并发问题。因为服务器被设计成反应式的,每当服务器收到请求时,我都会使用 Observables 来通知程序的其余部分该请求。现在因为服务器套接字正在监听并从多个线程发出信号,所以我想确保可观察对象,无论服务器在哪个线程上发出,总是被观察到 在专用数据处理线程上。我选择使用 ObserveOn
方法,但这立即适得其反。我立即注意到,在一次可观察到的开火后,none 其他人也在开火。
不仅如此,发送到专用线程的其他操作也没有触发。
从本质上讲,observable 似乎是在为自己“声明”线程。该线程完全被 observable 阻塞,除了该 observable 的发射之外根本不能用于任何其他事情。我不希望发生这种情况,因为该线程专用于所有数据处理操作,这使我无法将该线程用于任何其他可观察对象或未来的数据处理任务。那么,我在这里有什么选择可以防止 observable 将线程锁定到自身,或者将 observable 的观察强制到我的专用线程而不阻塞其他 observable。
此示例代码演示了问题。在这里,我们使用单线程任务调度程序并注意到它运行良好,直到第一个主题(已设置为 ObserveOn
调度程序)发出它的字符串。发生这种情况后,不会触发进一步的主题或动作。第一个主题有效地为自己锁定了线程。
public static class Program
{
static void Main(string[] args)
{
//Within the Tester class we setup a single threaded task scheduler that will be handling all of these methods
var _t = new Tester();
string _string = "Hello World";
//These three will print their string to the console
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
//Only subject 1 will emit and print it's string, the other two fail
_t.PrintThroughSubject1(_string);//Succeeds
_t.PrintThroughSubject2(_string);//Fails
_t.PrintThroughSubject3(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
//We essentially can't do anything with the thread after subject 1 observed on it
Console.ReadLine();
}
public class Tester
{
TaskFactory tf;
TaskPoolScheduler pool;
int _actionCount = 0;
Subject<string> s1 = new Subject<string>();
Subject<string> s2 = new Subject<string>();
Subject<string> s3 = new Subject<string>();
public Tester()
{
//We're create a task pool that uses a single threaded concurrent task scheduler
var _scheduler = new ConcurrentExclusiveSchedulerPair();
tf = new TaskFactory(_scheduler.ExclusiveScheduler);
pool = new TaskPoolScheduler(tf);
//And then we set the subjects to each be observed on the single threaded scheduler
s1.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
$"Subject (1) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s2.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
$"Subject (2) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s3.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
$"Subject (3) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
}
public void PrintThroughSubject1(string _string)
{
s1.OnNext(_string);
}
public void PrintThroughSubject2(string _string)
{
s2.OnNext(_string);
}
public void PrintThroughSubject3(string _string)
{
s3.OnNext(_string);
}
public void PrintDirectlyWithAction(string _string)
{
//This is here to demonstrate that the single threaded task scheduler accepts actions just fine
//and can handle them in sequence
tf.StartNew(() =>
{
Console.WriteLine(
$"Direct action ({_actionCount++}) says \"{_string}\" - on thread {Thread.CurrentThread.ManagedThreadId}");
});
}
}
}
TL;DR:我需要能够强制在特定线程上观察多个可观察对象发射,但 RxNet 似乎只允许在一个线程上观察单个主题,而其他任何东西都不能。我怎样才能避免这种情况以在同一线程上观察多个可观察对象?
我可能把它复杂化了。 EventLoopScheduler
可能就是您所需要的。
试试这个:
public static class Program
{
static void Main(string[] args)
{
//Within the Tester class we setup a single threaded task scheduler that will be handling all of these methods
var _t = new Tester();
string _string = "Hello World";
//These three will print their string to the console
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
//Only subject 1 will emit and print it's string, the other two fail
_t.PrintThroughSubject1(_string);//Succeeds
_t.PrintThroughSubject2(_string);//Fails
_t.PrintThroughSubject3(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
//We essentially can't do anything with the thread after subject 1 observed on it
Console.ReadLine();
}
public class Tester
{
private EventLoopScheduler els = new EventLoopScheduler();
int _actionCount = 0;
Subject<string> s1 = new Subject<string>();
Subject<string> s2 = new Subject<string>();
Subject<string> s3 = new Subject<string>();
public Tester()
{
//We're create a task pool that uses a single threaded concurrent task scheduler
//And then we set the subjects to each be observed on the single threaded scheduler
s1.ObserveOn(els).Subscribe(_s => Console.WriteLine(
$"Subject (1) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s2.ObserveOn(els).Subscribe(_s => Console.WriteLine(
$"Subject (2) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s3.ObserveOn(els).Subscribe(_s => Console.WriteLine(
$"Subject (3) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
}
public void PrintThroughSubject1(string _string)
{
s1.OnNext(_string);
}
public void PrintThroughSubject2(string _string)
{
s2.OnNext(_string);
}
public void PrintThroughSubject3(string _string)
{
s3.OnNext(_string);
}
public void PrintDirectlyWithAction(string _string)
{
//This is here to demonstrate that the single threaded task scheduler accepts actions just fine
//and can handle them in sequence
els.Schedule(() =>
{
Console.WriteLine(
$"Direct action ({_actionCount++}) says \"{_string}\" - on thread {Thread.CurrentThread.ManagedThreadId}");
});
}
}
}
我得到这个结果:
Direct action (0) says "Hello World" - on thread 17
Direct action (1) says "Hello World" - on thread 17
Direct action (2) says "Hello World" - on thread 17
Subject (1) says "Hello World" - on thread 17
Subject (2) says "Hello World" - on thread 17
Subject (3) says "Hello World" - on thread 17
Direct action (3) says "Hello World" - on thread 17
Direct action (4) says "Hello World" - on thread 17
Direct action (5) says "Hello World" - on thread 17
完成后别忘了.Dispose()
你的EventLoopScheduler
。