Skip to content
Snippets Groups Projects
Commit 6c6f3f0a authored by Bryan BRANCOTTE's avatar Bryan BRANCOTTE
Browse files

import sources

parent d3b9db9e
Branches
Tags
No related merge requests found
Showing
with 375 additions and 0 deletions
from django.contrib import admin
# Register your models here.
from autocomplete_multi_models import models
@admin.register(models.IndexedWord)
class IndexedWordAdmin(admin.ModelAdmin):
    """Admin for IndexedWord: shows the word column and makes it searchable."""

    list_display = ("word",)
    search_fields = ("word",)
from functools import partial
from django.apps import AppConfig
from django.db.models import signals
from autocomplete_multi_models import utils
class AutocompleteMultiModelsConfig(AppConfig):
    """App config that wires index-maintenance signal handlers at startup."""

    default_auto_field = 'django.db.models.BigAutoField'
    name = 'autocomplete_multi_models'
    __indexed_fields = None

    def ready(self):
        """Read the indexing settings and connect save/delete handlers per model."""
        from . import signals as my_signals

        utils.init_from_settings()
        for sender, field_names in utils.get_indexed_fields().items():
            save_handler = partial(my_signals.instance_update, field_names=field_names)
            delete_handler = partial(my_signals.instance_delete, field_names=field_names)
            # BUG FIX: these receivers are throwaway ``partial`` objects; with the
            # default weak=True Django only keeps a weak reference to them, so they
            # can be garbage-collected and the handlers silently stop firing.
            # weak=False makes the dispatcher hold a strong reference.
            signals.post_save.connect(save_handler, sender=sender, weak=False)
            signals.pre_delete.connect(delete_handler, sender=sender, weak=False)
import re
from typing import Optional, List
from django.contrib.postgres.lookups import Unaccent
from django.contrib.postgres.search import TrigramSimilarity
from django.db import connection
from django.db.models import Exists, OuterRef, Case, When, Value, F
from django.db.transaction import atomic
from autocomplete_multi_models import utils, models
# Splits text on any character that is not a word character; the explicit \d is
# redundant (\w already matches digits) but kept as-is.
_pattern = re.compile("[^\\w\\d]")
# Module-level tuning knobs, seeded from the package defaults defined in utils.
_AUTOCOMPLETE_MIN_LENGTH = utils.DEFAULT_AUTOCOMPLETE_MIN_LENGTH
_AUTOCOMPLETE_MIN_SIMILARITY = utils.DEFAULT_AUTOCOMPLETE_MIN_SIMILARITY
_AUTOCOMPLETE_LIMIT = utils.DEFAULT_AUTOCOMPLETE_LIMIT
def get_setting_from_storage(key, default):
    """Placeholder settings reader: no real storage is consulted.

    The ``default`` argument is accepted for API compatibility but ignored;
    only the rebuild flag reads as True, every other key reads as False.
    """
    is_rebuild_flag = key == utils.REBUILD_NEEDED
    return is_rebuild_flag
def set_setting_in_storage(key, value):
    """Placeholder settings writer: the value is intentionally not persisted."""
    return None
def split_string(value):
    """Break *value* into tokens at every non-word character."""
    return re.split(_pattern, value)
@atomic
def rebuild_index():
    """Drop and repopulate the whole word index inside a single transaction."""
    models.IndexedWord.objects.all().delete()
    indexed_fields = utils.get_indexed_fields()
    for model, field_names in indexed_fields.items():
        # Only fetch the columns we actually index.
        for instance in model.objects.only(*field_names):
            add_instance_to_index(instance, field_names)
    clean_duplicate()
def clean_duplicate():
    """Remove case-insensitive duplicate words, keeping the highest-pk row.

    A row is deleted when another row with the same word (iexact) and a
    greater pk exists.
    """
    later_copy = models.IndexedWord.objects.filter(
        word__iexact=OuterRef('word'),
        pk__gt=OuterRef('pk'),
    )
    (
        models.IndexedWord.objects
        .annotate(is_duplicate=Exists(later_copy))
        .filter(is_duplicate=True)
        .delete()
    )
def add_instance_to_index(instance, field_names: List[str]):
    """Index the text held in each of the named fields of *instance*."""
    for name in field_names:
        text = getattr(instance, name)
        add_text_to_index(text)
def add_text_to_index(value: str):
    """Split *value* into words and bulk-insert the indexable ones.

    A word is kept only when it is at least the configured minimum length,
    at most 64 characters (the column limit), and not purely decimal.
    """
    if value is None or value == '':
        return
    words = [
        models.IndexedWord(word=word)
        for word in split_string(value)
        if _AUTOCOMPLETE_MIN_LENGTH <= len(word) <= 64 and not word.isdecimal()
    ]
    models.IndexedWord.objects.bulk_create(words)
