2021-06-14 22:18:39 +02:00
import logging
import pytz
2021-08-29 17:52:42 +02:00
from dataclasses import dataclass
2021-06-14 22:18:39 +02:00
from datetime import datetime
2021-09-15 14:21:29 -04:00
from django . conf import settings
2021-06-14 22:18:39 +02:00
from django . utils import timezone
from django . core . exceptions import ObjectDoesNotExist
from openpyxl import load_workbook
from books . models import BookMark , Book , BookTag
from movies . models import MovieMark , Movie , MovieTag
from music . models import AlbumMark , Album , AlbumTag
from games . models import GameMark , Game , GameTag
from common . scraper import DoubanAlbumScraper , DoubanBookScraper , DoubanGameScraper , DoubanMovieScraper
from common . models import MarkStatusEnum
2021-08-29 17:52:42 +02:00
from . models import SyncTask
2021-06-14 22:18:39 +02:00
logger = logging . getLogger ( __name__ )
2021-09-01 11:41:21 +02:00
2021-12-12 18:15:42 -05:00
def __import_should_stop():
    """
    Stop-check hook handed to sync_doufen_job; currently never signals a stop.

    TODO: using queue.connection.set(job.key + b':should_stop', 1, ex=30) on the caller side and connection.get(job.key + b':should_stop') on the worker side.
    """
    return None
2021-08-29 17:52:42 +02:00
2021-12-12 18:15:42 -05:00
def import_doufen_task(synctask):
    """Background-worker entry point: run the doufen sync for ``synctask``."""
    sync_doufen_job(synctask, __import_should_stop)
