ReactiveExtensions:在它分离的任务完成之前阻止一个可观察的返回?
ReactiveExtensions: Stop an observable from returning before the tasks it has spun off have finished?
我是 Rx .NET 的新手,但我有一个我认为需要的业务场景。但是,我仍然无法理解初始设计。
问题
- 我有一大堆物品,比如说 600k。
- 我有办法从数据库中分批提取这些数据(假设一次提取 1,000 个)
- 我会 运行 并行处理这些项目,一次最多 x 数量(假设一次 50 个)
- 当我们完成后,我需要知道这一点,因为我需要吐出一些额外的统计数据并确保漫长的 运行ning 过程 returns。
这似乎是响应式扩展的理想选择——我有:
- 随时间推移提供列表的东西
- 我想对这些项目进行的一系列操作
- 需要处理错误
- 需要处理完成。
我从哪里开始
这似乎我会有一个项目列表作为可观察对象,我的 "looping" 从数据库中获取这些项目的过程会将它们 "push" 放入这个可观察对象中,然后订阅该可观察对象将接管。
我卡在哪里
- 我有点不确定语法
- 我有点不确定如何处理具有限制的 x 并行度
- 我不确定我如何真正知道何时完成。那时我从数据库中提取的循环会调用 "OnComplete()" 而不是 "OnNext" 吗?
我希望有人能帮助我从概念上分解我正在寻找的东西,这样我就可以更好地思考它。谢谢!
代码 v3 -- 好多了,但该方法仍然退出得太快。
这真的开始感觉好多了,但我知道还不太好ther.e
public override async Task ProcessAsync(DataLoadRequest dataLoadRequest, Func<string, Task> createTrackingPayload)
{
_requestParameters = Deserialize<SchoolETLRequestParameters>(dataLoadRequest.DataExtractorParams);
WireUpDependencies();
//This is the new retriever which allows records to be "paged" (e.g. returns empty list for pageNum > 0 on the ones that don't have paging.)
_recordsToProcessRetriever = new SettingBasedRecordsRetriever(_propertyRepository, _requestParameters.RunType, _requestParameters.ResidentialProfileIDOverrides, _processorSettings.MaxBatchesToProcess, _etlLogger);
var query = Observable.Range(0, int.MaxValue)
.Select(pageNum => _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize))
.TakeWhile(resProfList => resProfList.Any())
.SelectMany(records => records)
.Select(resProf => Observable.Start(() => Task.Run(()=> _schoolDataProcessor.ProcessSchoolsAsync(resProf)).Result))
.Merge(maxConcurrent: _processorSettings.ParallelProperties);
var subscription = query.Subscribe(async trackingRequests =>
{
await CreateRequests(trackingRequests, createTrackingPayload);
var numberOfAttachments = SumOfRequestType(trackingRequests, TrackingRecordRequestType.AttachSchool);
var numberOfDetachments = SumOfRequestType(trackingRequests, TrackingRecordRequestType.DetachSchool);
var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests, TrackingRecordRequestType.UpdateAssignmentType);
_etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
},
() =>
{
_etlLogger.Info("Finished! Woohoo!");
});
}
v3 的问题
- ProcessAsync 方法仍然在后台处理完所有项目之前完成。通常我会接受,但在我们的例子中,我使用的框架需要等到所有跟踪请求都已创建(例如,直到为每批结果调用
CreateTrackingRequests
)。
是否可以等待所有操作在此期间完成?
更新:有关该问题的更多信息
在这种情况下,直到 运行 时间我们才知道什么会产生可观测值。该应用程序通过命令传递,相当于:
- "New Records":命中一个方法,该方法 return 是特定存储过程
的结果
- "Specific Record":用于测试;命中一个针对特定给定值命中单独存储过程的方法
- "All Records": 命中一个进入连续分页循环的方法,在 x 页(由设置定义)中循环 600k 条记录。
前两个场景听起来我可以毫无问题地将它们直接传递到一个可观察对象中。但是,在这种情况下,最后一个似乎我必须循环遍历一组可观察对象,这不是我想要的行为(我希望所有 600k 项目最终都在一个大队列中并在 50 处处理一次)。
我希望我能有一种方法"throws things on the queue",并让处理任务以 50 个为一组连续地从中提取。
注意:所有那些调用存储过程 return 的方法完全相同 - IThing
的列表(出于必要而混淆)。
我已经将所有这些存储库函数等连接到我的处理器 AS 依赖项中,因此调用 ProcessStuffForMyThing(List<IThing>)
会处理整个过程,并且可以使用相同的对象并行工作(不需要每次都要更新)。
首先,我不建议滚动您自己的枚举转换。如果您有 IEnumerable<T>
,您可以使用 .ToObservable()
扩展来为您处理枚举。
其次,您应该在 Subscribe
方法中处理 Observable
的结果,现在您的方法将 return 在枚举之后立即处理,因为您实际上并没有await async
方法中的任何内容。如果您必须使用当前的方法签名,那么您可以利用 Observable 也是可等待的。
这是我建议的代码结构(警告 未经测试):
public override async Task ProcessAsync(Request theRequest,
Func<string,Task> createTrackingPayload) // not my design^TM
{
// ...do stuff with the request, wire up some dependencies, etc.
//End goal is to call createTrackingPayload with some things.
await items.ToObservable()
.Select(thing => Observable.FromAsync(async () =>
{
var requests = await _dataProcessor.DoSomethingAsync(thing);
if (requests != null && requests.Any())
{
var numberOfType1 = SumOfRequestType(requests, TrackingRecordRequestType.Type1);
var numberOfType2 = SumOfRequestType(requests, TrackingRecordRequestType.DetachSchool);
var numberOfType3 = SumOfRequestType(requests, TrackingRecordRequestType.UpdateAssignmentType);
await CreateRequests(requests, createTrackingPayload); // something that will iterate over the list and call the function we need to call.
return requests.Count();
}
return 0;
}
}))
.Merge(maxConcurrent: _processorSettings.DegreeofParallelism)
.Do(x => _logger.Info("processed {0} items.", x))
.Aggregate(0, (acc, x) => acc + x);
}
基本上这里的想法是你等待 Observable 的完成,这实际上会给你 Observable 完成之前的最后一个值。通过添加 Do
和 Aggregate
,您可以将日志记录逻辑移出处理逻辑。
您的代码有很多问题需要解决。你犯的错误我见过很多次——每个人似乎都在走同一条路。它真的归结为将您的思维从程序化转变为功能化。
首先,Rx 有很多旨在让您的生活更轻松的运算符。其中之一是 Observable.Using
。它的工作是启动一次性资源,构建可观察对象,并在可观察对象完成时处理资源。非常适合从数据库中读取记录。
您的代码似乎有一个已经打开的数据库连接,并且您正在通过主题抽取记录。您应该避免使用外部状态(数据处理器),并且应该避免使用主题。几乎总有一个您可以使用的可观察运算符。
您正在做的另一件您可能不应该做的事情是混合您的 monad——或者更具体地说,observables 和任务。 Rx 中有一些运算符可以将任务转换为可观察对象,但它们是用来与现有代码交互的,不应该用作可观察对象中的工具。规则是尝试进入一个可观察对象并停留在那里,直到您准备好订阅您的数据。
我觉得你的代码有点零散,无法准确理解在哪里调用了什么,所以我写了一段通用代码,我认为它涵盖了你需要的东西。这是查询:
var pageSize = 4;
Func<Record, Result> process = r =>
{
Thread.Sleep(100); // Only here to demonstrate parallelism
return new Result(r.ID);
};
var query =
Observable
.Using(
() => new DataProcessor(),
dc =>
Observable
.Range(0, int.MaxValue)
.Select(n => dc.GetRecords(n, pageSize))
.TakeWhile(rs => rs.Any())
.SelectMany(rs => rs)
.Select(r => Observable.Start(() => process(r)))
.Merge(maxConcurrent: 4));
var subscription =
query
.Subscribe(
r => Console.WriteLine(r.ID),
() => Console.WriteLine("Done."));
我显然在你的代码中使用了一些快捷方式,但本质上是一样的(我希望)。
如果您添加以下 类:
,则此代码 运行 可用
public class DataProcessor : IDisposable
{
public DataProcessor() { Console.WriteLine("Opened."); }
public void Dispose() { Console.WriteLine("Closed."); }
public IEnumerable<Record> GetRecords(int page, int count)
{
Console.WriteLine("Reading.");
Thread.Sleep(100);
var records = page <= 5
? Enumerable
.Range(0, count < 5 ? count : count / 2)
.Select(x => new Record())
.ToArray()
: new Record[] { };
Console.WriteLine("Read.");
return records;
}
}
public class Record
{
private static int __counter = 0;
public Record() { this.ID = __counter++; }
public int ID { get; private set; }
}
public class Result
{
public Result(int id) { this.ID = id; }
public int ID { get; private set; }
}
当我 运行 它时,我得到了这个结果:
Opened.
Reading.
Read.
Reading.
0
2
3
1
Read.
Reading.
7
Read.
5
6
4
Reading.
10
11
9
8
Read.
Reading.
15
12
Read.
14
Reading.
13
17
19
18
16
Read.
Reading.
21
Read.
20
22
23
Done.
Closed.
可以看到是并行处理的。您可以看到 observable 正在完成。您还可以看到数据库正在打开,然后在 observable 完成后关闭。
如果有帮助请告诉我。
我在这里将功劳归功于 Enigmativity,因为他们的回答是将我带到我的(大部分)正确位置的原因。
下面是我需要的代码, 除外。
var query = Observable.Range(0, int.MaxValue)
.Select(pageNum =>
{
_etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
})
.TakeWhile(resProfList => resProfList.Any())
.SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
.Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
.Merge(maxConcurrent: _processorSettings.ParallelProperties)
.Do(async trackingRequests =>
{
await CreateRequests(trackingRequests.Result, createTrackingPayload);
var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
TrackingRecordRequestType.UpdateAssignmentType);
_etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
});
var subscription = query.Subscribe(
trackingRequests =>
{
//Nothing really needs to happen here. Technically we're just doing something when it's done.
},
() =>
{
_etlLogger.Info("Finished! Woohoo!");
});
await query.Wait();
我是 Rx .NET 的新手,但我有一个我认为需要的业务场景。但是,我仍然无法理解初始设计。
问题
- 我有一大堆物品,比如说 600k。
- 我有办法从数据库中分批提取这些数据(假设一次提取 1,000 个)
- 我会 运行 并行处理这些项目,一次最多 x 数量(假设一次 50 个)
- 当我们完成后,我需要知道这一点,因为我需要吐出一些额外的统计数据并确保漫长的 运行ning 过程 returns。
这似乎是响应式扩展的理想选择——我有:
- 随时间推移提供列表的东西
- 我想对这些项目进行的一系列操作
- 需要处理错误
- 需要处理完成。
我从哪里开始
这似乎我会有一个项目列表作为可观察对象,我的 "looping" 从数据库中获取这些项目的过程会将它们 "push" 放入这个可观察对象中,然后订阅该可观察对象将接管。
我卡在哪里
- 我有点不确定语法
- 我有点不确定如何处理具有限制的 x 并行度
- 我不确定我如何真正知道何时完成。那时我从数据库中提取的循环会调用 "OnComplete()" 而不是 "OnNext" 吗?
我希望有人能帮助我从概念上分解我正在寻找的东西,这样我就可以更好地思考它。谢谢!
代码 v3 -- 好多了,但该方法仍然退出得太快。
这真的开始感觉好多了,但我知道还不太好ther.e
public override async Task ProcessAsync(DataLoadRequest dataLoadRequest, Func<string, Task> createTrackingPayload)
{
_requestParameters = Deserialize<SchoolETLRequestParameters>(dataLoadRequest.DataExtractorParams);
WireUpDependencies();
//This is the new retriever which allows records to be "paged" (e.g. returns empty list for pageNum > 0 on the ones that don't have paging.)
_recordsToProcessRetriever = new SettingBasedRecordsRetriever(_propertyRepository, _requestParameters.RunType, _requestParameters.ResidentialProfileIDOverrides, _processorSettings.MaxBatchesToProcess, _etlLogger);
var query = Observable.Range(0, int.MaxValue)
.Select(pageNum => _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize))
.TakeWhile(resProfList => resProfList.Any())
.SelectMany(records => records)
.Select(resProf => Observable.Start(() => Task.Run(()=> _schoolDataProcessor.ProcessSchoolsAsync(resProf)).Result))
.Merge(maxConcurrent: _processorSettings.ParallelProperties);
var subscription = query.Subscribe(async trackingRequests =>
{
await CreateRequests(trackingRequests, createTrackingPayload);
var numberOfAttachments = SumOfRequestType(trackingRequests, TrackingRecordRequestType.AttachSchool);
var numberOfDetachments = SumOfRequestType(trackingRequests, TrackingRecordRequestType.DetachSchool);
var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests, TrackingRecordRequestType.UpdateAssignmentType);
_etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
},
() =>
{
_etlLogger.Info("Finished! Woohoo!");
});
}
v3 的问题
- ProcessAsync 方法仍然在后台处理完所有项目之前完成。通常我会接受,但在我们的例子中,我使用的框架需要等到所有跟踪请求都已创建(例如,直到为每批结果调用
CreateTrackingRequests
)。
是否可以等待所有操作在此期间完成?
更新:有关该问题的更多信息
在这种情况下,直到 运行 时间我们才知道什么会产生可观测值。该应用程序通过命令传递,相当于:
- "New Records":命中一个方法,该方法 return 是特定存储过程 的结果
- "Specific Record":用于测试;命中一个针对特定给定值命中单独存储过程的方法
- "All Records": 命中一个进入连续分页循环的方法,在 x 页(由设置定义)中循环 600k 条记录。
前两个场景听起来我可以毫无问题地将它们直接传递到一个可观察对象中。但是,在这种情况下,最后一个似乎我必须循环遍历一组可观察对象,这不是我想要的行为(我希望所有 600k 项目最终都在一个大队列中并在 50 处处理一次)。
我希望我能有一种方法"throws things on the queue",并让处理任务以 50 个为一组连续地从中提取。
注意:所有那些调用存储过程 return 的方法完全相同 - IThing
的列表(出于必要而混淆)。
我已经将所有这些存储库函数等连接到我的处理器 AS 依赖项中,因此调用 ProcessStuffForMyThing(List<IThing>)
会处理整个过程,并且可以使用相同的对象并行工作(不需要每次都要更新)。
首先,我不建议滚动您自己的枚举转换。如果您有 IEnumerable<T>
,您可以使用 .ToObservable()
扩展来为您处理枚举。
其次,您应该在 Subscribe
方法中处理 Observable
的结果,现在您的方法将 return 在枚举之后立即处理,因为您实际上并没有await async
方法中的任何内容。如果您必须使用当前的方法签名,那么您可以利用 Observable 也是可等待的。
这是我建议的代码结构(警告 未经测试):
public override async Task ProcessAsync(Request theRequest,
Func<string,Task> createTrackingPayload) // not my design^TM
{
// ...do stuff with the request, wire up some dependencies, etc.
//End goal is to call createTrackingPayload with some things.
await items.ToObservable()
.Select(thing => Observable.FromAsync(async () =>
{
var requests = await _dataProcessor.DoSomethingAsync(thing);
if (requests != null && requests.Any())
{
var numberOfType1 = SumOfRequestType(requests, TrackingRecordRequestType.Type1);
var numberOfType2 = SumOfRequestType(requests, TrackingRecordRequestType.DetachSchool);
var numberOfType3 = SumOfRequestType(requests, TrackingRecordRequestType.UpdateAssignmentType);
await CreateRequests(requests, createTrackingPayload); // something that will iterate over the list and call the function we need to call.
return requests.Count();
}
return 0;
}
}))
.Merge(maxConcurrent: _processorSettings.DegreeofParallelism)
.Do(x => _logger.Info("processed {0} items.", x))
.Aggregate(0, (acc, x) => acc + x);
}
基本上这里的想法是你等待 Observable 的完成,这实际上会给你 Observable 完成之前的最后一个值。通过添加 Do
和 Aggregate
,您可以将日志记录逻辑移出处理逻辑。
您的代码有很多问题需要解决。你犯的错误我见过很多次——每个人似乎都在走同一条路。它真的归结为将您的思维从程序化转变为功能化。
首先,Rx 有很多旨在让您的生活更轻松的运算符。其中之一是 Observable.Using
。它的工作是启动一次性资源,构建可观察对象,并在可观察对象完成时处理资源。非常适合从数据库中读取记录。
您的代码似乎有一个已经打开的数据库连接,并且您正在通过主题抽取记录。您应该避免使用外部状态(数据处理器),并且应该避免使用主题。几乎总有一个您可以使用的可观察运算符。
您正在做的另一件您可能不应该做的事情是混合您的 monad——或者更具体地说,observables 和任务。 Rx 中有一些运算符可以将任务转换为可观察对象,但它们是用来与现有代码交互的,不应该用作可观察对象中的工具。规则是尝试进入一个可观察对象并停留在那里,直到您准备好订阅您的数据。
我觉得你的代码有点零散,无法准确理解在哪里调用了什么,所以我写了一段通用代码,我认为它涵盖了你需要的东西。这是查询:
var pageSize = 4;
Func<Record, Result> process = r =>
{
Thread.Sleep(100); // Only here to demonstrate parallelism
return new Result(r.ID);
};
var query =
Observable
.Using(
() => new DataProcessor(),
dc =>
Observable
.Range(0, int.MaxValue)
.Select(n => dc.GetRecords(n, pageSize))
.TakeWhile(rs => rs.Any())
.SelectMany(rs => rs)
.Select(r => Observable.Start(() => process(r)))
.Merge(maxConcurrent: 4));
var subscription =
query
.Subscribe(
r => Console.WriteLine(r.ID),
() => Console.WriteLine("Done."));
我显然在你的代码中使用了一些快捷方式,但本质上是一样的(我希望)。
如果您添加以下 类:
,则此代码 运行 可用public class DataProcessor : IDisposable
{
public DataProcessor() { Console.WriteLine("Opened."); }
public void Dispose() { Console.WriteLine("Closed."); }
public IEnumerable<Record> GetRecords(int page, int count)
{
Console.WriteLine("Reading.");
Thread.Sleep(100);
var records = page <= 5
? Enumerable
.Range(0, count < 5 ? count : count / 2)
.Select(x => new Record())
.ToArray()
: new Record[] { };
Console.WriteLine("Read.");
return records;
}
}
public class Record
{
private static int __counter = 0;
public Record() { this.ID = __counter++; }
public int ID { get; private set; }
}
public class Result
{
public Result(int id) { this.ID = id; }
public int ID { get; private set; }
}
当我 运行 它时,我得到了这个结果:
Opened.
Reading.
Read.
Reading.
0
2
3
1
Read.
Reading.
7
Read.
5
6
4
Reading.
10
11
9
8
Read.
Reading.
15
12
Read.
14
Reading.
13
17
19
18
16
Read.
Reading.
21
Read.
20
22
23
Done.
Closed.
可以看到是并行处理的。您可以看到 observable 正在完成。您还可以看到数据库正在打开,然后在 observable 完成后关闭。
如果有帮助请告诉我。
我在这里将功劳归功于 Enigmativity,因为他们的回答是将我带到我的(大部分)正确位置的原因。
下面是我需要的代码,
var query = Observable.Range(0, int.MaxValue)
.Select(pageNum =>
{
_etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
})
.TakeWhile(resProfList => resProfList.Any())
.SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
.Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
.Merge(maxConcurrent: _processorSettings.ParallelProperties)
.Do(async trackingRequests =>
{
await CreateRequests(trackingRequests.Result, createTrackingPayload);
var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
TrackingRecordRequestType.UpdateAssignmentType);
_etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
});
var subscription = query.Subscribe(
trackingRequests =>
{
//Nothing really needs to happen here. Technically we're just doing something when it's done.
},
() =>
{
_etlLogger.Info("Finished! Woohoo!");
});
await query.Wait();