lib.itmens/catalog/sites/bangumi.py
2025-02-09 05:20:22 +00:00

362 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from collections import OrderedDict
from typing import Any
import httpx
from django.conf import settings
from loguru import logger
from catalog.book.utils import detect_isbn_asin
from catalog.common import *
from catalog.game.models import GameReleaseType
from catalog.models import *
from common.models.lang import detect_language
# Standard-library logger named after this module.
# NOTE(review): the visible code logs through loguru's `logger` and never
# references `_logger` -- confirm it is still needed.
_logger = logging.getLogger(__name__)
@SiteManager.register
class Bangumi(AbstractSite):
SITE_NAME = SiteName.Bangumi
ID_TYPE = IdType.Bangumi
URL_PATTERNS = [
r"https://bgm\.tv/subject/(\d+)",
r"https://bangumi\.tv/subject/(\d+)",
r"https://chii\.in/subject/(\d+)",
]
WIKI_PROPERTY_ID = ""
DEFAULT_MODEL = None
@classmethod
def get_category(
cls, o: dict[str, Any], fetch_resources: bool = False
) -> tuple[ItemCategory, dict[str, Any]]:
dt = o.get("date")
pub_year = None
pub_month = None
release_date = None
release_type = None
showtime = None
year = None
related_resources = []
match o["type"]:
case 1:
model = "Edition"
category = ItemCategory.Book
if o["series"] and fetch_resources:
# model = "Series" TODO
res = (
BasicDownloader(
f"https://api.bgm.tv/v0/subjects/{o['id']}/subjects",
headers={
"User-Agent": settings.NEODB_USER_AGENT,
},
)
.download()
.json()
)
for s in res:
if s["relation"] != "单行本":
continue
related_resources.append(
{
"url": cls.id_to_url(s["id"]),
}
)
if dt:
d = dt.split("-")
pub_year = d[0]
pub_month = d[1]
case 2 | 6:
is_season = o["platform"] in {
"TV",
"OVA", # may be movie in other sites
"WEB",
"电视剧",
"欧美剧",
"日剧",
"华语剧",
"综艺",
}
category = ItemCategory.TV if is_season else ItemCategory.Movie
model = "TVSeason" if is_season else "Movie"
if "舞台剧" in [
t["name"] for t in o["tags"]
]: # 只能这样判断舞台剧了bangumi三次元分类太少
category = ItemCategory.Performance
model = "Performance"
if dt:
year = dt.split("-")[0]
showtime = [
{"time": dt, "region": "首播日期" if is_season else "发布日期"}
]
case 3:
model = "Album"
category = ItemCategory.Music
case 4:
model = "Game"
category = ItemCategory.Game
match o["platform"]:
case "游戏":
release_type = GameReleaseType.GAME
case "扩展包":
release_type = GameReleaseType.DLC
if dt:
release_date = dt
case _:
raise ValueError(
f"Unknown type {o['type']} for bangumi subject {o['id']}"
)
return category, {
"preferred_model": model,
"related_resources": related_resources,
"pub_year": pub_year,
"pub_month": pub_month,
"release_date": release_date,
"release_type": release_type,
"showtime": showtime,
"year": year,
}
@classmethod
def id_to_url(cls, id_value):
return f"https://bgm.tv/subject/{id_value}"
@classmethod
async def search_task(
cls, q: str, page: int, category: str, page_size: int
) -> list[ExternalSearchResultItem]:
results = []
bgm_type = {
"all": None,
"movietv": [2, 6],
"movie": [2, 6],
"tv": [2, 6],
"book": [1],
"game": [4],
"performance": [6],
"music": [3],
}
if category not in bgm_type:
return results
search_url = f"https://api.bgm.tv/v0/search/subjects?limit={page_size}&offset={(page - 1) * page_size}"
async with httpx.AsyncClient() as client:
try:
response = await client.post(
search_url,
headers={"User-Agent": settings.NEODB_USER_AGENT},
json={"keyword": q, "filter": {"type": bgm_type[category]}},
timeout=2,
)
if response.status_code == 200:
r = response.json()
for s in r["data"]:
cat, _ = cls.get_category(s)
results.append(
ExternalSearchResultItem(
category=cat,
source_site=cls.SITE_NAME,
source_url=cls.id_to_url(s["id"]),
title=s["name"],
subtitle="",
brief=s.get("summary", ""),
cover_url=s["images"].get("common"),
)
)
except httpx.ReadTimeout:
logger.warning("Bangumi search timeout", extra={"query": q})
except Exception as e:
logger.error("Bangumi search error", extra={"query": q, "exception": e})
return results
def scrape(self):
api_url = f"https://api.bgm.tv/v0/subjects/{self.id_value}"
o = (
BasicDownloader(
api_url,
headers={
"User-Agent": settings.NEODB_USER_AGENT,
},
)
.download()
.json()
)
category, data = self.get_category(o, True)
title = o.get("name_cn") or o.get("name")
orig_title = o.get("name") if o.get("name") != title else None
brief = o.get("summary")
episodes = o.get("total_episodes", 0)
genre = None
platform = None
other_title = []
imdb_code = None
isbn_type = None
isbn = None
language = None
pub_house = None
orig_creator = None
authors = []
site = None
director = None
playwright = None
actor = None
pages = None
price = None
opening_date = None
closing_date = None
location = None
for i in o.get("infobox", []):
k = i["key"].lower()
v = i["value"]
match k:
case "别名":
other_title = (
[d["v"] for d in v]
if isinstance(v, list)
else ([v] if isinstance(v, str) else [])
)
case "话数":
try:
episodes = int(v)
except ValueError:
pass
case "imdb_id":
imdb_code = v
case "isbn":
isbn_type, isbn = detect_isbn_asin(v)
case "语言":
language = v
case "出版社":
pub_house = v
case "导演":
director = v
case "编剧" | "脚本":
playwright = (
[d["v"] for d in v]
if isinstance(v, list)
else ([v] if isinstance(v, str) else [])
)
case "原作":
match category:
case ItemCategory.Book:
authors.append(v)
case ItemCategory.Performance:
orig_creator = (
[d["v"] for d in v]
if isinstance(v, list)
else ([v] if isinstance(v, str) else [])
)
case "作画":
authors.append(v)
case "作者":
authors.extend(
[d["v"] for d in v]
if isinstance(v, list)
else ([v] if isinstance(v, str) else [])
)
case "平台":
platform = (
[d["v"] for d in v]
if isinstance(v, list)
else ([v] if isinstance(v, str) else [])
)
case "游戏类型" | "类型":
genre = (
[d["v"] for d in v]
if isinstance(v, list)
else ([v] if isinstance(v, str) else [])
)
case "官方网站" | "website":
site = (
v[0]["v"]
if isinstance(v, list)
else (v if isinstance(v, str) else None)
)
case "页数":
pages = v
case "价格":
price = v
case "开始":
opening_date = v
case "结束":
closing_date = v
case "演出":
if category == ItemCategory.Performance:
director = v
case "主演":
if category == ItemCategory.Performance:
actor = (
[{"name": d["v"], "role": None} for d in v]
if isinstance(v, list)
else (
[{"name": w, "role": None} for w in v.split("")]
if isinstance(v, str)
else []
)
)
case "会场" | "演出地点":
location = v
img_url = o["images"].get("large") or o["images"].get("common")
raw_img = None
ext = None
if img_url:
raw_img, ext = BasicImageDownloader.download_image(
img_url, None, headers={}
)
titles = OrderedDict.fromkeys(
[title] + (other_title or []) + ([orig_title] if orig_title else [])
)
if o.get("name_cn"):
titles[o.get("name_cn")] = "zh-cn"
localized_title = [
{"lang": lang or detect_language(t), "text": t}
for t, lang in titles.items()
]
localized_desc = (
[{"lang": detect_language(brief), "text": brief}] if brief else []
)
data.update(
{
"localized_title": localized_title,
"localized_description": localized_desc,
"title": title,
"orig_title": orig_title,
"other_title": other_title or None,
"orig_creator": orig_creator,
"author": authors,
"genre": genre,
"translator": None,
"director": director,
"playwright": playwright,
"actor": actor,
"language": language,
"platform": platform,
"imdb_code": imdb_code,
"pub_house": pub_house,
"binding": None,
"episode_count": episodes or None,
"official_site": site,
"site": site,
"isbn": isbn,
"brief": brief,
"cover_image_url": img_url,
"pages": pages,
"price": price,
"opening_date": opening_date,
"closing_date": closing_date,
"location": location,
}
)
lookup_ids = {}
if isbn:
lookup_ids[isbn_type] = isbn
if imdb_code:
lookup_ids[IdType.IMDB] = imdb_code
return ResourceContent(
metadata={k: v for k, v in data.items() if v is not None},
cover_image=raw_img,
cover_image_extention=ext,
lookup_ids=lookup_ids,
)