# sync/jobs.py — doufen xlsx import jobs
import logging
import pytz
2021-08-29 17:52:42 +02:00
from dataclasses import dataclass
2021-06-14 22:18:39 +02:00
from datetime import datetime
2021-09-15 14:21:29 -04:00
from django.conf import settings
2021-06-14 22:18:39 +02:00
from django.utils import timezone
from django.core.exceptions import ObjectDoesNotExist
from openpyxl import load_workbook
from books.models import BookMark, Book, BookTag
from movies.models import MovieMark, Movie, MovieTag
from music.models import AlbumMark, Album, AlbumTag
from games.models import GameMark, Game, GameTag
from common.scraper import DoubanAlbumScraper, DoubanBookScraper, DoubanGameScraper, DoubanMovieScraper
from common.models import MarkStatusEnum
2021-08-29 17:52:42 +02:00
from .models import SyncTask
2021-06-14 22:18:39 +02:00
logger = logging.getLogger(__name__)
2021-09-01 11:41:21 +02:00
2021-12-12 18:15:42 -05:00
def __import_should_stop():
    """Stop-check hook for import jobs; currently a stub.

    Returns None (falsy), so sync_doufen_job never stops early.
    """
    # TODO: using queue.connection.set(job.key + b':should_stop', 1, ex=30) on the caller side and connection.get(job.key + b':should_stop') on the worker side.
    pass
2021-08-29 17:52:42 +02:00
2021-12-12 18:15:42 -05:00
def import_doufen_task(synctask):
    """Entry point for a queued doufen import: run the sync with the stub stop-check."""
    sync_doufen_job(synctask, __import_should_stop)
2021-08-29 17:52:42 +02:00
2021-06-14 22:18:39 +02:00
2021-08-29 17:52:42 +02:00
class DoufenParser:
    """Parses a doufen xlsx export into rows ready to be imported as marks.

    Walks the sheets selected by the task's ``sync_*`` flags, resuming from
    the task's breakpoint if one exists, and produces a list of dicts that
    pair a :class:`DoufenRowData` with the model/scraper classes needed to
    import that row.
    """

    # 1-based column indices in the doufen xlsx export
    URL_INDEX = 4
    CONTENT_INDEX = 8
    TAG_INDEX = 7
    TIME_INDEX = 5
    RATING_INDEX = 6

    def __init__(self, task):
        self.__file_path = task.file.path
        # initialize handles so __close_file is safe even if opening fails
        self.__fp = None
        self.__wb = None
        self.__mappings = None
        self.__progress_sheet, self.__progress_row = task.get_breakpoint()
        # a recorded breakpoint sheet means we are resuming an interrupted run
        self.__is_new_task = self.__progress_sheet is None
        if self.__progress_row is None:
            self.__progress_row = 2  # row 1 is the header row
        # data in the excel parse in python types
        self.task = task
        self.items = []

    def __open_file(self):
        self.__fp = open(self.__file_path, 'rb')
        self.__wb = load_workbook(
            self.__fp,
            read_only=True,
            data_only=True,
            keep_links=False
        )

    def __close_file(self):
        # idempotent, and tolerates a partially-failed __open_file
        if self.__wb is not None:
            self.__wb.close()
            self.__wb = None
        if self.__fp is not None:
            self.__fp.close()
            self.__fp = None

    def __get_item_classes_mapping(self):
        '''
        We assume that the sheets names won't change
        '''
        categories = [
            (self.task.sync_movie, ['想看', '在看', '看过'],
             MovieMark, Movie, MovieTag, DoubanMovieScraper),
            (self.task.sync_music, ['想听', '在听', '听过'],
             AlbumMark, Album, AlbumTag, DoubanAlbumScraper),
            (self.task.sync_book, ['想读', '在读', '读过'],
             BookMark, Book, BookTag, DoubanBookScraper),
            (self.task.sync_game, ['想玩', '在玩', '玩过'],
             GameMark, Game, GameTag, DoubanGameScraper),
        ]
        mappings = []
        for enabled, sheets, mark_class, entity_class, tag_class, scraper in categories:
            if not enabled:
                continue
            for sheet_name in sheets:
                mappings.append({'sheet': sheet_name, 'mark_class': mark_class,
                                 'entity_class': entity_class,
                                 'tag_class': tag_class, 'scraper': scraper})

        # deterministic order so breakpoints refer to a stable sheet sequence
        mappings.sort(key=lambda mapping: mapping['sheet'])

        if not self.__is_new_task:
            # resume: skip the sheets that were already fully processed
            # NOTE(review): raises ValueError if the breakpoint sheet is no
            # longer among the selected categories — confirm callers never
            # change the sync_* flags between runs.
            start_index = [mapping['sheet']
                           for mapping in mappings].index(self.__progress_sheet)
            mappings = mappings[start_index:]

        self.__mappings = mappings
        return mappings

    def __parse_items(self):
        assert self.__wb is not None, 'workbook not found'
        item_classes_mappings = self.__get_item_classes_mapping()
        tz = pytz.timezone('Asia/Shanghai')  # douban timestamps are CST

        is_first_sheet = True
        for mapping in item_classes_mappings:
            if mapping['sheet'] not in self.__wb:
                print(f"Sheet not found: {mapping['sheet']}")
                continue
            ws = self.__wb[mapping['sheet']]
            max_row = ws.max_row
            # empty sheet (row 1 is the header)
            if max_row <= 1:
                continue

            # decide starting position: resume from the breakpoint row on the
            # first sheet of an interrupted task, otherwise right after header
            start_row_index = 2
            if not self.__is_new_task and is_first_sheet:
                start_row_index = self.__progress_row

            for row_index, row in enumerate(
                    ws.iter_rows(min_row=start_row_index, max_row=max_row,
                                 values_only=True),
                    start=start_row_index):
                cells = list(row)
                url = cells[self.URL_INDEX - 1]
                tags = cells[self.TAG_INDEX - 1]
                tags = list(set(tags.split(','))) if tags else None
                time = cells[self.TIME_INDEX - 1]
                if time:
                    # NOTE(review): assumes the cell holds a string timestamp
                    time = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
                    # bug fix: pytz zones must be attached via localize();
                    # replace(tzinfo=...) picks the zone's LMT offset (+08:06)
                    time = tz.localize(time)
                else:
                    time = None
                content = cells[self.CONTENT_INDEX - 1]
                if not content:
                    content = ""
                rating = cells[self.RATING_INDEX - 1]
                # doufen stores 1-5 stars; double to the site's 10 scale
                rating = int(rating) * 2 if rating else None
                self.items.append({
                    'data': DoufenRowData(url, tags, time, content, rating),
                    'entity_class': mapping['entity_class'],
                    'mark_class': mapping['mark_class'],
                    'tag_class': mapping['tag_class'],
                    'scraper': mapping['scraper'],
                    'sheet': mapping['sheet'],
                    'row_index': row_index,
                })

            # set first sheet flag
            is_first_sheet = False

    def __get_item_number(self):
        assert self.__wb is not None, 'workbook not found'
        assert self.__mappings is not None, 'mappings not found'
        item_number = 0
        for mapping in self.__mappings:
            if mapping['sheet'] in self.__wb:
                # minus 1 for the header row
                item_number += self.__wb[mapping['sheet']].max_row - 1
        return item_number

    def __update_total_items(self):
        total = self.__get_item_number()
        self.task.total_items = total
        self.task.save(update_fields=["total_items"])

    def parse(self):
        """Parse the workbook; returns the item list, or [] on any failure."""
        try:
            self.__open_file()
            self.__parse_items()
            if self.__is_new_task:
                self.__update_total_items()
            return self.items
        except Exception as e:
            logger.error(f'Error parsing {self.__file_path} {e}')
            # NOTE(review): is_failed is only set in memory here — confirm a
            # caller persists it, otherwise a save(update_fields=...) is missing
            self.task.is_failed = True
            return []
        finally:
            # single close point (the original also closed once more inside
            # the try, which was redundant)
            self.__close_file()
