lib.itmens/journal/models/rating.py

138 lines
4.8 KiB
Python

from datetime import datetime
from django.core.validators import MaxValueValidator, MinValueValidator, RegexValidator
from django.db import connection, models
from django.db.models import Avg, Count, Q
from django.utils.translation import gettext_lazy as _
from catalog.models import Item, ItemCategory
from users.models import APIdentity
from .common import Content
MIN_RATING_COUNT = 5
RATING_INCLUDES_CHILD_ITEMS = ["tvshow", "performance"]
class Rating(Content):
class Meta:
unique_together = [["owner", "item"]]
grade = models.PositiveSmallIntegerField(
default=0, validators=[MaxValueValidator(10), MinValueValidator(1)], null=True
)
@property
def ap_object(self):
return {
"id": self.absolute_url,
"type": "Rating",
"best": 10,
"worst": 1,
"value": self.grade,
"published": self.created_time.isoformat(),
"updated": self.edited_time.isoformat(),
"attributedTo": self.owner.actor_uri,
"relatedWith": self.item.absolute_url,
"url": self.absolute_url,
}
@classmethod
def update_by_ap_object(cls, owner, item, obj, post_id, visibility):
value = obj.get("value", 0) if obj else 0
if not value:
cls.objects.filter(owner=owner, item=item).delete()
return
best = obj.get("best", 5)
worst = obj.get("worst", 1)
if best <= worst:
return
if value < worst:
value = worst
if value > best:
value = best
if best != 10 or worst != 1:
value = round(9 * (value - worst) / (best - worst)) + 1
else:
value = round(value)
d = {
"grade": value,
"local": False,
"remote_id": obj["id"],
"visibility": visibility,
"created_time": datetime.fromisoformat(obj["published"]),
"edited_time": datetime.fromisoformat(obj["updated"]),
}
p, _ = cls.objects.update_or_create(owner=owner, item=item, defaults=d)
p.link_post_id(post_id)
return p
@staticmethod
def get_rating_for_item(item: Item) -> float | None:
stat = Rating.objects.filter(grade__isnull=False)
if item.class_name in RATING_INCLUDES_CHILD_ITEMS:
stat = stat.filter(item_id__in=item.child_item_ids + [item.id])
else:
stat = stat.filter(item=item)
stat = stat.aggregate(average=Avg("grade"), count=Count("item"))
return round(stat["average"], 1) if stat["count"] >= MIN_RATING_COUNT else None
@staticmethod
def get_rating_count_for_item(item: Item) -> int:
stat = Rating.objects.filter(grade__isnull=False)
if item.class_name in RATING_INCLUDES_CHILD_ITEMS:
stat = stat.filter(item_id__in=item.child_item_ids + [item.id])
else:
stat = stat.filter(item=item)
stat = stat.aggregate(count=Count("item"))
return stat["count"]
@staticmethod
def get_rating_distribution_for_item(item: Item):
stat = Rating.objects.filter(grade__isnull=False)
if item.class_name in RATING_INCLUDES_CHILD_ITEMS:
stat = stat.filter(item_id__in=item.child_item_ids + [item.id])
else:
stat = stat.filter(item=item)
stat = stat.values("grade").annotate(count=Count("grade")).order_by("grade")
g = [0] * 11
t = 0
for s in stat:
g[s["grade"]] = s["count"]
t += s["count"]
if t < MIN_RATING_COUNT:
return [0] * 5
r = [
100 * (g[1] + g[2]) // t,
100 * (g[3] + g[4]) // t,
100 * (g[5] + g[6]) // t,
100 * (g[7] + g[8]) // t,
100 * (g[9] + g[10]) // t,
]
return r
@staticmethod
def update_item_rating(
item: Item, owner: APIdentity, rating_grade: int | None, visibility: int = 0
):
if rating_grade and (rating_grade < 1 or rating_grade > 10):
raise ValueError(f"Invalid rating grade: {rating_grade}")
rating = Rating.objects.filter(owner=owner, item=item).first()
if not rating_grade:
if rating:
rating.delete()
rating = None
elif rating is None:
rating = Rating.objects.create(
owner=owner, item=item, grade=rating_grade, visibility=visibility
)
elif rating.grade != rating_grade or rating.visibility != visibility:
rating.visibility = visibility
rating.grade = rating_grade
rating.save()
return rating
@staticmethod
def get_item_rating(item: Item, owner: APIdentity) -> int | None:
rating = Rating.objects.filter(owner=owner, item=item).first()
return (rating.grade or None) if rating else None