diff --git a/common/index.py b/common/index.py index c2389db2..5494802e 100644 --- a/common/index.py +++ b/common/index.py @@ -1,167 +1,7 @@ -import logging -import meilisearch from django.conf import settings -from django.db.models.signals import post_save, post_delete -INDEX_NAME = 'items' -INDEX_SEARCHABLE_ATTRIBUTES = ['title', 'orig_title', 'other_title', 'subtitle', 'artist', 'author', 'translator', 'developer', 'director', 'actor', 'playwright', 'pub_house', 'company', 'publisher', 'isbn', 'imdb_code'] -INDEXABLE_DIRECT_TYPES = ['BigAutoField', 'BooleanField', 'CharField', 'PositiveIntegerField', 'PositiveSmallIntegerField', 'TextField', 'ArrayField'] -INDEXABLE_TIME_TYPES = ['DateTimeField'] -INDEXABLE_DICT_TYPES = ['JSONField'] -INDEXABLE_FLOAT_TYPES = ['DecimalField'] -# NONINDEXABLE_TYPES = ['ForeignKey', 'FileField',] -SEARCH_PAGE_SIZE = 20 - - -logger = logging.getLogger(__name__) - - -def item_post_save_handler(sender, instance, created, **kwargs): - if not created and settings.MEILISEARCH_INDEX_NEW_ONLY: - return - Indexer.replace_item(instance) - - -def item_post_delete_handler(sender, instance, **kwargs): - Indexer.delete_item(instance) - - -def tag_post_save_handler(sender, instance, **kwargs): - pass - - -def tag_post_delete_handler(sender, instance, **kwargs): - pass - - -class Indexer: - class_map = {} - _instance = None - - @classmethod - def instance(self): - if self._instance is None: - self._instance = meilisearch.Client(settings.MEILISEARCH_SERVER, settings.MEILISEARCH_KEY).index(INDEX_NAME) - return self._instance - - @classmethod - def init(self): - meilisearch.Client(settings.MEILISEARCH_SERVER, settings.MEILISEARCH_KEY).create_index(INDEX_NAME, {'primaryKey': '_id'}) - self.update_settings() - - @classmethod - def update_settings(self): - self.instance().update_searchable_attributes(INDEX_SEARCHABLE_ATTRIBUTES) - self.instance().update_filterable_attributes(['_class', 'tags', 'source_site']) - 
self.instance().update_settings({'displayedAttributes': ['_id', '_class', 'id', 'title', 'tags']}) - - @classmethod - def get_stats(self): - return self.instance().get_stats() - - @classmethod - def update_model_indexable(self, model): - if settings.MEILISEARCH_SERVER is None: - return - self.class_map[model.__name__] = model - model.indexable_fields = ['tags'] - model.indexable_fields_time = [] - model.indexable_fields_dict = [] - model.indexable_fields_float = [] - for field in model._meta.get_fields(): - type = field.get_internal_type() - if type in INDEXABLE_DIRECT_TYPES: - model.indexable_fields.append(field.name) - elif type in INDEXABLE_TIME_TYPES: - model.indexable_fields_time.append(field.name) - elif type in INDEXABLE_DICT_TYPES: - model.indexable_fields_dict.append(field.name) - elif type in INDEXABLE_FLOAT_TYPES: - model.indexable_fields_float.append(field.name) - post_save.connect(item_post_save_handler, sender=model) - post_delete.connect(item_post_delete_handler, sender=model) - - @classmethod - def obj_to_dict(self, obj): - pk = f'{obj.__class__.__name__}-{obj.id}' - item = { - '_id': pk, - '_class': obj.__class__.__name__, - # 'id': obj.id - } - for field in obj.__class__.indexable_fields: - item[field] = getattr(obj, field) - for field in obj.__class__.indexable_fields_time: - item[field] = getattr(obj, field).timestamp() - for field in obj.__class__.indexable_fields_float: - item[field] = float(getattr(obj, field)) if getattr(obj, field) else None - for field in obj.__class__.indexable_fields_dict: - d = getattr(obj, field) - if d.__class__ is dict: - item.update(d) - item = {k: v for k, v in item.items() if v} - return item - - @classmethod - def replace_item(self, obj): - try: - self.instance().add_documents([self.obj_to_dict(obj)]) - except Exception as e: - logger.error(f"replace item error: \n{e}") - - @classmethod - def delete_item(self, obj): - pk = f'{obj.__class__.__name__}-{obj.id}' - try: - self.instance().delete_document(pk) - except 
Exception as e: - logger.error(f"delete item error: \n{e}") - - @classmethod - def patch_item(self, obj, fields): - pk = f'{obj.__class__.__name__}-{obj.id}' - data = {} - for f in fields: - data[f] = getattr(obj, f) - try: - self.instance().update_documents(documents=[data], primary_key=[pk]) - except Exception as e: - logger.error(f"patch item error: \n{e}") - - @classmethod - def search(self, q, page=1, category=None, tag=None, sort=None): - if category or tag: - f = [] - if category == 'music': - f.append("(_class = 'Album' OR _class = 'Song')") - elif category: - f.append(f"_class = '{category}'") - if tag: - f.append(f"tags = '{tag}'") - filter = ' AND '.join(f) - else: - filter = None - options = { - 'offset': (page - 1) * SEARCH_PAGE_SIZE, - 'limit': SEARCH_PAGE_SIZE, - 'filter': filter, - 'facetsDistribution': ['_class'], - 'sort': None - } - r = self.instance().search(q, options) - # print(r) - import types - results = types.SimpleNamespace() - results.items = list([x for x in map(lambda i: self.item_to_obj(i), r['hits']) if x is not None]) - results.num_pages = (r['nbHits'] + SEARCH_PAGE_SIZE - 1) // SEARCH_PAGE_SIZE - # print(results) - return results - - @classmethod - def item_to_obj(self, item): - try: - return self.class_map[item['_class']].objects.get(id=item['id']) - except Exception as e: - logger.error(f"unable to load search result item from db:\n{item}") - return None +if settings.SEARCH_BACKEND == 'MEILISEARCH': + from .search.meilisearch import Indexer +elif settings.SEARCH_BACKEND == 'TYPESENSE': + from .search.typesense import Indexer diff --git a/common/management/commands/index_stats.py b/common/management/commands/index_stats.py index 50976b58..28a9f07e 100644 --- a/common/management/commands/index_stats.py +++ b/common/management/commands/index_stats.py @@ -1,5 +1,5 @@ from django.core.management.base import BaseCommand -from common.index import Indexer, INDEX_NAME +from common.index import Indexer from django.conf import settings from 
movies.models import Movie from books.models import Book @@ -12,14 +12,11 @@ from datetime import timedelta from django.utils import timezone -BATCH_SIZE = 10000 - - class Command(BaseCommand): help = 'Check search index' def handle(self, *args, **options): - print(f'Connecting to search server {settings.MEILISEARCH_SERVER} for index: {INDEX_NAME}') + print(f'Connecting to search server') stats = Indexer.get_stats() print(stats) st = Indexer.instance().get_all_update_status() diff --git a/common/management/commands/init_index.py b/common/management/commands/init_index.py index 64378af3..bede5635 100644 --- a/common/management/commands/init_index.py +++ b/common/management/commands/init_index.py @@ -1,5 +1,5 @@ from django.core.management.base import BaseCommand -from common.index import Indexer, INDEX_NAME +from common.index import Indexer from django.conf import settings @@ -7,7 +7,7 @@ class Command(BaseCommand): help = 'Initialize the search index' def handle(self, *args, **options): - print(f'Connecting to search server {settings.MEILISEARCH_SERVER} for index: {INDEX_NAME}') + print(f'Connecting to search server') try: Indexer.init() self.stdout.write(self.style.SUCCESS('Index created.')) diff --git a/common/management/commands/reindex.py b/common/management/commands/reindex.py index f950ca31..125aac01 100644 --- a/common/management/commands/reindex.py +++ b/common/management/commands/reindex.py @@ -1,5 +1,5 @@ from django.core.management.base import BaseCommand -from common.index import Indexer, INDEX_NAME +from common.index import Indexer from django.conf import settings from movies.models import Movie from books.models import Book @@ -23,8 +23,8 @@ class Command(BaseCommand): def handle(self, *args, **options): h = int(options['hours']) - print(f'Connecting to search server {settings.MEILISEARCH_SERVER} for index: {INDEX_NAME}') - if Indexer.get_stats()['isIndexing']: + print(f'Connecting to search server') + if Indexer.busy(): print('Please wait for 
previous updates') # Indexer.update_settings() # self.stdout.write(self.style.SUCCESS('Index settings updated.')) @@ -35,7 +35,6 @@ class Command(BaseCommand): for p in tqdm(pg.page_range): items = list(map(lambda o: Indexer.obj_to_dict(o), pg.get_page(p).object_list)) if items: - Indexer.instance().update_documents(documents=items) - while Indexer.get_stats()['isIndexing']: + Indexer.replace_batch(items) + while Indexer.busy(): sleep(0.5) - diff --git a/common/search/meilisearch.py b/common/search/meilisearch.py new file mode 100644 index 00000000..3506eb8e --- /dev/null +++ b/common/search/meilisearch.py @@ -0,0 +1,177 @@ +import logging +import meilisearch +from django.conf import settings +from django.db.models.signals import post_save, post_delete + + +INDEX_NAME = 'items' +SEARCHABLE_ATTRIBUTES = ['title', 'orig_title', 'other_title', 'subtitle', 'artist', 'author', 'translator', 'developer', 'director', 'actor', 'playwright', 'pub_house', 'company', 'publisher', 'isbn', 'imdb_code'] +INDEXABLE_DIRECT_TYPES = ['BigAutoField', 'BooleanField', 'CharField', 'PositiveIntegerField', 'PositiveSmallIntegerField', 'TextField', 'ArrayField'] +INDEXABLE_TIME_TYPES = ['DateTimeField'] +INDEXABLE_DICT_TYPES = ['JSONField'] +INDEXABLE_FLOAT_TYPES = ['DecimalField'] +# NONINDEXABLE_TYPES = ['ForeignKey', 'FileField',] +SEARCH_PAGE_SIZE = 20 + + +logger = logging.getLogger(__name__) + + +def item_post_save_handler(sender, instance, created, **kwargs): + if not created and settings.SEARCH_INDEX_NEW_ONLY: + return + Indexer.replace_item(instance) + + +def item_post_delete_handler(sender, instance, **kwargs): + Indexer.delete_item(instance) + + +def tag_post_save_handler(sender, instance, **kwargs): + pass + + +def tag_post_delete_handler(sender, instance, **kwargs): + pass + + +class Indexer: + class_map = {} + _instance = None + + @classmethod + def instance(self): + if self._instance is None: + self._instance = meilisearch.Client(settings.MEILISEARCH_SERVER, 
settings.MEILISEARCH_KEY).index(INDEX_NAME) + return self._instance + + @classmethod + def init(self): + meilisearch.Client(settings.MEILISEARCH_SERVER, settings.MEILISEARCH_KEY).create_index(INDEX_NAME, {'primaryKey': '_id'}) + self.update_settings() + + @classmethod + def update_settings(self): + self.instance().update_searchable_attributes(SEARCHABLE_ATTRIBUTES) + self.instance().update_filterable_attributes(['_class', 'tags', 'source_site']) + self.instance().update_settings({'displayedAttributes': ['_id', '_class', 'id', 'title', 'tags']}) + + @classmethod + def get_stats(self): + return self.instance().get_stats() + + @classmethod + def busy(self): + return self.instance().get_stats()['isIndexing'] + + @classmethod + def update_model_indexable(self, model): + if settings.SEARCH_BACKEND is None: + return + self.class_map[model.__name__] = model + model.indexable_fields = ['tags'] + model.indexable_fields_time = [] + model.indexable_fields_dict = [] + model.indexable_fields_float = [] + for field in model._meta.get_fields(): + type = field.get_internal_type() + if type in INDEXABLE_DIRECT_TYPES: + model.indexable_fields.append(field.name) + elif type in INDEXABLE_TIME_TYPES: + model.indexable_fields_time.append(field.name) + elif type in INDEXABLE_DICT_TYPES: + model.indexable_fields_dict.append(field.name) + elif type in INDEXABLE_FLOAT_TYPES: + model.indexable_fields_float.append(field.name) + post_save.connect(item_post_save_handler, sender=model) + post_delete.connect(item_post_delete_handler, sender=model) + + @classmethod + def obj_to_dict(self, obj): + pk = f'{obj.__class__.__name__}-{obj.id}' + item = { + '_id': pk, + '_class': obj.__class__.__name__, + # 'id': obj.id + } + for field in obj.__class__.indexable_fields: + item[field] = getattr(obj, field) + for field in obj.__class__.indexable_fields_time: + item[field] = getattr(obj, field).timestamp() + for field in obj.__class__.indexable_fields_float: + item[field] = float(getattr(obj, field)) if 
getattr(obj, field) else None +        for field in obj.__class__.indexable_fields_dict: +            d = getattr(obj, field) +            if d.__class__ is dict: +                item.update(d) +        item = {k: v for k, v in item.items() if v} +        return item + +    @classmethod +    def replace_item(self, obj): +        try: +            self.instance().add_documents([self.obj_to_dict(obj)]) +        except Exception as e: +            logger.error(f"replace item error: \n{e}") +    @classmethod +    def replace_batch(self, objects): +        try: +            self.instance().update_documents(documents=objects) +        except Exception as e: +            logger.error(f"replace batch error: \n{e}") + +    @classmethod +    def delete_item(self, obj): +        pk = f'{obj.__class__.__name__}-{obj.id}' +        try: +            self.instance().delete_document(pk) +        except Exception as e: +            logger.error(f"delete item error: \n{e}") + +    @classmethod +    def patch_item(self, obj, fields): +        pk = f'{obj.__class__.__name__}-{obj.id}' +        data = {} +        for f in fields: +            data[f] = getattr(obj, f) +        try: +            self.instance().update_documents(documents=[data], primary_key=[pk]) +        except Exception as e: +            logger.error(f"patch item error: \n{e}") + +    @classmethod +    def search(self, q, page=1, category=None, tag=None, sort=None): +        if category or tag: +            f = [] +            if category == 'music': +                f.append("(_class = 'Album' OR _class = 'Song')") +            elif category: +                f.append(f"_class = '{category}'") +            if tag: +                f.append(f"tags = '{tag}'") +            filter = ' AND '.join(f) +        else: +            filter = None +        options = { +            'offset': (page - 1) * SEARCH_PAGE_SIZE, +            'limit': SEARCH_PAGE_SIZE, +            'filter': filter, +            'facetsDistribution': ['_class'], +            'sort': None +        } +        r = self.instance().search(q, options) +        # print(r) +        import types +        results = types.SimpleNamespace() +        results.items = list([x for x in map(lambda i: self.item_to_obj(i), r['hits']) if x is not None]) +        results.num_pages = (r['nbHits'] + SEARCH_PAGE_SIZE - 1) // SEARCH_PAGE_SIZE +        # print(results) +        return results + +    @classmethod +    def item_to_obj(self, item): +        try: +            return 
self.class_map[item['_class']].objects.get(id=item['id']) + except Exception as e: + logger.error(f"unable to load search result item from db:\n{item}") + return None diff --git a/common/search/typesense.py b/common/search/typesense.py new file mode 100644 index 00000000..263e55cc --- /dev/null +++ b/common/search/typesense.py @@ -0,0 +1,182 @@ +import logging +import typesense +from django.conf import settings +from django.db.models.signals import post_save, post_delete + + +INDEX_NAME = 'items' +SEARCHABLE_ATTRIBUTES = ['title', 'orig_title', 'other_title', 'subtitle', 'artist', 'author', 'translator', 'developer', 'director', 'actor', 'playwright', 'pub_house', 'company', 'publisher', 'isbn', 'imdb_code'] +FILTERABLE_ATTRIBUTES = ['_class', 'tags', 'source_site'] +INDEXABLE_DIRECT_TYPES = ['BigAutoField', 'BooleanField', 'CharField', 'PositiveIntegerField', 'PositiveSmallIntegerField', 'TextField', 'ArrayField'] +INDEXABLE_TIME_TYPES = ['DateTimeField'] +INDEXABLE_DICT_TYPES = ['JSONField'] +INDEXABLE_FLOAT_TYPES = ['DecimalField'] +SORTING_ATTRIBUTE = None +# NONINDEXABLE_TYPES = ['ForeignKey', 'FileField',] +SEARCH_PAGE_SIZE = 20 + + +logger = logging.getLogger(__name__) + + +def item_post_save_handler(sender, instance, created, **kwargs): + if not created and settings.SEARCH_INDEX_NEW_ONLY: + return + Indexer.replace_item(instance) + + +def item_post_delete_handler(sender, instance, **kwargs): + Indexer.delete_item(instance) + + +def tag_post_save_handler(sender, instance, **kwargs): + pass + + +def tag_post_delete_handler(sender, instance, **kwargs): + pass + + +class Indexer: + class_map = {} + _instance = None + + @classmethod + def instance(self): + if self._instance is None: + self._instance = typesense.Client(settings.TYPESENSE_CONNECTION) + return self._instance + + @classmethod + def init(self): + # self.instance().collections[INDEX_NAME].delete() + fields = [ + {"name": "_class", "type": "string", "facet": True}, + {"name": "source_site", "type": 
"string", "facet": True}, + {"name": "tags", "type": "string[]", "locale": "zh", "facet": True}, + {"name": ".*", "type": "auto", "locale": "zh"}, + ] + self.instance().collections.create({ + "name": INDEX_NAME, + "fields": fields + }) + + @classmethod + def update_settings(self): + # https://github.com/typesense/typesense/issues/96 + print('not supported by typesense yet') + pass + + @classmethod + def get_stats(self): + return self.instance().collections[INDEX_NAME].retrieve() + + @classmethod + def busy(self): + return False + + @classmethod + def update_model_indexable(self, model): + if settings.SEARCH_BACKEND is None: + return + self.class_map[model.__name__] = model + model.indexable_fields = ['tags'] + model.indexable_fields_time = [] + model.indexable_fields_dict = [] + model.indexable_fields_float = [] + for field in model._meta.get_fields(): + type = field.get_internal_type() + if type in INDEXABLE_DIRECT_TYPES: + model.indexable_fields.append(field.name) + elif type in INDEXABLE_TIME_TYPES: + model.indexable_fields_time.append(field.name) + elif type in INDEXABLE_DICT_TYPES: + model.indexable_fields_dict.append(field.name) + elif type in INDEXABLE_FLOAT_TYPES: + model.indexable_fields_float.append(field.name) + post_save.connect(item_post_save_handler, sender=model) + post_delete.connect(item_post_delete_handler, sender=model) + + @classmethod + def obj_to_dict(self, obj): + pk = f'{obj.__class__.__name__}-{obj.id}' + item = { + '_class': obj.__class__.__name__, + } + for field in obj.__class__.indexable_fields: + item[field] = getattr(obj, field) + for field in obj.__class__.indexable_fields_time: + item[field] = getattr(obj, field).timestamp() + for field in obj.__class__.indexable_fields_float: + item[field] = float(getattr(obj, field)) if getattr(obj, field) else None + for field in obj.__class__.indexable_fields_dict: + d = getattr(obj, field) + if d.__class__ is dict: + item.update(d) + item = {k: v for k, v in item.items() if v and (k in 
SEARCHABLE_ATTRIBUTES or k in FILTERABLE_ATTRIBUTES or k == 'id')} +        item['_id'] = item['id'] +        item['id'] = pk # typesense requires primary key to be named 'id', type string +        return item + +    @classmethod +    def replace_item(self, obj): +        try: +            self.instance().collections[INDEX_NAME].documents.upsert(self.obj_to_dict(obj), { +                'dirty_values': 'coerce_or_drop' +            }) +        except Exception as e: +            logger.error(f"replace item error: \n{e}") + +    @classmethod +    def replace_batch(self, objects): +        try: +            self.instance().collections[INDEX_NAME].documents.import_(objects, {'action': 'upsert'}) +        except Exception as e: +            logger.error(f"replace batch error: \n{e}") + +    @classmethod +    def delete_item(self, obj): +        pk = f'{obj.__class__.__name__}-{obj.id}' +        try: +            self.instance().collections[INDEX_NAME].documents[pk].delete() +        except Exception as e: +            logger.error(f"delete item error: \n{e}") + +    @classmethod +    def search(self, q, page=1, category=None, tag=None, sort=None): +        f = [] +        if category == 'music': +            f.append('_class: [Album, Song]') +        elif category: +            f.append('_class: ' + category) +        else: +            f.append('') +        if tag: +            f.append(f"tags: '{tag}'") +        filter = ' && '.join([x for x in f if x]) +        options = { +            'q': q, +            'page': page, +            'per_page': SEARCH_PAGE_SIZE, +            'query_by': ','.join(SEARCHABLE_ATTRIBUTES), +            'filter_by': filter, +            # 'facetsDistribution': ['_class'], +            # 'sort_by': None, +        } +        # print(q) +        r = self.instance().collections[INDEX_NAME].documents.search(options) +        # print(r) +        import types +        results = types.SimpleNamespace() +        results.items = list([x for x in map(lambda i: self.item_to_obj(i['document']), r['hits']) if x is not None]) +        results.num_pages = (r['found'] + SEARCH_PAGE_SIZE - 1) // SEARCH_PAGE_SIZE +        # print(results) +        return results + +    @classmethod +    def item_to_obj(self, item): +        try: +            return self.class_map[item['_class']].objects.get(id=item['_id']) +        except Exception as e: +            logger.error(f"unable to load search result item from db:\n{item}") +            return None 
diff --git a/common/views.py b/common/views.py index 93ce685d..9abc9a93 100644 --- a/common/views.py +++ b/common/views.py @@ -38,7 +38,7 @@ def home(request): @login_required def search(request): - if settings.MEILISEARCH_SERVER is None: + if settings.SEARCH_BACKEND is None: return search2(request) category = request.GET.get("c", default='all').strip().lower() if category == 'all':