from functools import cached_property import re from django.core import validators from django.core.exceptions import ValidationError from django.utils.deconstruct import deconstructible from django.db import models from django.db.models.functions import Lower from django.contrib.auth.models import AbstractUser from django.utils import timezone from django.core.serializers.json import DjangoJSONEncoder from django.utils.translation import gettext_lazy as _ from common.utils import GenerateDateUUIDMediaFilePath from django.conf import settings from management.models import Announcement from mastodon.api import * from django.urls import reverse from django.db.models import Q, F, Value from django.db.models.functions import Concat from django.templatetags.static import static import hashlib from loguru import logger RESERVED_USERNAMES = [ "connect", "oauth2_login", "__", "admin", "api", "me", ] @deconstructible class UsernameValidator(validators.RegexValidator): regex = r"^[a-zA-Z0-9_]{2,30}$" message = _( "Enter a valid username. This value may contain only unaccented lowercase a-z " "and uppercase A-Z letters, numbers, and _ characters." ) flags = re.ASCII def __call__(self, value): if value and value.lower() in RESERVED_USERNAMES: raise ValidationError(self.message, code=self.code) return super().__call__(value) def report_image_path(instance, filename): return GenerateDateUUIDMediaFilePath( instance, filename, settings.REPORT_MEDIA_PATH_ROOT ) class User(AbstractUser): username_validator = UsernameValidator() username = models.CharField( _("username"), max_length=100, unique=True, null=True, # allow null for newly registered users who has not set a user name help_text=_("Required. 50 characters or fewer. Letters, digits and _ only."), validators=[username_validator], error_messages={ "unique": _("A user with that username already exists."), }, ) email = models.EmailField( _("email address"), unique=True, default=None, null=True, ) pending_email = models.EmailField( _("email address pending verification"), default=None, null=True ) local_following = models.ManyToManyField( through="Follow", to="self", through_fields=("owner", "target"), symmetrical=False, related_name="local_followers", ) local_blocking = models.ManyToManyField( through="Block", to="self", through_fields=("owner", "target"), symmetrical=False, related_name="local_blocked_by", ) local_muting = models.ManyToManyField( through="Mute", to="self", through_fields=("owner", "target"), symmetrical=False, related_name="+", ) following = models.JSONField(default=list) muting = models.JSONField(default=list) # rejecting = local/external blocking + local/external blocked_by + domain_blocking + domain_blocked_by rejecting = models.JSONField(default=list) mastodon_id = models.CharField(max_length=100, default=None, null=True) mastodon_username = models.CharField(max_length=100, default=None, null=True) mastodon_site = models.CharField(max_length=100, default=None, null=True) mastodon_token = models.CharField(max_length=2048, default="") mastodon_refresh_token = models.CharField(max_length=2048, default="") mastodon_locked = models.BooleanField(default=False) mastodon_followers = models.JSONField(default=list) mastodon_following = models.JSONField(default=list) mastodon_mutes = models.JSONField(default=list) mastodon_blocks = models.JSONField(default=list) mastodon_domain_blocks = models.JSONField(default=list) mastodon_account = models.JSONField(default=dict) mastodon_last_refresh = models.DateTimeField(default=timezone.now) # store the latest read announcement id, # every time user read the announcement update this field read_announcement_index = models.PositiveIntegerField(default=0) class Meta: constraints = [ models.UniqueConstraint( Lower("username"), name="unique_username", ), models.UniqueConstraint( Lower("email"), name="unique_email", ), models.UniqueConstraint( Lower("mastodon_username"), Lower("mastodon_site"), name="unique_mastodon_username", ), models.UniqueConstraint( Lower("mastodon_id"), Lower("mastodon_site"), name="unique_mastodon_id", ), models.CheckConstraint( check=( Q(is_active=False) | Q(mastodon_username__isnull=False) | Q(email__isnull=False) ), name="at_least_one_login_method", ), ] @cached_property def mastodon_acct(self): return ( f"{self.mastodon_username}@{self.mastodon_site}" if self.mastodon_username else "" ) @property def locked(self): return self.mastodon_locked @property def display_name(self): return ( self.mastodon_account.get("display_name") if self.mastodon_account else (self.username or self.mastodon_acct or "") ) @property def avatar(self): if self.mastodon_account: return self.mastodon_account.get("avatar") or static("img/avatar.svg") if self.email: return ( "https://www.gravatar.com/avatar/" + hashlib.md5(self.email.lower().encode()).hexdigest() ) return static("img/avatar.svg") @property def handler(self): return self.mastodon_acct or self.username or f"~{self.pk}" @property def url(self): return reverse("journal:user_profile", args=[self.handler]) def __str__(self): return f'{self.pk}:{self.username or ""}:{self.mastodon_acct}' @property def ignoring(self): return self.muting + self.rejecting def follow(self, target: "User"): if ( target is None or target.locked or self.is_following(target) or self.is_blocking(target) or self.is_blocked_by(target) ): return False self.local_following.add(target) self.following.append(target.pk) self.save(update_fields=["following"]) return True def unfollow(self, target: "User"): if target and target in self.local_following.all(): self.local_following.remove(target) if ( target.pk in self.following and target.mastodon_acct not in self.mastodon_following ): self.following.remove(target.pk) self.save(update_fields=["following"]) return True return False def remove_follower(self, target: "User"): if target is None or self not in target.local_following.all(): return False target.local_following.remove(self) if ( self.pk in target.following and self.mastodon_acct not in target.mastodon_following ): target.following.remove(self.pk) target.save(update_fields=["following"]) return True def block(self, target: "User"): if target is None or target in self.local_blocking.all(): return False self.local_blocking.add(target) if target.pk in self.following: self.following.remove(target.pk) self.save(update_fields=["following"]) if self.pk in target.following: target.following.remove(self.pk) target.save(update_fields=["following"]) if target in self.local_following.all(): self.local_following.remove(target) if self in target.local_following.all(): target.local_following.remove(self) if target.pk not in self.rejecting: self.rejecting.append(target.pk) self.save(update_fields=["rejecting"]) if self.pk not in target.rejecting: target.rejecting.append(self.pk) target.save(update_fields=["rejecting"]) return True def unblock(self, target: "User"): if target and target in self.local_blocking.all(): self.local_blocking.remove(target) if not self.is_blocked_by(target): if target.pk in self.rejecting: self.rejecting.remove(target.pk) self.save(update_fields=["rejecting"]) if self.pk in target.rejecting: target.rejecting.remove(self.pk) target.save(update_fields=["rejecting"]) return True return False def mute(self, target: "User"): if ( target is None or target in self.local_muting.all() or target.mastodon_acct in self.mastodon_mutes ): return False self.local_muting.add(target) if target.pk not in self.muting: self.muting.append(target.pk) self.save() return True def unmute(self, target: "User"): if target and target in self.local_muting.all(): self.local_muting.remove(target) if target.pk in self.muting: self.muting.remove(target.pk) self.save() return True return False def get_preference(self): pref = Preference.objects.filter(user=self).first() # self.preference if not pref: pref = Preference.objects.create(user=self) return pref def clear(self): if self.mastodon_site == "removed" and not self.is_active: return self.first_name = self.mastodon_acct or "" self.last_name = self.email or "" self.is_active = False self.email = None # self.username = "~removed~" + str(self.pk) # to get ready for federation, username has to be reserved self.mastodon_username = None self.mastodon_id = None self.mastodon_site = "removed" self.mastodon_token = "" self.mastodon_locked = False self.mastodon_followers = [] self.mastodon_following = [] self.mastodon_mutes = [] self.mastodon_blocks = [] self.mastodon_domain_blocks = [] self.mastodon_account = {} def merge_relationships(self): self.muting = self.merged_muting_ids() self.rejecting = self.merged_rejecting_ids() # caculate following after rejecting is merged self.following = self.merged_following_ids() @classmethod def merge_rejected_by(cls): """ Caculate rejecting field to include blocked by for external users Should be invoked after invoking merge_relationships() for all users """ # FIXME this is quite inifficient, should only invoked in async task external_users = list( cls.objects.filter(mastodon_username__isnull=False, is_active=True) ) reject_changed = [] follow_changed = [] for u in external_users: for v in external_users: if v.pk in u.rejecting and u.pk not in v.rejecting: v.rejecting.append(u.pk) if v not in reject_changed: reject_changed.append(v) if u.pk in v.following: v.following.remove(u.pk) if v not in follow_changed: follow_changed.append(v) for u in reject_changed: u.save(update_fields=["rejecting"]) for u in follow_changed: u.save(update_fields=["following"]) return len(follow_changed) + len(reject_changed) def refresh_mastodon_data(self): """Try refresh account data from mastodon server, return true if refreshed successfully, note it will not save to db""" self.mastodon_last_refresh = timezone.now() code, mastodon_account = verify_account(self.mastodon_site, self.mastodon_token) if code == 401 and self.mastodon_refresh_token: self.mastodon_token = refresh_access_token( self.mastodon_site, self.mastodon_refresh_token ) if self.mastodon_token: code, mastodon_account = verify_account( self.mastodon_site, self.mastodon_token ) updated = False if mastodon_account: self.mastodon_account = mastodon_account self.mastodon_locked = mastodon_account["locked"] if self.mastodon_username != mastodon_account["username"]: logger.warning( f"username changed from {self} to {mastodon_account['username']}" ) self.mastodon_username = mastodon_account["username"] # self.mastodon_token = token # user.mastodon_id = mastodon_account['id'] self.mastodon_followers = get_related_acct_list( self.mastodon_site, self.mastodon_token, f"/api/v1/accounts/{self.mastodon_id}/followers", ) self.mastodon_following = get_related_acct_list( self.mastodon_site, self.mastodon_token, f"/api/v1/accounts/{self.mastodon_id}/following", ) self.mastodon_mutes = get_related_acct_list( self.mastodon_site, self.mastodon_token, "/api/v1/mutes" ) self.mastodon_blocks = get_related_acct_list( self.mastodon_site, self.mastodon_token, "/api/v1/blocks" ) self.mastodon_domain_blocks = get_related_acct_list( self.mastodon_site, self.mastodon_token, "/api/v1/domain_blocks" ) self.merge_relationships() updated = True elif code == 401: logger.error(f"401 {self}") self.mastodon_token = "" return updated def merged_following_ids(self): fl = [] for m in self.mastodon_following: target = User.get(m) if target and ( (not target.mastodon_locked) or self.mastodon_acct in target.mastodon_followers ): fl.append(target.pk) for user in self.local_following.all(): if user.pk not in fl and not user.locked and not user.is_blocking(self): fl.append(user.pk) fl = [x for x in fl if x not in self.rejecting] return sorted(fl) def merged_muting_ids(self): external_muting_user_ids = list( User.objects.all() .annotate(acct=Concat("mastodon_username", Value("@"), "mastodon_site")) .filter(acct__in=self.mastodon_mutes) .values_list("pk", flat=True) ) l = list( set( external_muting_user_ids + list(self.local_muting.all().values_list("pk", flat=True)) ) ) return sorted(l) def merged_rejecting_ids(self): domain_blocked_user_ids = list( User.objects.filter( mastodon_site__in=self.mastodon_domain_blocks ).values_list("pk", flat=True) ) external_blocking_user_ids = list( User.objects.all() .annotate(acct=Concat("mastodon_username", Value("@"), "mastodon_site")) .filter(acct__in=self.mastodon_blocks) .values_list("pk", flat=True) ) l = list( set( domain_blocked_user_ids + external_blocking_user_ids + list(self.local_blocking.all().values_list("pk", flat=True)) + list(self.local_blocked_by.all().values_list("pk", flat=True)) # type: ignore + list(self.local_muting.all().values_list("pk", flat=True)) ) ) return sorted(l) def is_blocking(self, target): return ( ( target in self.local_blocking.all() or target.mastodon_acct in self.mastodon_blocks or target.mastodon_site in self.mastodon_domain_blocks ) if target.is_authenticated else self.preference.no_anonymous_view # type: ignore ) def is_blocked_by(self, target): return target.is_authenticated and target.is_blocking(self) def is_muting(self, target): return target.pk in self.muting or target.mastodon_acct in self.mastodon_mutes def is_following(self, target): return ( self.mastodon_acct in target.mastodon_followers if target.locked else target.pk in self.following # or target.mastodon_acct in self.mastodon_following # or self.mastodon_acct in target.mastodon_followers ) def is_followed_by(self, target): return target.is_following(self) def get_mark_for_item(self, item): params = {item.__class__.__name__.lower() + "_id": item.id, "owner": self} mark = item.mark_class.objects.filter(**params).first() return mark def get_max_visibility(self, viewer): if not viewer.is_authenticated: return 0 elif viewer == self: return 2 elif viewer.is_blocked_by(self): return -1 elif viewer.is_following(self): return 1 else: return 0 @property def unread_announcements(self): unread_announcements = Announcement.objects.filter( pk__gt=self.read_announcement_index ).order_by("-pk") return unread_announcements @classmethod def get(cls, name, case_sensitive=False): if isinstance(name, str): sp = name.split("@") if name.startswith("~"): try: query_kwargs = {"pk": int(name[1:])} except: return None elif len(sp) == 1: query_kwargs = { "username__iexact" if case_sensitive else "username": name } elif len(sp) == 2: query_kwargs = { "mastodon_username__iexact" if case_sensitive else "mastodon_username": sp[0], "mastodon_site__iexact" if case_sensitive else "mastodon_site": sp[1], } else: return None elif isinstance(name, int): query_kwargs = {"pk": name} else: return None return User.objects.filter(**query_kwargs).first() class Preference(models.Model): user = models.OneToOneField(User, models.CASCADE, primary_key=True) profile_layout = models.JSONField( blank=True, default=list, ) discover_layout = models.JSONField( blank=True, default=list, ) export_status = models.JSONField( blank=True, null=True, encoder=DjangoJSONEncoder, default=dict ) import_status = models.JSONField( blank=True, null=True, encoder=DjangoJSONEncoder, default=dict ) default_no_share = models.BooleanField(default=False) default_visibility = models.PositiveSmallIntegerField(default=0) classic_homepage = models.PositiveSmallIntegerField(null=False, default=0) mastodon_publish_public = models.BooleanField(null=False, default=False) mastodon_append_tag = models.CharField(max_length=2048, default="") show_last_edit = models.PositiveSmallIntegerField(default=0) no_anonymous_view = models.PositiveSmallIntegerField(default=0) def __str__(self): return str(self.user) class Follow(models.Model): owner = models.ForeignKey(User, on_delete=models.CASCADE, related_name="+") target = models.ForeignKey(User, on_delete=models.CASCADE, related_name="+") created_time = models.DateTimeField(auto_now_add=True) edited_time = models.DateTimeField(auto_now=True) class Block(models.Model): owner = models.ForeignKey(User, on_delete=models.CASCADE, related_name="+") target = models.ForeignKey(User, on_delete=models.CASCADE, related_name="+") created_time = models.DateTimeField(auto_now_add=True) edited_time = models.DateTimeField(auto_now=True) class Mute(models.Model): owner = models.ForeignKey(User, on_delete=models.CASCADE, related_name="+") target = models.ForeignKey(User, on_delete=models.CASCADE, related_name="+") created_time = models.DateTimeField(auto_now_add=True) edited_time = models.DateTimeField(auto_now=True) class Report(models.Model): submit_user = models.ForeignKey( User, on_delete=models.SET_NULL, related_name="sumbitted_reports", null=True ) reported_user = models.ForeignKey( User, on_delete=models.SET_NULL, related_name="accused_reports", null=True ) image = models.ImageField( upload_to=report_image_path, blank=True, default="", ) is_read = models.BooleanField(default=False) submitted_time = models.DateTimeField(auto_now_add=True) message = models.CharField(max_length=1000)