individual scrapers

This commit is contained in:
Your Name 2021-12-10 21:55:16 -05:00
parent f7248b2e0c
commit e62c5987e7
10 changed files with 1741 additions and 1658 deletions

File diff suppressed because it is too large Load diff

common/scrapers/ Normal file
View file

@ -0,0 +1,198 @@
import re
from common.models import SourceSiteEnum
from movies.models import Movie, MovieGenreEnum
from movies.forms import MovieForm
from books.models import Book
from books.forms import BookForm
from music.models import Album, Song
from music.forms import AlbumForm, SongForm
from games.models import Game
from games.forms import GameForm
from common.scraper import *
def find_entity(source_url):
    """
    Look up the stored entity for a Bangumi source url.

    Only games are searched for now; other entity types are to be added
    when new scrape methods are implemented.

    :param source_url: the source url the entity was scraped from
    :return: the first ``Game`` whose ``source_url`` equals ``source_url``
    :raises ObjectDoesNotExist: when no stored game has that source url
    """
    # first() fetches at most one row instead of evaluating the whole
    # queryset just to test it for emptiness and then index it.
    result = Game.objects.filter(source_url=source_url).first()
    if result is None:
        raise ObjectDoesNotExist
    return result
# Scraper for Bangumi subject pages (URLs matching bgm.tv/subject/<digits>).
#
# `scrape` downloads the page and the infobox image, reads the category code
# selected in the header search box and dispatches to the matching `scrape_*`
# method. Only `scrape_game` parses anything; the movie, book and album
# handlers raise NotImplementedError.
#
# NOTE(review): this listing has no indentation and several lines look
# truncated or missing (the assignment to headers['Host'], the arguments of
# many `content.xpath(` calls, the closing `}` of the dict literals), so it
# cannot run as shown - restore it from the original file before editing.
class BangumiScraper(AbstractScraper):
site_name = SourceSiteEnum.BANGUMI.value
host = ''
# for interface coherence: a stand-in object whose `objects.get` is
# find_entity, used until a scrape_* method assigns the real model class
data_class = type("FakeDataClass", (object,), {})()
data_class.objects = type("FakeObjectsClass", (object,), {})()
data_class.objects.get = find_entity
# should be set at scrape_* method
form_class = ''
regex = re.compile(r"https{0,1}://bgm\.tv/subject/\d+")
# Entry point: download the page for `url` and its cover image, detect the
# category and let the matching handler build the data dict.
# Returns (data, raw_img) and also stores data, raw_img and the image
# extension on self.raw_data / self.raw_img / self.img_ext.
def scrape(self, url):
# NOTE(review): the next line reads like docstring text whose delimiters were lost.
This is the scraping portal
headers['Host'] =
content = self.download_page(url, headers)
# download image
img_url = 'http:' + content.xpath("//div[@class='infobox']//img[1]/@src")[0]
raw_img, ext = self.download_image(img_url, url)
# Test category
category_code = content.xpath("//div[@id='headerSearch']//option[@selected]/@value")[0]
handler_map = {
'1': self.scrape_book,
'2': self.scrape_movie,
'3': self.scrape_album,
'4': self.scrape_game
# NOTE(review): the handler is looked up on self and then called with self
# again as first argument - confirm this matches how scrape_* are bound.
data = handler_map[category_code](self, content)
data['source_url'] = self.get_effective_url(url)
self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
return data, raw_img
# Parse a game subject page into a dict of game fields.
# Switches data_class/form_class to Game/GameForm and raises ValueError when
# the page has no title element.
def scrape_game(self, content):
self.data_class = Game
self.form_class = GameForm
title_elem = content.xpath("//a[@property='v:itemreviewed']/text()")
if not title_elem:
raise ValueError("no game info found on this page")
title = None
title = title_elem[0].strip()
# Most fields below are read with one XPath query and, when that yields
# nothing, a second fallback query (the query strings are missing here).
other_title_elem = content.xpath(
if not other_title_elem:
other_title_elem = content.xpath(
other_title = other_title_elem if other_title_elem else []
chinese_name_elem = content.xpath(
if not chinese_name_elem:
chinese_name_elem = content.xpath(
if chinese_name_elem:
chinese_name = chinese_name_elem[0]
# switch chinese name with original name
title, chinese_name = chinese_name, title
# actually the name appended is original
developer_elem = content.xpath(
if not developer_elem:
developer_elem = content.xpath(
developer = developer_elem if developer_elem else None
publisher_elem = content.xpath(
if not publisher_elem:
publisher_elem = content.xpath(
publisher = publisher_elem if publisher_elem else None
platform_elem = content.xpath(
if not platform_elem:
platform_elem = content.xpath(
platform = platform_elem if platform_elem else None
genre_elem = content.xpath(
if not genre_elem:
genre_elem = content.xpath(
genre = genre_elem if genre_elem else None
date_elem = content.xpath(
if not date_elem:
date_elem = content.xpath(
release_date = parse_date(date_elem[0]) if date_elem else None
brief = ''.join(content.xpath("//div[@property='v:summary']/text()"))
# Optional facts are collected into other_info under their Chinese labels.
other_info = {}
other_elem = content.xpath(
if other_elem:
other_info['游玩人数'] = other_elem[0]
other_elem = content.xpath(
if other_elem:
other_info['引擎'] = ' '.join(other_elem)
other_elem = content.xpath(
if other_elem:
other_info['售价'] = ' '.join(other_elem)
other_elem = content.xpath(
if other_elem:
other_info['网站'] = other_elem[0]
other_elem = content.xpath(
"//ul[@id='infobox']/li[child::span[contains(text(),'剧本')]]/a/text()") or content.xpath(
if other_elem:
other_info['剧本'] = ' '.join(other_elem)
other_elem = content.xpath(
"//ul[@id='infobox']/li[child::span[contains(text(),'编剧')]]/a/text()") or content.xpath(
if other_elem:
other_info['编剧'] = ' '.join(other_elem)
other_elem = content.xpath(
"//ul[@id='infobox']/li[child::span[contains(text(),'音乐')]]/a/text()") or content.xpath(
if other_elem:
other_info['音乐'] = ' '.join(other_elem)
other_elem = content.xpath(
"//ul[@id='infobox']/li[child::span[contains(text(),'美术')]]/a/text()") or content.xpath(
if other_elem:
other_info['美术'] = ' '.join(other_elem)
# NOTE(review): 'other_title' is stored as None although other_title is
# computed above - confirm whether that is intended.
data = {
'title': title,
'other_title': None,
'developer': developer,
'publisher': publisher,
'release_date': release_date,
'genre': genre,
'platform': platform,
'brief': brief,
'other_info': other_info,
'source_site': self.site_name,
return data
# Placeholder: selects the Movie model/form, then raises NotImplementedError.
def scrape_movie(self, content):
self.data_class = Movie
self.form_class = MovieForm
raise NotImplementedError
# Placeholder: selects the Book model/form, then raises NotImplementedError.
def scrape_book(self, content):
self.data_class = Book
self.form_class = BookForm
raise NotImplementedError
# Placeholder: selects the Album model/form, then raises NotImplementedError.
def scrape_album(self, content):
self.data_class = Album
self.form_class = AlbumForm
raise NotImplementedError

common/scrapers/ Normal file
View file

@ -0,0 +1,687 @@
import requests
import re
import filetype
from lxml import html
from common.models import SourceSiteEnum
from movies.models import Movie, MovieGenreEnum
from movies.forms import MovieForm
from books.models import Book
from books.forms import BookForm
from music.models import Album
from music.forms import AlbumForm
from games.models import Game
from games.forms import GameForm
from django.conf import settings
from PIL import Image
from io import BytesIO
from common.scraper import *
# Mixin providing the page and image download routines shared by the Douban
# scrapers defined after it.
#
# NOTE(review): this listing has no indentation and a number of lines look
# missing or truncated (`try:`/`else:` lines, URL prefixes, call arguments,
# the bodies of some branches); the comments describe only what is visible.
class DoubanScrapperMixin:
# Download the page for `url` and return it parsed by lxml's html.fromstring.
# Raises RuntimeError carrying the accumulated `error` text when no content
# could be obtained.
# The nested helpers share state through nonlocal variables: r (the last
# response), content (decoded page text), error (accumulated message) and
# last_error ('network' or 'censorship').
def download_page(cls, url, headers):
url = cls.get_effective_url(url)
r = None
error = 'DoubanScrapper: error occured when downloading ' + url
content = None
last_error = None
# GET `url` with the configured timeout; when an exception is caught a blank
# Response is substituted whose status_code holds the error text.
def get(url):
nonlocal r
# print('Douban GET ' + url)
r = requests.get(url, timeout=settings.SCRAPING_TIMEOUT)
except Exception as e:
r = requests.Response()
# NOTE(review): the f-string already contains url and url is appended again.
r.status_code = f"Exception when GET {url} {e}" + url
# print('Douban CODE ' + str(r.status_code))
return r
# Inspect the last response: decode a 200 body and look for marker texts;
# failures reset content to None and record the reason in error/last_error.
def check_content():
nonlocal r, error, content, last_error
content = None
last_error = None
if r.status_code == 200:
content = r.content.decode('utf-8')
if content.find('关于豆瓣') == -1:
if content.find('你的 IP 发出') == -1:
error = error + 'Content not authentic' # response is garbage
error = error + 'IP banned'
content = None
last_error = 'network'
elif'不存在[^<]+</title>', content, re.MULTILINE):
content = None
last_error = 'censorship'
error = error + 'Not found or hidden by Douban'
last_error = 'network'
error = error + str(r.status_code)
# Rewrite href/src attributes of content with the substitutions below.
def fix_wayback_links():
nonlocal content
# fix links
content = re.sub(r'href="http[^"]+http', r'href="http', content)
content = re.sub(r'src="[^"]+/(s\d+\.\w+)"',
r'src="\1"', content)
content = re.sub(r'src="[^"]+/(p\d+\.\w+)"',
r'src="\1"', content)
# Wayback Machine: get latest available
def wayback():
nonlocal r, error, content
error = error + '\nWayback: '
get('' + url)
if r.status_code == 200:
w = r.json()
if w['archived_snapshots'] and w['archived_snapshots']['closest']:
if content is not None:
error = error + 'No snapshot available'
error = error + str(r.status_code)
# Wayback Machine: guess via CDX API
def wayback_cdx():
nonlocal r, error, content
error = error + '\nWayback: '
get('' + url)
if r.status_code == 200:
dates = re.findall(r'[^\s]+\s+(\d+)\s+[^\s]+\s+[^\s]+\s+\d+\s+[^\s]+\s+\d{5,}',
# assume snapshots whose size >9999 contain real content, use the latest one of them
if len(dates) > 0:
get('' + dates[-1] + '/' + url)
if content is not None:
error = error + 'No snapshot available'
error = error + str(r.status_code)
# Fetch the live page; the route is chosen from the SCRAPESTACK, SCRAPERAPI
# and PROXYCRAWL keys in settings (the request lines are missing here).
def latest():
nonlocal r, error, content
if settings.SCRAPESTACK_KEY is not None:
error = error + '\nScrapeStack: '
elif settings.SCRAPERAPI_KEY is not None:
error = error + '\nScraperAPI: '
error = error + '\nDirect: '
if last_error == 'network' and settings.PROXYCRAWL_KEY is not None:
error = error + '\nProxyCrawl: '
if content is None:
if content is None:
raise RuntimeError(error)
# with open('/tmp/temp.html', 'w', encoding='utf-8') as fp:
# fp.write(content)
return html.fromstring(content)
# Download the image at `url` and return (raw_img, ext); both are None when
# no image could be fetched. `item_url` only appears in the log messages.
# The download URL is built from the SCRAPESTACK/SCRAPERAPI keys when set,
# and a second attempt is made via PROXYCRAWL when the first gave no image.
def download_image(cls, url, item_url=None):
raw_img = None
ext = None
if settings.SCRAPESTACK_KEY is not None:
dl_url = f'{settings.SCRAPESTACK_KEY}&url={url}'
elif settings.SCRAPERAPI_KEY is not None:
dl_url = f'{settings.SCRAPERAPI_KEY}&url={url}'
dl_url = url
img_response = requests.get(dl_url, timeout=settings.SCRAPING_TIMEOUT)
if img_response.status_code == 200:
raw_img = img_response.content
img =
img.load() # corrupted image will trigger exception
# The extension is derived from the MIME type in the Content-Type header.
content_type = img_response.headers.get('Content-Type')
ext = filetype.get_type(mime=content_type.partition(';')[0].strip()).extension
logger.error(f"Douban: download image failed {img_response.status_code} {dl_url} {item_url}")
# raise RuntimeError(f"Douban: download image failed {img_response.status_code} {dl_url}")
except Exception as e:
raw_img = None
ext = None
logger.error(f"Douban: download image failed {e} {dl_url} {item_url}")
if raw_img is None and settings.PROXYCRAWL_KEY is not None:
dl_url = f'{settings.PROXYCRAWL_KEY}&url={url}'
img_response = requests.get(dl_url, timeout=settings.SCRAPING_TIMEOUT)
if img_response.status_code == 200:
raw_img = img_response.content
img =
img.load() # corrupted image will trigger exception
content_type = img_response.headers.get('Content-Type')
ext = filetype.get_type(mime=content_type.partition(';')[0].strip()).extension
logger.error(f"Douban: download image failed {img_response.status_code} {dl_url} {item_url}")
except Exception as e:
raw_img = None
ext = None
logger.error(f"Douban: download image failed {e} {dl_url} {item_url}")
return raw_img, ext
# Scraper for Douban book pages (URLs matching book.douban.com/subject/<digits>).
#
# NOTE(review): this listing has no indentation and many lines look missing
# or truncated (`try:`/`else:` lines, the arguments of most `content.xpath(`
# calls, the closing `}` of the data dict); restore them before editing.
class DoubanBookScraper(DoubanScrapperMixin, AbstractScraper):
site_name = SourceSiteEnum.DOUBAN.value
host = ""
data_class = Book
form_class = BookForm
regex = re.compile(r"https://book\.douban\.com/subject/\d+/{0,1}")
# Download the page for `url`, extract the book fields and cover image.
# Returns (data, raw_img) and stores data, raw_img and the image extension
# on self.raw_data / self.raw_img / self.img_ext.
# Raises ValueError("given url contains no book info") when the title
# lookup hits an IndexError.
def scrape(self, url):
headers['Host'] =
content = self.download_page(url, headers)
# parsing starts here
title = content.xpath("/html/body//h1/span/text()")[0].strip()
except IndexError:
raise ValueError("given url contains no book info")
subtitle_elem = content.xpath(
subtitle = subtitle_elem[0].strip() if subtitle_elem else None
orig_title_elem = content.xpath(
orig_title = orig_title_elem[0].strip() if orig_title_elem else None
language_elem = content.xpath(
language = language_elem[0].strip() if language_elem else None
pub_house_elem = content.xpath(
pub_house = pub_house_elem[0].strip() if pub_house_elem else None
pub_date_elem = content.xpath(
pub_date = pub_date_elem[0].strip() if pub_date_elem else ''
# The publication date text is split into its numeric groups; the first is
# taken as the year and the second (when present) as the month.
year_month_day = RE_NUMBERS.findall(pub_date)
if len(year_month_day) in (2, 3):
pub_year = int(year_month_day[0])
pub_month = int(year_month_day[1])
elif len(year_month_day) == 1:
pub_year = int(year_month_day[0])
pub_month = None
pub_year = None
pub_month = None
# A year smaller than the month means the two were read in swapped order.
if pub_year and pub_month and pub_year < pub_month:
pub_year, pub_month = pub_month, pub_year
# Out-of-range values are discarded.
# NOTE(review): range(1, 12) stops at 11, so a pub_month of 12 is reset to
# None - confirm whether December should be accepted.
pub_year = None if pub_year is not None and pub_year not in range(
0, 3000) else pub_year
pub_month = None if pub_month is not None and pub_month not in range(
1, 12) else pub_month
binding_elem = content.xpath(
binding = binding_elem[0].strip() if binding_elem else None
price_elem = content.xpath(
price = price_elem[0].strip() if price_elem else None
pages_elem = content.xpath(
pages = pages_elem[0].strip() if pages_elem else None
# Keep only the first number found in the pages text, as an int.
if pages is not None:
pages = int(RE_NUMBERS.findall(pages)[
0]) if RE_NUMBERS.findall(pages) else None
isbn_elem = content.xpath(
isbn = isbn_elem[0].strip() if isbn_elem else None
brief_elem = content.xpath(
brief = '\n'.join(p.strip()
for p in brief_elem) if brief_elem else None
contents = None
contents_elem = content.xpath(
# if next the id of next sibling contains `dir`, that would be the full contents
if "dir" in contents_elem.getnext().xpath("@id")[0]:
contents_elem = contents_elem.getnext()
contents = '\n'.join(p.strip() for p in contents_elem.xpath(
"text()")[:-2]) if contents_elem else None
contents = '\n'.join(p.strip() for p in contents_elem.xpath(
"text()")) if contents_elem else None
except Exception:
img_url_elem = content.xpath("//*[@id='mainpic']/a/img/@src")
img_url = img_url_elem[0].strip() if img_url_elem else None
raw_img, ext = self.download_image(img_url, url)
# there are two html formats for authors and translators
authors_elem = content.xpath("""//div[@id='info']//span[text()='作者:']/following-sibling::br[1]/
if not authors_elem:
authors_elem = content.xpath(
"""//div[@id='info']//span[text()=' 作者']/following-sibling::a/text()""")
if authors_elem:
authors = []
for author in authors_elem:
authors.append(RE_WHITESPACES.sub(' ', author.strip()))
authors = None
translators_elem = content.xpath("""//div[@id='info']//span[text()='译者:']/following-sibling::br[1]/
if not translators_elem:
translators_elem = content.xpath(
"""//div[@id='info']//span[text()=' 译者']/following-sibling::a/text()""")
if translators_elem:
translators = []
for translator in translators_elem:
translators.append(RE_WHITESPACES.sub(' ', translator.strip()))
translators = None
# Optional facts are collected into `other` under their Chinese labels.
other = {}
cncode_elem = content.xpath(
if cncode_elem:
other['统一书号'] = cncode_elem[0].strip()
series_elem = content.xpath(
if series_elem:
other['丛书'] = series_elem[0].strip()
imprint_elem = content.xpath(
if imprint_elem:
other['出品方'] = imprint_elem[0].strip()
data = {
'title': title,
'subtitle': subtitle,
'orig_title': orig_title,
'author': authors,
'translator': translators,
'language': language,
'pub_house': pub_house,
'pub_year': pub_year,
'pub_month': pub_month,
'binding': binding,
'price': price,
'pages': pages,
'isbn': isbn,
'brief': brief,
'contents': contents,
'other_info': other,
'source_site': self.site_name,
'source_url': self.get_effective_url(url),
self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
return data, raw_img
# Scraper for Douban movie pages (URLs matching movie.douban.com/subject/<digits>).
#
# NOTE(review): this listing has no indentation and many lines look missing
# or truncated (`try:`/`else:` lines, the arguments of most `content.xpath(`
# calls, a loop body, the closing `}` of the data dict); restore them before
# editing.
class DoubanMovieScraper(DoubanScrapperMixin, AbstractScraper):
site_name = SourceSiteEnum.DOUBAN.value
host = ''
data_class = Movie
form_class = MovieForm
regex = re.compile(r"https://movie\.douban\.com/subject/\d+/{0,1}")
# Download the page for `url`, extract the movie/series fields and poster.
# Returns (data, raw_img) and stores data, raw_img and the image extension
# on self.raw_data / self.raw_img / self.img_ext.
# Raises ValueError("given url contains no movie info") when the title
# lookup hits an IndexError.
def scrape(self, url):
headers['Host'] =
content = self.download_page(url, headers)
# parsing starts here
raw_title = content.xpath(
except IndexError:
raise ValueError("given url contains no movie info")
orig_title = content.xpath(
# The text of raw_title before orig_title is used as the title.
title = raw_title.split(orig_title)[0].strip()
# if has no chinese title
if title == '':
title = orig_title
if title == orig_title:
orig_title = None
# there are two html formats for authors and translators
other_title_elem = content.xpath(
other_title = other_title_elem[0].strip().split(
' / ') if other_title_elem else None
imdb_elem = content.xpath(
if not imdb_elem:
imdb_elem = content.xpath(
imdb_code = imdb_elem[0].strip() if imdb_elem else None
director_elem = content.xpath(
director = director_elem if director_elem else None
playwright_elem = content.xpath(
playwright = playwright_elem if playwright_elem else None
actor_elem = content.xpath(
actor = actor_elem if actor_elem else None
# construct genre translator: maps each MovieGenreEnum member's label to its value
genre_translator = {}
attrs = [attr for attr in dir(MovieGenreEnum) if '__' not in attr]
for attr in attrs:
genre_translator[getattr(MovieGenreEnum, attr).label] = getattr(
MovieGenreEnum, attr).value
genre_elem = content.xpath("//span[@property='v:genre']/text()")
# NOTE(review): the body of the `for g in genre_elem` loop is missing here.
if genre_elem:
genre = []
for g in genre_elem:
genre = None
showtime_elem = content.xpath(
# Each showtime text is split at '(' into a time and an optional region and
# appended as a one-entry {time: region} dict.
if showtime_elem:
showtime = []
for st in showtime_elem:
parts = st.split('(')
if len(parts) == 1:
time = st.split('(')[0]
region = ''
time = st.split('(')[0]
region = st.split('(')[1][0:-1]
showtime.append({time: region})
showtime = None
site_elem = content.xpath(
site = site_elem[0].strip() if site_elem else None
area_elem = content.xpath(
if area_elem:
area = [a.strip() for a in area_elem[0].split(' / ')]
area = None
language_elem = content.xpath(
if language_elem:
language = [a.strip() for a in language_elem[0].split(' / ')]
language = None
year_elem = content.xpath("//span[@class='year']/text()")
# The first and last characters of the year text are dropped before int().
year = int(year_elem[0][1:-1]) if year_elem else None
duration_elem = content.xpath("//span[@property='v:runtime']/text()")
other_duration_elem = content.xpath(
if duration_elem:
duration = duration_elem[0].strip()
if other_duration_elem:
duration += other_duration_elem[0].rstrip()
duration = None
season_elem = content.xpath(
if not season_elem:
season_elem = content.xpath(
# NOTE(review): season is assigned twice in a row below; the second line has
# no emptiness guard - presumably one of them sat in a branch that was lost.
season = int(season_elem[0].strip()) if season_elem else None
season = int(season_elem[0].strip())
episodes_elem = content.xpath(
episodes = int(episodes_elem[0].strip()) if episodes_elem else None
single_episode_length_elem = content.xpath(
single_episode_length = single_episode_length_elem[0].strip(
) if single_episode_length_elem else None
# if has field `episodes` not none then must be series
is_series = True if episodes else False
brief_elem = content.xpath("//span[@class='all hidden']")
if not brief_elem:
brief_elem = content.xpath("//span[@property='v:summary']")
brief = '\n'.join([e.strip() for e in brief_elem[0].xpath(
'./text()')]) if brief_elem else None
img_url_elem = content.xpath("//img[@rel='v:image']/@src")
img_url = img_url_elem[0].strip() if img_url_elem else None
raw_img, ext = self.download_image(img_url, url)
data = {
'title': title,
'orig_title': orig_title,
'other_title': other_title,
'imdb_code': imdb_code,
'director': director,
'playwright': playwright,
'actor': actor,
'genre': genre,
'showtime': showtime,
'site': site,
'area': area,
'language': language,
'year': year,
'duration': duration,
'season': season,
'episodes': episodes,
'single_episode_length': single_episode_length,
'brief': brief,
'is_series': is_series,
'source_site': self.site_name,
'source_url': self.get_effective_url(url),
self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
return data, raw_img
# Scraper for Douban album pages (URLs matching music.douban.com/subject/<digits>).
#
# NOTE(review): this listing has no indentation and many lines look missing
# or truncated (`try:`/`else:` lines, the arguments of most `content.xpath(`
# calls, the closing `}` of the data dict); restore them before editing.
class DoubanAlbumScraper(DoubanScrapperMixin, AbstractScraper):
site_name = SourceSiteEnum.DOUBAN.value
host = ''
data_class = Album
form_class = AlbumForm
regex = re.compile(r"https://music\.douban\.com/subject/\d+/{0,1}")
# Download the page for `url`, extract the album fields and cover image.
# Returns (data, raw_img) and stores data, raw_img and the image extension
# on self.raw_data / self.raw_img / self.img_ext.
# Raises ValueError("given url contains no album info") when the title
# lookup hits an IndexError or the title is empty.
def scrape(self, url):
headers['Host'] =
content = self.download_page(url, headers)
# parsing starts here
title = content.xpath("//h1/span/text()")[0].strip()
except IndexError:
raise ValueError("given url contains no album info")
if not title:
raise ValueError("given url contains no album info")
artists_elem = content.xpath("//div[@id='info']/span/span[@class='pl']/a/text()")
artist = None if not artists_elem else artists_elem
genre_elem = content.xpath(
genre = genre_elem[0].strip() if genre_elem else None
date_elem = content.xpath(
release_date = parse_date(date_elem[0].strip()) if date_elem else None
company_elem = content.xpath(
company = company_elem[0].strip() if company_elem else None
track_list_elem = content.xpath(
if track_list_elem:
track_list = '\n'.join([track.strip() for track in track_list_elem])
track_list = None
# The brief comes from the 'all hidden' span when present, otherwise from
# the v:summary span.
brief_elem = content.xpath("//span[@class='all hidden']")
if not brief_elem:
brief_elem = content.xpath("//span[@property='v:summary']")
brief = '\n'.join([e.strip() for e in brief_elem[0].xpath(
'./text()')]) if brief_elem else None
# Optional facts are collected into other_info under their labels.
other_info = {}
other_elem = content.xpath(
if other_elem:
other_info['又名'] = other_elem[0].strip()
other_elem = content.xpath(
if other_elem:
other_info['专辑类型'] = other_elem[0].strip()
other_elem = content.xpath(
if other_elem:
other_info['介质'] = other_elem[0].strip()
other_elem = content.xpath(
if other_elem:
other_info['ISRC'] = other_elem[0].strip()
other_elem = content.xpath(
if other_elem:
other_info['条形码'] = other_elem[0].strip()
other_elem = content.xpath(
if other_elem:
other_info['碟片数'] = other_elem[0].strip()
img_url_elem = content.xpath("//div[@id='mainpic']//img/@src")
img_url = img_url_elem[0].strip() if img_url_elem else None
raw_img, ext = self.download_image(img_url, url)
# 'duration' is always stored as None by this scraper.
data = {
'title': title,
'artist': artist,
'genre': genre,
'release_date': release_date,
'duration': None,
'company': company,
'track_list': track_list,
'brief': brief,
'other_info': other_info,
'source_site': self.site_name,
'source_url': self.get_effective_url(url),
self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
return data, raw_img
# Scraper for Douban game pages (URLs matching www.douban.com/game/<digits>).
#
# NOTE(review): this listing has no indentation and many lines look missing
# or truncated (a `try:` line, the arguments of most `content.xpath(` calls,
# the closing `}` of the data dict); restore them before editing.
class DoubanGameScraper(DoubanScrapperMixin, AbstractScraper):
site_name = SourceSiteEnum.DOUBAN.value
host = ''
data_class = Game
form_class = GameForm
regex = re.compile(r"https://www\.douban\.com/game/\d+/{0,1}")
# Download the page for `url`, extract the game fields and cover image.
# Returns (data, raw_img) and stores data, raw_img and the image extension
# on self.raw_data / self.raw_img / self.img_ext.
# Raises ValueError("given url contains no game info") when the title
# lookup hits an IndexError.
def scrape(self, url):
headers['Host'] = ''
content = self.download_page(url, headers)
raw_title = content.xpath(
except IndexError:
raise ValueError("given url contains no game info")
title = raw_title
# Multi-valued fields arrive as one ' / '-separated string and are split.
other_title_elem = content.xpath(
other_title = other_title_elem[0].strip().split(' / ') if other_title_elem else None
developer_elem = content.xpath(
developer = developer_elem[0].strip().split(' / ') if developer_elem else None
publisher_elem = content.xpath(
publisher = publisher_elem[0].strip().split(' / ') if publisher_elem else None
platform_elem = content.xpath(
platform = platform_elem if platform_elem else None
genre_elem = content.xpath(
genre = None
# The generic entry '游戏' is dropped from the genre list.
if genre_elem:
genre = [g for g in genre_elem if g != '游戏']
date_elem = content.xpath(
release_date = parse_date(date_elem[0].strip()) if date_elem else None
brief_elem = content.xpath("//div[@class='mod item-desc']/p/text()")
brief = '\n'.join(brief_elem) if brief_elem else None
img_url_elem = content.xpath(
img_url = img_url_elem[0].strip() if img_url_elem else None
raw_img, ext = self.download_image(img_url, url)
# 'other_info' is always stored as None by this scraper.
data = {
'title': title,
'other_title': other_title,
'developer': developer,
'publisher': publisher,
'release_date': release_date,
'genre': genre,
'platform': platform,
'brief': brief,
'other_info': None,
'source_site': self.site_name,
'source_url': self.get_effective_url(url),
self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
return data, raw_img

View file

@ -0,0 +1,156 @@
import requests
import re
import filetype
from lxml import html
from common.models import SourceSiteEnum
from movies.models import Movie, MovieGenreEnum
from movies.forms import MovieForm
from books.models import Book
from books.forms import BookForm
from music.models import Album, Song
from music.forms import AlbumForm, SongForm
from games.models import Game
from games.forms import GameForm
from django.conf import settings
from PIL import Image
from io import BytesIO
from common.scraper import *
# Scraper for Goodreads book pages.
#
# NOTE(review): this listing has no indentation and some lines look missing
# or truncated (`try:`/`else:` lines, the assignment to headers['Host'], the
# closing `}` of the data dict); restore them before editing.
# NOTE(review): the class-level regex matches ".../show/<digits>" while
# get_effective_url matches ".../book/show/<digits>" - confirm which path is
# intended.
class GoodreadsScraper(AbstractScraper):
site_name = SourceSiteEnum.GOODREADS.value
host = ""
data_class = Book
form_class = BookForm
regex = re.compile(r"https://www\.goodreads\.com/show/\d+")
# Return the leading "https://www.goodreads.com/book/show/<digits>" part of
# raw_url, or None when raw_url does not start with it.
def get_effective_url(cls, raw_url):
u = re.match(r"https://www\.goodreads\.com/book/show/\d+", raw_url)
return u[0] if u else None
# Extract the book fields and cover image for `url`.
# When `response` is given its body is parsed; a download_page call is also
# present (presumably the alternative branch whose `else:` line was lost).
# Returns (data, raw_img) and stores data, raw_img and the image extension
# on self.raw_data / self.raw_img / self.img_ext.
# Raises ValueError("given url contains no book info") when the title
# lookup hits an IndexError.
def scrape(self, url, response=None):
# NOTE(review): the next line reads like docstring text whose delimiters were lost.
This is the scraping portal
if response is not None:
content = html.fromstring(response.content.decode('utf-8'))
headers['Host'] =
content = self.download_page(url, headers)
title = content.xpath("//h1[@id='bookTitle']/text()")[0].strip()
except IndexError:
raise ValueError("given url contains no book info")
subtitle = None
orig_title_elem = content.xpath("//div[@id='bookDataBox']//div[text()='Original Title']/following-sibling::div/text()")
orig_title = orig_title_elem[0].strip() if orig_title_elem else None
language_elem = content.xpath('//div[@itemprop="inLanguage"]/text()')
language = language_elem[0].strip() if language_elem else None
# The "Published <month> ... <year> by <publisher>" row is parsed with a
# regex; group 1 is the month name, group 2 the year, group 3 the publisher.
# Any failure leaves all three values as None.
pub_house_elem = content.xpath("//div[contains(text(), 'Published') and @class='row']/text()")
months = ['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December']
r = re.compile('.*Published.*(' + '|'.join(months) + ').*(\\d\\d\\d\\d).+by\\s*(.+)\\s*', re.DOTALL)
pub = r.match(pub_house_elem[0])
pub_year = pub[2]
pub_month = months.index(pub[1]) + 1
pub_house = pub[3].strip()
except Exception:
pub_year = None
pub_month = None
pub_house = None
# The "first published ..." note, when present, is kept as free text.
pub_house_elem = content.xpath("//nobr[contains(text(), 'first published')]/text()")
pub = re.match(r'.*first published\s+(.+\d\d\d\d).*', pub_house_elem[0], re.DOTALL)
first_pub = pub[1]
except Exception:
first_pub = None
binding_elem = content.xpath('//span[@itemprop="bookFormat"]/text()')
binding = binding_elem[0].strip() if binding_elem else None
pages_elem = content.xpath('//span[@itemprop="numberOfPages"]/text()')
pages = pages_elem[0].strip() if pages_elem else None
# Keep only the first number found in the pages text, as an int.
if pages is not None:
pages = int(RE_NUMBERS.findall(pages)[
0]) if RE_NUMBERS.findall(pages) else None
isbn_elem = content.xpath('//span[@itemprop="isbn"]/text()')
if not isbn_elem:
isbn_elem = content.xpath('//div[@itemprop="isbn"]/text()') # this is likely ASIN
isbn = isbn_elem[0].strip() if isbn_elem else None
brief_elem = content.xpath('//div[@id="description"]/span[@style="display:none"]/text()')
if brief_elem:
brief = '\n'.join(p.strip() for p in brief_elem)
brief_elem = content.xpath('//div[@id="description"]/span/text()')
brief = '\n'.join(p.strip() for p in brief_elem) if brief_elem else None
genre = content.xpath('//div[@class="bigBoxBody"]/div/div/div/a/text()')
genre = genre[0] if genre else None
# NOTE(review): book_title is not read again in the visible code, and the
# unguarded [0] on the author lookup raises IndexError when nothing matches.
book_title = re.sub('\n', '', content.xpath('//h1[@id="bookTitle"]/text()')[0]).strip()
author = content.xpath('//a[@class="authorName"]/span/text()')[0]
contents = None
img_url_elem = content.xpath("//img[@id='coverImage']/@src")
img_url = img_url_elem[0].strip() if img_url_elem else None
raw_img, ext = self.download_image(img_url, url)
# Authors are the authorName links without a role span next to them.
authors_elem = content.xpath("//a[@class='authorName'][not(../span[@class='authorName greyText smallText role'])]/span/text()")
if authors_elem:
authors = []
for author in authors_elem:
authors.append(RE_WHITESPACES.sub(' ', author.strip()))
authors = None
translators = None
# Translators are the authorName links whose sibling span reads "(Translator)".
authors_elem = content.xpath("//a[@class='authorName'][../span/text()='(Translator)']/span/text()")
if authors_elem:
translators = []
for translator in authors_elem:
translators.append(RE_WHITESPACES.sub(' ', translator.strip()))
translators = None
# Optional facts are collected into `other` under their Chinese labels.
other = {}
if first_pub:
other['首版时间'] = first_pub
if genre:
other['分类'] = genre
series_elem = content.xpath("//h2[@id='bookSeries']/a/text()")
# The series text has its surrounding parentheses and "#..." suffix removed.
if series_elem:
other['丛书'] = re.sub(r'\(\s*(.+[^\s])\s*#.*\)', '\\1', series_elem[0].strip())
data = {
'title': title,
'subtitle': subtitle,
'orig_title': orig_title,
'author': authors,
'translator': translators,
'language': language,
'pub_house': pub_house,
'pub_year': pub_year,
'pub_month': pub_month,
'binding': binding,
'pages': pages,
'isbn': isbn,
'brief': brief,
'contents': contents,
'other_info': other,
'cover_url': img_url,
'source_site': self.site_name,
'source_url': self.get_effective_url(url),
# NOTE(review): 'source_url' is already set to the same value in the dict above.
data['source_url'] = self.get_effective_url(url)
self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
return data, raw_img

common/scrapers/ Normal file
View file

@ -0,0 +1,96 @@
import requests
import re
import filetype
from lxml import html
from common.models import SourceSiteEnum
from movies.models import Movie, MovieGenreEnum
from movies.forms import MovieForm
from books.models import Book
from books.forms import BookForm
from music.models import Album, Song
from music.forms import AlbumForm, SongForm
from games.models import Game
from games.forms import GameForm
from django.conf import settings
from PIL import Image
from io import BytesIO
from common.scraper import *
# Scraper for Google Books volumes; it reads a JSON API response rather than
# parsing HTML. The volume id is the group captured by `regex` after "?id=".
#
# NOTE(review): this listing has no indentation and some lines look missing
# or truncated (`else:` lines, the API URL prefix in api_url, the closing
# `}` of the data dict); restore them before editing.
class GoogleBooksScraper(AbstractScraper):
site_name = SourceSiteEnum.GOOGLEBOOKS.value
host = ""
data_class = Book
form_class = BookForm
regex = re.compile(r"https://books\.google\.com/books\?id=([^&#]+)")
# Return the leading "https://books.google.com/books?id=<id>" part of
# raw_url, or None when raw_url does not start with it.
def get_effective_url(cls, raw_url):
u = re.match(r"https://books\.google\.com/books\?id=[^&#]+", raw_url)
return u[0] if u else None
# Fetch the volume JSON for `url` and map its volumeInfo onto book fields.
# Returns (data, raw_img) and stores data, raw_img and the image extension
# on self.raw_data / self.raw_img / self.img_ext.
# A ValueError("not valid url") is raised for urls that `regex` does not
# match (presumably under an `else:` that was lost from this listing).
# `response` is not used in the visible code.
def scrape(self, url, response=None):
m = self.regex.match(url)
if m:
api_url = f'{m[1]}'
raise ValueError("not valid url")
b = requests.get(api_url).json()
other = {}
title = b['volumeInfo']['title']
subtitle = b['volumeInfo']['subtitle'] if 'subtitle' in b['volumeInfo'] else None
pub_year = None
pub_month = None
# NOTE(review): re.sub returns strings, so pub_year/pub_month stay str here.
# For a "YYYY-MM" date the trailing `.+` makes `(\d+)` give up its last
# digit (e.g. "2005-06" yields "0") - confirm the month pattern.
if 'publishedDate' in b['volumeInfo']:
pub_date = b['volumeInfo']['publishedDate']
pub_year = re.sub(r'(\d\d\d\d).+', r'\1', pub_date)
pub_month = re.sub(r'(\d\d\d\d)-(\d+).+', r'\2', pub_date) if len(pub_date) > 5 else None
pub_house = b['volumeInfo']['publisher'] if 'publisher' in b['volumeInfo'] else None
language = b['volumeInfo']['language'] if 'language' in b['volumeInfo'] else None
pages = b['volumeInfo']['pageCount'] if 'pageCount' in b['volumeInfo'] else None
if 'mainCategory' in b['volumeInfo']:
other['分类'] = b['volumeInfo']['mainCategory']
authors = b['volumeInfo']['authors'] if 'authors' in b['volumeInfo'] else None
if 'description' in b['volumeInfo']:
brief = b['volumeInfo']['description']
elif 'textSnippet' in b['volumeInfo']:
brief = b["volumeInfo"]["textSnippet"]["searchInfo"]
brief = ''
# Markup is stripped from the brief; each "<br" first gets a newline put in
# front of it so line breaks survive the tag removal.
brief = re.sub(r'<.*?>', '', brief.replace('<br', '\n<br'))
img_url = b['volumeInfo']['imageLinks']['thumbnail'] if 'imageLinks' in b['volumeInfo'] else None
isbn10 = None
isbn13 = None
for iid in b['volumeInfo']['industryIdentifiers'] if 'industryIdentifiers' in b['volumeInfo'] else []:
if iid['type'] == 'ISBN_10':
isbn10 = iid['identifier']
if iid['type'] == 'ISBN_13':
isbn13 = iid['identifier']
# ISBN-13 is preferred; ISBN-10 is the fallback.
isbn = isbn13 if isbn13 is not None else isbn10
data = {
'title': title,
'subtitle': subtitle,
'orig_title': None,
'author': authors,
'translator': None,
'language': language,
'pub_house': pub_house,
'pub_year': pub_year,
'pub_month': pub_month,
'binding': None,
'pages': pages,
'isbn': isbn,
'brief': brief,
'contents': None,
'other_info': other,
'cover_url': img_url,
'source_site': self.site_name,
'source_url': self.get_effective_url(url),
raw_img, ext = self.download_image(img_url, url)
self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
return data, raw_img

common/scrapers/ Normal file
View file

@ -0,0 +1,108 @@
import requests
import re
from common.models import SourceSiteEnum
from movies.forms import MovieForm
from movies.models import Movie
from django.conf import settings
from common.scraper import *
class ImdbMovieScraper(AbstractScraper):
site_name = SourceSiteEnum.IMDB.value
host = ''
data_class = Movie
form_class = MovieForm
regex = re.compile(r"(?<=https://www\.imdb\.com/title/)[a-zA-Z0-9]+")
def scrape(self, url):
effective_url = self.get_effective_url(url)
if effective_url is None:
raise ValueError("not valid url")
api_url = self.get_api_url(effective_url)
r = requests.get(api_url)
res_data = r.json()
if not res_data['type'] in ['Movie', 'TVSeries']:
raise ValueError("not movie/series item")
if res_data['type'] == 'Movie':
is_series = False
elif res_data['type'] == 'TVSeries':
is_series = True
title = res_data['title']
orig_title = res_data['originalTitle']
imdb_code = self.regex.findall(effective_url)[0]
director = []
for direct_dict in res_data['directorList']:
playwright = []
for writer_dict in res_data['writerList']:
actor = []
for actor_dict in res_data['actorList']:
genre = res_data['genres'].split(', ')
area = res_data['countries'].split(', ')
language = res_data['languages'].split(', ')
year = int(res_data['year'])
duration = res_data['runtimeStr']
brief = res_data['plotLocal'] if res_data['plotLocal'] else res_data['plot']
if res_data['releaseDate']:
showtime = [{res_data['releaseDate']: "发布日期"}]
showtime = None
other_info = {}
if res_data['contentRating']:
other_info['分级'] = res_data['contentRating']
if res_data['imDbRating']:
other_info['IMDb评分'] = res_data['imDbRating']
if res_data['metacriticRating']:
other_info['Metacritic评分'] = res_data['metacriticRating']
if res_data['awards']:
other_info['奖项'] = res_data['awards']
raw_img, ext = self.download_image(res_data['image'], url)
data = {
'title': title,
'orig_title': orig_title,
'other_title': None,
'imdb_code': imdb_code,
'director': director,
'playwright': playwright,
'actor': actor,
'genre': genre,
'showtime': showtime,
'site': None,
'area': area,
'language': language,
'year': year,
'duration': duration,
'season': None,
'episodes': None,
'single_episode_length': None,
'brief': brief,
'is_series': is_series,
'other_info': other_info,
'source_site': self.site_name,
'source_url': effective_url,
self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
return data, raw_img
# NOTE(review): takes ``cls`` and is reached through the scraper itself, so it
# is declared as a classmethod here; no decorator was visible in the reviewed
# text - confirm.
@classmethod
def get_effective_url(cls, raw_url):
    """Return the canonical IMDb title URL for *raw_url*, or None.

    Args:
        raw_url: any string that may contain an IMDb title URL.

    Returns:
        ``"https://www.imdb.com/title/<id>/"`` for the first title id found
        in *raw_url*, or ``None`` when the pattern does not match.
    """
    code = cls.regex.findall(raw_url)
    if not code:
        return None
    # The bare id is not enough: scrape() and get_api_url() run cls.regex on
    # the returned value, and its lookbehind only matches an id that follows
    # the full "https://www.imdb.com/title/" prefix.
    return f"https://www.imdb.com/title/{code[0]}/"
# Build the API request path for an IMDb title: the title id is re-extracted
# from `url` with cls.regex and placed after settings.IMDB_API_KEY, followed
# by the literal "FullActor," suffix.
# Raises IndexError when `url` does not contain the title-URL prefix the
# regex looks behind for.
# NOTE(review): the returned string has no scheme or host, yet scrape()
# passes it straight to requests.get(); the API base URL looks lost from this
# line - restore it from the API provider's documentation.
def get_api_url(cls, url):
return f"{settings.IMDB_API_KEY}/{cls.regex.findall(url)[0]}/FullActor,"

common/scrapers/ Normal file
View file

@ -0,0 +1,284 @@
import requests
import re
import time
from common.models import SourceSiteEnum
from music.models import Album, Song
from music.forms import AlbumForm, SongForm
from django.conf import settings
from common.scraper import *
# Module-level cache of the Spotify API access token, shared by the scrapers
# and helper functions in this module; None until a token has been obtained.
spotify_token = None
# Unix timestamp at which the cached token counts as expired. Initialised to
# "now", so the first expiry check already reports the token as expired.
spotify_token_expire_time = time.time()
class SpotifyTrackScraper(AbstractScraper):
    """Fetch a single Spotify track through the Web API and map it to a Song.

    NOTE(review): several statements of this class were incomplete in the
    reviewed text (token refresh call, artist loop body, URL prefixes); they
    are filled in from the surrounding code and the Spotify Web API - confirm
    against version control.
    """

    site_name = SourceSiteEnum.SPOTIFY.value
    # NOTE(review): empty in the reviewed text; left unchanged.
    host = ''
    data_class = Song
    form_class = SongForm

    # Lookbehind: matches only the track id that follows the open.spotify.com
    # track URL prefix.
    regex = re.compile(r"(?<=https://open\.spotify\.com/track/)[a-zA-Z0-9]+")

    def scrape(self, url):
        """Request from API, not really scraping.

        Args:
            url: a Spotify track page URL.

        Returns:
            tuple: ``(data, raw_img)``; both are also stored on the scraper
            as ``raw_data`` / ``raw_img`` (together with ``img_ext``).

        Raises:
            ValueError: *url* is not a Spotify track URL.
        """
        # Refresh the module-level token before it is put into the header.
        if spotify_token is None or is_spotify_token_expired():
            invoke_spotify_token()

        effective_url = self.get_effective_url(url)
        if effective_url is None:
            raise ValueError("not valid url")
        api_url = self.get_api_url(effective_url)
        headers = {
            'Authorization': f"Bearer {spotify_token}"
        }
        res_data = requests.get(api_url, headers=headers).json()

        # None (rather than an empty list) when the track lists no artist.
        artist = [item['name'] for item in res_data['artists']] or None
        title = res_data['name']
        release_date = parse_date(res_data['album']['release_date'])
        duration = res_data['duration_ms']
        # Missing or empty ISRC becomes None; previously the None assignment
        # ran unconditionally and discarded the code.
        isrc = res_data['external_ids'].get('isrc') or None

        raw_img, ext = self.download_image(res_data['album']['images'][0]['url'], url)

        data = {
            'title': title,
            'artist': artist,
            'genre': None,
            'release_date': release_date,
            'duration': duration,
            'isrc': isrc,
            'album': None,
            'brief': None,
            'other_info': None,
            'source_site': self.site_name,
            'source_url': effective_url,
        }
        self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
        return data, raw_img

    @classmethod
    def get_effective_url(cls, raw_url):
        """Return the canonical track page URL for *raw_url*, or None."""
        code = cls.regex.findall(raw_url)
        if not code:
            return None
        # Keep the full prefix: get_api_url() re-runs cls.regex on this value
        # and the lookbehind needs it to find the id again.
        return f"https://open.spotify.com/track/{code[0]}"

    @classmethod
    def get_api_url(cls, url):
        """Return the Web API "get track" endpoint for a track page URL.

        Raises:
            IndexError: *url* does not contain a Spotify track URL.
        """
        return "https://api.spotify.com/v1/tracks/" + cls.regex.findall(url)[0]
class SpotifyAlbumScraper(AbstractScraper):
    """Fetch a Spotify album through the Web API and map it to an Album.

    Saving an album also links its tracks in a background thread: tracks
    already stored are pointed at the album, the others are fetched with
    ``SpotifyTrackScraper`` one at a time.

    NOTE(review): several statements of this class were incomplete in the
    reviewed text (loop bodies, thread targets and start calls, the
    ``edited_time`` value, the ``bulk_update`` field list, URL prefixes);
    they are filled in from the surrounding code and the Spotify Web API -
    confirm against version control.
    """

    site_name = SourceSiteEnum.SPOTIFY.value
    # NOTE(review): empty in the reviewed text; left unchanged.
    host = ''
    data_class = Album
    form_class = AlbumForm

    # Lookbehind: matches only the album id that follows the open.spotify.com
    # album URL prefix.
    regex = re.compile(r"(?<=https://open\.spotify\.com/album/)[a-zA-Z0-9]+")

    def scrape(self, url):
        """Request from API, not really scraping.

        Args:
            url: a Spotify album page URL.

        Returns:
            tuple: ``(data, raw_img)``; both are also stored on the scraper
            as ``raw_data`` / ``raw_img`` (together with ``img_ext``), and
            the track page URLs are stored as ``track_urls``.

        Raises:
            ValueError: *url* is not a Spotify album URL.
        """
        # Refresh the module-level token before it is put into the header.
        if spotify_token is None or is_spotify_token_expired():
            invoke_spotify_token()

        effective_url = self.get_effective_url(url)
        if effective_url is None:
            raise ValueError("not valid url")
        api_url = self.get_api_url(effective_url)
        headers = {
            'Authorization': f"Bearer {spotify_token}"
        }
        res_data = requests.get(api_url, headers=headers).json()

        artist = [item['name'] for item in res_data['artists']]
        title = res_data['name']
        genre = ', '.join(res_data['genres'])
        # NOTE(review): 'text' is the human-readable field of Spotify's
        # copyright objects - confirm this is what was collected.
        company = [item['text'] for item in res_data['copyrights']]

        tracks = res_data['tracks']['items']
        # The last track's disc number tells whether the album has more than
        # one disc; evaluated once instead of on every iteration.
        multi_disc = bool(tracks) and tracks[-1]['disc_number'] > 1
        duration = 0
        track_list = []
        track_urls = []
        for track in tracks:
            # Page URL of the track; get_track_or_none() later looks songs up
            # by this value. NOTE(review): assumed from Spotify's track
            # object ('external_urls') - confirm.
            track_urls.append(track['external_urls']['spotify'])
            duration += track['duration_ms']
            if multi_disc:
                # more than one disc
                track_list.append(
                    f"{track['disc_number']}-{track['track_number']}. {track['name']}")
            else:
                track_list.append(f"{track['track_number']}. {track['name']}")
        track_list = '\n'.join(track_list)

        release_date = parse_date(res_data['release_date'])

        other_info = {}
        if res_data['external_ids'].get('upc'):
            # bar code
            other_info['UPC'] = res_data['external_ids']['upc']

        raw_img, ext = self.download_image(res_data['images'][0]['url'], url)

        data = {
            'title': title,
            'artist': artist,
            'genre': genre,
            'track_list': track_list,
            'release_date': release_date,
            'duration': duration,
            'company': company,
            'brief': None,
            'other_info': other_info,
            'source_site': self.site_name,
            'source_url': effective_url,
        }

        # set tracks_data, used for adding tracks
        self.track_urls = track_urls
        self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
        return data, raw_img

    @classmethod
    def get_effective_url(cls, raw_url):
        """Return the canonical album page URL for *raw_url*, or None."""
        code = cls.regex.findall(raw_url)
        if not code:
            return None
        # Keep the full prefix: get_api_url() re-runs cls.regex on this value
        # and the lookbehind needs it to find the id again.
        return f"https://open.spotify.com/album/{code[0]}"

    @classmethod
    def save(cls, request_user):
        """Save the album, then link its tracks in a background thread.

        Returns:
            The form returned by the parent ``save``.
        """
        form = super().save(request_user)
        # NOTE(review): further Thread options (e.g. daemon) were not visible
        # in the reviewed text.
        task = Thread(
            target=cls.add_tracks,
            args=(form.instance, request_user),
        )
        task.start()
        return form

    @classmethod
    def get_api_url(cls, url):
        """Return the Web API "get album" endpoint for an album page URL.

        Raises:
            IndexError: *url* does not contain a Spotify album URL.
        """
        return "https://api.spotify.com/v1/albums/" + cls.regex.findall(url)[0]

    @classmethod
    def add_tracks(cls, album: Album, request_user):
        """Attach every track in ``cls.track_urls`` to *album*.

        Tracks not stored yet are scraped and saved; tracks already stored
        are updated together at the end.
        """
        to_be_updated_tracks = []
        for track_url in cls.track_urls:
            track = cls.get_track_or_none(track_url)
            if track is not None:
                to_be_updated_tracks.append(track)
                continue
            # It seems that firing too many requests at the same time makes
            # Spotify limit access, so each scrape is awaited before the next
            # one starts.
            task = Thread(
                target=cls.scrape_and_save_track,
                args=(track_url, album, request_user),
            )
            task.start()
            task.join()
        cls.bulk_update_track_album(to_be_updated_tracks, album, request_user)

    @classmethod
    def get_track_or_none(cls, track_url: str):
        """Return the stored Song whose ``source_url`` is *track_url*, or None."""
        try:
            return Song.objects.get(source_url=track_url)
        except ObjectDoesNotExist:
            return None

    @classmethod
    def scrape_and_save_track(cls, url: str, album: Album, request_user):
        """Scrape the track at *url*, attach it to *album* and save it."""
        SpotifyTrackScraper.scrape(url)
        SpotifyTrackScraper.raw_data['album'] = album
        # NOTE(review): the save call was not visible in the reviewed text;
        # it is implied by the method's name - confirm.
        SpotifyTrackScraper.save(request_user)

    @classmethod
    def bulk_update_track_album(cls, tracks, album, request_user):
        """Point the already stored *tracks* at *album* in one query."""
        # Local import keeps this class self-contained.
        from django.utils import timezone

        if not tracks:
            return
        # NOTE(review): the timestamp expression was missing in the reviewed
        # text; the current time is assumed - confirm.
        edited_time = timezone.now()
        for track in tracks:
            track.last_editor = request_user
            track.edited_time = edited_time
            track.album = album
        # Exactly the three attributes assigned above.
        Song.objects.bulk_update(tracks, [
            'last_editor',
            'edited_time',
            'album',
        ])
def get_spotify_token():
    """Return a usable Spotify access token, requesting a new one if needed.

    Returns:
        The module-level ``spotify_token`` after any refresh.

    Raises:
        Exception: propagated from ``invoke_spotify_token`` when the token
            request fails.
    """
    # The refresh branch had no body in the reviewed text; without this call
    # the function could only ever return the stale (or None) token.
    if spotify_token is None or is_spotify_token_expired():
        invoke_spotify_token()
    return spotify_token
def is_spotify_token_expired():
    """Tell whether the cached token's expiry time has been reached."""
    now = time.time()
    return spotify_token_expire_time <= now
def invoke_spotify_token():
    """Request a new access token (client-credentials grant) and cache it.

    Updates the module globals ``spotify_token`` and
    ``spotify_token_expire_time``.

    NOTE(review): the request call itself was missing in the reviewed text;
    the endpoint below is Spotify's documented token endpoint - confirm.

    Raises:
        Exception: the token endpoint did not answer 200, even after one
            retry on a 401.
    """
    global spotify_token, spotify_token_expire_time

    def request_token():
        return requests.post(
            "https://accounts.spotify.com/api/token",
            data={
                "grant_type": "client_credentials"
            },
            headers={
                "Authorization": f"Basic {settings.SPOTIFY_CREDENTIAL}"
            },
        )

    r = request_token()
    if r.status_code == 401:
        # token expired, try one more time
        # this maybe caused by external operations,
        # for example debugging using a http client
        r = request_token()
    # Checked after the retry as well, and before the body is parsed, so a
    # failed request is reported as such instead of surfacing as a JSON or
    # missing-key error.
    if r.status_code != 200:
        raise Exception(f"Request to spotify API fails. Reason: {r.reason}")
    data = r.json()

    # minus 2 for execution time error
    spotify_token_expire_time = int(data['expires_in']) + time.time() - 2
    spotify_token = data['access_token']

common/scrapers/ Normal file
View file

@ -0,0 +1,63 @@
import re
from common.models import SourceSiteEnum
from games.models import Game
from games.forms import GameForm
from common.scraper import *
# Scraper for Steam store app pages; produces the field dict for a Game /
# GameForm.
# NOTE(review): this block reads as damaged - several statements are cut off
# mid-expression (marked below) and the `data` dict is never closed, so it
# cannot run as written. Restore the missing parts from version control
# before changing anything else here.
class SteamGameScraper(AbstractScraper):
site_name = SourceSiteEnum.STEAM.value
# NOTE(review): empty string in the reviewed text.
host = ''
data_class = Game
form_class = GameForm
# Matches a Steam store app URL: numeric app id with an optional trailing
# slash.
regex = re.compile(r"https://store\.steampowered\.com/app/\d+/{0,1}")
# Download the store page for `url`, extract the game fields with XPath and
# download the cover image. Returns (data, raw_img) and stores data, raw_img
# and the image extension on self.
def scrape(self, url):
# NOTE(review): `headers` is not assigned anywhere in this method before
# use, and the value assigned to the Host header is missing.
headers['Host'] =
# Presumably lets the request pass Steam's age / mature-content gate -
# confirm.
headers['Cookie'] = "wants_mature_content=1; birthtime=754700401;"
content = self.download_page(url, headers)
# Raises IndexError when the page has no app-name element.
title = content.xpath("//div[@class='apphub_AppName']/text()")[0]
developer = content.xpath("//div[@id='developers_list']/a/text()")
# Publisher links are read from the second 'dev_row' of the glance panel.
publisher = content.xpath("//div[@class='glance_ctn']//div[@class='dev_row'][2]//a/text()")
# NOTE(review): the argument of parse_date( is missing.
release_date = parse_date(
# NOTE(review): the XPath expression is missing.
genre = content.xpath(
# The platform is hard-coded.
platform = ['PC']
# NOTE(review): the XPath expression is missing.
brief = content.xpath(
# NOTE(review): the XPath expression is missing. The first result is taken
# and its "header.jpg" is swapped for the 600x900 library image name.
img_url = content.xpath(
)[0].replace("header.jpg", "library_600x900.jpg")
raw_img, ext = self.download_image(img_url, url)
# no 600x900 picture
# Fall back to the full header image shown on the page.
if raw_img is None:
img_url = content.xpath("//img[@class='game_header_image_full']/@src")[0]
raw_img, ext = self.download_image(img_url, url)
# NOTE(review): the closing brace of this dict is missing.
data = {
'title': title,
'other_title': None,
'developer': developer,
'publisher': publisher,
'release_date': release_date,
'genre': genre,
'platform': platform,
'brief': brief,
'other_info': None,
'source_site': self.site_name,
'source_url': self.get_effective_url(url),
self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
return data, raw_img

common/scrapers/ Normal file
View file

@ -0,0 +1,137 @@
import requests
import re
from common.models import SourceSiteEnum
from movies.models import Movie
from movies.forms import MovieForm
from django.conf import settings
from common.scraper import *
class TmdbMovieScraper(AbstractScraper):
    """Fetch a movie or TV show from the TMDB v3 API and map it to a Movie.

    NOTE(review): the ``else`` branches and the URL prefixes of this class
    were missing in the reviewed text; they are restored from the code's
    structure and the TMDB API - confirm against version control.
    """

    site_name = SourceSiteEnum.TMDB.value
    # NOTE(review): empty in the reviewed text; left unchanged.
    host = ''
    data_class = Movie
    form_class = MovieForm

    # Group 1 is the kind ('movie' or 'tv'), group 2 the TMDB id.
    regex = re.compile(r"https://www\.themoviedb\.org/(movie|tv)/([a-zA-Z0-9]+)")

    # Genre names as returned by the API -> genre names used in the result;
    # names not listed here become 'Other'.
    genre_map = {
        'Sci-Fi & Fantasy': 'Sci-Fi',
        'War & Politics': 'War',
        '儿童': 'Kids',
        '冒险': 'Adventure',
        '剧情': 'Drama',
        '动作': 'Action',
        '动作冒险': 'Action',
        '动画': 'Animation',
        '历史': 'History',
        '喜剧': 'Comedy',
        '奇幻': 'Fantasy',
        '家庭': 'Family',
        '恐怖': 'Horror',
        '悬疑': 'Mystery',
        '惊悚': 'Thriller',
        '战争': 'War',
        '新闻': 'News',
        '爱情': 'Romance',
        '犯罪': 'Crime',
        '电视电影': 'TV Movie',
        '真人秀': 'Reality-TV',
        '科幻': 'Sci-Fi',
        '纪录': 'Documentary',
        '肥皂剧': 'Soap',
        '脱口秀': 'Talk-Show',
        '西部': 'Western',
        '音乐': 'Music',
    }

    @staticmethod
    def _year_and_showtime(date_str, label):
        """Split an ISO date into ``(year, showtime)``; both None when empty."""
        if not date_str:
            return None, None
        return int(date_str.split('-')[0]), [{date_str: label}]

    def scrape(self, url):
        """Request the item from the TMDB API and build the movie field dict.

        Args:
            url: a themoviedb.org movie or tv page URL.

        Returns:
            tuple: ``(data, raw_img)``; both are also stored on the scraper
            as ``raw_data`` / ``raw_img`` (together with ``img_ext``).

        Raises:
            ValueError: *url* is not a TMDB movie/tv URL.
        """
        m = self.regex.match(url)
        if not m:
            raise ValueError("not valid url")
        effective_url = m[0]
        kind = m[1]
        is_series = kind == 'tv'
        tmdb_id = m[2]

        # The page kind ('movie' / 'tv') is also the API path segment.
        api_url = (
            f"https://api.themoviedb.org/3/{kind}/{tmdb_id}"
            f"?api_key={settings.TMDB_API3_KEY}&language=zh-CN"
            "&append_to_response=external_ids,credits"
        )
        res_data = requests.get(api_url).json()
        crew = res_data['credits']['crew']

        if is_series:
            title = res_data['name']
            orig_title = res_data['original_name']
            # An empty date no longer crashes the int() conversion; year and
            # showtime are None in that case.
            year, showtime = self._year_and_showtime(
                res_data['first_air_date'], "首播日期")
            imdb_code = res_data['external_ids']['imdb_id']
            duration = None
            director = [person['name'] for person in res_data['created_by']]
        else:
            title = res_data['title']
            orig_title = res_data['original_title']
            year, showtime = self._year_and_showtime(
                res_data['release_date'], "发布日期")
            imdb_code = res_data['imdb_id']
            duration = res_data['runtime']  # in minutes
            director = [
                person['name'] for person in crew if person['job'] == 'Director']

        genre = [
            self.genre_map.get(item['name'], 'Other')
            for item in res_data['genres']]
        language = [item['name'] for item in res_data['spoken_languages']]
        brief = res_data['overview']
        playwright = [
            person['name'] for person in crew if person['job'] == 'Screenplay']
        actor = [person['name'] for person in res_data['credits']['cast']]
        area = []

        other_info = {}
        other_info['TMDB评分'] = res_data['vote_average']
        other_info['TMDB_ID'] = tmdb_id
        if is_series:
            other_info['Seasons'] = res_data['number_of_seasons']
            other_info['Episodes'] = res_data['number_of_episodes']

        # TODO: use GET /configuration to get base url
        poster_path = res_data['poster_path']
        if poster_path:
            img_url = 'https://image.tmdb.org/t/p/original' + poster_path
            raw_img, ext = self.download_image(img_url, url)
        else:
            # No poster: skip the download instead of failing on the string
            # concatenation.
            raw_img, ext = None, None

        data = {
            'title': title,
            'orig_title': orig_title,
            'other_title': None,
            'imdb_code': imdb_code,
            'director': director,
            'playwright': playwright,
            'actor': actor,
            'genre': genre,
            'showtime': showtime,
            'site': None,
            'area': area,
            'language': language,
            'year': year,
            'duration': duration,
            'season': None,
            'episodes': None,
            'single_episode_length': None,
            'brief': brief,
            'is_series': is_series,
            'other_info': other_info,
            'source_site': self.site_name,
            'source_url': effective_url,
        }
        self.raw_data, self.raw_img, self.img_ext = data, raw_img, ext
        return data, raw_img

    @classmethod
    def get_effective_url(cls, raw_url):
        """Return the matched TMDB page URL within *raw_url*, or None."""
        m = cls.regex.match(raw_url)
        # Test the match, not the input: a non-matching URL used to reach
        # m[0] with m being None.
        return m[0] if m else None

View file

@ -2,7 +2,8 @@ from urllib.parse import quote_plus
from enum import Enum
from common.models import SourceSiteEnum
from django.conf import settings
from common.scraper import GoodreadsScraper, get_spotify_token
from common.scrapers.goodreads import GoodreadsScraper
from common.scrapers.spotify import get_spotify_token
import requests
from lxml import html
import logging