lib.itmens/mastodon/api.py

554 lines
18 KiB
Python
Raw Normal View History

2020-10-22 21:45:05 +02:00
import functools
2023-07-20 21:59:49 -04:00
import html
2023-07-20 22:59:21 -04:00
import random
2022-11-06 01:06:31 +00:00
import re
2023-07-20 22:59:21 -04:00
import string
from urllib.parse import quote
2023-11-20 01:59:26 -05:00
import django_rq
2023-07-20 22:59:21 -04:00
import requests
from django.conf import settings
2023-07-09 02:12:28 -04:00
from loguru import logger
2020-10-22 21:45:05 +02:00
2023-07-20 22:59:21 -04:00
from mastodon.utils import rating_to_emoji
from .models import MastodonApplication
2020-10-22 21:45:05 +02:00
# See https://docs.joinmastodon.org/methods/accounts/
# returns user info
# retruns the same info as verify account credentials
# GET
2022-12-29 14:30:31 -05:00
API_GET_ACCOUNT = "/api/v1/accounts/:id"
2020-10-22 21:45:05 +02:00
# returns user info if valid, 401 if invalid
# GET
2022-12-29 14:30:31 -05:00
API_VERIFY_ACCOUNT = "/api/v1/accounts/verify_credentials"
2020-10-22 21:45:05 +02:00
# obtain token
# GET
2022-12-29 14:30:31 -05:00
API_OBTAIN_TOKEN = "/oauth/token"
2020-10-22 21:45:05 +02:00
# obatin auth code
# GET
2022-12-29 14:30:31 -05:00
API_OAUTH_AUTHORIZE = "/oauth/authorize"
2020-10-22 21:45:05 +02:00
# revoke token
# POST
2022-12-29 14:30:31 -05:00
API_REVOKE_TOKEN = "/oauth/revoke"
2020-10-22 21:45:05 +02:00
# relationships
# GET
2022-12-29 14:30:31 -05:00
API_GET_RELATIONSHIPS = "/api/v1/accounts/relationships"
2020-10-22 21:45:05 +02:00
# toot
# POST
2022-12-29 14:30:31 -05:00
API_PUBLISH_TOOT = "/api/v1/statuses"
2020-10-22 21:45:05 +02:00
# create new app
# POST
2022-12-29 14:30:31 -05:00
API_CREATE_APP = "/api/v1/apps"
2020-10-22 21:45:05 +02:00
# search
# GET
2022-12-29 14:30:31 -05:00
API_SEARCH = "/api/v2/search"
2020-10-22 21:45:05 +02:00
USER_AGENT = f"NeoDB/{settings.NEODB_VERSION} (+{settings.SITE_INFO.get('site_url', 'undefined')})"
2022-07-06 18:53:34 -04:00
get = functools.partial(requests.get, timeout=settings.MASTODON_TIMEOUT)
2022-11-06 01:06:31 +00:00
put = functools.partial(requests.put, timeout=settings.MASTODON_TIMEOUT)
post = functools.partial(requests.post, timeout=settings.MASTODON_TIMEOUT)
2020-10-22 21:45:05 +02:00
def get_api_domain(domain):
app = MastodonApplication.objects.filter(domain_name=domain).first()
return app.api_domain if app and app.api_domain else domain
2020-10-22 21:45:05 +02:00
# low level api below
2023-08-20 21:46:53 +00:00
def boost_toot(site, token, toot_url):
domain = get_api_domain(site)
headers = {
"User-Agent": USER_AGENT,
"Authorization": f"Bearer {token}",
}
url = (
"https://"
+ domain
+ API_SEARCH
+ "?type=statuses&resolve=true&q="
+ quote(toot_url)
)
try:
response = get(url, headers=headers)
if response.status_code != 200:
logger.error(f"Error search {toot_url} on {domain} {response.status_code}")
return None
j = response.json()
if "statuses" in j and len(j["statuses"]) > 0:
s = j["statuses"][0]
if s["uri"] != toot_url and s["url"] != toot_url:
logger.error(
f"Error status url mismatch {s['uri']} or {s['uri']} != {toot_url}"
)
return None
if s["reblogged"]:
logger.info(f"Already boosted {toot_url}")
# TODO unboost and boost again?
return None
url = (
"https://"
+ domain
+ API_PUBLISH_TOOT
+ "/"
+ j["statuses"][0]["id"]
+ "/reblog"
)
response = post(url, headers=headers)
if response.status_code != 200:
logger.error(
f"Error search {toot_url} on {domain} {response.status_code}"
)
return None
return response.json()
except Exception:
logger.error(f"Error search {toot_url} on {domain}")
return None
2023-11-20 01:59:26 -05:00
def boost_toot_later(user, post_url):
if user and user.mastodon_token and user.mastodon_site and post_url:
django_rq.get_queue("fetch").enqueue(
boost_toot, user.mastodon_site, user.mastodon_token, post_url
)
2023-01-31 11:05:30 -05:00
def post_toot(
site,
content,
visibility,
token,
local_only=False,
update_id=None,
spoiler_text=None,
):
2020-10-22 21:45:05 +02:00
headers = {
2022-12-29 14:30:31 -05:00
"User-Agent": USER_AGENT,
"Authorization": f"Bearer {token}",
"Idempotency-Key": random_string_generator(16),
2020-10-22 21:45:05 +02:00
}
2023-02-07 21:28:51 -05:00
response = None
2023-07-18 16:41:55 -04:00
url = "https://" + get_api_domain(site) + API_PUBLISH_TOOT
payload = {
"status": content,
"visibility": visibility,
}
if spoiler_text:
payload["spoiler_text"] = spoiler_text
if local_only:
payload["local_only"] = True
try:
if update_id:
response = put(url + "/" + update_id, headers=headers, data=payload)
2023-07-18 17:31:51 -04:00
if not update_id or (response is not None and response.status_code != 200):
2023-07-18 16:41:55 -04:00
headers["Idempotency-Key"] = random_string_generator(16)
response = post(url, headers=headers, data=payload)
2023-07-18 17:31:51 -04:00
if response is not None and response.status_code == 201:
2022-11-06 01:06:31 +00:00
response.status_code = 200
2023-07-18 17:31:51 -04:00
if response is not None and response.status_code != 200:
2022-11-06 01:06:31 +00:00
logger.error(f"Error {url} {response.status_code}")
2023-07-18 16:41:55 -04:00
except Exception:
response = None
2020-10-22 21:45:05 +02:00
return response
def create_app(domain_name):
# naive protocal strip
is_http = False
if domain_name.startswith("https://"):
2022-12-29 14:30:31 -05:00
domain_name = domain_name.replace("https://", "")
2020-10-22 21:45:05 +02:00
elif domain_name.startswith("http://"):
is_http = True
2022-12-29 14:30:31 -05:00
domain_name = domain_name.replace("http://", "")
if domain_name.endswith("/"):
2020-10-22 21:45:05 +02:00
domain_name = domain_name[0:-1]
if not is_http:
2022-12-29 14:30:31 -05:00
url = "https://" + domain_name + API_CREATE_APP
2020-10-22 21:45:05 +02:00
else:
2022-12-29 14:30:31 -05:00
url = "http://" + domain_name + API_CREATE_APP
2020-10-22 21:45:05 +02:00
payload = {
2023-07-10 15:29:29 -04:00
"client_name": settings.SITE_INFO["site_name"],
2022-12-29 14:30:31 -05:00
"scopes": settings.MASTODON_CLIENT_SCOPE,
"redirect_uris": settings.REDIRECT_URIS,
2023-07-10 15:29:29 -04:00
"website": settings.SITE_INFO["site_url"],
2020-10-22 21:45:05 +02:00
}
2022-12-29 14:30:31 -05:00
response = post(url, data=payload, headers={"User-Agent": USER_AGENT})
2020-10-22 21:45:05 +02:00
return response
2023-11-11 00:53:03 -05:00
def webfinger(site, username) -> dict | None:
url = f"https://{site}/.well-known/webfinger?resource=acct:{username}@{site}"
try:
response = get(url, headers={"User-Agent": USER_AGENT})
if response.status_code != 200:
logger.error(f"Error webfinger {username}@{site} {response.status_code}")
return None
j = response.json()
return j
except Exception:
logger.error(f"Error webfinger {username}@{site}")
return None
2020-10-22 21:45:05 +02:00
# utils below
def random_string_generator(n):
s = string.ascii_letters + string.punctuation + string.digits
2022-12-29 14:30:31 -05:00
return "".join(random.choice(s) for i in range(n))
2020-10-22 21:45:05 +02:00
def verify_account(site, token):
url = "https://" + get_api_domain(site) + API_VERIFY_ACCOUNT
2022-01-01 17:11:16 +00:00
try:
2022-12-29 14:30:31 -05:00
response = get(
url, headers={"User-Agent": USER_AGENT, "Authorization": f"Bearer {token}"}
)
return response.status_code, (
response.json() if response.status_code == 200 else None
)
2022-01-01 17:11:16 +00:00
except Exception:
return -1, None
2022-04-01 03:07:56 -04:00
def get_related_acct_list(site, token, api):
url = "https://" + get_api_domain(site) + api
results = []
while url:
2022-12-29 14:30:31 -05:00
response = get(
url, headers={"User-Agent": USER_AGENT, "Authorization": f"Bearer {token}"}
)
url = None
if response.status_code == 200:
2022-12-29 14:30:31 -05:00
results.extend(
map(
lambda u: (
u["acct"]
if u["acct"].find("@") != -1
else u["acct"] + "@" + site
)
if "acct" in u
else u,
response.json(),
)
)
if "Link" in response.headers:
for ls in response.headers["Link"].split(","):
li = ls.strip().split(";")
if li[1].strip() == 'rel="next"':
2022-12-29 14:30:31 -05:00
url = li[0].strip().replace(">", "").replace("<", "")
return results
2020-10-22 21:45:05 +02:00
class TootVisibilityEnum:
2022-12-29 14:30:31 -05:00
PUBLIC = "public"
PRIVATE = "private"
DIRECT = "direct"
UNLISTED = "unlisted"
2022-04-01 20:03:37 -04:00
2023-02-14 17:19:00 -05:00
def detect_server_info(login_domain):
url = f"https://{login_domain}/api/v1/instance"
try:
response = get(url, headers={"User-Agent": USER_AGENT})
except Exception as e:
2023-07-20 21:59:49 -04:00
logger.error(f"Error connecting {login_domain}: {e}")
raise Exception(f"无法连接 {login_domain}")
2023-02-14 17:19:00 -05:00
if response.status_code != 200:
logger.error(f"Error connecting {login_domain}: {response.status_code}")
raise Exception(f"实例 {login_domain} 返回错误,代码: {response.status_code}")
2023-02-14 17:19:00 -05:00
try:
j = response.json()
domain = j["uri"].lower().split("//")[-1].split("/")[0]
except Exception as e:
logger.error(f"Error connecting {login_domain}: {e}")
raise Exception(f"实例 {login_domain} 返回信息无法识别")
server_version = j["version"]
api_domain = domain
if domain != login_domain:
url = f"https://{domain}/api/v1/instance"
try:
response = get(url, headers={"User-Agent": USER_AGENT})
j = response.json()
except:
api_domain = login_domain
logger.info(
f"detect_server_info: {login_domain} {domain} {api_domain} {server_version}"
)
return domain, api_domain, server_version
2023-02-14 17:19:00 -05:00
def get_or_create_fediverse_application(login_domain):
domain = login_domain
2023-07-08 00:44:22 -04:00
app = MastodonApplication.objects.filter(domain_name__iexact=domain).first()
if not app:
2023-07-08 00:44:22 -04:00
app = MastodonApplication.objects.filter(api_domain__iexact=domain).first()
2023-02-14 17:19:00 -05:00
if app:
return app
if not settings.MASTODON_ALLOW_ANY_SITE:
logger.error(f"Disallowed to create app for {domain}")
raise Exception("不支持其它实例登录")
domain, api_domain, server_version = detect_server_info(login_domain)
if login_domain != domain:
2023-07-08 00:44:22 -04:00
app = MastodonApplication.objects.filter(domain_name__iexact=domain).first()
2023-02-14 17:19:00 -05:00
if app:
return app
response = create_app(api_domain)
if response.status_code != 200:
logger.error(
f"Error creating app for {domain} on {api_domain}: {response.status_code}"
)
raise Exception("实例注册应用失败,代码: " + str(response.status_code))
2022-04-01 20:03:37 -04:00
try:
2023-02-14 17:19:00 -05:00
data = response.json()
except Exception:
logger.error(f"Error creating app for {domain}: unable to parse response")
raise Exception("实例注册应用失败,返回内容无法识别")
app = MastodonApplication.objects.create(
2023-07-08 00:44:22 -04:00
domain_name=domain.lower(),
api_domain=api_domain.lower(),
2023-02-14 17:19:00 -05:00
server_version=server_version,
app_id=data["id"],
client_id=data["client_id"],
client_secret=data["client_secret"],
vapid_key=data["vapid_key"] if "vapid_key" in data else "",
)
return app
2022-04-01 20:03:37 -04:00
def get_mastodon_login_url(app, login_domain, request):
2023-07-10 15:29:29 -04:00
url = settings.REDIRECT_URIS
version = app.server_version or ""
2022-12-29 14:30:31 -05:00
scope = (
settings.MASTODON_LEGACY_CLIENT_SCOPE
2023-01-13 02:43:38 -05:00
if "Pixelfed" in version
2022-12-29 14:30:31 -05:00
else settings.MASTODON_CLIENT_SCOPE
)
return (
"https://"
+ login_domain
+ "/oauth/authorize?client_id="
+ app.client_id
+ "&scope="
+ quote(scope)
+ "&redirect_uri="
+ url
+ "&response_type=code"
)
2022-04-01 20:03:37 -04:00
def obtain_token(site, request, code):
2022-12-29 14:30:31 -05:00
"""Returns token if success else None."""
2022-04-01 20:03:37 -04:00
mast_app = MastodonApplication.objects.get(domain_name=site)
2023-07-10 15:29:29 -04:00
redirect_uri = settings.REDIRECT_URIS
2022-04-01 20:03:37 -04:00
payload = {
2022-12-29 14:30:31 -05:00
"client_id": mast_app.client_id,
"client_secret": mast_app.client_secret,
"redirect_uri": redirect_uri,
"grant_type": "authorization_code",
"code": code,
2022-04-01 20:03:37 -04:00
}
2022-12-29 14:30:31 -05:00
headers = {"User-Agent": USER_AGENT}
2022-04-01 20:03:37 -04:00
auth = None
if mast_app.is_proxy:
2022-12-29 14:30:31 -05:00
url = "https://" + mast_app.proxy_to + API_OBTAIN_TOKEN
2022-04-01 20:03:37 -04:00
else:
url = (
"https://"
+ (mast_app.api_domain or mast_app.domain_name)
+ API_OBTAIN_TOKEN
)
2022-05-11 16:08:41 -04:00
try:
response = post(url, data=payload, headers=headers, auth=auth)
# {"token_type":"bearer","expires_in":7200,"access_token":"VGpkOEZGR3FQRDJ5NkZ0dmYyYWIwS0dqeHpvTnk4eXp0NV9nWDJ2TEpmM1ZTOjE2NDg3ODMxNTU4Mzc6MToxOmF0OjE","scope":"block.read follows.read offline.access tweet.write users.read mute.read","refresh_token":"b1pXbGEzeUF1WE5yZHJOWmxTeWpvMTBrQmZPd0czLU0tQndZQTUyU3FwRDVIOjE2NDg3ODMxNTU4Mzg6MToxOnJ0OjE"}
if response.status_code != 200:
logger.error(f"Error {url} {response.status_code}")
return None, None
except Exception as e:
logger.error(f"Error {url} {e}")
2022-04-01 20:03:37 -04:00
return None, None
data = response.json()
2022-12-29 14:30:31 -05:00
return data.get("access_token"), data.get("refresh_token", "")
2022-04-01 20:03:37 -04:00
def refresh_access_token(site, refresh_token):
2023-07-18 16:41:55 -04:00
pass
2022-04-01 20:03:37 -04:00
def revoke_token(site, token):
mast_app = MastodonApplication.objects.get(domain_name=site)
payload = {
2022-12-29 14:30:31 -05:00
"client_id": mast_app.client_id,
"client_secret": mast_app.client_secret,
"token": token,
2022-04-01 20:03:37 -04:00
}
if mast_app.is_proxy:
2022-12-29 14:30:31 -05:00
url = "https://" + mast_app.proxy_to + API_REVOKE_TOKEN
2022-04-01 20:03:37 -04:00
else:
url = "https://" + get_api_domain(site) + API_REVOKE_TOKEN
2022-12-29 14:30:31 -05:00
post(url, data=payload, headers={"User-Agent": USER_AGENT})
2022-04-01 20:03:37 -04:00
2023-01-31 21:21:50 -05:00
def get_status_id_by_url(url):
if not url:
return None
r = re.match(
r".+/(\w+)$", url
) # might be re.match(r'.+/([^/]+)$', u) if Pleroma supports edit
return r[1] if r else None
def get_spoiler_text(text, item):
if text.find(">!") != -1:
2023-06-05 02:46:26 -04:00
spoiler_text = f"关于《{item.display_title}》 可能有关键情节等敏感内容"
2023-01-31 21:21:50 -05:00
return spoiler_text, text.replace(">!", "").replace("!<", "")
else:
return None, text
def get_visibility(visibility, user):
if visibility == 2:
return TootVisibilityEnum.DIRECT
elif visibility == 1:
return TootVisibilityEnum.PRIVATE
elif user.preference.mastodon_publish_public:
2023-01-31 21:21:50 -05:00
return TootVisibilityEnum.PUBLIC
else:
return TootVisibilityEnum.UNLISTED
def share_mark(mark):
2023-01-09 09:43:35 -05:00
from catalog.common import ItemCategory
2023-07-20 21:59:49 -04:00
user = mark.owner.user
if mark.visibility == 2:
visibility = TootVisibilityEnum.DIRECT
elif mark.visibility == 1:
visibility = TootVisibilityEnum.PRIVATE
elif user.preference.mastodon_publish_public:
visibility = TootVisibilityEnum.PUBLIC
else:
visibility = TootVisibilityEnum.UNLISTED
2022-12-29 14:30:31 -05:00
tags = (
"\n"
+ user.preference.mastodon_append_tag.replace(
2023-01-09 09:43:35 -05:00
"[category]", str(ItemCategory(mark.item.category).label)
2022-12-29 14:30:31 -05:00
)
if user.preference.mastodon_append_tag
2022-12-29 14:30:31 -05:00
else ""
)
stars = rating_to_emoji(
mark.rating_grade,
2022-12-29 14:30:31 -05:00
MastodonApplication.objects.get(domain_name=user.mastodon_site).star_mode,
)
2023-06-05 02:46:26 -04:00
content = f"{mark.action_label}{mark.item.display_title}{stars}\n{mark.item.absolute_url}\n{mark.comment_text or ''}{tags}"
2023-01-31 21:21:50 -05:00
update_id = get_status_id_by_url(mark.shared_link)
spoiler_text, content = get_spoiler_text(content, mark.item)
2022-12-29 14:30:31 -05:00
response = post_toot(
2023-01-31 11:05:30 -05:00
user.mastodon_site,
content,
visibility,
user.mastodon_token,
False,
update_id,
spoiler_text,
2022-12-29 14:30:31 -05:00
)
2023-07-18 17:31:51 -04:00
if response is not None and response.status_code in [200, 201]:
j = response.json()
2022-12-29 14:30:31 -05:00
if "url" in j:
mark.shared_link = j["url"]
2022-05-29 08:43:52 -04:00
if mark.shared_link:
2022-12-29 14:30:31 -05:00
mark.save(update_fields=["shared_link"])
2023-07-18 17:31:51 -04:00
return True, 200
else:
2023-01-10 16:52:00 -05:00
logger.error(response)
2023-07-18 17:31:51 -04:00
return False, response.status_code if response is not None else -1
def share_review(review):
2023-01-09 09:54:41 -05:00
from catalog.common import ItemCategory
user = review.owner
if review.visibility == 2:
visibility = TootVisibilityEnum.DIRECT
elif review.visibility == 1:
visibility = TootVisibilityEnum.PRIVATE
elif user.preference.mastodon_publish_public:
visibility = TootVisibilityEnum.PUBLIC
else:
visibility = TootVisibilityEnum.UNLISTED
2022-12-29 14:30:31 -05:00
tags = (
"\n"
+ user.preference.mastodon_append_tag.replace(
2023-01-09 09:54:41 -05:00
"[category]", str(ItemCategory(review.item.category).label)
2022-12-29 14:30:31 -05:00
)
if user.preference.mastodon_append_tag
2022-12-29 14:30:31 -05:00
else ""
)
2023-06-05 02:46:26 -04:00
content = f"发布了关于《{review.item.display_title}》的评论\n{review.title}\n{review.absolute_url}{tags}"
2022-11-06 01:06:31 +00:00
update_id = None
if review.metadata.get(
"shared_link"
): # "https://mastodon.social/@username/1234567890"
2022-12-29 14:30:31 -05:00
r = re.match(
r".+/(\w+)$", review.metadata.get("shared_link")
2022-12-29 14:30:31 -05:00
) # might be re.match(r'.+/([^/]+)$', u) if Pleroma supports edit
2022-11-06 01:06:31 +00:00
update_id = r[1] if r else None
2022-12-29 14:30:31 -05:00
response = post_toot(
user.mastodon_site, content, visibility, user.mastodon_token, False, update_id
)
2023-07-18 17:31:51 -04:00
if response is not None and response.status_code in [200, 201]:
j = response.json()
2022-12-29 14:30:31 -05:00
if "url" in j:
review.metadata["shared_link"] = j["url"]
review.save()
return True
else:
return False
2022-06-11 14:46:08 -04:00
def share_collection(collection, comment, user, visibility_no):
if visibility_no == 2:
visibility = TootVisibilityEnum.DIRECT
elif visibility_no == 1:
visibility = TootVisibilityEnum.PRIVATE
elif user.preference.mastodon_publish_public:
2022-06-11 14:46:08 -04:00
visibility = TootVisibilityEnum.PUBLIC
else:
visibility = TootVisibilityEnum.UNLISTED
2022-12-29 14:30:31 -05:00
tags = (
"\n" + user.preference.mastodon_append_tag.replace("[category]", "收藏单")
if user.preference.mastodon_append_tag
2022-12-29 14:30:31 -05:00
else ""
)
2023-01-16 14:03:27 -05:00
user_str = (
""
2023-07-20 21:59:49 -04:00
if user == collection.owner.user
else (
2023-07-20 21:59:49 -04:00
" @" + collection.owner.user.mastodon_acct + " "
if collection.owner.user.mastodon_acct
else " " + collection.owner.username + " "
)
2023-01-16 14:03:27 -05:00
)
content = f"分享{user_str}的收藏单《{collection.title}\n{collection.absolute_url}\n{comment}{tags}"
2022-06-11 14:46:08 -04:00
response = post_toot(user.mastodon_site, content, visibility, user.mastodon_token)
2023-07-18 17:31:51 -04:00
if response is not None and response.status_code in [200, 201]:
2022-06-11 14:46:08 -04:00
return True
else:
return False