Cassandra 用户定义类型上的 Django 查询集抛出类型错误
Django Queryset on Cassandra User Defined Type throws Type Error
我正在使用 DataStax Python Driver and the Django Cassandra Engine 的组合通过 Django 应用程序与 Cassandra 通信。
以下是我使用用户定义类型列定义模型的方式(仅示例):
from cassandra.cqlengine import columns
from cassandra.cqlengine.usertype import UserType
from django_cassandra_engine.models import DjangoCassandraModel
class UserAddress(UserType)
street = columns.Text()
number = columns.Integer()
class User(DjangoCassandraModel):
__table_name__ = 'users'
user_id = columns.UUID(primary_key=True)
name = columns.Text()
address = columns.UserDefinedType(UserAddress)
class Meta:
managed = False
get_pk_field = 'user_id'
我正在尝试通过以下调用查询此模型:
User.objects.filter(user_id=user_id)
抛出此错误:
File "/usr/local/lib/python3.8/site-packages/cassandra/cqlengine/columns.py", line 1043, in to_python
if copied_value[name] is not None or isinstance(field, BaseContainerColumn):
TypeError: tuple indices must be integers or slices, not str
此错误源自 UserDefinedType
class 声明,特别是此函数:
def to_python(self, value):
if value is None:
return
copied_value = deepcopy(value)
for name, field in self.user_type._fields.items():
if copied_value[name] is not None or isinstance(field, BaseContainerColumn):
copied_value[name] = field.to_python(copied_value[name])
return copied_value
在我看来,包含 DjangoCassandraQuerySet
class 的 Django Cassandra 引擎与 UserDefinedType
class 配合使用时效果不佳,但我'我不知道为什么。
到目前为止,我对这些库没有任何问题,我觉得奇怪的是不支持这样的操作。我希望我错过了某处的配置,或者至少该问题是可以解决的。
感谢您的宝贵时间。
感谢 Django Cassandra 引擎问题页面上的 this comment 解决了这个问题。
似乎 DataStax Python 驱动程序将未注册的类型解释为普通字典,导致它们被反序列化为一系列元组而不是正确的字典,解释了错误消息。
解决方案是在每次启动连接时注册 UDT。每次都必须这样做,因为它似乎不会持续存在。
为此,请在您的 UDT class 声明之后添加此代码:
from cassandra.cqlengine import columns
from cassandra.cqlengine.usertype import UserType
from django.db import connections
from cassandra.cluster import UserTypeDoesNotExist
class UserAddress(UserType)
street = columns.Text()
number = columns.Integer()
user_types = [ UserAddress ]
database_name = 'DjangoDBName'
keyspace = 'CassandraKeyspace'
connection = connections[database_name]
for udt in user_types:
try:
connection.connection.cluster.register_user_type(keyspace, udt.type_name(), udt)
except UserTypeDoesNotExist:
pass
或者,如果您有 UDT class 声明包,则可以将此代码添加到 __init__.py
文件中。
为什么 Django Cassandra 引擎不自动注册这些类型,我不知道。
我正在使用 DataStax Python Driver and the Django Cassandra Engine 的组合通过 Django 应用程序与 Cassandra 通信。
以下是我使用用户定义类型列定义模型的方式(仅示例):
from cassandra.cqlengine import columns
from cassandra.cqlengine.usertype import UserType
from django_cassandra_engine.models import DjangoCassandraModel
class UserAddress(UserType)
street = columns.Text()
number = columns.Integer()
class User(DjangoCassandraModel):
__table_name__ = 'users'
user_id = columns.UUID(primary_key=True)
name = columns.Text()
address = columns.UserDefinedType(UserAddress)
class Meta:
managed = False
get_pk_field = 'user_id'
我正在尝试通过以下调用查询此模型:
User.objects.filter(user_id=user_id)
抛出此错误:
File "/usr/local/lib/python3.8/site-packages/cassandra/cqlengine/columns.py", line 1043, in to_python
if copied_value[name] is not None or isinstance(field, BaseContainerColumn):
TypeError: tuple indices must be integers or slices, not str
此错误源自 UserDefinedType
class 声明,特别是此函数:
def to_python(self, value):
if value is None:
return
copied_value = deepcopy(value)
for name, field in self.user_type._fields.items():
if copied_value[name] is not None or isinstance(field, BaseContainerColumn):
copied_value[name] = field.to_python(copied_value[name])
return copied_value
在我看来,包含 DjangoCassandraQuerySet
class 的 Django Cassandra 引擎与 UserDefinedType
class 配合使用时效果不佳,但我'我不知道为什么。
到目前为止,我对这些库没有任何问题,我觉得奇怪的是不支持这样的操作。我希望我错过了某处的配置,或者至少该问题是可以解决的。
感谢您的宝贵时间。
感谢 Django Cassandra 引擎问题页面上的 this comment 解决了这个问题。
似乎 DataStax Python 驱动程序将未注册的类型解释为普通字典,导致它们被反序列化为一系列元组而不是正确的字典,解释了错误消息。
解决方案是在每次启动连接时注册 UDT。每次都必须这样做,因为它似乎不会持续存在。
为此,请在您的 UDT class 声明之后添加此代码:
from cassandra.cqlengine import columns
from cassandra.cqlengine.usertype import UserType
from django.db import connections
from cassandra.cluster import UserTypeDoesNotExist
class UserAddress(UserType)
street = columns.Text()
number = columns.Integer()
user_types = [ UserAddress ]
database_name = 'DjangoDBName'
keyspace = 'CassandraKeyspace'
connection = connections[database_name]
for udt in user_types:
try:
connection.connection.cluster.register_user_type(keyspace, udt.type_name(), udt)
except UserTypeDoesNotExist:
pass
或者,如果您有 UDT class 声明包,则可以将此代码添加到 __init__.py
文件中。
为什么 Django Cassandra 引擎不自动注册这些类型,我不知道。