使用 .NET SDK（Microsoft.Azure.Cosmos）查询 Cosmos DB 以获取不同派生类型的列表

Query Cosmos DB to get a list of different derived types using the .Net SDK Microsoft.Azure.Cosmos

我们有一个接口和一个具有多个派生类型的基类。

/// <summary>
/// Contract implemented by every event type in this sample.
/// </summary>
public interface IEvent
{
    /// <summary>
    /// Identifier of the event; serialized under the JSON name <c>id</c>.
    /// </summary>
    [JsonProperty("id")]
    string Id { get; set; }

    /// <summary>
    /// Read-only string describing the event's type.
    /// </summary>
    string Type { get; }
}

/// <summary>
/// Abstract base for events. Stores <see cref="Id"/> and leaves <see cref="Type"/>
/// for each derived class to supply.
/// </summary>
public abstract class EventBase : IEvent
{
    /// <summary>Identifier of the event.</summary>
    public string Id { get; set; }

    /// <summary>Type string; every concrete event must override this.</summary>
    public abstract string Type { get; }
}

/// <summary>
/// Event whose <see cref="Type"/> is the assembly-qualified name of <see cref="UserCreated"/>.
/// </summary>
public class UserCreated : EventBase
{
    /// <summary>
    /// Assembly-qualified name of <see cref="UserCreated"/>, assigned once by the property initializer.
    /// </summary>
    public override string Type { get; }
        = typeof(UserCreated).AssemblyQualifiedName;
}

/// <summary>
/// Event whose <see cref="Type"/> is the assembly-qualified name of <see cref="UserUpdated"/>.
/// </summary>
public class UserUpdated : EventBase
{
    /// <summary>
    /// Assembly-qualified name of <see cref="UserUpdated"/>, assigned once by the property initializer.
    /// </summary>
    public override string Type { get; }
        = typeof(UserUpdated).AssemblyQualifiedName;
}

我们使用 .Net SDK v3 Microsoft.Azure.Cosmos 将这些不同派生类型的事件存储在 Cosmos DB 中的同一容器中。然后我们想要读取所有事件并将它们反序列化为正确的类型。

/// <summary>
/// Test that stores events of different derived types in one container and reads them back
/// through the <see cref="IEvent"/> interface.
/// </summary>
public class CosmosDbTests
{
    /// <summary>
    /// Creates one <see cref="UserCreated"/> and one <see cref="UserUpdated"/> item, then drains a
    /// LINQ feed iterator typed as <see cref="IEvent"/> and asserts that the result is not empty.
    /// </summary>
    [Fact]
    public async Task TestFetchingDerivedTypes()
    {
        // Connection details are left blank in this sample and must be filled in before running.
        var endpoint = "";
        var authKey = "";
        var databaseId = "";
        var containerId = "";

        // CosmosClient is IDisposable; release it when the test finishes instead of leaking it.
        using var client = new CosmosClient(endpoint, authKey);

        var container = client.GetContainer(databaseId, containerId);

        await container.CreateItemAsync(new UserCreated { Id = Guid.NewGuid().ToString() });
        await container.CreateItemAsync(new UserUpdated { Id = Guid.NewGuid().ToString() });

        var queryable = container.GetItemLinqQueryable<IEvent>();

        var query = queryable.ToFeedIterator();
        var list = new List<IEvent>();

        // Results arrive in pages; keep reading until the iterator reports no more.
        while (query.HasMoreResults)
        {
            list.AddRange(await query.ReadNextAsync());
        }

        Assert.NotEmpty(list);
    }
}

似乎没有任何选项可以告诉 GetItemLinqQueryable 如何处理类型。是否有其他途径或方法可以在一个查询中支持多个派生类型？

如果有帮助，可以将事件放在某种包装实体中，但不允许将它们作为序列化字符串存储在某个属性中。

Stephen 的评论清楚地为我指明了正确的方向；在这篇博客 https://thomaslevesque.com/2019/10/15/handling-type-hierarchies-in-cosmos-db-part-2/ 的帮助下，我最终得到了一个类似于以下示例的解决方案：我们使用一个自定义 CosmosSerializer，它借助一个读取 Type 属性的自定义 JsonConverter 来完成反序列化。

