Cassandra 集合序列化器抛出异常

Cassandra collections serializer throws exception

我有一种方法可以将一个 ID、一组 UDT 和一个日期插入 table。

public class MessageQueueNext
{
    public string Id { get; set; }
    public List<UDTMessage> Messages { get; set; }
    public DateTimeOffset Updated { get; set; }
}

private static Lazy<PreparedStatement> InsertStatement = new 
    Lazy<PreparedStatement>(
        () => {
            return CassandraDB.Instance.Session.Prepare
            (
                @"INSERT INTO SomeTable
                (id, messages, updated)
                VALUES (?, ?, ?)"
            );
        });

public async Task InsertAsync(SomeObj obj)
{
    var statement = BindValues(
        InsertStatement.Value,
        new dynamic[] {
            obj.Id, obj.Messages, obj.Updated
        }
    );

    await CassandraDB.Instance.Session.ExecuteAsync(statement);
}

private BoundStatement BindValues(PreparedStatement ps, dynamic[] values)
{
    var statement = ps.Bind
    (
        values[0] ?? Unset.Value, values[1] ?? new List<UDTMessage>(), values[2] ?? Unset.Value
    );

    return statement;
}

有时此方法会抛出异常。

Collection was modified; enumeration operation may not execute. at System.Collections.Generic.List`1.Enumerator.MoveNextRare() at Cassandra.Serialization.CollectionSerializer.Serialize(UInt16 protocolVersion, IEnumerable value) at Cassandra.Serialization.GenericSerializer.Serialize(ProtocolVersion version, Object value) at Cassandra.QueryProtocolOptions.Write(FrameWriter wb, Boolean isPrepared) at Cassandra.Requests.BaseRequest.WriteFrame(Int16 streamId, MemoryStream stream, ISerializer connectionSerializer) at Cassandra.OperationState.WriteFrame(Int16 streamId, MemoryStream memoryStream, ISerializer serializer, Int64 timestamp) at Cassandra.Connections.Connection.RunWriteQueueAction() --- End of stack trace from previous location where exception was thrown ---

任何人都可以帮助我解决可能导致这种情况的原因吗?据我了解 obj.Messages 在路上的某个地方发生了变化?或者它可能在调用 InsertAsync(...) 的函数中发生了更改,但在调用 InsertAsync(...) 之后它立即更改了 obj.Messages 值?

正在调用 InsertAsync(...) 的方法

public async Task PersistMessageQueue(UDTMessage msg)
{
    var DAO = new DAO();
    var list = await _cache.GetMessages(msg.Id);
    if (list == null)
    {
        list = new List<UDTMessage>();
    }
    list.Add(msg);
    var someObj = new SomeObj();
    someObj.ToId = msg.Id;
    someObj.Messages = list;
    someObj.Updated = DateTimeOffset.UtcNow;
    await DAO.InsertAsync(messageQueueNext);
    _cache.AddElement(msg);
}

查看父方法(来自您的评论):

public async Task PersistMessageQueue(UDTMessage msg) {    
    var DAO = new DAO();     
    var list = await _cache.GetMessages(msg.Id);   
    if (list == null)
    {   
        list = new List<UDTMessage>();  
    }  
    list.Add(msg);   
    var someObj = new SomeObj();  
    someObj.ToId = msg.Id;  
    someObj.Messages = list;    
    someObj.Updated = DateTimeOffset.UtcNow;  
    await DAO.InsertAsync(messageQueueNext); 
    _cache.AddElement(msg); 
}

看起来您正在将 List 对象存储在缓存中,将其传递给驱动程序,但每次调用 PersistMessageQueue() 都会修改此 List 对象,因此您可能 运行 进入驱动程序迭代 List 对象而另一个对 PersistMessageQueue() 的调用正在修改同一个 List 对象的问题。

解决方案是在调用 Session.ExecuteAsync() 之前克隆 List 对象(如果这是导致问题的原因)。