@ -554,9 +554,9 @@ def click_to_scrape(request):
if request.method == "POST":
url = request.POST.get("url")
if url:
from common.scraper import scrape_douban_book
from common.scraper import DoubanBookScraper
scraped_book, raw_cover = scrape_douban_book(url)
scraped_book, raw_cover = DoubanBookScraper.scrape(url)
except TimeoutError:
return render(request, 'common/error.html', {'msg': _("爬取数据失败😫,请重试")})
except ValueError:
@ -7,10 +7,15 @@ from django.db.models import Q
from markdownx.models import MarkdownxField
from users.models import User
from mastodon.api import get_relationships, get_cross_site_id
from .utils import clean_url
# abstract base classes
# Integer choices naming the supported source sites; the scraper classes use a
# member's `.value` as their `site` class variable.
class SourceSiteEnum(models.IntegerChoices):
# Douban: stored as the integer 1, displayed with the label "豆瓣".
DOUBAN = 1, _("豆瓣")
class Resource(models.Model):
rating_total_score = models.PositiveIntegerField(null=True, blank=True)
@ -24,6 +29,8 @@ class Resource(models.Model):
brief = models.TextField(blank=True, default="")
other_info = postgres.JSONField(
blank=True, null=True, encoder=DjangoJSONEncoder, default=dict)
# source_url = models.URLField(max_length=500)
# source_site = models.SmallIntegerField()
class Meta:
abstract = True
@ -35,7 +42,7 @@ class Resource(models.Model):
def save(self, *args, **kwargs):
""" update rating before save to db """
""" update rating and strip source url scheme & querystring before save to db """
if self.rating_number and self.rating_total_score:
self.rating = Decimal(
str(round(self.rating_total_score / self.rating_number, 1)))
@ -43,6 +50,7 @@ class Resource(models.Model):
self.rating = None
raise IntegrityError()
# self.source = clean_url(self.source)
super().save(*args, **kwargs)
def calculate_rating(self, old_rating, new_rating):
@ -5,13 +5,14 @@ import logging
from lxml import html
import re
from boofilsic.settings import LUMINATI_USERNAME, LUMINATI_PASSWORD, DEBUG
from django.utils.translation import ugettext_lazy as _
from movies.models import MovieGenreEnum
from common.models import SourceSiteEnum
RE_NUMBERS = re.compile(r"\d+\d*")
RE_WHITESPACES = re.compile(r"\s+")
# without slash at the end
RE_DOUBAN_BOOK_URL = re.compile(r"\d+")
RE_DOUBAN_MOVIE_URL = re.compile(r"\d+")
'Host': '',
@ -35,6 +36,10 @@ PORT = 22225
logger = logging.getLogger(__name__)
# registry of all implemented scrapers, in the form {host: scraper_class}
registry = {}
def log_url(func):
Catch exceptions and log then pass the exceptions.
@ -53,336 +58,405 @@ def log_url(func):
return wrapper
def download_page(url, regex, headers):
url = regex.findall(url)
if not url:
raise ValueError("not valid url")
url = url[0] + '/'
class AbstractScraper:
session_id = random.random()
proxy_url = ('' %
proxies = {
'http': proxy_url,
'https': proxy_url,
# if DEBUG:
# proxies = None
r = requests.get(url, proxies=proxies, headers=headers, timeout=TIMEOUT)
# r = requests.get(url, headers=DEFAULT_REQUEST_HEADERS, timeout=TIMEOUT)
# subclasses must define these two class variables
site = None
host = None
return html.fromstring(r.content.decode('utf-8'))
def __init_subclass__(cls, **kwargs):
    """Validate a newly defined scraper subclass and register it.

    Runs once for every subclass at class-creation time. It checks that the
    subclass defines the class variables ``site`` (an int) and ``host``
    (a str) and has a callable ``scrape``; then it wraps ``scrape`` with
    ``log_url``, turns it into a class method, and stores the subclass in
    the module-level ``registry`` under its ``host``.

    Raises:
        AssertionError: if ``site``/``host`` are missing or of the wrong
            type, or if the subclass has no callable ``scrape``.
    """
    # cooperate with any other __init_subclass__ hooks in the MRO
    super().__init_subclass__(**kwargs)
    assert cls.site is not None, "class variable `site` must be specified"
    assert cls.host is not None, "class variable `host` must be specified"
    assert isinstance(cls.host, str), "`host` must be type str"
    assert isinstance(cls.site, int), "`site` must be type int"
    assert hasattr(cls, 'scrape') and callable(cls.scrape), "scaper must have method `.scrape()`"
    # decorate the scrape method: add url logging, then expose it as a
    # class method so callers can use `SomeScraper.scrape(url)` directly
    cls.scrape = classmethod(log_url(cls.scrape))
    # later subclasses declaring the same host overwrite earlier entries
    registry[cls.host] = cls
def download_image(url):
if url is None:
raw_img = None
session_id = random.random()
proxy_url = ('' %
proxies = {
'http': proxy_url,
'https': proxy_url,
# if DEBUG:
# proxies = None
if url:
img_response = requests.get(
'accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
'accept-encoding': 'gzip, deflate',
'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,fr-FR;q=0.6,fr;q=0.5,zh-TW;q=0.4',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36 Edg/81.0.416.72',
'cache-control': 'no-cache',
'dnt': '1',
if img_response.status_code == 200:
raw_img = img_response.content
return raw_img
def scrape(self, url):
    """
    Scrape the data required by the model schema from the given url and return it.

    This base implementation is only a placeholder. Subclass implementations
    of this method are decorated as class methods when the subclass is created.

    Raises:
        NotImplementedError: always; subclasses must override this method.
    """
    raise NotImplementedError("Subclass should implement this method")
def scrape_douban_book(url):
headers['Host'] = ''
content = download_page(url, regex, headers)
# parsing starts here
title = content.xpath("/html/body//h1/span/text()")[0].strip()
except IndexError:
raise ValueError("given url contains no book info")
subtitle_elem = content.xpath("//div[@id='info']//span[text()='副标题:']/following::text()")
subtitle = subtitle_elem[0].strip() if subtitle_elem else None
orig_title_elem = content.xpath("//div[@id='info']//span[text()='原作名:']/following::text()")
orig_title = orig_title_elem[0].strip() if orig_title_elem else None
language_elem = content.xpath("//div[@id='info']//span[text()='语言:']/following::text()")
language = language_elem[0].strip() if language_elem else None
pub_house_elem = content.xpath("//div[@id='info']//span[text()='出版社:']/following::text()")
pub_house = pub_house_elem[0].strip() if pub_house_elem else None
pub_date_elem = content.xpath("//div[@id='info']//span[text()='出版年:']/following::text()")
pub_date = pub_date_elem[0].strip() if pub_date_elem else ''
year_month_day = RE_NUMBERS.findall(pub_date)
if len(year_month_day) in (2, 3):
pub_year = int(year_month_day[0])
pub_month = int(year_month_day[1])
elif len(year_month_day) == 1:
pub_year = int(year_month_day[0])
pub_month = None
pub_year = None
pub_month = None
if pub_year and pub_month and pub_year < pub_month:
pub_year, pub_month = pub_month, pub_year
pub_year = None if pub_year is not None and not pub_year in range(0, 3000) else pub_year
pub_month = None if pub_month is not None and not pub_month in range(1, 12) else pub_month
binding_elem = content.xpath("//div[@id='info']//span[text()='装帧:']/following::text()")
binding = binding_elem[0].strip() if binding_elem else None
price_elem = content.xpath("//div[@id='info']//span[text()='定价:']/following::text()")
price = price_elem[0].strip() if price_elem else None
pages_elem = content.xpath("//div[@id='info']//span[text()='页数:']/following::text()")
pages = pages_elem[0].strip() if pages_elem else None
if pages is not None:
pages = int(RE_NUMBERS.findall(pages)[0]) if RE_NUMBERS.findall(pages) else None
isbn_elem = content.xpath("//div[@id='info']//span[text()='ISBN:']/following::text()")
isbn = isbn_elem[0].strip() if isbn_elem else None
brief_elem = content.xpath("//h2/span[text()='内容简介']/../following-sibling::div[1]//div[@class='intro'][not(ancestor::span[@class='short'])]/p/text()")
brief = '\n'.join(p.strip() for p in brief_elem) if brief_elem else None
contents = None
contents_elem = content.xpath("//h2/span[text()='目录']/../following-sibling::div[1]")[0]
# if the id of the next sibling contains `dir`, that sibling holds the full contents
if "dir" in contents_elem.getnext().xpath("@id")[0]:
contents_elem = contents_elem.getnext()
contents = '\n'.join(p.strip() for p in contents_elem.xpath("text()")[:-2]) if contents_elem else None
def download_page(cls, url, regex, headers):
url = regex.findall(url)
if not url:
raise ValueError("not valid url")
contents = '\n'.join(p.strip() for p in contents_elem.xpath("text()")) if contents_elem else None
url = url[0] + '/'
img_url_elem = content.xpath("//*[@id='mainpic']/a/img/@src")
img_url = img_url_elem[0].strip() if img_url_elem else None
raw_img = download_image(img_url)
session_id = random.random()
proxy_url = ('' %
proxies = {
'http': proxy_url,
'https': proxy_url,
# if DEBUG:
# proxies = None
r = requests.get(url, proxies=proxies, headers=headers, timeout=TIMEOUT)
# r = requests.get(url, headers=DEFAULT_REQUEST_HEADERS, timeout=TIMEOUT)
# there are two html formats for authors and translators
authors_elem = content.xpath("""//div[@id='info']//span[text()='作者:']/following-sibling::br[1]/
if not authors_elem:
authors_elem = content.xpath("""//div[@id='info']//span[text()=' 作者']/following-sibling::a/text()""")
if authors_elem:
authors = []
for author in authors_elem:
authors.append(RE_WHITESPACES.sub(' ', author.strip()))
authors = None
return html.fromstring(r.content.decode('utf-8'))
translators_elem = content.xpath("""//div[@id='info']//span[text()='译者:']/following-sibling::br[1]/
if not translators_elem:
translators_elem = content.xpath("""//div[@id='info']//span[text()=' 译者']/following-sibling::a/text()""")
if translators_elem:
translators = []
for translator in translators_elem:
translators.append(RE_WHITESPACES.sub(' ', translator.strip()))
translators = None
other = {}
cncode_elem = content.xpath("//div[@id='info']//span[text()='统一书号:']/following::text()")
if cncode_elem:
other['统一书号'] = cncode_elem[0].strip()
series_elem = content.xpath("//div[@id='info']//span[text()='丛书:']/following-sibling::a[1]/text()")
if series_elem:
other['丛书'] = series_elem[0].strip()
imprint_elem = content.xpath("//div[@id='info']//span[text()='出品方:']/following-sibling::a[1]/text()")
if imprint_elem:
other['出品方'] = imprint_elem[0].strip()
data = {
'title' : title,
'subtitle' : subtitle,
'orig_title' : orig_title,
'author' : authors,
'translator' : translators,
'language' : language,
'pub_house' : pub_house,
'pub_year' : pub_year,
'pub_month' : pub_month,
'binding' : binding,
'price' : price,
'pages' : pages,
'isbn' : isbn,
'brief' : brief,
'contents' : contents,
'other_info' : other
return data, raw_img
def download_image(cls, url):
if url is None:
raw_img = None
session_id = random.random()
proxy_url = ('' %
proxies = {
'http': proxy_url,
'https': proxy_url,
# if DEBUG:
# proxies = None
if url:
img_response = requests.get(
'accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
'accept-encoding': 'gzip, deflate',
'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,fr-FR;q=0.6,fr;q=0.5,zh-TW;q=0.4',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36 Edg/81.0.416.72',
'cache-control': 'no-cache',
'dnt': '1',
if img_response.status_code == 200:
raw_img = img_response.content
return raw_img
def scrape_douban_movie(url):
headers['Host'] = ''
content = download_page(url, regex, headers)
class DoubanBookScraper(AbstractScraper):
site = SourceSiteEnum.DOUBAN.value
host = ""
regex = re.compile(r"\d+")
# parsing starts here
raw_title = content.xpath(
except IndexError:
raise ValueError("given url contains no movie info")
orig_title = content.xpath(
title = raw_title.split(orig_title)[0].strip()
# if has no chinese title
if title == '':
title = orig_title
def scrape(self, url):
regex = self.regex
headers['Host'] =
content = self.download_page(url, regex, headers)
if title == orig_title:
orig_title = None
# parsing starts here
title = content.xpath("/html/body//h1/span/text()")[0].strip()
except IndexError:
raise ValueError("given url contains no book info")
# there are two html formats for authors and translators
other_title_elem = content.xpath(
other_title = other_title_elem[0].strip().split(' / ') if other_title_elem else None
subtitle_elem = content.xpath(
subtitle = subtitle_elem[0].strip() if subtitle_elem else None
imdb_elem = content.xpath(
imdb_code = imdb_elem[0].strip() if imdb_elem else None
orig_title_elem = content.xpath(
orig_title = orig_title_elem[0].strip() if orig_title_elem else None
director_elem = content.xpath("//div[@id='info']//span[text()='导演']/following-sibling::span[1]/a/text()")
director = director_elem if director_elem else None
language_elem = content.xpath(
language = language_elem[0].strip() if language_elem else None
playwright_elem = content.xpath("//div[@id='info']//span[text()='编剧']/following-sibling::span[1]/a/text()")
playwright = playwright_elem if playwright_elem else None
pub_house_elem = content.xpath(
pub_house = pub_house_elem[0].strip() if pub_house_elem else None
actor_elem = content.xpath("//div[@id='info']//span[text()='主演']/following-sibling::span[1]/a/text()")
actor = actor_elem if actor_elem else None
pub_date_elem = content.xpath(
pub_date = pub_date_elem[0].strip() if pub_date_elem else ''
year_month_day = RE_NUMBERS.findall(pub_date)
if len(year_month_day) in (2, 3):
pub_year = int(year_month_day[0])
pub_month = int(year_month_day[1])
elif len(year_month_day) == 1:
pub_year = int(year_month_day[0])
pub_month = None
pub_year = None
pub_month = None
if pub_year and pub_month and pub_year < pub_month:
pub_year, pub_month = pub_month, pub_year
pub_year = None if pub_year is not None and not pub_year in range(
0, 3000) else pub_year
pub_month = None if pub_month is not None and not pub_month in range(
1, 12) else pub_month
# construct genre translator
genre_translator = {}
attrs = [attr for attr in dir(MovieGenreEnum) if not '__' in attr]
for attr in attrs:
genre_translator[getattr(MovieGenreEnum, attr).label] = getattr(
MovieGenreEnum, attr).value
binding_elem = content.xpath(
binding = binding_elem[0].strip() if binding_elem else None
genre_elem = content.xpath("//span[@property='v:genre']/text()")
if genre_elem:
genre = []
for g in genre_elem:
genre = None
price_elem = content.xpath(
price = price_elem[0].strip() if price_elem else None
showtime_elem = content.xpath("//span[@property='v:initialReleaseDate']/text()")
if showtime_elem:
showtime = []
for st in showtime_elem:
parts = st.split('(')
if len(parts) == 1:
time = st.split('(')[0]
region = ''
pages_elem = content.xpath(
pages = pages_elem[0].strip() if pages_elem else None
if pages is not None:
pages = int(RE_NUMBERS.findall(pages)[
0]) if RE_NUMBERS.findall(pages) else None
isbn_elem = content.xpath(
isbn = isbn_elem[0].strip() if isbn_elem else None
brief_elem = content.xpath(
brief = '\n'.join(p.strip() for p in brief_elem) if brief_elem else None
contents = None
contents_elem = content.xpath(
# if the id of the next sibling contains `dir`, that sibling holds the full contents
if "dir" in contents_elem.getnext().xpath("@id")[0]:
contents_elem = contents_elem.getnext()
contents = '\n'.join(p.strip() for p in contents_elem.xpath(
"text()")[:-2]) if contents_elem else None
time = st.split('(')[0]
region = st.split('(')[1][0:-1]
showtime.append({time: region})
showtime = None
site_elem = content.xpath("//div[@id='info']//span[text()='官方网站:']/following-sibling::a[1]/@href")
site = site_elem[0].strip() if site_elem else None
contents = '\n'.join(p.strip() for p in contents_elem.xpath(
"text()")) if contents_elem else None
area_elem = content.xpath("//div[@id='info']//span[text()='制片国家/地区:']/following-sibling::text()[1]")
if area_elem:
area = [a.strip() for a in area_elem[0].split(' / ')]
area = None
img_url_elem = content.xpath("//*[@id='mainpic']/a/img/@src")
img_url = img_url_elem[0].strip() if img_url_elem else None
raw_img = self.download_image(img_url)
language_elem = content.xpath("//div[@id='info']//span[text()='语言:']/following-sibling::text()[1]")
if language_elem:
language = [a.strip() for a in language_elem[0].split(' / ')]
language = None
# there are two html formats for authors and translators
authors_elem = content.xpath("""//div[@id='info']//span[text()='作者:']/following-sibling::br[1]/
if not authors_elem:
authors_elem = content.xpath(
"""//div[@id='info']//span[text()=' 作者']/following-sibling::a/text()""")
if authors_elem:
authors = []
for author in authors_elem:
authors.append(RE_WHITESPACES.sub(' ', author.strip()))
authors = None
year_elem = content.xpath("//span[@class='year']/text()")
year = int(year_elem[0][1:-1]) if year_elem else None
translators_elem = content.xpath("""//div[@id='info']//span[text()='译者:']/following-sibling::br[1]/
if not translators_elem:
translators_elem = content.xpath(
"""//div[@id='info']//span[text()=' 译者']/following-sibling::a/text()""")
if translators_elem:
translators = []
for translator in translators_elem:
translators.append(RE_WHITESPACES.sub(' ', translator.strip()))
translators = None
duration_elem = content.xpath("//span[@property='v:runtime']/text()")
other_duration_elem = content.xpath("//span[@property='v:runtime']/following-sibling::text()[1]")
if duration_elem:
duration = duration_elem[0].strip()
if other_duration_elem:
duration += other_duration_elem[0].rstrip()
duration = None
other = {}
cncode_elem = content.xpath(
if cncode_elem:
other['统一书号'] = cncode_elem[0].strip()
series_elem = content.xpath(
if series_elem:
other['丛书'] = series_elem[0].strip()
imprint_elem = content.xpath(
if imprint_elem:
other['出品方'] = imprint_elem[0].strip()
data = {
'title': title,
'subtitle': subtitle,
'orig_title': orig_title,
'author': authors,
'translator': translators,
'language': language,
'pub_house': pub_house,
'pub_year': pub_year,
'pub_month': pub_month,
'binding': binding,
'price': price,
'pages': pages,
'isbn': isbn,
'brief': brief,
'contents': contents,
'other_info': other
return data, raw_img
class DoubanMovieScraper(AbstractScraper):
site = SourceSiteEnum.DOUBAN.value
host = ''
regex = re.compile(r"\d+")
def scrape(self, url):
regex = self.regex
headers['Host'] = ''
content = self.download_page(url, regex, headers)
# parsing starts here
raw_title = content.xpath(
except IndexError:
raise ValueError("given url contains no movie info")
orig_title = content.xpath(
title = raw_title.split(orig_title)[0].strip()
# if has no chinese title
if title == '':
title = orig_title
if title == orig_title:
orig_title = None
# there are two html formats for authors and translators
other_title_elem = content.xpath(
other_title = other_title_elem[0].strip().split(
' / ') if other_title_elem else None
imdb_elem = content.xpath(
imdb_code = imdb_elem[0].strip() if imdb_elem else None
director_elem = content.xpath(
director = director_elem if director_elem else None
playwright_elem = content.xpath(
playwright = playwright_elem if playwright_elem else None
actor_elem = content.xpath(
actor = actor_elem if actor_elem else None
# construct genre translator
genre_translator = {}
attrs = [attr for attr in dir(MovieGenreEnum) if not '__' in attr]
for attr in attrs:
genre_translator[getattr(MovieGenreEnum, attr).label] = getattr(
MovieGenreEnum, attr).value
genre_elem = content.xpath("//span[@property='v:genre']/text()")
if genre_elem:
genre = []
for g in genre_elem:
genre = None
showtime_elem = content.xpath(
if showtime_elem:
showtime = []
for st in showtime_elem:
parts = st.split('(')
if len(parts) == 1:
time = st.split('(')[0]
region = ''
time = st.split('(')[0]
region = st.split('(')[1][0:-1]
showtime.append({time: region})
showtime = None
site_elem = content.xpath(
site = site_elem[0].strip() if site_elem else None
area_elem = content.xpath(
if area_elem:
area = [a.strip() for a in area_elem[0].split(' / ')]
area = None
language_elem = content.xpath(
if language_elem:
language = [a.strip() for a in language_elem[0].split(' / ')]
language = None
year_elem = content.xpath("//span[@class='year']/text()")
year = int(year_elem[0][1:-1]) if year_elem else None
duration_elem = content.xpath("//span[@property='v:runtime']/text()")
other_duration_elem = content.xpath(
if duration_elem:
duration = duration_elem[0].strip()
if other_duration_elem:
duration += other_duration_elem[0].rstrip()
duration = None
season_elem = content.xpath("//*[@id='season']/option[@selected='selected']/text()")
episodes_elem = content.xpath(
episodes = int(episodes_elem[0].strip()) if episodes_elem else None
episodes_elem = content.xpath(
episodes = int(episodes_elem[0].strip()) if episodes_elem else None
single_episode_length_elem = content.xpath(
single_episode_length = single_episode_length_elem[0].strip() if single_episode_length_elem else None
single_episode_length_elem = content.xpath(
single_episode_length = single_episode_length_elem[0].strip(
) if single_episode_length_elem else None
# if has field `episodes` not none then must be series
is_series = True if episodes else False
# if has field `episodes` not none then must be series
is_series = True if episodes else False
brief_elem = content.xpath("//span[@class='all hidden']")
if not brief_elem:
brief_elem = content.xpath("//span[@property='v:summary']")
brief = '\n'.join([e.strip() for e in brief_elem[0].xpath(
'./text()')]) if brief_elem else None
img_url_elem = content.xpath("//img[@rel='v:image']/@src")
img_url = img_url_elem[0].strip() if img_url_elem else None
raw_img = self.download_image(img_url)
data = {
'title': title,
'orig_title': orig_title,
'other_title': other_title,
'imdb_code': imdb_code,
'director': director,
'playwright': playwright,
'actor': actor,
'genre': genre,
'showtime': showtime,
'site': site,
'area': area,
'language': language,
'year': year,
'duration': duration,
'season': season,
'episodes': episodes,
'single_episode_length': single_episode_length,
'brief': brief,
'is_series': is_series,
return data, raw_img
@ -9,9 +9,9 @@ register = template.Library()
def strip_scheme(value):
""" Strip the `https://.../` part of urls"""
if value.startswith("https://"):
value = value.replace("https://", '')
value = value.lstrip("https://")
elif value.startswith("http://"):
value = value.replace("http://", '')
value = value.lstrip("http://")
if value.endswith('/'):
value = value[0:-1]
@ -62,4 +62,17 @@ def ChoicesDictGenerator(choices_enum):
for attr in dir(choices_enum):
if not '__' in attr:
choices_dict[getattr(choices_enum, attr).value] = getattr(choices_enum, attr).label
return choices_dict
return choices_dict
def clean_url(url):
    """
    Strip the scheme, query string, fragment and trailing slashes of the url.

    Args:
        url: the url as a ``str``.

    Returns:
        The url without a leading ``https://`` / ``http://``, without anything
        from the first ``?`` or ``#`` onwards, and without trailing ``/``.
    """
    # Remove the scheme as a prefix. `str.lstrip("https://")` would instead
    # strip any leading run of the characters h, t, p, s, ':' and '/', which
    # also eats the start of hosts such as "thepiratebay.org".
    for scheme in ("https://", "http://"):
        if url.startswith(scheme):
            url = url[len(scheme):]
            break
    # Drop the query string and fragment before trimming slashes, so that
    # "host/path/?q=1" and "host/path/" clean to the same value.
    url = url.split('?')[0].split('#')[0]
    return url.rstrip('/')
@ -558,9 +558,9 @@ def click_to_scrape(request):
if request.method == "POST":
url = request.POST.get("url")
if url:
from common.scraper import scrape_douban_movie
from common.scraper import DoubanMovieScraper
scraped_movie, raw_cover = scrape_douban_movie(url)
scraped_movie, raw_cover = DoubanMovieScraper.scrape(url)
except TimeoutError:
return render(request, 'common/error.html', {'msg': _("爬取数据失败😫,请重试")})
except ValueError:
