如何使用 .NET Reactive (Rx) 按键对多个数据源进行 zip/merge?
How to zip/merge by key many sources using .net reactive?
我在一个列表中有很多数据源(其数量只有在运行时才知道)。
所有来源都发出相同类型的元素(数据)。
如何通过作为其属性之一的键(currentDate)对它们进行分组?然后我需要将它们转换为一个不同的元素 (FullData),仅当所有源都发出有效的数据元素时。因此,仅当每个源都针对特定日期时间发出有效数据时,才会发出 FullData。
/// <summary>
/// Demo program: creates a number of one-second interval sources, merges them into a
/// single sequence and prints every received element until a key is pressed.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the sources, subscribes to the merged sequence and waits for a key press.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var rand = new Random();
        // System.Random is not thread-safe, and the callbacks of the independent Interval
        // sources may run concurrently, so every access to the shared generator is locked.
        var randGate = new object();
        List<IObservable<Data>> sources = new List<IObservable<Data>>();
        //let's assume that value comes from a user
        var sourcesCounter = 4;
        for (int i = 0; i < sourcesCounter; i++)
        {
            sources.Add(
                Observable.Interval(TimeSpan.FromSeconds(1))
                    .Select(e =>
                    {
                        bool isValid;
                        lock (randGate)
                        {
                            isValid = rand.Next(5) < 4; //Generate true/false randomly
                        }

                        var now = DateTime.Now;
                        return new Data
                        {
                            // Drop the sub-second part so the value really is rounded to
                            // seconds, as the rest of the example assumes.
                            currentDate = now.AddTicks(-(now.Ticks % TimeSpan.TicksPerSecond)),
                            Samples = new List<double>(1000),
                            IsValid = isValid
                        };
                    })
            );
        }
        var merged = sources.Merge();
        // Dispose the subscription once the user presses a key, so the interval sources
        // are stopped explicitly.
        using (merged.Subscribe(
            e =>
            {
                Console.WriteLine($"received: {e.currentDate.Second} {e.IsValid}");
            },
            ex => Console.WriteLine(ex),
            () => Console.WriteLine("Completed - merged")
        ))
        {
            Console.ReadKey();
        }
    }
}
/// <summary>
/// A single element emitted by one source.
/// </summary>
public class Data
{
/// <summary>Date/time of the element; the snippets in this file group elements by this value.</summary>
public DateTime currentDate { get; set; }
/// <summary>Whether the element is valid (the demo code assigns this randomly).</summary>
public bool IsValid { get; set; }
/// <summary>Sample values carried by the element.</summary>
public List<double> Samples { get; set; }
}
/// <summary>
/// Combined result for one date: holds the sample lists of the <c>Data</c> elements
/// that share the same <c>currentDate</c>.
/// </summary>
public class FullData
{
/// <summary>The date/time shared by the combined elements.</summary>
public DateTime currentDate { get; set; }
/// <summary>One sample list per combined <c>Data</c> element.</summary>
public List<List<double>> Samples { get; set; }
}
Zip 有一个接受 IEnumerable<IObservable<Data>> 的重载。
试试这个:
// Builds `sourcesCounter` one-second interval sources and zips them, so that `zipped`
// yields one list holding the n-th element of every source.
var rand = new Random();
// System.Random is not thread-safe, and the callbacks of the independent Interval
// sources may run concurrently, so every access to the shared generator is locked.
var randGate = new object();
var sourcesCounter = 4;
IEnumerable<IObservable<Data>> sources =
    Enumerable
        .Range(0, sourcesCounter)
        .Select(x =>
            Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Select(e =>
                {
                    bool isValid;
                    lock (randGate)
                    {
                        isValid = rand.Next(5) < 4; //Generate true/false randomly
                    }

                    var now = DateTime.Now;
                    return new Data()
                    {
                        // Drop the sub-second part so the value really is rounded to seconds;
                        // otherwise grouping by currentDate would see an almost unique key
                        // for every element.
                        currentDate = now.AddTicks(-(now.Ticks % TimeSpan.TicksPerSecond)),
                        Samples = new List<double>(1000),
                        IsValid = isValid
                    };
                }))
        // Materialize once so that every later enumeration of `sources` sees the same
        // observable instances instead of re-running the query.
        .ToList();
