lib.itmens/users/account.py
2024-07-01 18:05:19 -04:00

509 lines
17 KiB
Python

from datetime import timedelta
from urllib.parse import quote
import django_rq
from django import forms
from django.conf import settings
from django.contrib import auth, messages
from django.contrib.auth import authenticate
from django.contrib.auth.decorators import login_required
from django.core.exceptions import BadRequest, ObjectDoesNotExist
from django.core.validators import EmailValidator
from django.db import transaction
from django.shortcuts import redirect, render
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _
from django.views.decorators.http import require_http_methods
from loguru import logger
from common.config import *
from common.utils import AuthedHttpRequest
from journal.models import remove_data_by_user
from mastodon.models import Email, Mastodon
from mastodon.models.common import Platform, SocialAccount
from mastodon.models.email import EmailAccount
from takahe.utils import Takahe
from .models import User
from .tasks import *
@require_http_methods(["GET"])
def login(request):
selected_domain = request.GET.get("domain", default="")
sites = Mastodon.get_sites()
if request.GET.get("next"):
request.session["next_url"] = request.GET.get("next")
invite_status = -1 if settings.INVITE_ONLY else 0
if settings.INVITE_ONLY and request.GET.get("invite"):
if Takahe.verify_invite(request.GET.get("invite")):
invite_status = 1
request.session["invite"] = request.GET.get("invite")
else:
invite_status = -2
return render(
request,
"users/login.html",
{
"sites": sites,
"scope": quote(settings.MASTODON_CLIENT_SCOPE),
"selected_domain": selected_domain,
"allow_any_site": settings.MASTODON_ALLOW_ANY_SITE,
"enable_email": settings.ENABLE_LOGIN_EMAIL,
"enable_threads": settings.ENABLE_LOGIN_THREADS,
"enable_bluesky": settings.ENABLE_LOGIN_BLUESKY,
"invite_status": invite_status,
},
)
# connect will send verification email or redirect to mastodon server
@require_http_methods(["GET", "POST"])
def connect(request):
if request.method == "POST" and request.POST.get("method") == "email":
login_email = request.POST.get("email", "")
try:
EmailValidator()(login_email)
except Exception:
return render(
request,
"common/error.html",
{"msg": _("Invalid email address")},
)
Email.send_login_email(request, login_email, "login")
return render(
request,
"users/verify.html",
{
"msg": _("Verification"),
"secondary_msg": _(
"Verification email is being sent, please check your inbox."
),
"action": "login",
},
)
login_domain = (
request.session["swap_domain"]
if request.session.get("swap_login")
else (request.POST.get("domain") or request.GET.get("domain"))
)
if not login_domain:
return render(
request,
"common/error.html",
{
"msg": _("Missing instance domain"),
"secondary_msg": "",
},
)
login_domain = (
login_domain.strip().lower().split("//")[-1].split("/")[0].split("@")[-1]
)
try:
login_url = Mastodon.generate_auth_url(login_domain, request)
return redirect(login_url)
except Exception as e:
return render(
request,
"common/error.html",
{
"msg": _("Error connecting to instance"),
"secondary_msg": f"{login_domain} {e}",
},
)
# mastodon server redirect back to here
@require_http_methods(["GET"])
def connect_redirect_back(request):
code = request.GET.get("code")
if not code:
return render(
request,
"common/error.html",
{
"msg": _("Authentication failed"),
"secondary_msg": _("Invalid response from Fediverse instance."),
},
)
site = request.session.get("mastodon_domain")
if not site:
return render(
request,
"common/error.html",
{
"msg": _("Authentication failed"),
"secondary_msg": _("Invalid cookie data."),
},
)
try:
token, refresh_token = Mastodon.obtain_token(site, code, request)
except ObjectDoesNotExist:
raise BadRequest(_("Invalid instance domain"))
if not token:
return render(
request,
"common/error.html",
{
"msg": _("Authentication failed"),
"secondary_msg": _("Invalid token from Fediverse instance."),
},
)
if request.session.get("swap_login", False) and request.user.is_authenticated:
# swap login for existing user
return swap_login(request, token, site, refresh_token)
account = Mastodon.authenticate(site, token, refresh_token)
if not account:
return render(
request,
"common/error.html",
{
"msg": _("Authentication failed"),
"secondary_msg": _("Invalid account data from Fediverse instance."),
},
)
if account.user: # existing user
user: User | None = authenticate(request, social_account=account) # type: ignore
if not user:
return render(
request,
"common/error.html",
{
"msg": _("Authentication failed"),
"secondary_msg": _("Invalid user."),
},
)
return login_existing_user(request, user)
elif not settings.MASTODON_ALLOW_ANY_SITE: # directly create a new user
new_user = User.register(
account=account,
username=account.username,
)
auth_login(request, new_user)
return render(request, "users/welcome.html")
else: # check invite and ask for username
return register_new_user(request, account)
def register_new_user(request, account: SocialAccount):
if settings.INVITE_ONLY:
if not Takahe.verify_invite(request.session.get("invite")):
return render(
request,
"common/error.html",
{
"msg": _("Authentication failed"),
"secondary_msg": _("Registration is for invitation only"),
},
)
del request.session["invite"]
if request.user.is_authenticated:
auth.logout(request)
request.session["verified_account"] = account.to_dict()
if account.platform == Platform.EMAIL:
email_readyonly = True
data = {"email": account.handle}
else:
email_readyonly = False
data = {"email": ""}
form = RegistrationForm(data)
return render(
request,
"users/register.html",
{"form": form, "email_readyonly": email_readyonly},
)
def login_existing_user(request, existing_user):
auth_login(request, existing_user)
if not existing_user.username or not existing_user.identity:
response = redirect(reverse("users:register"))
elif request.session.get("next_url") is not None:
response = redirect(request.session.get("next_url"))
del request.session["next_url"]
else:
response = redirect(reverse("common:home"))
response.delete_cookie(settings.TAKAHE_SESSION_COOKIE_NAME)
return response
@login_required
def logout(request):
return auth_logout(request)
@login_required
@require_http_methods(["POST"])
def reconnect(request):
if request.META.get("HTTP_AUTHORIZATION"):
raise BadRequest("Only for web login")
request.session["swap_login"] = True
request.session["swap_domain"] = request.POST["domain"]
return connect(request)
class RegistrationForm(forms.ModelForm):
email = forms.EmailField(required=False)
class Meta:
model = User
fields = ["username"]
def clean_username(self):
username = self.cleaned_data.get("username")
if username and self.instance and self.instance.username:
username = self.instance.username
elif (
username
and User.objects.filter(username__iexact=username)
.exclude(pk=self.instance.pk if self.instance else -1)
.exists()
):
raise forms.ValidationError(_("This username is already in use."))
return username
def clean_email(self):
email = self.cleaned_data.get("email", "").strip()
if (
email
and EmailAccount.objects.filter(handle__iexact=email)
.exclude(user_id=self.instance.pk if self.instance else -1)
.exists()
):
raise forms.ValidationError(_("This email address is already in use."))
return email
@require_http_methods(["GET", "POST"])
def verify_code(request):
if request.method == "GET":
return render(request, "users/verify.html")
code = request.POST.get("code", "").strip()
if not code:
return render(
request,
"users/verify.html",
{
"error": _("Invalid verification code"),
},
)
account = Email.authenticate(request, code)
if not account:
return render(
request,
"users/verify.html",
{
"error": _("Invalid verification code"),
},
)
if request.user.is_authenticated:
# existing logged in user to verify a pending email
if request.user.email_account == account:
# same email, nothing to do
return render(request, "users/welcome.html")
if account.user and account.user != request.user:
# email used by another user
return render(
request,
"common/error.html",
{
"msg": _("Authentication failed"),
"secondary_msg": _("Email already in use"),
},
)
with transaction.atomic():
if request.user.email_account:
request.user.email_account.delete()
account.user = request.user
account.save()
if request.session.get("new_user", 0):
try:
del request.session["new_user"]
except KeyError:
pass
return render(request, "users/welcome.html")
else:
return redirect(reverse("users:info"))
if account.user:
# existing user: log back in
user = authenticate(request, social_account=account)
if user:
return login_existing_user(request, user)
else:
return render(
request,
"common/error.html",
{
"msg": _("Authentication failed"),
"secondary_msg": _("Invalid user."),
},
)
# new user: check invite and ask for username
return register_new_user(request, account)
@require_http_methods(["GET", "POST"])
def register(request: AuthedHttpRequest):
if not settings.MASTODON_ALLOW_ANY_SITE:
return render(request, "users/welcome.html")
form = RegistrationForm(
request.POST,
instance=(
User.objects.get(pk=request.user.pk)
if request.user.is_authenticated
else None
),
)
verified_account = SocialAccount.from_dict(request.session.get("verified_account"))
email_readonly = (
verified_account is not None and verified_account.platform == Platform.EMAIL
)
error = None
if request.method == "POST" and form.is_valid():
if request.user.is_authenticated:
# logged in user to change email
current_email = (
request.user.email_account.handle
if request.user.email_account
else None
)
if (
form.cleaned_data["email"]
and form.cleaned_data["email"] != current_email
):
Email.send_login_email(request, form.cleaned_data["email"], "verify")
return render(request, "users/verify.html")
else:
# new user finishes login process
if not form.cleaned_data["username"]:
error = _("Valid username required")
elif User.objects.filter(
username__iexact=form.cleaned_data["username"]
).exists():
error = _("Username in use")
else:
# create new user
new_user = User.register(
username=form.cleaned_data["username"], account=verified_account
)
auth_login(request, new_user)
if not email_readonly and form.cleaned_data["email"]:
# verify email if presented
Email.send_login_email(
request, form.cleaned_data["email"], "verify"
)
request.session["new_user"] = 1
return render(request, "users/verify.html")
return render(request, "users/welcome.html")
return render(
request,
"users/register.html",
{"form": form, "email_readonly": email_readonly, "error": error},
)
def swap_login(request, token, site, refresh_token):
del request.session["swap_login"]
del request.session["swap_domain"]
account = Mastodon.authenticate(site, token, refresh_token)
current_user = request.user
if account:
if account.user == current_user:
messages.add_message(
request,
messages.ERROR,
_("Unable to update login information: identical identity."),
)
elif account.user:
messages.add_message(
request,
messages.ERROR,
_("Unable to update login information: identity in use."),
)
else:
with transaction.atomic():
if current_user.mastodon:
current_user.mastodon.delete()
account.user = current_user
account.save()
current_user.mastodon_username = account.username
current_user.mastodon_id = account.account_data["id"]
current_user.mastodon_site = account.domain
current_user.mastodon_token = account.access_token
current_user.mastodon_refresh_token = account.refresh_token
current_user.mastodon_account = account.account_data
current_user.save(
update_fields=[
"username",
"mastodon_id",
"mastodon_username",
"mastodon_site",
"mastodon_token",
"mastodon_refresh_token",
"mastodon_account",
]
)
django_rq.get_queue("mastodon").enqueue(
refresh_mastodon_data_task, current_user.pk, token
)
messages.add_message(
request,
messages.INFO,
_("Login information updated.") + account.handle,
)
else:
messages.add_message(
request, messages.ERROR, _("Invalid account data from Fediverse instance.")
)
return redirect(reverse("users:info"))
def clear_preference_cache(request):
for key in list(request.session.keys()):
if key.startswith("p_"):
del request.session[key]
def auth_login(request, user):
auth.login(request, user, backend="mastodon.auth.OAuth2Backend")
request.session.pop("verified_account", None)
clear_preference_cache(request)
if (
user.mastodon_last_refresh < timezone.now() - timedelta(hours=1)
or user.mastodon_account == {}
):
django_rq.get_queue("mastodon").enqueue(refresh_mastodon_data_task, user.pk)
def auth_logout(request):
auth.logout(request)
response = redirect("/")
response.delete_cookie(settings.TAKAHE_SESSION_COOKIE_NAME)
return response
def clear_data_task(user_id):
user = User.objects.get(pk=user_id)
user_str = str(user)
if user.identity:
remove_data_by_user(user.identity)
Takahe.delete_identity(user.identity.pk)
user.clear()
logger.warning(f"User {user_str} data cleared.")
@login_required
def clear_data(request):
if request.META.get("HTTP_AUTHORIZATION"):
raise BadRequest("Only for web login")
if request.method == "POST":
v = request.POST.get("verification")
if v and (v == request.user.mastodon_acct or v == request.user.email):
django_rq.get_queue("mastodon").enqueue(clear_data_task, request.user.id)
return auth_logout(request)
else:
messages.add_message(request, messages.ERROR, _("Account mismatch."))
return redirect(reverse("users:data"))