lib.itmens/common/utils.py

242 lines
7.9 KiB
Python
Raw Permalink Normal View History

2023-08-13 23:11:12 -04:00
import functools
2021-08-01 12:36:03 +02:00
import uuid
2023-07-20 21:59:49 -04:00
from typing import TYPE_CHECKING
2025-01-05 10:09:27 -05:00
import django_rq
2024-04-07 20:18:01 -04:00
from discord import SyncWebhook
from django.conf import settings
2024-07-05 22:18:13 -04:00
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
2024-10-27 14:45:18 -04:00
from django.core.paginator import Paginator
from django.core.signing import b62_decode
2023-12-30 22:20:15 -05:00
from django.http import Http404, HttpRequest, HttpResponseRedirect, QueryDict
2021-08-01 12:36:03 +02:00
from django.utils import timezone
2024-06-07 22:29:10 -04:00
from django.utils.translation import gettext as _
2021-08-01 12:36:03 +02:00
2024-10-27 14:45:18 -04:00
from .config import ITEMS_PER_PAGE, ITEMS_PER_PAGE_OPTIONS, PAGE_LINK_NUMBER
2024-12-30 01:51:19 -05:00
from .models import int_
2023-12-30 22:20:15 -05:00
2023-07-20 21:59:49 -04:00
if TYPE_CHECKING:
from users.models import APIdentity, User
class AuthedHttpRequest(HttpRequest):
"""
A subclass of HttpRequest for type-checking only
"""
user: "User"
target_identity: "APIdentity"
2023-01-11 19:11:31 -05:00
2023-08-13 18:00:10 -04:00
class HTTPResponseHXRedirect(HttpResponseRedirect):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self["HX-Redirect"] = self["Location"]
status_code = 200
2023-12-09 16:25:41 -05:00
def user_identity_required(func): # TODO make this a middleware
@functools.wraps(func)
def wrapper(request, *args, **kwargs):
from users.models import APIdentity
identity = None
if request.user.is_authenticated:
try:
identity = APIdentity.objects.get(user=request.user)
except APIdentity.DoesNotExist:
return HttpResponseRedirect("/account/register")
request.identity = identity
return func(request, *args, **kwargs)
return wrapper
2023-08-13 23:11:12 -04:00
def target_identity_required(func):
@functools.wraps(func)
def wrapper(request, user_name, *args, **kwargs):
from users.models import APIdentity
try:
2024-02-25 23:04:50 -05:00
target = APIdentity.get_by_handle(user_name)
except APIdentity.DoesNotExist:
2024-04-23 23:57:49 -04:00
raise Http404(_("User not found"))
2024-02-25 23:04:50 -05:00
target_user = target.user
viewer = None
if target_user and not target_user.is_active:
2024-04-23 23:57:49 -04:00
raise Http404(_("User no longer exists"))
2024-02-25 23:04:50 -05:00
if request.user.is_authenticated:
try:
viewer = APIdentity.objects.get(user=request.user)
except APIdentity.DoesNotExist:
return HttpResponseRedirect("/account/register")
if request.user != target_user:
if target.is_blocking(viewer) or target.is_blocked_by(viewer):
2024-04-23 23:57:49 -04:00
raise PermissionDenied(_("Access denied"))
2024-02-25 23:04:50 -05:00
else:
viewer = None
request.target_identity = target
request.identity = viewer
return func(request, user_name, *args, **kwargs)
return wrapper
def profile_identity_required(func):
@functools.wraps(func)
def wrapper(request, user_name, *args, **kwargs):
from users.models import APIdentity
try:
target = APIdentity.get_by_handle(user_name, match_linked=True)
# this should trigger ObjectDoesNotExist if Takahe identity is not sync-ed
blocked = target.restricted
2024-07-05 22:18:13 -04:00
except ObjectDoesNotExist:
2024-04-23 23:57:49 -04:00
raise Http404(_("User not found"))
target_user = target.user
2023-12-09 16:25:41 -05:00
viewer = None
if target_user and not target_user.is_active:
2024-04-23 23:57:49 -04:00
raise Http404(_("User no longer exists"))
if blocked:
raise PermissionDenied(_("Access denied"))
elif request.user.is_authenticated:
2023-12-09 16:25:41 -05:00
try:
viewer = APIdentity.objects.get(user=request.user)
except APIdentity.DoesNotExist:
return HttpResponseRedirect("/account/register")
if request.user != target_user:
if target.is_blocking(viewer) or target.is_blocked_by(viewer):
2024-04-23 23:57:49 -04:00
raise PermissionDenied(_("Access denied"))
2023-12-09 16:25:41 -05:00
else:
viewer = None
2023-08-13 23:11:12 -04:00
request.target_identity = target
2023-12-09 16:25:41 -05:00
request.identity = viewer
2023-08-13 23:11:12 -04:00
return func(request, user_name, *args, **kwargs)
return wrapper
2024-10-27 14:45:18 -04:00
class CustomPaginator(Paginator):
def __init__(self, object_list, request=None) -> None:
per_page = ITEMS_PER_PAGE
if request:
try:
if request.GET.get("per_page"):
2024-12-30 01:51:19 -05:00
per_page = int_(request.GET.get("per_page"))
2024-10-27 14:45:18 -04:00
elif request.COOKIES.get("per_page"):
2024-12-30 01:51:19 -05:00
per_page = int_(request.COOKIES.get("per_page"))
2024-10-27 14:45:18 -04:00
except ValueError:
pass
if per_page not in ITEMS_PER_PAGE_OPTIONS:
per_page = ITEMS_PER_PAGE
super().__init__(object_list, per_page)
2020-07-03 15:36:23 +08:00
class PageLinksGenerator:
# TODO inherit django paginator
"""
Calculate the pages for multiple links pagination.
length -- the number of page links in pagination
"""
2023-01-11 19:11:31 -05:00
2023-12-30 22:20:15 -05:00
def __init__(
self, current_page: int, total_pages: int, query: QueryDict | None = None
):
length = PAGE_LINK_NUMBER
2024-12-30 01:51:19 -05:00
current_page = int_(current_page)
2023-12-30 22:20:15 -05:00
self.query_string = ""
if query:
q = query.copy()
if q.get("page"):
q.pop("page")
self.query_string = q.urlencode()
if self.query_string:
self.query_string += "&"
2020-07-03 15:36:23 +08:00
self.current_page = current_page
2022-01-22 14:04:21 -05:00
self.previous_page = current_page - 1 if current_page > 1 else None
self.next_page = current_page + 1 if current_page < total_pages else None
self.start_page = 1
self.end_page = 1
2020-07-03 15:36:23 +08:00
self.page_range = None
self.has_prev = None
self.has_next = None
start_page = current_page - length // 2
end_page = current_page + length // 2
# decision is based on the start page and the end page
# both sides overflow
2023-01-11 19:11:31 -05:00
if (start_page < 1 and end_page > total_pages) or length >= total_pages:
2020-07-03 15:36:23 +08:00
self.start_page = 1
self.end_page = total_pages
self.has_prev = False
self.has_next = False
2023-01-11 19:11:31 -05:00
2020-07-03 15:36:23 +08:00
elif start_page < 1 and not end_page > total_pages:
self.start_page = 1
# this won't overflow because the total pages are more than the length
self.end_page = end_page - (start_page - 1)
self.has_prev = False
if end_page == total_pages:
self.has_next = False
else:
self.has_next = True
2023-01-11 19:11:31 -05:00
2020-07-03 15:36:23 +08:00
elif not start_page < 1 and end_page > total_pages:
self.end_page = total_pages
self.start_page = start_page - (end_page - total_pages)
self.has_next = False
if start_page == 1:
self.has_prev = False
else:
self.has_prev = True
# both sides do not overflow
elif not start_page < 1 and not end_page > total_pages:
self.start_page = start_page
self.end_page = end_page
self.has_prev = True
self.has_next = True
self.first_page = 1
self.last_page = total_pages
self.page_range = range(self.start_page, self.end_page + 1)
2020-10-03 23:27:41 +02:00
# assert self.has_prev is not None and self.has_next is not None
2024-01-10 22:20:57 -05:00
def GenerateDateUUIDMediaFilePath(filename, path_root):
2023-01-11 19:11:31 -05:00
ext = filename.split(".")[-1]
2021-08-01 12:36:03 +02:00
filename = "%s.%s" % (uuid.uuid4(), ext)
2023-01-11 19:11:31 -05:00
root = ""
if path_root.endswith("/"):
2021-08-01 12:36:03 +02:00
root = path_root
else:
2023-01-11 19:11:31 -05:00
root = path_root + "/"
return root + timezone.now().strftime("%Y/%m/%d") + f"{filename}"
def get_uuid_or_404(uuid_b62):
try:
2024-05-29 10:48:45 -04:00
i = b62_decode(uuid_b62)
return uuid.UUID(int=i)
except ValueError:
raise Http404("Malformed Base62 UUID")
2024-04-07 20:18:01 -04:00
def discord_send(channel, content, **args) -> bool:
2025-01-05 10:09:27 -05:00
dw = settings.DISCORD_WEBHOOKS.get(channel) or settings.DISCORD_WEBHOOKS.get(
"default"
)
2024-04-07 20:18:01 -04:00
if not dw:
return False
2024-12-30 17:50:22 -05:00
if "thread_name" in args:
args["thread_name"] = args["thread_name"][:99]
2025-01-05 10:09:27 -05:00
django_rq.get_queue("fetch").enqueue(_discord_send, dw, content, **args)
return True
def _discord_send(dw, content, **args):
2024-04-07 20:18:01 -04:00
webhook = SyncWebhook.from_url(dw)
webhook.send(content, **args)