在高容量场景中使用 ServiceStack.Redis 时出现协议错误、"no more data" 错误和 "Zero length response" 错误
Protocol errors, "no more data" errors, "Zero length response" errors while using servicestack.redis in a high volume scenario
如果有人能告诉我 PooledRedisClientManager 在高容量场景下是否存在已知问题,那将非常有帮助。
我正在使用一个单例客户端管理器,多个 WCF 线程每分钟会调用它的 GetClient() 上千次,并且每个线程都可能对 Redis 集合执行读取/更新/插入操作(我使用的是 Redis 哈希集合)。
我会间歇性地看到这些错误,它们通常在重试后消失。
所有 GetClient() 调用都在 Using 语句中。
谢谢
Rb
以下是我从日志中看到的错误:
错误 1
ServiceStack.Redis.RedisResponseException: Unknown reply on integer response: 123"Key":"7c3699524bcc457ab377ad1af17eb046","Value":"9527cb78-2e32-4695-ad33-7991f92eb3a2"}, sPort: 64493, LastCommand: HEXISTS urn:xxxxxxxxxxxxxxxx "118fdc26117244819eb712a82b8e86fd"
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.HashContainsEntry(String hashId, String key)
at ServiceStack.Redis.Generic.RedisTypedClient`1.HashContainsEntry[TKey](IRedisHash`2 hash, TKey key)
at ServiceStack.Redis.Generic.RedisClientHash`2.ContainsKey(TKey key)
错误 2
ServiceStack.Redis.RedisResponseException: No more data, sPort: 65005, LastCommand: HSET urn:xxxxxxxxxxxxxxxxx "9ced6120a876405faccf5cb043e70807" {"ID":"9ced6120a87...
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.SetEntryInHash(String hashId, String key, String value)
at ServiceStack.Redis.Generic.RedisTypedClient`1.SetEntryInHash[TKey](IRedisHash`2 hash, TKey key, T value)
at ServiceStack.Redis.Generic.RedisClientHash`2.set_Item(TKey key, T value)
错误 3
ServiceStack.Redis.RedisResponseException: Protocol error: expected '$', got ' ', sPort: 64993, LastCommand: HGET urn:xxxxxxxxxxxxxxxxxxxx "705befa18af74f61aafff50b4282de19"
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ParseSingleLine(String r)
at ServiceStack.Redis.Generic.RedisTypedClient`1.GetValueFromHash[TKey](IRedisHash`2 hash, TKey key)
at ServiceStack.Redis.Generic.RedisClientHash`2.get_Item(TKey key)
错误 4
ServiceStack.Redis.RedisResponseException: Protocol error: invalid multibulk length, sPort: 65154, LastCommand: HSET urn:xxxxxxxxxxxxxx "39a5023eee374b28acbe5f63561c6211" {"ID":"39a5023eee3...
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.SetEntryInHash(String hashId, String key, String value)
at ServiceStack.Redis.Generic.RedisTypedClient`1.SetEntryInHash[TKey](IRedisHash`2 hash, TKey key, T value)
at ServiceStack.Redis.Generic.RedisClientHash`2.set_Item(TKey key, T value)
代码:
基本上,我围绕 RedisHash 创建了一个包装器 RedisCachedCollection……这是为了支持那些使用 .NET 列表和字典的现有代码。
/// <summary>
/// Collection-style wrapper around a single Redis hash, intended to stand in for
/// .NET lists and dictionaries in existing code. Every operation obtains a client
/// from <c>RedisClientManager.Instance</c>, runs against the hash stored under the
/// collection key, and disposes the client again.
/// </summary>
/// <remarks>
/// No Redis client (typed or untyped) is kept on the instance: the typed client is
/// created as a local for each call, so concurrent callers never overwrite each
/// other's client reference.
/// </remarks>
/// <typeparam name="TKey">Type of the hash field that identifies an item.</typeparam>
/// <typeparam name="TValue">Type of the items stored in the hash.</typeparam>
public class RedisCachedCollection<TKey, TValue> : CacheCollectionBase<TKey, TValue>, IEnumerable<TValue>
{
    // Number of additional attempts made after the first failure before the
    // exception is rethrown to the caller (so at most MaxRetries + 1 attempts).
    private const int MaxRetries = 3;

    private readonly string _collectionKey;
    private readonly string _collectionLock;
    private readonly int _locktimeout;
    private readonly object _synclock = new object();

    /// <summary>Creates a wrapper for the hash identified by <paramref name="collectionKey"/>.</summary>
    /// <param name="collectionKey">Logical collection name; it is embedded in the Redis key.</param>
    /// <param name="locktimeoutsecs">Timeout value handed to <c>CacheTransaction</c> by <see cref="Lock"/>.</param>
    public RedisCachedCollection(string collectionKey, int locktimeoutsecs = 5)
    {
        _collectionKey = string.Format("urn:{0}:{1}", "XXXXX", collectionKey);
        _collectionLock = string.Format("{0}+lock", _collectionKey);
        _locktimeout = locktimeoutsecs;
    }

    /// <summary>Returns the hash wrapper for this collection, bound to <paramref name="redis"/>.</summary>
    private IRedisHash<TKey, TValue> GetCollection(IRedisClient redis)
    {
        // Local on purpose: the typed client belongs to the client of the current
        // call and must not be shared between threads through an instance field.
        IRedisTypedClient<TValue> typedClient = redis.As<TValue>();
        return typedClient.GetHash<TKey>(_collectionKey);
    }

    /// <summary>Adds <paramref name="obj"/> under the id produced by <see cref="GetUniqueIdAction"/>.</summary>
    public override void Add(TValue obj)
    {
        TKey id = GetUniqueIdAction(obj);
        RetryAction(redis =>
        {
            GetCollection(redis).Add(id, obj);
        });
    }

    /// <summary>Removes the entry whose id matches <paramref name="obj"/>.</summary>
    /// <returns><c>false</c> when the id equals <c>default(TKey)</c>; otherwise the result of the hash removal.</returns>
    public override bool Remove(TValue obj)
    {
        TKey id = GetUniqueIdAction(obj);

        // EqualityComparer handles a null id; calling id.Equals(...) directly would
        // throw NullReferenceException for reference-type keys. Checking before the
        // retry loop also avoids taking a client when there is nothing to remove.
        if (EqualityComparer<TKey>.Default.Equals(id, default(TKey)))
        {
            return false;
        }

        return RetryAction<bool>(redis => GetCollection(redis).Remove(id));
    }

    /// <summary>
    /// Gets the value stored under <paramref name="id"/>, or <c>default(TValue)</c>
    /// when the hash reports no such key; sets (inserts or overwrites) the value.
    /// </summary>
    public override TValue this[TKey id]
    {
        get
        {
            return RetryAction<TValue>(redis =>
            {
                IRedisHash<TKey, TValue> hash = GetCollection(redis);
                return hash.ContainsKey(id) ? hash[id] : default(TValue);
            });
        }
        set
        {
            RetryAction(redis =>
            {
                GetCollection(redis)[id] = value;
            });
        }
    }

    /// <summary>Number of entries reported by the hash.</summary>
    public override int Count
    {
        get { return RetryAction<int>(redis => GetCollection(redis).Count); }
    }

    /// <summary>Filters the hash values with <paramref name="predicate"/>.</summary>
    /// <remarks>
    /// NOTE(review): the filter itself is deferred and runs after the client has been
    /// disposed; this assumes <c>Values</c> is fully loaded when it is read - confirm.
    /// </remarks>
    public IEnumerable<TValue> Where(Func<TValue, bool> predicate)
    {
        return RetryAction<IEnumerable<TValue>>(redis => GetCollection(redis).Values.Where(predicate));
    }

    /// <summary>Returns whether any hash value satisfies <paramref name="predicate"/>.</summary>
    public bool Any(Func<TValue, bool> predicate)
    {
        return RetryAction<bool>(redis => GetCollection(redis).Values.Any(predicate));
    }

    /// <summary>Returns an enumerator over the hash values.</summary>
    public override IEnumerator<TValue> GetEnumerator()
    {
        return RetryAction<IEnumerator<TValue>>(redis => GetCollection(redis).Values.GetEnumerator());
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return RetryAction<System.Collections.IEnumerator>(
            redis => ((System.Collections.IEnumerable)GetCollection(redis).Values).GetEnumerator());
    }

    /// <summary>Removes every entry from the hash.</summary>
    public override void Clear()
    {
        RetryAction(redis =>
        {
            GetCollection(redis).Clear();
        });
    }

    /// <summary>Returns whether the hash contains the id derived from <paramref name="obj"/>.</summary>
    public override bool Contains(TValue obj)
    {
        TKey id = GetUniqueIdAction(obj);
        return RetryAction<bool>(redis => GetCollection(redis).ContainsKey(id));
    }

    /// <summary>Returns whether the hash contains the key <paramref name="obj"/>.</summary>
    public override bool ContainsKey(TKey obj)
    {
        return RetryAction<bool>(redis => GetCollection(redis).ContainsKey(obj));
    }

    /// <summary>Copies the hash values into <paramref name="array"/> starting at <paramref name="arrayIndex"/>.</summary>
    public override void CopyTo(TValue[] array, int arrayIndex)
    {
        RetryAction(redis =>
        {
            GetCollection(redis).Values.CopyTo(array, arrayIndex);
        });
    }

    /// <summary>Read-only flag as reported by the underlying hash wrapper.</summary>
    public override bool IsReadOnly
    {
        get { return RetryAction<bool>(redis => GetCollection(redis).IsReadOnly); }
    }

    /// <summary>Delegate that extracts the hash key (id) from an item.</summary>
    public override Func<TValue, TKey> GetUniqueIdAction { get; set; }

    /// <summary>
    /// Creates a new <c>CacheTransaction</c> for this collection's lock key.
    /// Creation is serialized per instance; failures are written to the console and rethrown.
    /// </summary>
    public override IDisposable Lock
    {
        get
        {
            lock (_synclock)
            {
                try
                {
                    return new CacheTransaction(_collectionLock, _locktimeout);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    throw;
                }
            }
        }
    }

    /// <summary>Obtains a client from the shared client manager; the caller must dispose it.</summary>
    public IRedisClient RedisConnection
    {
        get { return RedisClientManager.Instance.GetClient(); }
    }

    /// <summary>Runs <paramref name="action"/> with the same retry policy as the generic overload.</summary>
    private void RetryAction(Action<IRedisClient> action)
    {
        RetryAction<object>(redis =>
        {
            action(redis);
            return null;
        });
    }

    /// <summary>
    /// Runs <paramref name="action"/> against a freshly obtained client, disposing the
    /// client after every attempt. Any exception triggers another attempt until
    /// <see cref="MaxRetries"/> retries have been used; the last exception is rethrown.
    /// </summary>
    private TOut RetryAction<TOut>(Func<IRedisClient, TOut> action)
    {
        int retriesUsed = 0;
        while (true)
        {
            try
            {
                using (var redis = RedisConnection)
                {
                    return action(redis);
                }
            }
            catch (Exception)
            {
                if (retriesUsed++ < MaxRetries)
                {
                    continue;
                }
                throw;
            }
        }
    }
}
}
我用您上面的 HashCollection 代码添加了一个压力测试(尽可能使用了其中能够编译通过的部分),并在 64 个线程中并发运行上面 StackTrace 中出现的那些 API 调用:
// Stress-test driver: starts noOfThreads workers that hammer the shared
// redisCollection until Enter is pressed, then reports the totals.
clientsManager = new PooledRedisClientManager(ipAddress);
redisCollection = new RedisCachedCollection<string, string>(
    clientsManager, "Thread: " + Thread.CurrentThread.ManagedThreadId);

var startedAt = DateTime.UtcNow;
Interlocked.Increment(ref running);   // workers loop while running > 0

"Starting HashCollectionStressTests with {0} threads".Print(noOfThreads);
var threads = new List<Thread>();
for (int i = 0; i < noOfThreads; i++)
{
    threads.Add(new Thread(WorkerLoop));
}
threads.ForEach(t => t.Start());

"Press Enter to Stop...".Print();
Console.ReadLine();
Interlocked.Decrement(ref running);   // signal the workers to stop

// Wait for every worker to leave its loop so the counters below are final;
// without this the totals can be printed while workers are still incrementing them.
threads.ForEach(t => t.Join());

// Sample the clock once so EndedAt and TimeTaken describe the same instant.
var endedAt = DateTime.UtcNow;
"Writes: {0}, Reads: {1}".Print(writeCount, readCount);
"{0} EndedAt: {1}".Print(GetType().Name, endedAt.ToLongTimeString());
"{0} TimeTaken: {1}s".Print(GetType().Name, (endedAt - startedAt).TotalSeconds);
这是 WorkerLoop:
/// <summary>
/// Body of one stress-test worker thread: repeats a ContainsKey / set / get cycle
/// on the shared collection, counting reads and writes, for as long as
/// <c>running</c> is greater than zero.
/// </summary>
public void WorkerLoop()
{
    const string key = "key";

    // CompareExchange(ref running, 0, 0) never changes the value; it is used
    // here purely as an atomic read of the stop flag.
    while (Interlocked.CompareExchange(ref running, 0, 0) > 0)
    {
        redisCollection.ContainsKey(key);
        Interlocked.Increment(ref readCount);

        redisCollection[key] = "value " + readCount;
        Interlocked.Increment(ref writeCount);

        var roundTripped = redisCollection[key];
        Interlocked.Increment(ref readCount);

        if (roundTripped == null)
        {
            Console.WriteLine("value == null");
        }
    }
}
我还修改了您的 RetryAction API,让它立即记录并重新抛出异常,
这样我就可以检测到抛出的第一个异常:
/// <summary>
/// Diagnostic variant without retries: runs <paramref name="action"/> once against
/// a client taken from <c>RedisConnection</c>, disposes the client, and writes any
/// exception to the console before rethrowing it unchanged.
/// </summary>
private void RetryAction(Action<IRedisClient> action)
{
    try
    {
        using (var client = RedisConnection)
        {
            action(client);
        }
    }
    catch (Exception failure)
    {
        // Log first, then rethrow with the original stack trace intact.
        Console.WriteLine(failure);
        throw;
    }
}
我已经针对本地和网络上的 redis-server 实例运行了该压力测试,但尚未看到任何异常。最近一次让它运行了将近 40 分钟,得到如下输出:
Starting HashCollectionStressTests with 64 threads
Press Enter to Stop...
Writes: 876755, Reads: 1753518
HashCollectionStressTests EndedAt: 2:10:01 AM
HashCollectionStressTests TimeTaken: 2292.985048s
这基本上表明它并发执行了 260 多万次哈希集合 API 调用,没有出现任何异常。
不幸的是,在无法重现问题的情况下,我无法确定您遇到的是什么问题。不过我确实觉得奇怪的是,您一直保留着一个非线程安全的
_redisTypedClient 实例引用:
// Instance-level reference to a typed client; GetCollection reassigns it on every call.
private IRedisTypedClient<TValue> _redisTypedClient = null;
此处填充:
// Returns the hash wrapper for this collection's key, bound to the supplied client.
private IRedisHash<TKey, TValue> GetCollection(IRedisClient redis)
{
// The typed client is written to an instance field, so concurrent callers overwrite
// each other's reference; a local variable would be sufficient for this method.
_redisTypedClient = redis.As<TValue>();
return _redisTypedClient.GetHash<TKey>(_collectionKey);
}
这并没有必要,因为它完全可以是一个局部变量。由于提供的代码不完整(即无法编译),我不确定这个实例是否还被其他在多线程中调用的 API 使用?
如果您能整理出一个可以重现该问题的示例,将有助于定位问题。一个独立的示例也有助于了解这些代码的使用方式。
如果有人能告诉我 PooledRedisClientManager 在高容量场景下是否存在已知问题,那将非常有帮助。
我正在使用一个单例客户端管理器,多个 WCF 线程每分钟会调用它的 GetClient() 上千次,并且每个线程都可能对 Redis 集合执行读取/更新/插入操作(我使用的是 Redis 哈希集合)。
我会间歇性地看到这些错误,它们通常在重试后消失。
所有 GetClient() 调用都在 Using 语句中。
谢谢 Rb
以下是我从日志中看到的错误: 错误 1
ServiceStack.Redis.RedisResponseException: Unknown reply on integer response: 123"Key":"7c3699524bcc457ab377ad1af17eb046","Value":"9527cb78-2e32-4695-ad33-7991f92eb3a2"}, sPort: 64493, LastCommand: HEXISTS urn:xxxxxxxxxxxxxxxx "118fdc26117244819eb712a82b8e86fd"
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.HashContainsEntry(String hashId, String key)
at ServiceStack.Redis.Generic.RedisTypedClient`1.HashContainsEntry[TKey](IRedisHash`2 hash, TKey key)
at ServiceStack.Redis.Generic.RedisClientHash`2.ContainsKey(TKey key)
错误 2
ServiceStack.Redis.RedisResponseException: No more data, sPort: 65005, LastCommand: HSET urn:xxxxxxxxxxxxxxxxx "9ced6120a876405faccf5cb043e70807" {"ID":"9ced6120a87...
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.SetEntryInHash(String hashId, String key, String value)
at ServiceStack.Redis.Generic.RedisTypedClient`1.SetEntryInHash[TKey](IRedisHash`2 hash, TKey key, T value)
at ServiceStack.Redis.Generic.RedisClientHash`2.set_Item(TKey key, T value)
错误 3
ServiceStack.Redis.RedisResponseException: Protocol error: expected '$', got ' ', sPort: 64993, LastCommand: HGET urn:xxxxxxxxxxxxxxxxxxxx "705befa18af74f61aafff50b4282de19"
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ParseSingleLine(String r)
at ServiceStack.Redis.Generic.RedisTypedClient`1.GetValueFromHash[TKey](IRedisHash`2 hash, TKey key)
at ServiceStack.Redis.Generic.RedisClientHash`2.get_Item(TKey key)
错误 4
ServiceStack.Redis.RedisResponseException: Protocol error: invalid multibulk length, sPort: 65154, LastCommand: HSET urn:xxxxxxxxxxxxxx "39a5023eee374b28acbe5f63561c6211" {"ID":"39a5023eee3...
at ServiceStack.Redis.RedisNativeClient.CreateResponseError(String error)
at ServiceStack.Redis.RedisNativeClient.ReadLong()
at ServiceStack.Redis.RedisClient.SetEntryInHash(String hashId, String key, String value)
at ServiceStack.Redis.Generic.RedisTypedClient`1.SetEntryInHash[TKey](IRedisHash`2 hash, TKey key, T value)
at ServiceStack.Redis.Generic.RedisClientHash`2.set_Item(TKey key, T value)
代码:
基本上,我围绕 RedisHash 创建了一个包装器 RedisCachedCollection……这是为了支持那些使用 .NET 列表和字典的现有代码。
/// <summary>
/// Collection-style wrapper around a single Redis hash, intended to stand in for
/// .NET lists and dictionaries in existing code. Every operation obtains a client
/// from <c>RedisClientManager.Instance</c>, runs against the hash stored under the
/// collection key, and disposes the client again.
/// </summary>
/// <remarks>
/// No Redis client (typed or untyped) is kept on the instance: the typed client is
/// created as a local for each call, so concurrent callers never overwrite each
/// other's client reference.
/// </remarks>
/// <typeparam name="TKey">Type of the hash field that identifies an item.</typeparam>
/// <typeparam name="TValue">Type of the items stored in the hash.</typeparam>
public class RedisCachedCollection<TKey, TValue> : CacheCollectionBase<TKey, TValue>, IEnumerable<TValue>
{
    // Number of additional attempts made after the first failure before the
    // exception is rethrown to the caller (so at most MaxRetries + 1 attempts).
    private const int MaxRetries = 3;

    private readonly string _collectionKey;
    private readonly string _collectionLock;
    private readonly int _locktimeout;
    private readonly object _synclock = new object();

    /// <summary>Creates a wrapper for the hash identified by <paramref name="collectionKey"/>.</summary>
    /// <param name="collectionKey">Logical collection name; it is embedded in the Redis key.</param>
    /// <param name="locktimeoutsecs">Timeout value handed to <c>CacheTransaction</c> by <see cref="Lock"/>.</param>
    public RedisCachedCollection(string collectionKey, int locktimeoutsecs = 5)
    {
        _collectionKey = string.Format("urn:{0}:{1}", "XXXXX", collectionKey);
        _collectionLock = string.Format("{0}+lock", _collectionKey);
        _locktimeout = locktimeoutsecs;
    }

    /// <summary>Returns the hash wrapper for this collection, bound to <paramref name="redis"/>.</summary>
    private IRedisHash<TKey, TValue> GetCollection(IRedisClient redis)
    {
        // Local on purpose: the typed client belongs to the client of the current
        // call and must not be shared between threads through an instance field.
        IRedisTypedClient<TValue> typedClient = redis.As<TValue>();
        return typedClient.GetHash<TKey>(_collectionKey);
    }

    /// <summary>Adds <paramref name="obj"/> under the id produced by <see cref="GetUniqueIdAction"/>.</summary>
    public override void Add(TValue obj)
    {
        TKey id = GetUniqueIdAction(obj);
        RetryAction(redis =>
        {
            GetCollection(redis).Add(id, obj);
        });
    }

    /// <summary>Removes the entry whose id matches <paramref name="obj"/>.</summary>
    /// <returns><c>false</c> when the id equals <c>default(TKey)</c>; otherwise the result of the hash removal.</returns>
    public override bool Remove(TValue obj)
    {
        TKey id = GetUniqueIdAction(obj);

        // EqualityComparer handles a null id; calling id.Equals(...) directly would
        // throw NullReferenceException for reference-type keys. Checking before the
        // retry loop also avoids taking a client when there is nothing to remove.
        if (EqualityComparer<TKey>.Default.Equals(id, default(TKey)))
        {
            return false;
        }

        return RetryAction<bool>(redis => GetCollection(redis).Remove(id));
    }

    /// <summary>
    /// Gets the value stored under <paramref name="id"/>, or <c>default(TValue)</c>
    /// when the hash reports no such key; sets (inserts or overwrites) the value.
    /// </summary>
    public override TValue this[TKey id]
    {
        get
        {
            return RetryAction<TValue>(redis =>
            {
                IRedisHash<TKey, TValue> hash = GetCollection(redis);
                return hash.ContainsKey(id) ? hash[id] : default(TValue);
            });
        }
        set
        {
            RetryAction(redis =>
            {
                GetCollection(redis)[id] = value;
            });
        }
    }

    /// <summary>Number of entries reported by the hash.</summary>
    public override int Count
    {
        get { return RetryAction<int>(redis => GetCollection(redis).Count); }
    }

    /// <summary>Filters the hash values with <paramref name="predicate"/>.</summary>
    /// <remarks>
    /// NOTE(review): the filter itself is deferred and runs after the client has been
    /// disposed; this assumes <c>Values</c> is fully loaded when it is read - confirm.
    /// </remarks>
    public IEnumerable<TValue> Where(Func<TValue, bool> predicate)
    {
        return RetryAction<IEnumerable<TValue>>(redis => GetCollection(redis).Values.Where(predicate));
    }

    /// <summary>Returns whether any hash value satisfies <paramref name="predicate"/>.</summary>
    public bool Any(Func<TValue, bool> predicate)
    {
        return RetryAction<bool>(redis => GetCollection(redis).Values.Any(predicate));
    }

    /// <summary>Returns an enumerator over the hash values.</summary>
    public override IEnumerator<TValue> GetEnumerator()
    {
        return RetryAction<IEnumerator<TValue>>(redis => GetCollection(redis).Values.GetEnumerator());
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return RetryAction<System.Collections.IEnumerator>(
            redis => ((System.Collections.IEnumerable)GetCollection(redis).Values).GetEnumerator());
    }

    /// <summary>Removes every entry from the hash.</summary>
    public override void Clear()
    {
        RetryAction(redis =>
        {
            GetCollection(redis).Clear();
        });
    }

    /// <summary>Returns whether the hash contains the id derived from <paramref name="obj"/>.</summary>
    public override bool Contains(TValue obj)
    {
        TKey id = GetUniqueIdAction(obj);
        return RetryAction<bool>(redis => GetCollection(redis).ContainsKey(id));
    }

    /// <summary>Returns whether the hash contains the key <paramref name="obj"/>.</summary>
    public override bool ContainsKey(TKey obj)
    {
        return RetryAction<bool>(redis => GetCollection(redis).ContainsKey(obj));
    }

    /// <summary>Copies the hash values into <paramref name="array"/> starting at <paramref name="arrayIndex"/>.</summary>
    public override void CopyTo(TValue[] array, int arrayIndex)
    {
        RetryAction(redis =>
        {
            GetCollection(redis).Values.CopyTo(array, arrayIndex);
        });
    }

    /// <summary>Read-only flag as reported by the underlying hash wrapper.</summary>
    public override bool IsReadOnly
    {
        get { return RetryAction<bool>(redis => GetCollection(redis).IsReadOnly); }
    }

    /// <summary>Delegate that extracts the hash key (id) from an item.</summary>
    public override Func<TValue, TKey> GetUniqueIdAction { get; set; }

    /// <summary>
    /// Creates a new <c>CacheTransaction</c> for this collection's lock key.
    /// Creation is serialized per instance; failures are written to the console and rethrown.
    /// </summary>
    public override IDisposable Lock
    {
        get
        {
            lock (_synclock)
            {
                try
                {
                    return new CacheTransaction(_collectionLock, _locktimeout);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    throw;
                }
            }
        }
    }

    /// <summary>Obtains a client from the shared client manager; the caller must dispose it.</summary>
    public IRedisClient RedisConnection
    {
        get { return RedisClientManager.Instance.GetClient(); }
    }

    /// <summary>Runs <paramref name="action"/> with the same retry policy as the generic overload.</summary>
    private void RetryAction(Action<IRedisClient> action)
    {
        RetryAction<object>(redis =>
        {
            action(redis);
            return null;
        });
    }

    /// <summary>
    /// Runs <paramref name="action"/> against a freshly obtained client, disposing the
    /// client after every attempt. Any exception triggers another attempt until
    /// <see cref="MaxRetries"/> retries have been used; the last exception is rethrown.
    /// </summary>
    private TOut RetryAction<TOut>(Func<IRedisClient, TOut> action)
    {
        int retriesUsed = 0;
        while (true)
        {
            try
            {
                using (var redis = RedisConnection)
                {
                    return action(redis);
                }
            }
            catch (Exception)
            {
                if (retriesUsed++ < MaxRetries)
                {
                    continue;
                }
                throw;
            }
        }
    }
}
}
我用您上面的 HashCollection 代码添加了一个压力测试(尽可能使用了其中能够编译通过的部分),并在 64 个线程中并发运行上面 StackTrace 中出现的那些 API 调用:
// Stress-test driver: starts noOfThreads workers that hammer the shared
// redisCollection until Enter is pressed, then reports the totals.
clientsManager = new PooledRedisClientManager(ipAddress);
redisCollection = new RedisCachedCollection<string, string>(
    clientsManager, "Thread: " + Thread.CurrentThread.ManagedThreadId);

