async update index when tag or rating changes

This commit is contained in:
Your Name 2023-06-26 00:16:14 -04:00 committed by Henri Dickson
parent e12ef38650
commit 9ebdfc1144
3 changed files with 96 additions and 34 deletions

View file

@ -64,20 +64,9 @@ class Command(BaseCommand):
) # if h == 0 else c.objects.filter(edited_time__gt=timezone.now() - timedelta(hours=h))
pg = Paginator(qs.order_by("id"), BATCH_SIZE)
for p in tqdm(pg.page_range):
items = list(
map(
lambda o: Indexer.obj_to_dict(o),
[
x
for x in pg.get_page(p).object_list
if hasattr(x, "indexable_fields")
],
)
)
if items:
Indexer.replace_batch(items)
while Indexer.busy():
sleep(0.5)
Indexer.replace_batch(pg.get_page(p).object_list)
while Indexer.busy():
sleep(0.5)
def handle(self, *args, **options):
if options["init"]:

View file

@ -2,10 +2,14 @@ import types
import logging
import typesense
from typesense.exceptions import ObjectNotFound
from typesense.collection import Collection
from django.conf import settings
from django.db.models.signals import post_save, post_delete
from catalog.models import Item
from pprint import pprint
import django_rq
from rq.job import Job
from django_redis import get_redis_connection
INDEX_NAME = "catalog"
SEARCHABLE_ATTRIBUTES = [
@ -48,22 +52,66 @@ SEARCH_PAGE_SIZE = 20
logger = logging.getLogger(__name__)
def item_post_save_handler(sender, instance, created, **kwargs):
_PENDING_INDEX_KEY = "pending_index_ids"
_PENDING_INDEX_QUEUE = "import"
_PENDING_INDEX_JOB_ID = "pending_index_flush2"
def _update_index_task():
    """Drain the redis set of pending item ids and refresh their index entries.

    Runs as a single-flight RQ job (see enqueue_update_index); pops ids in
    batches of up to 1000 until the set is empty.
    """
    conn = get_redis_connection("default")
    updated = 0
    while True:
        batch_ids = conn.spop(_PENDING_INDEX_KEY, 1000)
        if not batch_ids:
            break
        batch = Item.objects.filter(id__in=batch_ids)
        Indexer.replace_batch(batch)
        updated += len(batch)
    logger.info(f"Index updated for {updated} items")
def enqueue_update_index(item_ids):
    """Queue item ids for a deferred search-index refresh.

    Ids are accumulated in a redis set (so duplicates collapse) and a single
    flush job (_update_index_task) is enqueued, unless one is already queued
    and will pick up the new ids anyway.

    Args:
        item_ids: iterable of Item primary keys; falsy/empty is a no-op.
    """
    if not item_ids:
        return
    get_redis_connection("default").sadd(_PENDING_INDEX_KEY, *item_ids)
    queued = False
    try:
        job = Job.fetch(
            id=_PENDING_INDEX_JOB_ID,
            connection=django_rq.get_connection(_PENDING_INDEX_QUEUE),
        )
        queued = job.get_status() == "queued"
    except Exception:
        # NoSuchJobError (no flush job yet) or a transient redis error:
        # either way, fall through and enqueue a fresh job. A bare `except:`
        # here would also swallow SystemExit/KeyboardInterrupt.
        queued = False
    if not queued:
        django_rq.get_queue(_PENDING_INDEX_QUEUE).enqueue(
            _update_index_task, job_id=_PENDING_INDEX_JOB_ID
        )
def _item_post_save_handler(sender, instance, created, **kwargs):
    """Push a saved Item into the search index.

    When SEARCH_INDEX_NEW_ONLY is set, only newly created items are indexed.
    """
    if created or not settings.SEARCH_INDEX_NEW_ONLY:
        Indexer.replace_item(instance)
def item_post_delete_handler(sender, instance, **kwargs):
def _item_post_delete_handler(sender, instance, **kwargs):
    # remove the deleted item from the search index
    Indexer.delete_item(instance)
def tag_post_save_handler(sender, instance, **kwargs):
def _list_post_save_handler(sender, instance, created, **kwargs):
    """Queue a re-index of every item contained in the saved list."""
    enqueue_update_index(instance.items.all().values_list("id", flat=True))
def _list_post_delete_handler(sender, instance, **kwargs):
    # intentionally a no-op: item re-indexing on list deletion is covered by
    # the list model's own delete signal (see register_list_model)
    pass
def tag_post_delete_handler(sender, instance, **kwargs):
pass
def _piece_post_save_handler(sender, instance, created, **kwargs):
    # a piece (e.g. a rating or list membership) changed: queue its item for re-index
    enqueue_update_index([instance.item_id])
def _piece_post_delete_handler(sender, instance, **kwargs):
    # a piece was removed: queue its item for re-index so stale data is dropped
    enqueue_update_index([instance.item_id])
class Indexer:
@ -71,10 +119,12 @@ class Indexer:
_instance = None
@classmethod
def instance(cls):
def instance(cls) -> Collection:
if cls._instance is None:
cls._instance = typesense.Client(settings.TYPESENSE_CONNECTION)
return cls._instance
cls._instance = typesense.Client(settings.TYPESENSE_CONNECTION).collections[
INDEX_NAME
]
return cls._instance # type: ignore
@classmethod
def config(cls):
@ -132,26 +182,26 @@ class Indexer:
@classmethod
def init(cls):
idx = cls.instance().collections
idx = typesense.Client(settings.TYPESENSE_CONNECTION).collections
if idx:
# idx.delete()
idx.create(cls.config())
@classmethod
def delete_index(cls):
idx = cls.instance().collections[INDEX_NAME]
idx = cls.instance()
if idx:
idx.delete()
@classmethod
def update_settings(cls):
idx = cls.instance().collections[INDEX_NAME]
idx = cls.instance()
if idx:
idx.update(cls.config())
@classmethod
def get_stats(cls):
idx = cls.instance().collections[INDEX_NAME]
idx = cls.instance()
if idx:
return idx.retrieve()
@ -180,8 +230,20 @@ class Indexer:
for f in ["imdb", "isbn", "barcode"]: # FIXME
if hasattr(i, f):
model.indexable_fields.append(f)
post_save.connect(item_post_save_handler, sender=model)
post_delete.connect(item_post_delete_handler, sender=model)
post_save.connect(_item_post_save_handler, sender=model)
post_delete.connect(_item_post_delete_handler, sender=model)
    @classmethod
    def register_list_model(cls, list_model):
        # re-index contained items when the list itself is saved, and when a
        # membership row (list_model.MEMBER_CLASS) is added or removed
        post_save.connect(_list_post_save_handler, sender=list_model)
        # post_delete.connect(_list_post_delete_handler, sender=list_model) # covered in list_model delete signal
        post_save.connect(_piece_post_save_handler, sender=list_model.MEMBER_CLASS)
        post_delete.connect(_piece_post_delete_handler, sender=list_model.MEMBER_CLASS)
    @classmethod
    def register_piece_model(cls, model):
        # re-index the owning item whenever a piece (e.g. Rating) changes
        post_save.connect(_piece_post_save_handler, sender=model)
        post_delete.connect(_piece_post_delete_handler, sender=model)
@classmethod
def obj_to_dict(cls, obj):
@ -218,7 +280,7 @@ class Indexer:
if obj.is_deleted or obj.merged_to_item_id:
return cls.delete_item(obj)
try:
cls.instance().collections[INDEX_NAME].documents.upsert(
cls.instance().documents.upsert(
cls.obj_to_dict(obj), {"dirty_values": "coerce_or_drop"}
)
except Exception as e:
@ -227,9 +289,15 @@ class Indexer:
@classmethod
def replace_batch(cls, objects):
try:
cls.instance().collections[INDEX_NAME].documents.import_(
objects, {"action": "upsert"}
items = list(
map(
lambda o: cls.obj_to_dict(o),
[x for x in objects if hasattr(x, "indexable_fields")],
)
)
# TODO check is_deleted=False, merged_to_item_id__isnull=True and call delete_batch()
if items:
cls.instance().documents.import_(items, {"action": "upsert"})
except Exception as e:
logger.error(f"replace batch error: \n{e}")
@ -237,9 +305,9 @@ class Indexer:
def delete_item(cls, obj):
pk = obj.uuid
try:
cls.instance().collections[INDEX_NAME].documents[pk].delete()
cls.instance().documents[pk].delete()
except Exception as e:
logger.error(f"delete item error: \n{e}")
logger.warn(f"delete item error: \n{e}")
@classmethod
def search(cls, q, page=1, category=None, tag=None, sort=None):
@ -266,7 +334,7 @@ class Indexer:
results = types.SimpleNamespace()
try:
r = cls.instance().collections[INDEX_NAME].documents.search(options)
r = cls.instance().documents.search(options)
# pprint(r)
results.items = list(
[

View file

@ -8,3 +8,8 @@ class JournalConfig(AppConfig):
    def ready(self):
        # load key modules in proper order, make sure class inject and signal works as expected
        from . import api
        from .models import Tag, Rating
        from catalog.models import Indexer

        # wire journal models into the catalog search index so that tag and
        # rating changes trigger a (queued) re-index of the affected items
        Indexer.register_list_model(Tag)
        Indexer.register_piece_model(Rating)