Django Queryset filtering based on each entry

Given the following Django model of cars travelling along a road, with start and end times:

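from django.db import models


class Travel(models.Model):
    # CharField requires max_length; 100 is an arbitrary placeholder value
    car = models.CharField(max_length=100)
    road = models.CharField(max_length=100)
    start = models.DateTimeField()
    end = models.DateTimeField()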

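I want to identify the set of cars X that were on the same road as a target car x for at least m minutes.

How should I obtain the required set of cars X?

My attempt:

Suppose I use a filter to get the set of travels T that x took part in.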

T <-- Travel.objects.filter(car=x)

Then I brute-force it:

for t in T:
  possible_travels <-- filter Travel.objects with car != x, road = t.road, start < t.end, end > t.start

  confirmed_travels <-- further filter possible_travels so that the overlapping region is at least m minutes long

  confirmed_cars <-- confirmed_travels.values('car').distinct()

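However, the problems are:

  1. Querying inside a loop may involve many DB hits.
  2. Also, confirmed_cars is a QuerySet object, so it seems I need to somehow append these QuerySet objects together. I have seen other posts doing things like converting to a list, appending, and finally converting back to a QuerySet, but some people say that is not a good approach. Should I do something like that?

Is there a better way to solve this? Is the for loop really necessary, or can I avoid it entirely?

Edit: For 2), I suppose one way is to extract the car attribute, append it to a list, and then do .filter(car__in=the_list); compared with the other issue this is not a big problem.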
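
For reference, the brute-force idea can be sketched in plain Python, with a list of in-memory records standing in for the table. The `Row` tuple and the `cars_sharing_road` helper are hypothetical names used only for illustration, not Django APIs. Collecting car names in a set sidesteps the need to merge QuerySet objects; with real models, the resulting set could then be passed to `.filter(car__in=...)`.

```python
from collections import namedtuple
from datetime import datetime, timedelta

# Hypothetical in-memory stand-in for a Travel row (not a Django model).
Row = namedtuple("Row", "car road start end")


def cars_sharing_road(rows, x, m):
    """Return the cars that overlap with car x on the same road for >= m minutes."""
    min_overlap = timedelta(minutes=m)
    found = set()
    for t in (r for r in rows if r.car == x):
        for other in rows:
            if other.car == x or other.road != t.road:
                continue
            # Overlap runs from the later start to the earlier end;
            # it is negative when the two intervals are disjoint.
            overlap = min(t.end, other.end) - max(t.start, other.start)
            if overlap >= min_overlap:
                found.add(other.car)
    return found


day = datetime(2023, 1, 1)
rows = [
    Row("mycar", "A", day.replace(hour=8), day.replace(hour=9)),
    Row("car2", "A", day.replace(hour=8, minute=30), day.replace(hour=10)),  # 30 min overlap
    Row("car3", "A", day.replace(hour=8, minute=50), day.replace(hour=10)),  # 10 min overlap
    Row("car4", "B", day.replace(hour=8), day.replace(hour=9)),  # different road
]
print(cars_sharing_road(rows, "mycar", 20))
```

This still compares every pair of rows, so it only illustrates the logic; it does not address the number of database hits.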

There are two possible approaches:

  1. Here is my solution using querysets and no loops. To find the intersection between the start and end times, I split the queryset into 4 categories:
  • start1 -> start2 -> end1 -> end2 (q_slides_before) (intersection = end1 - start2)
  • start1 -> start2 -> end2 -> end1 (q_contains) (intersection = end2 - start2)
  • start2 -> start1 -> end1 -> end2 (q_is_contained) (intersection = end1 - start1)
  • start2 -> start1 -> end2 -> end1 (q_slides_after) (intersection = end2 - start1)
from django.db.models import Exists, OuterRef, ExpressionWrapper,\
                             F, Q, functions, DurationField, FloatField

# Use this function instead of "annotate_delta" if your db supports DurationField (postgres supports this but sqlite does not).
# Here delta is a timedelta, so filter with delta__gte=datetime.timedelta(minutes=m) instead of delta__gte=m.
def annotate_delta_with_duration_support(qs, start, end):
    duration_exp = ExpressionWrapper(end - start, output_field=DurationField())
    return qs.annotate(delta=duration_exp)


def annotate_delta(qs, start, end):
    duration_exp = ExpressionWrapper((end - start) / (60 * 10**6),
                                     output_field=FloatField())
    return qs.annotate(delta=duration_exp)


x = 'mycar'
m = 20

q_is_contained = Q(start__gte=OuterRef('start')) & Q(end__lte=OuterRef('end'))
qs = annotate_delta(Travel.objects, F('start'), F('end'))
qs = qs.filter(q_is_contained, delta__gte=m, car=x, road=OuterRef('road'))
res1 = Travel.objects.exclude(car=x).annotate(ex=Exists(qs)).filter(ex=True)


q_contains = Q(start__lte=OuterRef('start')) & Q(end__gte=OuterRef('end'))
qs = annotate_delta(Travel.objects, OuterRef('start'), OuterRef('end'))
qs = qs.filter(q_contains, delta__gte=m, car=x, road=OuterRef('road'))
res2 = Travel.objects.exclude(car=x).annotate(ex=Exists(qs)).filter(ex=True)


q_slides_before = Q(start__lte=OuterRef('start')) & \
                  Q(end__lte=OuterRef('end')) & \
                  Q(end__gte=OuterRef('start'))
qs = annotate_delta(Travel.objects, OuterRef('start'), F('end'))
qs = qs.filter(q_slides_before, delta__gte=m, car=x, road=OuterRef('road'))
res3 = Travel.objects.exclude(car=x).annotate(ex=Exists(qs)).filter(ex=True)


q_slides_after = Q(start__gte=OuterRef('start')) & \
                 Q(end__gte=OuterRef('end')) & \
                 Q(start__lte=OuterRef('end'))
qs = annotate_delta(Travel.objects, F('start'), OuterRef('end'))
qs = qs.filter(q_slides_after, delta__gte=m, car=x, road=OuterRef('road'))
res4 = Travel.objects.exclude(car=x).annotate(ex=Exists(qs)).filter(ex=True)

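# res holds the matching Travel rows; res.values_list('car', flat=True).distinct() gives the car names
res = res1 | res2 | res3 | res4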

  2. Thanks to @SUTerliakov, I found there is an even more concise solution:
from django.db.models import Exists, OuterRef, ExpressionWrapper,\
                             F, functions, DurationField, FloatField


# Use this function instead of "annotate_delta" if your db supports DurationField (postgres supports this but sqlite does not).
# Here delta is a timedelta, so filter with delta__gte=datetime.timedelta(minutes=m) instead of delta__gte=m.
def annotate_delta_with_duration_support(qs, start, end):
    duration_exp = ExpressionWrapper(end - start, output_field=DurationField())
    return qs.annotate(delta=duration_exp)


def annotate_delta(qs, start, end):
    duration_exp = ExpressionWrapper((end - start) / (60 * 10**6),
                                     output_field=FloatField())
    return qs.annotate(delta=duration_exp)


x = 'mycar'
m = 20

qs = annotate_delta(Travel.objects, functions.Greatest(F('start'), OuterRef('start')),
                    functions.Least(F('end'), OuterRef('end')))
# No Q object is needed here: non-overlapping travels get a negative delta and are filtered out
qs = qs.filter(delta__gte=m, car=x, road=OuterRef('road'))
res = Travel.objects.exclude(car=x).annotate(ex=Exists(qs)).filter(ex=True)
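
To see why the Greatest/Least form covers all four categories of the first approach, here is a small plain-Python check. It assumes nothing about Django: `overlap_minutes` and `at` are hypothetical helpers, and `min`/`max` play the role of `Least`/`Greatest`.

```python
from datetime import datetime, timedelta


def overlap_minutes(start1, end1, start2, end2):
    """Minutes shared by [start1, end1] and [start2, end2]; negative if disjoint."""
    delta = min(end1, end2) - max(start1, start2)
    return delta / timedelta(minutes=1)


def at(hour, minute=0):
    # Hypothetical helper: a time on an arbitrary fixed day.
    return datetime(2023, 1, 1, hour, minute)


# (start1, end1, start2, end2) for each of the four orderings, plus a disjoint pair.
cases = {
    "slides_before": (at(8), at(9), at(8, 30), at(10)),   # end1 - start2
    "contains": (at(8), at(10), at(8, 30), at(9)),        # end2 - start2
    "is_contained": (at(8, 30), at(9), at(8), at(10)),    # end1 - start1
    "slides_after": (at(8, 30), at(10), at(8), at(9)),    # end2 - start1
    "disjoint": (at(8), at(9), at(10), at(11)),           # no overlap
}
for name, args in cases.items():
    print(name, overlap_minutes(*args))
```

Each of the four overlapping orderings yields the same value as its case-specific formula, and the disjoint pair yields a negative number, which is why a single `delta__gte=m` filter is enough when m is positive.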

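If you are not writing a reusable library and are definitely tied to one specific database backend, you can also use raw SQL. It is not the best option, but if the performance of this query becomes a bottleneck, a single query may be faster. Something like this (untested, but should work in postgres; it needs slight modification to work with other backends):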

from django.db import connection

with connection.cursor() as cur:
    cur.execute('''
        SELECT DISTINCT others.car
        FROM {table} base
            INNER JOIN {table} AS others
                ON base.car <> others.car
                AND base.road = others.road
                AND (LEAST(base.end, others.end) - GREATEST(base.start, others.start)) >= %s * interval '1 minute'
        WHERE base.car = %s
    '''.format(table=Travel._meta.db_table), (5, 'car1'))  # parameters are (m, x)
    cars = [row[0] for row in cur.fetchall()]  # fetchall() returns a list of 1-tuples, one per car
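
As an illustration of the "slight modification" for another backend, below is a minimal sketch of the same self-join against SQLite, using only the standard-library `sqlite3` module. It rests on simplifying assumptions that are not part of the original: the table is a hypothetical `travel` table rather than the one Django generates, and times are stored as integer epoch seconds, so the overlap is a plain subtraction. SQLite has no `LEAST`/`GREATEST`, but its multi-argument `MIN`/`MAX` do the same job.

```python
import sqlite3


def build_demo_db():
    """Create an in-memory table shaped like Travel, with times as epoch seconds."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        'CREATE TABLE travel (car TEXT, road TEXT, "start" INTEGER, "end" INTEGER)'
    )
    conn.executemany(
        "INSERT INTO travel VALUES (?, ?, ?, ?)",
        [
            ("mycar", "A", 0, 3600),
            ("car2", "A", 1800, 5400),  # 30 min overlap with mycar
            ("car3", "A", 3000, 4000),  # 10 min overlap with mycar
            ("car4", "B", 0, 3600),     # different road
        ],
    )
    return conn


def cars_sharing_road(conn, x, m):
    """Cars on the same road as x for at least m minutes, via one self-join."""
    sql = '''
        SELECT DISTINCT others.car
        FROM travel AS base
            INNER JOIN travel AS others
                ON base.car <> others.car
                AND base.road = others.road
                AND MIN(base."end", others."end")
                    - MAX(base."start", others."start") >= ? * 60
        WHERE base.car = ?
        ORDER BY others.car
    '''
    return [row[0] for row in conn.execute(sql, (m, x))]


conn = build_demo_db()
print(cars_sharing_road(conn, "mycar", 20))
```

With Django's own SQLite tables the datetime columns are stored as text, so the subtraction would need a conversion step (for example through SQLite's date and time functions) before comparing against m.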