Django: how to order results by two columns, limit prefetched results to a single record, and order by a prefetched column

Posted by ut6juiuv on 2023-06-07 in Go

I have several problems that I cannot solve on my own.
1. In CaseManager.get_queryset(): I tried to add a WHERE clause stating that case__id equals movement__case__id:

queryset=Movement.objects.filter(case__id=models.F('case__id')).order_by('-date'),

But the query becomes:

...
WHERE ("cm_movement"."case_id" = ("cm_movement"."case_id") AND 
...

Full query:

SELECT "cm_movement"."id",
       "cm_movement"."case_id",
       "cm_movement"."direction",
       "cm_movement"."date",
       "cm_movement"."reason_id",
       "cm_movement"."details",
       "cm_movement"."created_at",
       "cm_movement"."updated_at",
       "cm_case"."id",
       "cm_case"."police_station_id",
       "cm_case"."number",
       "cm_case"."year",
       "cm_case"."kind",
       "cm_case"."registered_at",
       "cm_case"."created_at",
       "cm_case"."updated_at",
       "cm_movement_reason"."id",
       "cm_movement_reason"."direction",
       "cm_movement_reason"."reason",
       "cm_movement_reason"."created_at",
       "cm_movement_reason"."updated_at"
  FROM "cm_movement"
 INNER JOIN "cm_case"
    ON ("cm_movement"."case_id" = "cm_case"."id")
 INNER JOIN "cm_movement_reason"
    ON ("cm_movement"."reason_id" = "cm_movement_reason"."id")
 WHERE ("cm_movement"."case_id" = ("cm_movement"."case_id") AND "cm_movement"."case_id" IN ('41', '35', '29', '26', '44', '40', '34', '39', '32', '38', '31', '33', '30', '28', '27', '25', '43', '37', '36', '42'))
 ORDER BY "cm_movement"."date" DESC

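2. How can I get only the earliest or the latest movement (just one) in the prefetch_related call in CaseManager.get_queryset()?
3. How can I order the results produced by CaseManager.get_queryset() by latest_movement, for use in CaseAdmin.latest_movement()?
4. In CaseAdmin.ordering, the results should be ordered by year in DESC order, and if several records share the same year, they should be ordered by number in DESC order. That is why I use CaseAdmin.ordering = ['-year', '-number'], but in practice they are only ordered by year.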

How can I fix this?
Here is my code:

Managers:

class PoliceStationManager(models.Manager):
    def get_queryset(self, *args, **kwargs) -> models.QuerySet:
        return super().get_queryset(*args, **kwargs).annotate(
            misdemeanors_count=models.Count('case', filter=models.Q(case__kind=Case.Kind.MISDEMEANOR)),
            traffics_count=models.Count('case', filter=models.Q(case__kind=Case.Kind.TRAFFIC)),
            cases_count=models.Count('case'),
        )

class CaseManager(models.Manager):
    def get_queryset(self, *args, **kwargs) -> models.QuerySet:
        return super(CaseManager, self).get_queryset(*args, **kwargs).select_related('police_station').prefetch_related(
            models.Prefetch(
                'movement_set',
                queryset=Movement.objects.filter(case__id=models.F('case__id')).order_by('-date'),
                # Does not work:
                # queryset=Movement.objects.filter(case_id=models.F('case__id')).earliest('-date'),
                to_attr='latest_movement'
            )
        )

class MovementManager(models.Manager):
    def get_queryset(self, *args, **kwargs) -> models.QuerySet:
        return super(MovementManager, self).get_queryset(*args, **kwargs).select_related(
            'reason', 'case').prefetch_related('case__police_station')

Models:

TABLE_PREFIX = 'cm_'  # cases_
TABLE_SUFFIX = ''     # _tbl

# Create your models here.

class PoliceStation(models.Model):
    class Meta:
        db_table = f'{TABLE_PREFIX}policestation{TABLE_SUFFIX}'
        verbose_name = _('police station')
        verbose_name_plural = _('police stations')

    objects = PoliceStationManager()

    id = models.BigAutoField(
        unique=True, primary_key=True, null=False, blank=False, db_index=True,
        db_column='id', verbose_name=_('Case ID'))
    name = models.CharField(
        unique=False, null=False, blank=False, db_index=True,
        db_column='name', verbose_name=_('Police Station Name'),
        max_length=25)
    created_at = models.DateTimeField(
        auto_now=False, auto_now_add=True, db_column='created_at',
        verbose_name=_('Created at'))
    updated_at = models.DateTimeField(
        auto_now=True, auto_now_add=False, db_column='updated_at',
        verbose_name=_('Updated at'))

    def __str__(self):
        return f'{self.name}'

class Case(models.Model):
    class Meta:
        db_table = f'{TABLE_PREFIX}case{TABLE_SUFFIX}'
        verbose_name = _('case')
        verbose_name_plural = _('cases')
        ordering = ['-year', '-number', 'police_station__name', 'kind']
        constraints = [
            models.UniqueConstraint(fields=['number', 'year', 'kind', 'police_station'], name='unique_case2')]

    class Kind(models.TextChoices):
        MISDEMEANOR = 'M', _('Misdemeanor')
        TRAFFIC = 'T', _('Traffic')

    objects = CaseManager()

    id = models.BigAutoField(
        unique=True, primary_key=True, null=False, blank=False, db_index=True,
        db_column='id', verbose_name=_('Case ID'))
    police_station = models.ForeignKey(
        PoliceStation, on_delete=models.PROTECT,
        unique=False, null=False, blank=False, db_index=True,
        db_column='police_station_id', verbose_name=_('Police Station'))
    number = models.CharField(
        unique=False, null=False, blank=False, db_index=True,
        db_column='number', verbose_name=_('Case Number'),
        max_length=4)
    year = models.CharField(
        unique=False, null=False, blank=False, db_index=True,
        db_column='year', verbose_name=_('Case Year'),
        max_length=4)
    kind = models.CharField(
        unique=False, null=False, blank=False, db_index=False,
        db_column='kind', verbose_name=_('Case Kind'),
        max_length=1, choices=Kind.choices, default=Kind.MISDEMEANOR)
    registered_at = models.DateTimeField(
        auto_now=False, auto_now_add=False, db_column='registered_at', db_index=True,
        null=True, default=timezone.now,    # TODO: remove this line, once we update all current db records
        verbose_name=_('Registration Date'))
    created_at = models.DateTimeField(
        auto_now=False, auto_now_add=True, db_column='created_at',
        verbose_name=_('Created at'))
    updated_at = models.DateTimeField(
        auto_now=True, auto_now_add=False, db_column='updated_at',
        verbose_name=_('Updated at'))

    def __str__(self):
        context = {
            'number': self.number,
            'year': self.year,
            'kind': self.get_kind_display(),
            # 'police_station': '???'
            'police_station': self.police_station.name
        }
        return _('%(number)s/%(year)s %(kind)s %(police_station)s') % context

class MovementReason(models.Model):
    class Meta:
        db_table = f'{TABLE_PREFIX}movement_reason{TABLE_SUFFIX}'
        verbose_name = _('movement reason')
        verbose_name_plural = _('movement reasons')
        ordering = ['direction']

    class Direction(models.TextChoices):
        SENT = 'S', _('Outbound')
        RETURNED = 'R', _('Inbound')

    id = models.BigAutoField(
        unique=True, primary_key=True, null=False, blank=False, db_index=True,
        db_column='id', verbose_name=_('Movement Reason ID'))
    direction = models.CharField(
        unique=False, null=False, blank=False, db_index=False,
        db_column='direction', verbose_name=_('Movement Type'),
        max_length=1, choices=Direction.choices, default=Direction.SENT)
    reason = models.CharField(
        unique=False, null=False, blank=False, db_index=True,
        db_column='reason', verbose_name=_('Reason'),
        max_length=50)
    created_at = models.DateTimeField(
        auto_now=False, auto_now_add=True, db_column='created_at',
        verbose_name=_('Created at'))
    updated_at = models.DateTimeField(
        auto_now=True, auto_now_add=False, db_column='updated_at',
        verbose_name=_('Updated at'))

    def __str__(self):
        return f'{self.get_direction_display()}: {self.reason}'

class Movement(models.Model):
    class Meta:
        db_table = f'{TABLE_PREFIX}movement{TABLE_SUFFIX}'
        verbose_name = _('movement')
        verbose_name_plural = _('movements')

    class Direction(models.TextChoices):
        SENT = 'S', _('Sent')
        RETURNED = 'R', _('Returned')

    objects = MovementManager()

    id = models.BigAutoField(
        unique=True, primary_key=True, null=False, blank=False, db_index=True,
        db_column='id', verbose_name=_('Movement ID'))
    case = models.ForeignKey(
        Case, on_delete=models.CASCADE,
        unique=False, null=False, blank=False, db_index=True,
        db_column='case_id',
        verbose_name=_('Case number'))
    direction = models.CharField(
        unique=False, null=False, blank=False, db_index=False,
        db_column='direction', verbose_name=_('Movement Direction'),
        max_length=1, choices=Direction.choices, default=Direction.SENT)
    date = models.DateField(
        auto_now=False, auto_now_add=False, db_column='date', db_index=True,
        verbose_name=_('Movement Date'))
    reason = models.ForeignKey(
        MovementReason, on_delete=models.PROTECT,
        unique=False, null=False, blank=False, db_index=True,
        db_column='reason_id',
        verbose_name=_('Movement Reason'))
    details = models.TextField(
        unique=False, null=False, blank=True, db_index=False,
        db_column='details',
        verbose_name=_('Movement Details'))
    created_at = models.DateTimeField(
        auto_now=False, auto_now_add=True, db_column='created_at',
        verbose_name=_('Created at'))
    updated_at = models.DateTimeField(
        auto_now=True, auto_now_add=False, db_column='updated_at',
        verbose_name=_('Updated at'))

    def __str__(self):
        on_date = _('on date')
        return f"{self.case} {self.get_direction_display()} {on_date} {self.date}: {self.reason}"

App admin:

# ------------------------------
# Cases
# ------------------------------

# admin.site.register(Case)
@admin.register(Case)
class CaseAdmin(myadmin.DefaultModelAdmin):
    list_display = ['case_number', 'number', 'year', 'kind', 'police_station_name', 'registered_at', 'latest_movement']
    list_select_related = ['police_station']
    list_select_prefetch = ['movement_sets']
    fields = ['registered_at', ('number', 'year'), ('kind', 'police_station'), ('created_at', 'updated_at',)]
    ordering = ['-year', '-number']
    date_hierarchy = 'registered_at'

    # Computed columns

    @admin.display(empty_value="???", ordering='police_station__name', description=_('Police Station Name'))
    def police_station_name(self, case):
        anchor_url = (
            reverse('admin:case_movements_policestation_changelist')
            + '?'
            + urlencode({
                # 'id': case.police_station_id,
                'id': case.police_station.id,
            })
        )
        anchor_title = case.police_station.name
        return format_html(f'<a href="{anchor_url}">{anchor_title}</a>') if anchor_title else ''

    @admin.display(empty_value="", ordering='movement__date', description=_('Latest movement'))
    # @admin.display(empty_value="", ordering='movement__latest_movement', description=_('Latest movement'))
    def latest_movement(self, case):
        if hasattr(case, 'latest_movement') and case.latest_movement:
            movement = case.latest_movement[0]
            anchor_url = (
                reverse('admin:case_movements_movement_changelist')
                + '?'
                + urlencode({
                    'case__id': case.id,
                })
            )
            anchor_title = _('%(direction)s: %(reason)s on %(date)s') % {
                'direction': movement.reason.get_direction_display(),
                'reason': movement.reason.reason,
                'date': movement.date,
            }
            return format_html(f'<a href="{anchor_url}">{anchor_title}</a>') if anchor_title else ''
        return None

    @admin.display(ordering=Concat('year', Value(' '), 'number'))
    def case_number(self, case):
        return f'{case.number} / {case.year}'

# ------------------------------
# Police Stations
# ------------------------------

@admin.register(PoliceStation)
class PoliceStationAdmin(myadmin.DefaultModelAdmin):
    list_display = ['fullname', 'misdemeanors_count', 'traffics_count', 'cases_count']
    fields = ['name', ('created_at', 'updated_at',)]
    ordering = ['name']

    # Computed columns

    @admin.display(description=_('Police Station Name'), ordering='name')
    def fullname(self, police_station):
        return f'{_("police station")} {police_station.name}'

    @admin.display(description=_('Misdemeanors count'), ordering='misdemeanors_count')
    def misdemeanors_count(self, police_station):
        anchor_url = (
            reverse('admin:case_movements_case_changelist')
            + '?'
            + urlencode({
                'police_station_id': police_station.id,
                'kind': Case.Kind.MISDEMEANOR,
            })
        )
        anchor_title = police_station.misdemeanors_count
        return format_html(f'<a href="{anchor_url}">{anchor_title}</a>') if anchor_title > 0 else anchor_title

    @admin.display(description=_('Traffics count'), ordering='traffics_count')
    def traffics_count(self, police_station):
        anchor_url = (
            reverse('admin:case_movements_case_changelist')
            + '?'
            + urlencode({
                'police_station_id': police_station.id,
                'kind': Case.Kind.TRAFFIC,
            })
        )
        anchor_title = police_station.traffics_count
        return format_html(f'<a href="{anchor_url}">{anchor_title}</a>') if anchor_title > 0 else anchor_title

    @admin.display(description=_('Total cases'), ordering='cases_count')
    def cases_count(self, police_station):
        anchor_url = (
            reverse('admin:case_movements_case_changelist')
            + '?'
            + urlencode({
                'police_station_id': police_station.id,
            })
        )
        anchor_title = police_station.cases_count
        return format_html(f'<a href="{anchor_url}">{anchor_title}</a>') if anchor_title > 0 else anchor_title

# ------------------------------
# Movements
# ------------------------------

@admin.register(Movement)
class MovementAdmin(myadmin.DefaultModelAdmin):
    list_display = ['case', 'date', 'direction', 'computed_reason']
    list_select_related = ['case', 'case__police_station', 'reason']
    fields = [('case', 'date'), ('direction', 'reason'), 'details', ('created_at', 'updated_at')]
    ordering = ['-date']
    date_hierarchy = 'date'

    # Computed columns

    @admin.display(description=_('movement reason'), empty_value="???", ordering='reason__reason')
    def computed_reason(self, movement):
        anchor_url = (
            reverse('admin:case_movements_movementreason_changelist')
            + '?'
            + urlencode({
                'id': movement.reason.id,
            })
        )
        anchor_title = movement.reason.reason
        return format_html(f'<a href="{anchor_url}">{anchor_title}</a>') if anchor_title else ''

# ------------------------------
# Movement Reasons
# ------------------------------

@admin.register(MovementReason)
class MovementReasonAdmin(myadmin.DefaultModelAdmin):
    list_display = ['direction', 'reason']
    fields = [('direction', 'reason',), ('created_at', 'updated_at')]
    readonly_fields_in_edit = ('created_at', 'updated_at')
    exclude_in_add = ('created_at', 'updated_at')
    ordering = ['direction']

Here is my custom DefaultModelAdmin:

class DefaultModelAdmin(admin.ModelAdmin):
    # Displaying fields
    list_display = ('__str__',)

    # Select related table in the sql query
    list_select_related = False

    # Editable fields records
    list_editable = ()

    ''' Exclude fields from the form in add & edit pages.
        This will override readonly_fields '''
    exclude = ()

    ''' Exclude fields from the form in add page
        This will override readonly_fields '''
    exclude_in_add = ()  # ['created_at', 'updated_at',]

    ''' Exclude fields from the form in edit page
        This will override readonly_fields '''
    exclude_in_edit = ()

    # Grouping fields. ex: fields = [('number', 'year'), ('kind', 'police_station')]
    # if it's not declared, then it will include all fields by default.
    # if it's declared to an empty value (regardless of the type) then it won't include any field
    # fields = []

    # fields to be read-only mode in add & edit pages
    readonly_fields = ()

    # fields to be read-only mode in add page
    readonly_fields_in_add = ()

    # fields to be read-only mode in edit page
    readonly_fields_in_edit = ()  # ['created_at', 'updated_at',]

    # Control complex layout. structure
    """
        * classes:
           - A list or tuple containing extra CSS classes to
             apply to the fieldset.
        * fields:
            - A list or tuple of field names to display in this
              fieldset. This key is required.
            - As with the fields option, to display multiple fields
              on the same line, wrap those fields in their own tuple.
            - fields can contain values defined in readonly_fields
              to be displayed as read-only.
            - If you add the name of a callable to fields, the same
              rule applies as with the fields option: the callable
              must be listed in readonly_fields.
        * description:
            - A text to be displayed at the top of each fieldset,
              under the heading of the fieldset. This string is
              not rendered for TabularInline due to its layout.
            - Note that this value is not HTML-escaped when it’s
              displayed in the admin interface. This lets you
              include HTML if you so desire. Alternatively you
              can use plain text and django.utils.html.escape()
              to escape any HTML special characters.
    """
    # fieldsets = [
    #    (
    #        None,
    #        {
    #            'fields': ['field1', 'field2', 'field3', ]
    #        },
    #    ),
    #    (
    #        'Title 1',
    #        {
    #            'description': 'Some text goes here..',
    #            'classes': ['collapse'],
    #            'fields': ['field1', 'field2', 'field3', ]
    #        },
    #    ),
    # ]

    # Filtering
    # see:
    #       https://docs.djangoproject.com/en/4.2/ref/contrib/admin/#django.contrib.admin.ModelAdmin.filter_horizontal
    #       https://stackoverflow.com/questions/22968631/how-to-filter-filter-horizontal-in-django-admin
    filter_horizontal = ()
    filter_vertical = ()

    # Specify the model-form to use. ex: form = CaseForm
    # form =

    # Sorting records
    ordering = []

    # Dates
    date_hierarchy = None

    # Records per page
    list_per_page = 20

    # Page Actions
    actions_on_top = True
    actions_on_bottom = True
    actions_selection_counter = True

    # True, “Save and add another” will be replaced by a
    # “Save as new” button that creates a new object 
    # (with a new ID) rather than updating the existing object.
    save_as = False

    # Replace empty values
    empty_value_display = ''

    # Provides a quick-and-dirty way to override some Field options for use in the admin.
    # Ex: use RichTextEditorWidget for large text fields instead of the default <textarea>
    formfield_overrides = {}

    # https://docs.djangoproject.com/en/4.2/ref/contrib/admin/#django.contrib.admin.ModelAdmin.inlines
    inlines = ()

    # ------------------------------
    # Override admin`s methods
    # ------------------------------

    def get_readonly_fields(self, request, obj=None):
        """ What fields to be in readonly mode in add and/or edit pages
            * readonly_fields           available in    add and edit pages
            * readonly_fields_in_add    available in    add page
            * readonly_fields_in_edit   available in    edit page """
        readonly = list(self.readonly_fields) if self.readonly_fields else []
        if obj:  # editing an existing object
            return readonly + iterate(self.readonly_fields_in_edit, True) if self.readonly_fields_in_edit else readonly
        return readonly + iterate(self.readonly_fields_in_add, True) if self.readonly_fields_in_add else readonly

    def get_fields(self, request, obj=None):
        """ what fields to show, exclude and make readonly in add & edit pages"""
        return scan_and_remove(self.fields, self.get_exclude(request, obj))

    def get_exclude(self, request, obj=None):
        """ What fields to be excluded in add and/or edit pages
            * exclude           from    add and edit pages
            * exclude_in_add    from    add page
            * exclude_in_edit   from    edit page """
        exclude = list(self.exclude) if self.exclude else []
        if obj:  # editing an existing object
            return exclude + iterate(self.exclude_in_edit, True) if self.exclude_in_edit else exclude
        return exclude + iterate(self.exclude_in_add, True) if self.exclude_in_add else exclude

Sorry for the long post. I just wanted to show you everything so that you can see where my mistakes are.
Thanks in advance for any help.

hvvq6cgz #1

I am a bit confused about what you are trying to achieve with

queryset=Movement.objects.filter(case__id=models.F('case__id')).order_by('-date')

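because Django already filters the results when prefetching. The prefetch is resolved as a second query, which is why the filter does not reference a column of the Case table but rather the list of all Case instances that were retrieved: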

"cm_movement"."case_id" IN ('41', '35', '29', '26', '44', '40', '34', '39', '32', '38', '31', '33', '30', '28', '27', '25', '43', '37', '36', '42'))

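It seems to me that what you are trying to achieve is not what prefetching is for. Usually, the purpose of prefetching is to avoid extra queries when you iterate over a queryset and access related entries for display or serialization. You will not be able to use the prefetched results to order the original result set.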
If you want to order cases by their last movement, consider using a subquery:

from django.db.models import OuterRef, Subquery

subquery = Movement.objects.all()
subquery = subquery.filter(case_id=OuterRef("pk"))  # Retrieve all movements related to the current case in the SQL query
subquery = subquery.order_by("-date")
subquery = subquery.values("date")  # Field you want to retrieve from the subquery

qs = Case.objects.all()
qs = qs.annotate(last_movement_date=Subquery(subquery[:1]))  # Compute the last movement date for each case
qs = qs.order_by("-last_movement_date")

You should not do this in CaseManager, but rather in the .get_queryset() method of the admin class.
