lib.itmens/common/utils.py
2024-05-09 11:22:29 -04:00

210 lines
6.7 KiB
Python

import functools
import uuid
from typing import TYPE_CHECKING
from discord import SyncWebhook
from django.conf import settings
from django.core.exceptions import PermissionDenied
from django.http import Http404, HttpRequest, HttpResponseRedirect, QueryDict
from django.utils import timezone
from django.utils.baseconv import base62
from django.utils.translation import gettext_lazy as _
from .config import PAGE_LINK_NUMBER
if TYPE_CHECKING:
from users.models import APIdentity, User
class AuthedHttpRequest(HttpRequest):
"""
A subclass of HttpRequest for type-checking only
"""
user: "User"
target_identity: "APIdentity"
class HTTPResponseHXRedirect(HttpResponseRedirect):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self["HX-Redirect"] = self["Location"]
status_code = 200
def user_identity_required(func): # TODO make this a middleware
@functools.wraps(func)
def wrapper(request, *args, **kwargs):
from users.models import APIdentity
identity = None
if request.user.is_authenticated:
try:
identity = APIdentity.objects.get(user=request.user)
except APIdentity.DoesNotExist:
return HttpResponseRedirect("/account/register")
request.identity = identity
return func(request, *args, **kwargs)
return wrapper
def target_identity_required(func):
@functools.wraps(func)
def wrapper(request, user_name, *args, **kwargs):
from users.models import APIdentity
try:
target = APIdentity.get_by_handle(user_name)
except APIdentity.DoesNotExist:
raise Http404(_("User not found"))
target_user = target.user
viewer = None
if target_user and not target_user.is_active:
raise Http404(_("User no longer exists"))
if request.user.is_authenticated:
try:
viewer = APIdentity.objects.get(user=request.user)
except APIdentity.DoesNotExist:
return HttpResponseRedirect("/account/register")
if request.user != target_user:
if target.is_blocking(viewer) or target.is_blocked_by(viewer):
raise PermissionDenied(_("Access denied"))
else:
viewer = None
request.target_identity = target
request.identity = viewer
return func(request, user_name, *args, **kwargs)
return wrapper
def profile_identity_required(func):
@functools.wraps(func)
def wrapper(request, user_name, *args, **kwargs):
from users.models import APIdentity
try:
target = APIdentity.get_by_handle(user_name, match_linked=True)
except APIdentity.DoesNotExist:
raise Http404(_("User not found"))
target_user = target.user
viewer = None
if target_user and not target_user.is_active:
raise Http404(_("User no longer exists"))
if request.user.is_authenticated:
try:
viewer = APIdentity.objects.get(user=request.user)
except APIdentity.DoesNotExist:
return HttpResponseRedirect("/account/register")
if request.user != target_user:
if target.is_blocking(viewer) or target.is_blocked_by(viewer):
raise PermissionDenied(_("Access denied"))
else:
viewer = None
request.target_identity = target
request.identity = viewer
return func(request, user_name, *args, **kwargs)
return wrapper
class PageLinksGenerator:
# TODO inherit django paginator
"""
Calculate the pages for multiple links pagination.
length -- the number of page links in pagination
"""
def __init__(
self, current_page: int, total_pages: int, query: QueryDict | None = None
):
length = PAGE_LINK_NUMBER
current_page = int(current_page)
self.query_string = ""
if query:
q = query.copy()
if q.get("page"):
q.pop("page")
self.query_string = q.urlencode()
if self.query_string:
self.query_string += "&"
self.current_page = current_page
self.previous_page = current_page - 1 if current_page > 1 else None
self.next_page = current_page + 1 if current_page < total_pages else None
self.start_page = 1
self.end_page = 1
self.page_range = None
self.has_prev = None
self.has_next = None
start_page = current_page - length // 2
end_page = current_page + length // 2
# decision is based on the start page and the end page
# both sides overflow
if (start_page < 1 and end_page > total_pages) or length >= total_pages:
self.start_page = 1
self.end_page = total_pages
self.has_prev = False
self.has_next = False
elif start_page < 1 and not end_page > total_pages:
self.start_page = 1
# this won't overflow because the total pages are more than the length
self.end_page = end_page - (start_page - 1)
self.has_prev = False
if end_page == total_pages:
self.has_next = False
else:
self.has_next = True
elif not start_page < 1 and end_page > total_pages:
self.end_page = total_pages
self.start_page = start_page - (end_page - total_pages)
self.has_next = False
if start_page == 1:
self.has_prev = False
else:
self.has_prev = True
# both sides do not overflow
elif not start_page < 1 and not end_page > total_pages:
self.start_page = start_page
self.end_page = end_page
self.has_prev = True
self.has_next = True
self.first_page = 1
self.last_page = total_pages
self.page_range = range(self.start_page, self.end_page + 1)
# assert self.has_prev is not None and self.has_next is not None
def GenerateDateUUIDMediaFilePath(filename, path_root):
ext = filename.split(".")[-1]
filename = "%s.%s" % (uuid.uuid4(), ext)
root = ""
if path_root.endswith("/"):
root = path_root
else:
root = path_root + "/"
return root + timezone.now().strftime("%Y/%m/%d") + f"{filename}"
def get_uuid_or_404(uuid_b62):
try:
i = base62.decode(uuid_b62)
return uuid.UUID(int=i)
except ValueError:
raise Http404("Malformed Base62 UUID")
def discord_send(channel, content, **args) -> bool:
dw = settings.DISCORD_WEBHOOKS.get(channel)
if not dw:
return False
webhook = SyncWebhook.from_url(dw)
webhook.send(content, **args)
return True