2021-08-29 17:52:42 +02:00
2021-06-14 22:18:39 +02:00
2021-08-29 17:52:42 +02:00
class DoufenParser:
    """
    Parse a doufen-exported .xlsx backup into a flat list of mark items.

    Each sheet holds one category/status combination (e.g. '想看' = movies
    wished for). The parser supports resuming from a breakpoint stored on the
    task, so an interrupted import continues at the correct sheet and row.
    """

    # 1-based column indices inside each xlsx sheet
    URL_INDEX = 4
    CONTENT_INDEX = 8
    TAG_INDEX = 7
    TIME_INDEX = 5
    RATING_INDEX = 6

    def __init__(self, task):
        self.__file_path = task.file.path
        # Handles are created lazily in __open_file(); pre-set to None so
        # __close_file() is safe even when opening failed half-way through
        # (previously a failed open() crashed the finally-block with
        # AttributeError because __fp/__wb never existed).
        self.__fp = None
        self.__wb = None
        self.__mappings = None
        self.__progress_sheet, self.__progress_row = task.get_breakpoint()
        # a recorded breakpoint sheet means we are resuming an earlier run
        self.__is_new_task = self.__progress_sheet is None
        if self.__progress_row is None:
            self.__progress_row = 2  # row 1 is the header row
        # data in the excel parsed into python types
        self.task = task
        self.items = []

    def __open_file(self):
        """Open the uploaded xlsx in streaming (read-only) mode."""
        self.__fp = open(self.__file_path, 'rb')
        self.__wb = load_workbook(
            self.__fp,
            read_only=True,
            data_only=True,
            keep_links=False,
        )

    def __close_file(self):
        """Release workbook and file handle; tolerates partially-failed opens."""
        if self.__wb is not None:
            self.__wb.close()
            self.__wb = None
        if self.__fp is not None:
            self.__fp.close()
            self.__fp = None

    def __get_item_classes_mapping(self):
        """
        Build the sheet-name -> (mark/entity/tag model classes, scraper)
        mapping for the categories this task wants to sync.

        We assume that the sheet names won't change.
        """
        mappings = []
        if self.task.sync_movie:
            for sheet_name in ['想看', '在看', '看过']:
                mappings.append({'sheet': sheet_name, 'mark_class': MovieMark,
                                 'entity_class': Movie, 'tag_class': MovieTag, 'scraper': DoubanMovieScraper})
        if self.task.sync_music:
            for sheet_name in ['想听', '在听', '听过']:
                mappings.append({'sheet': sheet_name, 'mark_class': AlbumMark,
                                 'entity_class': Album, 'tag_class': AlbumTag, 'scraper': DoubanAlbumScraper})
        if self.task.sync_book:
            for sheet_name in ['想读', '在读', '读过']:
                mappings.append({'sheet': sheet_name, 'mark_class': BookMark,
                                 'entity_class': Book, 'tag_class': BookTag, 'scraper': DoubanBookScraper})
        if self.task.sync_game:
            for sheet_name in ['想玩', '在玩', '玩过']:
                mappings.append({'sheet': sheet_name, 'mark_class': GameMark,
                                 'entity_class': Game, 'tag_class': GameTag, 'scraper': DoubanGameScraper})

        # deterministic order so a breakpoint maps onto a stable sheet sequence
        mappings.sort(key=lambda mapping: mapping['sheet'])

        if not self.__is_new_task:
            # resuming: skip every sheet that precedes the breakpoint sheet
            start_index = [mapping['sheet']
                           for mapping in mappings].index(self.__progress_sheet)
            mappings = mappings[start_index:]

        self.__mappings = mappings
        return mappings

    def __parse_items(self):
        """Read every remaining sheet row-by-row into self.items."""
        assert self.__wb is not None, 'workbook not found'
        item_classes_mappings = self.__get_item_classes_mapping()
        # douban exports timestamps in Chinese local time; hoisted out of the loop
        shanghai = pytz.timezone('Asia/Shanghai')

        is_first_sheet = True
        for mapping in item_classes_mappings:
            if mapping['sheet'] not in self.__wb:
                print(f"Sheet not found: {mapping['sheet']}")
                continue
            ws = self.__wb[mapping['sheet']]

            max_row = ws.max_row
            # empty sheet (header row only)
            if max_row <= 1:
                continue

            # decide starting position: resume mid-sheet only on the first
            # (breakpoint) sheet of a resumed task
            start_row_index = 2
            if not self.__is_new_task and is_first_sheet:
                start_row_index = self.__progress_row

            # parse data
            for i, row in enumerate(
                    ws.iter_rows(min_row=start_row_index, max_row=max_row,
                                 values_only=True),
                    start=start_row_index):
                cells = list(row)
                url = cells[self.URL_INDEX - 1]
                tags = cells[self.TAG_INDEX - 1]
                tags = list(set(tags.split(','))) if tags else None
                time = cells[self.TIME_INDEX - 1]
                if time:
                    time = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
                    # BUG FIX: replace(tzinfo=pytz_tz) attaches the zone's raw
                    # LMT offset (+08:06 for Asia/Shanghai); localize() is the
                    # documented pytz way to get the correct wall-clock offset.
                    time = shanghai.localize(time)
                else:
                    time = None
                content = cells[self.CONTENT_INDEX - 1]
                if not content:
                    content = ""
                rating = cells[self.RATING_INDEX - 1]
                # douban rates 1-5; internal scale is 1-10
                rating = int(rating) * 2 if rating else None
                self.items.append({
                    'data': DoufenRowData(url, tags, time, content, rating),
                    'entity_class': mapping['entity_class'],
                    'mark_class': mapping['mark_class'],
                    'tag_class': mapping['tag_class'],
                    'scraper': mapping['scraper'],
                    'sheet': mapping['sheet'],
                    'row_index': i,
                })

            # set first sheet flag
            is_first_sheet = False

    def __get_item_number(self):
        """Count data rows (header excluded) across all sheets still to sync."""
        assert self.__wb is not None, 'workbook not found'
        assert self.__mappings is not None, 'mappings not found'
        item_number = 0
        for mapping in self.__mappings:
            if mapping['sheet'] in self.__wb:
                item_number += self.__wb[mapping['sheet']].max_row - 1
        return item_number

    def __update_total_items(self):
        """Persist the total item count on the task (progress denominator)."""
        self.task.total_items = self.__get_item_number()
        self.task.save(update_fields=["total_items"])

    def parse(self):
        """
        Parse the workbook and return the list of item dicts.

        On any error, logs it, flags the task as failed and returns [].
        The workbook and file handle are always released.
        """
        try:
            self.__open_file()
            self.__parse_items()
            if self.__is_new_task:
                self.__update_total_items()
            return self.items
        except Exception as e:
            logger.error(f'Error parsing {self.__file_path} {e}')
            # NOTE(review): is_failed is set but not persisted here —
            # presumably the caller saves the task; confirm.
            self.task.is_failed = True
        finally:
            self.__close_file()
        return []
2021-08-29 17:52:42 +02:00
@dataclass
class DoufenRowData:
    """One parsed row from a doufen xlsx sheet, normalized to Python types."""

    url: str      # douban source URL of the marked entity
    tags: list    # de-duplicated tag strings, or None when the cell is empty
    time: datetime  # timezone-aware creation time, or None
    content: str  # review/comment text ("" when empty)
    rating: int   # rating on the internal 1-10 scale, or None
2021-12-20 22:59:32 -05:00
def add_new_mark(data, user, entity, entity_class, mark_class, tag_class, sheet, default_public):
    """
    Create a brand-new mark (plus its tags) for ``entity`` from one parsed row.

    The entity's aggregate rating is updated, and each tag is created
    best-effort: a failing tag is logged and skipped, never aborts the mark.
    """
    entity_field = entity_class.__name__.lower()
    mark = mark_class.objects.create(**{
        'owner': user,
        'created_time': data.time,
        'edited_time': data.time,
        'rating': data.rating,
        'text': data.content,
        'status': translate_status(sheet),
        # imported marks default to public (0) unless the task says otherwise
        'visibility': 0 if default_public else 1,
        entity_field: entity,
    })
    entity.update_rating(None, data.rating)
    for tag in (data.tags or []):
        try:
            tag_class.objects.create(**{
                'content': tag,
                entity_field: entity,
                'mark': mark,
            })
        except Exception as e:
            logger.error(f'Error creating tag {tag} {mark}: {e}')
