fix migration issues; cron should now register in apps.py

Her Email 2023-12-03 11:38:16 -05:00 committed by Henri Dickson
parent 0004130e98
commit b66cee3755
8 changed files with 100 additions and 72 deletions

View file

@@ -11,5 +11,8 @@ class CatalogConfig(AppConfig):
from catalog.models import init_catalog_audit_log, init_catalog_search_models
from journal import models as journal_models
# register cron jobs
from catalog.jobs import DiscoverGenerator, PodcastUpdater # isort:skip
init_catalog_search_models()
init_catalog_audit_log()
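The ready() hooks above only need to import the job classes; a minimal sketch of the registration pattern this relies on, assuming jobs register themselves with JobManager at import time (the decorator, BaseJob base class, and import path below are assumptions, not part of this commit):

# sketch: a cron job that registers itself when its module is imported
from datetime import timedelta

from common.models import BaseJob, JobManager  # assumed import path


@JobManager.register  # assumed decorator; registration happens at import
class DiscoverGenerator(BaseJob):
    interval = timedelta(hours=3)  # assumed schedule attribute

    def run(self):
        ...  # refresh discover-page data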

View file

@@ -78,10 +78,5 @@ class JobManager:
@classmethod
def reschedule_all(cls):
# TODO rewrite lazy import in a better way
from catalog.jobs import DiscoverGenerator, PodcastUpdater
from mastodon.jobs import MastodonSiteCheck
from users.jobs import MastodonUserSync
cls.cancel_all()
cls.schedule_all()
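With registration handled by each app's ready(), reschedule_all() no longer needs its own lazy imports. A hedged sketch of invoking it, assuming all AppConfig.ready() hooks have already run by then (the management command wrapper is hypothetical):

# hypothetical management command; every AppConfig.ready() has executed
# before handle() runs, so all cron jobs are already registered
from django.core.management.base import BaseCommand

from common.models import JobManager  # assumed import path


class Command(BaseCommand):
    help = "Cancel and re-create all registered cron jobs"

    def handle(self, *args, **options):
        JobManager.reschedule_all()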

View file

@@ -3,3 +3,7 @@ from django.apps import AppConfig
class MastodonConfig(AppConfig):
name = "mastodon"
def ready(self):
# register cron jobs
from mastodon.jobs import MastodonSiteCheck # isort:skip

View file

@@ -1,3 +1,5 @@
import time
from contextlib import nullcontext
from datetime import datetime, timezone
from django.conf import settings
@@ -10,11 +12,11 @@ from tqdm import tqdm
from catalog.common import *
from catalog.common.models import *
from catalog.models import *
from catalog.models import PodcastEpisode, TVEpisode
from journal.models import *
from takahe.models import Identity as TakaheIdentity
from takahe.models import Post as TakahePost
from takahe.models import TimelineEvent, set_disable_timeline
from takahe.models import TimelineEvent, set_migration_mode
from takahe.utils import *
from users.models import APIdentity
from users.models import User as NeoUser
@@ -50,39 +52,42 @@ class Command(BaseCommand):
"--post-new",
action="store_true",
)
parser.add_argument(
"--csv",
action="store_true",
)
parser.add_argument("--start", default=0, action="store")
parser.add_argument("--count", default=0, action="store")
def process_post(self):
logger.info(f"Generating posts...")
set_disable_timeline(True)
set_migration_mode(True)
qs = Piece.objects.filter(
polymorphic_ctype__in=[
content_type_id(ShelfMember),
content_type_id(Comment),
content_type_id(Review),
content_type_id(Collection),
]
).order_by("id")
if self.starting_id:
qs = qs.filter(id__gte=self.starting_id)
pg = Paginator(qs, BATCH_SIZE)
tracker = tqdm(pg.page_range)
for page in tracker:
with transaction.atomic(using="default"):
with transaction.atomic(using="takahe"):
for p in pg.page(page):
tracker.set_postfix_str(f"{p.id}")
if p.__class__ == ShelfMember:
mark = Mark(p.owner, p.item)
Takahe.post_mark(mark, self.post_new)
elif p.__class__ == Comment:
if p.item.__class__ in [PodcastEpisode, TVEpisode]:
Takahe.post_comment(p, self.post_new)
elif p.__class__ == Review:
Takahe.post_review(p, self.post_new)
elif p.__class__ == Collection:
Takahe.post_collection(p)
set_disable_timeline(False)
with transaction.atomic(using="default"):
with transaction.atomic(using="takahe"):
tracker = tqdm(qs.iterator(), total=self.count_est or qs.count())
for p in tracker:
tracker.set_postfix_str(f"{p.id}")
if p.__class__ == ShelfMember:
mark = Mark(p.owner, p.item)
Takahe.post_mark(mark, self.post_new)
elif p.__class__ == Comment:
if p.item.__class__ in [PodcastEpisode, TVEpisode]:
Takahe.post_comment(p, self.post_new)
elif p.__class__ == Review:
Takahe.post_review(p, self.post_new)
elif p.__class__ == Collection:
Takahe.post_collection(p)
set_migration_mode(False)
def process_timeline(self):
def add_event(post_id, author_id, owner_id, published):
@@ -96,48 +101,63 @@ class Command(BaseCommand):
},
)
logger.info(f"Generating cache for timeline...")
logger.info(f"Generating identity cache for timeline...")
followers = {
apid.pk: apid.followers if apid.is_active else []
for apid in APIdentity.objects.filter(local=True)
}
cnt = TakahePost.objects.count()
qs = TakahePost.objects.filter(local=True).order_by("published")
pg = Paginator(qs, BATCH_SIZE)
cnt = qs.count()
# pg = Paginator(qs, BATCH_SIZE)
logger.info(f"Generating timeline...")
for p in tqdm(pg.page_range):
with transaction.atomic(using="takahe"):
posts = pg.page(p)
events = []
for post in posts:
events.append(
TimelineEvent(
identity_id=post.author_id,
type="post",
subject_post_id=post.pk,
subject_identity_id=post.author_id,
published=post.published,
)
csv = ""
# for p in tqdm(pg.page_range):
# with nullcontext() if self.csv else transaction.atomic(using="takahe"):
# posts = pg.page(p)
events = []
for post in tqdm(qs.iterator(), total=cnt):
if self.csv:
csv += f"post,{post.author_id},{post.pk},{post.author_id},{post.published},{post.created},false,false\n"
else:
events.append(
TimelineEvent(
identity_id=post.author_id,
type="post",
subject_post_id=post.pk,
subject_identity_id=post.author_id,
published=post.published,
)
if post.visibility != 3 and post.published > TIMELINE_START:
for follower_id in followers[post.author_id]:
events.append(
TimelineEvent(
identity_id=follower_id,
type="post",
subject_post_id=post.pk,
subject_identity_id=post.author_id,
published=post.published,
)
)
if post.visibility != 3 and post.published > TIMELINE_START:
for follower_id in followers[post.author_id]:
if self.csv:
csv += f"post,{follower_id},{post.pk},{post.author_id},{post.published},{post.created},false,false\n"
else:
events.append(
TimelineEvent(
identity_id=follower_id,
type="post",
subject_post_id=post.pk,
subject_identity_id=post.author_id,
published=post.published,
)
TimelineEvent.objects.bulk_create(events, ignore_conflicts=True)
# for post in posts:
# add_event(post.pk, post.author_id, post.author_id, post.published)
# if post.visibility != 3:
# for follower_id in followers[post.author_id]:
# add_event(
# post.pk, post.author_id, follower_id, post.published
# )
)
if not self.csv:
TimelineEvent.objects.bulk_create(events, ignore_conflicts=True)
# for post in posts:
# add_event(post.pk, post.author_id, post.author_id, post.published)
# if post.visibility != 3:
# for follower_id in followers[post.author_id]:
# add_event(
# post.pk, post.author_id, follower_id, post.published
# )
if self.csv:
logger.info(f"Writing timeline.csv...")
with open(settings.MEDIA_ROOT + "/timeline.csv", "w") as csvfile:
csvfile.write(
"type,identity_id,subject_post_id,subject_identity_id,published,created,seen,dismissed\n"
)
csvfile.write(csv)
def process_like(self):
logger.info(f"Processing likes...")
@@ -148,11 +168,12 @@ class Command(BaseCommand):
if post_id:
Takahe.like_post(post_id, like.owner.pk)
else:
logger.warning(f"Post not found for like {like.id}")
logger.warning(f"Post not found for {like.target.owner}:{like.target}")
def handle(self, *args, **options):
self.verbose = options["verbose"]
self.post_new = options["post_new"]
self.csv = options["csv"]
self.starting_id = int(options["start"])
self.count_est = int(options["count"])
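The new --csv option makes process_timeline() write events to MEDIA_ROOT/timeline.csv instead of calling bulk_create, presumably so the rows can be bulk-loaded into the takahe database afterwards. A hedged sketch of such a loader, assuming a psycopg2 backend (copy_expert is a psycopg2 cursor method) and that the CSV columns map directly onto TimelineEvent's table; it is not part of this commit:

# hypothetical loader for the generated timeline.csv
from django.conf import settings
from django.db import connections

from takahe.models import TimelineEvent


def load_timeline_csv(path=None):
    path = path or settings.MEDIA_ROOT + "/timeline.csv"
    table = TimelineEvent._meta.db_table  # resolve the actual table name
    columns = (
        "type, identity_id, subject_post_id, subject_identity_id, "
        "published, created, seen, dismissed"
    )
    with connections["takahe"].cursor() as cursor, open(path) as f:
        # copy_expert streams the file into Postgres; requires psycopg2
        cursor.copy_expert(
            f"COPY {table} ({columns}) FROM STDIN WITH (FORMAT csv, HEADER true)",
            f,
        )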

View file

@@ -33,12 +33,12 @@ if TYPE_CHECKING:
from django.db.models.manager import RelatedManager
_disable_timeline = False
_migration_mode = False
def set_disable_timeline(disable: bool):
global _disable_timeline
_disable_timeline = disable
def set_migration_mode(disable: bool):
global _migration_mode
_migration_mode = disable
class TakaheSession(models.Model):
@@ -1035,14 +1035,16 @@ class Post(models.Model):
) -> "Post":
with transaction.atomic():
# Find mentions in this post
mentions = cls.mentions_from_content(content, author)
mentions = (
set() if _migration_mode else cls.mentions_from_content(content, author)
)
if reply_to:
mentions.add(reply_to.author)
# Maintain local-only for replies
if reply_to.visibility == reply_to.Visibilities.local_only:
visibility = reply_to.Visibilities.local_only
# Find emoji in this post
emojis = Emoji.emojis_from_content(content, None)
emojis = [] if _migration_mode else Emoji.emojis_from_content(content, None)
# Strip all unwanted HTML and apply linebreaks filter, grabbing hashtags on the way
parser = FediverseHtmlParser(linebreaks_filter(content), find_hashtags=True)
content = parser.html.replace("<p>", "<p>" + raw_prepend_content, 1)
@@ -1070,7 +1072,7 @@ class Post(models.Model):
if (
timezone.now() - published
> datetime.timedelta(days=settings.FANOUT_LIMIT_DAYS)
and _disable_timeline
and _migration_mode
):
post.state = "fanned_out" # add post quietly if it's old
# if attachments:# FIXME
@@ -1084,7 +1086,7 @@ class Post(models.Model):
# Recalculate parent stats for replies
if reply_to:
reply_to.calculate_stats()
if post.state == "fanned_out" and not _disable_timeline:
if post.state == "fanned_out" and not _migration_mode:
post.add_to_timeline(author)
return post
@@ -1132,7 +1134,7 @@ class Post(models.Model):
self.state_changed = timezone.now()
self.state_next_attempt = None
self.state_locked_until = None
if _disable_timeline: # NeoDB: disable fanout during migration
if _migration_mode: # NeoDB: disable fanout during migration
self.state = "edited_fanned_out"
self.save()
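Since _migration_mode now gates mention parsing, emoji parsing, and timeline fanout, forgetting to turn it back off would leave the instance half-disabled. A small convenience sketch (hypothetical, not part of this commit) wrapping set_migration_mode in a context manager:

# hypothetical helper; set_migration_mode itself is defined in takahe.models
from contextlib import contextmanager

from takahe.models import set_migration_mode


@contextmanager
def migration_mode():
    """Temporarily skip mention/emoji parsing and timeline fanout."""
    set_migration_mode(True)
    try:
        yield
    finally:
        set_migration_mode(False)

process_post() in the migration command could then wrap its loop in "with migration_mode():" instead of pairing the two calls by hand.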

View file

@@ -79,7 +79,7 @@ class Takahe:
handler = "@" + u.username
if not user:
logger.info(f"Creating takahe user {u}")
user = User.objects.create(pk=u.pk, email=handler)
user = User.objects.create(pk=u.pk, email=handler, password=u.password)
else:
if user.email != handler:
logger.warning(f"Updating takahe user {u} email to {handler}")

View file

@@ -6,3 +6,6 @@ class UsersConfig(AppConfig):
def ready(self):
from . import api
# register cron jobs
from users.jobs import MastodonUserSync # isort:skip

View file

@@ -91,7 +91,7 @@ def init_identity(apps, schema_editor):
else user.mastodon_last_reachable + timedelta(days=90),
)
takahe_user = TakaheUser.objects.create(
pk=user.pk, email=handler, admin=user.is_superuser
pk=user.pk, email=handler, admin=user.is_superuser, password=user.password
)
takahe_identity = TakaheIdentity.objects.create(
pk=user.pk,