/// <summary>
/// Contract implemented by every event type in this sample.
/// </summary>
public interface IEvent
{
    /// <summary>
    /// Identifier of the event; serialized under the JSON name <c>id</c>.
    /// </summary>
    [JsonProperty("id")]
    string Id { get; set; }

    /// <summary>
    /// Read-only type string; serialized under the JSON name <c>$type</c>.
    /// </summary>
    [JsonProperty("$type")]
    string Type { get; }
}

/// <summary>
/// Abstract base for events. Stores <see cref="Id"/> and derives <see cref="Type"/>
/// from the runtime type of the instance.
/// </summary>
public abstract class EventBase : IEvent
{
    /// <summary>Identifier of the event.</summary>
    public string Id { get; set; }

    /// <summary>
    /// Assembly-qualified name of the instance's runtime type, computed on every read.
    /// </summary>
    public string Type
    {
        get { return GetType().AssemblyQualifiedName; }
    }
}

/// <summary>
/// Concrete event with no members of its own; everything comes from <see cref="EventBase"/>.
/// </summary>
public class UserCreated : EventBase { }

/// <summary>
/// Concrete event with no members of its own; everything comes from <see cref="EventBase"/>.
/// </summary>
public class UserUpdated : EventBase { }

EventJsonConverter 读取 Type 属性（在 JSON 中名为 $type）。

/// <summary>
/// Json.NET converter that deserializes an <see cref="IEvent"/> into the concrete type named
/// by the <c>$type</c> property of the JSON object. Serialization is not handled here.
/// </summary>
public class EventJsonConverter : JsonConverter
{
    // JSON property that carries the type name of the concrete event.
    private const string TypePropertyName = "$type";

    // This converter handles only deserialization, not serialization.
    public override bool CanRead => true;
    public override bool CanWrite => false;

    /// <summary>
    /// Returns true only when the requested target type is exactly <see cref="IEvent"/>.
    /// </summary>
    public override bool CanConvert(Type objectType)
    {
        // Exact match on the interface only: concrete event types are not matched, so the
        // nested ToObject call in ReadJson does not re-enter this converter.
        return objectType == typeof(IEvent);
    }

    /// <summary>
    /// Reads one JSON value and materializes it as the concrete event type named in <c>$type</c>.
    /// </summary>
    /// <returns>
    /// The deserialized event, or <c>null</c> when the JSON value is null or has no <c>$type</c>.
    /// </returns>
    /// <exception cref="JsonSerializationException">
    /// The <c>$type</c> value cannot be resolved to a type, or resolves to a type that is
    /// abstract or does not implement <see cref="IEvent"/>.
    /// </exception>
    public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        // JObject.Load rejects anything that is not an object, so handle a JSON null up front.
        if (reader.TokenType == JsonToken.Null)
        {
            return null;
        }

        // First, just read the JSON as a JObject
        var obj = JObject.Load(reader);

        // Then look at the $type property:
        var typeName = obj[TypePropertyName]?.Value<string>();
        if (typeName == null)
        {
            return null;
        }

        var eventType = Type.GetType(typeName, throwOnError: false);
        if (eventType == null)
        {
            throw new JsonSerializationException(
                $"Cannot resolve event type '{typeName}' from the '{TypePropertyName}' property.");
        }

        // The type name comes from stored data, so only instantiate concrete IEvent
        // implementations. This also rejects IEvent itself, which would recurse into this converter.
        if (eventType.IsAbstract || !typeof(IEvent).IsAssignableFrom(eventType))
        {
            throw new JsonSerializationException(
                $"Type '{typeName}' is not a concrete implementation of {nameof(IEvent)}.");
        }

        return obj.ToObject(eventType, serializer);
    }

    /// <summary>
    /// Not supported; <see cref="CanWrite"/> is false.
    /// </summary>
    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        throw new NotSupportedException("This converter handles only deserialization, not serialization.");
    }
}

NewtonsoftJsonCosmosSerializer 接受一个用于序列化的 JsonSerializerSettings

