How to zip 2 sequences based on a property (zip, join)
I want to zip the items of 2 sequences based on a common property, similar to joining them when using enumerables. How can I make the second test pass?
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public class SequenceTests
{
    private class Entry
    {
        public Entry(DateTime timestamp, string value)
        {
            Timestamp = timestamp;
            Value = value;
        }

        public DateTime Timestamp { get; }
        public string Value { get; }
    }

    private readonly IEnumerable<Entry> Tasks = new List<Entry>
    {
        new Entry(new DateTime(2021, 6, 6), "Do homework"),
        new Entry(new DateTime(2021, 6, 7), "Buy groceries"), // <-- This date is also in the People collection!
        new Entry(new DateTime(2021, 6, 8), "Walk the dog"),
    };

    private readonly IEnumerable<Entry> People = new List<Entry>
    {
        new Entry(new DateTime(2021, 6, 4), "Peter"),
        new Entry(new DateTime(2021, 6, 5), "Jane"),
        new Entry(new DateTime(2021, 6, 7), "Paul"), // <-- This date is also in the Tasks collection!
        new Entry(new DateTime(2021, 6, 9), "Mary"),
    };

    private class Assignment
    {
        public string Task { get; set; }
        public string Person { get; set; }
    }

    [Test]
    public void Join_two_collections_should_succeed()
    {
        var assignments = Tasks
            .Join(People,
                task => task.Timestamp,
                person => person.Timestamp,
                (task, person) => new Assignment { Task = task.Value, Person = person.Value });

        Assert.AreEqual(1, assignments.Count());
        Assert.AreEqual("Buy groceries", assignments.First().Task);
        Assert.AreEqual("Paul", assignments.First().Person);
    }

    [Test]
    public async Task Zip_two_sequences_should_succeed()
    {
        var tasks = Observable.ToObservable(Tasks);
        var people = Observable.ToObservable(People);

        var sequence = tasks
            .Zip(people)
            .Select(pair => new Assignment { Task = pair.First.Value, Person = pair.Second.Value });

        var assignments = await sequence.ToList();

        Assert.AreEqual(1, assignments.Count);
        Assert.AreEqual("Buy groceries", assignments.First().Task);
        Assert.AreEqual("Paul", assignments.First().Person);
    }
}
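The observable Zip operator works the same way as the enumerable version: it pairs elements by position, not by key. You didn't use it in your first test, so it isn't the operator you need here.
All you need is the SelectMany operator.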
Try this query:
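// For each task, SelectMany subscribes to `people` again; this works here because
// ToObservable produces a cold sequence that replays the whole list on every subscription.
var sequence =
    from t in tasks
    from p in people
    where t.Timestamp == p.Timestamp
    select new Assignment { Task = t.Value, Person = p.Value };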
This works for your test.
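To make the difference concrete without pulling in Rx, here is a sketch that runs the question's data through LINQ to Objects, whose operators of the same names pair elements the same way. `Zip` pairs the i-th task with the i-th person, while the `from ... from ... where` query (which the compiler turns into `SelectMany` plus `Where`) keeps only the pairs whose timestamps match. The names `ZipVsSelectMany`, `ByPosition` and `ByTimestamp` are illustrative, and `Entry` is redeclared here as a C# 9 record so the sketch stands alone.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative stand-in for the question's Entry class.
public record Entry(DateTime Timestamp, string Value);

public static class ZipVsSelectMany
{
    public static readonly Entry[] Tasks =
    {
        new Entry(new DateTime(2021, 6, 6), "Do homework"),
        new Entry(new DateTime(2021, 6, 7), "Buy groceries"),
        new Entry(new DateTime(2021, 6, 8), "Walk the dog"),
    };

    public static readonly Entry[] People =
    {
        new Entry(new DateTime(2021, 6, 4), "Peter"),
        new Entry(new DateTime(2021, 6, 5), "Jane"),
        new Entry(new DateTime(2021, 6, 7), "Paul"),
        new Entry(new DateTime(2021, 6, 9), "Mary"),
    };

    // Zip pairs elements by index and stops at the end of the shorter sequence;
    // the timestamps are never compared.
    public static List<(string Task, string Person)> ByPosition() =>
        Tasks.Zip(People, (t, p) => (Task: t.Value, Person: p.Value)).ToList();

    // Query syntax compiles to SelectMany + Where: every task is paired with
    // every person, and only the pairs with equal timestamps survive.
    public static List<(string Task, string Person)> ByTimestamp() =>
        (from t in Tasks
         from p in People
         where t.Timestamp == p.Timestamp
         select (Task: t.Value, Person: p.Value)).ToList();
}
```

`ByPosition()` yields three pairs ("Do homework"/"Peter", "Buy groceries"/"Jane", "Walk the dog"/"Paul"), none of which is what the test expects, while `ByTimestamp()` yields the single "Buy groceries"/"Paul" pair.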
Here is a custom Join operator that can be used to solve this problem. It is based on the Merge, GroupByUntil and SelectMany operators:
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// The class name is arbitrary; extension methods only need to live in a static class.
public static class ObservableJoinExtensions
{
    /// <summary>
    /// Correlates the elements of two sequences based on matching keys. Results are
    /// produced for all combinations of correlated elements that have an overlapping
    /// duration.
    /// </summary>
    public static IObservable<TResult> Join<TLeft, TRight, TKey, TResult>(
        this IObservable<TLeft> left,
        IObservable<TRight> right,
        Func<TLeft, TKey> leftKeySelector,
        Func<TRight, TKey> rightKeySelector,
        Func<TLeft, TRight, TResult> resultSelector,
        TimeSpan? keyDuration = null,
        IEqualityComparer<TKey> keyComparer = null)
    {
        // Arguments validation omitted
        keyComparer ??= EqualityComparer<TKey>.Default;
        // Each key group closes after keyDuration, or never if no duration is given.
        var groupDuration = keyDuration.HasValue ?
            Observable.Timer(keyDuration.Value) : Observable.Never<long>();
        return left
            // Tag every element with its side (1 = left, 2 = right) and its key, then merge.
            .Select(x => (x, (TRight)default, Type: 1, Key: leftKeySelector(x)))
            .Merge(right.Select(x => ((TLeft)default, x, Type: 2, Key: rightKeySelector(x))))
            .GroupByUntil(e => e.Key, _ => groupDuration, keyComparer)
            // Right-side elements are replayed, so each left element is paired with
            // both the earlier and the later right elements of its group.
            .Select(g => (
                g.Where(e => e.Type == 1).Select(e => e.Item1),
                g.Where(e => e.Type == 2).Select(e => e.Item2).Replay().AutoConnect(0)
            ))
            .SelectMany(g => g.Item1.SelectMany(_ => g.Item2, resultSelector));
    }
}
Usage example:
IObservable<Assignment> sequence = tasks
    .Join(people, t => t.Timestamp, p => p.Timestamp,
        (t, p) => new Assignment { Task = t.Value, Person = p.Value });
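The grouping strategy behind this operator (tag each element with its side, merge both streams, group by key, then pair every left element of a group with every right element of the same group) can be sketched synchronously with LINQ to Objects. This is only an illustration of the idea, not an Rx implementation: it has no notion of time, so it corresponds to calling the operator without `keyDuration`. The names `GroupJoinSketch` and `JoinByGroup` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GroupJoinSketch
{
    // Synchronous analogue of the Merge + GroupByUntil + SelectMany strategy.
    public static List<TResult> JoinByGroup<TLeft, TRight, TKey, TResult>(
        IEnumerable<TLeft> left,
        IEnumerable<TRight> right,
        Func<TLeft, TKey> leftKeySelector,
        Func<TRight, TKey> rightKeySelector,
        Func<TLeft, TRight, TResult> resultSelector,
        IEqualityComparer<TKey> keyComparer = null)
    {
        keyComparer ??= EqualityComparer<TKey>.Default;

        // Tag each element with its side (1 = left, 2 = right) and its key.
        var tagged = left
            .Select(x => (Left: x, Right: default(TRight), Type: 1, Key: leftKeySelector(x)))
            .Concat(right.Select(x => (Left: default(TLeft), Right: x, Type: 2, Key: rightKeySelector(x))));

        // One group per key; emit the cross product of its left and right elements.
        return tagged
            .GroupBy(e => e.Key, keyComparer)
            .SelectMany(g =>
                from l in g.Where(e => e.Type == 1)
                from r in g.Where(e => e.Type == 2)
                select resultSelector(l.Left, r.Right))
            .ToList();
    }
}
```

Like the unbounded Rx operator, this keeps every element of both inputs in some group until the end, and a key that appears several times on both sides produces every combination.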
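Note that this problem cannot be solved with 100% correctness without buffering all the elements produced by both source sequences. Obviously, this won't scale well if the sequences contain an infinite number of elements.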
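If sacrificing absolute correctness for scalability is acceptable, the optional keyDuration parameter can be used to configure the maximum time a stored key (and its associated elements) may remain in memory. An expired key may be reborn if the left or right sequence produces a new element with that key.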
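The above implementation performs reasonably well with sequences containing many elements. Joining two sequences of equal size, each with 100,000 elements, takes about 8 seconds on my PC.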
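I don't like either of the posted answers. They are both variations on the same theme: keep all members of both sequences in memory indefinitely, iterate over the entire right sequence whenever a new left element arrives, and incrementally check the left keys whenever a new right element arrives. Both answers cost you O(L + R) memory indefinitely and O(R * L) time complexity (where L and R are the sizes of the left and right sequences).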
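That would be an adequate answer if we were dealing with collections (or enumerables). But we're not: we're dealing with observables, and the answer should acknowledge that. In a real use case there could be large time gaps between elements. The question was posed as an enumerable test case. If it were just an enumerable, the correct answer would be to convert back to an Enumerable and use LINQ's Join. If there can be time gaps while the process runs, the answer should acknowledge that you probably only want to join elements that occur within some time period of each other, releasing memory along the way.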
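For the enumerable case, LINQ's Join avoids the L × R scan by building a hash lookup of the inner (second) sequence and probing it once per outer element. Below is a hand-rolled sketch of that hash-join idea using `ToLookup`; the helper `HashJoinSketch.HashJoin` is hypothetical and is not the actual `Enumerable.Join` source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HashJoinSketch
{
    // Hash join: index the right sequence once, then probe it for each left element.
    // Cost is O(L + R) plus the number of results, instead of O(L * R) pairwise checks.
    // One difference: ToLookup groups null keys together, whereas Enumerable.Join never matches null keys.
    public static IEnumerable<TResult> HashJoin<TLeft, TRight, TKey, TResult>(
        IEnumerable<TLeft> left,
        IEnumerable<TRight> right,
        Func<TLeft, TKey> leftKeySelector,
        Func<TRight, TKey> rightKeySelector,
        Func<TLeft, TRight, TResult> resultSelector)
    {
        ILookup<TKey, TRight> index = right.ToLookup(rightKeySelector);

        foreach (var l in left)
        {
            // Indexing a lookup with a missing key yields an empty sequence, not an exception.
            foreach (var r in index[leftKeySelector(l)])
            {
                yield return resultSelector(l, r);
            }
        }
    }
}
```

The catch for observables is that the whole right side has to be known before probing starts, which is exactly what an open-ended stream cannot offer.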
This satisfies the test while allowing for a time box:
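// Rx's built-in Join: each element stays "open" until its duration observable signals,
// and every left/right pair whose open periods overlap is passed to the result selector.
var sequence = tasks.Join(people,
        _ => Observable.Timer(TimeSpan.FromSeconds(.5)),
        _ => Observable.Timer(TimeSpan.FromSeconds(.5)),
        (t, p) => (task: t, person: p)
    )
    .Where(t => t.person.Timestamp == t.task.Timestamp)
    .Select(t => new Assignment { Task = t.task.Value, Person = t.person.Value });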
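This creates a 0.5-second window for each element, meaning that a left element and a right element will match if they arrive within 0.5 seconds of each other. After 0.5 seconds, each element is released from memory. If for some reason you don't want to release anything and would rather keep all objects in memory indefinitely, this suffices: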
// Unit is defined in the System.Reactive namespace.
var sequence = tasks.Join(people,
        _ => Observable.Never<Unit>(),
        _ => Observable.Never<Unit>(),
        (t, p) => (task: t, person: p)
    )
    .Where(t => t.person.Timestamp == t.task.Timestamp)
    .Select(t => new Assignment { Task = t.task.Value, Person = t.person.Value });
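The time-window semantics can be simulated without Rx, assuming each element carries a hypothetical arrival time. With equal window lengths, a left and a right window overlap exactly when the two arrivals are less than one window apart, and the key filter is then applied as in the query above. The record `Arrival` and the class `WindowedJoinSketch` are illustrative assumptions. This brute-force check only shows which pairs match; Rx's Join tracks open windows with the duration observables rather than comparing timestamps, and its behavior right at the window boundary depends on scheduling.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical element: arrival offset since subscription, join key, and payload.
public record Arrival(TimeSpan At, DateTime Key, string Value);

public static class WindowedJoinSketch
{
    // Pairs left and right elements whose windows overlap (arrivals less than
    // `window` apart), then applies the same key filter as the Rx query's Where.
    public static List<(string Task, string Person)> Join(
        IEnumerable<Arrival> left, IEnumerable<Arrival> right, TimeSpan window) =>
        (from l in left
         from r in right
         where (l.At - r.At).Duration() < window // both windows are open at the same time
         where l.Key == r.Key                     // the Timestamp comparison from the answer
         select (Task: l.Value, Person: r.Value)).ToList();
}
```

With a 0.5-second window, "Buy groceries" arriving at 0 s matches "Paul" arriving at 0.3 s, but not "Paul" arriving at 2 s, even though the dates are equal.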