Apache 中的 IgniteQueue Ignite.NET
IgniteQueue in Apache Ignite.NET
我们正在使用 Ignite.NET 并且没有使用 Ignite Java API 的选项(团队技能、技术亲和力等)。我们正在寻求创建一种排队机制,以便我们可以以分布式方式处理消息。我发现 IgniteQueue 数据结构最合适,但它似乎在 ignite.net 中不可用,有人可以为该场景提出解决方案。多个生产者将一个唯一的工作项排队,一次只能由一个消费者可靠地处理。
例如有 P1、P2 生产者(在不同的机器上)他们在队列上生成 T1、T2、T3 我们有 C1、C2、C3 消费者(在不同的机器上)现在 T1 应该由来自 C1、C2、C3 和以此类推,对于 T2,T3 也应该类似地仅由 1 个消费者处理一次
IgniteQueue 建立在 Ignite Cache 之上,所以是的,您可以在 .NET 中复制相同的功能:
- 创建缓存
- 使用Continuous Query作为消费者,调用ICache.Remove保证每个item只处理一次
- 使用 Data Streamers 或仅使用
ICache.Put
/ PutAll
将数据添加到生产者的缓存中
下面是连续查询监听器的代码:
class CacheEventListener<TK, TV> : ICacheEntryEventListener<TK, TV>
{
private readonly string _cacheName;
[InstanceResource] // Injected automatically.
private readonly IIgnite _ignite = null;
private ICache<TK, TV> _cache;
public CacheEventListener(string cacheName)
{
_cacheName = cacheName;
}
public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events)
{
_cache = _cache ?? _ignite.GetCache<TK, TV>(_cacheName);
foreach (var entryEvent in events)
{
if (entryEvent.EventType == CacheEntryEventType.Created && _cache.Remove(entryEvent.Key))
{
// Run consumer logic here - use another thread for heavy processing.
Consume(entryEvent.Value);
}
}
}
}
然后我们通过一次调用将其部署到每个节点:
var consumer = new CacheEventListener<Guid, string>(cache.Name);
var continuousQuery = new ContinuousQuery<Guid, string>(consumer);
cache.QueryContinuous(continuousQuery);
因此,OnEvent
在该条目的主节点上的每个条目被调用一次。所以每个 Ignite 节点有一个消费者。我们可以通过使用 BlockingCollection
等将实际消费者逻辑卸载到其他线程来增加每个节点的消费者有效数量。
最后一件事 - 我们必须为每个新条目提供一个唯一的缓存键。最简单的就是Guid.NewGuid()
,不过我们也可以用AtomicSequence
.
我们正在使用 Ignite.NET 并且没有使用 Ignite Java API 的选项(团队技能、技术亲和力等)。我们正在寻求创建一种排队机制,以便我们可以以分布式方式处理消息。我发现 IgniteQueue 数据结构最合适,但它似乎在 ignite.net 中不可用,有人可以为该场景提出解决方案。多个生产者将一个唯一的工作项排队,一次只能由一个消费者可靠地处理。
例如有 P1、P2 生产者(在不同的机器上)他们在队列上生成 T1、T2、T3 我们有 C1、C2、C3 消费者(在不同的机器上)现在 T1 应该由来自 C1、C2、C3 和以此类推,对于 T2,T3 也应该类似地仅由 1 个消费者处理一次
IgniteQueue 建立在 Ignite Cache 之上,所以是的,您可以在 .NET 中复制相同的功能:
- 创建缓存
- 使用Continuous Query作为消费者,调用ICache.Remove保证每个item只处理一次
- 使用 Data Streamers 或仅使用
ICache.Put
/PutAll
将数据添加到生产者的缓存中
下面是连续查询监听器的代码:
class CacheEventListener<TK, TV> : ICacheEntryEventListener<TK, TV>
{
private readonly string _cacheName;
[InstanceResource] // Injected automatically.
private readonly IIgnite _ignite = null;
private ICache<TK, TV> _cache;
public CacheEventListener(string cacheName)
{
_cacheName = cacheName;
}
public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events)
{
_cache = _cache ?? _ignite.GetCache<TK, TV>(_cacheName);
foreach (var entryEvent in events)
{
if (entryEvent.EventType == CacheEntryEventType.Created && _cache.Remove(entryEvent.Key))
{
// Run consumer logic here - use another thread for heavy processing.
Consume(entryEvent.Value);
}
}
}
}
然后我们通过一次调用将其部署到每个节点:
var consumer = new CacheEventListener<Guid, string>(cache.Name);
var continuousQuery = new ContinuousQuery<Guid, string>(consumer);
cache.QueryContinuous(continuousQuery);
因此,OnEvent
在该条目的主节点上的每个条目被调用一次。所以每个 Ignite 节点有一个消费者。我们可以通过使用 BlockingCollection
等将实际消费者逻辑卸载到其他线程来增加每个节点的消费者有效数量。
最后一件事 - 我们必须为每个新条目提供一个唯一的缓存键。最简单的就是Guid.NewGuid()
,不过我们也可以用AtomicSequence
.