add csv importer
This commit is contained in:
parent e586d660a8
commit 5911d422f5

7 changed files with 719 additions and 23 deletions
@@ -447,6 +447,9 @@ LANGUAGE_CODE, PREFERRED_LANGUAGES = _init_language_settings(
    env("NEODB_PREFERRED_LANGUAGES")
)

if TESTING:  # force en if testing
    LANGUAGE_CODE = "en"

LOCALE_PATHS = [os.path.join(BASE_DIR, "locale")]

TIME_ZONE = env("NEODB_TIMEZONE", default="Asia/Shanghai")  # type: ignore
@@ -1,6 +1,13 @@
+from .csv import CsvImporter
 from .douban import DoubanImporter
 from .goodreads import GoodreadsImporter
 from .letterboxd import LetterboxdImporter
 from .opml import OPMLImporter

-__all__ = ["LetterboxdImporter", "OPMLImporter", "DoubanImporter", "GoodreadsImporter"]
+__all__ = [
+    "CsvImporter",
+    "LetterboxdImporter",
+    "OPMLImporter",
+    "DoubanImporter",
+    "GoodreadsImporter",
+]
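
The new importer follows the same Task pattern as the existing ones; a minimal
usage sketch, mirroring the call in journal/tests/csv.py below (user and the
zip path are placeholders; dispatching through the "import" task queue instead
of calling run() directly is outside the scope of this diff):

importer = CsvImporter.create(user=user, file="/path/to/export.zip", visibility=0)
importer.run()
print(importer.message)  # e.g. "3 imported, 1 skipped, 0 failed"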

journal/importers/csv.py (new file, 394 lines)
@@ -0,0 +1,394 @@
import csv
import datetime
import os
import tempfile
import zipfile
from typing import Dict, List, Optional

from django.conf import settings
from django.utils import timezone
from django.utils.dateparse import parse_datetime
from django.utils.translation import gettext as _
from loguru import logger

from catalog.common.sites import SiteManager
from catalog.models import Edition, IdType, Item, ItemCategory
from journal.models import Mark, Note, Review, ShelfType
from users.models import Task


class CsvImporter(Task):
    class Meta:
        app_label = "journal"  # workaround bug in TypedModel

    TaskQueue = "import"
    DefaultMetadata = {
        "total": 0,
        "processed": 0,
        "skipped": 0,
        "imported": 0,
        "failed": 0,
        "failed_items": [],
        "file": None,
        "visibility": 0,
    }

    def get_item_by_info_and_links(
        self, title: str, info_str: str, links_str: str
    ) -> Optional[Item]:
        """Find an item based on information from CSV export.

        Args:
            title: Item title
            info_str: Item info string (space-separated key:value pairs)
            links_str: Space-separated URLs

        Returns:
            Item if found, None otherwise
        """
        site_url = settings.SITE_INFO["site_url"] + "/"

        links = links_str.strip().split()
        for link in links:
            if link.startswith("/") or link.startswith(site_url):
                item = Item.get_by_url(link)
                if item:
                    return item
        for link in links:
            site = SiteManager.get_site_by_url(link)
            if site:
                site.get_resource_ready()
                item = site.get_item()
                if item:
                    return item
        # Try using the info string
        if info_str:
            info_dict = {}
            for pair in info_str.strip().split():
                if ":" in pair:
                    key, value = pair.split(":", 1)
                    info_dict[key] = value

            # Check for ISBN, IMDB, etc.
            item = None
            for key, value in info_dict.items():
                if key == "isbn" and value:
                    item = Edition.objects.filter(
                        primary_lookup_id_type=IdType.ISBN,
                        primary_lookup_id_value=value,
                    ).first()
                elif key == "imdb" and value:
                    item = Item.objects.filter(
                        primary_lookup_id_type=IdType.IMDB,
                        primary_lookup_id_value=value,
                    ).first()
                if item:
                    return item
        return None
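
    # Illustrative inputs for the lookup above (ids taken from the test
    # fixtures in journal/tests/csv.py; URLs are hypothetical):
    #   info_str  = "isbn:9780553283686"
    #   links_str = "https://neodb.example/book/abc /book/xyz"
    # Links under this site's URL (or path-relative) resolve locally via
    # Item.get_by_url(); other links go through SiteManager; the id pairs
    # in info_str (isbn/imdb) are the final fallback.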

    def parse_tags(self, tags_str: str) -> List[str]:
        """Parse space-separated tags string into a list of tags."""
        if not tags_str:
            return []
        return [tag.strip() for tag in tags_str.split() if tag.strip()]

    def parse_info(self, info_str: str) -> Dict[str, str]:
        """Parse info string into a dictionary."""
        info_dict = {}
        if not info_str:
            return info_dict

        for pair in info_str.split():
            if ":" in pair:
                key, value = pair.split(":", 1)
                info_dict[key] = value

        return info_dict
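
    # Illustrative round trip (ids from the test fixtures):
    #   self.parse_info("isbn:9780441172719 imdb:tt0133093")
    #   -> {"isbn": "9780441172719", "imdb": "tt0133093"}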

    def parse_datetime(self, timestamp_str: str) -> Optional[datetime.datetime]:
        """Parse ISO format timestamp into datetime."""
        if not timestamp_str:
            return None

        try:
            dt = parse_datetime(timestamp_str)
            if dt and dt.tzinfo is None:
                dt = dt.replace(tzinfo=datetime.UTC)
            return dt
        except Exception as e:
            logger.error(f"Error parsing datetime {timestamp_str}: {e}")
            return None
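
    # Illustrative: "2021-01-01T00:00:00Z" parses to an aware datetime as-is,
    # while a naive value like "2021-01-01 00:00:00" is assumed to be UTC.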

    def parse_shelf_type(self, status_str: str) -> ShelfType:
        """Parse shelf type string into ShelfType enum."""
        if not status_str:
            return ShelfType.WISHLIST

        status_map = {
            "wishlist": ShelfType.WISHLIST,
            "progress": ShelfType.PROGRESS,
            "complete": ShelfType.COMPLETE,
            "dropped": ShelfType.DROPPED,
        }

        return status_map.get(status_str.lower(), ShelfType.WISHLIST)

    def import_mark(self, row: Dict[str, str]) -> bool:
        """Import a mark from a CSV row."""
        try:
            item = self.get_item_by_info_and_links(
                row.get("title", ""), row.get("info", ""), row.get("links", "")
            )

            if not item:
                logger.error(f"Could not find item: {row.get('links', '')}")
                self.metadata["failed_items"].append(
                    f"Could not find item: {row.get('links', '')}"
                )
                return False

            owner = self.user.identity
            mark = Mark(owner, item)

            shelf_type = self.parse_shelf_type(row.get("status", ""))
            rating_grade = None
            if "rating" in row and row["rating"]:
                try:
                    rating_grade = int(float(row["rating"]))
                except (ValueError, TypeError):
                    pass

            comment_text = row.get("comment", "")
            tags = self.parse_tags(row.get("tags", ""))

            # Parse timestamp
            created_time = (
                self.parse_datetime(row.get("timestamp", "")) or timezone.now()
            )

            if (
                mark.shelf_type
                and mark.created_time
                and mark.created_time >= created_time
            ):
                # skip if existing mark is newer
                self.metadata["skipped"] = self.metadata.get("skipped", 0) + 1
                return True

            # Update the mark
            mark.update(
                shelf_type,
                comment_text=comment_text,
                rating_grade=rating_grade,
                tags=tags,
                created_time=created_time,
                visibility=self.metadata.get("visibility", 0),
            )
            return True
        except Exception as e:
            logger.error(f"Error importing mark: {e}")
            self.metadata["failed_items"].append(
                f"Error importing mark for {row.get('title', '')}"
            )
            return False
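
    # A mark row, as the exporter writes it, looks roughly like this
    # (values illustrative; keys match the row.get() calls above):
    #   {"title": "Hyperion", "info": "isbn:9780553283686",
    #    "links": "https://neodb.example/book/abc", "status": "complete",
    #    "rating": "10", "comment": "Great sci-fi classic",
    #    "tags": "sci-fi space", "timestamp": "2021-01-01T00:00:00+00:00"}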

    def import_review(self, row: Dict[str, str]) -> bool:
        """Import a review from a CSV row."""
        try:
            item = self.get_item_by_info_and_links(
                row.get("title", ""), row.get("info", ""), row.get("links", "")
            )

            if not item:
                logger.error(f"Could not find item for review: {row.get('links', '')}")
                self.metadata["failed_items"].append(
                    f"Could not find item for review: {row.get('links', '')}"
                )
                return False

            owner = self.user.identity
            review_title = row.get("title", "")  # Second "title" field is review title
            review_content = row.get("content", "")

            # Parse timestamp
            created_time = self.parse_datetime(row.get("timestamp", ""))

            # Check if there's an existing review with the same or newer timestamp
            existing_review = Review.objects.filter(
                owner=owner, item=item, title=review_title
            ).first()
            # Skip if existing review is newer or same age
            if (
                existing_review
                and existing_review.created_time
                and created_time
                and existing_review.created_time >= created_time
            ):
                logger.debug(
                    f"Skipping review import for {item.display_title}: existing review is newer or same age"
                )
                self.metadata["skipped"] = self.metadata.get("skipped", 0) + 1
                return True

            # Create/update the review
            Review.update_item_review(
                item,
                owner,
                review_title,
                review_content,
                created_time=created_time,
                visibility=self.metadata.get("visibility", 0),
            )
            return True
        except Exception as e:
            logger.error(f"Error importing review: {e}")
            self.metadata["failed_items"].append(
                f"Error importing review for {row.get('title', '')}: {str(e)}"
            )
            return False
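
    # A note on row.get("title") above: the review CSV appears to carry the
    # item title and the review title under the same "title" header; with
    # csv.DictReader, the last duplicate column wins, so the value read here
    # is the review title (hence the inline comment), and item lookup relies
    # on the info/links columns instead.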

    def import_note(self, row: Dict[str, str]) -> bool:
        """Import a note from a CSV row."""
        try:
            item = self.get_item_by_info_and_links(
                row.get("title", ""), row.get("info", ""), row.get("links", "")
            )

            if not item:
                logger.error(f"Could not find item for note: {row.get('links', '')}")
                self.metadata["failed_items"].append(
                    f"Could not find item for note: {row.get('links', '')}"
                )
                return False

            owner = self.user.identity
            title = row.get("title", "")  # Second "title" field is note title
            content = row.get("content", "")
            progress = row.get("progress", "")

            # Parse timestamp
            created_time = self.parse_datetime(row.get("timestamp", ""))

            # Extract progress information
            pt, pv = Note.extract_progress(progress)

            # Check if a note with the same attributes already exists
            existing_notes = Note.objects.filter(
                item=item,
                owner=owner,
                title=title,
                progress_type=pt,
                progress_value=pv,
            )

            # If we have an exact content match, skip this import
            for existing_note in existing_notes:
                if existing_note.content == content:
                    logger.debug(
                        f"Skipping note import for {item.display_title}: duplicate note found"
                    )
                    self.metadata["skipped"] = self.metadata.get("skipped", 0) + 1
                    return True

            # Create the note if no duplicate is found
            Note.objects.create(
                item=item,
                owner=owner,
                title=title,
                content=content,
                progress_type=pt,
                progress_value=pv,
                created_time=created_time,
                visibility=self.metadata.get("visibility", 0),
            )
            return True
        except Exception as e:
            logger.error(f"Error importing note: {e}")
            if "failed_items" not in self.metadata:
                self.metadata["failed_items"] = []
            self.metadata["failed_items"].append(
                f"Error importing note for {row.get('title', '')}: {str(e)}"
            )
            return False

    def progress(self, success: bool) -> None:
        """Update import progress."""
        self.metadata["total"] += 1
        self.metadata["processed"] += 1

        if success:
            self.metadata["imported"] += 1
        else:
            self.metadata["failed"] += 1

        self.message = f"{self.metadata['imported']} imported, {self.metadata['skipped']} skipped, {self.metadata['failed']} failed"
        self.save(update_fields=["metadata", "message"])

    def process_csv_file(self, file_path: str, import_function) -> None:
        """Process a CSV file using the specified import function."""
        logger.debug(f"Processing {file_path}")
        with open(file_path, "r") as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                success = import_function(row)
                self.progress(success)

    def validate_file(self, filename: str) -> bool:
        """Validate that the given file is a valid CSV export ZIP file.

        Args:
            filename: Path to the file to validate

        Returns:
            bool: True if the file is valid, False otherwise
        """
        return os.path.exists(filename) and zipfile.is_zipfile(filename)

    def run(self) -> None:
        """Run the CSV import."""
        # Ensure failed_items is initialized
        if "failed_items" not in self.metadata:
            self.metadata["failed_items"] = []

        filename = self.metadata["file"]
        logger.debug(f"Importing {filename}")

        # Validate the file before processing
        if not self.validate_file(filename):
            self.save()
            return

        with zipfile.ZipFile(filename, "r") as zipref:
            with tempfile.TemporaryDirectory() as tmpdirname:
                logger.debug(f"Extracting {filename} to {tmpdirname}")
                zipref.extractall(tmpdirname)

                # Look for mark, review, and note CSV files
                for category in [
                    ItemCategory.Movie,
                    ItemCategory.TV,
                    ItemCategory.Music,
                    ItemCategory.Book,
                    ItemCategory.Game,
                    ItemCategory.Podcast,
                    ItemCategory.Performance,
                ]:
                    # Import marks
                    mark_file = os.path.join(tmpdirname, f"{category}_mark.csv")
                    if os.path.exists(mark_file):
                        self.process_csv_file(mark_file, self.import_mark)

                    # Import reviews
                    review_file = os.path.join(tmpdirname, f"{category}_review.csv")
                    if os.path.exists(review_file):
                        self.process_csv_file(review_file, self.import_review)

                    # Import notes
                    note_file = os.path.join(tmpdirname, f"{category}_note.csv")
                    if os.path.exists(note_file):
                        self.process_csv_file(note_file, self.import_note)

        self.message = _("Import complete")
        if self.metadata.get("failed_items", []):
            self.message += f": {self.metadata['failed']} items failed ({len(self.metadata['failed_items'])} unique items)"
        self.save()
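
CsvImporter.run() consumes the ZIP that CsvExporter produces: up to three CSV
files per category (book, movie, tv, music, game, podcast, performance), named
<category>_mark.csv, <category>_review.csv and <category>_note.csv. A minimal
sketch of building a compatible archive by hand, assuming the mark columns read
by import_mark above (file names and values are illustrative):

import csv
import zipfile

fieldnames = ["title", "info", "links", "status", "rating", "comment", "tags", "timestamp"]
row = {
    "title": "Hyperion",
    "info": "isbn:9780553283686",               # space-separated key:value ids
    "links": "https://neodb.example/book/abc",  # hypothetical URL
    "status": "complete",                       # wishlist/progress/complete/dropped
    "rating": "10",
    "comment": "Great sci-fi classic",
    "tags": "sci-fi space",                     # space-separated
    "timestamp": "2021-01-01T00:00:00+00:00",   # ISO 8601
}
with open("book_mark.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerow(row)
with zipfile.ZipFile("neodb_export.zip", "w") as zf:
    zf.write("book_mark.csv")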

journal/tests/__init__.py (new file, 3 lines)
@@ -0,0 +1,3 @@
from .csv import *
from .piece import *
from .search import *

journal/tests/csv.py (new file, 283 lines)
@@ -0,0 +1,283 @@
import csv
import os
import zipfile
from tempfile import TemporaryDirectory

from django.test import TestCase
from django.utils.dateparse import parse_datetime
from loguru import logger

from catalog.models import Edition, IdType, Movie, TVEpisode, TVSeason, TVShow
from journal.exporters import CsvExporter
from journal.importers import CsvImporter
from users.models import User

from ..models import *


class CsvExportImportTest(TestCase):
    databases = "__all__"

    def setUp(self):
        # Create test items of different types
        self.book1 = Edition.objects.create(
            localized_title=[{"lang": "en", "text": "Hyperion"}],
            primary_lookup_id_type=IdType.ISBN,
            primary_lookup_id_value="9780553283686",
            author=["Dan Simmons"],
            pub_year=1989,
        )
        self.book2 = Edition.objects.create(
            localized_title=[{"lang": "en", "text": "Dune"}],
            primary_lookup_id_type=IdType.ISBN,
            primary_lookup_id_value="9780441172719",
            author=["Frank Herbert"],
            pub_year=1965,
        )
        self.movie1 = Movie.objects.create(
            localized_title=[{"lang": "en", "text": "Inception"}],
            primary_lookup_id_type=IdType.IMDB,
            primary_lookup_id_value="tt1375666",
            director=["Christopher Nolan"],
            year=2010,
        )
        self.movie2 = Movie.objects.create(
            localized_title=[{"lang": "en", "text": "The Matrix"}],
            primary_lookup_id_type=IdType.IMDB,
            primary_lookup_id_value="tt0133093",
            director=["Lana Wachowski", "Lilly Wachowski"],
            year=1999,
        )
        self.tvshow = TVShow.objects.create(
            localized_title=[{"lang": "en", "text": "Breaking Bad"}],
            primary_lookup_id_type=IdType.IMDB,
            primary_lookup_id_value="tt0903747",
            year=2008,
        )
        self.tvseason = TVSeason.objects.create(
            localized_title=[{"lang": "en", "text": "Breaking Bad Season 1"}],
            show=self.tvshow,
            season_number=1,
        )
        self.tvepisode1 = TVEpisode.objects.create(
            localized_title=[{"lang": "en", "text": "Pilot"}],
            season=self.tvseason,
            episode_number=1,
        )
        self.tvepisode2 = TVEpisode.objects.create(
            localized_title=[{"lang": "en", "text": "Cat's in the Bag..."}],
            season=self.tvseason,
            episode_number=2,
        )

        # Create user for testing
        self.user1 = User.register(email="export@test.com", username="exporter")
        self.user2 = User.register(email="import@test.com", username="importer")
        self.dt = parse_datetime("2021-01-01T00:00:00Z")

    def test_csv_export_import(self):
        # Create marks, reviews and notes for user1

        # Book marks with ratings and tags
        mark_book1 = Mark(self.user1.identity, self.book1)
        mark_book1.update(
            ShelfType.COMPLETE,
            "Great sci-fi classic",
            10,
            ["sci-fi", "favorite", "space"],
            1,
            created_time=self.dt,
        )

        mark_book2 = Mark(self.user1.identity, self.book2)
        mark_book2.update(
            ShelfType.PROGRESS, "Reading it now", None, ["sci-fi", "desert"], 1
        )

        # Movie marks with ratings
        mark_movie1 = Mark(self.user1.identity, self.movie1)
        mark_movie1.update(
            ShelfType.COMPLETE, "Mind-bending", 8, ["mindbender", "scifi"], 1
        )

        mark_movie2 = Mark(self.user1.identity, self.movie2)
        mark_movie2.update(ShelfType.WISHLIST, "Need to rewatch", None, [], 1)

        # TV show mark
        mark_tvshow = Mark(self.user1.identity, self.tvshow)
        mark_tvshow.update(ShelfType.WISHLIST, "Heard it's good", None, ["drama"], 1)

        # TV episode marks
        mark_episode1 = Mark(self.user1.identity, self.tvepisode1)
        mark_episode1.update(ShelfType.COMPLETE, "Great start", 9, [], 1)

        mark_episode2 = Mark(self.user1.identity, self.tvepisode2)
        mark_episode2.update(ShelfType.COMPLETE, "It gets better", 9, [], 1)

        # Create reviews
        Review.update_item_review(
            self.book1,
            self.user1.identity,
            "My thoughts on Hyperion",
            "A masterpiece of science fiction that weaves multiple storylines into a captivating narrative.",
            visibility=1,
            created_time=self.dt,
        )

        Review.update_item_review(
            self.movie1,
            self.user1.identity,
            "Inception Review",
            "Christopher Nolan at his best. The movie plays with reality and dreams in a fascinating way.",
            visibility=1,
        )

        # Create notes
        Note.objects.create(
            item=self.book2,
            owner=self.user1.identity,
            title="Reading progress",
            content="Just finished the first part. The world-building is incredible.\n\n - p 125",
            progress_type=Note.ProgressType.PAGE,
            progress_value="p 125",
            visibility=1,
        )

        Note.objects.create(
            item=self.tvshow,
            owner=self.user1.identity,
            title="Before watching",
            content="Things to look out for according to friends:\n- Character development\n- Color symbolism\n\n - e 0",
            progress_type=Note.ProgressType.EPISODE,
            progress_value="2",
            visibility=1,
        )

        # Export data to CSV
        exporter = CsvExporter.create(user=self.user1)
        exporter.run()
        export_path = exporter.metadata["file"]
        logger.debug(f"exported to {export_path}")
        self.assertTrue(os.path.exists(export_path))

        # Validate the number of CSV rows in the export files
        with TemporaryDirectory() as extract_dir:
            with zipfile.ZipFile(export_path, "r") as zip_ref:
                zip_ref.extractall(extract_dir)
                logger.debug(f"unzipped to {extract_dir}")

            # Expected row counts (data rows, excluding header)
            expected_data_rows = {
                "book_mark.csv": 2,  # 2 book marks
                "book_review.csv": 1,  # 1 book review
                "book_note.csv": 1,  # 1 book note
                "movie_mark.csv": 2,  # 2 movie marks
                "movie_review.csv": 1,  # 1 movie review
                "movie_note.csv": 0,  # No movie notes
                "tv_mark.csv": 3,  # 3 TV marks (show + 2 episodes)
                "tv_note.csv": 1,  # 1 TV note
                "tv_review.csv": 0,  # No TV reviews
                "music_mark.csv": 0,  # No music marks
                "music_review.csv": 0,  # No music reviews
                "music_note.csv": 0,  # No music notes
                "game_mark.csv": 0,  # No game marks
                "game_review.csv": 0,  # No game reviews
                "game_note.csv": 0,  # No game notes
                "podcast_mark.csv": 0,  # No podcast marks
                "podcast_review.csv": 0,  # No podcast reviews
                "podcast_note.csv": 0,  # No podcast notes
                "performance_mark.csv": 0,  # No performance marks
                "performance_review.csv": 0,  # No performance reviews
                "performance_note.csv": 0,  # No performance notes
            }

            # Check each file
            for filename, expected_data_count in expected_data_rows.items():
                file_path = os.path.join(extract_dir, filename)
                if os.path.exists(file_path):
                    with open(file_path, "r") as file:
                        csv_reader = csv.reader(file)
                        # Skip header row
                        next(csv_reader, None)
                        # Count data rows
                        row_count = sum(1 for _ in csv_reader)
                        self.assertEqual(
                            row_count,
                            expected_data_count,
                            f"File {filename} has {row_count} data rows, expected {expected_data_count}",
                        )

                    # Check header row is present by reopening the file
                    with open(file_path, "r") as header_check:
                        first_line = next(header_check, "")
                        self.assertTrue(
                            first_line.strip(),
                            f"File {filename} has no header row",
                        )
                elif expected_data_count > 0:
                    self.fail(
                        f"Expected file {filename} with {expected_data_count} data rows, but file not found"
                    )

        importer = CsvImporter.create(user=self.user2, file=export_path, visibility=2)
        importer.run()
        self.assertEqual(importer.message, "Import complete")

        # Verify imported data

        # Check marks
        mark_book1_imported = Mark(self.user2.identity, self.book1)
        self.assertEqual(mark_book1_imported.shelf_type, ShelfType.COMPLETE)
        self.assertEqual(mark_book1_imported.comment_text, "Great sci-fi classic")
        self.assertEqual(mark_book1_imported.rating_grade, 10)
        self.assertEqual(mark_book1_imported.visibility, 2)
        self.assertEqual(mark_book1_imported.created_time, self.dt)
        self.assertEqual(
            set(mark_book1_imported.tags), set(["sci-fi", "favorite", "space"])
        )

        mark_book2_imported = Mark(self.user2.identity, self.book2)
        self.assertEqual(mark_book2_imported.shelf_type, ShelfType.PROGRESS)
        self.assertEqual(mark_book2_imported.comment_text, "Reading it now")
        self.assertIsNone(mark_book2_imported.rating_grade)
        self.assertEqual(set(mark_book2_imported.tags), set(["sci-fi", "desert"]))

        mark_movie1_imported = Mark(self.user2.identity, self.movie1)
        self.assertEqual(mark_movie1_imported.shelf_type, ShelfType.COMPLETE)
        self.assertEqual(mark_movie1_imported.comment_text, "Mind-bending")
        self.assertEqual(mark_movie1_imported.rating_grade, 8)
        self.assertEqual(set(mark_movie1_imported.tags), set(["mindbender", "scifi"]))

        mark_episode1_imported = Mark(self.user2.identity, self.tvepisode1)
        self.assertEqual(mark_episode1_imported.shelf_type, ShelfType.COMPLETE)
        self.assertEqual(mark_episode1_imported.comment_text, "Great start")
        self.assertEqual(mark_episode1_imported.rating_grade, 9)

        # Check reviews
        book1_reviews = Review.objects.filter(
            owner=self.user2.identity, item=self.book1
        )
        self.assertEqual(book1_reviews.count(), 1)
        self.assertEqual(book1_reviews[0].title, "My thoughts on Hyperion")
        self.assertEqual(book1_reviews[0].created_time, self.dt)
        self.assertIn("masterpiece of science fiction", book1_reviews[0].body)

        movie1_reviews = Review.objects.filter(
            owner=self.user2.identity, item=self.movie1
        )
        self.assertEqual(movie1_reviews.count(), 1)
        self.assertEqual(movie1_reviews[0].title, "Inception Review")
        self.assertIn("Christopher Nolan", movie1_reviews[0].body)

        # Check notes
        book2_notes = Note.objects.filter(owner=self.user2.identity, item=self.book2)
        self.assertEqual(book2_notes.count(), 1)
        self.assertEqual(book2_notes[0].title, "Reading progress")
        self.assertIn("world-building is incredible", book2_notes[0].content)
        self.assertEqual(book2_notes[0].progress_type, Note.ProgressType.PAGE)
        self.assertEqual(book2_notes[0].progress_value, "125")

        tvshow_notes = Note.objects.filter(owner=self.user2.identity, item=self.tvshow)
        self.assertEqual(tvshow_notes.count(), 1)
        self.assertEqual(tvshow_notes[0].title, "Before watching")
        self.assertIn("Character development", tvshow_notes[0].content)

@@ -6,7 +6,7 @@ from catalog.models import Edition
 from journal.models.common import Debris
 from users.models import User

-from .models import *
+from ..models import *


 class CollectionTest(TestCase):

@@ -300,24 +300,3 @@ class NoteTest(TestCase):
         self.assertEqual(c, "test ")
         self.assertEqual(t, Note.ProgressType.CHAPTER)
         self.assertEqual(v, "2")
-
-
-class SearchTest(TestCase):
-    databases = "__all__"
-
-    def setUp(self):
-        self.book1 = Edition.objects.create(title="Hyperion")
-        self.book2 = Edition.objects.create(title="Andymion")
-        self.user1 = User.register(email="x@y.com", username="userx")
-        self.index = JournalIndex.instance()
-        self.index.delete_by_owner([self.user1.identity.pk])
-
-    def test_post(self):
-        mark = Mark(self.user1.identity, self.book1)
-        mark.update(ShelfType.WISHLIST, "a gentle comment", 9, ["Sci-Fi", "fic"], 0)
-        mark = Mark(self.user1.identity, self.book2)
-        mark.update(ShelfType.WISHLIST, "a gentle comment", None, ["nonfic"], 1)
-        q = JournalQueryParser("gentle")
-        q.filter_by_owner(self.user1.identity)
-        r = self.index.search(q)
-        self.assertEqual(r.total, 2)

journal/tests/search.py (new file, 27 lines)
@@ -0,0 +1,27 @@
from django.test import TestCase

from catalog.models import Edition
from users.models import User

from ..models import *


class SearchTest(TestCase):
    databases = "__all__"

    def setUp(self):
        self.book1 = Edition.objects.create(title="Hyperion")
        self.book2 = Edition.objects.create(title="Andymion")
        self.user1 = User.register(email="x@y.com", username="userx")
        self.index = JournalIndex.instance()
        self.index.delete_by_owner([self.user1.identity.pk])

    def test_post(self):
        mark = Mark(self.user1.identity, self.book1)
        mark.update(ShelfType.WISHLIST, "a gentle comment", 9, ["Sci-Fi", "fic"], 0)
        mark = Mark(self.user1.identity, self.book2)
        mark.update(ShelfType.WISHLIST, "a gentle comment", None, ["nonfic"], 1)
        q = JournalQueryParser("gentle")
        q.filter_by_owner(self.user1.identity)
        r = self.index.search(q)
        self.assertEqual(r.total, 2)