auto detect starmode using Mastodon API

This commit is contained in:
Your Name 2024-07-06 12:48:27 -04:00 committed by Henri Dickson
parent dc141b0951
commit 27770c44d6
8 changed files with 103 additions and 99 deletions

View file

@ -28,6 +28,7 @@ class MastodonSiteCheck(BaseJob):
api_domain = site.api_domain or site.domain_name
domain, api_domain, v = detect_server_info(api_domain)
site.last_reachable_date = timezone.now()
site.detect_configurations()
except Exception as e:
logger.error(
f"Failed to detect server info for {site.domain_name}/{site.api_domain}",
@ -42,7 +43,14 @@ class MastodonSiteCheck(BaseJob):
site.disabled = True
count_disabled += 1
finally:
site.save(update_fields=["last_reachable_date", "disabled"])
site.save(
update_fields=[
"star_mode",
"max_status_len",
"last_reachable_date",
"disabled",
]
)
# try:
# if not verify_client(site):
# logger.error(

View file

@ -48,8 +48,7 @@ class Migration(migrations.Migration):
(
"star_mode",
models.PositiveIntegerField(
default=0,
verbose_name="0: custom emoji; 1: unicode moon; 2: text",
default=0, verbose_name="0: unicode moon; 1: custom emoji"
),
),
(

View file

@ -5,6 +5,55 @@ import django.db.models.functions.text
import django.utils.timezone
from django.conf import settings
from django.db import migrations, models
from tqdm import tqdm
from catalog.common import jsondata
def move_masto_email(apps, schema_editor):
User = apps.get_model("users", "User")
MastodonAccount = apps.get_model("mastodon", "MastodonAccount")
EmailAccount = apps.get_model("mastodon", "EmailAccount")
m = 0
e = 0
qs = User.objects.filter(username__isnull=True)
print(f"Deleting {qs.count()} nameless users.")
qs.delete()
for user in tqdm(User.objects.filter(is_active=True)):
if user.mastodon_username:
MastodonAccount.objects.update_or_create(
handle=f"{user.mastodon_username}@{user.mastodon_site}",
defaults={
"user": user,
"uid": user.mastodon_id,
"domain": user.mastodon_site,
"created": user.date_joined,
"last_refresh": user.mastodon_last_refresh,
"last_reachable": user.mastodon_last_reachable,
"followers": user.mastodon_followers,
"following": user.mastodon_following,
"blocks": user.mastodon_blocks,
"mutes": user.mastodon_mutes,
"domain_blocks": user.mastodon_domain_blocks,
"account_data": user.mastodon_account,
"access_data": {
"access_token": jsondata.encrypt_str(user.mastodon_token)
},
},
)
m += 1
if user.email:
EmailAccount.objects.update_or_create(
handle=user.email,
defaults={
"user": user,
"uid": user.email.split("@")[0],
"domain": user.email.split("@")[1],
"created": user.date_joined,
},
)
e += 1
print(f"{m} Mastodon, {e} Email migrated.")
class Migration(migrations.Migration):
@ -133,4 +182,5 @@ class Migration(migrations.Migration):
name="unique_social_type_handle",
),
),
migrations.RunPython(move_masto_email),
]

View file

@ -111,8 +111,12 @@ class SocialAccount(TypedModel):
logger.debug(f"{self} skip refreshing as it's done recently")
return False
if not self.check_alive():
dt = timezone.now() - self.last_reachable
logger.warning(f"{self} unreachable for {dt.days} days")
d = (
(timezone.now() - self.last_reachable).days
if self.last_reachable
else "unknown"
)
logger.warning(f"{self} unreachable for {d} days")
return False
if not self.refresh():
logger.warning(f"{self} refresh failed")

View file

