""" Database connection manager — initializes and provides access to PostgreSQL, Neo4j, Qdrant, and Elasticsearch clients. """ import psycopg2 import psycopg2.pool import psycopg2.extras from neo4j import GraphDatabase from qdrant_client import QdrantClient from elasticsearch import Elasticsearch import httpx from app.config import get_settings class Database: """Singleton holding all DB connections.""" def __init__(self): self.pg_pool = None self.neo4j_driver = None self.qdrant = None self.es = None self.http_client = None # for TEI embeddings async def connect(self): settings = get_settings() # PostgreSQL connection pool try: self.pg_pool = psycopg2.pool.ThreadedConnectionPool( minconn=2, maxconn=10, host=settings.pg_host, port=settings.pg_port, dbname=settings.pg_dbname, user=settings.pg_user, password=settings.pg_password, sslmode=settings.pg_sslmode, ) print(f"✅ PostgreSQL pool created ({settings.pg_host})") except Exception as e: print(f"⚠️ PostgreSQL failed: {e}") # Neo4j try: self.neo4j_driver = GraphDatabase.driver( settings.neo4j_uri, auth=(settings.neo4j_user, settings.neo4j_password), ) self.neo4j_driver.verify_connectivity() print(f"✅ Neo4j connected ({settings.neo4j_uri})") except Exception as e: print(f"⚠️ Neo4j failed: {e}") # Qdrant try: self.qdrant = QdrantClient( url=f"https://{settings.qdrant_host}", timeout=5, ) collections = self.qdrant.get_collections() print(f"✅ Qdrant connected ({settings.qdrant_host}, {len(collections.collections)} collections)") except Exception as e: print(f"⚠️ Qdrant unavailable: {e}") self.qdrant = None # Elasticsearch try: self.es = Elasticsearch(settings.es_host, request_timeout=5) if self.es.ping(): print(f"✅ Elasticsearch connected ({settings.es_host})") else: print(f"⚠️ Elasticsearch ping failed ({settings.es_host})") self.es = None except Exception as e: print(f"⚠️ Elasticsearch unavailable: {e}") self.es = None # HTTP client for TEI embedding requests self.http_client = httpx.AsyncClient(timeout=30.0) print(f"✅ HTTP client ready (TEI: {settings.tei_url})") async def disconnect(self): if self.pg_pool: self.pg_pool.closeall() if self.neo4j_driver: self.neo4j_driver.close() if self.http_client: await self.http_client.aclose() print("🔌 All connections closed") # ── PostgreSQL helpers ── def get_pg(self): conn = self.pg_pool.getconn() try: yield conn finally: self.pg_pool.putconn(conn) def pg_query(self, query: str, params: tuple = None) -> list[dict]: conn = self.pg_pool.getconn() try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(query, params) return [dict(row) for row in cur.fetchall()] finally: self.pg_pool.putconn(conn) def pg_query_one(self, query: str, params: tuple = None) -> dict | None: rows = self.pg_query(query, params) return rows[0] if rows else None # ── Neo4j helpers ── def neo4j_query(self, query: str, params: dict = None) -> list[dict]: with self.neo4j_driver.session() as session: result = session.run(query, params or {}) return [dict(record) for record in result] def neo4j_query_one(self, query: str, params: dict = None) -> dict | None: rows = self.neo4j_query(query, params) return rows[0] if rows else None # Global instance db = Database()