Cassandra 集合序列化器抛出异常
Cassandra collections serializer throws exception
我有一种方法可以将一个 ID、一组 UDT 和一个日期插入 table。
public class MessageQueueNext
{
public string Id { get; set; }
public List<UDTMessage> Messages { get; set; }
public DateTimeOffset Updated { get; set; }
}
private static Lazy<PreparedStatement> InsertStatement = new
Lazy<PreparedStatement>(
() => {
return CassandraDB.Instance.Session.Prepare
(
@"INSERT INTO SomeTable
(id, messages, updated)
VALUES (?, ?, ?)"
);
});
public async Task InsertAsync(SomeObj obj)
{
var statement = BindValues(
InsertStatement.Value,
new dynamic[] {
obj.Id, obj.Messages, obj.Updated
}
);
await CassandraDB.Instance.Session.ExecuteAsync(statement);
}
private BoundStatement BindValues(PreparedStatement ps, dynamic[] values)
{
var statement = ps.Bind
(
values[0] ?? Unset.Value, values[1] ?? new List<UDTMessage>(), values[2] ?? Unset.Value
);
return statement;
}
有时此方法会抛出异常。
Collection was modified; enumeration operation may not execute.
at System.Collections.Generic.List`1.Enumerator.MoveNextRare()
at Cassandra.Serialization.CollectionSerializer.Serialize(UInt16
protocolVersion, IEnumerable value)
at Cassandra.Serialization.GenericSerializer.Serialize(ProtocolVersion
version, Object value)
at Cassandra.QueryProtocolOptions.Write(FrameWriter wb, Boolean isPrepared)
at Cassandra.Requests.BaseRequest.WriteFrame(Int16 streamId, MemoryStream stream, ISerializer connectionSerializer)
at Cassandra.OperationState.WriteFrame(Int16 streamId, MemoryStream memoryStream, ISerializer serializer, Int64 timestamp)
at Cassandra.Connections.Connection.RunWriteQueueAction()
--- End of stack trace from previous location where exception was thrown ---
任何人都可以帮助我解决可能导致这种情况的原因吗?据我了解 obj.Messages 在路上的某个地方发生了变化?或者它可能在调用 InsertAsync(...) 的函数中发生了更改,但在调用 InsertAsync(...) 之后它立即更改了 obj.Messages 值?
正在调用 InsertAsync(...) 的方法
public async Task PersistMessageQueue(UDTMessage msg)
{
var DAO = new DAO();
var list = await _cache.GetMessages(msg.Id);
if (list == null)
{
list = new List<UDTMessage>();
}
list.Add(msg);
var someObj = new SomeObj();
someObj.ToId = msg.Id;
someObj.Messages = list;
someObj.Updated = DateTimeOffset.UtcNow;
await DAO.InsertAsync(messageQueueNext);
_cache.AddElement(msg);
}
查看父方法(来自您的评论):
public async Task PersistMessageQueue(UDTMessage msg) {
var DAO = new DAO();
var list = await _cache.GetMessages(msg.Id);
if (list == null)
{
list = new List<UDTMessage>();
}
list.Add(msg);
var someObj = new SomeObj();
someObj.ToId = msg.Id;
someObj.Messages = list;
someObj.Updated = DateTimeOffset.UtcNow;
await DAO.InsertAsync(messageQueueNext);
_cache.AddElement(msg);
}
看起来您正在将 List
对象存储在缓存中,将其传递给驱动程序,但每次调用 PersistMessageQueue()
都会修改此 List
对象,因此您可能 运行 进入驱动程序迭代 List
对象而另一个对 PersistMessageQueue()
的调用正在修改同一个 List
对象的问题。
解决方案是在调用 Session.ExecuteAsync()
之前克隆 List
对象(如果这是导致问题的原因)。
我有一种方法可以将一个 ID、一组 UDT 和一个日期插入 table。
public class MessageQueueNext
{
public string Id { get; set; }
public List<UDTMessage> Messages { get; set; }
public DateTimeOffset Updated { get; set; }
}
private static Lazy<PreparedStatement> InsertStatement = new
Lazy<PreparedStatement>(
() => {
return CassandraDB.Instance.Session.Prepare
(
@"INSERT INTO SomeTable
(id, messages, updated)
VALUES (?, ?, ?)"
);
});
public async Task InsertAsync(SomeObj obj)
{
var statement = BindValues(
InsertStatement.Value,
new dynamic[] {
obj.Id, obj.Messages, obj.Updated
}
);
await CassandraDB.Instance.Session.ExecuteAsync(statement);
}
private BoundStatement BindValues(PreparedStatement ps, dynamic[] values)
{
var statement = ps.Bind
(
values[0] ?? Unset.Value, values[1] ?? new List<UDTMessage>(), values[2] ?? Unset.Value
);
return statement;
}
有时此方法会抛出异常。
Collection was modified; enumeration operation may not execute. at System.Collections.Generic.List`1.Enumerator.MoveNextRare() at Cassandra.Serialization.CollectionSerializer.Serialize(UInt16 protocolVersion, IEnumerable value) at Cassandra.Serialization.GenericSerializer.Serialize(ProtocolVersion version, Object value) at Cassandra.QueryProtocolOptions.Write(FrameWriter wb, Boolean isPrepared) at Cassandra.Requests.BaseRequest.WriteFrame(Int16 streamId, MemoryStream stream, ISerializer connectionSerializer) at Cassandra.OperationState.WriteFrame(Int16 streamId, MemoryStream memoryStream, ISerializer serializer, Int64 timestamp) at Cassandra.Connections.Connection.RunWriteQueueAction() --- End of stack trace from previous location where exception was thrown ---
任何人都可以帮助我解决可能导致这种情况的原因吗?据我了解 obj.Messages 在路上的某个地方发生了变化?或者它可能在调用 InsertAsync(...) 的函数中发生了更改,但在调用 InsertAsync(...) 之后它立即更改了 obj.Messages 值?
正在调用 InsertAsync(...) 的方法
public async Task PersistMessageQueue(UDTMessage msg)
{
var DAO = new DAO();
var list = await _cache.GetMessages(msg.Id);
if (list == null)
{
list = new List<UDTMessage>();
}
list.Add(msg);
var someObj = new SomeObj();
someObj.ToId = msg.Id;
someObj.Messages = list;
someObj.Updated = DateTimeOffset.UtcNow;
await DAO.InsertAsync(messageQueueNext);
_cache.AddElement(msg);
}
查看父方法(来自您的评论):
public async Task PersistMessageQueue(UDTMessage msg) {
var DAO = new DAO();
var list = await _cache.GetMessages(msg.Id);
if (list == null)
{
list = new List<UDTMessage>();
}
list.Add(msg);
var someObj = new SomeObj();
someObj.ToId = msg.Id;
someObj.Messages = list;
someObj.Updated = DateTimeOffset.UtcNow;
await DAO.InsertAsync(messageQueueNext);
_cache.AddElement(msg);
}
看起来您正在将 List
对象存储在缓存中,将其传递给驱动程序,但每次调用 PersistMessageQueue()
都会修改此 List
对象,因此您可能 运行 进入驱动程序迭代 List
对象而另一个对 PersistMessageQueue()
的调用正在修改同一个 List
对象的问题。
解决方案是在调用 Session.ExecuteAsync()
之前克隆 List
对象(如果这是导致问题的原因)。