diff --git a/journal/models/collection.py b/journal/models/collection.py index f204bc56..27a386cc 100644 --- a/journal/models/collection.py +++ b/journal/models/collection.py @@ -80,6 +80,8 @@ class Collection(List): ) def save(self, *args, **kwargs): + from takahe.utils import Takahe + if getattr(self, "catalog_item", None) is None: self.catalog_item = CatalogCollection() if ( @@ -91,6 +93,21 @@ class Collection(List): self.catalog_item.cover = self.cover # type: ignore self.catalog_item.save() super().save(*args, **kwargs) + Takahe.post_collection(self) + + @property + def ap_object(self): + return { + "id": self.absolute_url, + "type": "Collection", + "name": self.title, + "content": self.brief, + "mediaType": "text/markdown", + "published": self.created_time.isoformat(), + "updated": self.edited_time.isoformat(), + "attributedTo": self.owner.actor_uri, + "href": self.absolute_url, + } class FeaturedCollection(Piece): diff --git a/takahe/ap_handlers.py b/takahe/ap_handlers.py index 3c55b229..a7cb5c07 100644 --- a/takahe/ap_handlers.py +++ b/takahe/ap_handlers.py @@ -90,8 +90,9 @@ def post_fetched(pk, obj): if not post.type_data: logger.warning(f"Post {post} has no type_data") return - items = _parse_items(post.type_data["object"]["tag"]) - pieces = _parse_piece_objects(post.type_data["object"]["relatedWith"]) + ap_object = post.type_data.get("object", {}) + items = _parse_items(ap_object.get("tag")) + pieces = _parse_piece_objects(ap_object.get("relatedWith")) logger.info(f"Post {post} has items {items} and pieces {pieces}") if len(items) == 0: logger.warning(f"Post {post} has no remote items") diff --git a/takahe/management/commands/backfill_takahe.py b/takahe/management/commands/backfill_takahe.py index 956caa3f..01c1f290 100644 --- a/takahe/management/commands/backfill_takahe.py +++ b/takahe/management/commands/backfill_takahe.py @@ -1,6 +1,7 @@ from django.conf import settings from django.contrib.contenttypes.models import ContentType from django.core.management.base import BaseCommand +from django.core.paginator import Paginator from django.db.models import Count, F from loguru import logger from tqdm import tqdm @@ -9,10 +10,15 @@ from catalog.common import * from catalog.common.models import * from catalog.models import * from journal.models import * +from takahe.models import Identity as TakaheIdentity +from takahe.models import Post as TakahePost +from takahe.models import TimelineEvent, set_disable_timeline from takahe.utils import * from users.models import APIdentity from users.models import User as NeoUser +BATCH_SIZE = 1000 + def content_type_id(cls): return ContentType.objects.get(app_label="journal", model=cls.__name__.lower()).pk @@ -28,6 +34,10 @@ class Command(BaseCommand): "--post", action="store_true", ) + parser.add_argument( + "--timeline", + action="store_true", + ) parser.add_argument( "--like", action="store_true", @@ -40,7 +50,8 @@ class Command(BaseCommand): parser.add_argument("--count", default=0, action="store") def process_post(self): - logger.info(f"Processing posts...") + logger.info(f"Generating posts...") + set_disable_timeline(True) qs = Piece.objects.filter( polymorphic_ctype__in=[ content_type_id(ShelfMember), @@ -50,17 +61,77 @@ class Command(BaseCommand): ).order_by("id") if self.starting_id: qs = qs.filter(id__gte=self.starting_id) - tracker = tqdm(qs.iterator(), total=self.count_est or qs.count()) - for p in tracker: - tracker.set_postfix_str(f"{p.id}") - if p.__class__ == ShelfMember: - mark = Mark(p.owner, p.item) - Takahe.post_mark(mark, self.post_new) - elif p.__class__ == Comment: - if p.item.__class__ in [PodcastEpisode, TVEpisode]: - Takahe.post_comment(p, self.post_new) - elif p.__class__ == Review: - Takahe.post_review(p, self.post_new) + pg = Paginator(qs, BATCH_SIZE) + tracker = tqdm(pg.page_range) + for page in tracker: + with transaction.atomic(using="default"): + with transaction.atomic(using="takahe"): + for p in pg.page(page): + tracker.set_postfix_str(f"{p.id}") + if p.__class__ == ShelfMember: + mark = Mark(p.owner, p.item) + Takahe.post_mark(mark, self.post_new) + elif p.__class__ == Comment: + if p.item.__class__ in [PodcastEpisode, TVEpisode]: + Takahe.post_comment(p, self.post_new) + elif p.__class__ == Review: + Takahe.post_review(p, self.post_new) + set_disable_timeline(False) + + def process_timeline(self): + def add_event(post_id, author_id, owner_id, published): + TimelineEvent.objects.get_or_create( + identity_id=owner_id, + type="post", + subject_post_id=post_id, + subject_identity_id=author_id, + defaults={ + "published": published, + }, + ) + + logger.info(f"Generating cache for timeline...") + followers = { + apid.pk: apid.followers if apid.is_active else [] + for apid in APIdentity.objects.filter(local=True) + } + cnt = TakahePost.objects.count() + qs = TakahePost.objects.filter(local=True).order_by("published") + pg = Paginator(qs, BATCH_SIZE) + logger.info(f"Generating timeline...") + for p in tqdm(pg.page_range): + with transaction.atomic(using="takahe"): + posts = pg.page(p) + events = [] + for post in posts: + events.append( + TimelineEvent( + identity_id=post.author_id, + type="post", + subject_post_id=post.pk, + subject_identity_id=post.author_id, + published=post.published, + ) + ) + if post.visibility != 3: + for follower_id in followers[post.author_id]: + events.append( + TimelineEvent( + identity_id=follower_id, + type="post", + subject_post_id=post.pk, + subject_identity_id=post.author_id, + published=post.published, + ) + ) + TimelineEvent.objects.bulk_create(events, ignore_conflicts=True) + # for post in posts: + # add_event(post.pk, post.author_id, post.author_id, post.published) + # if post.visibility != 3: + # for follower_id in followers[post.author_id]: + # add_event( + # post.pk, post.author_id, follower_id, post.published + # ) def process_like(self): logger.info(f"Processing likes...") @@ -82,6 +153,9 @@ class Command(BaseCommand): if options["post"]: self.process_post() + if options["timeline"]: + self.process_timeline() + if options["like"]: self.process_like() diff --git a/takahe/models.py b/takahe/models.py index 3221ad8e..9bb8d234 100644 --- a/takahe/models.py +++ b/takahe/models.py @@ -33,6 +33,14 @@ if TYPE_CHECKING: from django.db.models.manager import RelatedManager +_disable_timeline = False + + +def set_disable_timeline(disable: bool): + global _disable_timeline + _disable_timeline = disable + + class TakaheSession(models.Model): session_key = models.CharField(_("session key"), max_length=40, primary_key=True) session_data = models.TextField(_("session data")) @@ -984,6 +992,20 @@ class Post(models.Model): .first() ) + def add_to_timeline(self, owner: Identity): + """ + Creates a TimelineEvent for this post on owner's timeline + """ + return TimelineEvent.objects.update_or_create( + identity=owner, + type=TimelineEvent.Types.post, + subject_post=self, + subject_identity=self.author, + defaults={ + "published": self.published, + }, + )[0] + @classmethod def create_local( cls, @@ -1032,8 +1054,10 @@ class Post(models.Model): post.emojis.set(emojis) if published and published < timezone.now(): post.published = published - if timezone.now() - published > datetime.timedelta( - days=settings.FANOUT_LIMIT_DAYS + if ( + timezone.now() - published + > datetime.timedelta(days=settings.FANOUT_LIMIT_DAYS) + and _disable_timeline ): post.state = "fanned_out" # add post quietly if it's old # if attachments:# FIXME @@ -1047,13 +1071,8 @@ class Post(models.Model): # Recalculate parent stats for replies if reply_to: reply_to.calculate_stats() - if post.state == "fanned_out": - FanOut.objects.create( - identity=author, - type="post", - subject_post=post, - ) - + if post.state == "fanned_out" and not _disable_timeline: + post.add_to_timeline(author) return post def edit_local( @@ -1066,7 +1085,7 @@ class Post(models.Model): attachments: list | None = None, attachment_attributes: list | None = None, type_data: dict | None = None, - edited: datetime.datetime | None = None, + published: datetime.datetime | None = None, ): with transaction.atomic(): # Strip all HTML and apply linebreaks filter @@ -1100,12 +1119,8 @@ class Post(models.Model): self.state_changed = timezone.now() self.state_next_attempt = None self.state_locked_until = None - if edited and edited < timezone.now(): - self.published = edited - if timezone.now() - edited > datetime.timedelta( - days=settings.FANOUT_LIMIT_DAYS - ): - self.state = "edited_fanned_out" # add post quietly if it's old + if _disable_timeline: # NeoDB: disable fanout during migration + self.state = "edited_fanned_out" self.save() @classmethod @@ -1687,6 +1702,75 @@ class InboxMessage(models.Model): ) +class TimelineEvent(models.Model): + """ + Something that has happened to an identity that we want them to see on one + or more timelines, like posts, likes and follows. + """ + + class Types(models.TextChoices): + post = "post" + boost = "boost" # A boost from someone (post substitute) + mentioned = "mentioned" + liked = "liked" # Someone liking one of our posts + followed = "followed" + follow_requested = "follow_requested" + boosted = "boosted" # Someone boosting one of our posts + announcement = "announcement" # Server announcement + identity_created = "identity_created" # New identity created + + # The user this event is for + identity = models.ForeignKey( + Identity, + on_delete=models.CASCADE, + related_name="timeline_events", + ) + + # What type of event it is + type = models.CharField(max_length=100, choices=Types.choices) + + # The subject of the event (which is used depends on the type) + subject_post = models.ForeignKey( + Post, + on_delete=models.CASCADE, + blank=True, + null=True, + related_name="timeline_events", + ) + subject_post_interaction = models.ForeignKey( + PostInteraction, + on_delete=models.CASCADE, + blank=True, + null=True, + related_name="timeline_events", + ) + subject_identity = models.ForeignKey( + Identity, + on_delete=models.CASCADE, + blank=True, + null=True, + related_name="timeline_events_about_us", + ) + + published = models.DateTimeField(default=timezone.now) + seen = models.BooleanField(default=False) + dismissed = models.BooleanField(default=False) + + created = models.DateTimeField(auto_now_add=True) + + class Meta: + indexes = [ + # This relies on a DB that can use left subsets of indexes + models.Index( + fields=["identity", "type", "subject_post", "subject_identity"] + ), + models.Index(fields=["identity", "type", "subject_identity"]), + models.Index(fields=["identity", "created"]), + ] + # managed = False + db_table = "activities_timelineevent" + + class Config(models.Model): """ A configuration setting for either the server or a specific user or identity. diff --git a/takahe/utils.py b/takahe/utils.py index 9fdadcc1..1f462302 100644 --- a/takahe/utils.py +++ b/takahe/utils.py @@ -370,7 +370,7 @@ class Takahe: content, visibility=visibility, type_data=data, - edited=post_time, + published=post_time, ) else: post = Post.create_local( @@ -414,6 +414,49 @@ class Takahe: else: return Takahe.Visibilities.unlisted + @staticmethod + def post_collection(collection): + existing_post = collection.latest_post + user = collection.owner.user + visibility = Takahe.visibility_n2t( + collection, user.preference.mastodon_publish_public + ) + if existing_post and visibility != existing_post.visibility: + Takahe.delete_posts([existing_post]) + existing_post = None + data = { + "object": { + # "tag": [item.ap_object_ref for item in collection.items], + "relatedWith": [collection.ap_object], + } + } + if existing_post and existing_post.type_data == data: + return existing_post + action_label = "创建" + category = "收藏单" + item_link = collection.absolute_url + pre_conetent = f'{action_label}{category}《{collection.title}》
' + content = "" + data = { + "object": { + # "tag": [item.ap_object_ref for item in collection.items], + "relatedWith": [collection.ap_object], + } + } + post = Takahe.post( + collection.owner.pk, + pre_conetent, + content, + visibility, + data, + existing_post.pk if existing_post else None, + collection.created_time, + ) + if not post: + return + collection.link_post_id(post.pk) + return post + @staticmethod def post_comment(comment, share_as_new_post: bool) -> Post | None: from catalog.common import ItemCategory