2021-08-29 17:52:42 +02:00
@dataclass
class DoufenRowData:
    # One parsed row of a doufen xlsx export.
    url: str        # douban source URL of the item
    tags: list      # de-duplicated tag strings, or None when the cell is empty
    time: datetime  # mark timestamp, or None when the cell is empty
    content: str    # review/comment text ("" when empty)
    rating: int     # rating doubled to a 10 scale, or None when unrated
2021-12-20 22:59:32 -05:00
def add_new_mark(data, user, entity, entity_class, mark_class, tag_class, sheet, default_public):
    """Create a fresh mark (plus its tags) for *entity* from a parsed doufen row.

    The mark's status is derived from the sheet name; tag creation is
    best-effort — failures are logged and skipped.
    """
    entity_field = entity_class.__name__.lower()  # e.g. 'movie', 'book'
    mark = mark_class.objects.create(
        owner=user,
        created_time=data.time,
        edited_time=data.time,
        rating=data.rating,
        text=data.content,
        status=translate_status(sheet),
        # NOTE(review): 0 appears to be the "public" visibility level — confirm
        visibility=0 if default_public else 1,
        **{entity_field: entity},
    )
    entity.update_rating(None, data.rating)
    for tag in (data.tags or []):
        try:
            tag_class.objects.create(**{
                'content': tag,
                entity_field: entity,
                'mark': mark,
            })
        except Exception as e:
            logger.error(f'Error creating tag {tag} {mark}: {e}')
