# lib.itmens/journal/importers/csv.py

import csv
import datetime
import os
import tempfile
import zipfile
from typing import Dict, List, Optional
from django.conf import settings
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from django.utils.translation import gettext as _
from loguru import logger
from catalog.common.sites import SiteManager
from catalog.models import Edition, IdType, Item, ItemCategory, SiteName
from journal.models import Mark, Note, Review, ShelfType
from users.models import Task
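
# Preferred order when resolving an item from its external links;
# sites not listed here sort after all of these (fallback sort key 99)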
_PREFERRED_SITES = [
SiteName.Fediverse,
SiteName.RSS,
SiteName.TMDB,
SiteName.IMDB,
SiteName.GoogleBooks,
SiteName.Goodreads,
SiteName.IGDB,
]


class CsvImporter(Task):
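    """Import user marks, reviews and notes from a zipped CSV export.

    The archive is expected to contain per-category files named
    <category>_mark.csv, <category>_review.csv and <category>_note.csv.
    """
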
class Meta:
app_label = "journal" # workaround bug in TypedModel

    TaskQueue = "import"
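    # Progress counters and import options persisted in the task metadata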
DefaultMetadata = {
"total": 0,
"processed": 0,
"skipped": 0,
"imported": 0,
"failed": 0,
"failed_items": [],
"file": None,
"visibility": 0,
}

    def get_item_by_info_and_links(
self, title: str, info_str: str, links_str: str
) -> Optional[Item]:
"""Find an item based on information from CSV export.
Args:
title: Item title
info_str: Item info string (space-separated key:value pairs)
links_str: Space-separated URLs
Returns:
Item if found, None otherwise
"""
site_url = settings.SITE_INFO["site_url"] + "/"
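        # The trailing slash avoids prefix-matching an unrelated domain
        # (e.g. example.com vs example.community)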
links = links_str.strip().split()
# look for local items first
for link in links:
if link.startswith("/") or link.startswith(site_url):
item = Item.get_by_url(link)
if item:
return item
sites = [SiteManager.get_site_by_url(link) for link in links]
sites = [site for site in sites if site]
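        # Sort sites into preference order; sort() is stable, so unlisted
        # sites keep their relative order at the end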
sites.sort(
key=lambda x: _PREFERRED_SITES.index(x.SITE_NAME)
if x.SITE_NAME in _PREFERRED_SITES
else 99
)
# look for external items that already matched
for site in sites:
logger.debug(f"matching {site.url}")
item = site.get_item()
if item:
return item
# fetch external item if possible
for site in sites:
try:
logger.debug(f"fetching {site.url}")
site.get_resource_ready()
item = site.get_item()
if item:
return item
except Exception as e:
logger.error(f"Error fetching item: {e}")
        # Try using the info string (space-separated key:value pairs)
        if info_str:
            info_dict = self.parse_info(info_str)
# Check for ISBN, IMDB, etc.
item = None
for key, value in info_dict.items():
if key == "isbn" and value:
item = Edition.objects.filter(
primary_lookup_id_type=IdType.ISBN,
primary_lookup_id_value=value,
).first()
elif key == "imdb" and value:
item = Item.objects.filter(
primary_lookup_id_type=IdType.IMDB,
primary_lookup_id_value=value,
).first()
if item:
return item
return None

    def parse_tags(self, tags_str: str) -> List[str]:
"""Parse space-separated tags string into a list of tags."""
if not tags_str:
return []
return [tag.strip() for tag in tags_str.split() if tag.strip()]

    def parse_info(self, info_str: str) -> Dict[str, str]:
"""Parse info string into a dictionary."""
info_dict = {}
if not info_str:
return info_dict
for pair in info_str.split():
if ":" in pair:
key, value = pair.split(":", 1)
info_dict[key] = value
return info_dict

    def parse_datetime(self, timestamp_str: str) -> Optional[datetime.datetime]:
"""Parse ISO format timestamp into datetime."""
if not timestamp_str:
return None
try:
dt = parse_datetime(timestamp_str)
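            # Treat naive timestamps as UTC (datetime.UTC requires Python 3.11+)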
if dt and dt.tzinfo is None:
dt = dt.replace(tzinfo=datetime.UTC)
return dt
except Exception as e:
logger.error(f"Error parsing datetime {timestamp_str}: {e}")
return None

    def parse_shelf_type(self, status_str: str) -> ShelfType:
"""Parse shelf type string into ShelfType enum."""
if not status_str:
return ShelfType.WISHLIST
status_map = {
"wishlist": ShelfType.WISHLIST,
"progress": ShelfType.PROGRESS,
"complete": ShelfType.COMPLETE,
"dropped": ShelfType.DROPPED,
}
return status_map.get(status_str.lower(), ShelfType.WISHLIST)

    def import_mark(self, row: Dict[str, str]) -> bool:
"""Import a mark from a CSV row."""
try:
item = self.get_item_by_info_and_links(
row.get("title", ""), row.get("info", ""), row.get("links", "")
)
if not item:
logger.error(f"Could not find item: {row.get('links', '')}")
self.metadata["failed_items"].append(
f"Could not find item: {row.get('links', '')}"
)
return False
owner = self.user.identity
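            # Mark wraps any existing mark for this owner/item pair, so its
            # shelf_type/created_time reflect prior state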
mark = Mark(owner, item)
shelf_type = self.parse_shelf_type(row.get("status", ""))
rating_grade = None
if "rating" in row and row["rating"]:
try:
rating_grade = int(float(row["rating"]))
except (ValueError, TypeError):
pass
comment_text = row.get("comment", "")
tags = self.parse_tags(row.get("tags", ""))
# Parse timestamp
created_time = (
self.parse_datetime(row.get("timestamp", "")) or timezone.now()
)
if (
mark.shelf_type
and mark.created_time
and mark.created_time >= created_time
):
                # skip if the existing mark is newer or the same age
self.metadata["skipped"] = self.metadata.get("skipped", 0) + 1
return True
# Update the mark
mark.update(
shelf_type,
comment_text=comment_text,
rating_grade=rating_grade,
tags=tags,
created_time=created_time,
visibility=self.metadata.get("visibility", 0),
)
return True
except Exception as e:
logger.error(f"Error importing mark: {e}")
self.metadata["failed_items"].append(
f"Error importing mark for {row.get('title', '')}"
)
return False

    def import_review(self, row: Dict[str, str]) -> bool:
"""Import a review from a CSV row."""
try:
item = self.get_item_by_info_and_links(
row.get("title", ""), row.get("info", ""), row.get("links", "")
)
if not item:
logger.error(f"Could not find item for review: {row.get('links', '')}")
self.metadata["failed_items"].append(
f"Could not find item for review: {row.get('links', '')}"
)
return False
owner = self.user.identity
review_title = row.get("title", "") # Second "title" field is review title
review_content = row.get("content", "")
# Parse timestamp
created_time = self.parse_datetime(row.get("timestamp", ""))
# Check if there's an existing review with the same or newer timestamp
existing_review = Review.objects.filter(
owner=owner, item=item, title=review_title
).first()
# Skip if existing review is newer or same age
if (
existing_review
and existing_review.created_time
and created_time
and existing_review.created_time >= created_time
):
logger.debug(
f"Skipping review import for {item.display_title}: existing review is newer or same age"
)
self.metadata["skipped"] = self.metadata.get("skipped", 0) + 1
return True
# Create/update the review
Review.update_item_review(
item,
owner,
review_title,
review_content,
created_time=created_time,
visibility=self.metadata.get("visibility", 0),
)
return True
except Exception as e:
logger.error(f"Error importing review: {e}")
self.metadata["failed_items"].append(
f"Error importing review for {row.get('title', '')}: {str(e)}"
)
return False

    def import_note(self, row: Dict[str, str]) -> bool:
"""Import a note from a CSV row."""
try:
item = self.get_item_by_info_and_links(
row.get("title", ""), row.get("info", ""), row.get("links", "")
)
if not item:
logger.error(f"Could not find item for note: {row.get('links', '')}")
self.metadata["failed_items"].append(
f"Could not find item for note: {row.get('links', '')}"
)
return False
owner = self.user.identity
title = row.get("title", "") # Second "title" field is note title
content = row.get("content", "")
progress = row.get("progress", "")
            # Parse timestamp, falling back to now as import_mark does
            created_time = self.parse_datetime(row.get("timestamp", "")) or timezone.now()
# Extract progress information
pt, pv = Note.extract_progress(progress)
# Check if a note with the same attributes already exists
existing_notes = Note.objects.filter(
item=item,
owner=owner,
title=title,
progress_type=pt,
progress_value=pv,
)
# If we have an exact content match, skip this import
for existing_note in existing_notes:
if existing_note.content == content:
logger.debug(
f"Skipping note import for {item.display_title}: duplicate note found"
)
self.metadata["skipped"] = self.metadata.get("skipped", 0) + 1
return True
# Create the note if no duplicate is found
Note.objects.create(
item=item,
owner=owner,
title=title,
content=content,
progress_type=pt,
progress_value=pv,
created_time=created_time,
visibility=self.metadata.get("visibility", 0),
)
return True
except Exception as e:
logger.error(f"Error importing note: {e}")
self.metadata["failed_items"].append(
f"Error importing note for {row.get('title', '')}: {str(e)}"
)
return False

    def progress(self, success: bool) -> None:
"""Update import progress."""
self.metadata["total"] += 1
self.metadata["processed"] += 1
if success:
self.metadata["imported"] += 1
else:
self.metadata["failed"] += 1
self.message = f"{self.metadata['imported']} imported, {self.metadata['skipped']} skipped, {self.metadata['failed']} failed"
self.save(update_fields=["metadata", "message"])

    def process_csv_file(self, file_path: str, import_function) -> None:
"""Process a CSV file using the specified import function."""
logger.debug(f"Processing {file_path}")
        # newline="" lets the csv module handle quoted embedded newlines
        with open(file_path, "r", newline="") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
success = import_function(row)
self.progress(success)

    def run(self) -> None:
"""Run the CSV import."""
filename = self.metadata["file"]
logger.debug(f"Importing {filename}")
with zipfile.ZipFile(filename, "r") as zipref:
with tempfile.TemporaryDirectory() as tmpdirname:
logger.debug(f"Extracting {filename} to {tmpdirname}")
zipref.extractall(tmpdirname)
# Look for mark, review, and note CSV files
for category in [
ItemCategory.Movie,
ItemCategory.TV,
ItemCategory.Music,
ItemCategory.Book,
ItemCategory.Game,
ItemCategory.Podcast,
ItemCategory.Performance,
]:
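                    # A missing file just means the export has no data for
                    # that category/type combination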
# Import marks
mark_file = os.path.join(tmpdirname, f"{category}_mark.csv")
if os.path.exists(mark_file):
self.process_csv_file(mark_file, self.import_mark)
# Import reviews
review_file = os.path.join(tmpdirname, f"{category}_review.csv")
if os.path.exists(review_file):
self.process_csv_file(review_file, self.import_review)
# Import notes
note_file = os.path.join(tmpdirname, f"{category}_note.csv")
if os.path.exists(note_file):
self.process_csv_file(note_file, self.import_note)
self.message = _("Import complete")
if self.metadata.get("failed_items", []):
            self.message += f": {self.metadata['failed']} items failed ({len(self.metadata['failed_items'])} failure messages logged)"
self.save()
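
# Illustrative usage (hypothetical call site; field names mirror the attributes
# used above, but the actual task scheduling API lives in users.models.Task):
#     task = CsvImporter.objects.create(
#         user=user,
#         metadata={"file": "/path/to/export.zip", "visibility": 1},
#     )
#     task.run()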