Skip to content
Snippets Groups Projects
Select Git revision
  • 91092be410aa0a09b90e5491eb4cf6413d5875da
  • main default protected
  • v0.5.1
  • v0.5
  • v0.4.3.1
  • v0.4.2
  • v0.4.1
  • v0.4
  • v0.3
9 results

business_process.py

Blame
  • Bryan Brancotte's avatar
    Bryan BRANCOTTE authored
    allows to disable auto indexing, provide a basic in-memory storage for get_setting_from_storage/set_setting_in_storage
    91092be4
    History
    business_process.py 5.21 KiB
    import re
    from typing import Optional, List
    
    from django.contrib.postgres.lookups import Unaccent
    from django.contrib.postgres.search import TrigramSimilarity
    from django.db import connection
    from django.db.models import Exists, OuterRef, Case, When, Value, F
    from django.db.transaction import atomic
    
    from autocomplete_multi_models import utils, models
    
    # Splits text on any character that is neither a word character nor a digit
    # (\d is redundant with \w here, kept as originally written).
    _pattern = re.compile("[^\\w\\d]")
    
    # Module-level knobs, initialised from the library defaults in `utils`.
    _AUTOCOMPLETE_MIN_LENGTH = utils.DEFAULT_AUTOCOMPLETE_MIN_LENGTH
    _AUTOCOMPLETE_MIN_SIMILARITY = utils.DEFAULT_AUTOCOMPLETE_MIN_SIMILARITY
    _AUTOCOMPLETE_LIMIT = utils.DEFAULT_AUTOCOMPLETE_LIMIT
    # Name of an optional per-instance hook method that can veto indexing.
    _CAN_BE_INDEXED_BY_AUTOCOMPLETE_FUNCTION_NAME = utils.DEFAULT_CAN_BE_INDEXED_BY_AUTOCOMPLETE_FUNCTION_NAME
    # Basic in-memory backend for get_setting_from_storage/set_setting_in_storage;
    # values are lost on process restart.
    __in_mem_storage = {
        utils.AUTO_UPDATE_ENABLED: True,
        utils.REBUILD_NEEDED: True,
    }
    
    
    def get_setting_from_storage(key, default):
        """Return the value stored for *key* in the in-memory store, or *default*."""
        try:
            return __in_mem_storage[key]
        except KeyError:
            return default
    
    
    def set_setting_in_storage(key, value):
        """Persist *value* under *key* in the in-memory settings store."""
        __in_mem_storage.update({key: value})
    
    
    def split_string(value):
        """Tokenize *value*, splitting on every non-word character."""
        tokens = _pattern.split(value)
        return tokens
    
    
    @atomic
    def rebuild_index():
        """
        Drop and recreate the whole index inside a single transaction.

        Much faster than per-instance updates when a large amount of instances
        has been changed.
        :return:
        """
        models.IndexedWord.objects.all().delete()
        pending = {}
        with connection.cursor() as cursor:
            for model, field_names in utils.get_indexed_fields().items():
                # fetch only the indexed columns to keep the scan cheap
                queryset = model.objects.only(*field_names)
                for instance in queryset:
                    _add_instance_to_index(instance, field_names, pending, cursor)
        models.IndexedWord.objects.bulk_create(pending.values())
    
    
    # def clean_duplicate():
    #     models.IndexedWord.objects.annotate(
    #         is_duplicate=Exists(
    #             models.IndexedWord.objects.filter(
    #                 word__iexact=OuterRef('word'),
    #                 pk__gt=OuterRef('pk'),
    #             )
    #         )
    #     ).filter(is_duplicate=True).delete()
    
    
    def add_instance_to_index(instance, field_names: List[str]):
        """
        Tokenize the given fields of *instance* and merge the words into the index.

        Warning: Should only be used for few instance as index update is slow, for large insertion, prefer rebuild_index
        :param instance: the instance to study
        :param field_names: fields to consider
        """
        collected = dict()
        with connection.cursor() as cursor:
            _add_instance_to_index(instance, field_names, collected, cursor)
        _update_in_index(collected)
    
    
    def _add_instance_to_index(instance, field_names: List[str], objects: dict, cursor):
        """
        Tokenize the given fields of *instance* into the *objects* accumulator
        (no database write happens here).

        :param instance: the instance whose fields are tokenized
        :param field_names: fields to consider
        :param objects: accumulator mapping normalized word -> IndexedWord
        :param cursor: open database cursor reused for word normalization
        """
        # optional hook letting an instance veto its own indexing
        f = getattr(instance, _CAN_BE_INDEXED_BY_AUTOCOMPLETE_FUNCTION_NAME, None)
        if f and not f():
            return
        for field_name in field_names:
            _add_text_to_index(getattr(instance, field_name), objects, cursor)
    
    
    def add_text_to_index(value: str):
        """
        Tokenize *value* and merge the resulting words into the index.

        Warning: Should only be used for few instance as index update is slow, for large insertion, prefer rebuild_index

        :param value: the string to tokenize and add to the index
        """
        collected = dict()
        with connection.cursor() as cursor:
            _add_text_to_index(value, collected, cursor)
        _update_in_index(collected)
    
    
    def _update_in_index(objects: dict):
        """
        Merge accumulated words into the database index.

        Words already present (compared accent- and case-insensitively) get their
        occurrence counter incremented in place; the remaining ones are bulk-created.

        :param objects: mapping of normalized word -> IndexedWord to merge
        """
        objects_to_create = []
        for ac_word_upper, o in objects.items():
            # update() returns the number of matched rows; 0 means the word is new
            changed = (
                models.IndexedWord.objects.annotate(ac_word=Unaccent('word'))
                .filter(ac_word__iexact=ac_word_upper)
                .update(occurrence=F('occurrence') + Value(o.occurrence))
            )
            if changed == 0:
                objects_to_create.append(o)
        models.IndexedWord.objects.bulk_create(objects_to_create)
    
    
    def _add_text_to_index(value: str, objects: dict, cursor):
        """
        Tokenize *value* and accumulate word counts into *objects*
        (no database write happens here).

        Words shorter than the configured minimum, longer than 64 characters,
        or made only of decimal digits are skipped.

        :param value: text to tokenize; ``None`` or empty string is a no-op
        :param objects: accumulator mapping normalized word -> IndexedWord
        :param cursor: open database cursor used to normalize each word
        """
        if value is None or value == '':
            return
        for word in split_string(value):
            len_word = len(word)
            if len_word < _AUTOCOMPLETE_MIN_LENGTH or word.isdecimal() or len_word > 64:
                continue
            # normalize postgres-side so the key matches the Unaccent/iexact lookups
            cursor.execute("SELECT UPPER(UNACCENT(%s)) as value", [word])
            ac_word = cursor.fetchone()[0]
            try:
                objects[ac_word].occurrence += 1
            except KeyError:
                objects[ac_word] = models.IndexedWord(word=word, occurrence=1)
    
    
    def get_closest_matching_words(word: str, limit: Optional[int] = None, min_similarity: Optional[float] = None):
        """
        Return IndexedWord rows similar to *word*, best match first.

        :param word: searched text; accents are stripped postgres-side first
        :param limit: maximum number of rows; ``None`` uses the configured
            default, a negative value returns the whole (unsliced) queryset
        :param min_similarity: similarity threshold; ``None`` uses the
            configured default
        """
        if limit is None:
            limit = _AUTOCOMPLETE_LIMIT
        if min_similarity is None:
            min_similarity = _AUTOCOMPLETE_MIN_SIMILARITY
        # remove accent from the searched word with postgres
        with connection.cursor() as cursor:
            cursor.execute("SELECT UNACCENT(%s) as value", [word])
            word = cursor.fetchone()[0]
        matches = (
            models.IndexedWord.objects
            # compare against the un-accented stored word
            .annotate(ac_word=Unaccent('word'))
            # trigram similarity between stored word and searched word
            .annotate(ac_word_s_tri=TrigramSimilarity('ac_word', word))
            # flat bonus when the stored word starts with the searched word
            # NOTE(review): startswith is case-sensitive here while the trigram
            # similarity is not — confirm this asymmetry is intended
            .annotate(ac_word_bonus=Case(When(ac_word__startswith=word, then=Value(1.0)), default=Value(0.0)))
            # overall score = similarity + bonus
            .annotate(similarity=F('ac_word_s_tri') + F('ac_word_bonus'))
            .filter(similarity__gt=min_similarity)
            .order_by('-similarity')
        )
        if limit < 0:  # negative limit means "no slicing", return everything
            return matches
        return matches[:limit]