Skip to content
Snippets Groups Projects
Select Git revision
  • 0dfde9df1592a0416b9d41dfb49eef76875cd4c2
  • master default protected
2 results

README.md

Blame
  • database.py 1.28 KiB
    # app/services/database.py
    import asyncpg
    from app.config import DSN, logger
    
    class PSQLDatabase:
        pool = None
    
        @classmethod
        async def get_pool(cls):
            if cls.pool is None:
                cls.pool = await asyncpg.create_pool(dsn=DSN)
            return cls.pool
    
        @classmethod
        async def close_pool(cls):
            if cls.pool is not None:
                await cls.pool.close()
                cls.pool = None
    
    async def ensure_custom_id_index_on_embedding():
        table_name = "langchain_pg_embedding"
        column_name = "custom_id"
        # You might want to standardize the index naming convention
        index_name = f"idx_{table_name}_{column_name}"
    
        pool = await PSQLDatabase.get_pool()
        async with pool.acquire() as conn:
            await conn.execute(f"""
                    CREATE INDEX IF NOT EXISTS {index_name} ON {table_name} ({column_name});
                """)
            logger.debug(f"Checking if index '{index_name}' on '{table_name}({column_name}) exists, if not found then the index is created.'")
    
    
    async def pg_health_check() -> bool:
        try:
            pool = await PSQLDatabase.get_pool()
            async with pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
            return True
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            return False