lib.itmens/mastodon/auth.py

146 lines
6 KiB
Python
Raw Normal View History

2020-05-05 23:50:48 +08:00
from django.contrib.auth.backends import ModelBackend, UserModel
from django.shortcuts import reverse
from .api import *
2020-10-22 21:45:05 +02:00
from .models import MastodonApplication
from django.conf import settings
2022-04-01 03:07:56 -04:00
from urllib.parse import quote
def get_mastodon_application(domain):
    """Return the stored OAuth application for ``domain``, creating one if missing.

    An existing ``MastodonApplication`` row for the domain is returned as is.
    Otherwise (except for ``TWITTER_DOMAIN``, which is never auto-created
    here) ``create_app(domain)`` is called and the credentials found in its
    JSON response are saved as a new row.

    Args:
        domain: domain name of the instance the user wants to log in with.

    Returns:
        An ``(app, error_msg)`` tuple. ``app`` is ``None`` whenever no
        application could be found or created. ``error_msg`` is ``''`` when an
        existing row was found, ``None`` when a new row was created, and a
        message describing the failure otherwise.
    """
    app = MastodonApplication.objects.filter(domain_name=domain).first()
    if app is not None:
        return app, ''
    if domain == TWITTER_DOMAIN:
        return None, 'Twitter未配置'

    try:
        response = create_app(domain)
    # NOTE(review): ``ConnectionError`` resolves to whatever that name is bound
    # to in this module (the builtin unless ``.api`` re-exports another one);
    # confirm it is the exception the HTTP client actually raises.
    except (requests.exceptions.Timeout, ConnectionError):
        return None, _("联邦网络请求超时。")
    except Exception as e:
        return None, str(e)

    if response.status_code != 200:
        print(f'Error connecting {domain}: {response.status_code} {response.content.decode("utf-8")}')
        return None, "实例连接错误,代码: " + str(response.status_code)

    try:
        data = response.json()
        # Pull out the required fields inside the guarded section so that a
        # well-formed but incomplete (or non-object) JSON reply is reported as
        # "unrecognized content" instead of escaping as KeyError/TypeError.
        credentials = {
            'app_id': data['id'],
            'client_id': data['client_id'],
            'client_secret': data['client_secret'],
            'vapid_key': data['vapid_key'] if 'vapid_key' in data else '',
        }
    except Exception as e:
        print(f'Error connecting {domain}: {response.status_code} {response.content.decode("utf-8")} {e}')
        return None, "实例返回内容无法识别"

    # Database errors are deliberately left outside the try above: they are
    # not a problem with the instance's reply and should not be reported as one.
    app = MastodonApplication.objects.create(domain_name=domain, **credentials)
    return app, None
2022-04-01 03:34:34 -04:00
def get_mastodon_login_url(app, login_domain, version, request):
    """Build the URL that starts the OAuth2 authorization flow.

    Args:
        app: application record; only its ``client_id`` is read.
        login_domain: host the user is sent to, or ``TWITTER_DOMAIN`` for the
            Twitter flow.
        version: version string of the instance; when it contains
            ``'Pixelfed'`` the requested scope is reduced to ``'read'``.
        request: current request, used to build the absolute callback URL
            from its scheme and host.

    Returns:
        The authorization URL as a string.
    """
    url = request.scheme + "://" + request.get_host() + reverse('users:OAuth2_login')
    if login_domain == TWITTER_DOMAIN:
        # NOTE(review): ``state`` and ``code_challenge`` are fixed literals, so
        # they give no CSRF / PKCE protection. Making them per-session needs a
        # matching change where the code is exchanged for a token.
        return f"https://twitter.com/i/oauth2/authorize?response_type=code&client_id={app.client_id}&redirect_uri={quote(url)}&scope={quote(settings.TWITTER_CLIENT_SCOPE)}&state=state&code_challenge=challenge&code_challenge_method=plain"
    scope = 'read' if 'Pixelfed' in version else settings.MASTODON_CLIENT_SCOPE
    # The callback URL is a query-string value and must be percent-encoded,
    # exactly as the Twitter branch above already does.
    return (
        "https://" + login_domain
        + "/oauth/authorize?client_id=" + app.client_id
        + "&scope=" + quote(scope)
        + "&redirect_uri=" + quote(url)
        + "&response_type=code"
    )