@ -38,6 +38,12 @@ class TootVisibilityEnum(StrEnum):
UNLISTED = "unlisted"
get = functools.partial(requests.get, timeout=settings.MASTODON_TIMEOUT)
put = functools.partial(requests.put, timeout=settings.MASTODON_TIMEOUT)
post = functools.partial(requests.post, timeout=settings.MASTODON_TIMEOUT)
delete = functools.partial(requests.post, timeout=settings.MASTODON_TIMEOUT)
_sites_cache_key = "login_sites"
# See https://docs.joinmastodon.org/methods/accounts/
# returns user info
@ -317,10 +323,10 @@ def detect_server_info(login_domain: str) -> tuple[str, str, str]:
try:
response = get(url, headers={"User-Agent": USER_AGENT})
except Exception as e:
logger.error(f"Error connecting {login_domain}", extra={"exception": e})
logger.warning(f"Error connecting {login_domain}", extra={"exception": e})
raise Exception(f"Error connecting to instance {login_domain}")
if response.status_code != 200:
logger.error(f"Error connecting {login_domain}", extra={"response": response})
logger.warning(f"Error in response from {login_domain} {response.status_code}")
raise Exception(
f"Instance {login_domain} returned error code {response.status_code}"
)
@ -328,7 +334,9 @@ def detect_server_info(login_domain: str) -> tuple[str, str, str]:
j = response.json()
domain = j["uri"].lower().split("//")[-1].split("/")[0]
except Exception as e:
logger.error(f"Error connecting {login_domain}", extra={"exception": e})
logger.warning(
f"Error pasring response from {login_domain}", extra={"exception": e}
)
raise Exception(f"Instance {login_domain} returned invalid data")
server_version = j["version"]
api_domain = domain
@ -416,13 +424,6 @@ def get_toot_visibility(visibility, user) -> TootVisibilityEnum:
return TootVisibilityEnum.UNLISTED
get = functools.partial(requests.get, timeout=settings.MASTODON_TIMEOUT)
put = functools.partial(requests.put, timeout=settings.MASTODON_TIMEOUT)
post = functools.partial(requests.post, timeout=settings.MASTODON_TIMEOUT)
delete = functools.partial(requests.post, timeout=settings.MASTODON_TIMEOUT)
_sites_cache_key = "login_sites"
def get_or_create_fediverse_application(login_domain):
domain = login_domain
app = MastodonApplication.objects.filter(domain_name__iexact=domain).first()
@ -516,7 +517,7 @@ class MastodonApplication(models.Model):
client_secret = models.CharField(_("client secret"), max_length=200)
vapid_key = models.CharField(_("vapid key"), max_length=200, null=True, blank=True)
star_mode = models.PositiveIntegerField(
_("0: custom emoji; 1: unicode moon; 2: text"), blank=False, default=0
_("0: unicode moon; 1: custom emoji"), blank=False, default=0
)
max_status_len = models.PositiveIntegerField(
_("max toot len"), blank=False, default=500
@ -529,6 +530,24 @@ class MastodonApplication(models.Model):
def __str__(self):
return self.domain_name
def detect_configurations(self):
api_domain = self.api_domain or self.domain_name
url = f"https://{api_domain}/api/v1/instance"
response = get(url, headers={"User-Agent": settings.NEODB_USER_AGENT})
if response.status_code == 200:
j = response.json()
max_chars = (
j.get("configuration", {}).get("statuses", {}).get("max_characters")
)
if max_chars:
self.max_status_len = max_chars
url = f"https://{api_domain}/api/v1/custom_emojis"
response = get(url, headers={"User-Agent": settings.NEODB_USER_AGENT})
if response.status_code == 200:
j = response.json()
if next(filter(lambda e: e["shortcode"] == "star_half", j), None):
self.star_mode = 1
class Mastodon:
@staticmethod
@ -631,8 +650,8 @@ class MastodonAccount(SocialAccount):
def rating_to_emoji(self, rating_grade: int | None) -> str:
from journal.models.renderers import render_rating
app = self.application # TODO fix star mode data flip in app
return render_rating(rating_grade, (0 if app.star_mode else 1) if app else 0)
app = self.application
return render_rating(rating_grade, app.star_mode if app else 0)
def _get(self, url: str):
url = url if url.startswith("https://") else f"https://{self._api_domain}{url}"

View file

@ -416,6 +416,11 @@ class Migration(migrations.Migration):
),
("state", models.CharField(default="new", max_length=100)),
("state_changed", models.DateTimeField(auto_now_add=True)),
("state_next_attempt", models.DateTimeField(blank=True, null=True)),
(
"state_locked_until",
models.DateTimeField(blank=True, db_index=True, null=True),
),
(
"object_uri",
models.CharField(

View file

@ -20,33 +20,6 @@ if TYPE_CHECKING:
from users.models import User as NeoUser
def _int(s: str):
try:
return int(s)
except Exception:
return -1
def _rating_to_emoji(score: int, star_mode=0):
"""convert score(0~10) to mastodon star emoji code"""
if score is None or score == "" or score == 0:
return ""
solid_stars = score // 2
half_star = int(bool(score % 2))
empty_stars = 5 - solid_stars if not half_star else 5 - solid_stars - 1
if star_mode == 1:
emoji_code = "🌕" * solid_stars + "🌗" * half_star + "🌑" * empty_stars
else:
emoji_code = (
settings.STAR_SOLID * solid_stars
+ settings.STAR_HALF * half_star
+ settings.STAR_EMPTY * empty_stars
)
emoji_code = emoji_code.replace("::", ": :")
emoji_code = " " + emoji_code + " "
return emoji_code
class Takahe:
Visibilities = Post.Visibilities

View file

@ -1,54 +0,0 @@
from datetime import timedelta
from django.core.management.base import BaseCommand
from django.utils import timezone
from tqdm import tqdm
from catalog.common import jsondata
from mastodon.models import Email, MastodonAccount, mastodon
from mastodon.models.email import EmailAccount
from users.models import Preference, User
class Command(BaseCommand):
def handle(self, *args, **options):
m = 0
e = 0
qs = User.objects.filter(username__isnull=True)
print(f"Deleting {qs.count()} nameless users.")
qs.delete()
for user in tqdm(User.objects.filter(is_active=True)):
if user.mastodon_username:
MastodonAccount.objects.update_or_create(
handle=f"{user.mastodon_username}@{user.mastodon_site}",
defaults={
"user": user,
"uid": user.mastodon_id,
"domain": user.mastodon_site,
"created": user.date_joined,
"last_refresh": user.mastodon_last_refresh,
"last_reachable": user.mastodon_last_reachable,
"followers": user.mastodon_followers,
"following": user.mastodon_following,
"blocks": user.mastodon_blocks,
"mutes": user.mastodon_mutes,
"domain_blocks": user.mastodon_domain_blocks,
"account_data": user.mastodon_account,
"access_data": {
"access_token": jsondata.encrypt_str(user.mastodon_token)
},
},
)
m += 1
if user.email:
EmailAccount.objects.update_or_create(
handle=user.email,
defaults={
"user": user,
"uid": user.email.split("@")[0],
"domain": user.email.split("@")[1],
"created": user.date_joined,
},
)
e += 1
print(f"{m} Mastodon, {e} Email migrated.")