Cassandra 用户定义类型上的 Django 查询集抛出类型错误

Django Queryset on Cassandra User Defined Type throws Type Error

我正在使用 DataStax Python Driver and the Django Cassandra Engine 的组合通过 Django 应用程序与 Cassandra 通信。

以下是我使用用户定义类型列定义模型的方式(仅示例):

from cassandra.cqlengine import columns
from cassandra.cqlengine.usertype import UserType
from django_cassandra_engine.models import DjangoCassandraModel

class UserAddress(UserType)
    street = columns.Text()
    number = columns.Integer()

class User(DjangoCassandraModel):
    __table_name__ = 'users'

    user_id = columns.UUID(primary_key=True)
    name = columns.Text()
    address = columns.UserDefinedType(UserAddress)


class Meta:
    managed = False
    get_pk_field = 'user_id'

我正在尝试通过以下调用查询此模型:

User.objects.filter(user_id=user_id)

抛出此错误:

File "/usr/local/lib/python3.8/site-packages/cassandra/cqlengine/columns.py", line 1043, in to_python

if copied_value[name] is not None or isinstance(field, BaseContainerColumn):

TypeError: tuple indices must be integers or slices, not str

此错误源自 UserDefinedType class 声明,特别是此函数:

    def to_python(self, value):
    if value is None:
        return

    copied_value = deepcopy(value)
    for name, field in self.user_type._fields.items():
        if copied_value[name] is not None or isinstance(field, BaseContainerColumn):
            copied_value[name] = field.to_python(copied_value[name])

    return copied_value

在我看来,包含 DjangoCassandraQuerySet class 的 Django Cassandra 引擎与 UserDefinedType class 配合使用时效果不佳,但我'我不知道为什么。

到目前为止,我对这些库没有任何问题,我觉得奇怪的是不支持这样的操作。我希望我错过了某处的配置,或者至少该问题是可以解决的。

感谢您的宝贵时间。

感谢 Django Cassandra 引擎问题页面上的 this comment 解决了这个问题。

似乎 DataStax Python 驱动程序将未注册的类型解释为普通字典,导致它们被反序列化为一系列元组而不是正确的字典,解释了错误消息。

解决方案是在每次启动连接时注册 UDT。每次都必须这样做,因为它似乎不会持续存在。

为此,请在您的 UDT class 声明之后添加此代码:

from cassandra.cqlengine import columns
from cassandra.cqlengine.usertype import UserType    
from django.db import connections
from cassandra.cluster import UserTypeDoesNotExist

class UserAddress(UserType)
    street = columns.Text()
    number = columns.Integer()

user_types = [ UserAddress ]
database_name = 'DjangoDBName'
keyspace = 'CassandraKeyspace'
connection = connections[database_name]

for udt in user_types:
  try:
    connection.connection.cluster.register_user_type(keyspace, udt.type_name(), udt)
  except UserTypeDoesNotExist:
    pass

或者,如果您有 UDT class 声明包,则可以将此代码添加到 __init__.py 文件中。

为什么 Django Cassandra 引擎不自动注册这些类型,我不知道。