use rq for cron tasks

Your Name 2023-10-21 05:41:38 +00:00 committed by Henri Dickson
parent dc5e7fc377
commit cd8b46c9c5
12 changed files with 297 additions and 56 deletions

View file

@@ -99,7 +99,7 @@ RQ_QUEUES = {
"DB": _parsed_redis_url.path[1:],
"DEFAULT_TIMEOUT": -1,
}
-for q in ["mastodon", "export", "import", "fetch", "crawl", "ap"]
+for q in ["mastodon", "export", "import", "fetch", "crawl", "ap", "cron"]
}
_parsed_search_url = env.url("NEODB_SEARCH_URL")
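
For context, the comprehension above emits one identical config dict per queue name, so the new "cron" queue shares the same Redis connection and timeout settings as the existing queues. A minimal sketch of reaching it through django-rq; the ping callable is a placeholder, not part of this commit:

from datetime import timedelta
import django_rq

def ping():
    print("cron queue is reachable")

cron_queue = django_rq.get_queue("cron")           # the queue just added to RQ_QUEUES
cron_queue.enqueue_in(timedelta(minutes=1), ping)  # same call pattern BaseJob.schedule uses below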

catalog/jobs/__init__.py Normal file (2 additions)

@@ -0,0 +1,2 @@
from .discover import DiscoverGenerator
from .podcast import PodcastUpdater

catalog/jobs/discover.py Normal file (95 additions)

@@ -0,0 +1,95 @@
from datetime import timedelta
from django.core.cache import cache
from django.db.models import Count, F
from django.utils import timezone
from loguru import logger
from catalog.models import *
from common.models import BaseJob, JobManager
from journal.models import Comment, ShelfMember, q_item_in_category
MAX_ITEMS_PER_PERIOD = 12
MIN_MARKS = 1
MAX_DAYS_FOR_PERIOD = 96
MIN_DAYS_FOR_PERIOD = 6
@JobManager.register
class DiscoverGenerator(BaseJob):
interval = timedelta(hours=3)
def get_popular_marked_item_ids(self, category, days, existing_ids):
item_ids = [
m["item_id"]
for m in ShelfMember.objects.filter(q_item_in_category(category))
.filter(created_time__gt=timezone.now() - timedelta(days=days))
.exclude(item_id__in=existing_ids)
.values("item_id")
.annotate(num=Count("item_id"))
.filter(num__gte=MIN_MARKS)
.order_by("-num")[:MAX_ITEMS_PER_PERIOD]
]
return item_ids
def get_popular_commented_podcast_ids(self, days, existing_ids):
return list(
Comment.objects.filter(q_item_in_category(ItemCategory.Podcast))
.filter(created_time__gt=timezone.now() - timedelta(days=days))
.annotate(p=F("item__podcastepisode__program"))
.filter(p__isnull=False)
.exclude(p__in=existing_ids)
.values("p")
.annotate(num=Count("p"))
.filter(num__gte=MIN_MARKS)
.order_by("-num")
.values_list("p", flat=True)[:MAX_ITEMS_PER_PERIOD]
)
def cleanup_shows(self, items):
seasons = [i for i in items if i.__class__ == TVSeason]
for season in seasons:
if season.show in items:
items.remove(season.show)
return items
def run(self):
logger.info("Discover data update start.")
cache_key = "public_gallery"
gallery_categories = [
ItemCategory.Book,
ItemCategory.Movie,
ItemCategory.TV,
ItemCategory.Game,
ItemCategory.Music,
ItemCategory.Podcast,
]
gallery_list = []
for category in gallery_categories:
days = MAX_DAYS_FOR_PERIOD
item_ids = []
while days >= MIN_DAYS_FOR_PERIOD:
ids = self.get_popular_marked_item_ids(category, days, item_ids)
logger.info(f"Most marked {category} in last {days} days: {len(ids)}")
item_ids = ids + item_ids
days //= 2
if category == ItemCategory.Podcast:
days = MAX_DAYS_FOR_PERIOD // 4
extra_ids = self.get_popular_commented_podcast_ids(days, item_ids)
logger.info(
f"Most commented podcast in last {days} days: {len(extra_ids)}"
)
item_ids = extra_ids + item_ids
items = [Item.objects.get(pk=i) for i in item_ids]
if category == ItemCategory.TV:
items = self.cleanup_shows(items)
gallery_list.append(
{
"name": "popular_" + category.value,
"title": ""
+ (category.label if category != ItemCategory.Book else "图书"),
"items": items,
}
)
cache.set(cache_key, gallery_list, timeout=None)
logger.info("Discover data updated.")

catalog/jobs/podcast.py Normal file (35 additions)

@@ -0,0 +1,35 @@
import pprint
from datetime import timedelta
from time import sleep
from loguru import logger
from catalog.common.models import IdType
from catalog.models import Podcast
from catalog.sites import RSS
from common.models import BaseJob, JobManager
@JobManager.register
class PodcastUpdater(BaseJob):
interval = timedelta(hours=2)
def run(self):
logger.info("Podcasts update start.")
count = 0
qs = Podcast.objects.filter(
is_deleted=False, merged_to_item__isnull=True
).order_by("pk")
for p in qs:
if (
p.primary_lookup_id_type == IdType.RSS
and p.primary_lookup_id_value is not None
):
logger.info(f"updating {p}")
c = p.episodes.count()
site = RSS(p.feed_url)
site.scrape_additional_data()
c2 = p.episodes.count()
logger.info(f"updated {p}, {c2-c} new episodes.")
count += c2 - c
logger.info(f"Podcasts update finished, {count} new episodes total.")

View file

@@ -0,0 +1,42 @@
from django.core.management.base import BaseCommand
from loguru import logger
from catalog.jobs import * # noqa
from common.models import JobManager
class Command(BaseCommand):
help = "Schedule timed jobs"
def add_arguments(self, parser):
parser.add_argument(
"--cancel",
action="store_true",
)
parser.add_argument(
"--schedule",
action="store_true",
)
parser.add_argument(
"--list",
action="store_true",
)
parser.add_argument(
"--runonce",
action="append",
)
def handle(self, *args, **options):
if options["cancel"]:
JobManager.cancel()
if options["schedule"]:
JobManager.cancel() # cancel previously scheduled jobs if any
JobManager.schedule()
if options["runonce"]:
for job_id in options["runonce"]:
run = JobManager.run(job_id)
if not run:
logger.error(f"Job not found: {job_id}")
if options["list"]:
jobs = JobManager.get_scheduled_job_ids()
logger.info(f"{len(jobs)} scheduled jobs: {jobs}")

View file

@@ -1,20 +0,0 @@
import pprint
from django.core.management.base import BaseCommand
from redis import Redis
from rq import Queue
from rq.job import Job
class Command(BaseCommand):
help = "Delete a job"
def add_arguments(self, parser):
parser.add_argument("job_id", type=str, help="Job ID")
def handle(self, *args, **options):
redis = Redis()
job_id = str(options["job_id"])
job = Job.fetch(job_id, connection=redis)
job.delete()
self.stdout.write(self.style.SUCCESS(f"Deleted {job}"))

View file

@@ -0,0 +1,45 @@
import pprint
import django_rq
from django.conf import settings
from django.core.management.base import BaseCommand
from redis import Redis
from rq import Queue
from rq.job import Job
class Command(BaseCommand):
help = "Show jobs in queue"
def add_arguments(self, parser):
parser.add_argument("--delete", action="append")
parser.add_argument("--list", action="store_true")
def handle(self, *args, **options):
if options["delete"]:
for job_id in options["delete"]:
job = Job.fetch(job_id, connection=django_rq.get_connection("fetch"))
job.delete()
self.stdout.write(self.style.SUCCESS(f"Deleted {job}"))
if options["list"]:
queues = settings.RQ_QUEUES.keys()
for q in queues:
queue = django_rq.get_queue(q)
for registry in [
queue.scheduled_job_registry,
queue.started_job_registry,
queue.deferred_job_registry,
queue.finished_job_registry,
queue.failed_job_registry,
queue.canceled_job_registry,
]:
for job_id in registry.get_job_ids():
try:
job = Job.fetch(
job_id, connection=django_rq.get_connection(q)
)
self.stdout.write(
self.style.SUCCESS(f"{registry.key} {repr(job)}")
)
except Exception as e:
print(f"Error fetching {registry.key} {job_id}")

View file

@@ -1,31 +0,0 @@
import pprint
from django.core.management.base import BaseCommand
from redis import Redis
from rq import Queue
from rq.job import Job
class Command(BaseCommand):
help = "Show jobs in queue"
def add_arguments(self, parser):
parser.add_argument("queue", type=str, help="Queue")
def handle(self, *args, **options):
redis = Redis()
queue = Queue(str(options["queue"]), connection=redis)
for registry in [
queue.started_job_registry,
queue.deferred_job_registry,
queue.finished_job_registry,
queue.failed_job_registry,
queue.scheduled_job_registry,
]:
self.stdout.write(self.style.SUCCESS(f"Registry {registry}"))
for job_id in registry.get_job_ids():
try:
job = Job.fetch(job_id, connection=redis)
pprint.pp(job)
except Exception as e:
print(f"Error fetching {job_id}")

View file

@@ -0,0 +1,72 @@
from datetime import timedelta
import django_rq
from loguru import logger
from rq.job import Job
from rq.registry import ScheduledJobRegistry
class BaseJob:
interval = timedelta(seconds=1)
@classmethod
def cancel(cls):
job_id = cls.__name__
try:
job = Job.fetch(id=job_id, connection=django_rq.get_connection("cron"))
if job.get_status() in ["queued", "scheduled"]:
logger.info(f"Cancel queued job: {job_id}")
job.cancel()
registry = ScheduledJobRegistry(queue=django_rq.get_queue("cron"))
registry.remove(job)
except:
pass
@classmethod
def schedule(cls):
job_id = cls.__name__
logger.info(f"Scheduling job: {job_id}")
django_rq.get_queue("cron").enqueue_in(
cls.interval, cls._run, job_id=job_id, result_ttl=0, failure_ttl=0
)
@classmethod
def _run(cls):
cls.schedule() # schedule next run
cls().run()
def run(self):
pass
class JobManager:
registry = set()
@classmethod
def register(cls, target):
cls.registry.add(target)
return target
@classmethod
def schedule(cls):
for j in cls.registry:
j.schedule()
@classmethod
def cancel(cls):
for j in cls.registry:
j.cancel()
@classmethod
def run(cls, job_id):
for j in cls.registry:
if j.__name__ == job_id:
logger.info(f"Run job: {job_id}")
j().run()
return True
return False
@classmethod
def get_scheduled_job_ids(cls):
registry = ScheduledJobRegistry(queue=django_rq.get_queue("cron"))
return registry.get_job_ids()
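
The two catalog jobs above follow the pattern this module provides: subclass BaseJob, set an interval, and register the class so JobManager can place it on the "cron" queue; _run() re-enqueues the next occurrence before calling run(). A minimal sketch of a new job (class name and body are illustrative only, not part of this commit):

from datetime import timedelta
from common.models import BaseJob, JobManager

@JobManager.register
class HypotheticalCleanupJob(BaseJob):
    interval = timedelta(hours=6)  # delay before each rescheduled run

    def run(self):
        pass  # periodic work goes here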

View file

@@ -166,14 +166,14 @@ services:
neodb-worker:
<<: *neodb-service
-command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap
+command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap cron
depends_on:
migration:
condition: service_completed_successfully
neodb-worker-extra:
<<: *neodb-service
-command: neodb-manage rqworker --with-scheduler fetch crawl ap
+command: neodb-manage rqworker fetch crawl ap
depends_on:
migration:
condition: service_completed_successfully
@@ -231,7 +231,7 @@ services:
dev-neodb-worker:
<<: *dev-neodb-service
-command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap
+command: neodb-manage rqworker --with-scheduler import export mastodon fetch crawl ap cron
dev-takahe-web:
<<: *dev-neodb-service

View file

@@ -9,5 +9,6 @@ echo NeoDB initializing...
takahe-manage migrate || exit $?
neodb-manage migrate || exit $?
+neodb-manage cron --schedule || exit $?
echo NeoDB initialization complete.

View file

@@ -6,4 +6,4 @@ djlint~=1.32.1
isort~=5.12.0
lxml-stubs
pre-commit
-pyright==1.1.327
+pyright==1.1.332