lib.itmens/mastodon/models/threads.py
2024-07-03 16:44:58 -04:00

247 lines
9.4 KiB
Python

import functools
from datetime import timedelta
from urllib.parse import quote
import requests
from django.conf import settings
from django.core.exceptions import RequestAborted
from django.http import HttpRequest
from django.urls import reverse
from django.utils import timezone
from loguru import logger
from catalog.common import jsondata
from .common import SocialAccount
get = functools.partial(
requests.get,
timeout=settings.THREADS_TIMEOUT,
headers={"User-Agent": settings.NEODB_USER_AGENT},
)
put = functools.partial(
requests.put,
timeout=settings.THREADS_TIMEOUT,
headers={"User-Agent": settings.NEODB_USER_AGENT},
)
post = functools.partial(
requests.post,
timeout=settings.THREADS_TIMEOUT,
headers={"User-Agent": settings.NEODB_USER_AGENT},
)
delete = functools.partial(
requests.post,
timeout=settings.THREADS_TIMEOUT,
headers={"User-Agent": settings.NEODB_USER_AGENT},
)
class Threads:
SCOPE = "threads_basic,threads_content_publish"
DOMAIN = "threads.net"
@staticmethod
def generate_auth_url(request: HttpRequest):
redirect_url = request.build_absolute_uri(reverse("mastodon:threads_oauth"))
url = f"https://threads.net/oauth/authorize?client_id={settings.THREADS_APP_ID}&redirect_uri={redirect_url}&scope={Threads.SCOPE}&response_type=code"
return url
@staticmethod
def obtain_token(
request: HttpRequest, code: str
) -> tuple[str, int, str] | tuple[None, None, None]:
redirect_url = request.build_absolute_uri(reverse("mastodon:threads_oauth"))
payload = {
"client_id": settings.THREADS_APP_ID,
"client_secret": settings.THREADS_APP_SECRET,
"redirect_uri": redirect_url,
"grant_type": "authorization_code",
"code": code,
}
url = "https://graph.threads.net/oauth/access_token"
try:
response = post(url, data=payload)
except Exception as e:
logger.warning(f"Error {url} {e}")
return None, None, None
if response.status_code != 200:
logger.warning(f"Error {url} {response.status_code}")
return None, None, None
data = response.json()
if data.get("error_type"):
logger.warning(f"Error {url} {data}")
return None, None, None
short_token = data.get("access_token")
user_id = data.get("user_id")
# exchange for a 60-days token
url = f"https://graph.threads.net/access_token?grant_type=th_exchange_token&client_secret={settings.THREADS_APP_SECRET}&access_token={short_token}"
try:
response = get(url)
except Exception as e:
logger.warning(f"Error {url} {e}")
return None, None, None
if response.status_code != 200:
logger.warning(f"Error {url} {response.status_code}")
return None, None, None
data = response.json()
if data.get("error_type"):
logger.warning(f"Error {url} {data}")
return None, None, None
return data.get("access_token"), data.get("expires_in"), str(user_id)
@staticmethod
def refresh_token(token: str) -> tuple[str, int] | tuple[None, None]:
url = f"https://graph.threads.net/refresh_access_token?grant_type=th_refresh_token&access_token={token}"
try:
response = get(url)
except Exception as e:
logger.warning(f"Error {url} {e}")
return None, None
if response.status_code != 200:
logger.warning(f"Error {url} {response.status_code}")
return None, None
data = response.json()
if data.get("error_type"):
logger.warning(f"Error {url} {data}")
return None, None
return data.get("access_token"), data.get("expires_in")
@staticmethod
def get_profile(
token: str, user_id: str | None = None
) -> dict[str, str | int] | None:
url = f'https://graph.threads.net/v1.0/{user_id or "me"}?fields=id,username,threads_profile_picture_url,threads_biography&access_token={token}'
try:
response = get(url)
except Exception as e:
logger.warning(f"Error {url} {e}")
return None
if response.status_code != 200:
logger.warning(f"Error {url} {response.status_code}")
return None
data = response.json()
if data.get("error_type"):
logger.warning(f"Error {url} {data}")
return None
return data
@staticmethod
def post_single(token: str, user_id: str, text: str):
url = f"https://graph.threads.net/v1.0/{user_id}/threads?media_type=TEXT&access_token={token}&text={quote(text)}"
response = post(url)
if response.status_code != 200:
return None
media_container_id = (response.json() or {}).get("id")
if not media_container_id:
return None
url = f"https://graph.threads.net/v1.0/{user_id}/threads_publish?creation_id={media_container_id}&access_token={token}"
response = post(url)
if response.status_code != 200:
return None
return (response.json() or {}).get("id")
@staticmethod
def get_single(token: str, media_id: str) -> dict | None:
# url = f"https://graph.threads.net/v1.0/{media_id}?fields=id,media_product_type,media_type,media_url,permalink,owner,username,text,timestamp,shortcode,thumbnail_url,children,is_quote_post&access_token={token}"
url = f"https://graph.threads.net/v1.0/{media_id}?fields=id,permalink,is_quote_post&access_token={token}"
response = post(url)
if response.status_code != 200:
return None
return response.json()
@staticmethod
def authenticate(request: HttpRequest, code: str) -> "ThreadsAccount | None":
token, expire, uid = Threads.obtain_token(request, code)
if not token or not expire:
return None
expires_at = timezone.now() + timedelta(seconds=expire)
existing_account = ThreadsAccount.objects.filter(
uid=uid, domain=Threads.DOMAIN
).first()
if existing_account:
existing_account.access_token = token
existing_account.token_expires_at = expires_at
existing_account.last_reachable = timezone.now()
existing_account.save(update_fields=["access_data", "last_reachable"])
existing_account.refresh()
return existing_account
account = ThreadsAccount()
account.uid = uid
account.access_token = token
account.domain = Threads.DOMAIN
account.token_expires_at = expires_at
account.refresh(save=False)
return account
class ThreadsAccount(SocialAccount):
access_token = jsondata.EncryptedTextField(
json_field_name="access_data", default=""
)
token_expires_at = jsondata.DateTimeField(json_field_name="access_data", null=True)
username = jsondata.CharField(json_field_name="account_data", default="")
threads_profile_picture_url = jsondata.CharField(
json_field_name="account_data", default=""
)
threads_biography = jsondata.CharField(json_field_name="account_data", default="")
@property
def url(self):
return f"https://threads.net/@{self.handle}"
def check_alive(self, save=True) -> bool:
# refresh token
if not self.access_token:
logger.warning(f"{self} token missing")
return False
if self.token_expires_at and timezone.now() > self.token_expires_at:
logger.warning(f"{self} token expired")
return False
if self.last_reachable and timezone.now() < self.last_reachable + timedelta(
hours=1
):
return True
token, expire = Threads.refresh_token(self.access_token)
if not token or not expire:
return False
self.access_token = token
self.last_reachable = timezone.now()
self.token_expires_at = self.last_reachable + timedelta(seconds=expire)
if save:
self.save(update_fields=["access_data", "last_reachable"])
return True
def refresh(self, save=True) -> bool:
if not self.access_token:
logger.warning(f"{self} token missing")
return False
if self.token_expires_at and timezone.now() > self.token_expires_at:
logger.warning(f"{self} token expired")
return False
data = Threads.get_profile(self.access_token)
if not data:
logger.warning(f"{self} unable to get profile")
return False
if self.handle != data["username"]:
if self.handle:
logger.info(f'{self} handle changed to {data["username"]}')
self.handle = data["username"]
self.account_data = data
self.last_refresh = timezone.now()
if save:
self.save(update_fields=["account_data", "handle", "last_refresh"])
return True
def post(self, content: str, **kwargs):
text = (
content.replace(settings.STAR_SOLID + " ", "🌕")
.replace(settings.STAR_HALF + " ", "🌗")
.replace(settings.STAR_EMPTY + " ", "🌑")
)
media_id = Threads.post_single(self.access_token, self.uid, text)
if media_id:
d = Threads.get_single(self.access_token, media_id)
if d:
return {"id": media_id, "url": d["permalink"]}
raise RequestAborted()