From 9ebdfc1144bdae749344ff1d39d328f82212645b Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 26 Jun 2023 00:16:14 -0400 Subject: [PATCH] async update index when tag or rating change --- catalog/management/commands/index.py | 17 +---- catalog/search/typesense.py | 108 ++++++++++++++++++++++----- journal/apps.py | 5 ++ 3 files changed, 96 insertions(+), 34 deletions(-) diff --git a/catalog/management/commands/index.py b/catalog/management/commands/index.py index cb808ea4..3eb96a58 100644 --- a/catalog/management/commands/index.py +++ b/catalog/management/commands/index.py @@ -64,20 +64,9 @@ class Command(BaseCommand): ) # if h == 0 else c.objects.filter(edited_time__gt=timezone.now() - timedelta(hours=h)) pg = Paginator(qs.order_by("id"), BATCH_SIZE) for p in tqdm(pg.page_range): - items = list( - map( - lambda o: Indexer.obj_to_dict(o), - [ - x - for x in pg.get_page(p).object_list - if hasattr(x, "indexable_fields") - ], - ) - ) - if items: - Indexer.replace_batch(items) - while Indexer.busy(): - sleep(0.5) + Indexer.replace_batch(pg.get_page(p).object_list) + while Indexer.busy(): + sleep(0.5) def handle(self, *args, **options): if options["init"]: diff --git a/catalog/search/typesense.py b/catalog/search/typesense.py index 94e5a05b..fc57b219 100644 --- a/catalog/search/typesense.py +++ b/catalog/search/typesense.py @@ -2,10 +2,14 @@ import types import logging import typesense from typesense.exceptions import ObjectNotFound +from typesense.collection import Collection from django.conf import settings from django.db.models.signals import post_save, post_delete from catalog.models import Item from pprint import pprint +import django_rq +from rq.job import Job +from django_redis import get_redis_connection INDEX_NAME = "catalog" SEARCHABLE_ATTRIBUTES = [ @@ -48,22 +52,66 @@ SEARCH_PAGE_SIZE = 20 logger = logging.getLogger(__name__) -def item_post_save_handler(sender, instance, created, **kwargs): +_PENDING_INDEX_KEY = "pending_index_ids" +_PENDING_INDEX_QUEUE = "import" +_PENDING_INDEX_JOB_ID = "pending_index_flush2" + + +def _update_index_task(): + item_ids = get_redis_connection("default").spop(_PENDING_INDEX_KEY, 1000) + updated = 0 + while item_ids: + items = Item.objects.filter(id__in=item_ids) + Indexer.replace_batch(items) + updated += len(items) + item_ids = get_redis_connection("default").spop(_PENDING_INDEX_KEY, 1000) + logger.info(f"Index updated for {updated} items") + + +def enqueue_update_index(item_ids): + if not item_ids: + return + get_redis_connection("default").sadd(_PENDING_INDEX_KEY, *item_ids) + queued = False + try: + job = Job.fetch( + id=_PENDING_INDEX_JOB_ID, + connection=django_rq.get_connection(_PENDING_INDEX_QUEUE), + ) + queued = job.get_status() == "queued" + except: + queued = False + if not queued: + django_rq.get_queue(_PENDING_INDEX_QUEUE).enqueue( + _update_index_task, job_id=_PENDING_INDEX_JOB_ID + ) + + +def _item_post_save_handler(sender, instance, created, **kwargs): if not created and settings.SEARCH_INDEX_NEW_ONLY: return Indexer.replace_item(instance) -def item_post_delete_handler(sender, instance, **kwargs): +def _item_post_delete_handler(sender, instance, **kwargs): Indexer.delete_item(instance) -def tag_post_save_handler(sender, instance, **kwargs): +def _list_post_save_handler(sender, instance, created, **kwargs): + ids = instance.items.all().values_list("id", flat=True) + enqueue_update_index(ids) + + +def _list_post_delete_handler(sender, instance, **kwargs): pass -def tag_post_delete_handler(sender, instance, **kwargs): - pass +def _piece_post_save_handler(sender, instance, created, **kwargs): + enqueue_update_index([instance.item_id]) + + +def _piece_post_delete_handler(sender, instance, **kwargs): + enqueue_update_index([instance.item_id]) class Indexer: @@ -71,10 +119,12 @@ class Indexer: _instance = None @classmethod - def instance(cls): + def instance(cls) -> Collection: if cls._instance is None: - cls._instance = typesense.Client(settings.TYPESENSE_CONNECTION) - return cls._instance + cls._instance = typesense.Client(settings.TYPESENSE_CONNECTION).collections[ + INDEX_NAME + ] + return cls._instance # type: ignore @classmethod def config(cls): @@ -132,26 +182,26 @@ class Indexer: @classmethod def init(cls): - idx = cls.instance().collections + idx = typesense.Client(settings.TYPESENSE_CONNECTION).collections if idx: # idx.delete() idx.create(cls.config()) @classmethod def delete_index(cls): - idx = cls.instance().collections[INDEX_NAME] + idx = cls.instance() if idx: idx.delete() @classmethod def update_settings(cls): - idx = cls.instance().collections[INDEX_NAME] + idx = cls.instance() if idx: idx.update(cls.config()) @classmethod def get_stats(cls): - idx = cls.instance().collections[INDEX_NAME] + idx = cls.instance() if idx: return idx.retrieve() @@ -180,8 +230,20 @@ class Indexer: for f in ["imdb", "isbn", "barcode"]: # FIXME if hasattr(i, f): model.indexable_fields.append(f) - post_save.connect(item_post_save_handler, sender=model) - post_delete.connect(item_post_delete_handler, sender=model) + post_save.connect(_item_post_save_handler, sender=model) + post_delete.connect(_item_post_delete_handler, sender=model) + + @classmethod + def register_list_model(cls, list_model): + post_save.connect(_list_post_save_handler, sender=list_model) + # post_delete.connect(_list_post_delete_handler, sender=list_model) # covered in list_model delete signal + post_save.connect(_piece_post_save_handler, sender=list_model.MEMBER_CLASS) + post_delete.connect(_piece_post_delete_handler, sender=list_model.MEMBER_CLASS) + + @classmethod + def register_piece_model(cls, model): + post_save.connect(_piece_post_save_handler, sender=model) + post_delete.connect(_piece_post_delete_handler, sender=model) @classmethod def obj_to_dict(cls, obj): @@ -218,7 +280,7 @@ class Indexer: if obj.is_deleted or obj.merged_to_item_id: return cls.delete_item(obj) try: - cls.instance().collections[INDEX_NAME].documents.upsert( + cls.instance().documents.upsert( cls.obj_to_dict(obj), {"dirty_values": "coerce_or_drop"} ) except Exception as e: @@ -227,9 +289,15 @@ class Indexer: @classmethod def replace_batch(cls, objects): try: - cls.instance().collections[INDEX_NAME].documents.import_( - objects, {"action": "upsert"} + items = list( + map( + lambda o: cls.obj_to_dict(o), + [x for x in objects if hasattr(x, "indexable_fields")], + ) ) + # TODO check is_deleted=False, merged_to_item_id__isnull=True and call delete_batch() + if items: + cls.instance().documents.import_(items, {"action": "upsert"}) except Exception as e: logger.error(f"replace batch error: \n{e}") @@ -237,9 +305,9 @@ class Indexer: def delete_item(cls, obj): pk = obj.uuid try: - cls.instance().collections[INDEX_NAME].documents[pk].delete() + cls.instance().documents[pk].delete() except Exception as e: - logger.error(f"delete item error: \n{e}") + logger.warn(f"delete item error: \n{e}") @classmethod def search(cls, q, page=1, category=None, tag=None, sort=None): @@ -266,7 +334,7 @@ class Indexer: results = types.SimpleNamespace() try: - r = cls.instance().collections[INDEX_NAME].documents.search(options) + r = cls.instance().documents.search(options) # pprint(r) results.items = list( [ diff --git a/journal/apps.py b/journal/apps.py index 4c656fc9..b33dd330 100644 --- a/journal/apps.py +++ b/journal/apps.py @@ -8,3 +8,8 @@ class JournalConfig(AppConfig): def ready(self): # load key modules in proper order, make sure class inject and signal works as expected from . import api + from .models import Tag, Rating + from catalog.models import Indexer + + Indexer.register_list_model(Tag) + Indexer.register_piece_model(Rating)