def get_closest_matching_words(word: str, limit: Optional[int] = None, min_similarity: Optional[float] = None):
    """Return IndexedWord rows ranked by trigram similarity to *word*.

    :param word: the (possibly accented) search term.
    :param limit: max rows to return; None uses the module default, a
        negative value returns the full queryset.
    :param min_similarity: similarity threshold; None uses the module default.
    """
    # Strip accents from the search term itself, using postgres' UNACCENT.
    with connection.cursor() as cursor:
        cursor.execute("SELECT UNACCENT(%s) as value", [word])
        word = cursor.fetchone()[0]

    limit = _AUTOCOMPLETE_LIMIT if limit is None else limit
    min_similarity = _AUTOCOMPLETE_MIN_SIMILARITY if min_similarity is None else min_similarity

    matches = (
        models.IndexedWord.objects
        # Compare against the un-accented stored word.
        .annotate(ac_word=Unaccent('word'))
        # Trigram similarity, plus a +1.0 bonus for prefix matches.
        .annotate(
            ac_word_s_tri=TrigramSimilarity('ac_word', word),
            ac_word_bonus=Case(When(ac_word__startswith=word, then=Value(1.0)), default=Value(0.0)),
        )
        .annotate(similarity=F('ac_word_s_tri') + F('ac_word_bonus'))
        .filter(similarity__gt=min_similarity)
        .order_by('-similarity')
    )
    if limit < 0:  # a negative limit means "return everything"
        return matches
    return matches[:limit]
```
{
autocomplete(q: "Homer") {
edges {
node {
word,
similarity
}
}
}
}
```
which returns:
```json
{
"data": {
"autocomplete": {
"edges": [
{
"node": {
"word": "home",
"similarity": 0.5714286
}
},
{
"node": {
"word": "homodimer",
"similarity": 0.45454547
}
},
{
"node": {
"word": "homo",
"similarity": 0.375
}
},
{
"node": {
"word": "homemade",
"similarity": 0.36363637
}
},
{
"node": {
"word": "HMMER",
"similarity": 0.33333334
}
},
{
"node": {
"word": "homme",
"similarity": 0.33333334
}
},
{
"node": {
"word": "homeostatic",
"similarity": 0.2857143
}
},
{
"node": {
"word": "homeodomain",
"similarity": 0.2857143
}
},
{
"node": {
"word": "homeostasis",
"similarity": 0.2857143
}
},
{
"node": {
"word": "monomer",
"similarity": 0.27272728
}
}
]
}
}
}
```
\ No newline at end of file
import django_filters
import graphene
from graphene_django.filter import DjangoFilterConnectionField
from graphene_django.types import DjangoObjectType
import autocomplete_multi_models.business_process
import autocomplete_multi_models.models
class IndexedWordFilter(django_filters.FilterSet):
    """FilterSet exposing a single ``q`` parameter for fuzzy word search."""

    # Free-text query, resolved by the method below rather than a field lookup.
    q = django_filters.CharFilter(method='resolve_q')

    class Meta:
        model = autocomplete_multi_models.models.IndexedWord
        exclude = ('id',)

    def resolve_q(self, queryset, name, value, *args, **kwargs):
        # NOTE: the incoming queryset is discarded; results come straight from
        # the similarity search in business_process.
        return autocomplete_multi_models.business_process.get_closest_matching_words(value)
class IndexedWordNode(DjangoObjectType):
    """Relay node for IndexedWord, also exposing the similarity score."""

    # Surfaces the ``similarity`` annotation added by get_closest_matching_words.
    similarity = graphene.Field(graphene.Float)

    class Meta:
        model = autocomplete_multi_models.models.IndexedWord
        interfaces = (graphene.Node,)
class FindClosestWordsQuery(graphene.ObjectType):
    """Root-query mixin adding the ``autocomplete`` connection field."""

    autocomplete = DjangoFilterConnectionField(IndexedWordNode, filterset_class=IndexedWordFilter)
from rest_framework import serializers
from autocomplete_multi_models import models
class IndexedWordSerializer(serializers.ModelSerializer):
    """Serializes an IndexedWord plus its query-time similarity score."""

    class Meta:
        model = models.IndexedWord
        fields = [
            'word',
            'similarity',
        ]

    # ``similarity`` is a queryset annotation, not a model field, hence the
    # explicit declaration.
    similarity = serializers.FloatField()
class SearchSerializer(serializers.Serializer):
    """Validates autocomplete query parameters: q, limit, min_similarity."""

    # The word (fragment) to search for; at least two characters.
    q = serializers.CharField(
        min_length=2,
        required=True,
    )
    # Maximum number of matches to return.
    limit = serializers.IntegerField(
        min_value=1,
        default=10,
        required=False,
    )
    # Similarity threshold; None defers to the business-process default.
    min_similarity = serializers.FloatField(
        required=False,
        default=None,
    )