IObservable<IList<Data>> zipped = sources.Zip(values => values);
下面的代码按 currentDate 对数据进行分组,直到某个键收到无效数据为止。
// Groups the merged elements by currentDate and collects the samples of the valid
// elements into one FullData per date, until an invalid element arrives for that date.
// Note: samples stored before an invalid element arrives for the same date are not
// removed from dataDictionary afterwards.
var keys = new ConcurrentDictionary<DateTime, DateTime>();
var dataDictionary = new ConcurrentDictionary<DateTime, FullData>();
// Guards the List<List<double>> inside each FullData: the subscription callback adds to
// these lists while the code after ReadKey enumerates them, and List<T> does not support
// that concurrently.
var samplesGate = new object();
var subscription = sources
    .Merge()
    .GroupByUntil(data => data.currentDate, s => s.Any(data => !data.IsValid)) // group by currentDate until an invalid data appears in the group (the last invalid data can be in this group)
    .Where(g => keys.TryAdd(g.Key, g.Key)) // skip the reborn groups for the same key (they are created because of durationSelector, which controls the lifetime of a group)
    .Merge() // switch to the previous flattened structure
    .Where(data => data.IsValid) // remove the last invalid item emitted by GroupByUntil
    .Subscribe(x =>
    {
        lock (samplesGate)
        {
            var fullData = dataDictionary.GetOrAdd(x.currentDate, f => new FullData { currentDate = x.currentDate, Samples = new List<List<double>>() });
            fullData.Samples.Add(x.Samples);
        }
        Console.WriteLine($"received: {x.currentDate.ToLocalTime()} {x.IsValid} {string.Join(", ", x.Samples)}");
    }, () => Console.WriteLine("Completed"));
Console.ReadKey();
// Stop receiving new elements before reporting the collected results.
subscription.Dispose();
lock (samplesGate)
{
    foreach (var item in dataDictionary)
    {
        Console.WriteLine($"{item.Key.ToLocalTime()}, {string.Join(",", item.Value.Samples.SelectMany(t => t))}");
    }
}
如果您知道所有可观察序列都是有限的,并且只需要在每个源都只发出有效数据时才创建 FullData,则可以使用另一种方法:
// Alternative for finite sources: gather every element of the merged sequence into a
// single list, then build the FullData results with an ordinary LINQ query.
var everything = sources.Merge().ToList();
everything.Subscribe(received =>
{
    // Keep only the dates for which every received element is valid.
    var completeDates =
        from element in received
        group element by element.currentDate into sameDate
        where sameDate.All(d => d.IsValid)
        select new FullData
        {
            currentDate = sameDate.Key,
            Samples = sameDate.Select(d => d.Samples).ToList()
        };

    foreach (var complete in completeDates)
    {
        var localDate = complete.currentDate.ToLocalTime();
        var flattenedSamples = string.Join(",", complete.Samples.SelectMany(s => s));
        Console.WriteLine($"{localDate}, {flattenedSamples}");
    }
});
以上代码会等待所有 observable 完成,把收到的所有元素收集到一个列表中,最后用一个简单的 LINQ 查询生成 FullData。
我在一个列表中有很多数据源(其数量只有在运行时才知道)。 所有来源都发出相同类型的元素(数据)。 如何通过作为其属性之一的键(currentDate)对它们进行分组?然后我需要将它们转换为一个不同的元素 (FullData),仅当所有源都发出有效的数据元素时。因此,仅当每个源都针对特定日期时间发出有效数据时,才会发出 FullData。
/// <summary>
/// Demo program: creates a number of one-second interval sources, merges them into a
/// single sequence and prints every received element until a key is pressed.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the sources, subscribes to the merged sequence and waits for a key press.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var rand = new Random();
        // System.Random is not thread-safe, and the callbacks of the independent Interval
        // sources may run concurrently, so every access to the shared generator is locked.
        var randGate = new object();
        List<IObservable<Data>> sources = new List<IObservable<Data>>();
        //let's assume that value comes from a user
        var sourcesCounter = 4;
        for (int i = 0; i < sourcesCounter; i++)
        {
            sources.Add(
                Observable.Interval(TimeSpan.FromSeconds(1))
                    .Select(e =>
                    {
                        bool isValid;
                        lock (randGate)
                        {
                            isValid = rand.Next(5) < 4; //Generate true/false randomly
                        }

                        var now = DateTime.Now;
                        return new Data
                        {
                            // Drop the sub-second part so the value really is rounded to
                            // seconds, as the rest of the example assumes.
                            currentDate = now.AddTicks(-(now.Ticks % TimeSpan.TicksPerSecond)),
                            Samples = new List<double>(1000),
                            IsValid = isValid
                        };
                    })
            );
        }
        var merged = sources.Merge();
        // Dispose the subscription once the user presses a key, so the interval sources
        // are stopped explicitly.
        using (merged.Subscribe(
            e =>
            {
                Console.WriteLine($"received: {e.currentDate.Second} {e.IsValid}");
            },
            ex => Console.WriteLine(ex),
            () => Console.WriteLine("Completed - merged")
        ))
        {
            Console.ReadKey();
        }
    }
}
/// <summary>
/// A single element emitted by one source.
/// </summary>
public class Data
{
/// <summary>Date/time of the element; the snippets in this file group elements by this value.</summary>
public DateTime currentDate { get; set; }
/// <summary>Whether the element is valid (the demo code assigns this randomly).</summary>
public bool IsValid { get; set; }
/// <summary>Sample values carried by the element.</summary>
public List<double> Samples { get; set; }
}
/// <summary>
/// Combined result for one date: holds the sample lists of the <c>Data</c> elements
/// that share the same <c>currentDate</c>.
/// </summary>
public class FullData
{
/// <summary>The date/time shared by the combined elements.</summary>
public DateTime currentDate { get; set; }
/// <summary>One sample list per combined <c>Data</c> element.</summary>
public List<List<double>> Samples { get; set; }
}
Zip 有一个接受 IEnumerable<IObservable<Data>> 的重载。
试试这个:
// Builds `sourcesCounter` one-second interval sources and zips them, so that `zipped`
// yields one list holding the n-th element of every source.
var rand = new Random();
// System.Random is not thread-safe, and the callbacks of the independent Interval
// sources may run concurrently, so every access to the shared generator is locked.
var randGate = new object();
var sourcesCounter = 4;
IEnumerable<IObservable<Data>> sources =
    Enumerable
        .Range(0, sourcesCounter)
        .Select(x =>
            Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Select(e =>
                {
                    bool isValid;
                    lock (randGate)
                    {
                        isValid = rand.Next(5) < 4; //Generate true/false randomly
                    }

                    var now = DateTime.Now;
                    return new Data()
                    {
                        // Drop the sub-second part so the value really is rounded to seconds;
                        // otherwise grouping by currentDate would see an almost unique key
                        // for every element.
                        currentDate = now.AddTicks(-(now.Ticks % TimeSpan.TicksPerSecond)),
                        Samples = new List<double>(1000),
                        IsValid = isValid
                    };
                }))
        // Materialize once so that every later enumeration of `sources` sees the same
        // observable instances instead of re-running the query.
        .ToList();
IObservable<IList<Data>> zipped = sources.Zip(values => values);
下面的代码按 currentDate 对数据进行分组,直到某个键收到无效数据为止。
// Groups the merged elements by currentDate and collects the samples of the valid
// elements into one FullData per date, until an invalid element arrives for that date.
// Note: samples stored before an invalid element arrives for the same date are not
// removed from dataDictionary afterwards.
var keys = new ConcurrentDictionary<DateTime, DateTime>();
var dataDictionary = new ConcurrentDictionary<DateTime, FullData>();
// Guards the List<List<double>> inside each FullData: the subscription callback adds to
// these lists while the code after ReadKey enumerates them, and List<T> does not support
// that concurrently.
var samplesGate = new object();
var subscription = sources
    .Merge()
    .GroupByUntil(data => data.currentDate, s => s.Any(data => !data.IsValid)) // group by currentDate until an invalid data appears in the group (the last invalid data can be in this group)
    .Where(g => keys.TryAdd(g.Key, g.Key)) // skip the reborn groups for the same key (they are created because of durationSelector, which controls the lifetime of a group)
    .Merge() // switch to the previous flattened structure
    .Where(data => data.IsValid) // remove the last invalid item emitted by GroupByUntil
    .Subscribe(x =>
    {
        lock (samplesGate)
        {
            var fullData = dataDictionary.GetOrAdd(x.currentDate, f => new FullData { currentDate = x.currentDate, Samples = new List<List<double>>() });
            fullData.Samples.Add(x.Samples);
        }
        Console.WriteLine($"received: {x.currentDate.ToLocalTime()} {x.IsValid} {string.Join(", ", x.Samples)}");
    }, () => Console.WriteLine("Completed"));
Console.ReadKey();
// Stop receiving new elements before reporting the collected results.
subscription.Dispose();
lock (samplesGate)
{
    foreach (var item in dataDictionary)
    {
        Console.WriteLine($"{item.Key.ToLocalTime()}, {string.Join(",", item.Value.Samples.SelectMany(t => t))}");
    }
}
如果您知道所有可观察序列都是有限的,并且只需要在每个源都只发出有效数据时才创建 FullData,则可以使用另一种方法:
// Alternative for finite sources: gather every element of the merged sequence into a
// single list, then build the FullData results with an ordinary LINQ query.
var everything = sources.Merge().ToList();
everything.Subscribe(received =>
{
    // Keep only the dates for which every received element is valid.
    var completeDates =
        from element in received
        group element by element.currentDate into sameDate
        where sameDate.All(d => d.IsValid)
        select new FullData
        {
            currentDate = sameDate.Key,
            Samples = sameDate.Select(d => d.Samples).ToList()
        };

    foreach (var complete in completeDates)
    {
        var localDate = complete.currentDate.ToLocalTime();
        var flattenedSamples = string.Join(",", complete.Samples.SelectMany(s => s));
        Console.WriteLine($"{localDate}, {flattenedSamples}");
    }
});
以上代码会等待所有 observable 完成,把收到的所有元素收集到一个列表中,最后用一个简单的 LINQ 查询生成 FullData。