refactor accounts sync
This commit is contained in:
parent
4cc0fdd0ac
commit
e29b921d34
14 changed files with 131 additions and 137 deletions
|
@ -560,7 +560,7 @@ CORS_ALLOW_METHODS = (
|
|||
# "PUT",
|
||||
)
|
||||
|
||||
DEACTIVATE_AFTER_UNREACHABLE_DAYS = 120
|
||||
DEACTIVATE_AFTER_UNREACHABLE_DAYS = 365
|
||||
|
||||
DEFAULT_RELAY_SERVER = "https://relay.neodb.net/inbox"
|
||||
|
||||
|
|
|
@ -70,9 +70,9 @@ class Bluesky:
|
|||
account.session_string = session_string
|
||||
account.base_url = base_url
|
||||
if account.pk:
|
||||
account.refresh(save=True, did_refresh=False)
|
||||
account.refresh(save=True, did_check=False)
|
||||
else:
|
||||
account.refresh(save=False, did_refresh=False)
|
||||
account.refresh(save=False, did_check=False)
|
||||
return account
|
||||
|
||||
|
||||
|
@ -108,40 +108,53 @@ class BlueskyAccount(SocialAccount):
|
|||
def url(self):
|
||||
return f"https://{self.handle}"
|
||||
|
||||
def refresh(self, save=True, did_refresh=True):
|
||||
if did_refresh:
|
||||
did = self.uid
|
||||
did_r = DidResolver()
|
||||
handle_r = HandleResolver(timeout=5)
|
||||
did_doc = did_r.resolve(did)
|
||||
if not did_doc:
|
||||
logger.warning(f"ATProto refresh failed: did {did} -> <missing doc>")
|
||||
return False
|
||||
resolved_handle = did_doc.get_handle()
|
||||
if not resolved_handle:
|
||||
logger.warning(f"ATProto refresh failed: did {did} -> <missing handle>")
|
||||
return False
|
||||
resolved_did = handle_r.resolve(resolved_handle)
|
||||
resolved_pds = did_doc.get_pds_endpoint()
|
||||
if did != resolved_did:
|
||||
logger.warning(
|
||||
f"ATProto refresh failed: did {did} -> handle {resolved_handle} -> did {resolved_did}"
|
||||
)
|
||||
return False
|
||||
if resolved_handle != self.handle:
|
||||
logger.debug(
|
||||
f"ATProto refresh: handle changed for did {did}: handle {self.handle} -> {resolved_handle}"
|
||||
)
|
||||
self.handle = resolved_handle
|
||||
if resolved_pds != self.base_url:
|
||||
logger.debug(
|
||||
f"ATProto refresh: pds changed for did {did}: handle {self.base_url} -> {resolved_pds}"
|
||||
)
|
||||
self.base_url = resolved_pds
|
||||
def check_alive(self, save=True):
|
||||
did = self.uid
|
||||
did_r = DidResolver()
|
||||
handle_r = HandleResolver(timeout=5)
|
||||
did_doc = did_r.resolve(did)
|
||||
if not did_doc:
|
||||
logger.warning(f"ATProto refresh failed: did {did} -> <missing doc>")
|
||||
return False
|
||||
resolved_handle = did_doc.get_handle()
|
||||
if not resolved_handle:
|
||||
logger.warning(f"ATProto refresh failed: did {did} -> <missing handle>")
|
||||
return False
|
||||
resolved_did = handle_r.resolve(resolved_handle)
|
||||
resolved_pds = did_doc.get_pds_endpoint()
|
||||
if did != resolved_did:
|
||||
logger.warning(
|
||||
f"ATProto refresh failed: did {did} -> handle {resolved_handle} -> did {resolved_did}"
|
||||
)
|
||||
return False
|
||||
if resolved_handle != self.handle:
|
||||
logger.debug(
|
||||
f"ATProto refresh: handle changed for did {did}: handle {self.handle} -> {resolved_handle}"
|
||||
)
|
||||
self.handle = resolved_handle
|
||||
if resolved_pds != self.base_url:
|
||||
logger.debug(
|
||||
f"ATProto refresh: pds changed for did {did}: handle {self.base_url} -> {resolved_pds}"
|
||||
)
|
||||
self.base_url = resolved_pds
|
||||
self.last_reachable = timezone.now()
|
||||
if save:
|
||||
self.save(
|
||||
update_fields=[
|
||||
"access_data",
|
||||
"handle",
|
||||
"last_reachable",
|
||||
]
|
||||
)
|
||||
return True
|
||||
|
||||
def refresh(self, save=True, did_check=True):
|
||||
if did_check:
|
||||
self.check_alive(save=save)
|
||||
profile = self._client.me
|
||||
if not profile:
|
||||
logger.warning("Bluesky: client not logged in.") # this should not happen
|
||||
return None
|
||||
return False
|
||||
if self.handle != profile.handle:
|
||||
logger.warning(
|
||||
"ATProto refresh: handle mismatch {self.handle} from did doc -> {profile.handle} from PDS"
|
||||
|
@ -150,17 +163,14 @@ class BlueskyAccount(SocialAccount):
|
|||
k: v for k, v in profile.__dict__.items() if isinstance(v, (int, str))
|
||||
}
|
||||
self.last_refresh = timezone.now()
|
||||
self.last_reachable = self.last_refresh
|
||||
if save:
|
||||
self.save(
|
||||
update_fields=[
|
||||
"access_data",
|
||||
"account_data",
|
||||
"handle",
|
||||
"last_refresh",
|
||||
"last_reachable",
|
||||
]
|
||||
)
|
||||
return True
|
||||
|
||||
def post(
|
||||
self,
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
from datetime import timedelta
|
||||
|
||||
from django.db import models
|
||||
from django.db.models.functions import Lower
|
||||
from django.utils import timezone
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from loguru import logger
|
||||
from typedmodels.models import TypedModel
|
||||
|
||||
from catalog.common import jsondata
|
||||
|
@ -66,15 +69,12 @@ class SocialAccount(TypedModel):
|
|||
]
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"({self.pk}){self.platform}#{self.handle}:{self.uid}@{self.domain}"
|
||||
return f"({self.pk}){self.platform}@{self.handle}"
|
||||
|
||||
@property
|
||||
def platform(self) -> Platform:
|
||||
return Platform(self.type.replace("mastodon.", "", 1).replace("account", "", 1))
|
||||
|
||||
def sync_later(self):
|
||||
pass
|
||||
|
||||
def to_dict(self):
|
||||
# skip cached_property, datetime and other non-serializable fields
|
||||
d = {
|
||||
|
@ -101,5 +101,26 @@ class SocialAccount(TypedModel):
|
|||
def check_alive(self) -> bool:
|
||||
return False
|
||||
|
||||
def sync(self) -> bool:
|
||||
def refresh(self) -> bool:
|
||||
return False
|
||||
|
||||
def refresh_graph(self, save=True) -> bool:
|
||||
return False
|
||||
|
||||
def sync(self, skip_graph=False, sleep_hours=0) -> bool:
|
||||
if self.last_refresh and self.last_refresh > timezone.now() - timedelta(
|
||||
hours=sleep_hours
|
||||
):
|
||||
logger.debug(f"{self} skip refreshing as it's done recently")
|
||||
return False
|
||||
if not self.check_alive():
|
||||
dt = timezone.now() - self.last_reachable
|
||||
logger.warning(f"{self} unreachable for {dt.days} days")
|
||||
return False
|
||||
if not self.refresh():
|
||||
logger.warning(f"{self} refresh failed")
|
||||
return False
|
||||
if not skip_graph:
|
||||
self.refresh_graph()
|
||||
logger.debug(f"{self} refreshed")
|
||||
return True
|
||||
|
|
|
@ -15,7 +15,8 @@ _code_ttl = 60 * 15
|
|||
|
||||
|
||||
class EmailAccount(SocialAccount):
|
||||
pass
|
||||
def sync(self, skip_graph=False, sleep_hours=0) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
class Email:
|
||||
|
|
|
@ -576,6 +576,8 @@ class Mastodon:
|
|||
existing_account.account_data = mastodon_account.account_data
|
||||
existing_account.save(update_fields=["access_data", "account_data"])
|
||||
return existing_account
|
||||
# for fresh account, ping them for convenience
|
||||
Takahe.fetch_remote_identity(mastodon_account.handle)
|
||||
return mastodon_account
|
||||
|
||||
|
||||
|
@ -774,6 +776,7 @@ class MastodonAccount(SocialAccount):
|
|||
"domain_blocks",
|
||||
]
|
||||
)
|
||||
return True
|
||||
|
||||
def boost(self, post_url: str):
|
||||
boost_toot(self._api_domain, self.access_token, post_url)
|
||||
|
@ -830,9 +833,5 @@ class MastodonAccount(SocialAccount):
|
|||
raise PermissionDenied()
|
||||
raise RequestAborted()
|
||||
|
||||
def sync_later(self):
|
||||
Takahe.fetch_remote_identity(self.handle)
|
||||
# TODO
|
||||
|
||||
def get_reauthorize_url(self):
|
||||
return reverse("mastodon:login") + "?domain=" + self.domain
|
||||
|
|
|
@ -12,6 +12,7 @@ from django.utils import timezone
|
|||
from loguru import logger
|
||||
|
||||
from catalog.common import jsondata
|
||||
from takahe.utils import Takahe
|
||||
|
||||
from .common import SocialAccount
|
||||
|
||||
|
@ -183,6 +184,8 @@ class Threads:
|
|||
account.domain = Threads.DOMAIN
|
||||
account.token_expires_at = expires_at
|
||||
account.refresh(save=False)
|
||||
# for fresh account, ping them for convenience
|
||||
Takahe.fetch_remote_identity(account.handle + "@" + Threads.DOMAIN)
|
||||
return account
|
||||
|
||||
|
||||
|
@ -237,7 +240,7 @@ class ThreadsAccount(SocialAccount):
|
|||
if self.handle != data["username"]:
|
||||
if self.handle:
|
||||
logger.info(f'{self} handle changed to {data["username"]}')
|
||||
self.handle = data["username"]
|
||||
self.handle = str(data["username"])
|
||||
self.account_data = data
|
||||
self.last_refresh = timezone.now()
|
||||
if save:
|
||||
|
|
|
@ -8,6 +8,7 @@ from django.utils.translation import gettext as _
|
|||
|
||||
from common.views import render_error
|
||||
from mastodon.models.common import SocialAccount
|
||||
from users.models import User
|
||||
from users.views.account import auth_login, logout_takahe
|
||||
|
||||
|
||||
|
@ -24,12 +25,12 @@ def process_verified_account(request: HttpRequest, account: SocialAccount):
|
|||
|
||||
|
||||
def login_existing_user(request: HttpRequest, account: SocialAccount):
|
||||
user = authenticate(request, social_account=account)
|
||||
user: User | None = authenticate(request, social_account=account) # type:ignore
|
||||
if not user:
|
||||
return render_error(request, _("Authentication failed"), _("Invalid user."))
|
||||
existing_user = account.user
|
||||
auth_login(request, existing_user)
|
||||
account.sync_later()
|
||||
user.sync_accounts_later()
|
||||
if not existing_user.username or not existing_user.identity:
|
||||
# this should not happen
|
||||
response = redirect(reverse("users:register"))
|
||||
|
@ -50,7 +51,7 @@ def register_new_user(request: HttpRequest, account: SocialAccount):
|
|||
|
||||
def reconnect_account(request, account: SocialAccount):
|
||||
if account.user == request.user:
|
||||
account.sync_later()
|
||||
account.user.sync_accounts_later()
|
||||
messages.add_message(
|
||||
request,
|
||||
messages.INFO,
|
||||
|
@ -73,7 +74,7 @@ def reconnect_account(request, account: SocialAccount):
|
|||
del request.session["new_user"]
|
||||
return render(request, "users/welcome.html")
|
||||
else:
|
||||
account.sync_later()
|
||||
request.user.sync_accounts_later()
|
||||
messages.add_message(
|
||||
request,
|
||||
messages.INFO,
|
||||
|
|
|
@ -15,12 +15,12 @@ class MastodonUserSync(BaseJob):
|
|||
interval = timedelta(hours=interval_hours)
|
||||
|
||||
def run(self):
|
||||
logger.info("Mastodon User Sync start.")
|
||||
inactive_threshold = timezone.now() - timedelta(days=90)
|
||||
batch = (24 + self.interval_hours - 1) // self.interval_hours
|
||||
if batch < 1:
|
||||
batch = 1
|
||||
m = timezone.now().hour // self.interval_hours
|
||||
inactive_threshold = timezone.now() - timedelta(days=30)
|
||||
batches = (24 + self.interval_hours - 1) // self.interval_hours
|
||||
if batches < 1:
|
||||
batches = 1
|
||||
batch = timezone.now().hour // self.interval_hours
|
||||
logger.info(f"User accounts sync job starts batch {batch+1} of {batches}")
|
||||
qs = (
|
||||
User.objects.exclude(
|
||||
preference__mastodon_skip_userinfo=True,
|
||||
|
@ -30,15 +30,15 @@ class MastodonUserSync(BaseJob):
|
|||
username__isnull=False,
|
||||
is_active=True,
|
||||
)
|
||||
.annotate(idmod=F("id") % batch)
|
||||
.filter(idmod=m)
|
||||
.annotate(idmod=F("id") % batches)
|
||||
.filter(idmod=batch)
|
||||
)
|
||||
for user in qs.iterator():
|
||||
skip_detail = False
|
||||
skip_graph = False
|
||||
if not user.last_login or user.last_login < inactive_threshold:
|
||||
last_usage = user.last_usage
|
||||
if not last_usage or last_usage < inactive_threshold:
|
||||
logger.info(f"Skip {user} detail because of inactivity.")
|
||||
skip_detail = True
|
||||
user.refresh_mastodon_data(skip_detail, self.interval_hours)
|
||||
logger.info("Mastodon User Sync finished.")
|
||||
skip_graph = True
|
||||
logger.debug(f"User accounts sync for {user}, skip_graph:{skip_graph}")
|
||||
user.sync_accounts(skip_graph, self.interval_hours)
|
||||
logger.info("User accounts sync job finished.")
|
||||
|
|
|
@ -1,24 +1,6 @@
|
|||
import hashlib
|
||||
import re
|
||||
from functools import cached_property
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import AbstractUser
|
||||
from django.core import validators
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.serializers.json import DjangoJSONEncoder
|
||||
from django.db import models
|
||||
from django.db.models import F, Q, Value
|
||||
from django.db.models.functions import Concat, Lower
|
||||
from django.templatetags.static import static
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django.utils.deconstruct import deconstructible
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from loguru import logger
|
||||
|
||||
from mastodon.api import *
|
||||
from takahe.utils import Takahe
|
||||
|
||||
from .user import User
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ from datetime import timedelta
|
|||
from functools import cached_property
|
||||
from typing import TYPE_CHECKING, ClassVar
|
||||
|
||||
import django_rq
|
||||
import httpx
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import AbstractUser, BaseUserManager
|
||||
|
@ -221,7 +222,7 @@ class User(AbstractUser):
|
|||
return settings.SITE_INFO["site_url"] + self.url
|
||||
|
||||
def __str__(self):
|
||||
return f'USER:{self.pk}:{self.username or "<missing>"}:{self.mastodon or self.email_account or ""}'
|
||||
return f'{self.pk}:{self.username or "<missing>"}'
|
||||
|
||||
@property
|
||||
def registration_complete(self):
|
||||
|
@ -331,49 +332,44 @@ class User(AbstractUser):
|
|||
if url:
|
||||
try:
|
||||
r = httpx.get(url)
|
||||
f = ContentFile(r.content, name=identity.icon_uri.split("/")[-1])
|
||||
identity.icon.save(f.name, f, save=False)
|
||||
changed = True
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"fetch icon failed: {identity} {identity.icon_uri}",
|
||||
f"fetch icon failed: {identity} {url}",
|
||||
extra={"exception": e},
|
||||
)
|
||||
r = None
|
||||
if r:
|
||||
name = str(self.pk) + "-" + url.split("/")[-1].split("?")[0][-100:]
|
||||
f = ContentFile(r.content, name=name)
|
||||
identity.icon.save(name, f, save=False)
|
||||
changed = True
|
||||
if changed:
|
||||
identity.save()
|
||||
Takahe.update_state(identity, "outdated")
|
||||
|
||||
def refresh_mastodon_data(self, skip_detail=False, sleep_hours=0):
|
||||
"""Try refresh account data from mastodon server, return True if refreshed successfully"""
|
||||
mastodon = self.mastodon
|
||||
if not mastodon:
|
||||
return False
|
||||
if mastodon.last_refresh and mastodon.last_refresh > timezone.now() - timedelta(
|
||||
hours=sleep_hours
|
||||
):
|
||||
logger.debug(f"Skip refreshing Mastodon data for {self}")
|
||||
return
|
||||
logger.debug(f"Refreshing Mastodon data for {self}")
|
||||
if not mastodon.check_alive():
|
||||
if (
|
||||
timezone.now() - self.mastodon_last_reachable
|
||||
> timedelta(days=settings.DEACTIVATE_AFTER_UNREACHABLE_DAYS)
|
||||
and not self.email
|
||||
):
|
||||
logger.warning(f"Deactivate {self} bc unable to reach for too long")
|
||||
self.is_active = False
|
||||
self.save(update_fields=["is_active"])
|
||||
return False
|
||||
if not mastodon.refresh():
|
||||
return False
|
||||
if skip_detail:
|
||||
return True
|
||||
def sync_accounts(self, skip_graph=False, sleep_hours=0):
|
||||
"""Try refresh account data from 3p server"""
|
||||
for account in self.social_accounts.all():
|
||||
account.sync(skip_graph=skip_graph, sleep_hours=sleep_hours)
|
||||
if not self.preference.mastodon_skip_userinfo:
|
||||
self.sync_identity()
|
||||
if skip_graph:
|
||||
return
|
||||
if not self.preference.mastodon_skip_relationship:
|
||||
mastodon.refresh_graph()
|
||||
self.sync_relationship()
|
||||
return True
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def sync_accounts_task(user_id):
|
||||
user = User.objects.get(pk=user_id)
|
||||
logger.info(f"{user} accounts sync start")
|
||||
if user.sync_accounts():
|
||||
logger.info(f"{user} accounts sync done")
|
||||
else:
|
||||
logger.warning(f"{user} accounts sync failed")
|
||||
|
||||
def sync_accounts_later(self):
|
||||
django_rq.get_queue("mastodon").enqueue(User.sync_accounts_task, self.pk)
|
||||
|
||||
@cached_property
|
||||
def unread_announcements(self):
|
||||
|
|
|
@ -1,14 +0,0 @@
|
|||
from loguru import logger
|
||||
|
||||
from .models import User
|
||||
|
||||
|
||||
def refresh_mastodon_data_task(user_id):
|
||||
user = User.objects.get(pk=user_id)
|
||||
if not user.mastodon:
|
||||
logger.info(f"{user} mastodon data refresh skipped")
|
||||
return
|
||||
if user.refresh_mastodon_data():
|
||||
logger.info(f"{user} mastodon data refreshed")
|
||||
else:
|
||||
logger.warning(f"{user} mastodon data refresh failed")
|
|
@ -13,7 +13,6 @@ from common.utils import (
|
|||
HTTPResponseHXRedirect,
|
||||
target_identity_required,
|
||||
)
|
||||
from mastodon.api import *
|
||||
from takahe.utils import Takahe
|
||||
|
||||
from ..models import APIdentity
|
||||
|
|
|
@ -19,10 +19,8 @@ from journal.importers.goodreads import GoodreadsImporter
|
|||
from journal.importers.letterboxd import LetterboxdImporter
|
||||
from journal.importers.opml import OPMLImporter
|
||||
from journal.models import ShelfType, reset_journal_visibility_for_user
|
||||
from mastodon.api import *
|
||||
from social.models import reset_social_visibility_for_user
|
||||
|
||||
from ..tasks import *
|
||||
from .account import *
|
||||
|
||||
|
||||
|
@ -146,10 +144,8 @@ def export_marks(request):
|
|||
|
||||
@login_required
|
||||
def sync_mastodon(request):
|
||||
if request.method == "POST" and request.user.mastodon:
|
||||
django_rq.get_queue("mastodon").enqueue(
|
||||
refresh_mastodon_data_task, request.user.pk
|
||||
)
|
||||
if request.method == "POST":
|
||||
request.user.sync_accounts_later()
|
||||
messages.add_message(request, messages.INFO, _("Sync in progress."))
|
||||
return redirect(reverse("users:info"))
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue