diff --git a/mastodon/api.py b/mastodon/api.py index 5739685b..fb28c729 100644 --- a/mastodon/api.py +++ b/mastodon/api.py @@ -67,13 +67,12 @@ put = functools.partial(requests.put, timeout=settings.MASTODON_TIMEOUT) post = functools.partial(requests.post, timeout=settings.MASTODON_TIMEOUT) +def get_api_domain(domain): + app = MastodonApplication.objects.filter(domain_name=domain).first() + return app.api_domain if app and app.api_domain else domain + + # low level api below -def get_relationships(site, id_list, token): # no longer in use - url = "https://" + site + API_GET_RELATIONSHIPS - payload = {"id[]": id_list} - headers = {"User-Agent": USER_AGENT, "Authorization": f"Bearer {token}"} - response = get(url, headers=headers, params=payload) - return response.json() def post_toot( @@ -100,7 +99,7 @@ def post_toot( if response.status_code != 200: logger.error(f"Error {url} {response.status_code}") else: - url = "https://" + site + API_PUBLISH_TOOT + url = "https://" + get_api_domain(site) + API_PUBLISH_TOOT payload = { "status": content, "visibility": visibility, @@ -124,19 +123,6 @@ def post_toot( return response -def get_instance_info(domain_name): - if domain_name.lower().strip() == TWITTER_DOMAIN: - return TWITTER_DOMAIN, "" - url = f"https://{domain_name}/api/v1/instance" - try: - response = get(url, headers={"User-Agent": USER_AGENT}) - j = response.json() - return j["uri"].lower().split("//")[-1].split("/")[0], j["version"] - except Exception: - logger.error(f"Error {url}") - return domain_name, "" - - def create_app(domain_name): # naive protocal strip is_http = False @@ -164,80 +150,6 @@ def create_app(domain_name): return response -def get_site_id(username, user_site, target_site, token): - url = "https://" + target_site + API_SEARCH - payload = { - "limit": 1, - "type": "accounts", - "resolve": True, - "q": f"{username}@{user_site}", - } - headers = {"User-Agent": USER_AGENT, "Authorization": f"Bearer {token}"} - response = get(url, params=payload, headers=headers) - try: - data = response.json() - except Exception: - logger.error(f"Error parsing JSON from {url}") - return None - if "accounts" not in data: - return None - elif ( - len(data["accounts"]) == 0 - ): # target site may return empty if no cache of this user - return None - elif ( - data["accounts"][0]["acct"] != f"{username}@{user_site}" - ): # or return another user with a similar id which needs to be skipped - return None - else: - return data["accounts"][0]["id"] - - -# high level api below -def get_relationship(request_user, target_user, useless_token=None): - return [ - { - "blocked_by": target_user.is_blocking(request_user), - "following": request_user.is_following(target_user), - } - ] - - -def get_cross_site_id(target_user, target_site, token): - """ - Firstly attempt to query local database, if the cross site id - doesn't exsit then make a query to mastodon site, then save the - result into database. - Return target_user at target_site cross site id. - """ - if target_site == target_user.mastodon_site: - return target_user.mastodon_id - if target_site == TWITTER_DOMAIN: - return None - - try: - cross_site_info = CrossSiteUserInfo.objects.get( - uid=f"{target_user.username}@{target_user.mastodon_site}", - target_site=target_site, - ) - except ObjectDoesNotExist: - cross_site_id = get_site_id( - target_user.username, target_user.mastodon_site, target_site, token - ) - if not cross_site_id: - logger.error( - f"unable to find cross_site_id for {target_user} on {target_site}" - ) - return None - cross_site_info = CrossSiteUserInfo.objects.create( - uid=f"{target_user.username}@{target_user.mastodon_site}", - target_site=target_site, - site_id=cross_site_id, - local_id=target_user.id, - ) - return cross_site_info.site_id - - # utils below def random_string_generator(n): s = string.ascii_letters + string.punctuation + string.digits @@ -268,7 +180,7 @@ def verify_account(site, token): return 200, r except Exception: return -1, None - url = "https://" + site + API_VERIFY_ACCOUNT + url = "https://" + get_api_domain(site) + API_VERIFY_ACCOUNT try: response = get( url, headers={"User-Agent": USER_AGENT, "Authorization": f"Bearer {token}"} @@ -283,7 +195,7 @@ def verify_account(site, token): def get_related_acct_list(site, token, api): if site == TWITTER_DOMAIN: return [] - url = "https://" + site + api + url = "https://" + get_api_domain(site) + api results = [] while url: response = get( @@ -318,7 +230,32 @@ class TootVisibilityEnum: UNLISTED = "unlisted" -def get_mastodon_application(domain): +def get_mastodon_application(login_domain): + domain = login_domain + api_domain = "" + server_version = "" + app = MastodonApplication.objects.filter(domain_name=domain).first() + if not app: + # detect the correct domains + url = f"https://{login_domain}/api/v1/instance" + try: + response = get(url, headers={"User-Agent": USER_AGENT}) + if response.status_code != 200: + logger.error(f"Error connecting {domain}: {response.status_code}") + return None, "实例连接错误,代码: " + str(response.status_code) + j = response.json() + domain = j["uri"].lower().split("//")[-1].split("/")[0] + api_domain = domain + if "urls" in j and "streaming_api" in j["urls"]: + api_domain = j["urls"]["streaming_api"].split("://")[1] + server_version = j["version"] + except (requests.exceptions.Timeout, ConnectionError): + logger.error(f"Error connecting {login_domain}: Timeout") + return None, "连接实例请求超时" + except Exception as e: + logger.error(f"Error connecting {login_domain}: {e}") + return None, "无法识别实例信息" + app = MastodonApplication.objects.filter(domain_name=domain).first() if app is not None: return app, "" @@ -326,18 +263,20 @@ def get_mastodon_application(domain): return None, "Twitter未配置" error_msg = None try: - response = create_app(domain) + response = create_app(api_domain) except (requests.exceptions.Timeout, ConnectionError): error_msg = "联邦网络请求超时。" - logger.error(f"Error creating app for {domain}: Timeout") + logger.error(f"Error creating app for {domain} on {api_domain}: Timeout") except Exception as e: error_msg = "联邦网络请求失败 " + str(e) - logger.error(f"Error creating app for {domain}: {e}") + logger.error(f"Error creating app for {domain} on {api_domain}: {e}") else: # fill the form with returned data if response.status_code != 200: error_msg = "实例连接错误,代码: " + str(response.status_code) - logger.error(f"Error creating app for {domain}: {response.status_code}") + logger.error( + f"Error creating app for {domain} on {api_domain}: {response.status_code}" + ) else: try: data = response.json() @@ -350,6 +289,8 @@ def get_mastodon_application(domain): if settings.MASTODON_ALLOW_ANY_SITE: app = MastodonApplication.objects.create( domain_name=domain, + api_domain=api_domain, + server_version=server_version, app_id=data["id"], client_id=data["client_id"], client_secret=data["client_secret"], @@ -361,10 +302,11 @@ def get_mastodon_application(domain): return app, error_msg -def get_mastodon_login_url(app, login_domain, version, request): +def get_mastodon_login_url(app, login_domain, request): url = request.scheme + "://" + request.get_host() + reverse("users:OAuth2_login") if login_domain == TWITTER_DOMAIN: return f"https://twitter.com/i/oauth2/authorize?response_type=code&client_id={app.client_id}&redirect_uri={quote(url)}&scope={quote(settings.TWITTER_CLIENT_SCOPE)}&state=state&code_challenge=challenge&code_challenge_method=plain" + version = app.server_version or "" scope = ( settings.MASTODON_LEGACY_CLIENT_SCOPE if "Pixelfed" in version @@ -406,7 +348,11 @@ def obtain_token(site, request, code): del payload["client_secret"] payload["code_verifier"] = "challenge" else: - url = "https://" + mast_app.domain_name + API_OBTAIN_TOKEN + url = ( + "https://" + + (mast_app.api_domain or mast_app.domain_name) + + API_OBTAIN_TOKEN + ) try: response = post(url, data=payload, headers=headers, auth=auth) # {"token_type":"bearer","expires_in":7200,"access_token":"VGpkOEZGR3FQRDJ5NkZ0dmYyYWIwS0dqeHpvTnk4eXp0NV9nWDJ2TEpmM1ZTOjE2NDg3ODMxNTU4Mzc6MToxOmF0OjE","scope":"block.read follows.read offline.access tweet.write users.read mute.read","refresh_token":"b1pXbGEzeUF1WE5yZHJOWmxTeWpvMTBrQmZPd0czLU0tQndZQTUyU3FwRDVIOjE2NDg3ODMxNTU4Mzg6MToxOnJ0OjE"} @@ -452,7 +398,7 @@ def revoke_token(site, token): if mast_app.is_proxy: url = "https://" + mast_app.proxy_to + API_REVOKE_TOKEN else: - url = "https://" + site + API_REVOKE_TOKEN + url = "https://" + get_api_domain(site) + API_REVOKE_TOKEN post(url, data=payload, headers={"User-Agent": USER_AGENT}) diff --git a/mastodon/migrations/0002_add_api_domain_server_version.py b/mastodon/migrations/0002_add_api_domain_server_version.py new file mode 100644 index 00000000..14ee5ec7 --- /dev/null +++ b/mastodon/migrations/0002_add_api_domain_server_version.py @@ -0,0 +1,27 @@ +# Generated by Django 3.2.16 on 2023-02-14 06:04 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("mastodon", "0001_initial"), + ] + + operations = [ + migrations.AddField( + model_name="mastodonapplication", + name="api_domain", + field=models.CharField( + blank=True, max_length=100, verbose_name="domain for api call" + ), + ), + migrations.AddField( + model_name="mastodonapplication", + name="server_version", + field=models.CharField( + blank=True, max_length=100, verbose_name="type and verion" + ), + ), + ] diff --git a/mastodon/models.py b/mastodon/models.py index f6e3658c..2cb20ab8 100644 --- a/mastodon/models.py +++ b/mastodon/models.py @@ -5,6 +5,8 @@ from django.utils.translation import gettext_lazy as _ class MastodonApplication(models.Model): domain_name = models.CharField(_("site domain name"), max_length=100, unique=True) + api_domain = models.CharField(_("domain for api call"), max_length=100, blank=True) + server_version = models.CharField(_("type and verion"), max_length=100, blank=True) app_id = models.CharField(_("in-site app id"), max_length=100) client_id = models.CharField(_("client id"), max_length=100) client_secret = models.CharField(_("client secret"), max_length=100) diff --git a/users/account.py b/users/account.py index a53b5a3c..e6e22f33 100644 --- a/users/account.py +++ b/users/account.py @@ -68,8 +68,7 @@ def connect(request): login_domain = ( login_domain.strip().lower().split("//")[-1].split("/")[0].split("@")[-1] ) - domain, version = get_instance_info(login_domain) - app, error_msg = get_mastodon_application(domain) + app, error_msg = get_mastodon_application(login_domain) if app is None: return render( request, @@ -80,9 +79,9 @@ def connect(request): }, ) else: - login_url = get_mastodon_login_url(app, login_domain, version, request) + login_url = get_mastodon_login_url(app, login_domain, request) resp = redirect(login_url) - resp.set_cookie("mastodon_domain", domain) + resp.set_cookie("mastodon_domain", app.domain_name) return resp