from django.urls import path
from autocomplete_multi_models.expose_with.rest_framework.views import FindClosestWords
# Exposes the autocomplete endpoint at <prefix>/autocomplete/.
urlpatterns = [
    path('autocomplete/', FindClosestWords.as_view()),
]
from rest_framework import views, response, status
from autocomplete_multi_models import business_process
from autocomplete_multi_models.expose_with.rest_framework import serializers
class FindClosestWords(views.APIView):
    """Autocomplete endpoint returning the indexed words closest to a query."""

    def get(self, request, format=None):
        """Handle e.g. ``?q=rna&min_similarity=0.1&limit=8``."""
        return self.do_the_work(request.GET)

    def post(self, request, format=None):
        """Handle a JSON body such as ``{"q":"rna","min_similarity":0.1,"limit":8}``."""
        return self.do_the_work(request.data)

    def do_the_work(self, data):
        """Validate *data* and respond with the serialized closest matches."""
        search_serializer = serializers.SearchSerializer(data=data)
        if not search_serializer.is_valid():
            return response.Response(search_serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        # Read validated_data (native python values, defaults applied) instead
        # of re-serializing through .data after validation has passed.
        params = search_serializer.validated_data
        results_serializer = serializers.IndexedWordSerializer(
            business_process.get_closest_matching_words(
                params['q'],
                params['limit'],
                params['min_similarity'],
            ),
            many=True,
        )
        return response.Response(results_serializer.data)
import time
from django.core.management import BaseCommand
from django.db import transaction
from autocomplete_multi_models import business_process, models, utils
class Command(BaseCommand):
    """Rebuild the autocomplete index; skipped unless flagged as needed or --forced."""

    def add_arguments(self, parser):
        # --forced bypasses the "rebuild needed" check.
        parser.add_argument('--forced', action='store_true')

    @transaction.atomic
    def handle(self, *args, **options):
        """Rebuild the whole index in one transaction and report the timing."""
        if not (options['forced'] or business_process.get_setting_from_storage(utils.REBUILD_NEEDED, True)):
            return
        ts = time.time()
        business_process.rebuild_index()
        te = time.time()
        # Management commands should write via self.stdout so output can be
        # captured/redirected (e.g. by call_command) instead of bare print().
        # The timing is truncated to two decimal places.
        self.stdout.write(
            f"Index rebuild in {int((te - ts) * 100) / 100}s, "
            f"it contains {models.IndexedWord.objects.count()} words"
        )
        business_process.set_setting_in_storage(utils.REBUILD_NEEDED, False)
# Generated by Django 3.2.9 on 2022-02-11 11:47
from django.contrib.postgres.operations import TrigramExtension, BtreeGinExtension, UnaccentExtension
from django.db import migrations, models
class Migration(migrations.Migration):
    """Initial migration: enable the postgres extensions and create IndexedWord."""

    initial = True

    dependencies = [
    ]

    operations = [
        # Extensions needed for trigram similarity, GIN indexing and UNACCENT.
        TrigramExtension(),
        BtreeGinExtension(),
        UnaccentExtension(),
        migrations.CreateModel(
            name='IndexedWord',
            fields=[
                ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('word', models.CharField(db_index=True, max_length=64)),
            ],
        ),
    ]
import django.contrib.postgres.indexes
import django.db.models
# Create your models here.
class IndexedWord(django.db.models.Model):
    """A single indexable word extracted from the configured model fields."""

    class Meta:
        # BUG FIX: ``indexes`` was previously declared directly on the model
        # class, where Django silently ignores it. It must live inside Meta
        # for the GIN index to be created (a new migration is required, since
        # the existing initial migration does not include it).
        indexes = [
            django.contrib.postgres.indexes.GinIndex(fields=['word']),
        ]

    # The indexed token; 64 chars matches the cap enforced by the indexer.
    word = django.db.models.CharField(
        max_length=64,
        db_index=True,
        null=False,
    )

    def __str__(self):
        return self.word
from autocomplete_multi_models import business_process, utils
def instance_update(sender, instance, field_names, **kwargs):
    """post_save handler: index the saved instance's fields, then dedupe."""
    business_process.add_instance_to_index(instance, field_names)
    business_process.clean_duplicate()
def instance_delete(sender, instance, field_names, **kwargs):
    """pre_delete handler: flag the index for a full rebuild (words are not
    removed eagerly, since other instances may still contain them)."""
    business_process.set_setting_in_storage(utils.REBUILD_NEEDED, True)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment