use rq to manage doufen import

This commit is contained in:
Your Name 2021-12-12 18:15:42 -05:00
parent d0663db03a
commit 0c699aa7dc
6 changed files with 23 additions and 91 deletions

View file

@ -243,7 +243,7 @@ class Mark(UserOwnedEntity):
text = models.CharField(max_length=5000, blank=True, default='')
def __str__(self):
return f"({self.id}) {self.owner} {self.status.upper()}"
return f"Mark({self.id} {self.owner} {self.status.upper()})"
class Meta:
abstract = True

View file

@ -4,7 +4,3 @@ from django.conf import settings
class SyncConfig(AppConfig):
name = 'sync'
def ready(self):
from sync.jobs import sync_task_manager
sync_task_manager.start()

View file

@ -1,10 +1,5 @@
import logging
import pytz
import signal
import sys
import queue
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from django.conf import settings
@ -19,70 +14,17 @@ from common.scraper import DoubanAlbumScraper, DoubanBookScraper, DoubanGameScra
from common.models import MarkStatusEnum
from .models import SyncTask
__all__ = ['sync_task_manager']
logger = logging.getLogger(__name__)
class SyncTaskManger:
def __import_should_stop():
# TODO: using queue.connection.set(job.key + b':should_stop', 1, ex=30) on the caller side and connection.get(job.key + b':should_stop') on the worker side.
pass
# in seconds
__CHECK_NEW_TASK_TIME_INTERVAL = 0.05
MAX_WORKERS = 256
def __init__(self):
self.__task_queue = queue.Queue(0)
self.__stop_event = threading.Event()
self.__worker_threads = []
def __listen_for_new_task(self):
while not self.__stop_event.is_set():
time.sleep(self.__CHECK_NEW_TASK_TIME_INTERVAL)
while not self.__task_queue.empty() and not self.is_full():
task = self.__task_queue.get_nowait()
self.__start_new_worker(task)
def __start_new_worker(self, task):
new_worker = threading.Thread(
target=sync_doufen_job, args=[task, self.is_stopped], daemon=True
)
self.__worker_threads.append(new_worker)
new_worker.start()
def __enqueue_existing_tasks(self):
for task in SyncTask.objects.filter(is_finished=False):
self.__task_queue.put_nowait(task)
def is_full(self):
return len(self.__worker_threads) >= self.MAX_WORKERS
def add_task(self, task):
self.__task_queue.put_nowait(task)
def stop(self, signum, frame):
print('rceived signal ', signum)
logger.info(f'rceived signal {signum}')
self.__stop_event.set()
# for worker_thread in self.__worker_threads:
# worker_thread.join()
print("stopped")
logger.info(f'stopped')
def is_stopped(self):
return self.__stop_event.is_set()
def start(self):
if settings.START_SYNC:
self.__enqueue_existing_tasks() # enqueue
listen_new_task_thread = threading.Thread(
target=self.__listen_for_new_task, daemon=True)
self.__worker_threads.append(listen_new_task_thread)
listen_new_task_thread.start()
def import_doufen_task(synctask):
sync_doufen_job(synctask, __import_should_stop)
class DoufenParser:
@ -98,7 +40,7 @@ class DoufenParser:
self.__file_path = task.file.path
self.__progress_sheet, self.__progress_row = task.get_breakpoint()
self.__is_new_task = True
if not self.__progress_sheet is None:
if self.__progress_sheet is not None:
self.__is_new_task = False
if self.__progress_row is None:
self.__progress_row = 2
@ -177,7 +119,7 @@ class DoufenParser:
cells = [cell for cell in row]
url = cells[self.URL_INDEX - 1]
tags = cells[self.TAG_INDEX - 1]
tags = tags.split(',') if tags else None
tags = list(set(tags.split(','))) if tags else None
time = cells[self.TIME_INDEX - 1]
if time:
time = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
@ -265,12 +207,15 @@ def add_new_mark(data, user, entity, entity_class, mark_class, tag_class, sheet,
entity_class.__name__.lower(): entity,
'mark': mark
}
tag_class.objects.create(**params)
try:
tag_class.objects.create(**params)
except Exception as e:
logger.error(f'Error creating tag {tag} {mark}: {e}')
def overwrite_mark(entity, entity_class, mark, mark_class, tag_class, data, sheet):
old_rating = mark.rating
old_tags = getattr(mark, mark_class.__name__.lower()+'_tags').all()
old_tags = getattr(mark, mark_class.__name__.lower() + '_tags').all()
# update mark logic
mark.created_time = data.time
mark.edited_time = data.time
@ -289,7 +234,10 @@ def overwrite_mark(entity, entity_class, mark, mark_class, tag_class, data, shee
entity_class.__name__.lower(): entity,
'mark': mark
}
tag_class.objects.create(**params)
try:
tag_class.objects.create(**params)
except Exception as e:
logger.error(f'Error creating tag {tag} {mark}: {e}')
def sync_doufen_job(task, stop_check_func):
@ -388,15 +336,3 @@ def translate_status(sheet_name):
return MarkStatusEnum.COLLECT
raise ValueError("Not valid status")
sync_task_manager = SyncTaskManger()
# sync_task_manager.start()
if not settings.DEBUG:
# TODO: it seems this prevent ^C from working properly
signal.signal(signal.SIGTERM, sync_task_manager.stop)
if sys.platform.startswith('linux'):
signal.signal(signal.SIGHUP, sync_task_manager.stop)
signal.signal(signal.SIGINT, sync_task_manager.stop)

