diff --git a/common/models.py b/common/models.py index dcfe360a..f595be28 100644 --- a/common/models.py +++ b/common/models.py @@ -243,7 +243,7 @@ class Mark(UserOwnedEntity): text = models.CharField(max_length=5000, blank=True, default='') def __str__(self): - return f"({self.id}) {self.owner} {self.status.upper()}" + return f"Mark({self.id} {self.owner} {self.status.upper()})" class Meta: abstract = True diff --git a/sync/apps.py b/sync/apps.py index 14518e54..84c86ccf 100644 --- a/sync/apps.py +++ b/sync/apps.py @@ -4,7 +4,3 @@ from django.conf import settings class SyncConfig(AppConfig): name = 'sync' - - def ready(self): - from sync.jobs import sync_task_manager - sync_task_manager.start() \ No newline at end of file diff --git a/sync/jobs.py b/sync/jobs.py index d8325bde..9c5c4aa4 100644 --- a/sync/jobs.py +++ b/sync/jobs.py @@ -1,10 +1,5 @@ import logging import pytz -import signal -import sys -import queue -import threading -import time from dataclasses import dataclass from datetime import datetime from django.conf import settings @@ -19,70 +14,17 @@ from common.scraper import DoubanAlbumScraper, DoubanBookScraper, DoubanGameScra from common.models import MarkStatusEnum from .models import SyncTask -__all__ = ['sync_task_manager'] logger = logging.getLogger(__name__) -class SyncTaskManger: +def __import_should_stop(): + # TODO: using queue.connection.set(job.key + b':should_stop', 1, ex=30) on the caller side and connection.get(job.key + b':should_stop') on the worker side. + pass - # in seconds - __CHECK_NEW_TASK_TIME_INTERVAL = 0.05 - MAX_WORKERS = 256 - def __init__(self): - self.__task_queue = queue.Queue(0) - self.__stop_event = threading.Event() - self.__worker_threads = [] - - def __listen_for_new_task(self): - while not self.__stop_event.is_set(): - time.sleep(self.__CHECK_NEW_TASK_TIME_INTERVAL) - while not self.__task_queue.empty() and not self.is_full(): - task = self.__task_queue.get_nowait() - self.__start_new_worker(task) - - def __start_new_worker(self, task): - new_worker = threading.Thread( - target=sync_doufen_job, args=[task, self.is_stopped], daemon=True - ) - self.__worker_threads.append(new_worker) - new_worker.start() - - def __enqueue_existing_tasks(self): - for task in SyncTask.objects.filter(is_finished=False): - self.__task_queue.put_nowait(task) - - def is_full(self): - return len(self.__worker_threads) >= self.MAX_WORKERS - - def add_task(self, task): - self.__task_queue.put_nowait(task) - - def stop(self, signum, frame): - print('rceived signal ', signum) - logger.info(f'rceived signal {signum}') - - self.__stop_event.set() - # for worker_thread in self.__worker_threads: - # worker_thread.join() - - print("stopped") - logger.info(f'stopped') - - def is_stopped(self): - return self.__stop_event.is_set() - - def start(self): - if settings.START_SYNC: - self.__enqueue_existing_tasks() # enqueue - - listen_new_task_thread = threading.Thread( - target=self.__listen_for_new_task, daemon=True) - - self.__worker_threads.append(listen_new_task_thread) - - listen_new_task_thread.start() +def import_doufen_task(synctask): + sync_doufen_job(synctask, __import_should_stop) class DoufenParser: @@ -98,7 +40,7 @@ class DoufenParser: self.__file_path = task.file.path self.__progress_sheet, self.__progress_row = task.get_breakpoint() self.__is_new_task = True - if not self.__progress_sheet is None: + if self.__progress_sheet is not None: self.__is_new_task = False if self.__progress_row is None: self.__progress_row = 2 @@ -177,7 +119,7 @@ class DoufenParser: cells = [cell for cell in row] url = cells[self.URL_INDEX - 1] tags = cells[self.TAG_INDEX - 1] - tags = tags.split(',') if tags else None + tags = list(set(tags.split(','))) if tags else None time = cells[self.TIME_INDEX - 1] if time: time = datetime.strptime(time, "%Y-%m-%d %H:%M:%S") @@ -265,12 +207,15 @@ def add_new_mark(data, user, entity, entity_class, mark_class, tag_class, sheet, entity_class.__name__.lower(): entity, 'mark': mark } - tag_class.objects.create(**params) + try: + tag_class.objects.create(**params) + except Exception as e: + logger.error(f'Error creating tag {tag} {mark}: {e}') def overwrite_mark(entity, entity_class, mark, mark_class, tag_class, data, sheet): old_rating = mark.rating - old_tags = getattr(mark, mark_class.__name__.lower()+'_tags').all() + old_tags = getattr(mark, mark_class.__name__.lower() + '_tags').all() # update mark logic mark.created_time = data.time mark.edited_time = data.time @@ -289,7 +234,10 @@ def overwrite_mark(entity, entity_class, mark, mark_class, tag_class, data, shee entity_class.__name__.lower(): entity, 'mark': mark } - tag_class.objects.create(**params) + try: + tag_class.objects.create(**params) + except Exception as e: + logger.error(f'Error creating tag {tag} {mark}: {e}') def sync_doufen_job(task, stop_check_func): @@ -388,15 +336,3 @@ def translate_status(sheet_name): return MarkStatusEnum.COLLECT raise ValueError("Not valid status") - - -sync_task_manager = SyncTaskManger() - -# sync_task_manager.start() - -if not settings.DEBUG: - # TODO: it seems this prevent ^C from working properly - signal.signal(signal.SIGTERM, sync_task_manager.stop) - if sys.platform.startswith('linux'): - signal.signal(signal.SIGHUP, sync_task_manager.stop) - signal.signal(signal.SIGINT, sync_task_manager.stop) diff --git a/sync/models.py b/sync/models.py index ba74cd06..a3d10d7c 100644 --- a/sync/models.py +++ b/sync/models.py @@ -69,7 +69,7 @@ class SyncTask(models.Model): def __str__(self): """Unicode representation of SyncTask.""" - return str(self.user.username) + '@' + str(self.started_time) + self.get_status_emoji() + return f'{self.id} {self.user} {self.file} {self.get_status_emoji()} {self.success_items}/{self.finished_items}/{self.total_items}' def get_status_emoji(self): return ("❌" if self.is_failed else "✔") if self.is_finished else "⚡" diff --git a/sync/views.py b/sync/views.py index 69aa7de6..e669fe65 100644 --- a/sync/views.py +++ b/sync/views.py @@ -3,7 +3,7 @@ from django.contrib.auth.decorators import login_required from django.http import HttpResponseBadRequest, JsonResponse, HttpResponse from .models import SyncTask from .forms import SyncTaskForm -from .jobs import sync_task_manager +from .jobs import import_doufen_task import tempfile import os from threading import Thread @@ -11,6 +11,7 @@ import openpyxl from django.utils.datastructures import MultiValueDictKeyError from openpyxl.utils.exceptions import InvalidFileException from zipfile import BadZipFile +import django_rq @login_required @@ -25,7 +26,7 @@ def sync_douban(request): wb = openpyxl.open(uploaded_file, read_only=True, data_only=True, keep_links=False) wb.close() - except (MultiValueDictKeyError, InvalidFileException, BadZipFile) as e : + except (MultiValueDictKeyError, InvalidFileException, BadZipFile) as e: # raise e return HttpResponseBadRequest(content="invalid excel file") @@ -35,8 +36,7 @@ def sync_douban(request): # stop all preivous task SyncTask.objects.filter(user=request.user, is_finished=False).update(is_finished=True) form.save() - sync_task_manager.add_task(form.instance) - + django_rq.get_queue('doufen').enqueue(import_doufen_task, form.instance, job_id=f'SyncTask_{form.instance.id}') return HttpResponse(status=204) else: return HttpResponseBadRequest() diff --git a/users/views.py b/users/views.py index 8d33bdd3..e2c1b2ba 100644 --- a/users/views.py +++ b/users/views.py @@ -881,7 +881,7 @@ def export_reviews(request): def export_marks(request): if request.method == 'POST': if not request.user.preference.export_status.get('marks_pending'): - django_rq.enqueue(export_marks_task, request.user) + django_rq.get_queue('export').enqueue(export_marks_task, request.user) request.user.preference.export_status['marks_pending'] = True request.user.preference.save() return redirect(reverse("users:data"))