为自定义 Pony 编写 select 个函数

Writing select functions for customized Pony

很多时候,我写了如下查询:

pony.orm.select(u for u in User if u.available and u.friends > 0 and ...)

因此,我想编写自己的 select 版本,作为它的替代品。就像避免我每次都写谓词的第一部分,if u.available and u.friends > 0

我的问题更笼统:我如何编写一个像 select 这样的函数,它接受 select 方法或 count 方法可以接受的参数。

让我们按以下方式定义一个 User 实体:

from datetime import datetime, timedelta
from pony.orm import *

db = Database('sqlite', ':memory:')

class User(db.Entity):
    username = Required(str, unique=True)
    password = Required(str)
    friends = Set("User", reverse='friends')  # many-to-many symmetric relation
    online = Required(bool, default=False)    # if user is currently online
    last_visit = Optional(datetime)           # last visit time
    disabled = Required(bool, default=False)  # if user is disabled by administrator

sql_debug(True)
db.generate_mapping(create_tables=True)

现在我们可以定义一些方便的函数来检索最常用的用户类型。第一个函数将 return 未被管理员禁用的用户:

def active_users():
    return User.select(lambda user: not user.disabled)

在那个函数中,我使用 select 接受 lambda 函数的实体的 select 方法,但是可以使用接受生成器表达式的全局 select 函数编写相同的函数:

def active_users():
    return select(u for u in User if not user.disabled)

active_users函数的结果是一个查询对象。您可以调用查询对象的 filter 方法来生成更具体的查询。例如,我可以使用 active_users 函数来 select 名称以 'A' 字母开头的活跃用户:

users = active_users().filter(lambda user: user.name.startswith('A')) \
                      .order_by(User.name)[:10]

现在我想找到最近几天访问该网站的用户。我可以定义另一个函数,它使用从上一个函数中查询 return 并按以下方式扩充它:

def recent_users(days=1):
    return active_users().filter(lambda u: u.last_visit > datetime.now() - timedelta(days))

在这个例子中,我将 days 参数传递给函数并在过滤器中使用它的值。

您可以定义一组这样的函数,它们将构成您应用程序的数据访问层。更多示例:

def users_with_at_least_n_friends(n=1):
    return active_users().filter(lambda u: count(u.friends) >= n)

def online_users():
    return User.select(lambda u: u.online)

def online_users_with_at_least_n_online_friends(n=1):
    return online_users().filter(lambda u: count(f for f in u.friends if f.online) >= n)

users = online_users_with_at_least_n_online_friends(n=10) \
            .order_by(User.name)[:10]

在上面的例子中我定义了全局函数。另一种选择是将这些函数定义为 User 实体的 class 方法:

class User(db.Entity):
    username = Required(str, unique=True)
    ...
    @classmethod
    def name_starts_with(cls, prefix):
        return cls.select(lambda user: not user.disabled
                                       and user.name.startswith(prefix))

...
users = User.name_starts_with('A').order_by(desc(User.last_visit))[:10]

如果您可能想要一个可以应用于不同实体 class 的通用函数,那么您需要将实体 class 作为参数传递。例如,如果许多不同的 classes 具有 deleted 属性,并且您希望有一个通用方法 select 只有未删除的对象,您可以这样写:

def select_active(cls):
    return cls.select(lambda obj: not obj.deleted)

select_active(Message).filter(lambda msg: msg.author == current_user)

上面提供的所有函数都有一个缺点——它们不可组合。您不能从一个函数获取查询并用另一个函数扩充它。如果你想拥有一个可以扩充现有查询的函数,该函数应该接受查询作为参数。示例:

def active_users():
    return User.select(lambda user: not user.disabled)

def name_starts_with(query, prefix):
    return query.filter(lambda user: user.name.startswith('prefix'))

name_starts_with 函数可以应用于另一个查询:

users1 = name_starts_with(active_users(), 'A').order_by(User.last_visited)
users2 = name_starts_with(recent_users(), 'B').filter(lambda user: user.online)

我们还在开发查询扩展 API,这将允许程序员编写自定义查询方法。当我们发布这个 API 时,可以通过以下方式将自定义查询方法链接在一起:

select(u for u in User).recent(days=3).name_starts_with('A')[:10]

希望我回答了你的问题。如果是这种情况,请采纳正确答案。