new catalog data model, wip, not enabled

This commit is contained in:
Your Name 2022-12-07 19:09:05 -05:00
parent 51538db80f
commit 9e219bfac9
66 changed files with 25869 additions and 0 deletions
catalog
test_data
https___api_themoviedb_org_3_find_tt0436992_api_key_19890604_language_zh_CN_external_source_imdb_id
https___api_themoviedb_org_3_find_tt0827573_api_key_19890604_language_zh_CN_external_source_imdb_id
https___api_themoviedb_org_3_find_tt1159991_api_key_19890604_language_zh_CN_external_source_imdb_id
https___api_themoviedb_org_3_find_tt1375666_api_key_19890604_language_zh_CN_external_source_imdb_id
https___api_themoviedb_org_3_movie_27205_api_key_19890604_language_zh_CN_append_to_response_external_ids_credits
https___api_themoviedb_org_3_movie_282758_api_key_19890604_language_zh_CN_append_to_response_external_ids_credits
https___api_themoviedb_org_3_movie_293767_api_key_19890604_language_zh_CN_append_to_response_external_ids_credits
https___api_themoviedb_org_3_tv_57243_api_key_19890604_language_zh_CN_append_to_response_external_ids_credits
https___api_themoviedb_org_3_tv_57243_season_4_api_key_19890604_language_zh_CN_append_to_response_external_ids_credits
https___api_themoviedb_org_3_tv_57243_season_4_episode_1_api_key_19890604_language_zh_CN_append_to_response_external_ids_credits
https___api_themoviedb_org_3_tv_86941_api_key_19890604_language_zh_CN_append_to_response_external_ids_credits
https___book_douban_com_subject_1089243_
https___book_douban_com_subject_2037260_
https___book_douban_com_subject_35902899_
https___images_na_ssl_images_amazon_com_images_S_compressed_photo_goodreads_com_books_1405546838i_77566_jpg
https___itunes_apple_com_lookup_id_1050430296
https___movie_douban_com_subject_26895436_
https___movie_douban_com_subject_3541415_
https___movie_douban_com_subject_3627919_
https___movie_douban_com_subject_4296866_
https___www_douban_com_location_drama_24849279_
https___www_goodreads_com_book_show_11798823
https___www_goodreads_com_book_show_3597767
https___www_goodreads_com_book_show_40961427
https___www_goodreads_com_book_show_45064996
https___www_goodreads_com_book_show_56821625
https___www_goodreads_com_book_show_59952545
https___www_goodreads_com_book_show_77566

0
catalog/__init__.py Normal file
View file

3
catalog/admin.py Normal file
View file

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

11
catalog/api.py Normal file
View file

@ -0,0 +1,11 @@
from ninja import NinjaAPI
from .models import Podcast
from django.conf import settings
api = NinjaAPI(title=settings.SITE_INFO['site_name'], version="1.0.0", description=settings.SITE_INFO['site_name'])
@api.get("/podcasts/{item_id}")
def get_item(request, item_id: int):
return Podcast.objects.filter(pk=item_id).first()

6
catalog/apps.py Normal file
View file

@ -0,0 +1,6 @@
from django.apps import AppConfig
class CatalogConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'catalog'

78
catalog/book/models.py Normal file
View file

@ -0,0 +1,78 @@
"""
Models for Book
Series -> Work -> Edition
Series is not fully implemented at the moment
Goodreads
Famous works have many editions
Google Books:
only has Edition level ("volume") data
Douban:
old editions has only CUBN(Chinese Unified Book Number)
work data seems asymmetric (a book page links to a work page, but may not listed on that work page as one of the editions)
"""
from django.db import models
from django.utils.translation import gettext_lazy as _
from catalog.common import *
from .utils import *
class Edition(Item):
isbn = PrimaryLookupIdDescriptor(IdType.ISBN)
asin = PrimaryLookupIdDescriptor(IdType.ASIN)
cubn = PrimaryLookupIdDescriptor(IdType.CUBN)
# douban_book = LookupIdDescriptor(IdType.DoubanBook)
# goodreads = LookupIdDescriptor(IdType.Goodreads)
languages = jsondata.ArrayField(_("语言"), null=True, blank=True, default=list)
publish_year = jsondata.IntegerField(_("发表年份"), null=True, blank=True)
publish_month = jsondata.IntegerField(_("发表月份"), null=True, blank=True)
pages = jsondata.IntegerField(blank=True, default=None)
authors = jsondata.ArrayField(_('作者'), null=False, blank=False, default=list)
translaters = jsondata.ArrayField(_('译者'), null=True, blank=True, default=list)
publishers = jsondata.ArrayField(_('出版方'), null=True, blank=True, default=list)
@property
def isbn10(self):
return isbn_13_to_10(self.isbn)
@isbn10.setter
def isbn10(self, value):
self.isbn = isbn_10_to_13(value)
def update_linked_items_from_extenal_page(self, page):
"""add Work from page.metadata['work'] if not yet"""
w = page.metadata.get('work', None)
if w:
work = Work.objects.filter(primary_lookup_id_type=w['lookup_id_type'], primary_lookup_id_value=w['lookup_id_value']).first()
if work:
if any(edition == self for edition in work.editions.all()):
return
else:
work = Work.objects.create(primary_lookup_id_type=w['lookup_id_type'], primary_lookup_id_value=w['lookup_id_value'], title=w['title'])
work.editions.add(self)
class Work(Item):
# douban_work = PrimaryLookupIdDescriptor(IdType.DoubanBook_Work)
# goodreads_work = PrimaryLookupIdDescriptor(IdType.Goodreads_Work)
editions = models.ManyToManyField(Edition, related_name='works') # , through='WorkEdition'
# def __str__(self):
# return self.title
# class Meta:
# proxy = True
class Series(Item):
# douban_serie = LookupIdDescriptor(IdType.DoubanBook_Serie)
# goodreads_serie = LookupIdDescriptor(IdType.Goodreads_Serie)
class Meta:
proxy = True
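A minimal usage sketch of the Edition/Work hierarchy described in the module docstring above, mirroring what catalog/book/tests.py exercises; it assumes a configured Django environment with this app installed:

    from catalog.book.models import Edition, Work
    from catalog.common import IdType

    edition = Edition.objects.create(title="Hyperion")
    edition.isbn = '9780553283686'    # PrimaryLookupIdDescriptor stores this as the primary lookup id
    edition.save()
    assert edition.primary_lookup_id_type == IdType.ISBN
    assert edition.isbn10 == '0553283685'    # derived on the fly via catalog.book.utils

    work = Work.objects.create(title="Hyperion")
    work.editions.add(edition)    # many Editions can hang off one Work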

205
catalog/book/tests.py Normal file
View file

@ -0,0 +1,205 @@
from django.test import TestCase
from catalog.book.models import *
from catalog.common import *
class BookTestCase(TestCase):
def setUp(self):
hyperion = Edition.objects.create(title="Hyperion")
hyperion.pages = 500
hyperion.isbn = '9780553283686'
hyperion.save()
# hyperion.isbn10 = '0553283685'
def test_properties(self):
hyperion = Edition.objects.get(title="Hyperion")
self.assertEqual(hyperion.title, "Hyperion")
self.assertEqual(hyperion.pages, 500)
self.assertEqual(hyperion.primary_lookup_id_type, IdType.ISBN)
self.assertEqual(hyperion.primary_lookup_id_value, '9780553283686')
andymion = Edition(title="Andymion", pages=42)
self.assertEqual(andymion.pages, 42)
def test_lookupids(self):
hyperion = Edition.objects.get(title="Hyperion")
hyperion.asin = 'B004G60EHS'
self.assertEqual(hyperion.primary_lookup_id_type, IdType.ASIN)
self.assertEqual(hyperion.primary_lookup_id_value, 'B004G60EHS')
self.assertEqual(hyperion.isbn, None)
self.assertEqual(hyperion.isbn10, None)
def test_isbn(self):
hyperion = Edition.objects.get(title="Hyperion")
self.assertEqual(hyperion.isbn, '9780553283686')
self.assertEqual(hyperion.isbn10, '0553283685')
hyperion.isbn10 = '0575099437'
self.assertEqual(hyperion.isbn, '9780575099432')
self.assertEqual(hyperion.isbn10, '0575099437')
def test_work(self):
hyperion_print = Edition.objects.get(title="Hyperion")
hyperion_ebook = Edition(title="Hyperion")
hyperion_ebook.save()
hyperion_ebook.asin = 'B0043M6780'
hyperion = Work(title="Hyperion")
hyperion.save()
hyperion.editions.add(hyperion_print)
hyperion.editions.add(hyperion_ebook)
# andymion = Edition(title="Andymion", pages=42)
# serie = Serie(title="Hyperion Cantos")
class GoodreadsTestCase(TestCase):
def setUp(self):
pass
def test_parse(self):
t_type = IdType.Goodreads
t_id = '77566'
t_url = 'https://www.goodreads.com/zh/book/show/77566.Hyperion'
t_url2 = 'https://www.goodreads.com/book/show/77566'
p1 = SiteList.get_site_by_id_type(t_type)
p2 = SiteList.get_site_by_url(t_url)
self.assertEqual(p1.id_to_url(t_id), t_url2)
self.assertEqual(p2.url_to_id(t_url), t_id)
@use_local_response
def test_scrape(self):
t_url = 'https://www.goodreads.com/book/show/77566.Hyperion'
t_url2 = 'https://www.goodreads.com/book/show/77566'
isbn = '9780553283686'
site = SiteList.get_site_by_url(t_url)
self.assertEqual(site.ready, False)
self.assertEqual(site.url, t_url2)
site.get_page()
self.assertEqual(site.ready, False)
self.assertIsNotNone(site.page)
site.get_page_ready()
self.assertEqual(site.ready, True)
self.assertEqual(site.page.metadata.get('title'), 'Hyperion')
self.assertEqual(site.page.metadata.get('isbn'), isbn)
self.assertEqual(site.page.metadata['work']['lookup_id_value'], '1383900')
self.assertEqual(site.page.metadata['work']['title'], 'Hyperion')
edition = Edition.objects.get(primary_lookup_id_type=IdType.ISBN, primary_lookup_id_value=isbn)
page = edition.external_pages.all().first()
self.assertEqual(page.id_type, IdType.Goodreads)
self.assertEqual(page.id_value, '77566')
self.assertNotEqual(page.cover, '/media/item/default.svg')
self.assertEqual(edition.isbn, '9780553283686')
self.assertEqual(edition.title, 'Hyperion')
edition.delete()
site = SiteList.get_site_by_url(t_url)
self.assertEqual(site.ready, False)
self.assertEqual(site.url, t_url2)
site.get_page()
self.assertEqual(site.ready, True, 'previous page should still exist with data')
@use_local_response
def test_asin(self):
t_url = 'https://www.goodreads.com/book/show/45064996-hyperion'
site = SiteList.get_site_by_url(t_url)
site.get_page_ready()
self.assertEqual(site.page.item.title, 'Hyperion')
self.assertEqual(site.page.item.asin, 'B004G60EHS')
@use_local_response
def test_work(self):
# url = 'https://www.goodreads.com/work/editions/153313'
url1 = 'https://www.goodreads.com/book/show/3597767-rok-1984'
url2 = 'https://www.goodreads.com/book/show/40961427-1984'
p1 = SiteList.get_site_by_url(url1).get_page_ready()
p2 = SiteList.get_site_by_url(url2).get_page_ready()
w1 = p1.item.works.all().first()
w2 = p2.item.works.all().first()
self.assertEqual(w1.title, 'Nineteen Eighty-Four')
self.assertEqual(w2.title, 'Nineteen Eighty-Four')
self.assertEqual(w1, w2)
class DoubanTestCase(TestCase):
def setUp(self):
pass
def test_parse(self):
t_type = IdType.DoubanBook
t_id = '35902899'
t_url = 'https://m.douban.com/book/subject/35902899/'
t_url2 = 'https://book.douban.com/subject/35902899/'
p1 = SiteList.get_site_by_url(t_url)
p2 = SiteList.get_site_by_url(t_url2)
self.assertEqual(p1.url, t_url2)
self.assertEqual(p1.ID_TYPE, t_type)
self.assertEqual(p1.id_value, t_id)
self.assertEqual(p2.url, t_url2)
@use_local_response
def test_scrape(self):
t_url = 'https://book.douban.com/subject/35902899/'
site = SiteList.get_site_by_url(t_url)
self.assertEqual(site.ready, False)
site.get_page_ready()
self.assertEqual(site.ready, True)
self.assertEqual(site.page.metadata.get('title'), '1984 Nineteen Eighty-Four')
self.assertEqual(site.page.metadata.get('isbn'), '9781847498571')
self.assertEqual(site.page.id_type, IdType.DoubanBook)
self.assertEqual(site.page.id_value, '35902899')
self.assertEqual(site.page.item.isbn, '9781847498571')
self.assertEqual(site.page.item.title, '1984 Nineteen Eighty-Four')
@use_local_response
def test_work(self):
# url = 'https://www.goodreads.com/work/editions/153313'
url1 = 'https://book.douban.com/subject/1089243/'
url2 = 'https://book.douban.com/subject/2037260/'
p1 = SiteList.get_site_by_url(url1).get_page_ready()
p2 = SiteList.get_site_by_url(url2).get_page_ready()
w1 = p1.item.works.all().first()
w2 = p2.item.works.all().first()
self.assertEqual(w1.title, '黄金时代')
self.assertEqual(w2.title, '黄金时代')
self.assertEqual(w1, w2)
self.assertEqual(w1.editions.all().count(), 2)
self.assertEqual(w1.editions.all()[0].title, '黄金时代')
self.assertEqual(w1.editions.all()[1].title, 'Wang in Love and Bondage')
class MultiBookSitesTestCase(TestCase):
@use_local_response
def test_editions(self):
# isbn = '9781847498571'
url1 = 'https://www.goodreads.com/book/show/56821625-1984'
url2 = 'https://book.douban.com/subject/35902899/'
p1 = SiteList.get_site_by_url(url1).get_page_ready()
p2 = SiteList.get_site_by_url(url2).get_page_ready()
self.assertEqual(p1.item.id, p2.item.id)
@use_local_response
def test_works(self):
# url1 and url4 have the same ISBN, hence they share the same Edition instance, which belongs to 2 Work instances
url1 = 'https://book.douban.com/subject/1089243/'
url2 = 'https://book.douban.com/subject/2037260/'
url3 = 'https://www.goodreads.com/book/show/59952545-golden-age'
url4 = 'https://www.goodreads.com/book/show/11798823'
p1 = SiteList.get_site_by_url(url1).get_page_ready() # lxml bug may break this
w1 = p1.item.works.all().first()
p2 = SiteList.get_site_by_url(url2).get_page_ready()
w2 = p2.item.works.all().first()
self.assertEqual(w1, w2)
self.assertEqual(p1.item.works.all().count(), 1)
p3 = SiteList.get_site_by_url(url3).get_page_ready()
w3 = p3.item.works.all().first()
self.assertNotEqual(w3, w2)
p4 = SiteList.get_site_by_url(url4).get_page_ready()
self.assertEqual(p4.item.works.all().count(), 2)
self.assertEqual(p1.item.works.all().count(), 2)
w2e = w2.editions.all().order_by('title')
self.assertEqual(w2e.count(), 2)
self.assertEqual(w2e[0].title, 'Wang in Love and Bondage')
self.assertEqual(w2e[1].title, '黄金时代')
w3e = w3.editions.all().order_by('title')
self.assertEqual(w3e.count(), 2)
self.assertEqual(w3e[0].title, 'Golden Age: A Novel')
self.assertEqual(w3e[1].title, '黄金时代')
e = Edition.objects.get(primary_lookup_id_value=9781662601217)
self.assertEqual(e.title, 'Golden Age: A Novel')

45
catalog/book/utils.py Normal file
View file

@ -0,0 +1,45 @@
def check_digit_10(isbn):
assert len(isbn) == 9
sum = 0
for i in range(len(isbn)):
c = int(isbn[i])
w = i + 1
sum += w * c
r = sum % 11
return 'X' if r == 10 else str(r)
def check_digit_13(isbn):
assert len(isbn) == 12
sum = 0
for i in range(len(isbn)):
c = int(isbn[i])
w = 3 if i % 2 else 1
sum += w * c
r = 10 - (sum % 10)
return '0' if r == 10 else str(r)
def isbn_10_to_13(isbn):
if not isbn or len(isbn) != 10:
return None
return '978' + isbn[:-1] + check_digit_13('978' + isbn[:-1])
def isbn_13_to_10(isbn):
if not isbn or len(isbn) != 13 or isbn[:3] != '978':
return None
else:
return isbn[3:12] + check_digit_10(isbn[3:12])
def is_isbn_13(isbn):
return len(isbn) == 13
def is_isbn_10(isbn):
return len(isbn) == 10 and isbn[0] >= '0' and isbn[0] <= '9'
def is_asin(asin):
return len(asin) == 10 and asin[0].lower() == 'b'
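A quick worked example of the helpers above (values match catalog/book/tests.py): for '0553283685' the ISBN-13 check digit over '978055328368', computed with alternating 1/3 weights, comes out to 6, so:

    from catalog.book.utils import isbn_10_to_13, isbn_13_to_10

    assert isbn_10_to_13('0553283685') == '9780553283686'
    assert isbn_13_to_10('9780553283686') == '0553283685'
    assert isbn_13_to_10('12345') is None    # non-978 or malformed input yields None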

8
catalog/common/__init__.py Normal file
View file

@ -0,0 +1,8 @@
from .models import *
from .sites import *
from .downloaders import *
from .scrapers import *
from . import jsondata
__all__ = ('IdType', 'Item', 'ExternalPage', 'PageData', 'ParseError', 'ScraperMixin', 'AbstractSite', 'SiteList', 'jsondata', 'PrimaryLookupIdDescriptor', 'LookupIdDescriptor', 'setMockMode', 'use_local_response', 'RetryDownloader', 'BasicDownloader', 'ProxiedDownloader', 'BasicImageDownloader', 'RESPONSE_OK', 'RESPONSE_NETWORK_ERROR', 'RESPONSE_INVALID_CONTENT', 'RESPONSE_CENSORSHIP')

186
catalog/common/downloaders.py Normal file
View file

@ -0,0 +1,186 @@
import requests
import filetype
from PIL import Image
from io import BytesIO
from requests.exceptions import RequestException
from django.conf import settings
from .utils import MockResponse
import re
import time
import logging
logger = logging.getLogger(__name__)
RESPONSE_OK = 0 # response is ready for parsing
RESPONSE_INVALID_CONTENT = -1 # content not valid but no need to retry
RESPONSE_NETWORK_ERROR = -2 # network error, retry next proxied url
RESPONSE_CENSORSHIP = -3 # censored, try sth special if possible
MockMode = False
def use_local_response(func):
def _func(args):
setMockMode(True)
func(args)
setMockMode(False)
return _func
def setMockMode(enabled):
global MockMode
MockMode = enabled
class DownloadError(Exception):
def __init__(self, downloader):
self.url = downloader.url
self.logs = downloader.logs
if downloader.response_type == RESPONSE_INVALID_CONTENT:
error = "Invalid Response"
elif downloader.response_type == RESPONSE_NETWORK_ERROR:
error = "Network Error"
elif downloader.response_type == RESPONSE_CENSORSHIP:
error = "Censored Content"
else:
error = "Unknown Error"
self.message = f"Download Failed: {error}, url: {self.url}"
super().__init__(self.message)
class BasicDownloader:
headers = {
# 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:107.0) Gecko/20100101 Firefox/107.0',
'User-Agent': 'Mozilla/5.0 (iPad; CPU OS 14_7_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Mobile/15E148 Safari/604.1',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'DNT': '1',
'Upgrade-Insecure-Requests': '1',
'Cache-Control': 'no-cache',
}
def __init__(self, url):
self.url = url
self.response_type = RESPONSE_OK
self.logs = []
def get_timeout(self):
return settings.SCRAPING_TIMEOUT
def validate_response(self, response):
if response is None:
return RESPONSE_NETWORK_ERROR
elif response.status_code == 200:
return RESPONSE_OK
else:
return RESPONSE_INVALID_CONTENT
def _download(self, url):
try:
if not MockMode:
# TODO cache = get/set from redis
resp = requests.get(url, headers=self.headers, timeout=self.get_timeout())
if settings.DOWNLOADER_SAVEDIR:
with open(settings.DOWNLOADER_SAVEDIR + '/' + re.sub(r'[^\w]', '_', url), 'w', encoding='utf-8') as fp:
fp.write(resp.text)
else:
resp = MockResponse(self.url)
response_type = self.validate_response(resp)
self.logs.append({'response_type': response_type, 'url': url, 'exception': None})
return resp, response_type
except RequestException as e:
self.logs.append({'response_type': RESPONSE_NETWORK_ERROR, 'url': url, 'exception': e})
return None, RESPONSE_NETWORK_ERROR
def download(self):
resp, self.response_type = self._download(self.url)
if self.response_type == RESPONSE_OK:
return resp
else:
raise DownloadError(self)
class ProxiedDownloader(BasicDownloader):
def get_proxied_urls(self):
urls = []
if settings.PROXYCRAWL_KEY is not None:
urls.append(f'https://api.proxycrawl.com/?token={settings.PROXYCRAWL_KEY}&url={self.url}')
if settings.SCRAPESTACK_KEY is not None:
# urls.append(f'http://api.scrapestack.com/scrape?access_key={settings.SCRAPESTACK_KEY}&url={self.url}')
urls.append(f'http://api.scrapestack.com/scrape?keep_headers=1&access_key={settings.SCRAPESTACK_KEY}&url={self.url}')
if settings.SCRAPERAPI_KEY is not None:
urls.append(f'http://api.scraperapi.com/?api_key={settings.SCRAPERAPI_KEY}&url={self.url}')
return urls
def get_special_proxied_url(self):
return f'{settings.LOCAL_PROXY}?url={self.url}' if settings.LOCAL_PROXY is not None else None
def download(self):
urls = self.get_proxied_urls()
last_try = False
url = urls.pop(0) if len(urls) else None
resp = None
resp_type = RESPONSE_NETWORK_ERROR
while url:
resp, resp_type = self._download(url)
if resp_type == RESPONSE_OK or resp_type == RESPONSE_INVALID_CONTENT or last_try:
url = None
elif resp_type == RESPONSE_CENSORSHIP:
url = self.get_special_proxied_url()
last_try = True
else: # resp_type == RESPONSE_NETWORK_ERROR:
url = urls.pop(0) if len(urls) else None
self.response_type = resp_type
if self.response_type == RESPONSE_OK:
return resp
else:
raise DownloadError(self)
class RetryDownloader(BasicDownloader):
def download(self):
retries = settings.DOWNLOADER_RETRIES
while retries:
retries -= 1
resp, self.response_type = self._download(self.url)
if self.response_type == RESPONSE_OK:
return resp
elif retries == 0:
raise DownloadError(self)
else:
time.sleep((settings.DOWNLOADER_RETRIES - retries) * 0.5)
class ImageDownloaderMixin:
def __init__(self, url, referer=None):
if referer is not None:
self.headers['Referer'] = referer
super().__init__(url)
def validate_response(self, response):
if response and response.status_code == 200:
try:
raw_img = response.content
img = Image.open(BytesIO(raw_img))
img.load() # corrupted image will trigger exception
content_type = response.headers.get('Content-Type')
self.extention = filetype.get_type(mime=content_type.partition(';')[0].strip()).extension
return RESPONSE_OK
except Exception:
return RESPONSE_NETWORK_ERROR
if response and response.status_code >= 400 and response.status_code < 500:
return RESPONSE_INVALID_CONTENT
else:
return RESPONSE_NETWORK_ERROR
class BasicImageDownloader(ImageDownloaderMixin, BasicDownloader):
pass
class ProxiedImageDownloader(ImageDownloaderMixin, ProxiedDownloader):
pass
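A hedged sketch of how these downloaders are meant to be driven (the site scrapers further down follow the same pattern); SCRAPING_TIMEOUT, DOWNLOADER_RETRIES and the proxy keys referenced above are assumed to be present in Django settings, and the module path for DownloadError is inferred from the imports in catalog/common/__init__.py:

    from catalog.common import RetryDownloader, use_local_response
    from catalog.common.downloaders import DownloadError    # module name assumed

    def fetch_text(url):
        # download() returns a requests.Response (or MockResponse) on success,
        # and raises DownloadError carrying the per-attempt logs when every retry fails
        try:
            return RetryDownloader(url).download().text
        except DownloadError as e:
            print(e.message, e.logs)
            return None

    @use_local_response
    def test_something(self):
        # while this runs MockMode is on, so _download() serves files from test_data/ instead of the network
        ...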

201
catalog/common/jsondata.py Normal file
View file

@ -0,0 +1,201 @@
import copy
from datetime import date, datetime
from importlib import import_module
import django
from django.conf import settings
from django.core.exceptions import FieldError
from django.db.models import fields
from django.utils import dateparse, timezone
from functools import partialmethod
from django.db.models import JSONField
__all__ = ('BooleanField', 'CharField', 'DateField', 'DateTimeField', 'DecimalField', 'EmailField', 'FloatField', 'IntegerField', 'IPAddressField', 'GenericIPAddressField', 'NullBooleanField', 'TextField', 'TimeField', 'URLField', 'ArrayField')
class JSONFieldDescriptor(object):
def __init__(self, field):
self.field = field
def __get__(self, instance, cls=None):
if instance is None:
return self
json_value = getattr(instance, self.field.json_field_name)
if isinstance(json_value, dict):
if self.field.attname in json_value or not self.field.has_default():
value = json_value.get(self.field.attname, None)
if hasattr(self.field, 'from_json'):
value = self.field.from_json(value)
return value
else:
default = self.field.get_default()
if hasattr(self.field, 'to_json'):
json_value[self.field.attname] = self.field.to_json(default)
else:
json_value[self.field.attname] = default
return default
return None
def __set__(self, instance, value):
json_value = getattr(instance, self.field.json_field_name)
if json_value:
assert isinstance(json_value, dict)
else:
json_value = {}
if hasattr(self.field, 'to_json'):
value = self.field.to_json(value)
if not value and self.field.blank and not self.field.null:
try:
del json_value[self.field.attname]
except KeyError:
pass
else:
json_value[self.field.attname] = value
setattr(instance, self.field.json_field_name, json_value)
class JSONFieldMixin(object):
"""
Override django.db.model.fields.Field.contribute_to_class
to make a field always private, and register custom access descriptor
"""
def __init__(self, *args, **kwargs):
self.json_field_name = kwargs.pop('json_field_name', 'metadata')
super(JSONFieldMixin, self).__init__(*args, **kwargs)
def contribute_to_class(self, cls, name, private_only=False):
self.set_attributes_from_name(name)
self.model = cls
self.concrete = False
self.column = self.json_field_name
cls._meta.add_field(self, private=True)
if not getattr(cls, self.attname, None):
descriptor = JSONFieldDescriptor(self)
setattr(cls, self.attname, descriptor)
if self.choices is not None:
setattr(cls, 'get_%s_display' % self.name,
partialmethod(cls._get_FIELD_display, field=self))
def get_lookup(self, lookup_name):
# Always return None, so that get_transform gets called instead
return None
def get_transform(self, name):
class TransformFactoryWrapper:
def __init__(self, json_field, transform, original_lookup):
self.json_field = json_field
self.transform = transform
self.original_lookup = original_lookup
def __call__(self, lhs, **kwargs):
lhs = copy.copy(lhs)
lhs.target = self.json_field
lhs.output_field = self.json_field
transform = self.transform(lhs, **kwargs)
transform._original_get_lookup = transform.get_lookup
transform.get_lookup = lambda name: transform._original_get_lookup(self.original_lookup)
return transform
json_field = self.model._meta.get_field(self.json_field_name)
transform = json_field.get_transform(self.name)
if transform is None:
raise FieldError(
"JSONField '%s' has no support for key '%s' %s lookup" %
(self.json_field_name, self.name, name)
)
return TransformFactoryWrapper(json_field, transform, name)
class BooleanField(JSONFieldMixin, fields.BooleanField):
def __init__(self, *args, **kwargs):
super(BooleanField, self).__init__(*args, **kwargs)
if django.VERSION < (2, ):
self.blank = False
class CharField(JSONFieldMixin, fields.CharField):
pass
class DateField(JSONFieldMixin, fields.DateField):
def to_json(self, value):
if value:
assert isinstance(value, (datetime, date))
return value.strftime('%Y-%m-%d')
def from_json(self, value):
if value is not None:
return dateparse.parse_date(value)
class DateTimeField(JSONFieldMixin, fields.DateTimeField):
def to_json(self, value):
if value:
if not timezone.is_aware(value):
value = timezone.make_aware(value)
return value.isoformat()
def from_json(self, value):
if value:
return dateparse.parse_datetime(value)
class DecimalField(JSONFieldMixin, fields.DecimalField):
pass
class EmailField(JSONFieldMixin, fields.EmailField):
pass
class FloatField(JSONFieldMixin, fields.FloatField):
pass
class IntegerField(JSONFieldMixin, fields.IntegerField):
pass
class IPAddressField(JSONFieldMixin, fields.IPAddressField):
pass
class GenericIPAddressField(JSONFieldMixin, fields.GenericIPAddressField):
pass
class NullBooleanField(JSONFieldMixin, fields.NullBooleanField):
pass
class TextField(JSONFieldMixin, fields.TextField):
pass
class TimeField(JSONFieldMixin, fields.TimeField):
def to_json(self, value):
if value:
if not timezone.is_aware(value):
value = timezone.make_aware(value)
return value.isoformat()
def from_json(self, value):
if value:
return dateparse.parse_time(value)
class URLField(JSONFieldMixin, fields.URLField):
pass
class ArrayField(JSONFieldMixin, JSONField):
pass
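A small sketch of what these virtual fields do: each one is registered as a private field whose value lives inside the model's real JSONField (json_field_name, 'metadata' by default), so no extra database column is created. The Demo model below is hypothetical, purely for illustration:

    from django.db import models
    from catalog.common import jsondata

    class Demo(models.Model):    # hypothetical model, not part of this commit
        metadata = models.JSONField(default=dict)    # the single real column
        pages = jsondata.IntegerField(blank=True, default=None)
        authors = jsondata.ArrayField(default=list)
        class Meta:
            app_label = 'catalog'    # so the sketch can live outside a models.py

    d = Demo(pages=500, authors=['Dan Simmons'])
    # both values end up inside the JSON column rather than in columns of their own
    assert d.metadata == {'pages': 500, 'authors': ['Dan Simmons']}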

264
catalog/common/models.py Normal file
View file

@ -0,0 +1,264 @@
from polymorphic.models import PolymorphicModel
from django.db import models
from django.utils.translation import gettext_lazy as _
from django.utils import timezone
from django.core.files.uploadedfile import SimpleUploadedFile
from django.contrib.contenttypes.models import ContentType
import uuid
from .utils import DEFAULT_ITEM_COVER, item_cover_path
# from django.conf import settings
class IdType(models.TextChoices):
WikiData = 'wikidata', _('维基数据')
ISBN10 = 'isbn10', _('ISBN10')
ISBN = 'isbn', _('ISBN') # ISBN 13
ASIN = 'asin', _('ASIN')
ISSN = 'issn', _('ISSN')
CUBN = 'cubn', _('统一书号')
ISRC = 'isrc', _('ISRC') # only for songs
UPC = 'upc', _('GTIN UPC EAN码')
Feed = 'feed', _('Feed URL')
IMDB = 'imdb', _('IMDb')
TMDB_TV = 'tmdb_tv', _('TMDB剧集')
TMDB_TVSeason = 'tmdb_tvseason', _('TMDB剧集')
TMDB_TVEpisode = 'tmdb_tvepisode', _('TMDB剧集')
TMDB_Movie = 'tmdb_movie', _('TMDB电影')
Goodreads = 'goodreads', _('Goodreads')
Goodreads_Work = 'goodreads_work', _('Goodreads著作')
GoogleBook = 'googlebook', _('谷歌图书')
DoubanBook = 'doubanbook', _('豆瓣读书')
DoubanBook_Work = 'doubanbook_work', _('豆瓣读书著作')
DoubanMovie = 'doubanmovie', _('豆瓣电影')
DoubanMusic = 'doubanmusic', _('豆瓣音乐')
DoubanGame = 'doubangame', _('豆瓣游戏')
DoubanDrama = 'doubandrama', _('豆瓣舞台剧')
Bandcamp = 'bandcamp', _('Bandcamp')
Spotify_Album = 'spotify_album', _('Spotify专辑')
Spotify_Show = 'spotify_show', _('Spotify播客')
DoubanBook_Author = 'doubanbook_author', _('豆瓣读书作者')
DoubanCelebrity = 'doubanmovie_celebrity', _('豆瓣电影影人')
Goodreads_Author = 'goodreads_author', _('Goodreads作者')
Spotify_Artist = 'spotify_artist', _('Spotify艺术家')
TMDB_Person = 'tmdb_person', _('TMDB影人')
IGDB = 'igdb', _('IGDB游戏')
Steam = 'steam', _('Steam游戏')
ApplePodcast = 'apple_podcast', _('苹果播客')
class ItemType(models.TextChoices):
Book = 'book', _('书')
TV = 'tv', _('剧集')
TVSeason = 'tvseason', _('剧集分季')
TVEpisode = 'tvepisode', _('剧集分集')
Movie = 'movie', _('电影')
Music = 'music', _('音乐')
Game = 'game', _('游戏')
Boardgame = 'boardgame', _('桌游')
Podcast = 'podcast', _('播客')
FanFic = 'fanfic', _('网文')
Performance = 'performance', _('演出')
Exhibition = 'exhibition', _('展览')
class SubItemType(models.TextChoices):
Season = 'season', _('剧集分季')
Episode = 'episode', _('剧集分集')
Version = 'version', _('版本')
# class CreditType(models.TextChoices):
# Author = 'author', _('作者')
# Translater = 'translater', _('译者')
# Producer = 'producer', _('出品人')
# Director = 'director', _('导演')
# Actor = 'actor', _('演员')
# Playwright = 'playwright', _('编剧')
# VoiceActor = 'voiceactor', _('配音')
# Host = 'host', _('主持人')
# Developer = 'developer', _('开发者')
# Publisher = 'publisher', _('出版方')
class PrimaryLookupIdDescriptor(object): # TODO make it mixin of Field
def __init__(self, id_type):
self.id_type = id_type
def __get__(self, instance, cls=None):
if instance is None:
return self
if self.id_type != instance.primary_lookup_id_type:
return None
return instance.primary_lookup_id_value
def __set__(self, instance, id_value):
if id_value:
instance.primary_lookup_id_type = self.id_type
instance.primary_lookup_id_value = id_value
else:
instance.primary_lookup_id_type = None
instance.primary_lookup_id_value = None
class LookupIdDescriptor(object): # TODO make it mixin of Field
def __init__(self, id_type):
self.id_type = id_type
def __get__(self, instance, cls=None):
if instance is None:
return self
return instance.get_lookup_id(self.id_type)
def __set__(self, instance, value):
instance.set_lookup_id(self.id_type, value)
# class ItemId(models.Model):
# item = models.ForeignKey('Item', models.CASCADE)
# id_type = models.CharField(_("源网站"), blank=False, choices=IdType.choices, max_length=50)
# id_value = models.CharField(_("源网站ID"), blank=False, max_length=1000)
# class ItemCredit(models.Model):
# item = models.ForeignKey('Item', models.CASCADE)
# credit_type = models.CharField(_("类型"), choices=CreditType.choices, blank=False, max_length=50)
# name = models.CharField(_("名字"), blank=False, max_length=1000)
# def check_source_id(sid):
# if not sid:
# return True
# s = sid.split(':')
# if len(s) < 2:
# return False
# return sid[0] in IdType.values()
class Item(PolymorphicModel):
uid = models.UUIDField(default=uuid.uuid4, editable=False)
# item_type = models.CharField(_("类型"), choices=ItemType.choices, blank=False, max_length=50)
title = models.CharField(_("title in primary language"), max_length=1000, default="")
# title_ml = models.JSONField(_("title in different languages {['lang':'zh-cn', 'text':'', primary:True], ...}"), null=True, blank=True, default=list)
brief = models.TextField(_("简介"), blank=True, default="")
# brief_ml = models.JSONField(_("brief in different languages {['lang':'zh-cn', 'text':'', primary:True], ...}"), null=True, blank=True, default=list)
genres = models.JSONField(_("分类"), null=True, blank=True, default=list)
primary_lookup_id_type = models.CharField(_("isbn/cubn/imdb"), blank=False, null=True, max_length=50)
primary_lookup_id_value = models.CharField(_("1234/tt789"), blank=False, null=True, max_length=1000)
metadata = models.JSONField(_("其他信息"), blank=True, null=True, default=dict)
cover = models.ImageField(upload_to=item_cover_path, default=DEFAULT_ITEM_COVER, blank=True)
created_time = models.DateTimeField(auto_now_add=True)
edited_time = models.DateTimeField(auto_now=True)
# parent_item = models.ForeignKey('Item', null=True, on_delete=models.SET_NULL, related_name='child_items')
# identical_item = models.ForeignKey('Item', null=True, on_delete=models.SET_NULL, related_name='identical_items')
# def get_lookup_id(self, id_type: str) -> str:
# prefix = id_type.strip().lower() + ':'
# return next((x[len(prefix):] for x in self.lookup_ids if x.startswith(prefix)), None)
class Meta:
unique_together = [['polymorphic_ctype_id', 'primary_lookup_id_type', 'primary_lookup_id_value']]
def __str__(self):
return f"{self.id}{' ' + self.primary_lookup_id_type + ':' + self.primary_lookup_id_value if self.primary_lookup_id_value else ''} ({self.title})"
@classmethod
def get_best_lookup_id(cls, lookup_ids):
""" get best available lookup id, ideally commonly used """
best_id_types = [IdType.ISBN, IdType.CUBN, IdType.ASIN, IdType.IMDB, IdType.Feed, IdType.TMDB_TVSeason]
for t in best_id_types:
if lookup_ids.get(t):
return t, lookup_ids[t]
return list(lookup_ids.items())[0]
def update_lookup_ids(self, lookup_ids):
# TODO
# ll = set(lookup_ids)
# ll = list(filter(lambda a, b: b, ll))
# print(ll)
pass
METADATA_COPY_LIST = ['title', 'brief'] # list of metadata keys to copy from page to item
@classmethod
def copy_metadata(cls, metadata):
return dict((k, v) for k, v in metadata.items() if k in cls.METADATA_COPY_LIST and v is not None)
def merge_data_from_extenal_pages(self):
"""Subclass may override this"""
lookup_ids = []
for p in self.external_pages.all():
lookup_ids.append((p.id_type, p.id_value))
lookup_ids += p.other_lookup_ids.items()
for k in self.METADATA_COPY_LIST:
if not getattr(self, k) and p.metadata.get(k):
setattr(self, k, p.metadata.get(k))
if not self.cover and p.cover:
self.cover = p.cover
self.update_lookup_ids(lookup_ids)
def update_linked_items_from_extenal_page(self, page):
"""Subclass should override this"""
pass
class ItemLookupId(models.Model):
item = models.ForeignKey(Item, null=True, on_delete=models.SET_NULL, related_name='lookup_ids')
id_type = models.CharField(_("源网站"), blank=True, choices=IdType.choices, max_length=50)
id_value = models.CharField(_("源网站ID"), blank=True, max_length=1000)
raw_url = models.CharField(_("源网站ID"), blank=True, max_length=1000, unique=True)
class Meta:
unique_together = [['id_type', 'id_value']]
class ExternalPage(models.Model):
item = models.ForeignKey(Item, null=True, on_delete=models.SET_NULL, related_name='external_pages')
id_type = models.CharField(_("IdType of the source site"), blank=False, choices=IdType.choices, max_length=50)
id_value = models.CharField(_("Primary Id on the source site"), blank=False, max_length=1000)
url = models.CharField(_("url to the page"), blank=False, max_length=1000, unique=True)
cover = models.ImageField(upload_to=item_cover_path, default=DEFAULT_ITEM_COVER, blank=True)
other_lookup_ids = models.JSONField(default=dict)
metadata = models.JSONField(default=dict)
scraped_time = models.DateTimeField(null=True)
created_time = models.DateTimeField(auto_now_add=True)
edited_time = models.DateTimeField(auto_now=True)
class Meta:
unique_together = [['id_type', 'id_value']]
def __str__(self):
return f"{self.id}{':' + self.id_type + ':' + self.id_value if self.id_value else ''} ({self.url})"
def update_content(self, page_data):
self.other_lookup_ids = page_data.lookup_ids
self.metadata = page_data.metadata
if page_data.cover_image and page_data.cover_image_extention:
self.cover = SimpleUploadedFile('temp.' + page_data.cover_image_extention, page_data.cover_image)
self.scraped_time = timezone.now()
self.save()
@property
def ready(self):
return bool(self.metadata)
def get_all_lookup_ids(self):
d = self.other_lookup_ids.copy()
d[self.id_type] = self.id_value
d = {k: v for k, v in d.items() if bool(v)}
return d
def get_preferred_model(self):
model = self.metadata.get('preferred_model')
if model:
m = ContentType.objects.filter(app_label='catalog', model=model.lower()).first()
if m:
return m.model_class()
else:
raise ValueError(f'preferred model {model} does not exist')
return None
def get_dependent_urls(self):
ll = self.metadata.get('dependent_urls')
return ll if ll else []
def get_related_urls(self):
ll = self.metadata.get('related_urls')
return ll if ll else []
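A brief sketch of how these two models are intended to fit together (this is the flow that AbstractSite.get_page_ready in catalog/common/sites.py drives; the values echo the movie tests later in this commit):

    from catalog.common.models import ExternalPage, IdType
    from catalog.movie.models import Movie

    # the Item is looked up / created by its best lookup id (get_best_lookup_id prefers IMDb here),
    # and title/brief are copied from page.metadata per METADATA_COPY_LIST
    movie = Movie.objects.create(title='盗梦空间',
                                 primary_lookup_id_type=IdType.IMDB,
                                 primary_lookup_id_value='tt1375666')
    assert movie.imdb == 'tt1375666'    # PrimaryLookupIdDescriptor reads the two fields above

    # each scraped source gets its own ExternalPage row pointing at that Item
    page = ExternalPage.objects.create(item=movie, id_type=IdType.DoubanMovie, id_value='3541415',
                                       url='https://movie.douban.com/subject/3541415/')
    assert movie.external_pages.first() == page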

23
catalog/common/scrapers.py Normal file
View file

@ -0,0 +1,23 @@
class ParseError(Exception):
def __init__(self, scraper, field):
msg = f'{type(scraper).__name__}: Error parsing field "{field}" for url {scraper.url}'
super().__init__(msg)
class ScraperMixin:
def set_field(self, field, value=None):
self.data[field] = value
def parse_str(self, query):
elem = self.html.xpath(query)
return elem[0].strip() if elem else None
def parse_field(self, field, query, error_when_missing=False):
elem = self.html.xpath(query)
if elem:
self.data[field] = elem[0].strip()
elif error_when_missing:
raise ParseError(self, field)
else:
self.data[field] = None
return elem

135
catalog/common/sites.py Normal file
View file

@ -0,0 +1,135 @@
from typing import *
import re
import logging
from .models import ExternalPage
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class PageData:
lookup_ids: dict = field(default_factory=dict)
metadata: dict = field(default_factory=dict)
cover_image: bytes = None
cover_image_extention: str = None
class AbstractSite:
"""
Abstract class to represent a site
"""
ID_TYPE = None
WIKI_PROPERTY_ID = 'P0undefined0'
DEFAULT_MODEL = None
URL_PATTERNS = [r"\w+://undefined/(\d+)"]
@classmethod
def validate_url(self, url: str):
u = next(iter([re.match(p, url) for p in self.URL_PATTERNS if re.match(p, url)]), None)
return u is not None
@classmethod
def id_to_url(self, id_value):
return 'https://undefined/' + id_value
@classmethod
def url_to_id(self, url: str):
u = next(iter([re.match(p, url) for p in self.URL_PATTERNS if re.match(p, url)]), None)
return u[1] if u else None
def __str__(self):
return f'<{self.__class__.__name__}: {self.url}>'
def __init__(self, url=None):
self.id_value = self.url_to_id(url) if url else None
self.url = self.id_to_url(self.id_value) if url else None
self.page = None
def get_page(self):
if not self.page:
self.page = ExternalPage.objects.filter(url=self.url).first()
if self.page is None:
self.page = ExternalPage(id_type=self.ID_TYPE, id_value=self.id_value, url=self.url)
return self.page
def scrape(self) -> PageData:
"""subclass should implement this, return PageData object"""
data = PageData()
return data
def get_item(self):
p = self.get_page()
if not p:
raise ValueError(f'page not available for {self.url}')
model = p.get_preferred_model()
if not model:
model = self.DEFAULT_MODEL
t, v = model.get_best_lookup_id(p.get_all_lookup_ids())
if t is not None:
p.item = model.objects.filter(primary_lookup_id_type=t, primary_lookup_id_value=v).first()
if p.item is None:
obj = model.copy_metadata(p.metadata)
obj['primary_lookup_id_type'] = t
obj['primary_lookup_id_value'] = v
p.item = model.objects.create(**obj)
return p.item
@property
def ready(self):
return bool(self.page and self.page.ready)
def get_page_ready(self, auto_save=True, auto_create=True, auto_link=True):
"""return a page scraped, or scrape if not yet"""
if auto_link:
auto_create = True
if auto_create:
auto_save = True
p = self.get_page()
pagedata = {}
if not self.page:
return None
if not p.ready:
pagedata = self.scrape()
p.update_content(pagedata)
if not p.ready:
logger.error(f'unable to get page {self.url} ready')
return None
if auto_create and p.item is None:
self.get_item()
if auto_save:
p.save()
if p.item:
p.item.merge_data_from_extenal_pages()
p.item.save()
if auto_link:
# todo rewrite this
p.item.update_linked_items_from_extenal_page(p)
return p
def get_dependent_pages_ready(self, urls):
# set depth = 2 so that e.g. a douban season can find an IMDB episode and then a TMDB series
pass
class SiteList:
registry = {}
@classmethod
def register(cls, target) -> Callable:
id_type = target.ID_TYPE
if id_type in cls.registry:
raise ValueError(f'Site for {id_type} already exists')
cls.registry[id_type] = target
return target
@classmethod
def get_site_by_id_type(cls, typ: str):
return cls.registry[typ]() if typ in cls.registry else None
@classmethod
def get_site_by_url(cls, url: str):
cls = next(filter(lambda p: p.validate_url(url), cls.registry.values()), None)
return cls(url) if cls else None
@classmethod
def get_id_by_url(cls, url: str):
site = cls.get_site_by_url(url)
return site.url_to_id(url) if site else None
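Registering a new source site means subclassing AbstractSite and decorating it with @SiteList.register, as the concrete sites later in this commit do. A minimal, hedged example; FakeSite, its domain and its use of IdType.GoogleBook are made up for illustration and assume that id type is not already registered:

    from catalog.common import AbstractSite, SiteList, PageData, IdType
    from catalog.book.models import Edition

    @SiteList.register
    class FakeSite(AbstractSite):    # hypothetical site, for illustration only
        ID_TYPE = IdType.GoogleBook    # must be unique across the registry
        URL_PATTERNS = [r"\w+://fake\.example\.com/book/(\d+)"]
        WIKI_PROPERTY_ID = '?'
        DEFAULT_MODEL = Edition

        @classmethod
        def id_to_url(cls, id_value):
            return 'https://fake.example.com/book/' + id_value

        def scrape(self) -> PageData:
            return PageData(metadata={'title': 'placeholder'})

    site = SiteList.get_site_by_url('https://fake.example.com/book/1')
    assert site.id_value == '1'
    assert site.url == 'https://fake.example.com/book/1'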

51
catalog/common/utils.py Normal file
View file

@ -0,0 +1,51 @@
from pathlib import Path
# import hashlib
import json
from io import StringIO
import logging
import re
from django.utils import timezone
import uuid
logger = logging.getLogger(__name__)
DEFAULT_ITEM_COVER = 'item/default.svg'
def item_cover_path(page, filename):
fn = timezone.now().strftime('%Y/%m/%d/') + str(uuid.uuid4()) + '.' + filename.split('.')[-1]
return 'items/' + page.id_type + '/' + fn
TestDataDir = str(Path(__file__).parent.parent.parent.absolute()) + '/test_data/'
class MockResponse:
def get_mock_file(self, url):
fn = TestDataDir + re.sub(r'[^\w]', '_', url)
return re.sub(r'_key_[A-Za-z0-9]+', '_key_19890604', fn)
def __init__(self, url):
self.url = url
fn = self.get_mock_file(url)
try:
self.content = Path(fn).read_bytes()
self.status_code = 200
logger.debug(f"use local response for {url} from {fn}")
except Exception:
self.content = b'Error: response file not found'
self.status_code = 404
logger.debug(f"local response not found for {url} at {fn}")
@property
def text(self):
return self.content.decode('utf-8')
def json(self):
return json.load(StringIO(self.text))
@property
def headers(self):
return {'Content-Type': 'image/jpeg' if self.url.endswith('jpg') else 'text/html'}
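For clarity, this is why the files under test_data/ carry those long underscored names: every non-word character in the URL becomes '_' and any api_key value is normalized to 19890604 before the file is looked up. A tiny example using a URL that exists in test_data/ in this commit:

    from catalog.common.utils import MockResponse

    m = MockResponse('https://itunes.apple.com/lookup?id=1050430296')
    # reads test_data/https___itunes_apple_com_lookup_id_1050430296 if present,
    # otherwise falls back to status_code 404
    print(m.status_code, m.json() if m.status_code == 200 else m.text)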

11
catalog/game/models.py Normal file
View file

@ -0,0 +1,11 @@
from catalog.common import *
class Game(Item):
igdb = LookupIdDescriptor(IdType.IGDB)
steam = LookupIdDescriptor(IdType.Steam)
douban_game = LookupIdDescriptor(IdType.DoubanGame)
platforms = jsondata.ArrayField(default=list)
class Meta:
proxy = True

View file

@ -0,0 +1,22 @@
from django.core.management.base import BaseCommand
import pprint
from catalog.common import SiteList
from catalog.sites import *
class Command(BaseCommand):
help = 'Scrape a catalog item from external page (but not save it)'
def add_arguments(self, parser):
parser.add_argument('url', type=str, help='URL to scrape')
def handle(self, *args, **options):
url = str(options['url'])
site = SiteList.get_site_by_url(url)
if site is None:
self.stdout.write(self.style.ERROR(f'Unknown site for {url}'))
return
self.stdout.write(f'Fetching from {site}')
page = site.get_page_ready(auto_link=False, auto_save=False)
self.stdout.write(self.style.SUCCESS(f'Done.'))
pprint.pp(page.metadata)

25
catalog/models.py Normal file
View file

@ -0,0 +1,25 @@
from .book.models import Edition, Work, Series
from .movie.models import Movie
from .tv.models import TVShow, TVSeason, TVEpisode
from .music.models import Album
from .game.models import Game
from .podcast.models import Podcast
from .performance.models import Performance
# class Exhibition(Item):
# class Meta:
# proxy = True
# class Fanfic(Item):
# class Meta:
# proxy = True
# class Boardgame(Item):
# class Meta:
# proxy = True

8
catalog/movie/models.py Normal file
View file

@ -0,0 +1,8 @@
from catalog.common import *
class Movie(Item):
imdb = PrimaryLookupIdDescriptor(IdType.IMDB)
tmdb_movie = PrimaryLookupIdDescriptor(IdType.TMDB_Movie)
douban_movie = PrimaryLookupIdDescriptor(IdType.DoubanMovie)
duration = jsondata.IntegerField(blank=True, default=None)

90
catalog/movie/tests.py Normal file
View file

@ -0,0 +1,90 @@
from django.test import TestCase
from catalog.common import *
class DoubanMovieTestCase(TestCase):
def test_parse(self):
t_id = '3541415'
t_url = 'https://movie.douban.com/subject/3541415/'
p1 = SiteList.get_site_by_id_type(IdType.DoubanMovie)
self.assertIsNotNone(p1)
self.assertEqual(p1.validate_url(t_url), True)
p2 = SiteList.get_site_by_url(t_url)
self.assertEqual(p1.id_to_url(t_id), t_url)
self.assertEqual(p2.url_to_id(t_url), t_id)
@use_local_response
def test_scrape(self):
t_url = 'https://movie.douban.com/subject/3541415/'
site = SiteList.get_site_by_url(t_url)
self.assertEqual(site.ready, False)
self.assertEqual(site.id_value, '3541415')
site.get_page_ready()
self.assertEqual(site.page.metadata['title'], '盗梦空间')
self.assertEqual(site.page.item.primary_lookup_id_type, IdType.IMDB)
self.assertEqual(site.page.item.__class__.__name__, 'Movie')
self.assertEqual(site.page.item.imdb, 'tt1375666')
class TMDBMovieTestCase(TestCase):
def test_parse(self):
t_id = '293767'
t_url = 'https://www.themoviedb.org/movie/293767-billy-lynn-s-long-halftime-walk'
t_url2 = 'https://www.themoviedb.org/movie/293767'
p1 = SiteList.get_site_by_id_type(IdType.TMDB_Movie)
self.assertIsNotNone(p1)
self.assertEqual(p1.validate_url(t_url), True)
self.assertEqual(p1.validate_url(t_url2), True)
p2 = SiteList.get_site_by_url(t_url)
self.assertEqual(p1.id_to_url(t_id), t_url2)
self.assertEqual(p2.url_to_id(t_url), t_id)
@use_local_response
def test_scrape(self):
t_url = 'https://www.themoviedb.org/movie/293767'
site = SiteList.get_site_by_url(t_url)
self.assertEqual(site.ready, False)
self.assertEqual(site.id_value, '293767')
site.get_page_ready()
self.assertEqual(site.page.metadata['title'], '比利·林恩的中场战事')
self.assertEqual(site.page.item.primary_lookup_id_type, IdType.IMDB)
self.assertEqual(site.page.item.__class__.__name__, 'Movie')
self.assertEqual(site.page.item.imdb, 'tt2513074')
class IMDBMovieTestCase(TestCase):
def test_parse(self):
t_id = 'tt1375666'
t_url = 'https://www.imdb.com/title/tt1375666/'
t_url2 = 'https://www.imdb.com/title/tt1375666/'
p1 = SiteList.get_site_by_id_type(IdType.IMDB)
self.assertIsNotNone(p1)
self.assertEqual(p1.validate_url(t_url), True)
self.assertEqual(p1.validate_url(t_url2), True)
p2 = SiteList.get_site_by_url(t_url)
self.assertEqual(p1.id_to_url(t_id), t_url2)
self.assertEqual(p2.url_to_id(t_url), t_id)
@use_local_response
def test_scrape(self):
t_url = 'https://www.imdb.com/title/tt1375666/'
site = SiteList.get_site_by_url(t_url)
self.assertEqual(site.ready, False)
self.assertEqual(site.id_value, 'tt1375666')
site.get_page_ready()
self.assertEqual(site.page.metadata['title'], '盗梦空间')
self.assertEqual(site.page.item.primary_lookup_id_type, IdType.IMDB)
self.assertEqual(site.page.item.imdb, 'tt1375666')
class MultiMovieSitesTestCase(TestCase):
@use_local_response
def test_movies(self):
url1 = 'https://www.themoviedb.org/movie/27205-inception'
url2 = 'https://movie.douban.com/subject/3541415/'
url3 = 'https://www.imdb.com/title/tt1375666/'
p1 = SiteList.get_site_by_url(url1).get_page_ready()
p2 = SiteList.get_site_by_url(url2).get_page_ready()
p3 = SiteList.get_site_by_url(url3).get_page_ready()
self.assertEqual(p1.item.id, p2.item.id)
self.assertEqual(p2.item.id, p3.item.id)

10
catalog/music/models.py Normal file
View file

@ -0,0 +1,10 @@
from catalog.common import *
class Album(Item):
upc = LookupIdDescriptor(IdType.UPC)
douban_music = LookupIdDescriptor(IdType.DoubanMusic)
spotify_album = LookupIdDescriptor(IdType.Spotify_Album)
class Meta:
proxy = True

13
catalog/performance/models.py Normal file
View file

@ -0,0 +1,13 @@
from catalog.common import *
from django.utils.translation import gettext_lazy as _
class Performance(Item):
douban_drama = LookupIdDescriptor(IdType.DoubanDrama)
versions = jsondata.ArrayField(_('版本'), null=False, blank=False, default=list)
directors = jsondata.ArrayField(_('导演'), null=False, blank=False, default=list)
playwrights = jsondata.ArrayField(_('编剧'), null=False, blank=False, default=list)
actors = jsondata.ArrayField(_('主演'), null=False, blank=False, default=list)
class Meta:
proxy = True

37
catalog/performance/tests.py Normal file
View file

@ -0,0 +1,37 @@
from django.test import TestCase
from catalog.common import *
class DoubanDramaTestCase(TestCase):
def setUp(self):
pass
def test_parse(self):
t_id = '24849279'
t_url = 'https://www.douban.com/location/drama/24849279/'
p1 = SiteList.get_site_by_id_type(IdType.DoubanDrama)
self.assertIsNotNone(p1)
p1 = SiteList.get_site_by_url(t_url)
self.assertIsNotNone(p1)
self.assertEqual(p1.validate_url(t_url), True)
self.assertEqual(p1.id_to_url(t_id), t_url)
self.assertEqual(p1.url_to_id(t_url), t_id)
@use_local_response
def test_scrape(self):
t_url = 'https://www.douban.com/location/drama/24849279/'
site = SiteList.get_site_by_url(t_url)
self.assertEqual(site.ready, False)
page = site.get_page_ready()
self.assertEqual(site.ready, True)
self.assertEqual(page.metadata['title'], '红花侠')
item = site.get_item()
self.assertEqual(item.title, '红花侠')
# self.assertEqual(i.other_titles, ['スカーレットピンパーネル', 'THE SCARLET PIMPERNEL'])
# self.assertEqual(len(i.brief), 545)
# self.assertEqual(i.genres, ['音乐剧'])
# self.assertEqual(i.versions, ['08星組公演版', '10年月組公演版', '17年星組公演版', 'ミュージカル2017年版'])
# self.assertEqual(i.directors, ['小池修一郎', '小池 修一郎', '石丸さち子'])
# self.assertEqual(i.playwrights, ['小池修一郎', 'Baroness Orczy原作', '小池 修一郎'])
# self.assertEqual(i.actors, ['安蘭けい', '柚希礼音', '遠野あすか', '霧矢大夢', '龍真咲'])

13
catalog/podcast/models.py Normal file
View file

@ -0,0 +1,13 @@
from catalog.common import *
class Podcast(Item):
feed_url = PrimaryLookupIdDescriptor(IdType.Feed)
apple_podcast = PrimaryLookupIdDescriptor(IdType.ApplePodcast)
# ximalaya = LookupIdDescriptor(IdType.Ximalaya)
# xiaoyuzhou = LookupIdDescriptor(IdType.Xiaoyuzhou)
hosts = jsondata.ArrayField(default=list)
# class PodcastEpisode(Item):
# pass

30
catalog/podcast/tests.py Normal file
View file

@ -0,0 +1,30 @@
from django.test import TestCase
from catalog.podcast.models import *
from catalog.common import *
class ApplePodcastTestCase(TestCase):
def setUp(self):
pass
def test_parse(self):
t_id = '657765158'
t_url = 'https://podcasts.apple.com/us/podcast/%E5%A4%A7%E5%86%85%E5%AF%86%E8%B0%88/id657765158'
t_url2 = 'https://podcasts.apple.com/us/podcast/id657765158'
p1 = SiteList.get_site_by_id_type(IdType.ApplePodcast)
self.assertIsNotNone(p1)
self.assertEqual(p1.validate_url(t_url), True)
p2 = SiteList.get_site_by_url(t_url)
self.assertEqual(p1.id_to_url(t_id), t_url2)
self.assertEqual(p2.url_to_id(t_url), t_id)
@use_local_response
def test_scrape(self):
t_url = 'https://podcasts.apple.com/gb/podcast/the-new-yorker-radio-hour/id1050430296'
site = SiteList.get_site_by_url(t_url)
self.assertEqual(site.ready, False)
self.assertEqual(site.id_value, '1050430296')
site.get_page_ready()
self.assertEqual(site.page.metadata['title'], 'The New Yorker Radio Hour')
# self.assertEqual(site.page.metadata['feed_url'], 'http://feeds.wnyc.org/newyorkerradiohour')
self.assertEqual(site.page.metadata['feed_url'], 'http://feeds.feedburner.com/newyorkerradiohour')

8
catalog/sites/__init__.py Normal file
View file

@ -0,0 +1,8 @@
from ..common.sites import SiteList
from .apple_podcast import ApplePodcast
from .douban_book import DoubanBook
from .douban_movie import DoubanMovie
from .douban_drama import DoubanDrama
from .goodreads import Goodreads
from .tmdb import TMDB_Movie
from .imdb import IMDB

40
catalog/sites/apple_podcast.py Normal file
View file

@ -0,0 +1,40 @@
from catalog.common import *
from catalog.podcast.models import *
import logging
logger = logging.getLogger(__name__)
@SiteList.register
class ApplePodcast(AbstractSite):
ID_TYPE = IdType.ApplePodcast
URL_PATTERNS = [r"https://[^.]+.apple.com/\w+/podcast/*[^/?]*/id(\d+)"]
WIKI_PROPERTY_ID = 'P5842'
DEFAULT_MODEL = Podcast
@classmethod
def id_to_url(self, id_value):
return "https://podcasts.apple.com/us/podcast/id" + id_value
def scrape(self):
api_url = f'https://itunes.apple.com/lookup?id={self.id_value}'
dl = BasicDownloader(api_url)
resp = dl.download()
r = resp.json()['results'][0]
pd = PageData(metadata={
'title': r['trackName'],
'feed_url': r['feedUrl'],
'hosts': [r['artistName']],
'genres': r['genres'],
'cover_image_url': r['artworkUrl600'],
})
pd.lookup_ids[IdType.Feed] = pd.metadata.get('feed_url')
if pd.metadata["cover_image_url"]:
imgdl = BasicImageDownloader(pd.metadata["cover_image_url"], self.url)
try:
pd.cover_image = imgdl.download().content
pd.cover_image_extention = imgdl.extention
except Exception:
logger.debug(f'failed to download cover for {self.url} from {pd.metadata["cover_image_url"]}')
return pd

28
catalog/sites/douban.py Normal file
View file

@ -0,0 +1,28 @@
import re
from catalog.common import *
RE_NUMBERS = re.compile(r"\d+\d*")
RE_WHITESPACES = re.compile(r"\s+")
class DoubanDownloader(ProxiedDownloader):
def validate_response(self, response):
if response is None:
return RESPONSE_NETWORK_ERROR
elif response.status_code == 204:
return RESPONSE_CENSORSHIP
elif response.status_code == 200:
content = response.content.decode('utf-8')
if content.find('关于豆瓣') == -1:
# if content.find('你的 IP 发出') == -1:
# error = error + 'Content not authentic' # response is garbage
# else:
# error = error + 'IP banned'
return RESPONSE_NETWORK_ERROR
elif content.find('<title>页面不存在</title>') != -1 or content.find('呃... 你想访问的条目豆瓣不收录。') != -1: # re.search('不存在[^<]+</title>', content, re.MULTILINE):
return RESPONSE_CENSORSHIP
else:
return RESPONSE_OK
else:
return RESPONSE_INVALID_CONTENT

131
catalog/sites/douban_book.py Normal file
View file

@ -0,0 +1,131 @@
from lxml import html
from catalog.common import *
from .douban import *
from catalog.book.models import *
from catalog.book.utils import *
import logging
logger = logging.getLogger(__name__)
@SiteList.register
class DoubanBook(AbstractSite, ScraperMixin):
ID_TYPE = IdType.DoubanBook
URL_PATTERNS = [r"\w+://book\.douban\.com/subject/(\d+)/{0,1}", r"\w+://m.douban.com/book/subject/(\d+)/{0,1}"]
WIKI_PROPERTY_ID = '?'
DEFAULT_MODEL = Edition
@classmethod
def id_to_url(self, id_value):
return "https://book.douban.com/subject/" + id_value + "/"
def scrape(self):
self.data = {}
self.html = html.fromstring(DoubanDownloader(self.url).download().text.strip())
self.parse_field('title', "/html/body//h1/span/text()")
self.parse_field('isbn', "//div[@id='info']//span[text()='ISBN:']/following::text()")
# TODO does douban store ASIN as ISBN, need more cleanup if so
if not self.data['title']:
if self.data['isbn']:
self.data['title'] = 'isbn: ' + self.data['isbn']
else:
raise ParseError(self, 'title')
self.parse_field('cover_image_url', "//*[@id='mainpic']/a/img/@src")
self.parse_field('brief', "//h2/span[text()='内容简介']/../following-sibling::div[1]//div[@class='intro'][not(ancestor::span[@class='short'])]/p/text()")
self.parse_field('series', "//div[@id='info']//span[text()='丛书:']/following-sibling::a[1]/text()")
self.parse_field('producer', "//div[@id='info']//span[text()='出品方:']/following-sibling::a[1]/text()")
self.parse_field('cubn', "//div[@id='info']//span[text()='统一书号:']/following::text()")
self.parse_field('subtitle', "//div[@id='info']//span[text()='副标题:']/following::text()")
self.parse_field('orig_title', "//div[@id='info']//span[text()='原作名:']/following::text()")
self.parse_field('language', "//div[@id='info']//span[text()='语言:']/following::text()")
self.parse_field('pub_house', "//div[@id='info']//span[text()='出版社:']/following::text()")
self.parse_field('pub_date', "//div[@id='info']//span[text()='出版年:']/following::text()")
year_month_day = RE_NUMBERS.findall(self.data['pub_date']) if self.data['pub_date'] else []
if len(year_month_day) in (2, 3):
pub_year = int(year_month_day[0])
pub_month = int(year_month_day[1])
elif len(year_month_day) == 1:
pub_year = int(year_month_day[0])
pub_month = None
else:
pub_year = None
pub_month = None
if pub_year and pub_month and pub_year < pub_month:
pub_year, pub_month = pub_month, pub_year
pub_year = None if pub_year is not None and pub_year not in range(0, 3000) else pub_year
pub_month = None if pub_month is not None and pub_month not in range(1, 13) else pub_month
self.parse_field('binding', "//div[@id='info']//span[text()='装帧:']/following::text()")
self.parse_field('price', "//div[@id='info']//span[text()='定价:']/following::text()")
self.parse_field('pages', "//div[@id='info']//span[text()='页数:']/following::text()")
if self.data['pages'] is not None:
self.data['pages'] = int(RE_NUMBERS.findall(self.data['pages'])[0]) if RE_NUMBERS.findall(self.data['pages']) else None
if self.data['pages'] and (self.data['pages'] > 999999 or self.data['pages'] < 1):
self.data['pages'] = None
contents = None
try:
contents_elem = self.html.xpath(
"//h2/span[text()='目录']/../following-sibling::div[1]")[0]
# if the id of the next sibling contains `dir`, that element holds the full table of contents
if "dir" in contents_elem.getnext().xpath("@id")[0]:
contents_elem = contents_elem.getnext()
contents = '\n'.join(p.strip() for p in contents_elem.xpath("text()")[:-2]) if len(contents_elem) else None
else:
contents = '\n'.join(p.strip() for p in contents_elem.xpath("text()")) if len(contents_elem) else None
except Exception:
pass
self.data['contents'] = contents
# there are two html formats for authors and translators
authors_elem = self.html.xpath("""//div[@id='info']//span[text()='作者:']/following-sibling::br[1]/
preceding-sibling::a[preceding-sibling::span[text()='作者:']]/text()""")
if not authors_elem:
authors_elem = self.html.xpath(
"""//div[@id='info']//span[text()=' 作者']/following-sibling::a/text()""")
if authors_elem:
authors = []
for author in authors_elem:
authors.append(RE_WHITESPACES.sub(' ', author.strip())[:200])
else:
authors = None
self.data['authors'] = authors
translators_elem = self.html.xpath("""//div[@id='info']//span[text()='译者:']/following-sibling::br[1]/
preceding-sibling::a[preceding-sibling::span[text()='译者:']]/text()""")
if not translators_elem:
translators_elem = self.html.xpath(
"""//div[@id='info']//span[text()=' 译者']/following-sibling::a/text()""")
if translators_elem:
translators = []
for translator in translators_elem:
translators.append(RE_WHITESPACES.sub(' ', translator.strip()))
else:
translators = None
self.data['translators'] = translators
self.data['work'] = {}
work_link = self.parse_str('//h2/span[text()="这本书的其他版本"]/following-sibling::span[@class="pl"]/a/@href')
if work_link:
# TODO move logic to a different class
r = re.match(r'\w+://book.douban.com/works/(\d+)', work_link)
self.data['work']['lookup_id_type'] = IdType.DoubanBook_Work
self.data['work']['lookup_id_value'] = r[1] if r else None
self.data['work']['title'] = self.data['title']
self.data['work']['url'] = work_link
pd = PageData(metadata=self.data)
pd.lookup_ids[IdType.ISBN] = self.data.get('isbn')
pd.lookup_ids[IdType.CUBN] = self.data.get('cubn')
if self.data["cover_image_url"]:
imgdl = BasicImageDownloader(self.data["cover_image_url"], self.url)
try:
pd.cover_image = imgdl.download().content
pd.cover_image_extention = imgdl.extention
except Exception:
logger.debug(f'failed to download cover for {self.url} from {self.data["cover_image_url"]}')
return pd

@@ -0,0 +1,59 @@
from lxml import html
from catalog.common import *
from ..performance.models import Performance
from .douban import DoubanDownloader
import logging
logger = logging.getLogger(__name__)
@SiteList.register
class DoubanDrama(AbstractSite):
ID_TYPE = IdType.DoubanDrama
URL_PATTERNS = [r"\w+://www.douban.com/location/drama/(\d+)/"]
WIKI_PROPERTY_ID = 'P6443'
DEFAULT_MODEL = Performance
@classmethod
def id_to_url(self, id_value):
return "https://www.douban.com/location/drama/" + id_value + "/"
def scrape(self):
h = html.fromstring(DoubanDownloader(self.url).download().text)
data = {}
title_elem = h.xpath("/html/body//h1/span/text()")
if title_elem:
data["title"] = title_elem[0].strip()
else:
raise ParseError(self, "title")
data['other_titles'] = [s.strip() for s in title_elem[1:]]
other_title_elem = h.xpath("//dl//dt[text()='又名:']/following::dd[@itemprop='name']/text()")
if len(other_title_elem) > 0:
data['other_titles'].append(other_title_elem[0].strip())
plot_elem = h.xpath("//div[@id='link-report']/text()")
if len(plot_elem) == 0:
plot_elem = h.xpath("//div[@class='abstract']/text()")
data['brief'] = '\n'.join(plot_elem) if len(plot_elem) > 0 else ''
data['genres'] = [s.strip() for s in h.xpath("//dl//dt[text()='类型:']/following-sibling::dd[@itemprop='genre']/text()")]
data['versions'] = [s.strip() for s in h.xpath("//dl//dt[text()='版本:']/following-sibling::dd[@class='titles']/a//text()")]
data['directors'] = [s.strip() for s in h.xpath("//div[@class='meta']/dl//dt[text()='导演:']/following-sibling::dd/a[@itemprop='director']//text()")]
data['playwrights'] = [s.strip() for s in h.xpath("//div[@class='meta']/dl//dt[text()='编剧:']/following-sibling::dd/a[@itemprop='author']//text()")]
data['actors'] = [s.strip() for s in h.xpath("//div[@class='meta']/dl//dt[text()='主演:']/following-sibling::dd/a[@itemprop='actor']//text()")]
img_url_elem = h.xpath("//img[@itemprop='image']/@src")
data['cover_image_url'] = img_url_elem[0].strip() if img_url_elem else None
pd = PageData(metadata=data)
if pd.metadata["cover_image_url"]:
imgdl = BasicImageDownloader(pd.metadata["cover_image_url"], self.url)
try:
pd.cover_image = imgdl.download().content
pd.cover_image_extention = imgdl.extention
except Exception:
logger.debug(f'failed to download cover for {self.url} from {pd.metadata["cover_image_url"]}')
return pd

@@ -0,0 +1,261 @@
from lxml import html
from catalog.common import *
from .douban import *
from catalog.movie.models import *
from catalog.tv.models import *
import logging
from django.db import models
from django.utils.translation import gettext_lazy as _
logger = logging.getLogger(__name__)
class MovieGenreEnum(models.TextChoices):
DRAMA = 'Drama', _('剧情')
KIDS = 'Kids', _('儿童')
COMEDY = 'Comedy', _('喜剧')
BIOGRAPHY = 'Biography', _('传记')
ACTION = 'Action', _('动作')
HISTORY = 'History', _('历史')
ROMANCE = 'Romance', _('爱情')
WAR = 'War', _('战争')
SCI_FI = 'Sci-Fi', _('科幻')
CRIME = 'Crime', _('犯罪')
ANIMATION = 'Animation', _('动画')
WESTERN = 'Western', _('西部')
MYSTERY = 'Mystery', _('悬疑')
FANTASY = 'Fantasy', _('奇幻')
THRILLER = 'Thriller', _('惊悚')
ADVENTURE = 'Adventure', _('冒险')
HORROR = 'Horror', _('恐怖')
DISASTER = 'Disaster', _('灾难')
DOCUMENTARY = 'Documentary', _('纪录片')
MARTIAL_ARTS = 'Martial-Arts', _('武侠')
SHORT = 'Short', _('短片')
ANCIENT_COSTUM = 'Ancient-Costum', _('古装')
EROTICA = 'Erotica', _('情色')
SPORT = 'Sport', _('运动')
GAY_LESBIAN = 'Gay/Lesbian', _('同性')
OPERA = 'Opera', _('戏曲')
MUSIC = 'Music', _('音乐')
FILM_NOIR = 'Film-Noir', _('黑色电影')
MUSICAL = 'Musical', _('歌舞')
REALITY_TV = 'Reality-TV', _('真人秀')
FAMILY = 'Family', _('家庭')
TALK_SHOW = 'Talk-Show', _('脱口秀')
NEWS = 'News', _('新闻')
SOAP = 'Soap', _('肥皂剧')
TV_MOVIE = 'TV Movie', _('电视电影')
THEATRE = 'Theatre', _('舞台艺术')
OTHER = 'Other', _('其他')
# MovieGenreTranslator = ChoicesDictGenerator(MovieGenreEnum)
@SiteList.register
class DoubanMovie(AbstractSite):
ID_TYPE = IdType.DoubanMovie
URL_PATTERNS = [r"\w+://movie\.douban\.com/subject/(\d+)/{0,1}", r"\w+://m.douban.com/movie/subject/(\d+)/{0,1}"]
WIKI_PROPERTY_ID = '?'
# no DEFAULT_MODEL as it may be either TV Season or Movie
@classmethod
def id_to_url(self, id_value):
return "https://movie.douban.com/subject/" + id_value + "/"
def scrape(self):
content = html.fromstring(DoubanDownloader(self.url).download().text.strip())
try:
raw_title = content.xpath(
"//span[@property='v:itemreviewed']/text()")[0].strip()
except IndexError:
raise ParseError(self, 'title')
orig_title = content.xpath(
"//img[@rel='v:image']/@alt")[0].strip()
title = raw_title.split(orig_title)[0].strip()
# if there is no Chinese title
if title == '':
title = orig_title
if title == orig_title:
orig_title = None
# alternative titles (又名)
other_title_elem = content.xpath(
"//div[@id='info']//span[text()='又名:']/following-sibling::text()[1]")
other_title = other_title_elem[0].strip().split(
' / ') if other_title_elem else None
imdb_elem = content.xpath(
"//div[@id='info']//span[text()='IMDb链接:']/following-sibling::a[1]/text()")
if not imdb_elem:
imdb_elem = content.xpath(
"//div[@id='info']//span[text()='IMDb:']/following-sibling::text()[1]")
imdb_code = imdb_elem[0].strip() if imdb_elem else None
director_elem = content.xpath(
"//div[@id='info']//span[text()='导演']/following-sibling::span[1]/a/text()")
director = director_elem if director_elem else None
playwright_elem = content.xpath(
"//div[@id='info']//span[text()='编剧']/following-sibling::span[1]/a/text()")
playwright = list(map(lambda a: a[:200], playwright_elem)) if playwright_elem else None
actor_elem = content.xpath(
"//div[@id='info']//span[text()='主演']/following-sibling::span[1]/a/text()")
actor = list(map(lambda a: a[:200], actor_elem)) if actor_elem else None
# construct genre translator
genre_translator = {}
attrs = [attr for attr in dir(MovieGenreEnum) if '__' not in attr]
for attr in attrs:
genre_translator[getattr(MovieGenreEnum, attr).label] = getattr(
MovieGenreEnum, attr).value
genre_elem = content.xpath("//span[@property='v:genre']/text()")
if genre_elem:
genre = []
for g in genre_elem:
g = g.split(' ')[0]
if g == '紀錄片': # likely some original data on douban was corrupted
g = '纪录片'
elif g == '鬼怪':
g = '惊悚'
if g in genre_translator:
genre.append(genre_translator[g])
elif g in genre_translator.values():
genre.append(g)
else:
logger.error(f'unable to map genre {g}')
else:
genre = None
showtime_elem = content.xpath(
"//span[@property='v:initialReleaseDate']/text()")
if showtime_elem:
showtime = []
for st in showtime_elem:
parts = st.split('(')
if len(parts) == 1:
time = st.split('(')[0]
region = ''
else:
time = st.split('(')[0]
region = st.split('(')[1][0:-1]
showtime.append({time: region})
else:
showtime = None
site_elem = content.xpath(
"//div[@id='info']//span[text()='官方网站:']/following-sibling::a[1]/@href")
site = site_elem[0].strip()[:200] if site_elem else None
if site and not re.match(r'http.+', site):
site = None
area_elem = content.xpath(
"//div[@id='info']//span[text()='制片国家/地区:']/following-sibling::text()[1]")
if area_elem:
area = [a.strip()[:100] for a in area_elem[0].split('/')]
else:
area = None
language_elem = content.xpath(
"//div[@id='info']//span[text()='语言:']/following-sibling::text()[1]")
if language_elem:
language = [a.strip() for a in language_elem[0].split(' / ')]
else:
language = None
year_elem = content.xpath("//span[@class='year']/text()")
year = int(re.search(r'\d+', year_elem[0])[0]) if year_elem and re.search(r'\d+', year_elem[0]) else None
duration_elem = content.xpath("//span[@property='v:runtime']/text()")
other_duration_elem = content.xpath(
"//span[@property='v:runtime']/following-sibling::text()[1]")
if duration_elem:
duration = duration_elem[0].strip()
if other_duration_elem:
duration += other_duration_elem[0].rstrip()
duration = duration.split('/')[0].strip()
else:
duration = None
season_elem = content.xpath(
"//*[@id='season']/option[@selected='selected']/text()")
if not season_elem:
season_elem = content.xpath(
"//div[@id='info']//span[text()='季数:']/following-sibling::text()[1]")
season = int(season_elem[0].strip()) if season_elem else None
else:
season = int(season_elem[0].strip())
episodes_elem = content.xpath(
"//div[@id='info']//span[text()='集数:']/following-sibling::text()[1]")
episodes = int(episodes_elem[0].strip()) if episodes_elem and episodes_elem[0].strip().isdigit() else None
single_episode_length_elem = content.xpath(
"//div[@id='info']//span[text()='单集片长:']/following-sibling::text()[1]")
single_episode_length = single_episode_length_elem[0].strip(
)[:100] if single_episode_length_elem else None
# if the `episodes` field is present, this must be a series
is_series = True if episodes else False
brief_elem = content.xpath("//span[@class='all hidden']")
if not brief_elem:
brief_elem = content.xpath("//span[@property='v:summary']")
brief = '\n'.join([e.strip() for e in brief_elem[0].xpath(
'./text()')]) if brief_elem else None
img_url_elem = content.xpath("//img[@rel='v:image']/@src")
img_url = img_url_elem[0].strip() if img_url_elem else None
pd = PageData(metadata={
'title': title,
'orig_title': orig_title,
'other_title': other_title,
'imdb_code': imdb_code,
'director': director,
'playwright': playwright,
'actor': actor,
'genre': genre,
'showtime': showtime,
'site': site,
'area': area,
'language': language,
'year': year,
'duration': duration,
'season_number': season,
'episodes': episodes,
'single_episode_length': single_episode_length,
'brief': brief,
'is_series': is_series,
'cover_image_url': img_url,
})
pd.metadata['preferred_model'] = ('TVSeason' if season else 'TVShow') if is_series else 'Movie'
# tmdb_api_url = f"https://api.themoviedb.org/3/find/{self.imdb_code}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&external_source=imdb_id"
# res_data = BasicDownloader(tmdb_api_url).download().json()
# if 'movie_results' in res_data and len(res_data['movie_results']) > 0:
# pd.metadata['preferred_model'] = 'Movie'
# elif 'tv_results' in res_data and len(res_data['tv_results']) > 0:
# pd.metadata['preferred_model'] = 'TVShow'
# elif 'tv_season_results' in res_data and len(res_data['tv_season_results']) > 0:
# pd.metadata['preferred_model'] = 'TVSeason'
# elif 'tv_episode_results' in res_data and len(res_data['tv_episode_results']) > 0:
# pd.metadata['preferred_model'] = 'TVSeason'
if imdb_code:
pd.lookup_ids[IdType.IMDB] = imdb_code
if pd.metadata["cover_image_url"]:
imgdl = BasicImageDownloader(pd.metadata["cover_image_url"], self.url)
try:
pd.cover_image = imgdl.download().content
pd.cover_image_extention = imgdl.extention
except Exception:
logger.debug(f'failed to download cover for {self.url} from {pd.metadata["cover_image_url"]}')
return pd

@@ -0,0 +1,85 @@
import re
from catalog.book.models import Edition
from catalog.common import *
from lxml import html
import json
import logging
logger = logging.getLogger(__name__)
class GoodreadsDownloader(RetryDownloader):
def validate_response(self, response):
if response is None:
return RESPONSE_NETWORK_ERROR
elif response.status_code == 200:
if response.text.find('__NEXT_DATA__') != -1:
return RESPONSE_OK
else: # retry if the legacy version is returned
return RESPONSE_NETWORK_ERROR
else:
return RESPONSE_INVALID_CONTENT
@SiteList.register
class Goodreads(AbstractSite):
ID_TYPE = IdType.Goodreads
WIKI_PROPERTY_ID = 'P2968'
DEFAULT_MODEL = Edition
URL_PATTERNS = [r".+goodreads.com/.*book/show/(\d+)", r".+goodreads.com/.*book/(\d+)"]
@classmethod
def id_to_url(self, id_value):
return "https://www.goodreads.com/book/show/" + id_value
def scrape(self, response=None):
data = {}
if response is not None:
content = response.text
else:
dl = GoodreadsDownloader(self.url)
content = dl.download().text
h = html.fromstring(content.strip())
# Next.JS version of GoodReads
# JSON.parse(document.getElementById('__NEXT_DATA__').innerHTML)['props']['pageProps']['apolloState']
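# (illustrative note) apolloState is a flat dict of cached objects; each value carries a
# __typename such as 'Book', 'Work', 'Series' or 'Contributor', which the loop below uses
# to group records before picking the first Book with a title and the first Work with details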
elem = h.xpath('//script[@id="__NEXT_DATA__"]/text()')
src = elem[0].strip() if elem else None
if not src:
raise ParseError(self, '__NEXT_DATA__ element')
d = json.loads(src)['props']['pageProps']['apolloState']
o = {'Book': [], 'Work': [], 'Series': [], 'Contributor': []}
for v in d.values():
t = v.get('__typename')
if t and t in o:
o[t].append(v)
b = next(filter(lambda x: x.get('title'), o['Book']), None)
if not b:
raise ParseError(self, 'Book json')
data['title'] = b['title']
data['brief'] = b['description']
data['isbn'] = b['details'].get('isbn13')
asin = b['details'].get('asin')
if asin and asin != data['isbn']:
data['asin'] = asin
data['pages'] = b['details'].get('numPages')
data['cover_image_url'] = b['imageUrl']
data['work'] = {}
w = next(filter(lambda x: x.get('details'), o['Work']), None)
if w:
data['work']['lookup_id_type'] = IdType.Goodreads_Work
data['work']['lookup_id_value'] = str(w['legacyId'])
data['work']['title'] = w['details']['originalTitle']
data['work']['url'] = w['details']['webUrl']
pd = PageData(metadata=data)
pd.lookup_ids[IdType.ISBN] = data.get('isbn')
pd.lookup_ids[IdType.ASIN] = data.get('asin')
if data["cover_image_url"]:
imgdl = BasicImageDownloader(data["cover_image_url"], self.url)
try:
pd.cover_image = imgdl.download().content
pd.cover_image_extention = imgdl.extention
except Exception:
logger.debug(f'failed to download cover for {self.url} from {data["cover_image_url"]}')
return pd

50
catalog/sites/imdb.py Normal file
@@ -0,0 +1,50 @@
from django.conf import settings
from catalog.common import *
from .douban import *
from catalog.movie.models import *
from catalog.tv.models import *
import logging
logger = logging.getLogger(__name__)
@SiteList.register
class IMDB(AbstractSite):
ID_TYPE = IdType.IMDB
URL_PATTERNS = [r'\w+://www.imdb.com/title/(tt\d+)']
WIKI_PROPERTY_ID = '?'
@classmethod
def id_to_url(self, id_value):
return "https://www.imdb.com/title/" + id_value + "/"
def scrape(self):
self.scraped = False
api_url = f"https://api.themoviedb.org/3/find/{self.id_value}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&external_source=imdb_id"
res_data = BasicDownloader(api_url).download().json()
if 'movie_results' in res_data and len(res_data['movie_results']) > 0:
url = f"https://www.themoviedb.org/movie/{res_data['movie_results'][0]['id']}"
elif 'tv_results' in res_data and len(res_data['tv_results']) > 0:
url = f"https://www.themoviedb.org/tv/{res_data['tv_results'][0]['id']}"
elif 'tv_season_results' in res_data and len(res_data['tv_season_results']) > 0:
# this should not happen given IMDB only has ids for either show or episode
tv_id = res_data['tv_season_results'][0]['show_id']
season_number = res_data['tv_season_results'][0]['season_number']
url = f"https://www.themoviedb.org/tv/{tv_id}/season/{season_number}/episode/{episode_number}"
elif 'tv_episode_results' in res_data and len(res_data['tv_episode_results']) > 0:
tv_id = res_data['tv_episode_results'][0]['show_id']
season_number = res_data['tv_episode_results'][0]['season_number']
episode_number = res_data['tv_episode_results'][0]['episode_number']
if season_number == 0:
url = f"https://www.themoviedb.org/tv/{tv_id}/season/{season_number}/episode/{episode_number}"
elif episode_number == 1:
url = f"https://www.themoviedb.org/tv/{tv_id}/season/{season_number}"
else:
raise ParseError(self, "IMDB id matching TMDB but not first episode, this is not supported")
else:
raise ParseError(self, "IMDB id not found in TMDB")
tmdb = SiteList.get_site_by_url(url)
pd = tmdb.scrape()
pd.metadata['preferred_model'] = tmdb.DEFAULT_MODEL.__name__
return pd

307
catalog/sites/tmdb.py Normal file
@@ -0,0 +1,307 @@
"""
The Movie Database
"""
import re
from django.conf import settings
from catalog.common import *
from .douban import *
from catalog.movie.models import *
from catalog.tv.models import *
import logging
logger = logging.getLogger(__name__)
def _copy_dict(s, key_map):
d = {}
for src, dst in key_map.items():
d[dst if dst else src] = s.get(src)
return d
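# e.g. (illustrative values) _copy_dict({'name': 'Doctor Who', 'overview': 'x'}, {'name': 'title', 'overview': 0})
# returns {'title': 'Doctor Who', 'overview': 'x'}; a falsy dst keeps the source key as-is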
genre_map = {
'Sci-Fi & Fantasy': 'Sci-Fi',
'War & Politics': 'War',
'儿童': 'Kids',
'冒险': 'Adventure',
'剧情': 'Drama',
'动作': 'Action',
'动作冒险': 'Action',
'动画': 'Animation',
'历史': 'History',
'喜剧': 'Comedy',
'奇幻': 'Fantasy',
'家庭': 'Family',
'恐怖': 'Horror',
'悬疑': 'Mystery',
'惊悚': 'Thriller',
'战争': 'War',
'新闻': 'News',
'爱情': 'Romance',
'犯罪': 'Crime',
'电视电影': 'TV Movie',
'真人秀': 'Reality-TV',
'科幻': 'Sci-Fi',
'纪录': 'Documentary',
'肥皂剧': 'Soap',
'脱口秀': 'Talk-Show',
'西部': 'Western',
'音乐': 'Music',
}
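# e.g. genre_map.get('科幻', 'Other') returns 'Sci-Fi'; names missing from the map fall back to 'Other' in the scrapers below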
@SiteList.register
class TMDB_Movie(AbstractSite):
ID_TYPE = IdType.TMDB_Movie
URL_PATTERNS = [r'\w+://www.themoviedb.org/movie/(\d+)']
WIKI_PROPERTY_ID = '?'
DEFAULT_MODEL = Movie
@classmethod
def id_to_url(self, id_value):
return "https://www.themoviedb.org/movie/" + id_value
def scrape(self):
is_series = False
if is_series:
api_url = f"https://api.themoviedb.org/3/tv/{self.id_value}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&append_to_response=external_ids,credits"
else:
api_url = f"https://api.themoviedb.org/3/movie/{self.id_value}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&append_to_response=external_ids,credits"
res_data = BasicDownloader(api_url).download().json()
if is_series:
title = res_data['name']
orig_title = res_data['original_name']
year = int(res_data['first_air_date'].split(
'-')[0]) if res_data['first_air_date'] else None
imdb_code = res_data['external_ids']['imdb_id']
showtime = [{res_data['first_air_date']: "首播日期"}
] if res_data['first_air_date'] else None
duration = None
else:
title = res_data['title']
orig_title = res_data['original_title']
year = int(res_data['release_date'].split('-')
[0]) if res_data['release_date'] else None
showtime = [{res_data['release_date']: "发布日期"}
] if res_data['release_date'] else None
imdb_code = res_data['imdb_id']
# in minutes
duration = res_data['runtime'] if res_data['runtime'] else None
genre = list(map(lambda x: genre_map[x['name']] if x['name']
in genre_map else 'Other', res_data['genres']))
language = list(map(lambda x: x['name'], res_data['spoken_languages']))
brief = res_data['overview']
if is_series:
director = list(map(lambda x: x['name'], res_data['created_by']))
else:
director = list(map(lambda x: x['name'], filter(
lambda c: c['job'] == 'Director', res_data['credits']['crew'])))
playwright = list(map(lambda x: x['name'], filter(
lambda c: c['job'] == 'Screenplay', res_data['credits']['crew'])))
actor = list(map(lambda x: x['name'], res_data['credits']['cast']))
area = []
other_info = {}
# other_info['TMDB评分'] = res_data['vote_average']
# other_info['分级'] = res_data['contentRating']
# other_info['Metacritic评分'] = res_data['metacriticRating']
# other_info['奖项'] = res_data['awards']
# other_info['TMDB_ID'] = id
if is_series:
other_info['Seasons'] = res_data['number_of_seasons']
other_info['Episodes'] = res_data['number_of_episodes']
# TODO: use GET /configuration to get base url
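# A possible alternative (untested sketch): fetch the base url once from TMDB's /configuration
# endpoint instead of hard-coding it, e.g.
#   conf = BasicDownloader(f"https://api.themoviedb.org/3/configuration?api_key={settings.TMDB_API3_KEY}").download().json()
#   img_base = conf['images']['secure_base_url'] + 'original'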
img_url = ('https://image.tmdb.org/t/p/original/' + res_data['poster_path']) if res_data['poster_path'] is not None else None
pd = PageData(metadata={
'title': title,
'orig_title': orig_title,
'other_title': None,
'imdb_code': imdb_code,
'director': director,
'playwright': playwright,
'actor': actor,
'genre': genre,
'showtime': showtime,
'site': None,
'area': area,
'language': language,
'year': year,
'duration': duration,
'season': None,
'episodes': None,
'single_episode_length': None,
'brief': brief,
'cover_image_url': img_url,
})
if imdb_code:
pd.lookup_ids[IdType.IMDB] = imdb_code
if pd.metadata["cover_image_url"]:
imgdl = BasicImageDownloader(pd.metadata["cover_image_url"], self.url)
try:
pd.cover_image = imgdl.download().content
pd.cover_image_extention = imgdl.extention
except Exception:
logger.debug(f'failed to download cover for {self.url} from {pd.metadata["cover_image_url"]}')
return pd
@SiteList.register
class TMDB_TV(AbstractSite):
ID_TYPE = IdType.TMDB_TV
URL_PATTERNS = [r'\w+://www.themoviedb.org/tv/(\d+)[^/]*$', r'\w+://www.themoviedb.org/tv/(\d+)[^/]*/seasons']
WIKI_PROPERTY_ID = '?'
DEFAULT_MODEL = TVShow
@classmethod
def id_to_url(self, id_value):
return "https://www.themoviedb.org/tv/" + id_value
def scrape(self):
is_series = True
if is_series:
api_url = f"https://api.themoviedb.org/3/tv/{self.id_value}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&append_to_response=external_ids,credits"
else:
api_url = f"https://api.themoviedb.org/3/movie/{self.id_value}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&append_to_response=external_ids,credits"
res_data = BasicDownloader(api_url).download().json()
if is_series:
title = res_data['name']
orig_title = res_data['original_name']
year = int(res_data['first_air_date'].split(
'-')[0]) if res_data['first_air_date'] else None
imdb_code = res_data['external_ids']['imdb_id']
showtime = [{res_data['first_air_date']: "首播日期"}
] if res_data['first_air_date'] else None
duration = None
else:
title = res_data['title']
orig_title = res_data['original_title']
year = int(res_data['release_date'].split('-')
[0]) if res_data['release_date'] else None
showtime = [{res_data['release_date']: "发布日期"}
] if res_data['release_date'] else None
imdb_code = res_data['imdb_id']
# in minutes
duration = res_data['runtime'] if res_data['runtime'] else None
genre = list(map(lambda x: genre_map[x['name']] if x['name']
in genre_map else 'Other', res_data['genres']))
language = list(map(lambda x: x['name'], res_data['spoken_languages']))
brief = res_data['overview']
if is_series:
director = list(map(lambda x: x['name'], res_data['created_by']))
else:
director = list(map(lambda x: x['name'], filter(
lambda c: c['job'] == 'Director', res_data['credits']['crew'])))
playwright = list(map(lambda x: x['name'], filter(
lambda c: c['job'] == 'Screenplay', res_data['credits']['crew'])))
actor = list(map(lambda x: x['name'], res_data['credits']['cast']))
area = []
other_info = {}
# other_info['TMDB评分'] = res_data['vote_average']
# other_info['分级'] = res_data['contentRating']
# other_info['Metacritic评分'] = res_data['metacriticRating']
# other_info['奖项'] = res_data['awards']
# other_info['TMDB_ID'] = id
if is_series:
other_info['Seasons'] = res_data['number_of_seasons']
other_info['Episodes'] = res_data['number_of_episodes']
# TODO: use GET /configuration to get base url
img_url = ('https://image.tmdb.org/t/p/original/' + res_data['poster_path']) if res_data['poster_path'] is not None else None
pd = PageData(metadata={
'title': title,
'orig_title': orig_title,
'other_title': None,
'imdb_code': imdb_code,
'director': director,
'playwright': playwright,
'actor': actor,
'genre': genre,
'showtime': showtime,
'site': None,
'area': area,
'language': language,
'year': year,
'duration': duration,
'season': None,
'episodes': None,
'single_episode_length': None,
'brief': brief,
'cover_image_url': img_url,
})
if imdb_code:
pd.lookup_ids[IdType.IMDB] = imdb_code
if pd.metadata["cover_image_url"]:
imgdl = BasicImageDownloader(pd.metadata["cover_image_url"], self.url)
try:
pd.cover_image = imgdl.download().content
pd.cover_image_extention = imgdl.extention
except Exception:
logger.debug(f'failed to download cover for {self.url} from {pd.metadata["cover_image_url"]}')
return pd
@SiteList.register
class TMDB_TVSeason(AbstractSite):
ID_TYPE = IdType.TMDB_TVSeason
URL_PATTERNS = [r'\w+://www.themoviedb.org/tv/(\d+)[^/]*/season/(\d+)[^/]*$']
WIKI_PROPERTY_ID = '?'
DEFAULT_MODEL = TVSeason
ID_PATTERN = r'^(\d+)-(\d+)$'
@classmethod
def url_to_id(cls, url: str):
u = next(iter([re.match(p, url) for p in cls.URL_PATTERNS if re.match(p, url)]), None)
return u[1] + '-' + u[2] if u else None
@classmethod
def id_to_url(cls, id_value):
v = id_value.split('-')
return f"https://www.themoviedb.org/tv/{v[0]}/season/{v[1]}"
def scrape(self):
v = self.id_value.split('-')
api_url = f"https://api.themoviedb.org/3/tv/{v[0]}/season/{v[1]}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&append_to_response=external_ids,credits"
d = BasicDownloader(api_url).download().json()
if not d.get('id'):
raise ParseError(self, 'id')
pd = PageData(metadata=_copy_dict(d, {'name': 'title', 'overview': 'brief', 'air_date': 'air_date', 'season_number': 0, 'external_ids': 0}))
pd.lookup_ids[IdType.IMDB] = d['external_ids'].get('imdb_id')
pd.metadata['cover_image_url'] = ('https://image.tmdb.org/t/p/original/' + d['poster_path']) if d['poster_path'] else None
pd.metadata['title'] = pd.metadata['title'] if pd.metadata['title'] else f'Season {d["season_number"]}'
pd.metadata['episode_number_list'] = list(map(lambda ep: ep['episode_number'], d['episodes']))
pd.metadata['episode_count'] = len(pd.metadata['episode_number_list'])
if pd.metadata["cover_image_url"]:
imgdl = BasicImageDownloader(pd.metadata["cover_image_url"], self.url)
try:
pd.cover_image = imgdl.download().content
pd.cover_image_extention = imgdl.extention
except Exception:
logger.debug(f'failed to download cover for {self.url} from {pd.metadata["cover_image_url"]}')
# get external id from 1st episode
if pd.lookup_ids[IdType.IMDB]:
logger.warning("Unexpected IMDB id for TMDB tv season")
elif len(pd.metadata['episode_number_list']) == 0:
logger.warning("Unable to lookup IMDB id for TMDB tv season with zero episodes")
else:
ep = pd.metadata['episode_number_list'][0]
api_url2 = f"https://api.themoviedb.org/3/tv/{v[0]}/season/{v[1]}/episode/{ep}?api_key={settings.TMDB_API3_KEY}&language=zh-CN&append_to_response=external_ids,credits"
d2 = BasicDownloader(api_url2).download().json()
if not d2.get('id'):
raise ParseError(self, 'episode id for season')
pd.lookup_ids[IdType.IMDB] = d2['external_ids'].get('imdb_id')
return pd

8
catalog/tests.py Normal file
@@ -0,0 +1,8 @@
from django.test import TestCase
from catalog.book.tests import *
from catalog.movie.tests import *
from catalog.tv.tests import *
from catalog.podcast.tests import *
from catalog.performance.tests import *
# test classes imported above under the same name will silently shadow each other
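# e.g. if catalog/movie/tests.py and catalog/tv/tests.py both defined a class named
# ScrapeTestCase, only the one imported last above would be collected by the test runner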

53
catalog/tv/models.py Normal file
@@ -0,0 +1,53 @@
"""
Models for TV
TVShow -> TVSeason -> TVEpisode
TVEpisode is not fully implemented at the moment
Three-way linking between Douban / IMDB / TMDB is quite messy
IMDB:
most widely used.
no ID for Season, only for Show and Episode
TMDB:
friendliest API.
some TV specials appear both as an Episode of Season 0 and as a Movie, with the same IMDB id
Douban:
most wanted by our users.
for a single-season show, the IMDB id of the show is used
for a multi-season show, the IMDB id of Ep 1 will be used to represent that season
tv specials are shown as movies
For now, we follow the Douban convention (see the sketch after the models below), but keep an eye on it in case it breaks its own rules...
"""
from catalog.common import *
from django.db import models
class TVShow(Item):
imdb = PrimaryLookupIdDescriptor(IdType.IMDB)
tmdb_tv = PrimaryLookupIdDescriptor(IdType.TMDB_TV)
season_count = jsondata.IntegerField(blank=True, default=None)
class TVSeason(Item):
douban_movie = PrimaryLookupIdDescriptor(IdType.DoubanMovie)
imdb = PrimaryLookupIdDescriptor(IdType.IMDB)
tmdb_tvseason = PrimaryLookupIdDescriptor(IdType.TMDB_TVSeason)
series = models.ForeignKey(TVShow, null=True, on_delete=models.SET_NULL, related_name='seasons')
season_number = models.PositiveIntegerField()
episode_count = jsondata.IntegerField(blank=True, default=None)
METADATA_COPY_LIST = ['title', 'brief', 'season_number', 'episode_count']
class TVEpisode(Item):
series = models.ForeignKey(TVShow, null=True, on_delete=models.SET_NULL, related_name='episodes')
season = models.ForeignKey(TVSeason, null=True, on_delete=models.SET_NULL, related_name='episodes')
episode_number = models.PositiveIntegerField()
imdb = PrimaryLookupIdDescriptor(IdType.IMDB)
METADATA_COPY_LIST = ['title', 'brief', 'episode_number']
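# A minimal sketch of the Douban convention described in the module docstring above.
# Assumptions: this helper is illustrative only and not used by the models;
# `lookup_imdb_for_episode` is a hypothetical callable standing in for a TMDB episode lookup.
def imdb_id_for_season(show_imdb_id, season_number, season_count, lookup_imdb_for_episode):
    """Pick the IMDB id that represents a season under the Douban convention."""
    if season_count == 1:
        # a single-season show reuses the show's own IMDB id
        return show_imdb_id
    # for a multi-season show, the IMDB id of Ep 1 of that season stands for the season
    return lookup_imdb_for_episode(season_number, 1)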

124
catalog/tv/tests.py Normal file
@@ -0,0 +1,124 @@
from django.test import TestCase
from catalog.common import *
from catalog.tv.models import *
class TMDBTVTestCase(TestCase):
def test_parse(self):
t_id = '57243'
t_url = 'https://www.themoviedb.org/tv/57243-doctor-who'
t_url1 = 'https://www.themoviedb.org/tv/57243-doctor-who/seasons'
t_url2 = 'https://www.themoviedb.org/tv/57243'
p1 = SiteList.get_site_by_id_type(IdType.TMDB_TV)
self.assertIsNotNone(p1)
self.assertEqual(p1.validate_url(t_url), True)
self.assertEqual(p1.validate_url(t_url1), True)
self.assertEqual(p1.validate_url(t_url2), True)
p2 = SiteList.get_site_by_url(t_url)
self.assertEqual(p1.id_to_url(t_id), t_url2)
self.assertEqual(p2.url_to_id(t_url), t_id)
wrong_url = 'https://www.themoviedb.org/tv/57243-doctor-who/season/13'
s1 = SiteList.get_site_by_url(wrong_url)
self.assertNotIsInstance(s1, TVShow)
@use_local_response
def test_scrape(self):
t_url = 'https://www.themoviedb.org/tv/57243-doctor-who'
site = SiteList.get_site_by_url(t_url)
self.assertEqual(site.ready, False)
self.assertEqual(site.id_value, '57243')
site.get_page_ready()
self.assertEqual(site.ready, True)
self.assertEqual(site.page.metadata['title'], '神秘博士')
self.assertEqual(site.page.item.primary_lookup_id_type, IdType.IMDB)
self.assertEqual(site.page.item.__class__.__name__, 'TVShow')
self.assertEqual(site.page.item.imdb, 'tt0436992')
class TMDBTVSeasonTestCase(TestCase):
def test_parse(self):
t_id = '57243-11'
t_url = 'https://www.themoviedb.org/tv/57243-doctor-who/season/11'
t_url_unique = 'https://www.themoviedb.org/tv/57243/season/11'
p1 = SiteList.get_site_by_id_type(IdType.TMDB_TVSeason)
self.assertIsNotNone(p1)
self.assertEqual(p1.validate_url(t_url), True)
self.assertEqual(p1.validate_url(t_url_unique), True)
p2 = SiteList.get_site_by_url(t_url)
self.assertEqual(p1.id_to_url(t_id), t_url_unique)
self.assertEqual(p2.url_to_id(t_url), t_id)
@use_local_response
def test_scrape(self):
t_url = 'https://www.themoviedb.org/tv/57243-doctor-who/season/4'
site = SiteList.get_site_by_url(t_url)
self.assertEqual(site.ready, False)
self.assertEqual(site.id_value, '57243-4')
site.get_page_ready()
self.assertEqual(site.ready, True)
self.assertEqual(site.page.metadata['title'], '第 4 季')
self.assertEqual(site.page.item.primary_lookup_id_type, IdType.IMDB)
self.assertEqual(site.page.item.__class__.__name__, 'TVSeason')
self.assertEqual(site.page.item.imdb, 'tt1159991')
class DoubanMovieTVTestCase(TestCase):
@use_local_response
def test_scrape(self):
url3 = 'https://movie.douban.com/subject/3627919/'
p3 = SiteList.get_site_by_url(url3).get_page_ready()
self.assertEqual(p3.item.__class__.__name__, 'TVSeason')
@use_local_response
def test_scrape_singleseason(self):
url3 = 'https://movie.douban.com/subject/26895436/'
p3 = SiteList.get_site_by_url(url3).get_page_ready()
self.assertEqual(p3.item.__class__.__name__, 'TVShow')
class MultiTVSitesTestCase(TestCase):
@use_local_response
def test_tvshows(self):
url1 = 'https://www.themoviedb.org/tv/57243-doctor-who'
url2 = 'https://www.imdb.com/title/tt0436992/'
# url3 = 'https://movie.douban.com/subject/3541415/'
p1 = SiteList.get_site_by_url(url1).get_page_ready()
p2 = SiteList.get_site_by_url(url2).get_page_ready()
# p3 = SiteList.get_site_by_url(url3).get_page_ready()
self.assertEqual(p1.item.id, p2.item.id)
# self.assertEqual(p2.item.id, p3.item.id)
@use_local_response
def test_tvseasons(self):
url1 = 'https://www.themoviedb.org/tv/57243-doctor-who/season/4'
url2 = 'https://www.imdb.com/title/tt1159991/'
url3 = 'https://movie.douban.com/subject/3627919/'
p1 = SiteList.get_site_by_url(url1).get_page_ready()
p2 = SiteList.get_site_by_url(url2).get_page_ready()
p3 = SiteList.get_site_by_url(url3).get_page_ready()
self.assertEqual(p1.item.imdb, p2.item.imdb)
self.assertEqual(p2.item.imdb, p3.item.imdb)
self.assertEqual(p1.item.id, p2.item.id)
self.assertEqual(p2.item.id, p3.item.id)
@use_local_response
def test_miniseries(self):
url1 = 'https://www.themoviedb.org/tv/86941-the-north-water'
url3 = 'https://movie.douban.com/subject/26895436/'
p1 = SiteList.get_site_by_url(url1).get_page_ready()
p3 = SiteList.get_site_by_url(url3).get_page_ready()
self.assertEqual(p3.item.__class__.__name__, 'TVShow')
self.assertEqual(p1.item.id, p3.item.id)
@use_local_response
def test_tvspecial(self):
url1 = 'https://www.themoviedb.org/movie/282758-doctor-who-the-runaway-bride'
url2 = 'https://www.imdb.com/title/tt0827573/'
url3 = 'https://movie.douban.com/subject/4296866/'
p1 = SiteList.get_site_by_url(url1).get_page_ready()
p2 = SiteList.get_site_by_url(url2).get_page_ready()
p3 = SiteList.get_site_by_url(url3).get_page_ready()
self.assertEqual(p1.item.imdb, p2.item.imdb)
self.assertEqual(p2.item.imdb, p3.item.imdb)
self.assertEqual(p1.item.id, p2.item.id)
self.assertEqual(p2.item.id, p3.item.id)

6
catalog/urls.py Normal file
@@ -0,0 +1,6 @@
from django.urls import path
from .api import api
urlpatterns = [
path("", api.urls),
]

3
catalog/views.py Normal file
@@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.

@@ -0,0 +1 @@
{"movie_results":[],"person_results":[],"tv_results":[{"adult":false,"backdrop_path":"/sRfl6vyzGWutgG0cmXmbChC4iN6.jpg","id":57243,"name":"神秘博士","original_language":"en","original_name":"Doctor Who","overview":"名为“博士”的宇宙最后一个时间领主有着重生的能力、体力及优越的智力利用时光机器TARDIS英国传统的蓝色警亭展开他勇敢的时光冒险之旅拯救外星生物、地球与时空。","poster_path":"/sz4zF5z9zyFh8Z6g5IQPNq91cI7.jpg","media_type":"tv","genre_ids":[10759,18,10765],"popularity":158.575,"first_air_date":"2005-03-26","vote_average":7.402,"vote_count":2475,"origin_country":["GB"]}],"tv_episode_results":[],"tv_season_results":[]}

@@ -0,0 +1 @@
{"movie_results":[{"adult":false,"backdrop_path":"/13qDzilftzRZMUEHcpi57VLqNPw.jpg","id":282758,"title":"神秘博士:逃跑新娘","original_language":"en","original_title":"Doctor Who: The Runaway Bride","overview":"失去了罗斯的博士正在心灰意冷而正在举行婚礼的多娜却被突然传送到塔迪斯里。博士带坏脾气的多娜返回地球却被一群外星机器人追杀塔迪斯上演了一场公路飚车。后来博士发现多娜身上带有异常含量的Huon粒子而该粒子来源于上一代宇宙霸主。而博士的母星加利弗雷在宇宙中崛起时已经消灭了所有的Huon粒子。最终博士揭开了一个藏于地球40亿年的秘密。","poster_path":"/gkTCC4VLv8jATM3kouAUK3EaoGd.jpg","media_type":"movie","genre_ids":[878],"popularity":7.214,"release_date":"2006-12-25","video":false,"vote_average":7.739,"vote_count":201}],"person_results":[],"tv_results":[],"tv_episode_results":[{"id":1008547,"name":"2006年圣诞特辑逃跑新娘","overview":"失去了罗斯的博士正在心灰意冷而正在举行婚礼的多娜却被突然传送到塔迪斯里。博士带坏脾气的多娜返回地球却被一群外星机器人追杀塔迪斯上演了一场公路飚车。后来博士发现多娜身上带有异常含量的Huon粒子而该粒子来源于上一代宇宙霸主。而博士的母星加利弗雷在宇宙中崛起时已经消灭了所有的Huon粒子。最终博士揭开了一个藏于地球40亿年的秘密。","media_type":"tv_episode","vote_average":6.8,"vote_count":14,"air_date":"2006-12-25","episode_number":4,"production_code":"NCFT094N","runtime":64,"season_number":0,"show_id":57243,"still_path":"/mkJufoqvEBMVvnVUjYlR9lGarZB.jpg"}],"tv_season_results":[]}

@@ -0,0 +1 @@
{"movie_results":[],"person_results":[],"tv_results":[],"tv_episode_results":[{"id":941505,"name":"活宝搭档","overview":"博士在伦敦发现艾迪派斯公司新产品药物有问题人类服用后会悄悄的产生土豆状生物并在夜里1点10分逃走回到保姆身边于是博士潜入公司决定探查究竟在探查时遇到了多娜原来Adiposian人丢失了他们的繁育星球于是跑到地球利用人类做代孕母繁殖宝宝。最后保姆在高空中被抛弃脂肪球回到了父母身边博士邀请多娜一同旅行。【Rose从平行宇宙回归】","media_type":"tv_episode","vote_average":7.2,"vote_count":43,"air_date":"2008-04-05","episode_number":1,"production_code":"","runtime":null,"season_number":4,"show_id":57243,"still_path":"/cq1zrCS267vGXa3rCYQkVKNJE9v.jpg"}],"tv_season_results":[]}

@@ -0,0 +1 @@
{"movie_results":[{"adult":false,"backdrop_path":"/s3TBrRGB1iav7gFOCNx3H31MoES.jpg","id":27205,"title":"盗梦空间","original_language":"en","original_title":"Inception","overview":"道姆·柯布与同事阿瑟和纳什在一次针对日本能源大亨齐藤的盗梦行动中失败,反被齐藤利用。齐藤威逼利诱因遭通缉而流亡海外的柯布帮他拆分他竞争对手的公司,采取极端措施在其唯一继承人罗伯特·费希尔的深层潜意识中种下放弃家族公司、自立门户的想法。为了重返美国,柯布偷偷求助于岳父迈尔斯,吸收了年轻的梦境设计师艾里阿德妮、梦境演员艾姆斯和药剂师约瑟夫加入行动。在一层层递进的梦境中,柯布不仅要对付费希尔潜意识的本能反抗,还必须直面已逝妻子梅的处处破坏,实际情况远比预想危险得多…","poster_path":"/lQEjWasu07JbQHdfFI5VnEUfId2.jpg","media_type":"movie","genre_ids":[28,878,12],"popularity":74.425,"release_date":"2010-07-15","video":false,"vote_average":8.359,"vote_count":32695}],"person_results":[],"tv_results":[],"tv_episode_results":[],"tv_season_results":[]}

File diff suppressed because it is too large

@@ -0,0 +1,10 @@
{
"resultCount":1,
"results": [
{"wrapperType":"track", "kind":"podcast", "artistId":127981066, "collectionId":1050430296, "trackId":1050430296, "artistName":"WNYC Studios and The New Yorker", "collectionName":"The New Yorker Radio Hour", "trackName":"The New Yorker Radio Hour", "collectionCensoredName":"The New Yorker Radio Hour", "trackCensoredName":"The New Yorker Radio Hour", "artistViewUrl":"https://podcasts.apple.com/us/artist/wnyc/127981066?uo=4", "collectionViewUrl":"https://podcasts.apple.com/us/podcast/the-new-yorker-radio-hour/id1050430296?uo=4", "feedUrl":"http://feeds.feedburner.com/newyorkerradiohour", "trackViewUrl":"https://podcasts.apple.com/us/podcast/the-new-yorker-radio-hour/id1050430296?uo=4", "artworkUrl30":"https://is2-ssl.mzstatic.com/image/thumb/Podcasts115/v4/e3/83/42/e38342fa-712d-ec74-2f31-946601e04e27/mza_2714925949638887112.png/30x30bb.jpg", "artworkUrl60":"https://is2-ssl.mzstatic.com/image/thumb/Podcasts115/v4/e3/83/42/e38342fa-712d-ec74-2f31-946601e04e27/mza_2714925949638887112.png/60x60bb.jpg", "artworkUrl100":"https://is2-ssl.mzstatic.com/image/thumb/Podcasts115/v4/e3/83/42/e38342fa-712d-ec74-2f31-946601e04e27/mza_2714925949638887112.png/100x100bb.jpg", "collectionPrice":0.00, "trackPrice":0.00, "collectionHdPrice":0, "releaseDate":"2022-11-29T11:00:00Z", "collectionExplicitness":"notExplicit", "trackExplicitness":"cleaned", "trackCount":150, "trackTimeMillis":1097, "country":"USA", "currency":"USD", "primaryGenreName":"News Commentary", "contentAdvisoryRating":"Clean", "artworkUrl600":"https://is2-ssl.mzstatic.com/image/thumb/Podcasts115/v4/e3/83/42/e38342fa-712d-ec74-2f31-946601e04e27/mza_2714925949638887112.png/600x600bb.jpg", "genreIds":["1530", "26", "1489", "1527"], "genres":["News Commentary", "Podcasts", "News", "Politics"]}]
}

File diff suppressed because it is too large

File diff suppressed because one or more lines are too long