lib.itmens/journal/importers/letterboxd.py

191 lines
7.1 KiB
Python
Raw Normal View History

2024-01-07 22:39:30 -05:00
import csv
import tempfile
import zipfile
from datetime import timedelta
from random import randint
2024-01-07 22:39:30 -05:00
2024-01-10 22:20:57 -05:00
import pytz
2024-01-07 22:39:30 -05:00
from django.utils.dateparse import parse_datetime
2024-06-07 22:29:10 -04:00
from django.utils.translation import gettext as _
2024-01-07 22:39:30 -05:00
from loguru import logger
2024-01-11 21:33:39 -05:00
from markdownify import markdownify as md
2024-01-07 22:39:30 -05:00
from catalog.common import *
from catalog.common.downloaders import *
from catalog.models import *
from journal.models import *
from users.models import *
2024-01-10 22:20:57 -05:00
# Timezone attached to the timestamps that LetterboxdImporter.mark builds from
# the date columns of the export.
# NOTE(review): pytz zones are meant to be applied to naive datetimes with
# `_tz_sh.localize(dt)`; passing a pytz zone via `datetime.replace(tzinfo=...)`
# picks the zone's historical LMT offset rather than +08:00 - confirm usage.
_tz_sh = pytz.timezone("Asia/Shanghai")
2024-01-07 22:39:30 -05:00
class LetterboxdImporter(Task):
2024-12-26 01:51:24 -05:00
class Meta:
app_label = "journal" # workaround bug in TypedModel
2024-01-07 22:39:30 -05:00
TaskQueue = "import"
DefaultMetadata = {
"total": 0,
"processed": 0,
"skipped": 0,
"imported": 0,
"failed": 0,
"visibility": 0,
2024-01-11 21:33:39 -05:00
"failed_urls": [],
2024-01-07 22:39:30 -05:00
"file": None,
}
def get_item_by_url(self, url):
try:
2024-01-10 22:20:57 -05:00
h = BasicDownloader(url).download().html()
2024-01-11 21:33:39 -05:00
tu = h.xpath("//a[@data-track-action='TMDb']/@href")
2024-01-12 18:38:09 -05:00
iu = h.xpath("//a[@data-track-action='IMDb']/@href")
2024-01-11 21:33:39 -05:00
if not tu:
2024-01-10 22:20:57 -05:00
i = h.xpath('//span[@class="film-title-wrapper"]/a/@href')
u2 = "https://letterboxd.com" + i[0] # type:ignore
h = BasicDownloader(u2).download().html()
2024-01-11 21:33:39 -05:00
tu = h.xpath("//a[@data-track-action='TMDb']/@href")
2024-01-12 18:38:09 -05:00
iu = h.xpath("//a[@data-track-action='IMDb']/@href")
2024-01-11 21:33:39 -05:00
if not tu:
logger.error(f"Unknown TMDB for {url}")
2024-01-07 22:39:30 -05:00
return None
2024-01-11 21:33:39 -05:00
site = SiteManager.get_site_by_url(tu[0]) # type:ignore
2024-01-07 22:39:30 -05:00
if not site:
return None
2024-01-11 21:33:39 -05:00
if site.ID_TYPE == IdType.TMDB_TV:
site = SiteManager.get_site_by_url(f"{site.url}/season/1")
if not site:
return None
2024-01-12 18:38:09 -05:00
try:
site.get_resource_ready()
return site.get_item()
2024-04-06 00:13:50 -04:00
except Exception:
2024-01-12 18:38:09 -05:00
imdb_url = str(iu[0]) # type:ignore
logger.warning(
f"Fetching {url}: TMDB {site.url} failed, try IMDB {imdb_url}"
)
site = SiteManager.get_site_by_url(imdb_url)
if not site:
return None
site.get_resource_ready()
return site.get_item()
2024-01-07 22:39:30 -05:00
except Exception as e:
2024-01-12 18:38:09 -05:00
logger.error(f"Fetching {url}: error {e}")
2024-01-07 22:39:30 -05:00
2024-01-11 21:33:39 -05:00
def mark(self, url, shelf_type, date, rating=None, text=None, tags=None):
2024-01-07 22:39:30 -05:00
item = self.get_item_by_url(url)
if not item:
logger.error(f"Unable to get item for {url}")
2024-01-11 21:33:39 -05:00
self.progress(-1, url)
2024-01-07 22:39:30 -05:00
return
owner = self.user.identity
mark = Mark(owner, item)
if (
mark.shelf_type == shelf_type
or mark.shelf_type == ShelfType.COMPLETE
or (
2024-01-20 22:52:08 -05:00
mark.shelf_type in [ShelfType.PROGRESS, ShelfType.DROPPED]
2024-01-07 22:39:30 -05:00
and shelf_type == ShelfType.WISHLIST
)
):
2024-01-10 22:20:57 -05:00
self.progress(0)
2024-01-11 21:33:39 -05:00
return
2024-01-07 22:39:30 -05:00
visibility = self.metadata["visibility"]
shelf_time_offset = {
ShelfType.WISHLIST: " 20:00:00",
ShelfType.PROGRESS: " 21:00:00",
ShelfType.COMPLETE: " 22:00:00",
}
dt = parse_datetime(date + shelf_time_offset[shelf_type])
2024-01-10 22:20:57 -05:00
if dt:
dt += timedelta(seconds=randint(0, 3599))
2024-01-10 22:20:57 -05:00
dt = dt.replace(tzinfo=_tz_sh)
2024-01-11 21:33:39 -05:00
rating_grade = round(float(rating) * 2) if rating else None
comment = None
if text:
text = md(text)
if len(text) < 360:
comment = text
else:
2024-05-15 20:41:03 -04:00
title = _("a review of {item_title}").format(item_title=item.title)
2024-01-11 21:33:39 -05:00
Review.update_item_review(item, owner, title, text, visibility, dt)
2024-06-04 10:12:04 -04:00
tag_titles = [s.strip() for s in tags.split(",")] if tags else None
2024-01-07 22:39:30 -05:00
mark.update(
shelf_type,
2024-01-11 21:33:39 -05:00
comment_text=comment,
rating_grade=rating_grade,
2024-06-04 10:12:04 -04:00
tags=tag_titles,
2024-01-07 22:39:30 -05:00
visibility=visibility,
created_time=dt,
)
2024-01-10 22:20:57 -05:00
self.progress(1)
2024-01-11 21:33:39 -05:00
def progress(self, mark_state: int, url=None):
2024-01-10 22:20:57 -05:00
self.metadata["total"] += 1
self.metadata["processed"] += 1
match mark_state:
case 1:
self.metadata["imported"] += 1
case 0:
self.metadata["skipped"] += 1
case _:
self.metadata["failed"] += 1
2024-01-11 21:33:39 -05:00
if url:
self.metadata["failed_urls"].append(url)
2024-01-10 22:20:57 -05:00
self.message = f"{self.metadata['imported']} imported, {self.metadata['skipped']} skipped, {self.metadata['failed']} failed"
self.save(update_fields=["metadata", "message"])
2024-01-07 22:39:30 -05:00
def run(self):
uris = set()
filename = self.metadata["file"]
with zipfile.ZipFile(filename, "r") as zipref:
with tempfile.TemporaryDirectory() as tmpdirname:
logger.debug(f"Extracting {filename} to {tmpdirname}")
zipref.extractall(tmpdirname)
with open(tmpdirname + "/reviews.csv") as f:
reader = csv.DictReader(f, delimiter=",")
for row in reader:
uris.add(row["Letterboxd URI"])
self.mark(
row["Letterboxd URI"],
ShelfType.COMPLETE,
row["Watched Date"],
row["Rating"],
2024-01-10 22:20:57 -05:00
row["Review"],
2024-01-07 22:39:30 -05:00
row["Tags"],
)
with open(tmpdirname + "/ratings.csv") as f:
reader = csv.DictReader(f, delimiter=",")
for row in reader:
if row["Letterboxd URI"] in uris:
continue
uris.add(row["Letterboxd URI"])
self.mark(
row["Letterboxd URI"],
ShelfType.COMPLETE,
row["Date"],
row["Rating"],
)
with open(tmpdirname + "/watched.csv") as f:
reader = csv.DictReader(f, delimiter=",")
for row in reader:
if row["Letterboxd URI"] in uris:
continue
uris.add(row["Letterboxd URI"])
self.mark(
row["Letterboxd URI"],
ShelfType.COMPLETE,
row["Date"],
)
with open(tmpdirname + "/watchlist.csv") as f:
reader = csv.DictReader(f, delimiter=",")
for row in reader:
if row["Letterboxd URI"] in uris:
continue
uris.add(row["Letterboxd URI"])
self.mark(
row["Letterboxd URI"],
ShelfType.WISHLIST,
row["Date"],
)