lint fix site and import

Your Name 2023-08-11 11:55:42 -04:00 committed by Henri Dickson
parent bcd35f3526
commit 4649a109bd
16 changed files with 151 additions and 122 deletions

View file

@@ -98,6 +98,10 @@ class AbstractSite:
def scrape_additional_data(self):
pass
@staticmethod
def query_str(content, query: str) -> str:
return content.xpath(query)[0].strip()
@classmethod
def get_model_for_resource(cls, resource):
model = resource.get_preferred_model()

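The query_str helper added above centralizes the content.xpath(query)[0].strip() idiom repeated across the site scrapers changed below, returning a plain str so the type checker can follow it; it raises IndexError when the XPath matches nothing. A minimal sketch of the call pattern, assuming content is the lxml tree returned by BasicDownloader(...).download().html() and the XPath expressions and values are illustrative only:

# inside a scrape() method of an AbstractSite subclass
# before: repeated inline at each call site, opaque to the type checker
title = content.xpath("//h2[@class='trackTitle']/text()")[0].strip()

# after: shared helper with a str return type
title = self.query_str(content, "//h2[@class='trackTitle']/text()")

# optional fields: the helper raises IndexError if nothing matches
try:
    first_published = self.query_str(content, "//h2/span/text()")
except IndexError:
    first_published = None
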
View file

@@ -57,7 +57,8 @@ class AppleMusic(AbstractSite):
if content is None:
raise ParseError(self, f"localized content for {self.url}")
elem = content.xpath("//script[@id='serialized-server-data']/text()")
page_data = json.loads(elem[0])[0]
txt: str = elem[0] # type:ignore
page_data = json.loads(txt)[0]
album_data = page_data["data"]["sections"][0]["items"][0]
title = album_data["title"]
brief = album_data.get("modalPresentationDescriptor")
@@ -67,11 +68,11 @@ class AppleMusic(AbstractSite):
track_data = page_data["data"]["seoData"]
date_elem = track_data.get("musicReleaseDate")
release_datetime = dateparser.parse(date_elem.strip()) if date_elem else None
release_date = (
dateparser.parse(date_elem.strip()).strftime("%Y-%m-%d")
if date_elem
else None
release_datetime.strftime("%Y-%m-%d") if release_datetime else None
)
track_list = [
f"{i}. {track['attributes']['name']}"
for i, track in enumerate(track_data["ogSongs"], 1)
@@ -87,7 +88,10 @@ class AppleMusic(AbstractSite):
genre[0]
] # apple treat "Music" as a genre. Thus, only the first genre is obtained.
image_elem = content.xpath("//source[@type='image/jpeg']/@srcset")[0]
images = (
content.xpath("//source[@type='image/jpeg']/@srcset") if content else []
)
image_elem: str = images[0] if images else "" # type:ignore
image_url = image_elem.split(" ")[0] if image_elem else None
pd = ResourceContent(

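Several of the changes in this commit follow the same shape as the release-date handling above: dateparser.parse() returns an Optional[datetime] (None when the string cannot be parsed), so calling .strftime() directly on its result is what the type checker objects to. A standalone sketch of the guarded pattern, with a purely illustrative input value:

import dateparser

date_elem = "2023-08-11"  # illustrative raw value scraped from a page
release_datetime = dateparser.parse(date_elem.strip()) if date_elem else None
release_date = release_datetime.strftime("%Y-%m-%d") if release_datetime else None
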
View file

@@ -4,6 +4,7 @@ import re
import urllib.parse
import dateparser
import dns.resolver
from catalog.common import *
from catalog.models import *
@@ -32,14 +33,14 @@ class Bandcamp(AbstractSite):
hostname = parsed_url.netloc
try:
answers = dns.resolver.query(hostname, "CNAME")
for rdata in answers:
for rdata in answers: # type:ignore
if str(rdata.target) == "dom.bandcamp.com.":
return True
except Exception:
pass
try:
answers = dns.resolver.query(hostname, "A")
for rdata in answers:
for rdata in answers: # type:ignore
if str(rdata.address) == "35.241.62.186":
return True
except Exception:
@@ -48,32 +49,36 @@ class Bandcamp(AbstractSite):
def scrape(self):
content = BasicDownloader(self.url).download().html()
try:
title = content.xpath("//h2[@class='trackTitle']/text()")[0].strip()
title = self.query_str(content, "//h2[@class='trackTitle']/text()")
artist = [
content.xpath("//div[@id='name-section']/h3/span/a/text()")[0].strip()
self.query_str(content, "//div[@id='name-section']/h3/span/a/text()")
]
except IndexError:
raise ValueError("given url contains no valid info")
genre = [] # TODO: parse tags
track_list = ""
release_nodes = content.xpath(
"//div[@class='tralbumData tralbum-credits']/text()"
try:
release_str = re.sub(
r"releas\w+ ",
"",
self.query_str(
content, "//div[@class='tralbumData tralbum-credits']/text()"
),
)
release_datetime = dateparser.parse(release_str) if release_str else None
release_date = (
dateparser.parse(
re.sub(r"releas\w+ ", "", release_nodes[0].strip())
).strftime("%Y-%m-%d")
if release_nodes
else None
release_datetime.strftime("%Y-%m-%d") if release_datetime else None
)
except:
release_date = None
duration = None
company = None
brief_nodes = content.xpath("//div[@class='tralbumData tralbum-about']/text()")
brief = "".join(brief_nodes) if brief_nodes else None
cover_url = content.xpath("//div[@id='tralbumArt']/a/@href")[0].strip()
brief = "".join(brief_nodes) if brief_nodes else None # type:ignore
cover_url = self.query_str(content, "//div[@id='tralbumArt']/a/@href")
bandcamp_page_data = json.loads(
content.xpath("//meta[@name='bc-page-properties']/@content")[0].strip()
self.query_str(content, "//meta[@name='bc-page-properties']/@content")
)
bandcamp_album_id = bandcamp_page_data["item_id"]

View file

@@ -17,7 +17,7 @@ class Bangumi(AbstractSite):
DEFAULT_MODEL = None
@classmethod
def id_to_url(self, id_value):
def id_to_url(cls, id_value):
return f"https://bgm.tv/subject/{id_value}"
def scrape(self):

View file

@@ -29,7 +29,7 @@ class BooksTW(AbstractSite):
isbn_elem = content.xpath(
"//div[@class='bd']/ul/li[starts-with(text(),'ISBN')]/text()"
)
isbn = isbn_elem[0].strip().split("：", 1)[1].strip() if isbn_elem else None
isbn = isbn_elem[0].strip().split("：", 1)[1].strip() if isbn_elem else None # type: ignore
# isbn_elem = content.xpath(
# "//div[@class='bd']/ul/li[starts-with(text(),'EISBN')]/text()"
@@ -43,26 +43,26 @@ class BooksTW(AbstractSite):
orig_title = content.xpath("string(//h1/following-sibling::h2)")
authors = content.xpath("string(//div/ul/li[contains(text(),'作者:')])")
authors = authors.strip().split("：", 1)[1].split(",") if authors else []
authors = authors.strip().split("：", 1)[1].split(",") if authors else [] # type: ignore
if not authors:
authors = [content.xpath("string(//div/ul/li[contains(.,'作者:')]/a)")]
authors = [s.strip() for s in authors]
authors = [s.strip() for s in authors] # type: ignore
# author_orig = content.xpath("string(//div/ul/li[contains(text(),'原文作者:')])")
translators = content.xpath("string(//div/ul/li[contains(text(),'譯者:')])")
translators = (
translators.strip().split("：", 1)[1].split(",") if translators else []
translators.strip().split("：", 1)[1].split(",") if translators else [] # type: ignore
)
translators = [s.strip() for s in translators]
language_elem = content.xpath("//div/ul/li[starts-with(text(),'語言:')]/text()")
language = (
language_elem[0].strip().split("：")[1].strip() if language_elem else None
language_elem[0].strip().split("：")[1].strip() if language_elem else None # type: ignore
)
pub_house = content.xpath("string(//div/ul/li[contains(text(),'出版社:')])")
pub_house = (
pub_house.strip().split("：", 1)[1].strip().split(" ", 1)[0]
pub_house.strip().split("：", 1)[1].strip().split(" ", 1)[0] # type: ignore
if pub_house
else None
)
@@ -70,7 +70,7 @@ class BooksTW(AbstractSite):
pub_date = content.xpath("string(//div/ul/li[contains(text(),'出版日期:')])")
pub_date = re.match(
r"(\d+)/(\d+)/(\d+)\s*$",
pub_date.strip().split("：", 1)[1].strip().split(" ", 1)[0]
pub_date.strip().split("：", 1)[1].strip().split(" ", 1)[0] # type: ignore
if pub_date
else "",
)
@@ -82,10 +82,10 @@ class BooksTW(AbstractSite):
pub_month = None
spec = content.xpath("string(//div/ul/li[contains(text(),'規格:')])")
spec = spec.strip().split("：", 1)[1].strip().split("/") if spec else []
spec = spec.strip().split("：", 1)[1].strip().split("/") if spec else [] # type: ignore
if len(spec) > 1:
binding = spec[0].strip()
pages = spec[1].strip().split("頁")
pages = str(spec[1].strip()).split("頁")
pages = int(pages[0]) if len(pages) > 1 else None
if pages and (pages > 999999 or pages < 1):
pages = None
@@ -95,7 +95,7 @@ class BooksTW(AbstractSite):
price = content.xpath("string(//div/ul/li[contains(text(),'定價:')])")
price = (
price.strip().split("：", 1)[1].split("元")[0].strip() + " NTD"
price.strip().split("：", 1)[1].split("元")[0].strip() + " NTD" # type: ignore
if price
else None
)
@@ -111,7 +111,7 @@ class BooksTW(AbstractSite):
img_url = content.xpath(
"string(//div[contains(@class,'cover_img')]//img[contains(@class,'cover')]/@src)"
)
img_url = re.sub(r"&[wh]=\d+", "", img_url) if img_url else None
img_url = re.sub(r"&[wh]=\d+", "", img_url) if img_url else None # type: ignore
data = {
"title": title,

View file

@@ -53,8 +53,7 @@ class Goodreads(AbstractSite):
h = dl.download().html()
# Next.JS version of GoodReads
# JSON.parse(document.getElementById('__NEXT_DATA__').innerHTML)['props']['pageProps']['apolloState']
elem = h.xpath('//script[@id="__NEXT_DATA__"]/text()')
src = elem[0].strip() if elem else None
src = self.query_str(h, '//script[@id="__NEXT_DATA__"]/text()')
if not src:
raise ParseError(self, "__NEXT_DATA__ element")
d = json.loads(src)["props"]["pageProps"]["apolloState"]
@@ -134,16 +133,14 @@ class Goodreads_Work(AbstractSite):
def scrape(self, response=None):
content = BasicDownloader(self.url).download().html()
title_elem = content.xpath("//h1/a/text()")
title = title_elem[0].strip() if title_elem else None
title = self.query_str(content, "//h1/a/text()")
if not title:
raise ParseError(self, "title")
author_elem = content.xpath("//h2/a/text()")
author = author_elem[0].strip() if author_elem else None
first_published_elem = content.xpath("//h2/span/text()")
first_published = (
first_published_elem[0].strip() if first_published_elem else None
)
author = self.query_str(content, "//h2/a/text()")
try:
first_published = self.query_str(content, "//h2/span/text()")
except:
first_published = None
pd = ResourceContent(
metadata={
"title": title,

View file

@@ -20,7 +20,7 @@ class GoogleBooks(AbstractSite):
DEFAULT_MODEL = Edition
@classmethod
def id_to_url(self, id_value):
def id_to_url(cls, id_value):
return "https://books.google.com/books?id=" + id_value
def scrape(self):

View file

@@ -61,7 +61,7 @@ class IGDB(AbstractSite):
if get_mock_mode():
r = BasicDownloader(key).download().json()
else:
r = json.loads(_wrapper.api_request(p, q))
r = json.loads(_wrapper.api_request(p, q)) # type: ignore
if settings.DOWNLOADER_SAVEDIR:
with open(
settings.DOWNLOADER_SAVEDIR + "/" + get_mock_file(key),

View file

@@ -57,11 +57,16 @@ class IMDB(AbstractSite):
season_number = res_data["tv_episode_results"][0]["season_number"]
episode_number = res_data["tv_episode_results"][0]["episode_number"]
url = f"https://www.themoviedb.org/tv/{tv_id}/season/{season_number}/episode/{episode_number}"
pd = None
if url:
tmdb = SiteManager.get_site_by_url(url)
if tmdb:
pd = tmdb.scrape()
pd.metadata["preferred_model"] = tmdb.DEFAULT_MODEL.__name__
pd.metadata["required_resources"] = [] # do not auto fetch parent season
pd.metadata["preferred_model"] = (
tmdb.DEFAULT_MODEL.__name__ if tmdb.DEFAULT_MODEL else None
)
# do not auto fetch parent season
pd.metadata["required_resources"] = []
if not pd:
# if IMDB id not found in TMDB, use real IMDB scraper
pd = self.scrape_imdb()
@@ -69,8 +74,7 @@ class IMDB(AbstractSite):
def scrape_imdb(self):
h = BasicDownloader(self.url).download().html()
elem = h.xpath('//script[@id="__NEXT_DATA__"]/text()')
src = elem[0].strip() if elem else None
src = self.query_str(h, '//script[@id="__NEXT_DATA__"]/text()')
if not src:
raise ParseError(self, "__NEXT_DATA__ element")
d = json.loads(src)["props"]["pageProps"]["aboveTheFoldData"]
@@ -120,15 +124,14 @@ class IMDB(AbstractSite):
def get_episode_list(show_id, season_id):
url = f"https://m.imdb.com/title/{show_id}/"
h = BasicDownloader(url).download().html()
show_url = "".join(
h.xpath('//a[@data-testid="hero-title-block__series-link"]/@href')
).split("?")[0]
u: str = h.xpath('//a[@data-testid="hero-title-block__series-link"]/@href') # type: ignore
show_url = "".join(u).split("?")[0]
if not show_url:
show_url = f"/title/{show_id}/"
url = f"https://m.imdb.com{show_url}episodes/?season={season_id}"
h = BasicDownloader(url).download().html()
episodes = []
for e in h.xpath('//div[@id="eplist"]/div/a'):
for e in h.xpath('//div[@id="eplist"]/div/a'): # type: ignore
episode_number = e.xpath(
'./span[contains(@class,"episode-list__title")]/text()'
)[0].strip()
@@ -166,7 +169,10 @@ class IMDB(AbstractSite):
).first()
if not episode:
site = SiteManager.get_site_by_url(e["url"])
episode = site.get_resource_ready().item
if site:
res = site.get_resource_ready()
if res and res.item:
episode = res.item
episode.set_parent_item(season)
episode.save()
else:

View file

@@ -68,8 +68,8 @@ class Spotify(AbstractSite):
else:
track_list.append(str(track["track_number"]) + ". " + track["name"])
track_list = "\n".join(track_list)
release_date = dateparser.parse(res_data["release_date"]).strftime("%Y-%m-%d")
dt = dateparser.parse(res_data["release_date"])
release_date = dt.strftime("%Y-%m-%d") if dt else None
gtin = None
if res_data["external_ids"].get("upc"):

View file

@@ -31,30 +31,29 @@ class Steam(AbstractSite):
headers["Cookie"] = "wants_mature_content=1; birthtime=754700401;"
content = BasicDownloader(self.url, headers=headers).download().html()
title = content.xpath("//div[@class='apphub_AppName']/text()")[0]
title = self.query_str(content, "//div[@class='apphub_AppName']/text()")
developer = content.xpath("//div[@id='developers_list']/a/text()")
publisher = content.xpath(
"//div[@class='glance_ctn']//div[@class='dev_row'][2]//a/text()"
)
dt = content.xpath("//div[@class='release_date']/div[@class='date']/text()")
release_date = (
dateparser.parse(dt[0].replace(" ", "")).strftime("%Y-%m-%d")
if dt
else None
dts = self.query_str(
content, "//div[@class='release_date']/div[@class='date']/text()"
)
dt = dateparser.parse(dts.replace(" ", "")) if dts else None
release_date = dt.strftime("%Y-%m-%d") if dt else None
genre = content.xpath(
"//div[@class='details_block']/b[2]/following-sibling::a/text()"
)
platform = ["PC"]
brief = content.xpath("//div[@class='game_description_snippet']/text()")[
0
].strip()
brief = self.query_str(
content, "//div[@class='game_description_snippet']/text()"
)
# try Steam images if no image from IGDB
if pd.cover_image is None:
pd.metadata["cover_image_url"] = content.xpath(
"//img[@class='game_header_image_full']/@src"
)[0].replace("header.jpg", "library_600x900.jpg")
pd.metadata["cover_image_url"] = self.query_str(
content, "//img[@class='game_header_image_full']/@src"
).replace("header.jpg", "library_600x900.jpg")
(
pd.cover_image,
pd.cover_image_extention,
@@ -62,9 +61,9 @@ class Steam(AbstractSite):
pd.metadata["cover_image_url"], self.url
)
if pd.cover_image is None:
pd.metadata["cover_image_url"] = content.xpath(
"//img[@class='game_header_image_full']/@src"
)[0]
pd.metadata["cover_image_url"] = self.query_str(
content, "//img[@class='game_header_image_full']/@src"
)
(
pd.cover_image,
pd.cover_image_extention,

View file

@@ -44,7 +44,7 @@ class TMDB_Movie(AbstractSite):
DEFAULT_MODEL = Movie
@classmethod
def id_to_url(self, id_value):
def id_to_url(cls, id_value):
return f"https://www.themoviedb.org/movie/{id_value}"
def scrape(self):
@@ -178,7 +178,7 @@ class TMDB_TV(AbstractSite):
DEFAULT_MODEL = TVShow
@classmethod
def id_to_url(self, id_value):
def id_to_url(cls, id_value):
return f"https://www.themoviedb.org/tv/{id_value}"
def scrape(self):
@@ -338,6 +338,8 @@ class TMDB_TVSeason(AbstractSite):
return f"https://www.themoviedb.org/tv/{v[0]}/season/{v[1]}"
def scrape(self):
if not self.id_value:
raise ParseError(self, "id_value")
v = self.id_value.split("-")
show_id = v[0]
season_id = v[1]
@@ -346,7 +348,7 @@ class TMDB_TVSeason(AbstractSite):
api_url = f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_id}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&append_to_response=external_ids,credits"
d = BasicDownloader(api_url).download().json()
if not d.get("id"):
raise ParseError("id")
raise ParseError(self, "id")
pd = ResourceContent(
metadata=_copy_dict(
d,
@@ -401,9 +403,11 @@ class TMDB_TVSeason(AbstractSite):
# this should not happen
_logger.warning("Unexpected IMDB id for TMDB tv season")
elif pd.metadata.get("season_number") == 1:
res = SiteManager.get_site_by_url(
site = SiteManager.get_site_by_url(
f"https://www.themoviedb.org/tv/{show_id}"
).get_resource_ready()
)
if site:
res = site.get_resource_ready()
pd.lookup_ids[IdType.IMDB] = (
res.other_lookup_ids.get(IdType.IMDB) if res else None
)
@@ -416,7 +420,7 @@ class TMDB_TVSeason(AbstractSite):
api_url2 = f"https://api.themoviedb.org/3/tv/{v[0]}/season/{v[1]}/episode/{ep}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&append_to_response=external_ids,credits"
d2 = BasicDownloader(api_url2).download().json()
if not d2.get("id"):
raise ParseError("first episode id for season")
raise ParseError(self, "first episode id for season")
pd.lookup_ids[IdType.IMDB] = d2["external_ids"].get("imdb_id")
return pd
@@ -445,6 +449,8 @@ class TMDB_TVEpisode(AbstractSite):
return f"https://www.themoviedb.org/tv/{v[0]}/season/{v[1]}/episode/{v[2]}"
def scrape(self):
if not self.id_value:
raise ParseError(self, "id_value")
v = self.id_value.split("-")
show_id = v[0]
season_id = v[1]
@@ -454,7 +460,7 @@ class TMDB_TVEpisode(AbstractSite):
api_url = f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_id}/episode/{episode_id}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&append_to_response=external_ids,credits"
d = BasicDownloader(api_url).download().json()
if not d.get("id"):
raise ParseError("id")
raise ParseError(self, "id")
pd = ResourceContent(
metadata=_copy_dict(
d,

View file

@@ -60,7 +60,11 @@
<td>{{ value.1|default:"-" }}</td>
</tr>
{% empty %}
<p>No data.</p>
<tr>
<td>-</td>
<td></td>
<td></td>
</tr>
{% endfor %}
{% empty %}
<p>No history for this item has been logged yet.</p>

View file

@@ -8,6 +8,7 @@ import openpyxl
import pytz
from auditlog.context import set_actor
from django.conf import settings
from loguru import logger
from markdownify import markdownify as md
from user_messages import api as msg
@@ -18,28 +19,27 @@ from catalog.sites.douban import DoubanDownloader
from common.utils import GenerateDateUUIDMediaFilePath
from journal.models import *
_logger = logging.getLogger(__name__)
_tz_sh = pytz.timezone("Asia/Shanghai")
def _fetch_remote_image(url):
try:
print(f"fetching remote image {url}")
logger.info(f"fetching remote image {url}")
imgdl = ProxiedImageDownloader(url)
raw_img = imgdl.download().content
ext = imgdl.extention
f = GenerateDateUUIDMediaFilePath(
None, "x." + ext, settings.MARKDOWNX_MEDIA_PATH
None, f"x.{ext}", settings.MARKDOWNX_MEDIA_PATH
)
file = settings.MEDIA_ROOT + f
local_url = settings.MEDIA_URL + f
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, "wb") as binary_file:
binary_file.write(raw_img)
# print(f'remote image saved as {local_url}')
# logger.info(f'remote image saved as {local_url}')
return local_url
except Exception:
print(f"unable to fetch remote image {url}")
logger.error(f"unable to fetch remote image {url}")
return url
@@ -49,10 +49,9 @@ class DoubanImporter:
skipped = 0
imported = 0
failed = []
user = None
visibility = 0
mode = 0
file = None
file = ""
def __init__(self, user, visibility, mode):
self.user = user
@@ -149,7 +148,7 @@ class DoubanImporter:
for name in config:
data[name] = []
if name in wb:
print(f"{self.user} parsing {name}")
logger.info(f"{self.user} parsing {name}")
for row in wb[name].iter_rows(min_row=2, values_only=True):
cells = [cell for cell in row]
if len(cells) > 6 and cells[0]:
@@ -189,12 +188,12 @@ class DoubanImporter:
# return cells[3]
def import_from_file_task(self):
print(f"{self.user} import start")
logger.info(f"{self.user} import start")
msg.info(self.user, f"开始导入豆瓣标记和评论")
self.update_user_import_status(1)
with set_actor(self.user):
self.load_sheets()
print(f"{self.user} sheet loaded, {self.total} lines total")
logger.info(f"{self.user} sheet loaded, {self.total} lines total")
self.update_user_import_status(1)
for name, param in self.mark_sheet_config.items():
self.import_mark_sheet(self.mark_data[name], param[0], name)
@@ -211,7 +210,7 @@ class DoubanImporter:
def import_mark_sheet(self, worksheet, shelf_type, sheet_name):
prefix = f"{self.user} {sheet_name}|"
if worksheet is None: # or worksheet.max_row < 2:
print(f"{prefix} empty sheet")
logger.warning(f"{prefix} empty sheet")
return
for cells in worksheet:
if len(cells) < 6:
@@ -244,7 +243,7 @@ class DoubanImporter:
"""
item = self.get_item_by_url(url)
if not item:
print(f"{self.user} | match/fetch {url} failed")
logger.warning(f"{self.user} | match/fetch {url} failed")
return
mark = Mark(self.user, item)
if self.mode == 0 and (
@@ -268,7 +267,7 @@ class DoubanImporter:
def import_review_sheet(self, worksheet, sheet_name):
prefix = f"{self.user} {sheet_name}|"
if worksheet is None: # or worksheet.max_row < 2:
print(f"{prefix} empty sheet")
logger.warning(f"{prefix} empty sheet")
return
for cells in worksheet:
if len(cells) < 6:
@@ -307,17 +306,18 @@ class DoubanImporter:
item = None
try:
site = SiteManager.get_site_by_url(url)
if not site:
raise ValueError(f"URL unrecognized {url}")
item = site.get_item()
if not item:
print(f"fetching {url}")
logger.info(f"fetching {url}")
site.get_resource_ready()
item = site.get_item()
else:
# print(f"matched {url}")
# logger.info(f"matched {url}")
print(".", end="", flush=True)
except Exception as e:
print(f"fetching exception: {url} {e}")
_logger.error(f"scrape failed: {url}", exc_info=e)
logger.error(f"fetching exception: {url} {e}")
if item is None:
self.failed.append(url)
return item
@@ -329,23 +329,24 @@ class DoubanImporter:
prefix = f"{self.user} |"
url = self.guess_entity_url(entity_title, rating, time)
if url is None:
print(f"{prefix} fetching review {review_url}")
logger.info(f"{prefix} fetching review {review_url}")
try:
h = DoubanDownloader(review_url).download().html()
for u in h.xpath("//header[@class='main-hd']/a/@href"):
urls = h.xpath("//header[@class='main-hd']/a/@href")
for u in urls: # type:ignore
if ".douban.com/subject/" in u:
url = u
if not url:
print(
logger.warning(
f"{prefix} fetching error {review_url} unable to locate entity url"
)
return
except Exception:
print(f"{prefix} fetching review exception {review_url}")
logger.error(f"{prefix} fetching review exception {review_url}")
return
item = self.get_item_by_url(url)
if not item:
print(f"{prefix} match/fetch {url} failed")
logger.warning(f"{prefix} match/fetch {url} failed")
return
if (
self.mode == 1

View file

@@ -3,6 +3,7 @@ from datetime import datetime
import django_rq
from auditlog.context import set_actor
from django.utils import timezone
from django.utils.timezone import make_aware
from user_messages import api as msg
@@ -45,12 +46,12 @@ class GoodreadsImporter:
total = 0
visibility = user.preference.default_visibility
with set_actor(user):
if match_list or match_shelf:
shelf = (
cls.parse_shelf(match_shelf[0], user)
if match_shelf
else cls.parse_list(match_list[0], user)
)
shelf = None
if match_shelf:
shelf = cls.parse_shelf(match_shelf[0], user)
elif match_list:
shelf = cls.parse_list(match_list[0], user)
if shelf:
if shelf["title"] and shelf["books"]:
collection = Collection.objects.create(
title=shelf["title"],
@@ -119,7 +120,7 @@ class GoodreadsImporter:
@classmethod
def parse_shelf(cls, url, user):
# return {'title': 'abc', books: [{'book': obj, 'rating': 10, 'review': 'txt'}, ...]}
title = None
title = ""
books = []
url_shelf = url + "&view=table"
while url_shelf:
@@ -205,7 +206,7 @@ class GoodreadsImporter:
pass # likely just download error
next_elem = content.xpath("//a[@class='next_page']/@href")
url_shelf = (
f"https://www.goodreads.com{next_elem[0].strip()}"
f"https://www.goodreads.com{next_elem[0].strip()}" # type:ignore
if next_elem
else None
)
@@ -214,8 +215,8 @@ class GoodreadsImporter:
@classmethod
def parse_list(cls, url, user):
# return {'title': 'abc', books: [{'book': obj, 'rating': 10, 'review': 'txt'}, ...]}
title = None
description = None
title = ""
description = ""
books = []
url_shelf = url
while url_shelf:
@@ -225,10 +226,12 @@ class GoodreadsImporter:
if not title_elem:
print(f"List parsing error {url_shelf}")
break
title = title_elem[0].strip()
description = content.xpath('//div[@class="mediumText"]/text()')[0].strip()
title: str = title_elem[0].strip() # type:ignore
desc_elem = content.xpath('//div[@class="mediumText"]/text()')
description: str = desc_elem[0].strip() # type:ignore
print("List title: " + title)
for link in content.xpath('//a[@class="bookTitle"]/@href'):
links = content.xpath('//a[@class="bookTitle"]/@href')
for link in links: # type:ignore
url_book = "https://www.goodreads.com" + link
try:
book = cls.get_book(url_book, user)
@@ -244,7 +247,7 @@ class GoodreadsImporter:
pass # likely just download error
next_elem = content.xpath("//a[@class='next_page']/@href")
url_shelf = (
("https://www.goodreads.com" + next_elem[0].strip())
f"https://www.goodreads.com{next_elem[0].strip()}" # type:ignore
if next_elem
else None
)

View file

@@ -1,5 +1,5 @@
[tool.pyright]
exclude = [ "media", ".venv", ".git", "playground", "**/tests.py", "neodb", "**/migrations", "**/commands", "**/importers", "**/sites" ]
exclude = [ "media", ".venv", ".git", "playground", "**/tests.py", "neodb", "**/migrations", "**/commands", "**/sites/douban_*" ]
[tool.djlint]
ignore="T002,T003,H006,H019,H020,H021,H023,H030,H031"