lib.itmens/journal/importers/csv.py


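"""CSV importer for journal data.

Reads mark, review, and note rows from the CSV files packed inside an
export zip archive (one file per item category and record type) and
writes them to the journal models, skipping rows that would overwrite
newer existing data or duplicate an existing note.
"""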
import csv
import os
import tempfile
import zipfile
from typing import Dict
from django.utils import timezone
from loguru import logger
from catalog.models import ItemCategory
from journal.models import Mark, Note, Review
from .base import BaseImporter


class CsvImporter(BaseImporter):
    class Meta:
        app_label = "journal"  # workaround bug in TypedModel
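
    # A typical mark row (columns inferred from the lookups in
    # import_mark below; the exact export layout may vary):
    #   title, info, links, status, rating, comment, tags, timestamp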
    def import_mark(self, row: Dict[str, str]) -> str:
        """Import a mark from a CSV row.

        Returns:
            str: 'imported', 'skipped', or 'failed' indicating the import result
        """
        try:
            item = self.get_item_by_info_and_links(
                row.get("title", ""),
                row.get("info", ""),
                row.get("links", "").strip().split(),
            )
            if not item:
                logger.error(f"Could not find item: {row.get('links', '')}")
                self.metadata["failed_items"].append(
                    f"Could not find item: {row.get('links', '')}"
                )
                return "failed"
            owner = self.user.identity
            mark = Mark(owner, item)
            shelf_type = self.parse_shelf_type(row.get("status", ""))
            rating_grade = None
            if "rating" in row and row["rating"]:
                try:
                    # Accept both "8" and "8.0"-style values
                    rating_grade = int(float(row["rating"]))
                except (ValueError, TypeError):
                    pass
            comment_text = row.get("comment", "")
            tags = self.parse_tags(row.get("tags", ""))
            # Parse timestamp
            created_time = (
                self.parse_datetime(row.get("timestamp", "")) or timezone.now()
            )
            if (
                mark.shelf_type
                and mark.created_time
                and mark.created_time >= created_time
            ):
                # skip if the existing mark is newer or the same age
                return "skipped"
            # Update the mark
            mark.update(
                shelf_type,
                comment_text=comment_text,
                rating_grade=rating_grade,
                tags=tags,
                created_time=created_time,
                visibility=self.metadata.get("visibility", 0),
            )
            return "imported"
        except Exception as e:
            logger.error(f"Error importing mark: {e}")
            self.metadata["failed_items"].append(
                f"Error importing mark for {row.get('title', '')}: {str(e)}"
            )
            return "failed"
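
    # Note: review and note CSVs carry a second "title" column (the review
    # or note title). csv.DictReader keeps only the last value for a
    # duplicated field name, so row["title"] below resolves to the review
    # or note title, and item lookup leans on the "info"/"links" columns.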
    def import_review(self, row: Dict[str, str]) -> str:
        """Import a review from a CSV row.

        Returns:
            str: 'imported', 'skipped', or 'failed' indicating the import result
        """
        try:
            item = self.get_item_by_info_and_links(
                row.get("title", ""),
                row.get("info", ""),
                row.get("links", "").strip().split(),
            )
            if not item:
                logger.error(f"Could not find item for review: {row.get('links', '')}")
                self.metadata["failed_items"].append(
                    f"Could not find item for review: {row.get('links', '')}"
                )
                return "failed"
            owner = self.user.identity
            review_title = row.get("title", "")  # Second "title" field is review title
            review_content = row.get("content", "")
            # Parse timestamp
            created_time = self.parse_datetime(row.get("timestamp", ""))
            # Check if there's an existing review with the same or newer timestamp
            existing_review = Review.objects.filter(
                owner=owner, item=item, title=review_title
            ).first()
            # Skip if existing review is newer or same age
            if (
                existing_review
                and existing_review.created_time
                and created_time
                and existing_review.created_time >= created_time
            ):
                logger.debug(
                    f"Skipping review import for {item.display_title}: existing review is newer or same age"
                )
                return "skipped"
            # Create/update the review
            Review.update_item_review(
                item,
                owner,
                review_title,
                review_content,
                created_time=created_time,
                visibility=self.metadata.get("visibility", 0),
            )
            return "imported"
        except Exception as e:
            logger.error(f"Error importing review: {e}")
            self.metadata["failed_items"].append(
                f"Error importing review for {row.get('title', '')}: {str(e)}"
            )
            return "failed"
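
    # Note.extract_progress (defined on the Note model) is assumed to split
    # the free-form "progress" string into a typed (progress_type,
    # progress_value) pair, which also serves as part of the dedup key below.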
    def import_note(self, row: Dict[str, str]) -> str:
        """Import a note from a CSV row.

        Returns:
            str: 'imported', 'skipped', or 'failed' indicating the import result
        """
        try:
            item = self.get_item_by_info_and_links(
                row.get("title", ""),
                row.get("info", ""),
                row.get("links", "").strip().split(),
            )
            if not item:
                logger.error(f"Could not find item for note: {row.get('links', '')}")
                self.metadata["failed_items"].append(
                    f"Could not find item for note: {row.get('links', '')}"
                )
                return "failed"
            owner = self.user.identity
            title = row.get("title", "")  # Second "title" field is note title
            content = row.get("content", "")
            progress = row.get("progress", "")
            # Parse timestamp
            created_time = self.parse_datetime(row.get("timestamp", ""))
            # Extract progress information
            pt, pv = Note.extract_progress(progress)
            # Check if a note with the same attributes already exists
            existing_notes = Note.objects.filter(
                item=item,
                owner=owner,
                title=title,
                progress_type=pt,
                progress_value=pv,
            )
            # If we have an exact content match, skip this import
            for existing_note in existing_notes:
                if existing_note.content == content:
                    logger.debug(
                        f"Skipping note import for {item.display_title}: duplicate note found"
                    )
                    return "skipped"
            # Create the note if no duplicate is found
            Note.objects.create(
                item=item,
                owner=owner,
                title=title,
                content=content,
                progress_type=pt,
                progress_value=pv,
                created_time=created_time,
                visibility=self.metadata.get("visibility", 0),
            )
            return "imported"
        except Exception as e:
            logger.error(f"Error importing note: {e}")
            self.metadata["failed_items"].append(
                f"Error importing note for {row.get('title', '')}: {str(e)}"
            )
            return "failed"
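
    # self.progress() (from BaseImporter) is assumed to tally the per-result
    # counters ("imported"/"skipped"/"failed") that run() reads back for its
    # summary message below.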
    def process_csv_file(self, file_path: str, import_function) -> None:
        """Process a CSV file using the specified import function."""
        logger.debug(f"Processing {file_path}")
        # newline="" per the csv module docs, so newlines embedded in
        # quoted fields are parsed correctly
        with open(file_path, "r", newline="") as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                result = import_function(row)
                self.progress(result)
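
    # The uploaded zip archive is expected to contain one CSV per category
    # and record type, named "{category}_{file_type}.csv" (the category
    # string being the ItemCategory value), for each of the categories and
    # record types listed in run() below.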
    def run(self) -> None:
        """Run the CSV import."""
        filename = self.metadata["file"]
        logger.debug(f"Importing {filename}")
        with zipfile.ZipFile(filename, "r") as zipref:
            with tempfile.TemporaryDirectory() as tmpdirname:
                zipref.extractall(tmpdirname)
                # Count total rows in all CSV files first
                total_rows = 0
                csv_files = []
                for category in [
                    ItemCategory.Movie,
                    ItemCategory.TV,
                    ItemCategory.Music,
                    ItemCategory.Book,
                    ItemCategory.Game,
                    ItemCategory.Podcast,
                    ItemCategory.Performance,
                ]:
                    for file_type in ["mark", "review", "note"]:
                        file_path = os.path.join(
                            tmpdirname, f"{category}_{file_type}.csv"
                        )
                        if os.path.exists(file_path):
                            with open(file_path, "r", newline="") as csvfile:
                                # DictReader consumes the header line, so
                                # this counts data rows only
                                row_count = sum(1 for _ in csv.DictReader(csvfile))
                            total_rows += row_count
                            csv_files.append((file_path, file_type))
                # Set the total count in metadata
                self.metadata["total"] = total_rows
                self.message = f"found {total_rows} records to import"
                self.save(update_fields=["metadata", "message"])
                # Now process all files
                for file_path, file_type in csv_files:
                    import_function = getattr(self, f"import_{file_type}")
                    self.process_csv_file(file_path, import_function)
        self.message = f"{self.metadata['imported']} items imported, {self.metadata['skipped']} skipped, {self.metadata['failed']} failed."
        self.save()
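

# A minimal usage sketch (hypothetical call site; how BaseImporter tasks
# are actually created and scheduled may differ):
#
#     importer = CsvImporter.objects.create(
#         user=user,
#         metadata={"file": "/path/to/export.zip", "visibility": 0},
#     )
#     importer.run()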