lib.itmens/mastodon/api.py
2022-12-29 14:30:31 -05:00

566 lines
19 KiB
Python

import requests
import string
import random
import functools
import logging
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
from django.shortcuts import reverse
from urllib.parse import quote
from .models import CrossSiteUserInfo, MastodonApplication
from mastodon.utils import rating_to_emoji
import re
logger = logging.getLogger(__name__)
# See https://docs.joinmastodon.org/methods/accounts/
# returns user info
# retruns the same info as verify account credentials
# GET
API_GET_ACCOUNT = "/api/v1/accounts/:id"
# returns user info if valid, 401 if invalid
# GET
API_VERIFY_ACCOUNT = "/api/v1/accounts/verify_credentials"
# obtain token
# GET
API_OBTAIN_TOKEN = "/oauth/token"
# obatin auth code
# GET
API_OAUTH_AUTHORIZE = "/oauth/authorize"
# revoke token
# POST
API_REVOKE_TOKEN = "/oauth/revoke"
# relationships
# GET
API_GET_RELATIONSHIPS = "/api/v1/accounts/relationships"
# toot
# POST
API_PUBLISH_TOOT = "/api/v1/statuses"
# create new app
# POST
API_CREATE_APP = "/api/v1/apps"
# search
# GET
API_SEARCH = "/api/v2/search"
TWITTER_DOMAIN = "twitter.com"
TWITTER_API_ME = "https://api.twitter.com/2/users/me"
TWITTER_API_POST = "https://api.twitter.com/2/tweets"
TWITTER_API_TOKEN = "https://api.twitter.com/2/oauth2/token"
USER_AGENT = f"{settings.CLIENT_NAME}/1.0"
get = functools.partial(requests.get, timeout=settings.MASTODON_TIMEOUT)
put = functools.partial(requests.put, timeout=settings.MASTODON_TIMEOUT)
post = functools.partial(requests.post, timeout=settings.MASTODON_TIMEOUT)
# low level api below
def get_relationships(site, id_list, token): # no longer in use
url = "https://" + site + API_GET_RELATIONSHIPS
payload = {"id[]": id_list}
headers = {"User-Agent": USER_AGENT, "Authorization": f"Bearer {token}"}
response = get(url, headers=headers, params=payload)
return response.json()
def post_toot(site, content, visibility, token, local_only=False, update_id=None):
headers = {
"User-Agent": USER_AGENT,
"Authorization": f"Bearer {token}",
"Idempotency-Key": random_string_generator(16),
}
if site == TWITTER_DOMAIN:
url = TWITTER_API_POST
payload = {"text": content if len(content) <= 150 else content[0:150] + "..."}
response = post(url, headers=headers, json=payload)
if response.status_code == 201:
response.status_code = 200
if response.status_code != 200:
logger.error(f"Error {url} {response.status_code}")
else:
url = "https://" + site + API_PUBLISH_TOOT
payload = {
"status": content,
"visibility": visibility,
}
if local_only:
payload["local_only"] = True
try:
if update_id:
response = put(url + "/" + update_id, headers=headers, data=payload)
if update_id is None or response.status_code != 200:
response = post(url, headers=headers, data=payload)
if response.status_code == 201:
response.status_code = 200
if response.status_code != 200:
logger.error(f"Error {url} {response.status_code}")
except Exception:
response = None
return response
def get_instance_info(domain_name):
if domain_name.lower().strip() == TWITTER_DOMAIN:
return TWITTER_DOMAIN, ""
try:
url = f"https://{domain_name}/api/v1/instance"
response = get(url, headers={"User-Agent": USER_AGENT})
j = response.json()
return j["uri"].lower().split("//")[-1].split("/")[0], j["version"]
except Exception:
logger.error(f"Error {url}")
return domain_name, ""
def create_app(domain_name):
# naive protocal strip
is_http = False
if domain_name.startswith("https://"):
domain_name = domain_name.replace("https://", "")
elif domain_name.startswith("http://"):
is_http = True
domain_name = domain_name.replace("http://", "")
if domain_name.endswith("/"):
domain_name = domain_name[0:-1]
if not is_http:
url = "https://" + domain_name + API_CREATE_APP
else:
url = "http://" + domain_name + API_CREATE_APP
payload = {
"client_name": settings.CLIENT_NAME,
"scopes": settings.MASTODON_CLIENT_SCOPE,
"redirect_uris": settings.REDIRECT_URIS,
"website": settings.APP_WEBSITE,
}
response = post(url, data=payload, headers={"User-Agent": USER_AGENT})
return response
def get_site_id(username, user_site, target_site, token):
url = "https://" + target_site + API_SEARCH
payload = {
"limit": 1,
"type": "accounts",
"resolve": True,
"q": f"{username}@{user_site}",
}
headers = {"User-Agent": USER_AGENT, "Authorization": f"Bearer {token}"}
response = get(url, params=payload, headers=headers)
try:
data = response.json()
except Exception:
logger.error(f"Error parsing JSON from {url}")
return None
if "accounts" not in data:
return None
elif (
len(data["accounts"]) == 0
): # target site may return empty if no cache of this user
return None
elif (
data["accounts"][0]["acct"] != f"{username}@{user_site}"
): # or return another user with a similar id which needs to be skipped
return None
else:
return data["accounts"][0]["id"]
# high level api below
def get_relationship(request_user, target_user, useless_token=None):
return [
{
"blocked_by": target_user.is_blocking(request_user),
"following": request_user.is_following(target_user),
}
]
def get_cross_site_id(target_user, target_site, token):
"""
Firstly attempt to query local database, if the cross site id
doesn't exsit then make a query to mastodon site, then save the
result into database.
Return target_user at target_site cross site id.
"""
if target_site == target_user.mastodon_site:
return target_user.mastodon_id
if target_site == TWITTER_DOMAIN:
return None
try:
cross_site_info = CrossSiteUserInfo.objects.get(
uid=f"{target_user.username}@{target_user.mastodon_site}",
target_site=target_site,
)
except ObjectDoesNotExist:
cross_site_id = get_site_id(
target_user.username, target_user.mastodon_site, target_site, token
)
if not cross_site_id:
logger.error(
f"unable to find cross_site_id for {target_user} on {target_site}"
)
return None
cross_site_info = CrossSiteUserInfo.objects.create(
uid=f"{target_user.username}@{target_user.mastodon_site}",
target_site=target_site,
site_id=cross_site_id,
local_id=target_user.id,
)
return cross_site_info.site_id
# utils below
def random_string_generator(n):
s = string.ascii_letters + string.punctuation + string.digits
return "".join(random.choice(s) for i in range(n))
def verify_account(site, token):
if site == TWITTER_DOMAIN:
url = (
TWITTER_API_ME
+ "?user.fields=id,username,name,description,profile_image_url,created_at,protected"
)
try:
response = get(
url,
headers={"User-Agent": USER_AGENT, "Authorization": f"Bearer {token}"},
)
if response.status_code != 200:
logger.error(f"Error {url} {response.status_code}")
return response.status_code, None
r = response.json()["data"]
r["display_name"] = r["name"]
r["note"] = r["description"]
r["avatar"] = r["profile_image_url"]
r["avatar_static"] = r["profile_image_url"]
r["locked"] = r["protected"]
r["url"] = f'https://{TWITTER_DOMAIN}/{r["username"]}'
return 200, r
except Exception:
return -1, None
url = "https://" + site + API_VERIFY_ACCOUNT
try:
response = get(
url, headers={"User-Agent": USER_AGENT, "Authorization": f"Bearer {token}"}
)
return response.status_code, (
response.json() if response.status_code == 200 else None
)
except Exception:
return -1, None
def get_related_acct_list(site, token, api):
if site == TWITTER_DOMAIN:
return []
url = "https://" + site + api
results = []
while url:
response = get(
url, headers={"User-Agent": USER_AGENT, "Authorization": f"Bearer {token}"}
)
url = None
if response.status_code == 200:
results.extend(
map(
lambda u: (
u["acct"]
if u["acct"].find("@") != -1
else u["acct"] + "@" + site
)
if "acct" in u
else u,
response.json(),
)
)
if "Link" in response.headers:
for ls in response.headers["Link"].split(","):
li = ls.strip().split(";")
if li[1].strip() == 'rel="next"':
url = li[0].strip().replace(">", "").replace("<", "")
return results
class TootVisibilityEnum:
PUBLIC = "public"
PRIVATE = "private"
DIRECT = "direct"
UNLISTED = "unlisted"
def get_mastodon_application(domain):
app = MastodonApplication.objects.filter(domain_name=domain).first()
if app is not None:
return app, ""
if domain == TWITTER_DOMAIN:
return None, "Twitter未配置"
error_msg = None
try:
response = create_app(domain)
except (requests.exceptions.Timeout, ConnectionError):
error_msg = "联邦网络请求超时。"
logger.error(f"Error creating app for {domain}: Timeout")
except Exception as e:
error_msg = "联邦网络请求失败 " + str(e)
logger.error(f"Error creating app for {domain}: {e}")
else:
# fill the form with returned data
if response.status_code != 200:
error_msg = "实例连接错误,代码: " + str(response.status_code)
logger.error(f"Error creating app for {domain}: {response.status_code}")
else:
try:
data = response.json()
except Exception:
error_msg = "实例返回内容无法识别"
logger.error(
f"Error creating app for {domain}: unable to parse response"
)
else:
if settings.MASTODON_ALLOW_ANY_SITE:
app = MastodonApplication.objects.create(
domain_name=domain,
app_id=data["id"],
client_id=data["client_id"],
client_secret=data["client_secret"],
vapid_key=data["vapid_key"] if "vapid_key" in data else "",
)
else:
error_msg = "不支持其它实例登录"
logger.error(f"Disallowed to create app for {domain}")
return app, error_msg
def get_mastodon_login_url(app, login_domain, version, request):
url = request.scheme + "://" + request.get_host() + reverse("users:OAuth2_login")
if login_domain == TWITTER_DOMAIN:
return f"https://twitter.com/i/oauth2/authorize?response_type=code&client_id={app.client_id}&redirect_uri={quote(url)}&scope={quote(settings.TWITTER_CLIENT_SCOPE)}&state=state&code_challenge=challenge&code_challenge_method=plain"
scope = (
settings.MASTODON_LEGACY_CLIENT_SCOPE
if "Pixelfed" in version
else settings.MASTODON_CLIENT_SCOPE
)
return (
"https://"
+ login_domain
+ "/oauth/authorize?client_id="
+ app.client_id
+ "&scope="
+ quote(scope)
+ "&redirect_uri="
+ url
+ "&response_type=code"
)
def obtain_token(site, request, code):
"""Returns token if success else None."""
mast_app = MastodonApplication.objects.get(domain_name=site)
redirect_uri = (
request.scheme + "://" + request.get_host() + reverse("users:OAuth2_login")
)
payload = {
"client_id": mast_app.client_id,
"client_secret": mast_app.client_secret,
"redirect_uri": redirect_uri,
"grant_type": "authorization_code",
"code": code,
}
headers = {"User-Agent": USER_AGENT}
auth = None
if mast_app.is_proxy:
url = "https://" + mast_app.proxy_to + API_OBTAIN_TOKEN
elif site == TWITTER_DOMAIN:
url = TWITTER_API_TOKEN
auth = (mast_app.client_id, mast_app.client_secret)
del payload["client_secret"]
payload["code_verifier"] = "challenge"
else:
url = "https://" + mast_app.domain_name + API_OBTAIN_TOKEN
try:
response = post(url, data=payload, headers=headers, auth=auth)
# {"token_type":"bearer","expires_in":7200,"access_token":"VGpkOEZGR3FQRDJ5NkZ0dmYyYWIwS0dqeHpvTnk4eXp0NV9nWDJ2TEpmM1ZTOjE2NDg3ODMxNTU4Mzc6MToxOmF0OjE","scope":"block.read follows.read offline.access tweet.write users.read mute.read","refresh_token":"b1pXbGEzeUF1WE5yZHJOWmxTeWpvMTBrQmZPd0czLU0tQndZQTUyU3FwRDVIOjE2NDg3ODMxNTU4Mzg6MToxOnJ0OjE"}
if response.status_code != 200:
logger.error(f"Error {url} {response.status_code}")
return None, None
except Exception as e:
logger.error(f"Error {url} {e}")
return None, None
data = response.json()
return data.get("access_token"), data.get("refresh_token", "")
def refresh_access_token(site, refresh_token):
if site != TWITTER_DOMAIN:
return None
mast_app = MastodonApplication.objects.get(domain_name=site)
url = TWITTER_API_TOKEN
payload = {
"client_id": mast_app.client_id,
"refresh_token": refresh_token,
"grant_type": "refresh_token",
}
headers = {"User-Agent": USER_AGENT}
auth = (mast_app.client_id, mast_app.client_secret)
response = post(url, data=payload, headers=headers, auth=auth)
if response.status_code != 200:
logger.error(f"Error {url} {response.status_code}")
return None
data = response.json()
return data.get("access_token")
def revoke_token(site, token):
mast_app = MastodonApplication.objects.get(domain_name=site)
payload = {
"client_id": mast_app.client_id,
"client_secret": mast_app.client_secret,
"token": token,
}
if mast_app.is_proxy:
url = "https://" + mast_app.proxy_to + API_REVOKE_TOKEN
else:
url = "https://" + site + API_REVOKE_TOKEN
post(url, data=payload, headers={"User-Agent": USER_AGENT})
def share_mark(mark):
user = mark.owner
if mark.visibility == 2:
visibility = TootVisibilityEnum.DIRECT
elif mark.visibility == 1:
visibility = TootVisibilityEnum.PRIVATE
elif user.get_preference().mastodon_publish_public:
visibility = TootVisibilityEnum.PUBLIC
else:
visibility = TootVisibilityEnum.UNLISTED
tags = (
"\n"
+ user.get_preference().mastodon_append_tag.replace(
"[category]", str(mark.item.verbose_category_name)
)
if user.get_preference().mastodon_append_tag
else ""
)
stars = rating_to_emoji(
mark.rating,
MastodonApplication.objects.get(domain_name=user.mastodon_site).star_mode,
)
content = f"{mark.translated_status}{mark.item.title}{stars}\n{mark.item.absolute_url}\n{mark.text}{tags}"
update_id = None
if mark.shared_link: # "https://mastodon.social/@username/1234567890"
r = re.match(
r".+/(\w+)$", mark.shared_link
) # might be re.match(r'.+/([^/]+)$', u) if Pleroma supports edit
update_id = r[1] if r else None
response = post_toot(
user.mastodon_site, content, visibility, user.mastodon_token, False, update_id
)
if response and response.status_code in [200, 201]:
j = response.json()
if "url" in j:
mark.shared_link = j["url"]
elif "data" in j:
mark.shared_link = (
f"https://twitter.com/{user.username}/status/{j['data']['id']}"
)
if mark.shared_link:
mark.save(update_fields=["shared_link"])
return True
else:
return False
def share_review(review):
user = review.owner
if review.visibility == 2:
visibility = TootVisibilityEnum.DIRECT
elif review.visibility == 1:
visibility = TootVisibilityEnum.PRIVATE
elif user.get_preference().mastodon_publish_public:
visibility = TootVisibilityEnum.PUBLIC
else:
visibility = TootVisibilityEnum.UNLISTED
tags = (
"\n"
+ user.get_preference().mastodon_append_tag.replace(
"[category]", str(review.item.verbose_category_name)
)
if user.get_preference().mastodon_append_tag
else ""
)
content = f"发布了关于《{review.item.title}》的评论\n{review.url}\n{review.title}{tags}"
update_id = None
if review.shared_link: # "https://mastodon.social/@username/1234567890"
r = re.match(
r".+/(\w+)$", review.shared_link
) # might be re.match(r'.+/([^/]+)$', u) if Pleroma supports edit
update_id = r[1] if r else None
response = post_toot(
user.mastodon_site, content, visibility, user.mastodon_token, False, update_id
)
if response and response.status_code in [200, 201]:
j = response.json()
if "url" in j:
review.shared_link = j["url"]
elif "data" in j:
review.shared_link = (
f"https://twitter.com/{user.username}/status/{j['data']['id']}"
)
if review.shared_link:
review.save(update_fields=["shared_link"])
return True
else:
return False
def share_collection(collection, comment, user, visibility_no):
if visibility_no == 2:
visibility = TootVisibilityEnum.DIRECT
elif visibility_no == 1:
visibility = TootVisibilityEnum.PRIVATE
elif user.get_preference().mastodon_publish_public:
visibility = TootVisibilityEnum.PUBLIC
else:
visibility = TootVisibilityEnum.UNLISTED
tags = (
"\n" + user.get_preference().mastodon_append_tag.replace("[category]", "收藏单")
if user.get_preference().mastodon_append_tag
else ""
)
content = f"分享收藏单《{collection.title}\n{collection.absolute_url}\n{comment}{tags}"
response = post_toot(user.mastodon_site, content, visibility, user.mastodon_token)
if response and response.status_code in [200, 201]:
j = response.json()
if "url" in j:
shared_link = j["url"]
elif "data" in j:
shared_link = (
f"https://twitter.com/{user.username}/status/{j['data']['id']}"
)
if shared_link:
pass
return True
else:
return False