lib.itmens/sync/jobs.py
Henri Dickson 14b003a44a add all NeoDB features to NiceDB (#115)
* fix scraping failure with webp image (merge upstream/fix-webp-scrape)

* add filetype to requirements

* add proxycrawl.com as fallback for douban scraper

* load 3p js/css from cdn

* add fix-cover task

* fix book/album cover tasks

* scrapestack

* bandcamp scrape and preview ;
manage.py scrape <url> ;
make ^C work when DEBUG

* use scrapestack when fix cover

* add user agent to improve compatibility

* search BandCamp for music albums

* add missing MovieGenre

* fix search 500 when song has no parent album

* adjust timeout

* individual scrapers

* fix tmdb parser

* export marks via rq; pref to send public toot; move import to data page

* fix spotify import

* fix edge cases

* export: fix dupe tags

* use rq to manage doufen import

* add django command to manage rq jobs

* fix export edge case

* tune rq admin

* fix detail page 502 step 1: async pull mastodon follow/block/mute list

* fix detail page 502 step 2: calculate relationship by local cached data

* manual sync mastodon follow info

* domain_blocks parsing fix

* marks by who I follow

* adjust label

* use username in urls

* add page to list a user's reviews

* review widget on user home page

* fix preview 500

* fix typo

* minor fix

* fix google books parsing

* allow mark/review visible to oneself

* fix auto sync masto for new user

* fix search 500

* add command to restart a sync task

* reset visibility

* delete user data

* fix tag search result pagination

* not upgrade to django 4 yet

* basic doc

* wip: collection

* wip

* wip

* collection use htmx

* show in-collection section for entities

* fix typo

* add su for easier debug

* fix some 500s

* fix login using alternative domain

* hide data from disabled user

* add item to list from detail page

* my tags

* collection: inline comment edit

* show number of ratings

* fix collection delete

* more detail in collection view

* use item template in search result

* fix 500

* write index to meilisearch

* fix search

* reindex in batch

* fix 500

* show search result from meilisearch

* more search commands

* index less fields

* index new items only

* search highlights

* fix 500

* auto set search category

* classic search if no meili server

* fix index stats error

* support typesense backend

* workaround typesense bug

* make external search async

* fix 500, typo

* fix cover scripts

* fix minor issue in douban parser

* supports m.douban.com and customized bandcamp domain

* move account

* reword with gender-friendly and instance-neutral language

* Friendica does not have vapid_key in api response

* enable anonymous search

* tweak book result template

* API v0

* fix meilisearch reindex

* fix search by url error

* login via twitter.com

* login via pixelfed

* minor fix

* no refresh on inactive users

* support refresh access token

* get rid of /users/number-id/

* refresh twitter handle automatically

* paste image when review

* support PixelFed (very long token)

* fix django-markdownx version

* ignore single quote for meilisearch for now

* update logo

* show book review/mark from same isbn

* show movie review/mark from same imdb

* fix login with older mastodon servers

* import Goodreads book list and profile

* add timestamp to Goodreads import

* support new google books api

* import goodreads list

* minor goodreads fix

* click corner action icon to add to wishlist

* clean up duplicated code

* fix anonymous search

* fix 500

* minor fix search 500

* show rating only if votes > 5

* Entity.refresh_rating()

* preference to append text when sharing; clean up duplicated code

* fix missing data for user tagged view

* fix page link for tag view

* fix 500 when language field longer than 10

* fix 500 when sharing mark for song

* fix error when re-importing goodreads profile

* fix minor typo

* fix a rare 500

* error log dump less

* fix tags in marks export

* fix missing param in pagination

* import douban review

* clarify text

* fix missing sheet in review import

* review: show in progress

* scrape douban: ignore unknown genre

* minor fix

* improve review import by guessing entity urls

* clear guide text for review import

* improve review import form text

* workaround some 500

* fix mark import error

* fix img in review import

* load external results earlier

* ignore search server errors

* simplify user register flow to avoid inconsistent state

* Add a learn more link on login page

* Update login.html

* show mark created timestamp as mark time

* no 500 for api error

* redirect for expired tokens

* ensure preference object created.

* mark collections

* tag list

* fix tag display

* fix sorting etc

* fix 500

* fix potential export 500; save shared links

* fix share to twitter

* fix review url

* fix 500

* fix 500

* add timeline, etc

* missing status change in timeline

* missing id in timeline

* timeline view by default

* workaround bug in markdownx...

* fix typo

* option to create new collection when add from detail page

* add missing announcement and tags in timeline home

* add missing announcement

* add missing announcement

* opensearch

* show fediverse shared link

* public review no longer requires login

* fix markdownx bug

* fix 500

* use cloudflare cdn

* validate jquery load and domain input

* fix 500

* tips for goodreads import

* collaborative collection

* show timeline and profile link on nav bar

* minor tweak

* share collection

* fix Goodreads search

* show wish mark in timeline

* resync failed urls with local proxy

* resync failed urls with local proxy: check proxy first

* scraper minor fix

* resync failed urls

* fix fields limit

* fix douban parsing error

* resync

* scraper minor fix

* scraper minor fix

* scraper minor fix

* local proxy

* local proxy

* sync default config from neodb

* configurable site name

* fix 500

* fix 500 for anonymous user

* add sentry

* add git version in log

* add git version in log

* no longer rely on cdnjs.cloudflare.com

* move jq/cash to _common_libs template partial

* fix rare js error

* fix 500

* avoid double submission error

* import tag in lower case

* catch some js network errors

* catch some js network errors

* support more goodreads urls

* fix unaired tv in tmdb

* support more google book urls

* fix related series

* more goodreads urls

* robust googlebooks search

* robust search

* Update settings.py

* Update scraper.py

* Update requirements.txt

* make nicedb work

* doc update

* simplify permission check

* update doc

* update doc for bug report link

* skip spotify tracks

* fix 500

* improve search api

* blind fix import compatibility

* show years for movie in timeline

* show years for movie in timeline; thinner font

* export reviews

* revert user home to use jquery https://github.com/fabiospampinato/cash/issues/246

* IGDB

* use IGDB for Steam

* use TMDB for IMDb

* steam: igdb then fallback to steam

* keep change history

* keep change history: add django settings

* Steam: keep localized title/brief while merging IGDB

* basic Docker support

* rescrape

* Create codeql-analysis.yml

* Create SECURITY.md

* Create pysa.yml

Co-authored-by: doubaniux <goodsir@vivaldi.net>
Co-authored-by: Your Name <you@example.com>
Co-authored-by: Their Name <they@example.com>
Co-authored-by: Mt. Front <mfcndw@gmail.com>
2022-11-09 19:56:50 +01:00

344 lines
12 KiB
Python

import logging
import pytz
from dataclasses import dataclass
from datetime import datetime
from django.conf import settings
from django.utils import timezone
from django.core.exceptions import ObjectDoesNotExist
from openpyxl import load_workbook
from books.models import BookMark, Book, BookTag
from movies.models import MovieMark, Movie, MovieTag
from music.models import AlbumMark, Album, AlbumTag
from games.models import GameMark, Game, GameTag
from common.scraper import DoubanAlbumScraper, DoubanBookScraper, DoubanGameScraper, DoubanMovieScraper
from common.models import MarkStatusEnum
from .models import SyncTask

logger = logging.getLogger(__name__)


def __import_should_stop():
    # TODO: using queue.connection.set(job.key + b':should_stop', 1, ex=30) on the caller side and connection.get(job.key + b':should_stop') on the worker side.
    pass


def import_doufen_task(synctask):
    sync_doufen_job(synctask, __import_should_stop)
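
# A sketch of what the TODO above describes: the enqueuing side would set
# queue.connection.set(job.key + b':should_stop', 1, ex=30) and the worker
# polls for that key. It assumes the task runs inside an rq worker (rq ships
# with django-rq); this helper is illustrative only and is not wired into
# import_doufen_task.
def _import_should_stop_via_rq():
    from rq import get_current_job
    job = get_current_job()
    if job is None:
        # not running inside an rq worker, so nothing can ask us to stop
        return False
    return job.connection.get(job.key + b':should_stop') is not None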

class DoufenParser:

    # 1-based column indices in the doufen export xlsx
    URL_INDEX = 4
    CONTENT_INDEX = 8
    TAG_INDEX = 7
    TIME_INDEX = 5
    RATING_INDEX = 6

    def __init__(self, task):
        self.__file_path = task.file.path
        self.__progress_sheet, self.__progress_row = task.get_breakpoint()
        self.__is_new_task = True
        if self.__progress_sheet is not None:
            self.__is_new_task = False
        if self.__progress_row is None:
            self.__progress_row = 2
        # initialized here so __close_file() is safe even if __open_file() fails
        self.__fp = None
        self.__wb = None
        # data from the xlsx, parsed into Python types
        self.task = task
        self.items = []

    def __open_file(self):
        self.__fp = open(self.__file_path, 'rb')
        self.__wb = load_workbook(
            self.__fp,
            read_only=True,
            data_only=True,
            keep_links=False
        )

    def __close_file(self):
        if self.__wb is not None:
            self.__wb.close()
            self.__wb = None
        if self.__fp is not None:
            self.__fp.close()
            self.__fp = None

    def __get_item_classes_mapping(self):
        '''
        We assume that the sheet names won't change.
        '''
        mappings = []
        if self.task.sync_movie:
            for sheet_name in ['想看', '在看', '看过']:
                mappings.append({'sheet': sheet_name, 'mark_class': MovieMark,
                                 'entity_class': Movie, 'tag_class': MovieTag, 'scraper': DoubanMovieScraper})
        if self.task.sync_music:
            for sheet_name in ['想听', '在听', '听过']:
                mappings.append({'sheet': sheet_name, 'mark_class': AlbumMark,
                                 'entity_class': Album, 'tag_class': AlbumTag, 'scraper': DoubanAlbumScraper})
        if self.task.sync_book:
            for sheet_name in ['想读', '在读', '读过']:
                mappings.append({'sheet': sheet_name, 'mark_class': BookMark,
                                 'entity_class': Book, 'tag_class': BookTag, 'scraper': DoubanBookScraper})
        if self.task.sync_game:
            for sheet_name in ['想玩', '在玩', '玩过']:
                mappings.append({'sheet': sheet_name, 'mark_class': GameMark,
                                 'entity_class': Game, 'tag_class': GameTag, 'scraper': DoubanGameScraper})
        mappings.sort(key=lambda mapping: mapping['sheet'])
        if not self.__is_new_task:
            start_index = [mapping['sheet']
                           for mapping in mappings].index(self.__progress_sheet)
            mappings = mappings[start_index:]
        self.__mappings = mappings
        return mappings
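
    # Note: the mappings are sorted by sheet name above, so when a task is
    # resumed the breakpoint sheet can be located by index and every sheet
    # before it skipped; only the first remaining sheet starts mid-way.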
    def __parse_items(self):
        assert self.__wb is not None, 'workbook not found'
        item_classes_mappings = self.__get_item_classes_mapping()
        is_first_sheet = True
        for mapping in item_classes_mappings:
            if mapping['sheet'] not in self.__wb:
                print(f"Sheet not found: {mapping['sheet']}")
                continue
            ws = self.__wb[mapping['sheet']]
            max_row = ws.max_row
            # skip empty sheets
            if max_row <= 1:
                continue
            # decide the starting position
            start_row_index = 2
            if not self.__is_new_task and is_first_sheet:
                start_row_index = self.__progress_row
            # parse data
            tz = pytz.timezone('Asia/Shanghai')
            i = start_row_index
            for row in ws.iter_rows(min_row=start_row_index, max_row=max_row, values_only=True):
                cells = list(row)
                url = cells[self.URL_INDEX - 1]
                tags = cells[self.TAG_INDEX - 1]
                tags = list(set(tags.lower().split(','))) if tags else None
                time = cells[self.TIME_INDEX - 1]
                if time and type(time) == str:
                    time = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
                    time = time.replace(tzinfo=tz)
                elif time and type(time) == datetime:
                    time = time.replace(tzinfo=tz)
                else:
                    time = None
                content = cells[self.CONTENT_INDEX - 1]
                if not content:
                    content = ""
                rating = cells[self.RATING_INDEX - 1]
                rating = int(rating) * 2 if rating else None
                self.items.append({
                    'data': DoufenRowData(url, tags, time, content, rating),
                    'entity_class': mapping['entity_class'],
                    'mark_class': mapping['mark_class'],
                    'tag_class': mapping['tag_class'],
                    'scraper': mapping['scraper'],
                    'sheet': mapping['sheet'],
                    'row_index': i,
                })
                i = i + 1
            # only the first sheet of a resumed task starts mid-way
            is_first_sheet = False

    def __get_item_number(self):
        assert self.__wb is not None, 'workbook not found'
        assert self.__mappings is not None, 'mappings not found'
        sheets = [mapping['sheet'] for mapping in self.__mappings]
        item_number = 0
        for sheet in sheets:
            if sheet in self.__wb:
                item_number += self.__wb[sheet].max_row - 1
        return item_number

    def __update_total_items(self):
        total = self.__get_item_number()
        self.task.total_items = total
        self.task.save(update_fields=["total_items"])

    def parse(self):
        try:
            self.__open_file()
            self.__parse_items()
            if self.__is_new_task:
                self.__update_total_items()
            return self.items
        except Exception as e:
            logger.error(f'Error parsing {self.__file_path} {e}')
            self.task.is_failed = True
        finally:
            # always closes the workbook, even after a failure
            self.__close_file()
        return []

@dataclass
class DoufenRowData:
    url: str
    tags: list
    time: datetime
    content: str
    rating: int


def add_new_mark(data, user, entity, entity_class, mark_class, tag_class, sheet, default_public):
    params = {
        'owner': user,
        'created_time': data.time,
        'edited_time': data.time,
        'rating': data.rating,
        'text': data.content,
        'status': translate_status(sheet),
        'visibility': 0 if default_public else 1,
        entity_class.__name__.lower(): entity,
    }
    mark = mark_class.objects.create(**params)
    entity.update_rating(None, data.rating)
    if data.tags:
        for tag in data.tags:
            params = {
                'content': tag,
                entity_class.__name__.lower(): entity,
                'mark': mark
            }
            try:
                tag_class.objects.create(**params)
            except Exception as e:
                logger.error(f'Error creating tag {tag} {mark}: {e}')
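
# For illustration: with entity_class=Book the dynamic key above resolves to
# 'book', so params expands to {'owner': user, ..., 'book': entity}. The
# foreign-key field on each mark/tag model matches the lower-cased entity
# class name, which is what lets one function serve all four media types.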

def overwrite_mark(entity, entity_class, mark, mark_class, tag_class, data, sheet):
    old_rating = mark.rating
    old_tags = getattr(mark, mark_class.__name__.lower() + '_tags').all()
    # update mark logic
    mark.created_time = data.time
    mark.edited_time = data.time
    mark.text = data.content
    mark.rating = data.rating
    mark.status = translate_status(sheet)
    mark.save()
    entity.update_rating(old_rating, data.rating)
    if old_tags:
        for tag in old_tags:
            tag.delete()
    if data.tags:
        for tag in data.tags:
            params = {
                'content': tag,
                entity_class.__name__.lower(): entity,
                'mark': mark
            }
            try:
                tag_class.objects.create(**params)
            except Exception as e:
                logger.error(f'Error creating tag {tag} {mark}: {e}')

def sync_doufen_job(task, stop_check_func):
    """
    TODO: update task status only every N items to reduce IO consumption
    """
    task = SyncTask.objects.get(pk=task.pk)
    if task.is_finished:
        return

    print(f'Task {task.pk}: loading')
    parser = DoufenParser(task)
    items = parser.parse()

    # use pop to reduce memory consumption
    while len(items) > 0 and not stop_check_func():
        item = items.pop(0)
        data = item['data']
        entity_class = item['entity_class']
        mark_class = item['mark_class']
        tag_class = item['tag_class']
        scraper = item['scraper']
        sheet = item['sheet']
        row_index = item['row_index']

        # update progress
        task.set_breakpoint(sheet, row_index, save=True)

        # scrape the entity if it does not exist yet
        try:
            entity = entity_class.objects.get(source_url=data.url)
            print(f'Task {task.pk}: {len(items)+1} remaining; matched {data.url}')
        except ObjectDoesNotExist:
            try:
                print(f'Task {task.pk}: {len(items)+1} remaining; scraping {data.url}')
                scraper.scrape(data.url)
                form = scraper.save(request_user=task.user)
                entity = form.instance
            except Exception as e:
                logger.error(f"Task {task.pk}: scrape failed: {data.url} {e}")
                if settings.DEBUG:
                    logger.error("Exceptions during scraping:", exc_info=e)
                task.failed_urls.append(data.url)
                task.finished_items += 1
                task.save(update_fields=['failed_urls', 'finished_items'])
                continue

        # sync the mark
        try:
            # mark already exists
            params = {
                'owner': task.user,
                entity_class.__name__.lower(): entity
            }
            mark = mark_class.objects.get(**params)
            if task.overwrite:
                overwrite_mark(entity, entity_class, mark,
                               mark_class, tag_class, data, sheet)
            else:
                task.success_items += 1
                task.finished_items += 1
                task.save(update_fields=['success_items', 'finished_items'])
                continue
        except ObjectDoesNotExist:
            add_new_mark(data, task.user, entity, entity_class,
                         mark_class, tag_class, sheet, task.default_public)
        except Exception as e:
            logger.error(
                f"Task {task.pk}: error when syncing marks", exc_info=e)
            task.failed_urls.append(data.url)
            task.finished_items += 1
            task.save(update_fields=['failed_urls', 'finished_items'])
            continue

        task.success_items += 1
        task.finished_items += 1
        task.save(update_fields=['success_items', 'finished_items'])

    # the task is finished once every item has been consumed
    print(f'Task {task.pk}: stopping')
    if len(items) == 0:
        task.is_finished = True
        task.clear_breakpoint()
        task.save(update_fields=['is_finished', 'break_point'])
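
# A sketch for the TODO in the docstring above (hypothetical, not part of the
# original file): accumulate progress in memory and flush every N rows instead
# of saving after each one, e.g.
#
#     SAVE_EVERY = 20
#     if task.finished_items % SAVE_EVERY == 0 or len(items) == 0:
#         task.save(update_fields=['success_items', 'finished_items', 'failed_urls'])
#
# at the cost of re-processing up to SAVE_EVERY - 1 rows after a crash, which
# is tolerable because the existing-mark check above skips duplicates.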

def translate_status(sheet_name):
    if '想' in sheet_name:
        return MarkStatusEnum.WISH
    elif '在' in sheet_name:
        return MarkStatusEnum.DO
    elif '过' in sheet_name:
        return MarkStatusEnum.COLLECT
    raise ValueError("Not valid status")
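
# Status is determined by the character the sheet names share: 想看/想听/想读/想玩
# map to WISH, 在看/在听/在读/在玩 to DO, and 看过/听过/读过/玩过 to COLLECT;
# e.g. translate_status('想看') == MarkStatusEnum.WISH.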