/// <summary>
/// <see cref="CosmosSerializer"/> implementation backed by a Json.NET <see cref="JsonSerializer"/>
/// built from caller-supplied <see cref="JsonSerializerSettings"/>.
/// </summary>
public class NewtonsoftJsonCosmosSerializer : CosmosSerializer
{
    // UTF-8 without a byte-order mark, throwing on invalid bytes.
    private static readonly Encoding DefaultEncoding =
        new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true);

    private readonly JsonSerializer _serializer;

    /// <summary>
    /// Creates the serializer from the given Json.NET settings.
    /// </summary>
    /// <param name="settings">Settings passed to <see cref="JsonSerializer.Create(JsonSerializerSettings)"/>.</param>
    public NewtonsoftJsonCosmosSerializer(JsonSerializerSettings settings)
    {
        _serializer = JsonSerializer.Create(settings);
    }

    /// <summary>
    /// Deserializes <paramref name="stream"/> into a <typeparamref name="T"/>.
    /// </summary>
    /// <returns>
    /// The stream itself when <typeparamref name="T"/> is a stream type; otherwise the deserialized value.
    /// </returns>
    public override T FromStream<T>(Stream stream)
    {
        // Callers that ask for a stream get the original back, still open.
        var wantsRawStream = typeof(Stream).IsAssignableFrom(typeof(T));
        if (wantsRawStream)
        {
            return (T)(object)stream;
        }

        // Disposing the readers at the end of the block also closes the underlying stream.
        using (var textReader = new StreamReader(stream))
        using (var jsonReader = new JsonTextReader(textReader))
        {
            return _serializer.Deserialize<T>(jsonReader);
        }
    }

    /// <summary>
    /// Serializes <paramref name="input"/> to JSON in a new in-memory stream.
    /// </summary>
    /// <returns>A <see cref="MemoryStream"/> holding the JSON, positioned at the start.</returns>
    public override Stream ToStream<T>(T input)
    {
        var payload = new MemoryStream();

        // leaveOpen keeps the MemoryStream usable after the writers are disposed.
        using (var textWriter = new StreamWriter(payload, encoding: DefaultEncoding, bufferSize: 1024, leaveOpen: true))
        using (JsonWriter jsonWriter = new JsonTextWriter(textWriter))
        {
            jsonWriter.Formatting = _serializer.Formatting;
            _serializer.Serialize(jsonWriter, input);
            jsonWriter.Flush();
            textWriter.Flush();
        }

        // Rewind so the caller reads from the first byte.
        payload.Position = 0;
        return payload;
    }
}

CosmosClient 现在是用我们自己的 NewtonsoftJsonCosmosSerializer 和 EventJsonConverter 创建的。

/// <summary>
/// Test that stores events of different derived types in one container and reads them back
/// through the <see cref="IEvent"/> interface, using a client configured with a custom serializer.
/// </summary>
public class CosmosDbTests
{
    /// <summary>
    /// Creates one <see cref="UserCreated"/> and one <see cref="UserUpdated"/> item, then drains a
    /// LINQ feed iterator typed as <see cref="IEvent"/> and asserts that the result is not empty.
    /// </summary>
    [Fact]
    public async Task TestFetchingDerivedTypes()
    {
        // Connection details are left blank in this sample and must be filled in before running.
        var endpoint = "";
        var authKey = "";
        var databaseId = "";
        var containerId = "";

        // Plug in the Json.NET-based serializer with the event converter registered.
        var options = new CosmosClientOptions
        {
            Serializer = new NewtonsoftJsonCosmosSerializer(new JsonSerializerSettings
            {
                Converters = { new EventJsonConverter() }
            })
        };

        // CosmosClient is IDisposable; release it when the test finishes instead of leaking it.
        using var client = new CosmosClient(endpoint, authKey, options);

        var container = client.GetContainer(databaseId, containerId);

        await container.CreateItemAsync(new UserCreated { Id = Guid.NewGuid().ToString() });
        await container.CreateItemAsync(new UserUpdated { Id = Guid.NewGuid().ToString() });

        var queryable = container.GetItemLinqQueryable<IEvent>();

        var query = queryable.ToFeedIterator();
        var list = new List<IEvent>();

        // Results arrive in pages; keep reading until the iterator reports no more.
        while (query.HasMoreResults)
        {
            list.AddRange(await query.ReadNextAsync());
        }

        Assert.NotEmpty(list);
    }
}