lib.itmens/takahe/models.py

2236 lines
71 KiB
Python
Raw Normal View History

2023-07-20 21:59:49 -04:00
import datetime
2023-08-22 17:13:52 +00:00
import os
2023-09-03 20:11:46 +00:00
import random
2023-07-20 21:59:49 -04:00
import re
import secrets
import ssl
import time
2024-06-04 16:51:51 -04:00
from datetime import date, timedelta
2023-07-20 21:59:49 -04:00
from functools import cached_property, partial
from typing import TYPE_CHECKING, Literal, Optional
from urllib.parse import urlparse
import httpx
import urlman
from cachetools import TTLCache, cached
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from django.conf import settings
from django.contrib.auth.models import AbstractBaseUser, BaseUserManager
2023-08-22 17:13:52 +00:00
from django.core.files.storage import FileSystemStorage
2023-07-20 21:59:49 -04:00
from django.db import models, transaction
from django.template.defaultfilters import linebreaks_filter
from django.utils import timezone
from django.utils.safestring import mark_safe
2023-08-22 17:13:52 +00:00
from django.utils.translation import gettext_lazy as _
2023-07-20 21:59:49 -04:00
from loguru import logger
from lxml import etree
2023-08-15 15:46:11 -04:00
from .html import ContentRenderer, FediverseHtmlParser
2023-07-20 21:59:49 -04:00
from .uris import *
if TYPE_CHECKING:
2023-12-05 23:14:29 -05:00
from django_stubs_ext.db.models.manager import RelatedManager
2023-07-20 21:59:49 -04:00
2023-11-27 23:02:58 -05:00
2024-05-23 11:27:03 -04:00
# class TakaheSession(models.Model):
# session_key = models.CharField(_("session key"), max_length=40, primary_key=True)
# session_data = models.TextField(_("session data"))
# expire_date = models.DateTimeField(_("expire date"), db_index=True)
2023-11-12 20:24:17 -05:00
2024-05-23 11:27:03 -04:00
# class Meta:
# db_table = "django_session"
2023-11-12 20:24:17 -05:00
2023-07-20 21:59:49 -04:00
class Snowflake:
"""
Snowflake ID generator and parser.
"""
# Epoch is 2022/1/1 at midnight, as these are used for _created_ times in our
# own database, not original publish times (which would need an earlier one)
EPOCH = 1641020400
TYPE_POST = 0b000
TYPE_POST_INTERACTION = 0b001
TYPE_IDENTITY = 0b010
TYPE_REPORT = 0b011
TYPE_FOLLOW = 0b100
@classmethod
def generate(cls, type_id: int) -> int:
"""
Generates a snowflake-style ID for the given "type". They are designed
to fit inside 63 bits (a signed bigint)
ID layout is:
* 41 bits of millisecond-level timestamp (enough for EPOCH + 69 years)
* 19 bits of random data (1% chance of clash at 10000 per millisecond)
* 3 bits of type information
We use random data rather than a sequence ID to try and avoid pushing
this job onto the DB - we may do that in future. If a clash does
occur, the insert will fail and Stator will retry the work for anything
that's coming in remotely, leaving us to just handle that scenario for
our own posts, likes, etc.
"""
# Get the current time in milliseconds
now: int = int((time.time() - cls.EPOCH) * 1000)
# Generate random data
rand_seq: int = secrets.randbits(19)
# Compose them together
return (now << 22) | (rand_seq << 3) | type_id
@classmethod
def generate_post_at(cls, t: float) -> int:
"""
Generates a snowflake-style ID for post at given time
post time before EPOCH (2022) will be mixed in with Jan 2022
"""
if t > cls.EPOCH:
now: int = int((t - cls.EPOCH) * 1000)
else:
2024-05-26 22:57:49 -04:00
now = int(t) if t > 0 else 0
# Generate random data
rand_seq: int = secrets.randbits(19)
# Compose them together
return (now << 22) | (rand_seq << 3) | cls.TYPE_POST
2023-07-20 21:59:49 -04:00
@classmethod
def get_type(cls, snowflake: int) -> int:
"""
Returns the type of a given snowflake ID
"""
if snowflake < (1 << 22):
raise ValueError("Not a valid Snowflake ID")
return snowflake & 0b111
@classmethod
def get_time(cls, snowflake: int) -> float:
"""
Returns the generation time (in UNIX timestamp seconds) of the ID
"""
if snowflake < (1 << 22):
raise ValueError("Not a valid Snowflake ID")
return ((snowflake >> 22) / 1000) + cls.EPOCH
# Handy pre-baked methods for django model defaults
@classmethod
def generate_post(cls) -> int:
return cls.generate(cls.TYPE_POST)
@classmethod
def generate_post_interaction(cls) -> int:
return cls.generate(cls.TYPE_POST_INTERACTION)
@classmethod
def generate_identity(cls) -> int:
return cls.generate(cls.TYPE_IDENTITY)
@classmethod
def generate_report(cls) -> int:
return cls.generate(cls.TYPE_REPORT)
@classmethod
def generate_follow(cls) -> int:
return cls.generate(cls.TYPE_FOLLOW)
class RsaKeys:
@classmethod
def generate_keypair(cls) -> tuple[str, str]:
"""
Generates a new RSA keypair
"""
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
private_key_serialized = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode("ascii")
public_key_serialized = (
private_key.public_key()
.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
.decode("ascii")
)
return private_key_serialized, public_key_serialized
2023-09-03 20:11:46 +00:00
class Invite(models.Model):
"""
An invite token, good for one signup.
"""
class Meta:
# managed = False
db_table = "users_invite"
# Should always be lowercase
token = models.CharField(max_length=500, unique=True)
# Admin note about this code
note = models.TextField(null=True, blank=True)
# Uses remaining (null means "infinite")
uses = models.IntegerField(null=True, blank=True)
# Expiry date
expires = models.DateTimeField(null=True, blank=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
@classmethod
def create_random(cls, uses=None, expires=None, note=None):
return cls.objects.create(
token="".join(
random.choice("abcdefghkmnpqrstuvwxyz23456789") for i in range(20)
),
uses=uses,
expires=expires,
note=note,
)
@property
def valid(self):
if self.uses is not None:
if self.uses <= 0:
return False
if self.expires is not None:
return self.expires >= timezone.now()
return True
2023-07-20 21:59:49 -04:00
class User(AbstractBaseUser):
2023-12-05 23:14:29 -05:00
if TYPE_CHECKING:
identities: RelatedManager["Identity"]
2023-07-20 21:59:49 -04:00
class Meta:
# managed = False
db_table = "users_user"
email = models.EmailField(unique=True)
admin = models.BooleanField(default=False)
moderator = models.BooleanField(default=False)
banned = models.BooleanField(default=False)
deleted = models.BooleanField(default=False)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
last_seen = models.DateTimeField(auto_now_add=True)
USERNAME_FIELD = "email"
EMAIL_FIELD = "email"
REQUIRED_FIELDS: list[str] = []
@property
def is_active(self):
return not (self.deleted or self.banned)
@property
def is_superuser(self):
return self.admin
@property
def is_staff(self):
return self.admin
def has_module_perms(self, module):
return self.admin
def has_perm(self, perm):
return self.admin
# @cached_property
# def config_user(self) -> Config.UserOptions:
# return Config.load_user(self)
class Domain(models.Model):
"""
Represents a domain that a user can have an account on.
For protocol reasons, if we want to allow custom usernames
per domain, each "display" domain (the one in the handle) must either let
us serve on it directly, or have a "service" domain that maps
to it uniquely that we can serve on that.
That way, someone coming in with just an Actor URI as their
entrypoint can still try to webfinger preferredUsername@actorDomain
and we can return an appropriate response.
It's possible to just have one domain do both jobs, of course.
This model also represents _other_ servers' domains, which we treat as
display domains for now, until we start doing better probing.
"""
domain = models.CharField(max_length=250, primary_key=True)
service_domain = models.CharField(
max_length=250,
null=True,
blank=True,
db_index=True,
unique=True,
)
# state = StateField(DomainStates)
state = models.CharField(max_length=100, default="outdated")
state_changed = models.DateTimeField(auto_now_add=True)
# nodeinfo 2.0 detail about the remote server
nodeinfo = models.JSONField(null=True, blank=True)
# If we own this domain
local = models.BooleanField()
# If we have blocked this domain from interacting with us
blocked = models.BooleanField(default=False)
# Domains can be joinable by any user of the instance (as the default one
# should)
public = models.BooleanField(default=False)
# If this is the default domain (shown as the default entry for new users)
default = models.BooleanField(default=False)
# Domains can also be linked to one or more users for their private use
# This should be display domains ONLY
users = models.ManyToManyField("takahe.User", related_name="domains", blank=True)
# Free-form notes field for admins
notes = models.TextField(blank=True, null=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class urls(urlman.Urls):
root = "/admin/domains/"
create = "/admin/domains/create/"
edit = "/admin/domains/{self.domain}/"
delete = "{edit}delete/"
root_federation = "/admin/federation/"
edit_federation = "/admin/federation/{self.domain}/"
class Meta:
# managed = False
db_table = "users_domain"
indexes: list = []
@classmethod
def get_remote_domain(cls, domain: str) -> "Domain":
return cls.objects.get_or_create(domain=domain.lower(), local=False)[0]
@classmethod
def get_domain(cls, domain: str) -> Optional["Domain"]:
try:
return cls.objects.get(
models.Q(domain=domain.lower())
| models.Q(service_domain=domain.lower())
)
except cls.DoesNotExist:
return None
@property
def uri_domain(self) -> str:
if self.service_domain:
return self.service_domain
return self.domain
@classmethod
def available_for_user(cls, user):
"""
Returns domains that are available for the user to put an identity on
"""
return cls.objects.filter(
models.Q(public=True) | models.Q(users__id=user.id),
local=True,
).order_by("-default", "domain")
def __str__(self):
return self.domain
2023-08-22 17:13:52 +00:00
def upload_store():
return FileSystemStorage(
location=settings.TAKAHE_MEDIA_ROOT, base_url=settings.TAKAHE_MEDIA_URL
)
def upload_namer(prefix, instance, filename):
"""
Names uploaded images.
By default, obscures the original name with a random UUID.
"""
_, old_extension = os.path.splitext(filename)
new_filename = secrets.token_urlsafe(20)
now = timezone.now()
return f"{prefix}/{now.year}/{now.month}/{now.day}/{new_filename}{old_extension}"
2023-07-20 21:59:49 -04:00
class Identity(models.Model):
"""
Represents both local and remote Fediverse identities (actors)
"""
2024-06-04 16:51:51 -04:00
if TYPE_CHECKING:
domain_id: str
inbound_follows: "models.QuerySet[Follow]"
hashtag_features: "models.QuerySet[HashtagFeature]"
2023-07-20 21:59:49 -04:00
class Restriction(models.IntegerChoices):
none = 0
limited = 1
blocked = 2
ACTOR_TYPES = ["person", "service", "application", "group", "organization"]
id = models.BigIntegerField(primary_key=True, default=Snowflake.generate_identity)
# The Actor URI is essentially also a PK - we keep the default numeric
# one around as well for making nice URLs etc.
actor_uri = models.CharField(max_length=500, unique=True)
# state = StateField(IdentityStates)
state = models.CharField(max_length=100, default="outdated")
state_changed = models.DateTimeField(auto_now_add=True)
2023-08-22 17:13:52 +00:00
state_next_attempt = models.DateTimeField(blank=True, null=True)
state_locked_until = models.DateTimeField(null=True, blank=True, db_index=True)
2023-07-20 21:59:49 -04:00
local = models.BooleanField(db_index=True)
users = models.ManyToManyField(
"takahe.User",
related_name="identities",
blank=True,
)
username = models.CharField(max_length=500, blank=True, null=True)
# Must be a display domain if present
domain = models.ForeignKey(
Domain,
blank=True,
null=True,
on_delete=models.PROTECT,
related_name="identities",
)
2024-05-15 20:41:03 -04:00
name = models.CharField(
max_length=500, blank=True, null=True, verbose_name=_("Display Name")
)
summary = models.TextField(blank=True, null=True, verbose_name=_("Bio"))
2023-08-22 17:13:52 +00:00
manually_approves_followers = models.BooleanField(
2024-05-15 20:41:03 -04:00
default=False, verbose_name=_("Manually approve new followers")
)
discoverable = models.BooleanField(
default=True,
verbose_name=_("Include profile and posts in discovery"),
)
indexable = models.BooleanField(
default=True, verbose_name=_("Include posts in search results")
2023-08-22 17:13:52 +00:00
)
2023-07-20 21:59:49 -04:00
profile_uri = models.CharField(max_length=500, blank=True, null=True)
inbox_uri = models.CharField(max_length=500, blank=True, null=True)
shared_inbox_uri = models.CharField(max_length=500, blank=True, null=True)
outbox_uri = models.CharField(max_length=500, blank=True, null=True)
icon_uri = models.CharField(max_length=500, blank=True, null=True)
image_uri = models.CharField(max_length=500, blank=True, null=True)
followers_uri = models.CharField(max_length=500, blank=True, null=True)
following_uri = models.CharField(max_length=500, blank=True, null=True)
featured_collection_uri = models.CharField(max_length=500, blank=True, null=True)
actor_type = models.CharField(max_length=100, default="person")
2023-08-22 17:13:52 +00:00
icon = models.ImageField(
upload_to=partial(upload_namer, "profile_images"),
blank=True,
null=True,
2024-05-15 20:41:03 -04:00
verbose_name=_("Profile picture"),
2023-08-22 17:13:52 +00:00
storage=upload_store,
)
image = models.ImageField(
upload_to=partial(upload_namer, "background_images"),
blank=True,
null=True,
2024-05-15 20:41:03 -04:00
verbose_name=_("Header picture"),
2023-08-22 17:13:52 +00:00
storage=upload_store,
)
2023-07-20 21:59:49 -04:00
# Should be a list of {"name":..., "value":...} dicts
metadata = models.JSONField(blank=True, null=True)
# Should be a list of object URIs (we don't want a full M2M here)
pinned = models.JSONField(blank=True, null=True)
# Admin-only moderation fields
sensitive = models.BooleanField(default=False)
restriction = models.IntegerField(
choices=Restriction.choices, default=Restriction.none, db_index=True
)
admin_notes = models.TextField(null=True, blank=True)
private_key = models.TextField(null=True, blank=True)
public_key = models.TextField(null=True, blank=True)
public_key_id = models.TextField(null=True, blank=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
fetched = models.DateTimeField(null=True, blank=True)
deleted = models.DateTimeField(null=True, blank=True)
# objects = IdentityManager()
### Model attributes ###
class Meta:
# managed = False
db_table = "users_identity"
verbose_name_plural = "identities"
unique_together = [("username", "domain")]
indexes: list = [] # We need this so Stator can add its own
class urls(urlman.Urls):
view = "/@{self.username}@{self.domain_id}/"
replies = "{view}replies/"
settings = "{view}settings/"
action = "{view}action/"
followers = "{view}followers/"
following = "{view}following/"
search = "{view}search/"
activate = "{view}activate/"
admin = "/admin/identities/"
admin_edit = "{admin}{self.pk}/"
djadmin_edit = "/djadmin/users/identity/{self.id}/change/"
2023-12-31 08:32:19 -05:00
def get_scheme(self, url): # pyright: ignore
2023-07-20 21:59:49 -04:00
return "https"
def get_hostname(self, url):
return self.instance.domain.uri_domain
def __str__(self):
if self.username and self.domain:
return self.handle
return self.actor_uri
def absolute_profile_uri(self):
"""
Returns a profile URI that is always absolute, for sending out to
other servers.
"""
if self.local:
return f"https://{self.domain.uri_domain}/@{self.username}/"
else:
return self.profile_uri
@property
def handle(self):
if self.username is None:
return "(unknown user)"
if self.domain_id:
return f"{self.username}@{self.domain_id}"
return f"{self.username}@(unknown server)"
2023-08-15 15:46:11 -04:00
@property
def url(self):
return (
f"/users/{self.username}/"
if self.local
else f"/users/@{self.username}@{self.domain_id}/"
)
2023-07-20 21:59:49 -04:00
@property
def user_pk(self):
user = self.users.first()
return user.pk if user else None
@classmethod
def fetch_webfinger_url(cls, domain: str) -> str:
"""
Given a domain (hostname), returns the correct webfinger URL to use
based on probing host-meta.
"""
with httpx.Client(
2023-11-26 17:23:53 -05:00
timeout=settings.TAKAHE_REMOTE_TIMEOUT,
2023-07-20 21:59:49 -04:00
headers={"User-Agent": settings.TAKAHE_USER_AGENT},
) as client:
try:
response = client.get(
f"https://{domain}/.well-known/host-meta",
follow_redirects=True,
headers={"Accept": "application/xml"},
)
# In the case of anything other than a success, we'll still try
# hitting the webfinger URL on the domain we were given to handle
# incorrectly setup servers.
if response.status_code == 200 and response.content.strip():
tree = etree.fromstring(response.content)
template = tree.xpath(
"string(.//*[local-name() = 'Link' and @rel='lrdd' and (not(@type) or @type='application/jrd+json')]/@template)"
)
if template:
return template # type: ignore
except (httpx.RequestError, etree.ParseError):
pass
return f"https://{domain}/.well-known/webfinger?resource={{uri}}"
@classmethod
def fetch_webfinger(cls, handle: str) -> tuple[str | None, str | None]:
"""
Given a username@domain handle, returns a tuple of
(actor uri, canonical handle) or None, None if it does not resolve.
"""
domain = handle.split("@")[1].lower()
try:
webfinger_url = cls.fetch_webfinger_url(domain)
except ssl.SSLCertVerificationError:
return None, None
# Go make a Webfinger request
with httpx.Client(
2023-11-26 17:23:53 -05:00
timeout=settings.TAKAHE_REMOTE_TIMEOUT,
2023-07-20 21:59:49 -04:00
headers={"User-Agent": settings.TAKAHE_USER_AGENT},
) as client:
try:
response = client.get(
webfinger_url.format(uri=f"acct:{handle}"),
follow_redirects=True,
headers={"Accept": "application/json"},
)
response.raise_for_status()
except (httpx.HTTPError, ssl.SSLCertVerificationError) as ex:
response = getattr(ex, "response", None)
if (
response
and response.status_code < 500
and response.status_code not in [400, 401, 403, 404, 406, 410]
):
raise ValueError(
f"Client error fetching webfinger: {response.status_code}",
response.content,
)
return None, None
try:
data = response.json()
except ValueError:
# Some servers return these with a 200 status code!
if b"not found" in response.content.lower():
return None, None
raise ValueError(
"JSON parse error fetching webfinger",
response.content,
)
try:
if data["subject"].startswith("acct:"):
data["subject"] = data["subject"][5:]
for link in data["links"]:
if (
link.get("type") == "application/activity+json"
and link.get("rel") == "self"
):
return link["href"], data["subject"]
except KeyError:
# Server returning wrong payload structure
pass
return None, None
@classmethod
def by_username_and_domain(
cls,
username: str,
domain: str | Domain,
fetch: bool = False,
local: bool = False,
):
"""
Get an Identity by username and domain.
When fetch is True, a failed lookup will do a webfinger lookup to attempt to do
a lookup by actor_uri, creating an Identity record if one does not exist. When
local is True, lookups will be restricted to local domains.
If domain is a Domain, domain.local is used instead of passsed local.
"""
if username.startswith("@"):
raise ValueError("Username must not start with @")
domain_instance = None
if isinstance(domain, Domain):
domain_instance = domain
local = domain.local
domain = domain.domain
else:
domain = domain.lower()
try:
if local:
return cls.objects.get(
username__iexact=username,
domain_id=domain,
local=True,
)
else:
return cls.objects.get(
username__iexact=username,
domain_id=domain,
)
except cls.DoesNotExist:
if fetch and not local:
actor_uri, handle = cls.fetch_webfinger(f"{username}@{domain}")
if handle is None:
return None
# See if this actually does match an existing actor
try:
return cls.objects.get(actor_uri=actor_uri)
except cls.DoesNotExist:
pass
# OK, make one
username, domain = handle.split("@")
if not domain_instance:
domain_instance = Domain.get_remote_domain(domain)
return cls.objects.create(
actor_uri=actor_uri,
username=username,
domain_id=domain_instance,
local=False,
)
return None
def generate_keypair(self):
if not self.local:
raise ValueError("Cannot generate keypair for remote user")
self.private_key, self.public_key = RsaKeys.generate_keypair()
self.public_key_id = self.actor_uri + "#main-key"
self.save()
2023-12-02 15:34:14 -05:00
def ensure_uris(self):
"""
Ensures that local identities have all the URIs populated on their fields
(this lets us add new ones easily)
"""
if self.local:
self.inbox_uri = self.actor_uri + "inbox/"
self.outbox_uri = self.actor_uri + "outbox/"
self.featured_collection_uri = self.actor_uri + "collections/featured/"
self.followers_uri = self.actor_uri + "followers/"
self.following_uri = self.actor_uri + "following/"
self.shared_inbox_uri = f"https://{self.domain.uri_domain}/inbox/"
2023-12-06 00:21:28 -05:00
self.save()
2023-12-02 15:34:14 -05:00
2024-06-04 16:51:51 -04:00
def get_remote_targets(self):
"""
Returns an iterable with Identities of followers that have unique
shared_inbox among each other to be used as target.
"""
if not self.local:
return []
remote_follower_ids = Follow.objects.filter(
target=self,
target__local=False,
state__in=["unrequested", "pending_approval", "accepting", "accepted"],
).values_list("source", flat=True)
deduped_targets = set()
shared_inboxes = set()
for target in Identity.objects.filter(pk__in=remote_follower_ids):
if not target.shared_inbox_uri:
deduped_targets.add(target)
elif target.shared_inbox_uri not in shared_inboxes:
shared_inboxes.add(target.shared_inbox_uri)
deduped_targets.add(target)
return deduped_targets
def fanout(self, type: str, **kwargs):
for target in self.get_remote_targets():
FanOut.objects.create(
identity=target, subject_identity=self, type=type, **kwargs
)
2023-07-20 21:59:49 -04:00
class Follow(models.Model):
"""
When one user (the source) follows other (the target)
"""
id = models.BigIntegerField(primary_key=True, default=Snowflake.generate_follow)
source = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
related_name="outbound_follows",
)
target = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
related_name="inbound_follows",
)
boosts = models.BooleanField(
default=True, help_text="Also follow boosts from this user"
)
2024-06-03 12:59:24 -04:00
notify = models.BooleanField(
default=False, help_text="Notify about posts from this user"
)
2023-07-20 21:59:49 -04:00
uri = models.CharField(blank=True, null=True, max_length=500)
note = models.TextField(blank=True, null=True)
# state = StateField(FollowStates)
state = models.CharField(max_length=100, default="unrequested")
state_changed = models.DateTimeField(auto_now_add=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class Meta:
# managed = False
db_table = "users_follow"
unique_together = [("source", "target")]
indexes: list = [] # We need this so Stator can add its own
def __str__(self):
return f"#{self.id}: {self.source}{self.target}"
2023-08-15 15:46:11 -04:00
class PostQuerySet(models.QuerySet):
def not_hidden(self):
query = self.exclude(state__in=["deleted", "deleted_fanned_out"])
return query
def public(self, include_replies: bool = False):
query = self.filter(
visibility__in=[
Post.Visibilities.public,
Post.Visibilities.local_only,
],
)
if not include_replies:
return query.filter(in_reply_to__isnull=True)
return query
def local_public(self, include_replies: bool = False):
query = self.filter(
visibility__in=[
Post.Visibilities.public,
Post.Visibilities.local_only,
],
local=True,
)
if not include_replies:
return query.filter(in_reply_to__isnull=True)
return query
def unlisted(self, include_replies: bool = False):
query = self.filter(
visibility__in=[
Post.Visibilities.public,
Post.Visibilities.local_only,
Post.Visibilities.unlisted,
],
)
if not include_replies:
return query.filter(in_reply_to__isnull=True)
return query
def visible_to(self, identity: Identity | None, include_replies: bool = False):
if identity is None:
return self.unlisted(include_replies=include_replies)
query = self.filter(
models.Q(
visibility__in=[
Post.Visibilities.public,
Post.Visibilities.local_only,
Post.Visibilities.unlisted,
]
)
| models.Q(
visibility=Post.Visibilities.followers,
author__inbound_follows__source=identity,
)
| models.Q(
mentions=identity,
)
| models.Q(author=identity)
).distinct()
if not include_replies:
return query.filter(in_reply_to__isnull=True)
return query
# def tagged_with(self, hashtag: str | Hashtag):
# if isinstance(hashtag, str):
# tag_q = models.Q(hashtags__contains=hashtag)
# else:
# tag_q = models.Q(hashtags__contains=hashtag.hashtag)
# if hashtag.aliases:
# for alias in hashtag.aliases:
# tag_q |= models.Q(hashtags__contains=alias)
# return self.filter(tag_q)
class PostManager(models.Manager):
def get_queryset(self):
return PostQuerySet(self.model, using=self._db)
def not_hidden(self):
return self.get_queryset().not_hidden()
def public(self, include_replies: bool = False):
return self.get_queryset().public(include_replies=include_replies)
def local_public(self, include_replies: bool = False):
return self.get_queryset().local_public(include_replies=include_replies)
def unlisted(self, include_replies: bool = False):
return self.get_queryset().unlisted(include_replies=include_replies)
# def tagged_with(self, hashtag: str | Hashtag):
# return self.get_queryset().tagged_with(hashtag=hashtag)
2023-07-20 21:59:49 -04:00
class Post(models.Model):
"""
A post (status, toot) that is either local or remote.
"""
2024-05-27 15:44:12 -04:00
if TYPE_CHECKING:
2024-06-13 20:44:15 -04:00
author_id: int
2024-05-27 15:44:12 -04:00
interactions: "models.QuerySet[PostInteraction]"
attachments: "models.QuerySet[PostAttachment]"
2023-07-20 21:59:49 -04:00
class Visibilities(models.IntegerChoices):
public = 0
local_only = 4
unlisted = 1
followers = 2
mentioned = 3
class Types(models.TextChoices):
article = "Article"
audio = "Audio"
event = "Event"
image = "Image"
note = "Note"
page = "Page"
question = "Question"
video = "Video"
id = models.BigIntegerField(primary_key=True, default=Snowflake.generate_post)
# The author (attributedTo) of the post
author = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
related_name="posts",
)
# The state the post is in
# state = StateField(PostStates)
state = models.CharField(max_length=100, default="new")
state_changed = models.DateTimeField(auto_now_add=True)
2024-06-13 20:44:15 -04:00
state_next_attempt = models.DateTimeField(blank=True, null=True)
state_locked_until = models.DateTimeField(null=True, blank=True, db_index=True)
2023-07-20 21:59:49 -04:00
# If it is our post or not
local = models.BooleanField()
# The canonical object ID
object_uri = models.CharField(max_length=2048, blank=True, null=True, unique=True)
# Who should be able to see this Post
visibility = models.IntegerField(
choices=Visibilities.choices,
default=Visibilities.public,
)
# The main (HTML) content
content = models.TextField()
# The language of the content
language = models.CharField(default="", blank=True)
2023-07-20 21:59:49 -04:00
type = models.CharField(
max_length=20,
choices=Types.choices,
default=Types.note,
)
type_data = models.JSONField(
blank=True,
null=True, # , encoder=PostTypeDataEncoder, decoder=PostTypeDataDecoder
)
# If the contents of the post are sensitive, and the summary (content
# warning) to show if it is
sensitive = models.BooleanField(default=False)
summary = models.TextField(blank=True, null=True)
# The public, web URL of this Post on the original server
url = models.CharField(max_length=2048, blank=True, null=True)
# The Post it is replying to as an AP ID URI
# (as otherwise we'd have to pull entire threads to use IDs)
in_reply_to = models.CharField(max_length=500, blank=True, null=True, db_index=True)
# The identities the post is directly to (who can see it if not public)
to = models.ManyToManyField(
"takahe.Identity",
related_name="posts_to",
blank=True,
)
# The identities mentioned in the post
mentions = models.ManyToManyField(
"takahe.Identity",
related_name="posts_mentioning",
blank=True,
)
# Hashtags in the post
hashtags = models.JSONField(blank=True, null=True)
emojis = models.ManyToManyField(
"takahe.Emoji",
related_name="posts_using_emoji",
blank=True,
)
# Like/Boost/etc counts
stats = models.JSONField(blank=True, null=True)
# When the post was originally created (as opposed to when we received it)
published = models.DateTimeField(default=timezone.now)
# If the post has been edited after initial publication
edited = models.DateTimeField(blank=True, null=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
2023-09-16 11:09:57 -04:00
objects: PostManager = PostManager()
2023-07-20 21:59:49 -04:00
class Meta:
# managed = False
db_table = "activities_post"
class urls(urlman.Urls):
view = "{self.author.urls.view}posts/{self.id}/"
object_uri = "{self.author.actor_uri}posts/{self.id}/"
action_like = "{view}like/"
action_unlike = "{view}unlike/"
action_boost = "{view}boost/"
action_unboost = "{view}unboost/"
action_bookmark = "{view}bookmark/"
action_unbookmark = "{view}unbookmark/"
action_delete = "{view}delete/"
action_edit = "{view}edit/"
action_report = "{view}report/"
action_reply = "/compose/?reply_to={self.id}"
admin_edit = "/djadmin/activities/post/{self.id}/change/"
2023-12-31 08:32:19 -05:00
def get_scheme(self, url): # pyright: ignore
2023-07-20 21:59:49 -04:00
return "https"
def get_hostname(self, url):
return self.instance.author.domain.uri_domain
def __str__(self):
return f"{self.author} #{self.id}"
def get_absolute_url(self):
return self.urls.view
def absolute_object_uri(self):
"""
Returns an object URI that is always absolute, for sending out to
other servers.
"""
if self.local:
return self.author.absolute_profile_uri() + f"posts/{self.id}/"
else:
return self.object_uri
def in_reply_to_post(self) -> Optional["Post"]:
"""
Returns the actual Post object we're replying to, if we can find it
"""
if self.in_reply_to is None:
return None
return (
Post.objects.filter(object_uri=self.in_reply_to)
.select_related("author")
.first()
)
2023-11-27 23:02:58 -05:00
def add_to_timeline(self, owner: Identity):
"""
Creates a TimelineEvent for this post on owner's timeline
"""
return TimelineEvent.objects.update_or_create(
identity=owner,
type=TimelineEvent.Types.post,
subject_post=self,
subject_identity=self.author,
defaults={
"published": self.published,
},
)[0]
2023-07-20 21:59:49 -04:00
@classmethod
def create_local(
cls,
author: Identity,
content: str,
2024-06-15 21:54:39 -04:00
raw_prepend_content: str,
raw_append_content: str,
2023-07-20 21:59:49 -04:00
summary: str | None = None,
sensitive: bool = False,
visibility: int = Visibilities.public,
reply_to: Optional["Post"] = None,
attachments: list | None = None,
type_data: dict | None = None,
published: datetime.datetime | None = None,
2024-06-13 20:44:15 -04:00
edited: datetime.datetime | None = None,
2023-07-20 21:59:49 -04:00
) -> "Post":
with transaction.atomic():
# Find mentions in this post
2024-06-11 18:48:29 -04:00
mentions = cls.mentions_from_content(content, author)
2023-07-20 21:59:49 -04:00
if reply_to:
mentions.add(reply_to.author)
# Maintain local-only for replies
if reply_to.visibility == reply_to.Visibilities.local_only:
visibility = reply_to.Visibilities.local_only
# Find emoji in this post
2024-06-11 18:48:29 -04:00
emojis = Emoji.emojis_from_content(content, None)
2023-07-20 21:59:49 -04:00
# Strip all unwanted HTML and apply linebreaks filter, grabbing hashtags on the way
parser = FediverseHtmlParser(linebreaks_filter(content), find_hashtags=True)
2024-06-15 21:54:39 -04:00
content = (
parser.html.replace("<p>", "<p>" + raw_prepend_content, 1)
+ raw_append_content
)
2023-07-20 21:59:49 -04:00
hashtags = (
sorted([tag[: Hashtag.MAXIMUM_LENGTH] for tag in parser.hashtags])
or None
)
post_obj = {
"author": author,
"content": content,
"summary": summary or None,
"sensitive": bool(summary) or sensitive,
"local": True,
"visibility": visibility,
"hashtags": hashtags,
"in_reply_to": reply_to.object_uri if reply_to else None,
}
2024-06-13 20:44:15 -04:00
if edited:
post_obj["edited"] = edited
if published:
_delta = timezone.now() - published
if _delta > datetime.timedelta(0):
post_obj["published"] = published
if _delta > datetime.timedelta(days=settings.FANOUT_LIMIT_DAYS):
post_obj["id"] = Snowflake.generate_post_at(
published.timestamp()
)
post_obj["state"] = "fanned_out" # add post quietly if it's old
2024-05-26 22:39:51 -04:00
with transaction.atomic(using="takahe"):
# Make the Post object
post = cls.objects.create(**post_obj)
2024-06-11 18:48:29 -04:00
post.mentions.set(mentions)
post.emojis.set(emojis)
2024-05-26 22:39:51 -04:00
post.object_uri = post.urls.object_uri
post.url = post.absolute_object_uri()
if attachments:
post.attachments.set(attachments)
# if question: # FIXME
# post.type = question["type"]
# post.type_data = PostTypeData(__root__=question).__root__
if type_data:
post.type_data = type_data
post.save()
2023-07-20 21:59:49 -04:00
# Recalculate parent stats for replies
if reply_to:
reply_to.calculate_stats()
2024-06-11 18:48:29 -04:00
if post.state == "fanned_out":
2023-12-03 15:30:21 -05:00
# add post to auther's timeline directly if it's old
2023-11-27 23:02:58 -05:00
post.add_to_timeline(author)
2023-07-20 21:59:49 -04:00
return post
def edit_local(
self,
content: str,
2024-06-15 21:54:39 -04:00
raw_prepend_content: str,
raw_append_content: str,
2023-07-20 21:59:49 -04:00
summary: str | None = None,
sensitive: bool | None = None,
visibility: int = Visibilities.public,
attachments: list | None = None,
attachment_attributes: list | None = None,
type_data: dict | None = None,
2023-11-27 23:02:58 -05:00
published: datetime.datetime | None = None,
2024-06-13 20:44:15 -04:00
edited: datetime.datetime | None = None,
2023-07-20 21:59:49 -04:00
):
with transaction.atomic():
# Strip all HTML and apply linebreaks filter
parser = FediverseHtmlParser(linebreaks_filter(content), find_hashtags=True)
2024-06-15 21:54:39 -04:00
self.content = (
parser.html.replace("<p>", "<p>" + raw_prepend_content, 1)
+ raw_append_content
)
2023-07-20 21:59:49 -04:00
self.hashtags = (
sorted([tag[: Hashtag.MAXIMUM_LENGTH] for tag in parser.hashtags])
or None
)
self.summary = summary or None
self.sensitive = bool(summary) if sensitive is None else sensitive
self.visibility = visibility
2024-06-13 20:44:15 -04:00
self.edited = edited or timezone.now()
2023-07-20 21:59:49 -04:00
self.mentions.set(self.mentions_from_content(content, self.author))
self.emojis.set(Emoji.emojis_from_content(content, None))
2023-12-22 23:59:48 -05:00
if attachments is not None:
self.attachments.set(attachments or []) # type: ignore
2023-07-20 21:59:49 -04:00
if type_data:
self.type_data = type_data
self.save()
for attrs in attachment_attributes or []:
attachment = next(
(a for a in attachments or [] if str(a.id) == attrs.id), None
)
if attachment is None:
continue
attachment.name = attrs.description
attachment.save()
self.state = "edited"
self.state_changed = timezone.now()
self.state_next_attempt = None
self.state_locked_until = None
self.save()
@classmethod
def mentions_from_content(cls, content, author) -> set[Identity]:
mention_hits = FediverseHtmlParser(content, find_mentions=True).mentions
mentions = set()
for handle in mention_hits:
handle = handle.lower()
if "@" in handle:
username, domain = handle.split("@", 1)
2023-11-26 17:23:53 -05:00
local = False
2023-07-20 21:59:49 -04:00
else:
username = handle
domain = author.domain_id
2023-11-26 17:23:53 -05:00
local = author.local
2023-07-20 21:59:49 -04:00
identity = Identity.by_username_and_domain(
2023-11-26 17:23:53 -05:00
username=username, domain=domain, fetch=True, local=local
2023-07-20 21:59:49 -04:00
)
if identity is not None:
mentions.add(identity)
return mentions
def calculate_stats(self, save=True):
"""
Recalculates our stats dict
"""
from .models import PostInteraction
self.stats = {
"likes": self.interactions.filter(
type=PostInteraction.Types.like,
state__in=["new", "fanned_out"],
).count(),
"boosts": self.interactions.filter(
type=PostInteraction.Types.boost,
state__in=["new", "fanned_out"],
).count(),
2024-06-13 20:44:15 -04:00
"replies": Post.objects.filter(in_reply_to=self.object_uri)
.exclude(state__in=["deleted", "deleted_fanned_out"])
.count(),
2023-07-20 21:59:49 -04:00
}
if save:
self.save()
2023-08-15 15:46:11 -04:00
@property
def safe_content_local(self):
return ContentRenderer(local=True).render_post(self.content, self)
2023-07-20 21:59:49 -04:00
2023-11-23 10:11:42 -05:00
class FanOut(models.Model):
"""
An activity that needs to get to an inbox somewhere.
"""
class Meta:
# managed = False
db_table = "activities_fanout"
class Types(models.TextChoices):
post = "post"
post_edited = "post_edited"
post_deleted = "post_deleted"
interaction = "interaction"
undo_interaction = "undo_interaction"
identity_edited = "identity_edited"
identity_deleted = "identity_deleted"
identity_created = "identity_created"
identity_moved = "identity_moved"
state = models.CharField(max_length=100, default="outdated")
state_changed = models.DateTimeField(auto_now_add=True)
# The user this event is targeted at
# We always need this, but if there is a shared inbox URL on the user
# we'll deliver to that and won't have fanouts for anyone else with the
# same one.
identity = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
related_name="fan_outs",
)
# What type of activity it is
type = models.CharField(max_length=100, choices=Types.choices)
# Links to the appropriate objects
subject_post = models.ForeignKey(
"takahe.Post",
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="fan_outs",
)
subject_post_interaction = models.ForeignKey(
"takahe.PostInteraction",
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="fan_outs",
)
subject_identity = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="subject_fan_outs",
)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
2023-12-22 23:59:48 -05:00
class PostAttachment(models.Model):
"""
An attachment to a Post. Could be an image, a video, etc.
"""
post = models.ForeignKey(
"takahe.post",
on_delete=models.CASCADE,
related_name="attachments",
blank=True,
null=True,
)
author = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
related_name="attachments",
blank=True,
null=True,
)
# state = StateField(graph=PostAttachmentStates)
state = models.CharField(max_length=100, default="new")
state_changed = models.DateTimeField(auto_now_add=True)
mimetype = models.CharField(max_length=200)
# Files may not be populated if it's remote and not cached on our side yet
file = models.FileField(
upload_to=partial(upload_namer, "attachments"),
null=True,
blank=True,
storage=upload_store,
)
thumbnail = models.ImageField(
upload_to=partial(upload_namer, "attachment_thumbnails"),
null=True,
blank=True,
storage=upload_store,
)
remote_url = models.CharField(max_length=500, null=True, blank=True)
# This is the description for images, at least
name = models.TextField(null=True, blank=True)
width = models.IntegerField(null=True, blank=True)
height = models.IntegerField(null=True, blank=True)
focal_x = models.FloatField(null=True, blank=True)
focal_y = models.FloatField(null=True, blank=True)
blurhash = models.TextField(null=True, blank=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class Meta:
# managed = False
db_table = "activities_postattachment"
2024-04-20 22:02:42 -04:00
def is_image(self):
return self.mimetype in [
"image/apng",
"image/avif",
"image/gif",
"image/jpeg",
"image/png",
"image/webp",
]
def is_video(self):
return self.mimetype in [
"video/mp4",
"video/ogg",
"video/webm",
]
def thumbnail_url(self) -> RelativeAbsoluteUrl:
if self.thumbnail:
return RelativeAbsoluteUrl(self.thumbnail.url)
elif self.file:
return RelativeAbsoluteUrl(self.file.url)
else:
return ProxyAbsoluteUrl(
f"/proxy/post_attachment/{self.pk}/",
remote_url=self.remote_url,
)
def full_url(self):
if self.file:
return RelativeAbsoluteUrl(self.file.url)
if self.is_image():
return ProxyAbsoluteUrl(
f"/proxy/post_attachment/{self.pk}/",
remote_url=self.remote_url,
)
return RelativeAbsoluteUrl(self.remote_url)
@property
def file_display_name(self):
if self.remote_url:
return self.remote_url.rsplit("/", 1)[-1]
if self.file:
return self.file.name.rsplit("/", 1)[-1]
return f"attachment ({self.mimetype})"
2023-12-22 23:59:48 -05:00
2023-07-20 21:59:49 -04:00
class EmojiQuerySet(models.QuerySet):
def usable(self, domain: Domain | None = None):
"""
Returns all usable emoji, optionally filtering by domain too.
"""
visible_q = models.Q(local=True) | models.Q(public=True)
if True: # Config.system.emoji_unreviewed_are_public:
visible_q |= models.Q(public__isnull=True)
qs = self.filter(visible_q)
if domain:
if not domain.local:
qs = qs.filter(domain=domain)
return qs
class EmojiManager(models.Manager):
def get_queryset(self):
return EmojiQuerySet(self.model, using=self._db)
def usable(self, domain: Domain | None = None):
return self.get_queryset().usable(domain)
class Emoji(models.Model):
class Meta:
# managed = False
db_table = "activities_emoji"
# Normalized Emoji without the ':'
shortcode = models.SlugField(max_length=100, db_index=True)
domain = models.ForeignKey(
"takahe.Domain", null=True, blank=True, on_delete=models.CASCADE
)
local = models.BooleanField(default=True)
# Should this be shown in the public UI?
public = models.BooleanField(null=True)
object_uri = models.CharField(max_length=500, blank=True, null=True, unique=True)
mimetype = models.CharField(max_length=200)
# Files may not be populated if it's remote and not cached on our side yet
file = models.ImageField(
# upload_to=partial(upload_emoji_namer, "emoji"),
null=True,
blank=True,
)
# A link to the custom emoji
remote_url = models.CharField(max_length=500, blank=True, null=True)
# Used for sorting custom emoji in the picker
category = models.CharField(max_length=100, blank=True, null=True)
# State of this Emoji
# state = StateField(EmojiStates)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
objects = EmojiManager()
@classmethod
def emojis_from_content(cls, content: str, domain: Domain | None) -> list["Emoji"]:
"""
Return a parsed and sanitized of emoji found in content without
the surrounding ':'.
"""
emoji_hits = FediverseHtmlParser(
content, find_emojis=True, emoji_domain=domain
).emojis
emojis = sorted({emoji for emoji in emoji_hits})
q = models.Q(local=True) | models.Q(public=True) | models.Q(public__isnull=True)
if domain and not domain.local:
q = q & models.Q(domain=domain)
return list(
cls.objects.filter(local=(domain is None) or domain.local)
.filter(q)
.filter(shortcode__in=emojis)
)
@classmethod
@cached(cache=TTLCache(maxsize=1000, ttl=60))
def get_by_domain(cls, shortcode, domain: Domain | None) -> "Emoji | None":
"""
Given an emoji shortcode and optional domain, looks up the single
emoji and returns it. Raises Emoji.DoesNotExist if there isn't one.
"""
try:
if domain is None or domain.local:
return cls.objects.get(local=True, shortcode=shortcode)
else:
return cls.objects.get(domain=domain, shortcode=shortcode)
except Emoji.DoesNotExist:
return None
@property
def fullcode(self):
return f":{self.shortcode}:"
@property
def is_usable(self) -> bool:
"""
Return True if this Emoji is usable.
"""
return self.public or self.public is None
def full_url(self, always_show=False) -> RelativeAbsoluteUrl:
if self.is_usable or always_show:
if self.file:
2023-08-22 17:13:52 +00:00
return AutoAbsoluteUrl(settings.TAKAHE_MEDIA_URL + self.file.name)
2023-08-15 15:46:11 -04:00
# return AutoAbsoluteUrl(self.file.url)
2023-07-20 21:59:49 -04:00
elif self.remote_url:
return ProxyAbsoluteUrl(
f"/proxy/emoji/{self.pk}/",
remote_url=self.remote_url,
)
return StaticAbsoluteUrl("img/blank-emoji-128.png")
def as_html(self):
if self.is_usable:
return mark_safe(
f'<img src="{self.full_url().relative}" class="emoji" alt="Emoji {self.shortcode}">'
)
return self.fullcode
class HashtagQuerySet(models.QuerySet):
def public(self):
public_q = models.Q(public=True)
if True: # Config.system.hashtag_unreviewed_are_public:
public_q |= models.Q(public__isnull=True)
return self.filter(public_q)
def hashtag_or_alias(self, hashtag: str):
return self.filter(
models.Q(hashtag=hashtag) | models.Q(aliases__contains=hashtag)
)
class HashtagManager(models.Manager):
def get_queryset(self):
return HashtagQuerySet(self.model, using=self._db)
def public(self):
return self.get_queryset().public()
def hashtag_or_alias(self, hashtag: str):
return self.get_queryset().hashtag_or_alias(hashtag)
class Hashtag(models.Model):
class Meta:
# managed = False
db_table = "activities_hashtag"
MAXIMUM_LENGTH = 100
# Normalized hashtag without the '#'
hashtag = models.SlugField(primary_key=True, max_length=100)
# Friendly display override
name_override = models.CharField(max_length=100, null=True, blank=True)
# Should this be shown in the public UI?
public = models.BooleanField(null=True)
# State of this Hashtag
# state = StateField(HashtagStates)
state = models.CharField(max_length=100, default="outdated")
state_changed = models.DateTimeField(auto_now_add=True)
2024-06-04 16:51:51 -04:00
state_next_attempt = models.DateTimeField(blank=True, null=True)
state_locked_until = models.DateTimeField(null=True, blank=True, db_index=True)
2023-07-20 21:59:49 -04:00
# Metrics for this Hashtag
stats = models.JSONField(null=True, blank=True)
# Timestamp of last time the stats were updated
stats_updated = models.DateTimeField(null=True, blank=True)
# List of other hashtags that are considered similar
aliases = models.JSONField(null=True, blank=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
objects = HashtagManager()
class urls(urlman.Urls):
view = "/tags/{self.hashtag}/"
follow = "/tags/{self.hashtag}/follow/"
unfollow = "/tags/{self.hashtag}/unfollow/"
admin = "/admin/hashtags/"
admin_edit = "{admin}{self.hashtag}/"
admin_enable = "{admin_edit}enable/"
admin_disable = "{admin_edit}disable/"
timeline = "/tags/{self.hashtag}/"
hashtag_regex = re.compile(r"\B#([a-zA-Z0-9(_)]+\b)(?!;)")
def save(self, *args, **kwargs):
self.hashtag = self.hashtag.lstrip("#")
if self.name_override:
self.name_override = self.name_override.lstrip("#")
return super().save(*args, **kwargs)
@property
def display_name(self):
return self.name_override or self.hashtag
def __str__(self):
return self.display_name
def usage_months(self, num: int = 12) -> dict[date, int]:
"""
Return the most recent num months of stats
"""
if not self.stats:
return {}
results = {}
for key, val in self.stats.items():
parts = key.split("-")
if len(parts) == 2:
year = int(parts[0])
month = int(parts[1])
results[date(year, month, 1)] = val
return dict(sorted(results.items(), reverse=True)[:num])
def usage_days(self, num: int = 7) -> dict[date, int]:
"""
Return the most recent num days of stats
"""
if not self.stats:
return {}
results = {}
for key, val in self.stats.items():
parts = key.split("-")
if len(parts) == 3:
year = int(parts[0])
month = int(parts[1])
day = int(parts[2])
results[date(year, month, day)] = val
return dict(sorted(results.items(), reverse=True)[:num])
2024-06-04 16:51:51 -04:00
@property
def needs_update(self):
if self.stats_updated is None:
return True
return timezone.now() - self.stats_updated > timedelta(hours=1)
@classmethod
def ensure_hashtag(cls, name, update=None):
"""
Properly strips/trims/lowercases the hashtag name, and makes sure a Hashtag
object exists in the database, and returns it.
"""
name = name.strip().lstrip("#").lower()[: Hashtag.MAXIMUM_LENGTH]
hashtag, created = cls.objects.get_or_create(hashtag=name)
if created or update or hashtag.needs_update:
hashtag.state = "outdated"
hashtag.state_changed = timezone.now()
hashtag.state_next_attempt = None
hashtag.state_locked_until = None
hashtag.save(
update_fields=[
"state",
"state_changed",
"state_next_attempt",
"state_locked_until",
]
)
return hashtag
class HashtagFeature(models.Model):
identity = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
related_name="hashtag_features",
)
hashtag = models.ForeignKey(
"takahe.Hashtag",
on_delete=models.CASCADE,
related_name="featurers",
)
created = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = "users_hashtagfeature"
2023-07-20 21:59:49 -04:00
class PostInteraction(models.Model):
"""
Handles both boosts and likes
"""
class Types(models.TextChoices):
like = "like"
boost = "boost"
vote = "vote"
pin = "pin"
id = models.BigIntegerField(
primary_key=True,
default=Snowflake.generate_post_interaction,
)
# The state the boost is in
# state = StateField(PostInteractionStates)
state = models.CharField(max_length=100, default="new")
state_changed = models.DateTimeField(auto_now_add=True)
# The canonical object ID
object_uri = models.CharField(max_length=500, blank=True, null=True, unique=True)
# What type of interaction it is
type = models.CharField(max_length=100, choices=Types.choices)
# The user who boosted/liked/etc.
identity = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
related_name="interactions",
)
# The post that was boosted/liked/etc
post = models.ForeignKey(
"takahe.Post",
on_delete=models.CASCADE,
related_name="interactions",
)
# Used to store any interaction extra text value like the vote
# in the question/poll case
value = models.CharField(max_length=50, blank=True, null=True)
# When the activity was originally created (as opposed to when we received it)
# Mastodon only seems to send this for boosts, not likes
published = models.DateTimeField(default=timezone.now)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class Meta:
# managed = False
db_table = "activities_postinteraction"
class Block(models.Model):
"""
When one user (the source) mutes or blocks another (the target)
"""
# state = StateField(BlockStates)
state = models.CharField(max_length=100, default="new")
state_changed = models.DateTimeField(auto_now_add=True)
source = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
related_name="outbound_blocks",
)
target = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
related_name="inbound_blocks",
)
uri = models.CharField(blank=True, null=True, max_length=500)
# If it is a mute, we will stop delivering any activities from target to
# source, but we will still deliver activities from source to target.
# A full block (mute=False) stops activities both ways.
mute = models.BooleanField()
include_notifications = models.BooleanField(default=False)
expires = models.DateTimeField(blank=True, null=True)
note = models.TextField(blank=True, null=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class Meta:
# managed = False
db_table = "users_block"
def __str__(self):
return f"#{self.pk}: {self.source} blocks {self.target}"
### Alternate fetchers/constructors ###
@classmethod
def maybe_get(
cls, source, target, mute=False, require_active=False
) -> Optional["Block"]:
"""
Returns a Block if it exists between source and target
"""
try:
if require_active:
return cls.objects.filter(
status__in=["new", "sent", "awaiting_expiry"]
).get(source=source, target=target, mute=mute)
else:
return cls.objects.get(source=source, target=target, mute=mute)
except cls.DoesNotExist:
return None
@classmethod
def create_local_block(cls, source, target) -> "Block":
"""
Creates or updates a full Block from a local Identity to the target
(which can be local or remote).
"""
if not source.local:
raise ValueError("You cannot block from a remote Identity")
block = cls.maybe_get(source=source, target=target, mute=False)
if block is not None:
2024-04-06 00:13:50 -04:00
if block.state not in ["new", "sent", "awaiting_expiry"]:
2023-07-20 21:59:49 -04:00
block.state = BlockStates.new # type:ignore
block.save()
else:
with transaction.atomic():
block = cls.objects.create(
source=source,
target=target,
mute=False,
)
block.uri = source.actor_uri + f"block/{block.pk}/"
block.save()
return block
@classmethod
def create_local_mute(
cls,
source,
target,
duration=None,
include_notifications=False,
) -> "Block":
"""
Creates or updates a muting Block from a local Identity to the target
(which can be local or remote).
"""
if not source.local:
raise ValueError("You cannot mute from a remote Identity")
block = cls.maybe_get(source=source, target=target, mute=True)
if block is not None:
2024-04-06 00:13:50 -04:00
if block not in ["new", "sent", "awaiting_expiry"]:
2023-07-20 21:59:49 -04:00
block.state = BlockStates.new # type:ignore
if duration:
block.expires = timezone.now() + datetime.timedelta(seconds=duration)
block.include_notifications = include_notifications
block.save()
else:
with transaction.atomic():
block = cls.objects.create(
source=source,
target=target,
mute=True,
include_notifications=include_notifications,
expires=(
timezone.now() + datetime.timedelta(seconds=duration)
if duration
else None
),
)
block.uri = source.actor_uri + f"block/{block.pk}/"
block.save()
return block
2023-08-13 18:00:10 -04:00
class InboxMessage(models.Model):
"""
an incoming inbox message that needs processing.
Yes, this is kind of its own message queue built on the state graph system.
It's fine. It'll scale up to a decent point.
"""
message = models.JSONField()
# state = StateField(InboxMessageStates)
state = models.CharField(max_length=100, default="received")
state_changed = models.DateTimeField(auto_now_add=True)
class Meta:
# managed = False
db_table = "users_inboxmessage"
@classmethod
def create_internal(cls, payload):
"""
Creates an internal action message
"""
cls.objects.create(
message={
"type": "__internal__",
"object": payload,
}
)
2023-08-15 23:46:00 -04:00
2023-11-27 23:02:58 -05:00
class TimelineEvent(models.Model):
"""
Something that has happened to an identity that we want them to see on one
or more timelines, like posts, likes and follows.
"""
class Types(models.TextChoices):
post = "post"
boost = "boost" # A boost from someone (post substitute)
mentioned = "mentioned"
liked = "liked" # Someone liking one of our posts
followed = "followed"
follow_requested = "follow_requested"
boosted = "boosted" # Someone boosting one of our posts
announcement = "announcement" # Server announcement
identity_created = "identity_created" # New identity created
# The user this event is for
identity = models.ForeignKey(
Identity,
on_delete=models.CASCADE,
related_name="timeline_events",
)
# What type of event it is
type = models.CharField(max_length=100, choices=Types.choices)
# The subject of the event (which is used depends on the type)
subject_post = models.ForeignKey(
Post,
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="timeline_events",
)
subject_post_interaction = models.ForeignKey(
PostInteraction,
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="timeline_events",
)
subject_identity = models.ForeignKey(
Identity,
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="timeline_events_about_us",
)
published = models.DateTimeField(default=timezone.now)
seen = models.BooleanField(default=False)
dismissed = models.BooleanField(default=False)
created = models.DateTimeField(auto_now_add=True)
class Meta:
indexes = [
# This relies on a DB that can use left subsets of indexes
models.Index(
fields=["identity", "type", "subject_post", "subject_identity"]
),
models.Index(fields=["identity", "type", "subject_identity"]),
models.Index(fields=["identity", "created"]),
]
# managed = False
db_table = "activities_timelineevent"
2023-08-15 23:46:00 -04:00
class Config(models.Model):
"""
A configuration setting for either the server or a specific user or identity.
The possible options and their defaults are defined at the bottom of the file.
"""
key = models.CharField(max_length=500)
user = models.ForeignKey(
User,
blank=True,
null=True,
related_name="configs",
on_delete=models.CASCADE,
)
identity = models.ForeignKey(
Identity,
blank=True,
null=True,
related_name="configs",
on_delete=models.CASCADE,
)
domain = models.ForeignKey(
Domain,
blank=True,
null=True,
related_name="configs",
on_delete=models.CASCADE,
)
json = models.JSONField(blank=True, null=True)
image = models.ImageField(
blank=True,
null=True,
)
class Meta:
# managed = False
db_table = "core_config"
unique_together = [
("key", "user", "identity", "domain"),
]
2024-01-15 13:09:12 -05:00
class Relay(models.Model):
inbox_uri = models.CharField(max_length=500, unique=True)
# state = StateField(RelayStates)
state = models.CharField(max_length=100, default="new")
state_changed = models.DateTimeField(auto_now_add=True)
state_next_attempt = models.DateTimeField(blank=True, null=True)
state_locked_until = models.DateTimeField(null=True, blank=True, db_index=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class Meta:
# managed = False
db_table = "users_relay"
2024-04-17 00:00:40 -04:00
class Announcement(models.Model):
"""
A server-wide announcement that users all see and can dismiss.
"""
text = models.TextField()
published = models.BooleanField(
default=False,
)
start = models.DateTimeField(
null=True,
blank=True,
)
end = models.DateTimeField(
null=True,
blank=True,
)
include_unauthenticated = models.BooleanField(default=False)
# Note that this is against User, not Identity - it's one of the few places
# where we want it to be per login.
seen = models.ManyToManyField("User", blank=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class Meta:
# managed = False
db_table = "users_announcement"
@property
def html(self) -> str:
from journal.models import render_md
return mark_safe(render_md(self.text))
2024-06-10 17:28:20 -04:00
class Application(models.Model):
"""
OAuth applications
"""
class Meta:
db_table = "api_application"
client_id = models.CharField(max_length=500)
client_secret = models.CharField(max_length=500)
redirect_uris = models.TextField()
scopes = models.TextField()
name = models.CharField(max_length=500)
website = models.CharField(max_length=500, blank=True, null=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class Authorization(models.Model):
"""
An authorization code as part of the OAuth flow
"""
class Meta:
db_table = "api_authorization"
application = models.ForeignKey(
"takahe.Application",
on_delete=models.CASCADE,
related_name="authorizations",
)
user = models.ForeignKey(
"takahe.User",
blank=True,
null=True,
on_delete=models.CASCADE,
related_name="authorizations",
)
identity = models.ForeignKey(
"takahe.Identity",
blank=True,
null=True,
on_delete=models.CASCADE,
related_name="authorizations",
)
code = models.CharField(max_length=128, blank=True, null=True, unique=True)
token = models.OneToOneField(
"takahe.Token",
blank=True,
null=True,
on_delete=models.CASCADE,
)
scopes = models.JSONField()
redirect_uri = models.TextField(blank=True, null=True)
valid_for_seconds = models.IntegerField(default=60)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class Token(models.Model):
"""
An (access) token to call the API with.
Can be either tied to a user, or app-level only.
"""
class Meta:
db_table = "api_token"
identity_id: int | None
application = models.ForeignKey(
"takahe.Application",
on_delete=models.CASCADE,
related_name="tokens",
)
user = models.ForeignKey(
"takahe.User",
blank=True,
null=True,
on_delete=models.CASCADE,
related_name="tokens",
)
identity = models.ForeignKey(
"takahe.Identity",
blank=True,
null=True,
on_delete=models.CASCADE,
related_name="tokens",
)
token = models.CharField(max_length=500, unique=True)
scopes = models.JSONField()
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
revoked = models.DateTimeField(blank=True, null=True)
# push_subscription: "PushSubscription"
2024-06-15 23:38:33 -04:00
class Bookmark(models.Model):
class Meta:
db_table = "users_bookmark"
identity = models.ForeignKey(
"takahe.Identity",
on_delete=models.CASCADE,
related_name="bookmarks",
)
post = models.ForeignKey(
"takahe.Post",
on_delete=models.CASCADE,
related_name="bookmarks",
)
created = models.DateTimeField(auto_now_add=True)