2021-08-29 17:52:42 +02:00
def overwrite_mark(entity, entity_class, mark, mark_class, tag_class, data, sheet):
    """Replace an existing mark's fields and tags with the doufen row's data."""
    previous_rating = mark.rating
    # reverse accessor, e.g. MovieMark -> mark.moviemark_tags
    stale_tags = getattr(mark, mark_class.__name__.lower() + '_tags').all()

    # overwrite the mark itself
    mark.created_time = data.time
    mark.edited_time = data.time
    mark.text = data.content
    mark.rating = data.rating
    mark.status = translate_status(sheet)
    mark.save()
    entity.update_rating(previous_rating, data.rating)

    # drop the old tags, then recreate from the row (best-effort)
    for stale in stale_tags:
        stale.delete()
    entity_field = entity_class.__name__.lower()
    for tag in (data.tags or []):
        try:
            tag_class.objects.create(**{
                'content': tag,
                entity_field: entity,
                'mark': mark,
            })
        except Exception as e:
            logger.error(f'Error creating tag {tag} {mark}: {e}')
2021-08-29 17:52:42 +02:00
def sync_doufen_job(task, stop_check_func):
    """
    Import the rows of *task*'s doufen xlsx as marks owned by task.user.

    Parses the workbook (resuming from the task's breakpoint, if any) and,
    for each row: finds or scrapes the entity, then creates — or, when
    task.overwrite is set, overwrites — the user's mark. Progress counters
    and failed URLs are persisted on the task after every item.

    stop_check_func: called before each item; a truthy return aborts the
    run early (the saved breakpoint then allows a later resume).

    TODO: Update task status every certain amount of items to reduce IO consumption
    """
    task = SyncTask.objects.get(pk=task.pk)  # re-fetch fresh state from DB
    if task.is_finished:
        return

    print(f'Task {task.pk}: loading')
    parser = DoufenParser(task)
    items = parser.parse()

    # use pop to reduce memo consumption
    while len(items) > 0 and not stop_check_func():
        item = items.pop(0)
        data = item['data']
        entity_class = item['entity_class']
        mark_class = item['mark_class']
        tag_class = item['tag_class']
        scraper = item['scraper']
        sheet = item['sheet']
        row_index = item['row_index']

        # update progress (persisted per item so an abort can resume here)
        task.set_breakpoint(sheet, row_index, save=True)

        # scrape the entity if not exists
        try:
            entity = entity_class.objects.get(source_url=data.url)
            print(f'Task {task.pk}: {len(items)+1} remaining; matched {data.url}')
        except ObjectDoesNotExist:
            try:
                print(f'Task {task.pk}: {len(items)+1} remaining; scraping {data.url}')
                scraper.scrape(data.url)
                form = scraper.save(request_user=task.user)
                entity = form.instance
            except Exception as e:
                # scrape failure: record the URL and move on to the next row
                logger.error(f"Task {task.pk}: scrape failed: {data.url} {e}")
                if settings.DEBUG:
                    logger.error("Expections during scraping data:", exc_info=e)
                task.failed_urls.append(data.url)
                task.finished_items += 1
                task.save(update_fields=['failed_urls', 'finished_items'])
                continue

        # sync mark
        try:
            # already exists
            params = {
                'owner': task.user,
                entity_class.__name__.lower(): entity
            }
            mark = mark_class.objects.get(**params)
            if task.overwrite:
                overwrite_mark(entity, entity_class, mark,
                               mark_class, tag_class, data, sheet)
            else:
                # existing mark kept untouched; still counted as a success
                task.success_items += 1
                task.finished_items += 1
                task.save(update_fields=['success_items', 'finished_items'])
                continue
        except ObjectDoesNotExist:
            add_new_mark(data, task.user, entity, entity_class,
                         mark_class, tag_class, sheet, task.default_public)
        except Exception as e:
            logger.error(
                f"Task {task.pk}: error when syncing marks", exc_info=e)
            task.failed_urls.append(data.url)
            task.finished_items += 1
            task.save(update_fields=['failed_urls', 'finished_items'])
            continue

        task.success_items += 1
        task.finished_items += 1
        task.save(update_fields=['success_items', 'finished_items'])

    # if task finish — only when every item was consumed (not stopped early)
    print(f'Task {task.pk}: stopping')
    if len(items) == 0:
        task.is_finished = True
        task.clear_breakpoint()
        task.save(update_fields=['is_finished', 'break_point'])
2021-06-14 22:18:39 +02:00
def translate_status(sheet_name):
    """Map a doufen sheet name to a MarkStatusEnum.

    Sheet names share one status character: 想看/想读/想听/想玩 ("wish to"),
    在看/在读/... ("currently"), 看过/读过/... ("have done").

    Raises ValueError for an unrecognized sheet name.

    Bug fix: the conditions previously tested ``'' in sheet_name`` (the
    status characters had been lost), which is always True, so every sheet
    mapped to WISH.
    """
    if '想' in sheet_name:    # "wish to ..."
        return MarkStatusEnum.WISH
    elif '在' in sheet_name:  # "currently ..."
        return MarkStatusEnum.DO
    elif '过' in sheet_name:  # "have ...-ed"
        return MarkStatusEnum.COLLECT

    raise ValueError("Not valid status")