var startedAt = DateTime.UtcNow;
Interlocked.Increment(ref running);   // workers loop while running > 0

"Starting HashCollectionStressTests with {0} threads".Print(noOfThreads);
var threads = new List<Thread>();
for (int i = 0; i < noOfThreads; i++)
{
    threads.Add(new Thread(WorkerLoop));
}
threads.ForEach(t => t.Start());

"Press Enter to Stop...".Print();
Console.ReadLine();
Interlocked.Decrement(ref running);   // signal the workers to stop

// Wait for every worker to leave its loop so the counters below are final;
// without this the totals can be printed while workers are still incrementing them.
threads.ForEach(t => t.Join());

// Sample the clock once so EndedAt and TimeTaken describe the same instant.
var endedAt = DateTime.UtcNow;
"Writes: {0}, Reads: {1}".Print(writeCount, readCount);
"{0} EndedAt: {1}".Print(GetType().Name, endedAt.ToLongTimeString());
"{0} TimeTaken: {1}s".Print(GetType().Name, (endedAt - startedAt).TotalSeconds);
这是 WorkerLoop:
/// <summary>
/// Body of one stress-test worker thread: repeats a ContainsKey / set / get cycle
/// on the shared collection, counting reads and writes, for as long as
/// <c>running</c> is greater than zero.
/// </summary>
public void WorkerLoop()
{
    const string key = "key";

    // CompareExchange(ref running, 0, 0) never changes the value; it is used
    // here purely as an atomic read of the stop flag.
    while (Interlocked.CompareExchange(ref running, 0, 0) > 0)
    {
        redisCollection.ContainsKey(key);
        Interlocked.Increment(ref readCount);

        redisCollection[key] = "value " + readCount;
        Interlocked.Increment(ref writeCount);

        var roundTripped = redisCollection[key];
        Interlocked.Increment(ref readCount);

        if (roundTripped == null)
        {
            Console.WriteLine("value == null");
        }
    }
}
我还修改了您的 RetryAction API,让它立即记录并重新抛出异常,
这样我就可以检测到抛出的第一个异常:
/// <summary>
/// Diagnostic variant without retries: runs <paramref name="action"/> once against
/// a client taken from <c>RedisConnection</c>, disposes the client, and writes any
/// exception to the console before rethrowing it unchanged.
/// </summary>
private void RetryAction(Action<IRedisClient> action)
{
    try
    {
        using (var client = RedisConnection)
        {
            action(client);
        }
    }
    catch (Exception failure)
    {
        // Log first, then rethrow with the original stack trace intact.
        Console.WriteLine(failure);
        throw;
    }
}
我已经针对本地和网络上的 redis-server 实例运行了该压力测试,但尚未看到任何异常。最近一次让它运行了将近 40 分钟,得到如下输出:
Starting HashCollectionStressTests with 64 threads
Press Enter to Stop...
Writes: 876755, Reads: 1753518
HashCollectionStressTests EndedAt: 2:10:01 AM
HashCollectionStressTests TimeTaken: 2292.985048s
这基本上表明它并发执行了 260 多万次哈希集合 API 调用,没有出现任何异常。
不幸的是,在无法重现问题的情况下,我无法确定您遇到的是什么问题。不过我确实觉得奇怪的是,您一直保留着一个非线程安全的
_redisTypedClient 实例引用:
// Instance-level reference to a typed client; GetCollection reassigns it on every call.
private IRedisTypedClient<TValue> _redisTypedClient = null;
此处填充:
// Returns the hash wrapper for this collection's key, bound to the supplied client.
private IRedisHash<TKey, TValue> GetCollection(IRedisClient redis)
{
// The typed client is written to an instance field, so concurrent callers overwrite
// each other's reference; a local variable would be sufficient for this method.
_redisTypedClient = redis.As<TValue>();
return _redisTypedClient.GetHash<TKey>(_collectionKey);
}
这并没有必要,因为它完全可以是一个局部变量。由于提供的代码不完整(即无法编译),我不确定这个实例是否还被其他在多线程中调用的 API 使用?
如果您能整理出一个可以重现该问题的示例,将有助于定位问题。一个独立的示例也有助于了解这些代码的使用方式。