add cli tools for tasks

parent 49effa9905 · commit c5434b44eb

12 changed files with 550 additions and 343 deletions
@@ -483,6 +483,10 @@ class Migration(migrations.Migration):
                         ("apple_podcast", "Apple Podcast"),
                         ("apple_music", "Apple Music"),
                         ("fedi", "Fediverse"),
+                        ("qidian", "Qidian"),
+                        ("ypshuo", "Ypshuo"),
+                        ("ao3", "Archive of Our Own"),
+                        ("jjwxc", "JinJiang"),
                     ],
                     max_length=50,
                     verbose_name="IdType of the source site",
@@ -582,6 +586,10 @@ class Migration(migrations.Migration):
                         ("apple_podcast", "Apple Podcast"),
                         ("apple_music", "Apple Music"),
                         ("fedi", "Fediverse"),
+                        ("qidian", "Qidian"),
+                        ("ypshuo", "Ypshuo"),
+                        ("ao3", "Archive of Our Own"),
+                        ("jjwxc", "JinJiang"),
                     ],
                     max_length=50,
                     verbose_name="source site",
@@ -9,12 +9,13 @@ from rq.job import Job


 class Command(BaseCommand):
-    help = "Show jobs in queue"
+    help = "Manage jobs in RQ"

     def add_arguments(self, parser):
         parser.add_argument("--retry", action="append")
         parser.add_argument("--delete", action="append")
         parser.add_argument("--list", action="store_true")
+        parser.add_argument("--prune", action="store_true")

     def handle(self, *args, **options):
         if options["retry"]:
@@ -30,7 +31,7 @@ class Command(BaseCommand):
                 job = Job.fetch(job_id, connection=django_rq.get_connection("fetch"))
                 job.delete()
                 self.stdout.write(self.style.SUCCESS(f"Deleted {job}"))
-        if options["list"]:
+        if options["list"] or options["prune"]:
             queues = settings.RQ_QUEUES.keys()
             for q in queues:
                 queue = django_rq.get_queue(q)
@@ -42,13 +43,39 @@ class Command(BaseCommand):
                 queue.failed_job_registry,
                 queue.canceled_job_registry,
             ]:
+                if options["prune"]:
+                    registry.cleanup()
                 for job_id in registry.get_job_ids():
                     try:
                         job = Job.fetch(
                             job_id, connection=django_rq.get_connection(q)
                         )
+                        if (
+                            options["prune"]
+                            and q != "cron"
+                            and job.get_status()
+                            in [
+                                "finished",
+                                "failed",
+                                "canceled",
+                            ]
+                        ):
+                            job.delete()
+                        if options["list"]:
                             self.stdout.write(
-                            self.style.SUCCESS(f"{registry.key} {repr(job)}")
+                                registry.key.ljust(20)
+                                + str(job.get_status()).ljust(10)
+                                + job_id.ljust(40)
+                                + (
+                                    job.enqueued_at.strftime("%Y-%m-%d %H:%M:%S")
+                                    if job.enqueued_at
+                                    else ""
+                                )
                             )
                     except Exception as e:
-                        print(f"Fetching {registry.key} {job_id} error: {e}")
+                        self.stdout.write(
+                            registry.key.ljust(20)
+                            + "error".ljust(10)
+                            + job_id.ljust(40)
+                            + str(e)
+                        )
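For reference, the reworked command can be exercised from a Django shell. A minimal sketch, assuming the module above is registered as the `jobs` management command (its file name is not shown in this view, so the command name is an assumption based on its help text):

    from django.core.management import call_command

    # list every job in each registry of every configured RQ queue
    call_command("jobs", "--list")

    # clean up registries and delete finished/failed/canceled jobs (the cron queue is excluded)
    call_command("jobs", "--prune")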
journal/exporters/__init__.py (new file, 3 lines)
@@ -0,0 +1,3 @@
+from .doufen import DoufenExporter
+
+__all__ = ["DoufenExporter"]
@@ -1,12 +1,12 @@
 import os
 from datetime import datetime

 from django.conf import settings
 from openpyxl import Workbook

-from catalog.models import *
+from catalog.models import IdType, ItemCategory, TVEpisode
 from common.utils import GenerateDateUUIDMediaFilePath
-from journal.models import *
+from journal.models import Review, ShelfType, q_item_in_category
+from users.models import Task


 def _get_source_url(item):
@@ -28,9 +28,22 @@ def _get_source_url(item):
     return res.url if res else ""


-def export_marks_task(user):
-    user.preference.export_status["marks_pending"] = True
-    user.preference.save(update_fields=["export_status"])
+# def export_marks_task(user):
+#     user.preference.export_status["marks_pending"] = True
+#     user.preference.save(update_fields=["export_status"])
+class DoufenExporter(Task):
+    class Meta:
+        app_label = "journal"  # workaround bug in TypedModel
+
+    TaskQueue = "export"
+    DefaultMetadata = {
+        "file": None,
+        "total": 0,
+    }
+
+    def run(self):
+        user = self.user
+
         filename = GenerateDateUUIDMediaFilePath(
             "f.xlsx", settings.MEDIA_ROOT + "/" + settings.EXPORT_FILE_PATH_ROOT
         )
@@ -57,7 +70,9 @@ def export_marks_task(user):
         ]:
             ws = wb.create_sheet(title=label)
             shelf = user.shelf_manager.get_shelf(status)
-            q = q_item_in_category(ItemCategory.Movie) | q_item_in_category(ItemCategory.TV)
+            q = q_item_in_category(ItemCategory.Movie) | q_item_in_category(
+                ItemCategory.TV
+            )
             marks = shelf.members.all().filter(q).order_by("created_time")
             ws.append(heading)
             for mm in marks:
@@ -201,7 +216,11 @@ def export_marks_task(user):
                     + " / "
                     + ",".join(game.platform or [])
                     + " / "
-                    + (game.release_date.strftime("%Y-%m-%d") if game.release_date else "")
+                    + (
+                        game.release_date.strftime("%Y-%m-%d")
+                        if game.release_date
+                        else ""
+                    )
                 )
                 tags = ",".join(mark.tags)
                 world_rating = (game.rating / 2) if game.rating else None
@@ -291,7 +310,9 @@ def export_marks_task(user):
                 target = "《" + review.item.title + "》"
                 url = review.absolute_url
                 timestamp = review.created_time.strftime("%Y-%m-%d %H:%M:%S")
-                my_rating = None  # (mark.rating_grade / 2) if mark.rating_grade else None
+                my_rating = (
+                    None  # (mark.rating_grade / 2) if mark.rating_grade else None
+                )
                 content = review.body
                 target_source_url = _get_source_url(review.item)
                 target_url = review.item.absolute_url
@@ -309,9 +330,12 @@ def export_marks_task(user):
             ws.append(line)

         wb.save(filename=filename)
-        user.preference.export_status["marks_pending"] = False
-        user.preference.export_status["marks_file"] = filename
-        user.preference.export_status["marks_date"] = datetime.now().strftime(
-            "%Y-%m-%d %H:%M"
-        )
-        user.preference.save(update_fields=["export_status"])
+        self.metadata["file"] = filename
+        self.message = "Export complete."
+        self.save()
+        # user.preference.export_status["marks_pending"] = False
+        # user.preference.export_status["marks_file"] = filename
+        # user.preference.export_status["marks_date"] = datetime.now().strftime(
+        #     "%Y-%m-%d %H:%M"
+        # )
+        # user.preference.save(update_fields=["export_status"])
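With the export rewritten as a Task subclass, callers drive it through the Task API instead of enqueueing a function on django_rq directly; the view changes further down in this commit do exactly this. A minimal sketch (assuming `user` is a users.User instance):

    from journal.exporters import DoufenExporter

    # create a task row and push it onto the "export" RQ queue
    DoufenExporter.create(user).enqueue()

    # later: check the most recent export and locate the generated .xlsx
    task = DoufenExporter.latest_task(user)
    if task and task.metadata.get("file"):
        print(task.state, task.metadata["file"])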
journal/importers/__init__.py (new file, 6 lines)
@@ -0,0 +1,6 @@
+from .douban import DoubanImporter
+from .goodreads import GoodreadsImporter
+from .letterboxd import LetterboxdImporter
+from .opml import OPMLImporter
+
+__all__ = ["LetterboxdImporter", "OPMLImporter", "DoubanImporter", "GoodreadsImporter"]
@@ -18,8 +18,10 @@ _tz_sh = pytz.timezone("Asia/Shanghai")


 class LetterboxdImporter(Task):
+    class Meta:
+        app_label = "journal"  # workaround bug in TypedModel
+
     TaskQueue = "import"
-    TaskType = "import.letterboxd"
     DefaultMetadata = {
         "total": 0,
         "processed": 0,
@@ -31,9 +33,6 @@ class LetterboxdImporter(Task):
         "file": None,
     }

-    class Meta:
-        proxy = True
-
     def get_item_by_url(self, url):
         try:
             h = BasicDownloader(url).download().html()
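The importer follows the same pattern: per-task options are passed as keyword arguments and merged into DefaultMetadata by Task.create(). A sketch mirroring the call site in the views change further down (the `user` variable and the zip path are placeholders):

    from journal.importers import LetterboxdImporter

    LetterboxdImporter.create(
        user,
        visibility=0,               # ends up in task.metadata["visibility"]
        file="/tmp/letterboxd.zip",  # placeholder path to the uploaded archive
    ).enqueue()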
journal/migrations/0004_tasks.py (new file, 24 lines)
@@ -0,0 +1,24 @@
+# Generated by Django 4.2.17 on 2024-12-26 06:33
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("users", "0006_alter_task_type"),
+        ("journal", "0003_note_progress"),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name="DoufenExporter",
+            fields=[],
+            options={
+                "proxy": True,
+                "indexes": [],
+                "constraints": [],
+            },
+            bases=("users.task",),
+        ),
+    ]
users/management/commands/tasks.py (new file, 67 lines)
@@ -0,0 +1,67 @@
+import pprint
+
+from django.conf import settings
+from django.core.management.base import BaseCommand
+from httpx import delete
+from tqdm import tqdm
+
+from users.models import Task
+
+
+class Command(BaseCommand):
+    help = "Manage tasks"
+
+    def add_arguments(self, parser):
+        parser.add_argument("--id", action="append")
+        parser.add_argument("--user", action="append")
+        parser.add_argument("--type", action="append")
+        parser.add_argument("--pending", action="store_true")
+        parser.add_argument("--failed", action="store_true")
+        parser.add_argument("--complete", action="store_true")
+        parser.add_argument("--list", action="store_true")
+        parser.add_argument("--prune", action="store_true")
+        parser.add_argument("--rerun", action="store_true")
+        parser.add_argument("--requeue", action="store_true")
+        # parser.add_argument("--set-fail", action="store_true")
+        parser.add_argument("--delete", action="store_true")
+
+    def handle(self, *args, **options):
+        tasks = Task.objects.all().order_by("id")
+        states = []
+        if options["pending"]:
+            states += [Task.States.pending]
+        if options["failed"]:
+            states += [Task.States.failed]
+        if options["complete"]:
+            states += [Task.States.complete]
+        if states:
+            tasks = tasks.filter(state__in=states)
+        if options["user"]:
+            tasks = tasks.filter(user__username__in=options["user"])
+        if options["id"]:
+            tasks = tasks.filter(id__in=options["id"])
+        if options["type"]:
+            tasks = tasks.filter(type__in=options["type"])
+        if options["list"]:
+            for task in tasks.order_by("-created_time"):
+                self.stdout.write(
+                    str(task.pk).ljust(10)
+                    + str(task.type).ljust(30)
+                    + str(Task.States(task.state).label).ljust(10)
+                    + task.created_time.strftime("%Y-%m-%d %H:%M ")
+                    + task.edited_time.strftime("%Y-%m-%d %H:%M ")
+                    + str(task.user)
+                )
+        if options["rerun"]:
+            for task in tqdm(tasks):
+                task.state = Task.States.pending
+                task.save(update_fields=["state"])
+                Task._run(task.pk)
+        if options["requeue"]:
+            for task in tqdm(tasks):
+                task.state = Task.States.pending
+                task.save(update_fields=["state"])
+                task.enqueue()
+        if options["delete"]:
+            for task in tqdm(tasks):
+                task.delete()
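The new command is exposed as `tasks` (after the module name above). A few hedged examples via call_command, equivalent to running them through manage.py; the username and id values are placeholders, while the type key comes from the migration below:

    from django.core.management import call_command

    # list all failed tasks of one user, newest first
    call_command("tasks", "--list", "--failed", "--user", "someuser")

    # re-run a specific task inline in the current process (Task._run) ...
    call_command("tasks", "--rerun", "--id", "123")

    # ... or push matching tasks back onto their RQ queue
    call_command("tasks", "--requeue", "--type", "journal.letterboxdimporter")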
users/migrations/0006_alter_task_type.py (new file, 27 lines)
@@ -0,0 +1,27 @@
+# Generated by Django 4.2.17 on 2024-12-26 04:34
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("users", "0005_remove_follow_owner_remove_follow_target_and_more"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="task",
+            name="type",
+            field=models.CharField(
+                choices=[
+                    ("journal.doufenexporter", "doufen exporter"),
+                    ("journal.letterboxdimporter", "letterboxd importer"),
+                ],
+                db_index=True,
+                max_length=255,
+            ),
+        ),
+        migrations.RunSQL(
+            sql="UPDATE users_task SET type='journal.letterboxdimporter' WHERE type='import.letterboxd'"
+        ),
+    ]
@@ -3,14 +3,14 @@ from auditlog.context import set_actor
 from django.db import models
 from django.utils.translation import gettext_lazy as _
 from loguru import logger
+from typedmodels.models import TypedModel
 from user_messages import api as msg

 from .user import User


-class Task(models.Model):
+class Task(TypedModel):
     TaskQueue = "default"
-    TaskType = "unknown"
     DefaultMetadata = {}

     class States(models.IntegerChoices):
@@ -20,7 +20,7 @@ class Task(models.Model):
         failed = 3, _("Failed")  # type:ignore[reportCallIssue]

     user = models.ForeignKey(User, models.CASCADE, null=False)
-    type = models.CharField(max_length=20, null=False)
+    # type = models.CharField(max_length=20, null=False)
     state = models.IntegerField(choices=States.choices, default=States.pending)
     metadata = models.JSONField(null=False, default=dict)
     message = models.TextField(default="")
@@ -34,50 +34,60 @@ class Task(models.Model):
     def job_id(self):
         if not self.pk:
             raise ValueError("task not saved yet")
-        return f"{self.type}-{self.user}-{self.pk}"
+        return f"{self.type}-{self.pk}"

     def __str__(self):
         return self.job_id

     @classmethod
     def latest_task(cls, user: User):
-        return (
-            cls.objects.filter(user=user, type=cls.TaskType)
-            .order_by("-created_time")
-            .first()
-        )
+        return cls.objects.filter(user=user).order_by("-created_time").first()

     @classmethod
-    def enqueue(cls, user: User, **kwargs) -> "Task":
+    def create(cls, user: User, **kwargs) -> "Task":
         d = cls.DefaultMetadata.copy()
         d.update(kwargs)
-        t = cls.objects.create(user=user, type=cls.TaskType, metadata=d)
-        django_rq.get_queue(cls.TaskQueue).enqueue(cls._run, t.pk, job_id=t.job_id)
+        t = cls.objects.create(user=user, metadata=d)
         return t

     @classmethod
     def _run(cls, task_id: int):
         task = cls.objects.get(pk=task_id)
+        try:
+            logger.info(f"running {task}")
+            if task.state != cls.States.pending:
+                logger.warning(
+                    f"task {task_id} is not pending, skipping", extra={"task": task_id}
+                )
+                return
             task.state = cls.States.started
-        task.save(update_fields=["state"])
+            task.save()
             with set_actor(task.user):
-            try:
                 task.run()
-                task.state = cls.States.complete
-                task.save(update_fields=["state"])
+            ok = True
         except Exception as e:
             logger.exception(
-                f"error running {cls.__name__}", extra={"exception": e, "task": task_id}
+                f"error running {cls.__name__}",
+                extra={"exception": e, "task": task_id},
            )
-            task.message = "Error occured."
-            task.state = cls.States.failed
-            task.save(update_fields=["state", "message"])
-        task = cls.objects.get(pk=task_id)
-        if task.message:
-            if task.state == cls.States.complete:
-                msg.success(task.user, f"[{task.type}] {task.message}")
+            ok = False
+        task.refresh_from_db()
+        task.state = cls.States.complete if ok else cls.States.failed
+        task.save()
+        task.notify()
+
+    def enqueue(self):
+        return django_rq.get_queue(self.TaskQueue).enqueue(
+            self._run, self.pk, job_id=self.job_id
+        )
+
+    def notify(self) -> None:
+        ok = self.state == self.States.complete
+        message = self.message or (None if ok else "Error occured.")
+        if ok:
+            msg.success(self.user, f"[{self.type}] {message}")
         else:
-                msg.error(task.user, f"[{task.type}] {task.message}")
+            msg.error(self.user, f"[{self.type}] {message}")

     def run(self) -> None:
         raise NotImplementedError("subclass must implement this")
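With Task now a TypedModel, adding another background job only takes a queue name, default metadata, and a run() implementation; state tracking, enqueueing, and notification come from the base class. A hypothetical sketch (ExampleExporter and its file path are not part of this commit):

    from users.models import Task


    class ExampleExporter(Task):
        class Meta:
            app_label = "journal"  # workaround bug in TypedModel

        TaskQueue = "export"
        DefaultMetadata = {"file": None}

        def run(self):
            # do the actual work for self.user, then record the result
            self.metadata["file"] = "/tmp/example.xlsx"
            self.message = "Export complete."
            self.save()


    # usage: create the task row, then push Task._run onto the "export" queue
    # task = ExampleExporter.create(user)
    # task.enqueue()

As with DoufenExporter, a new subclass would also need a proxy CreateModel migration like journal/migrations/0004_tasks.py above.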
@@ -210,10 +210,16 @@
               method="post"
               enctype="multipart/form-data">
           {% csrf_token %}
-          <input type="submit"
-                 {% if export_status.marks_pending %} value="{% trans 'Export in progress' %}" {% else %} value="{% trans 'Export marks and reviews' %}" {% endif %} />
-          {% if export_status.marks_file %}
-            <a href="{% url 'users:export_marks' %}" download>{% trans 'Download' %} {{ export_status.marks_date }}</a>
+          <input type="submit" value="{% trans 'Export marks and reviews' %}" />
+          {% if export_task %}
+            <br>
+            {% trans 'Last export' %}: {{ export_task.created_time }}
+            {% trans 'Status' %}: {{ export_task.get_state_display }}
+            <br>
+            {{ export_task.message }}
+            {% if letterboxd_task.metadata.file %}
+              <a href="{% url 'users:export_marks' %}" download>{% trans 'Download' %}</a>
+            {% endif %}
           {% endif %}
         </form>
       </details>
@@ -1,7 +1,6 @@
 import datetime
 import os

-import django_rq
 from django.conf import settings
 from django.contrib import messages
 from django.contrib.auth.decorators import login_required
@@ -13,13 +12,16 @@ from django.utils import translation
 from django.utils.translation import gettext as _

 from common.utils import GenerateDateUUIDMediaFilePath
-from journal.exporters.doufen import export_marks_task
-from journal.importers.douban import DoubanImporter
-from journal.importers.goodreads import GoodreadsImporter
-from journal.importers.letterboxd import LetterboxdImporter
-from journal.importers.opml import OPMLImporter
+from journal.exporters import DoufenExporter
+from journal.importers import (
+    DoubanImporter,
+    GoodreadsImporter,
+    LetterboxdImporter,
+    OPMLImporter,
+)
 from journal.models import ShelfType, reset_journal_visibility_for_user
 from social.models import reset_social_visibility_for_user
+from users.models import Task

 from .account import *

@@ -94,7 +96,7 @@ def data(request):
         {
             "allow_any_site": settings.MASTODON_ALLOW_ANY_SITE,
             "import_status": request.user.preference.import_status,
-            "export_status": request.user.preference.export_status,
+            "export_task": DoufenExporter.latest_task(request.user),
             "letterboxd_task": LetterboxdImporter.latest_task(request.user),
             "years": years,
         },
@@ -122,14 +124,18 @@ def export_reviews(request):
 @login_required
 def export_marks(request):
     if request.method == "POST":
-        django_rq.get_queue("export").enqueue(export_marks_task, request.user)
-        request.user.preference.export_status["marks_pending"] = True
-        request.user.preference.save()
+        DoufenExporter.create(request.user).enqueue()
         messages.add_message(request, messages.INFO, _("Generating exports."))
         return redirect(reverse("users:data"))
     else:
+        task = DoufenExporter.latest_task(request.user)
+        if not task or task.state != Task.States.complete:
+            messages.add_message(
+                request, messages.ERROR, _("Export file not available.")
+            )
+            return redirect(reverse("users:data"))
         try:
-            with open(request.user.preference.export_status["marks_file"], "rb") as fh:
+            with open(task.metadata["file"], "rb") as fh:
                 response = HttpResponse(
                     fh.read(), content_type="application/vnd.ms-excel"
                 )
@@ -215,11 +221,11 @@ def import_letterboxd(request):
         with open(f, "wb+") as destination:
             for chunk in request.FILES["file"].chunks():
                 destination.write(chunk)
-        LetterboxdImporter.enqueue(
+        LetterboxdImporter.create(
             request.user,
             visibility=int(request.POST.get("visibility", 0)),
             file=f,
-        )
+        ).enqueue()
         messages.add_message(
             request, messages.INFO, _("File is uploaded and will be imported soon.")
         )