hadith-api/app/services/database.py

129 lines
4.1 KiB
Python

"""
Database connection manager — initializes and provides access to
PostgreSQL, Neo4j, Qdrant, and Elasticsearch clients.
"""
import psycopg2
import psycopg2.pool
import psycopg2.extras
from neo4j import GraphDatabase
from qdrant_client import QdrantClient
from elasticsearch import Elasticsearch
import httpx
from app.config import get_settings
class Database:
"""Singleton holding all DB connections."""
def __init__(self):
self.pg_pool = None
self.neo4j_driver = None
self.qdrant = None
self.es = None
self.http_client = None # for TEI embeddings
async def connect(self):
settings = get_settings()
# PostgreSQL connection pool
try:
self.pg_pool = psycopg2.pool.ThreadedConnectionPool(
minconn=2,
maxconn=10,
host=settings.pg_host,
port=settings.pg_port,
dbname=settings.pg_dbname,
user=settings.pg_user,
password=settings.pg_password,
sslmode=settings.pg_sslmode,
)
print(f"✅ PostgreSQL pool created ({settings.pg_host})")
except Exception as e:
print(f"⚠️ PostgreSQL failed: {e}")
# Neo4j
try:
self.neo4j_driver = GraphDatabase.driver(
settings.neo4j_uri,
auth=(settings.neo4j_user, settings.neo4j_password),
)
self.neo4j_driver.verify_connectivity()
print(f"✅ Neo4j connected ({settings.neo4j_uri})")
except Exception as e:
print(f"⚠️ Neo4j failed: {e}")
# Qdrant
try:
self.qdrant = QdrantClient(
url=f"https://{settings.qdrant_host}",
timeout=5,
)
collections = self.qdrant.get_collections()
print(f"✅ Qdrant connected ({settings.qdrant_host}, {len(collections.collections)} collections)")
except Exception as e:
print(f"⚠️ Qdrant unavailable: {e}")
self.qdrant = None
# Elasticsearch
try:
self.es = Elasticsearch(settings.es_host, request_timeout=5)
if self.es.ping():
print(f"✅ Elasticsearch connected ({settings.es_host})")
else:
print(f"⚠️ Elasticsearch ping failed ({settings.es_host})")
self.es = None
except Exception as e:
print(f"⚠️ Elasticsearch unavailable: {e}")
self.es = None
# HTTP client for TEI embedding requests
self.http_client = httpx.AsyncClient(timeout=30.0)
print(f"✅ HTTP client ready (TEI: {settings.tei_url})")
async def disconnect(self):
if self.pg_pool:
self.pg_pool.closeall()
if self.neo4j_driver:
self.neo4j_driver.close()
if self.http_client:
await self.http_client.aclose()
print("🔌 All connections closed")
# ── PostgreSQL helpers ──
def get_pg(self):
conn = self.pg_pool.getconn()
try:
yield conn
finally:
self.pg_pool.putconn(conn)
def pg_query(self, query: str, params: tuple = None) -> list[dict]:
conn = self.pg_pool.getconn()
try:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(query, params)
return [dict(row) for row in cur.fetchall()]
finally:
self.pg_pool.putconn(conn)
def pg_query_one(self, query: str, params: tuple = None) -> dict | None:
rows = self.pg_query(query, params)
return rows[0] if rows else None
# ── Neo4j helpers ──
def neo4j_query(self, query: str, params: dict = None) -> list[dict]:
with self.neo4j_driver.session() as session:
result = session.run(query, params or {})
return [dict(record) for record in result]
def neo4j_query_one(self, query: str, params: dict = None) -> dict | None:
rows = self.neo4j_query(query, params)
return rows[0] if rows else None
# Global instance
db = Database()