2021-08-29 17:52:42 +02:00
def overwrite_mark(entity, entity_class, mark, mark_class, tag_class, data, sheet):
    """
    Replace an existing mark's time/text/rating/status and rebuild its tags
    from one parsed row. The entity's aggregate rating is adjusted from the
    old value to the new one.
    """
    old_rating = mark.rating
    # queryset stays lazy; evaluated when the old tags are deleted below
    old_tags = getattr(mark, mark_class.__name__.lower() + '_tags').all()

    # overwrite the mark fields with the row data
    mark.created_time = data.time
    mark.edited_time = data.time
    mark.text = data.content
    mark.rating = data.rating
    mark.status = translate_status(sheet)
    mark.save()
    entity.update_rating(old_rating, data.rating)

    # drop stale tags, then recreate from the row (best-effort per tag)
    if old_tags:
        for stale in old_tags:
            stale.delete()
    entity_field = entity_class.__name__.lower()
    if data.tags:
        for tag in data.tags:
            try:
                tag_class.objects.create(**{
                    'content': tag,
                    entity_field: entity,
                    'mark': mark,
                })
            except Exception as e:
                logger.error(f'Error creating tag {tag} {mark}: {e}')
2021-08-29 17:52:42 +02:00
def sync_doufen_job(task, stop_check_func):
    """
    Run one doufen import task until all items are processed or
    ``stop_check_func()`` signals a stop; progress is checkpointed per item.

    TODO: Update task status every certain amount of items to reduce IO consumption
    """
    # re-read the task row so we operate on fresh state
    task = SyncTask.objects.get(pk=task.pk)
    if task.is_finished:
        return

    print(f'Task {task.pk}: loading')
    items = DoufenParser(task).parse()

    # use pop to reduce memo consumption
    while len(items) > 0 and not stop_check_func():
        entry = items.pop(0)
        data, entity_class, mark_class, tag_class, scraper, sheet, row_index = (
            entry[key] for key in (
                'data', 'entity_class', 'mark_class', 'tag_class',
                'scraper', 'sheet', 'row_index'))

        # update progress
        task.set_breakpoint(sheet, row_index, save=True)

        # scrape the entity if not exists
        try:
            entity = entity_class.objects.get(source_url=data.url)
            print(f'Task {task.pk}: {len(items) + 1} remaining; matched {data.url}')
        except ObjectDoesNotExist:
            try:
                print(f'Task {task.pk}: {len(items) + 1} remaining; scraping {data.url}')
                scraper.scrape(data.url)
                form = scraper.save(request_user=task.user)
                entity = form.instance
            except Exception as e:
                logger.error(f"Task {task.pk}: scrape failed: {data.url} {e}")
                if settings.DEBUG:
                    logger.error("Expections during scraping data:", exc_info=e)
                task.failed_urls.append(data.url)
                task.finished_items += 1
                task.save(update_fields=['failed_urls', 'finished_items'])
                continue

        # sync mark
        try:
            # already exists
            lookup = {
                'owner': task.user,
                entity_class.__name__.lower(): entity,
            }
            mark = mark_class.objects.get(**lookup)
            if task.overwrite:
                overwrite_mark(entity, entity_class, mark,
                               mark_class, tag_class, data, sheet)
            else:
                # keep the existing mark untouched; still counts as success
                task.success_items += 1
                task.finished_items += 1
                task.save(update_fields=['success_items', 'finished_items'])
                continue
        except ObjectDoesNotExist:
            add_new_mark(data, task.user, entity, entity_class,
                         mark_class, tag_class, sheet, task.default_public)
        except Exception as e:
            logger.error(
                f"Task {task.pk}: error when syncing marks", exc_info=e)
            task.failed_urls.append(data.url)
            task.finished_items += 1
            task.save(update_fields=['failed_urls', 'finished_items'])
            continue

        task.success_items += 1
        task.finished_items += 1
        task.save(update_fields=['success_items', 'finished_items'])

    # if task finish
    print(f'Task {task.pk}: stopping')
    if len(items) == 0:
        task.is_finished = True
        task.clear_breakpoint()
        task.save(update_fields=['is_finished', 'break_point'])
2021-06-14 22:18:39 +02:00
def translate_status(sheet_name):
    """
    Map a doufen sheet name to a MarkStatusEnum via its status character:
    '想' (wish) / '在' (doing) / '过' (done).

    Raises ValueError when the name contains none of the three markers.
    """
    if '想' in sheet_name:
        return MarkStatusEnum.WISH
    if '在' in sheet_name:
        return MarkStatusEnum.DO
    if '过' in sheet_name:
        return MarkStatusEnum.COLLECT

    raise ValueError("Not valid status")