2020-05-05 23:50:48 +08:00
2020-10-22 21:45:05 +02:00
def obtain_token(site, request, code):
    """Exchange an OAuth2 authorization code for tokens.

    Args:
        site: domain name used to look up the stored application.
        request: current request; its scheme and host rebuild the callback
            URL sent as ``redirect_uri``.
        code: authorization code received on the callback.

    Returns:
        ``(access_token, refresh_token)`` taken from the JSON response, where
        ``refresh_token`` falls back to ``''`` when absent; ``(None, None)``
        when the token endpoint answers with anything other than 200.
    """
    mast_app = MastodonApplication.objects.get(domain_name=site)
    callback = request.scheme + "://" + request.get_host() + reverse('users:OAuth2_login')
    form = {
        'client_id': mast_app.client_id,
        'client_secret': mast_app.client_secret,
        'redirect_uri': callback,
        'grant_type': 'authorization_code',
        'code': code,
        'code_verifier': 'challenge',
    }
    basic_auth = None

    if mast_app.is_proxy:
        endpoint = 'https://' + mast_app.proxy_to + API_OBTAIN_TOKEN
    elif site == TWITTER_DOMAIN:
        endpoint = 'https://api.twitter.com/2/oauth2/token'
        # For Twitter the secret travels as HTTP basic auth, not in the form.
        basic_auth = (mast_app.client_id, mast_app.client_secret)
        form.pop('client_secret')
    else:
        endpoint = 'https://' + mast_app.domain_name + API_OBTAIN_TOKEN

    response = post(endpoint, data=form, headers={'User-Agent': 'NeoDB/1.0'}, auth=basic_auth)
    # A Twitter success reply carries token_type, expires_in, access_token,
    # scope and refresh_token.
    if response.status_code == 200:
        tokens = response.json()
        return tokens.get('access_token'), tokens.get('refresh_token', '')

    print(endpoint)
    print(response.status_code)
    print(response.text)
    return None, None
2020-05-05 23:50:48 +08:00
2022-04-01 03:07:56 -04:00
def refresh_access_token(site, refresh_token):
    """Obtain a new access token from a refresh token (Twitter only).

    Args:
        site: domain name of the account's site; anything other than
            ``TWITTER_DOMAIN`` is not handled.
        refresh_token: refresh token previously issued for the account.

    Returns:
        The new access token, or ``None`` when ``site`` is not Twitter or the
        token endpoint answers with anything other than 200.
    """
    if site != TWITTER_DOMAIN:
        return None

    mast_app = MastodonApplication.objects.get(domain_name=site)
    endpoint = 'https://api.twitter.com/2/oauth2/token'
    response = post(
        endpoint,
        data={
            'client_id': mast_app.client_id,
            'refresh_token': refresh_token,
            'grant_type': 'refresh_token',
        },
        headers={'User-Agent': 'NeoDB/1.0'},
        auth=(mast_app.client_id, mast_app.client_secret),
    )
    if response.status_code == 200:
        return response.json().get('access_token')

    print(endpoint)
    print(response.status_code)
    print(response.text)
    return None
2020-05-05 23:50:48 +08:00
2020-10-22 21:45:05 +02:00
def revoke_token(site, token):
    """Send a token revocation request for ``token``.

    The request goes to the application's ``proxy_to`` host when the stored
    application is marked as a proxy, otherwise to ``site`` itself. The
    response is not inspected and nothing is returned.

    Args:
        site: domain name used to look up the stored application.
        token: the token to revoke.
    """
    mast_app = MastodonApplication.objects.get(domain_name=site)
    host = mast_app.proxy_to if mast_app.is_proxy else site
    post(
        'https://' + host + API_REVOKE_TOKEN,
        data={
            'client_id': mast_app.client_id,
            'client_secret': mast_app.client_secret,
            'token': token,
        },
        headers={'User-Agent': 'NeoDB/1.0'},
    )
2020-05-05 23:50:48 +08:00
class OAuth2Backend(ModelBackend):
    """Authentication backend that resolves an OAuth2 token to a Django user."""

    # "authenticate() should check the credentials it gets and returns
    # a user object that matches those credentials."
    # arg request is an interface specification, not used in this implementation
    def authenticate(self, request, token=None, username=None, site=None, **kwargs):
        """Return the user that owns ``token`` on ``site``, or ``None``.

        The account behind the token is fetched with ``verify_account`` and
        the user is looked up by the returned account id together with the
        site.

        ``username`` is accepted for backward compatibility but is not used
        for the lookup: users are keyed by ``mastodon_id``, which is only
        known after the account has been verified. Previously, passing a
        ``username`` skipped verification and then failed with
        ``UnboundLocalError`` because the id was never assigned.

        Args:
            request: required by the backend interface; unused.
            token: OAuth2 access token.
            username: ignored, see above.
            site: domain name of the site that issued the token.

        Returns:
            The matching user when one exists and ``user_can_authenticate``
            accepts it; otherwise ``None``.
        """
        if token is None or site is None:
            return None

        code, user_data = verify_account(site, token)
        if code != 200:
            # Failing to fetch the account means the token is invalid.
            return None

        try:
            user = UserModel._default_manager.get(
                mastodon_id=user_data['id'], mastodon_site=site
            )
        except UserModel.DoesNotExist:
            return None
        return user if self.user_can_authenticate(user) else None