View file

@ -69,7 +69,7 @@ class SyncTask(models.Model):
def __str__(self):
"""Unicode representation of SyncTask."""
return str(self.user.username) + '@' + str(self.started_time) + self.get_status_emoji()
return f'{self.id} {self.user} {self.file} {self.get_status_emoji()} {self.success_items}/{self.finished_items}/{self.total_items}'
def get_status_emoji(self):
return ("" if self.is_failed else "") if self.is_finished else ""

View file

@ -3,7 +3,7 @@ from django.contrib.auth.decorators import login_required
from django.http import HttpResponseBadRequest, JsonResponse, HttpResponse
from .models import SyncTask
from .forms import SyncTaskForm
from .jobs import sync_task_manager
from .jobs import import_doufen_task
import tempfile
import os
from threading import Thread
@ -11,6 +11,7 @@ import openpyxl
from django.utils.datastructures import MultiValueDictKeyError
from openpyxl.utils.exceptions import InvalidFileException
from zipfile import BadZipFile
import django_rq
@login_required
@ -25,7 +26,7 @@ def sync_douban(request):
wb = openpyxl.open(uploaded_file, read_only=True,
data_only=True, keep_links=False)
wb.close()
except (MultiValueDictKeyError, InvalidFileException, BadZipFile) as e :
except (MultiValueDictKeyError, InvalidFileException, BadZipFile) as e:
# raise e
return HttpResponseBadRequest(content="invalid excel file")
@ -35,8 +36,7 @@ def sync_douban(request):
# stop all preivous task
SyncTask.objects.filter(user=request.user, is_finished=False).update(is_finished=True)
form.save()
sync_task_manager.add_task(form.instance)
django_rq.get_queue('doufen').enqueue(import_doufen_task, form.instance, job_id=f'SyncTask_{form.instance.id}')
return HttpResponse(status=204)
else:
return HttpResponseBadRequest()

View file

@ -881,7 +881,7 @@ def export_reviews(request):
def export_marks(request):
if request.method == 'POST':
if not request.user.preference.export_status.get('marks_pending'):
django_rq.enqueue(export_marks_task, request.user)
django_rq.get_queue('export').enqueue(export_marks_task, request.user)
request.user.preference.export_status['marks_pending'] = True
request.user.preference.save()
return redirect(reverse("users:data"))