lib.itmens/users/management/commands/user.py

161 lines
5.7 KiB
Python
Raw Normal View History

from django.core.management.base import BaseCommand
2023-07-02 17:00:21 -04:00
from tqdm import tqdm
import httpx
2023-07-02 17:00:21 -04:00
from users.models import Preference, User
from takahe.models import Identity, Domain
from takahe.utils import Takahe
2023-07-02 17:00:21 -04:00
class Command(BaseCommand):
2024-12-27 05:18:43 -05:00
help = "Manage users"
2023-07-02 17:00:21 -04:00
def add_arguments(self, parser):
2024-12-27 05:18:43 -05:00
parser.add_argument("--list", action="store_true", help="list all users")
parser.add_argument("--verbose", action="store_true")
parser.add_argument("--fix", action="store_true")
2023-07-02 17:00:21 -04:00
parser.add_argument(
2024-12-27 05:18:43 -05:00
"--integrity",
2023-07-02 17:00:21 -04:00
action="store_true",
2024-12-27 05:18:43 -05:00
help="check and fix integrity for missing data for user models",
2023-07-02 17:00:21 -04:00
)
parser.add_argument(
"--remote",
action="store_true",
help="reset state for remote domains/users with previous connection issues",
)
2023-07-02 17:00:21 -04:00
parser.add_argument(
2024-12-27 05:18:43 -05:00
"--super", action="store", nargs="*", help="list or toggle superuser"
2023-07-02 17:00:21 -04:00
)
parser.add_argument(
2024-12-27 05:18:43 -05:00
"--staff", action="store", nargs="*", help="list or toggle staff"
2023-07-02 17:00:21 -04:00
)
2024-12-27 05:18:43 -05:00
parser.add_argument("--active", action="store", nargs="*", help="toggle active")
2023-07-02 17:00:21 -04:00
def handle(self, *args, **options):
self.verbose = options["verbose"]
self.fix = options["fix"]
2024-12-27 05:18:43 -05:00
self.users = User.objects.filter(is_active=True)
if options["list"]:
self.list(self.users)
2023-07-02 17:00:21 -04:00
if options["integrity"]:
self.integrity()
if options["remote"]:
self.check_remote()
2024-12-27 05:18:43 -05:00
if options["super"] is not None:
self.superuser(options["super"])
if options["staff"] is not None:
self.staff(options["staff"])
if options["active"]:
self.set_active(options["active"])
def list(self, users):
for user in users:
self.stdout.write(
user.username.ljust(20)
+ str(user.date_joined.date()).ljust(12)
2024-12-30 01:51:19 -05:00
+ str(user.last_login.date() if user.last_login else "").ljust(12)
2024-12-27 05:18:43 -05:00
+ str(list(user.social_accounts.all())),
)
2023-07-02 17:00:21 -04:00
def integrity(self):
count = 0
self.stdout.write("Checking local users")
2024-11-20 21:46:36 -05:00
for user in tqdm(User.objects.filter(is_active=True)):
i = user.identity.takahe_identity
if i.public_key is None:
count += 1
if self.fix:
i.generate_keypair()
if i.inbox_uri is None:
count += 1
if self.fix:
i.ensure_uris()
2023-07-02 17:00:21 -04:00
if not Preference.objects.filter(user=user).first():
if self.fix:
Preference.objects.create(user=user)
count += 1
def check_remote(self):
headers = {
"Accept": "application/json,application/activity+json,application/ld+json"
}
with httpx.Client(timeout=0.5) as client:
count = 0
self.stdout.write("Checking remote domains")
for d in tqdm(
Domain.objects.filter(
local=False, blocked=False, state="connection_issue"
)
):
try:
response = client.get(
f"https://{d.domain}/.well-known/nodeinfo",
follow_redirects=True,
headers=headers,
)
if response.status_code == 200 and "json" in response.headers.get(
"content-type", ""
):
count += 1
if self.fix:
Takahe.update_state(d, "outdated")
except Exception:
pass
self.stdout.write(f"{count} issues")
count = 0
self.stdout.write("Checking remote identities")
for i in tqdm(
Identity.objects.filter(
2025-01-17 22:07:46 -05:00
# public_key__isnull=True,
local=False,
restriction=0,
state="connection_issue",
)
):
try:
response = client.request(
"get",
i.actor_uri,
headers=headers,
follow_redirects=True,
)
if (
response.status_code == 200
and "json" in response.headers.get("content-type", "")
and "@context" in response.text
):
Takahe.update_state(i, "outdated")
except Exception:
pass
self.stdout.write(f"{count} issues")
2024-12-27 05:18:43 -05:00
def superuser(self, v):
if v == []:
self.stdout.write("Super users:")
self.list(self.users.filter(is_superuser=True))
else:
for n in v:
u = User.objects.get(username=n, is_active=True)
u.is_superuser = not u.is_superuser
u.save()
self.stdout.write(f"update {u} superuser: {u.is_superuser}")
def staff(self, v):
if v == []:
self.stdout.write("Staff users:")
self.list(self.users.filter(is_staff=True))
else:
for n in v:
u = User.objects.get(username=n, is_active=True)
u.is_staff = not u.is_staff
u.save()
self.stdout.write(f"update {u} staff: {u.is_staff}")
def set_active(self, v):
for n in v:
u = User.objects.get(username=n)
u.is_active = not u.is_active
u.save()
self.stdout.write(f"update {u} is_active: {u.is_active}")