使用 StackExchange.Redis 实现 RedLock 的问题
Problems with RedisLock implemetation by using StackExchange.Redis
我想使用 StackExchange.Redis 库实现 RedisLock。
关注这篇文章:
https://www.c-sharpcorner.com/article/creating-distributed-lock-with-redis-in-net-core/
- 我如何需要阻止 Redis 流。
- 我需要如何解锁Redis流。我真的需要使用脚本从流中删除对象,也许另一个程序想要处理当前元素?
运行脚本有问题,但是循环的每一遍程序都无法实现锁定,但下一遍能够访问锁定的流:
我的实现:
RedisLock的实现同文章
public async void ListenTask()
{
var handledResult = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
var lowestHandledId = handledResult.Last().Id;
var readTask = Task.Run(async () =>
{
while (!Token.IsCancellationRequested)
{
var result = await db.StreamRangeAsync(streamName, lowestHandledId, "+", 2);
var handleResult = result.Last();
if (result.Any() && lowestHandledId != handleResult.Id)
{
bool isLocked = RedisLock.AcquireLock(streamName, handleResult.Id.ToString(), expiry);
if (!isLocked)
{
//lock
lowestHandledId = handleResult.Id;
var streamCat = handleResult.Values;
Cat cat = ParseResult(streamCat);
switch (streamCat[0].Value.ToString())
{
case "insert":
Console.WriteLine($"Insert cat at id:{cat.Id} [{cat.Name} - {cat.CreatedDate}]");
cacheDictionary.Add(cat.Id, new WeakReference(cat));
break;
case "delete":
Console.WriteLine($"Deleted cat at id:{cat.Id}");
cacheDictionary.Remove(cat.Id);
break;
}
RedisLock.ReleaseLock(streamName, handleResult.Id.ToString());
}
}
await Task.Delay(2000);
}
});
}
我使用 LockTake/LockRelease 命令解决了 RedLock 实现的问题。
关注这篇文章:
Whosebug question
public async void ListenTask()
{
var handledResult = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
var lowestHandledId = handledResult.Last().Id;
RedisValue token = Environment.MachineName;
var readTask = Task.Run(async () =>
{
while (!Token.IsCancellationRequested)
{
var result = await db.StreamRangeAsync(streamName, lowestHandledId, "+", 2);
var handleResult = result.Last();
if (result.Any() && lowestHandledId != handleResult.Id)
{
if (!db.LockTake(streamName, token, expiry))
{
lowestHandledId = handleResult.Id;
var streamCat = handleResult.Values;
Cat cat = ParseResult(streamCat);
switch (streamCat[0].Value.ToString())
{
case "insert":
Console.WriteLine($"Insert cat at id:{cat.Id} [{cat.Name} - {cat.CreatedDate}]");
cacheDictionary.Add(cat.Id, new WeakReference(cat));
break;
case "delete":
Console.WriteLine($"Deleted cat at id:{cat.Id}");
cacheDictionary.Remove(cat.Id);
break;
}
db.LockRelease(streamName, token);
}
}
await Task.Delay(100);
}
});
}
我想使用 StackExchange.Redis 库实现 RedisLock。
关注这篇文章:
https://www.c-sharpcorner.com/article/creating-distributed-lock-with-redis-in-net-core/
- 我如何需要阻止 Redis 流。
- 我需要如何解锁Redis流。我真的需要使用脚本从流中删除对象,也许另一个程序想要处理当前元素?
运行脚本有问题,但是循环的每一遍程序都无法实现锁定,但下一遍能够访问锁定的流:
我的实现:
RedisLock的实现同文章
public async void ListenTask()
{
var handledResult = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
var lowestHandledId = handledResult.Last().Id;
var readTask = Task.Run(async () =>
{
while (!Token.IsCancellationRequested)
{
var result = await db.StreamRangeAsync(streamName, lowestHandledId, "+", 2);
var handleResult = result.Last();
if (result.Any() && lowestHandledId != handleResult.Id)
{
bool isLocked = RedisLock.AcquireLock(streamName, handleResult.Id.ToString(), expiry);
if (!isLocked)
{
//lock
lowestHandledId = handleResult.Id;
var streamCat = handleResult.Values;
Cat cat = ParseResult(streamCat);
switch (streamCat[0].Value.ToString())
{
case "insert":
Console.WriteLine($"Insert cat at id:{cat.Id} [{cat.Name} - {cat.CreatedDate}]");
cacheDictionary.Add(cat.Id, new WeakReference(cat));
break;
case "delete":
Console.WriteLine($"Deleted cat at id:{cat.Id}");
cacheDictionary.Remove(cat.Id);
break;
}
RedisLock.ReleaseLock(streamName, handleResult.Id.ToString());
}
}
await Task.Delay(2000);
}
});
}
我使用 LockTake/LockRelease 命令解决了 RedLock 实现的问题。
关注这篇文章: Whosebug question
public async void ListenTask()
{
var handledResult = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
var lowestHandledId = handledResult.Last().Id;
RedisValue token = Environment.MachineName;
var readTask = Task.Run(async () =>
{
while (!Token.IsCancellationRequested)
{
var result = await db.StreamRangeAsync(streamName, lowestHandledId, "+", 2);
var handleResult = result.Last();
if (result.Any() && lowestHandledId != handleResult.Id)
{
if (!db.LockTake(streamName, token, expiry))
{
lowestHandledId = handleResult.Id;
var streamCat = handleResult.Values;
Cat cat = ParseResult(streamCat);
switch (streamCat[0].Value.ToString())
{
case "insert":
Console.WriteLine($"Insert cat at id:{cat.Id} [{cat.Name} - {cat.CreatedDate}]");
cacheDictionary.Add(cat.Id, new WeakReference(cat));
break;
case "delete":
Console.WriteLine($"Deleted cat at id:{cat.Id}");
cacheDictionary.Remove(cat.Id);
break;
}
db.LockRelease(streamName, token);
}
}
await Task.Delay(100);
}
});
}