From f46297255d4980d3423c1fd058dc91a3a4e09999 Mon Sep 17 00:00:00 2001 From: salahangal Date: Mon, 24 Nov 2025 11:17:57 +0100 Subject: [PATCH] add generate embeddings --- hadith-ingestion/.DS_Store | Bin 8196 -> 8196 bytes .../argo/workflows/generate-embeddings.yaml | 133 +- hadith-ingestion/combined.txt | 3425 +++++++++++++++++ .../scripts/benchmark_embeddings.py | 98 + .../scripts/generate_embeddings.py | 324 ++ hadith-ingestion/scripts/monitor_progress.sh | 29 + hadith-ingestion/scripts/setup_embeddings.sh | 69 + .../scripts/test_semantic_search.py | 93 + hadith-ingestion/scripts/verify_embeddings.sh | 27 + 9 files changed, 4181 insertions(+), 17 deletions(-) create mode 100644 hadith-ingestion/combined.txt create mode 100755 hadith-ingestion/scripts/benchmark_embeddings.py create mode 100755 hadith-ingestion/scripts/generate_embeddings.py create mode 100755 hadith-ingestion/scripts/monitor_progress.sh create mode 100755 hadith-ingestion/scripts/setup_embeddings.sh create mode 100755 hadith-ingestion/scripts/test_semantic_search.py create mode 100755 hadith-ingestion/scripts/verify_embeddings.sh diff --git a/hadith-ingestion/.DS_Store b/hadith-ingestion/.DS_Store index e2dd5723a2749b5158bf789b67ea90baa3845c3d..261810a654acc156333b9454284722542abc46f7 100644 GIT binary patch delta 117 zcmZp1XmOa}&nU4mU^hRb#AY4=eMU_lhGd3(hFpduhD?S$hE#?W20exnh6;uf&z$_^ zq@4UD1_lNJ1_s8Tn{@=&Gb*z)6azIB0TmYjl@+5Y>IN#T+k8RjBFkoWiEk{Mr;D&N F0|1y89Z3KH delta 49 zcmV-10M7q}K!iY$PXQLOP`eKS7PAZxF9EZ25ugFHK@-LVk$`5i2N?DQldTmXvx60c H0+E1YmsAl3 diff --git a/hadith-ingestion/argo/workflows/generate-embeddings.yaml b/hadith-ingestion/argo/workflows/generate-embeddings.yaml index a5a4836..6bf3534 100644 --- a/hadith-ingestion/argo/workflows/generate-embeddings.yaml +++ b/hadith-ingestion/argo/workflows/generate-embeddings.yaml @@ -4,28 +4,127 @@ metadata: generateName: generate-embeddings- namespace: ml spec: - entrypoint: generate + entrypoint: embedding-pipeline serviceAccountName: argo-workflow arguments: parameters: - - name: batch-size - value: "32" + - name: db_password + value: "YOUR_PASSWORD_HERE" # UPDATE THIS templates: - - name: generate + - name: embedding-pipeline + steps: + - - name: generate-embeddings + template: generate-job + + - name: generate-job container: - image: hadith-ingestion:latest - command: [python, /app/src/embeddings/generator.py] - args: ["--batch-size={{workflow.parameters.batch-size}}"] - env: - - name: DATABASE_HOST - value: "pg.betelgeusebytes.io" - - name: DATABASE_PASSWORD - valueFrom: - secretKeyRef: - name: hadith-db-secret - key: password + image: python:3.11-slim + command: [python, -u, -c] + args: + - | + import requests, psycopg2, time, sys, urllib3 + from datetime import datetime + + # Disable SSL warnings + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + # Install deps + import subprocess + subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "psycopg2-binary", "requests", "urllib3"]) + + # Config - EXTERNAL URLS with SSL disabled + TEI_URL = "https://embeddings.betelgeusebytes.io" + QDRANT_URL = "https://vector.betelgeusebytes.io" + DB_CONFIG = { + 'host': 'pg.betelgeusebytes.io', + 'port': 5432, + 'dbname': 'hadith_db', + 'user': 'hadith_ingest', + 'password': '{{workflow.parameters.db_password}}' + } + + BATCH_SIZE = 32 + COLLECTION = "hadith_embeddings" + VERIFY_SSL = False # Important: Ignore SSL certificates + + print(f"Started: {datetime.now()}", flush=True) + print(f"SSL Verification: {VERIFY_SSL}", flush=True) + + conn = 
psycopg2.connect(**DB_CONFIG) + cur = conn.cursor() + + cur.execute("SELECT COUNT(*) FROM hadiths WHERE embedding_generated = FALSE") + total = cur.fetchone()[0] + print(f"Total to process: {total:,}", flush=True) + + processed = 0 + failed = 0 + offset = 0 + + while offset < total: + cur.execute(""" + SELECT id, arabic_text, english_text, collection_id, hadith_number + FROM hadiths WHERE embedding_generated = FALSE + ORDER BY id LIMIT %s OFFSET %s + """, (BATCH_SIZE, offset)) + + hadiths = cur.fetchall() + if not hadiths: + break + + try: + # Get embeddings (with verify=False) + texts = [" ".join(filter(None, [h[1], h[2]])) for h in hadiths] + resp = requests.post( + f"{TEI_URL}/embed", + json={"inputs": texts}, + timeout=60, + verify=VERIFY_SSL + ) + resp.raise_for_status() + embeddings = resp.json() + + # Upload to Qdrant (with verify=False) + points = [{ + "id": h[0], + "vector": embeddings[i], + "payload": {"collection_id": h[3], "hadith_number": h[4]} + } for i, h in enumerate(hadiths)] + + resp = requests.put( + f"{QDRANT_URL}/collections/{COLLECTION}/points", + json={"points": points}, + timeout=30, + verify=VERIFY_SSL + ) + resp.raise_for_status() + + # Update DB + cur.execute("UPDATE hadiths SET embedding_generated = TRUE WHERE id = ANY(%s)", ([h[0] for h in hadiths],)) + conn.commit() + + processed += len(hadiths) + if processed % 320 == 0: # Every 10 batches + pct = 100 * processed / total + print(f"Progress: {processed:,}/{total:,} ({pct:.1f}%) | Failed: {failed}", flush=True) + + except Exception as e: + print(f"Error at offset {offset}: {e}", flush=True) + failed += len(hadiths) + + offset += BATCH_SIZE + + cur.close() + conn.close() + + print(f"Complete: {processed:,} processed, {failed} failed at {datetime.now()}", flush=True) + resources: - requests: {cpu: 2, memory: 4Gi} - limits: {cpu: 4, memory: 8Gi} \ No newline at end of file + requests: + memory: "2Gi" + cpu: "1" + limits: + memory: "4Gi" + cpu: "2" \ No newline at end of file diff --git a/hadith-ingestion/combined.txt b/hadith-ingestion/combined.txt new file mode 100644 index 0000000..6299108 --- /dev/null +++ b/hadith-ingestion/combined.txt @@ -0,0 +1,3425 @@ +=== ./run-full-ingestion.sh === +#!/bin/bash +# run-full-ingestion.sh + +set -e + +echo "=== Starting Full HadithAPI Ingestion ===" + +# Book slug to collection abbreviation mapping +# Books to ingest (in order) +BOOKS=( + # "sahih-bukhari" + # "sahih-muslim" + # "abu-dawood" + # "al-tirmidhi" + # "ibn-e-majah" + # "sunan-nasai" + # "musnad-ahmad" + # "al-silsila-sahiha" + "mishkat" +) + +for BOOK in "${BOOKS[@]}"; do + echo -e "\n=========================================" + echo "Ingesting: $BOOK" + echo "=========================================" + + argo submit -n ml argo/workflows/ingest-hadithapi.yaml \ + --parameter book-slug=$BOOK \ + --parameter limit=0 \ + --wait \ + --log + + echo "$BOOK completed!" 
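+    # Note: `set -e` is in effect above and `argo submit --wait` should exit
+    # non-zero when a workflow fails, so one failed book aborts the whole loop.
+    # A sketch of a softer guard (hypothetical, adjust as needed):
+    #   argo submit -n ml argo/workflows/ingest-hadithapi.yaml \
+    #     --parameter book-slug=$BOOK --parameter limit=0 --wait --log \
+    #     || { echo "$BOOK failed, continuing with next book"; continue; }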
+ + # Optional: add delay between books + sleep 10 +done + +echo -e "\n=== All Books Ingestion Complete ===" + +# Print summary +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT + c.name_english, + c.abbreviation, + COUNT(h.id) as hadith_count, + COUNT(DISTINCT b.id) as chapter_count +FROM collections c +LEFT JOIN hadiths h ON c.id = h.collection_id +LEFT JOIN books b ON h.book_id = b.id +GROUP BY c.name_english, c.abbreviation +ORDER BY hadith_count DESC; +" +=== ./create-secrets.sh === +#!/bin/bash +# create-secrets.sh + +# Database secret +kubectl -n ml create secret generic hadith-db-secret \ + --from-literal=password='hadith_ingest' \ + --dry-run=client -o yaml | kubectl apply -f - + +# HadithAPI secret (already public, but for consistency) +kubectl -n ml create secret generic hadithapi-secret \ + --from-literal=api-key='$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK' \ + --dry-run=client -o yaml | kubectl apply -f - + +# MinIO secret +kubectl -n ml create secret generic minio-secret \ + --from-literal=access-key='minioadmin' \ + --from-literal=secret-key='minioadmin' \ + --dry-run=client -o yaml | kubectl apply -f - + +echo "Secrets created successfully" +=== ./full-ingestion.sh === +#!/bin/bash +# run-full-ingestion.sh + +set -e + +echo "=== Starting Full HadithAPI Ingestion ===" + +# Books to ingest (in order) +BOOKS=( + "sahih-bukhari" + "sahih-muslim" + "sunan-abu-dawood" + "jami-at-tirmidhi" + "sunan-an-nasai" + "sunan-ibn-e-majah" +) + +for BOOK in "${BOOKS[@]}"; do + echo -e "\n=========================================" + echo "Ingesting: $BOOK" + echo "=========================================" + + argo submit -n argo argo/workflows/ingest-hadithapi.yaml \ + --parameter book-slug=$BOOK \ + --parameter limit=0 \ + --wait \ + --log + + echo "$BOOK completed!" 
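+    # Caution: several slugs in BOOKS above ("sunan-abu-dawood", "jami-at-tirmidhi",
+    # "sunan-an-nasai", "sunan-ibn-e-majah") do not match BOOK_SLUG_MAPPING in
+    # src/main_hadithapi.py, which expects "abu-dawood", "al-tirmidhi",
+    # "sunan-nasai" and "ibn-e-majah"; unmapped slugs make ingest_collection()
+    # raise "Book ... not found or not mapped". Also note the submit namespace
+    # here is "argo", while ingest-hadithapi.yaml declares "namespace: ml" and
+    # run-full-ingestion.sh submits with -n ml.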
+ + # Optional: add delay between books + sleep 10 +done + +echo -e "\n=== All Books Ingestion Complete ===" + +# Print summary +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT + c.name_english, + c.abbreviation, + COUNT(h.id) as hadith_count, + COUNT(DISTINCT b.id) as chapter_count +FROM collections c +LEFT JOIN hadiths h ON c.id = h.collection_id +LEFT JOIN books b ON h.book_id = b.id +GROUP BY c.name_english, c.abbreviation +ORDER BY hadith_count DESC; +" +=== ./requirements.txt === +# Core dependencies +python-dotenv==1.0.0 +pydantic==2.5.0 +pydantic-settings==2.1.0 + +# HTTP clients +httpx==0.25.2 +requests==2.31.0 +tenacity==8.2.3 + +# Database +psycopg2-binary==2.9.9 +sqlalchemy==2.0.23 +asyncpg==0.29.0 + +# Data processing +pandas==2.1.4 +numpy==1.26.2 +pyarabic==0.6.15 +arabic-reshaper==3.0.0 + +# Validation +jsonschema==4.20.0 +validators==0.22.0 + +# Logging & Monitoring +structlog==23.2.0 +prometheus-client==0.19.0 + +# Cloud storage +minio==7.2.0 +boto3==1.34.0 + +# Task queue (optional) +celery==5.3.4 +redis==5.0.1 + +# Testing +pytest==7.4.3 +pytest-asyncio==0.21.1 +pytest-cov==4.1.0 +faker==21.0.0 + + +httpx==0.25.2 +qdrant-client==1.7.0 +tqdm==4.66.1 +asyncpg==0.29.0 +=== ./config/__init__.py === + +=== ./config/settings.py === +""" +Configuration settings for hadith ingestion service +""" +from pydantic_settings import BaseSettings +from typing import Optional +import os + + +class Settings(BaseSettings): + """Application settings loaded from environment variables""" + + # Database + # DATABASE_HOST: str = "postgres.db.svc.cluster.local" + DATABASE_HOST: str = "pg.betelgeusebytes.io" + DATABASE_PORT: int = 5432 + DATABASE_NAME: str = "hadith_db" + DATABASE_USER: str = "hadith_ingest" + DATABASE_PASSWORD: str = "hadith_ingest" + + @property + def DATABASE_URL(self) -> str: + return ( + f"postgresql://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}" + f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}" + ) + + @property + def ASYNC_DATABASE_URL(self) -> str: + return ( + f"postgresql+asyncpg://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}" + f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}" + ) + + # MinIO / S3 + MINIO_ENDPOINT: str = "minio.storage.svc.cluster.local:9000" + MINIO_ACCESS_KEY: str = "minioadmin" + MINIO_SECRET_KEY: str = "minioadmin" + MINIO_BUCKET_RAW: str = "hadith-raw-data" + MINIO_BUCKET_PROCESSED: str = "hadith-processed" + MINIO_SECURE: bool = False + SUNNAH_BASE_URL: str = "https://api.sunnah.com/v1" + HADITH_ONE_API_KEY: Optional[str] = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" + # HadithAPI.com + HADITHAPI_KEY: str = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" + HADITHAPI_BASE_URL: str = "https://hadithapi.com/api" + # Rate limiting + API_RATE_LIMIT: int = 30 # requests per minute + API_MAX_RETRIES: int = 3 + API_RETRY_DELAY: int = 5 # seconds + + # Processing + BATCH_SIZE: int = 100 + MAX_WORKERS: int = 4 + + # TEI Service (for embeddings) + TEI_URL: str = "http://tei.ml.svc.cluster.local" + TEI_TIMEOUT: int = 30 + + # Qdrant + QDRANT_URL: str = "http://qdrant.db.svc.cluster.local:6333" + QDRANT_COLLECTION: str = "hadith_embeddings" + + # Logging + LOG_LEVEL: str = "INFO" + LOG_FORMAT: str = "json" + + # Job tracking + JOB_NAME: Optional[str] = None + JOB_TYPE: str = "api_fetch" + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + case_sensitive = True + + +# Global settings instance +settings = Settings() +=== ./Dockerfile === 
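+# Optional (not present below): ENV PYTHONUNBUFFERED=1 next to PYTHONPATH makes
+# the structlog JSON output flush line by line, which helps when tailing the
+# container with `argo logs` or `kubectl logs`.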
+FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + postgresql-client \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements +COPY requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY config/ /app/config/ +COPY src/ /app/src/ + +# Create non-root user +RUN useradd -m -u 1000 hadith && chown -R hadith:hadith /app +USER hadith + +# Set Python path +ENV PYTHONPATH=/app + +# Default command +CMD ["python", "/app/src/main_hadithapi.py"] +=== ./tests/__init__.py === + +=== ./tests/test_clients.py === + +=== ./test-hadithapi-k8s.sh === +#!/bin/bash +# test-hadithapi-k8s.sh + +set -e + +echo "=== Kubernetes HadithAPI Integration Test ===" + +# 1. Create secrets +echo "Creating secrets..." +#./create-secrets.sh + +# 2. Build and load image (if using local cluster) +echo "Building Docker image..." +#docker build -t hadith-ingestion:latest . + +# If using kind/minikube, load image +# kind load docker-image hadith-ingestion:latest + +# 3. Submit test workflow (10 hadiths) +echo "Submitting test workflow..." +argo submit -n ml argo/workflows/ingest-hadithapi.yaml \ + --parameter book-slug=sahih-muslim \ + --parameter limit=10 \ + --wait \ + --log + +# 4. Check workflow status +echo -e "\nChecking workflow status..." +argo list -n argo + +# 5. Verify data in database +echo -e "\nVerifying data..." +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT + c.name_english, + COUNT(h.id) as hadith_count, + MAX(h.created_at) as last_ingestion +FROM collections c +LEFT JOIN hadiths h ON c.id = h.collection_id +WHERE c.abbreviation = 'bukhari' +GROUP BY c.name_english; +" + +echo -e "\n=== Test Complete ===" +=== ./test_mainhadithapi.py === +#!/usr/bin/env python3 +""" +Test script for main_hadithapi.py +""" +import sys +import os +sys.path.insert(0, '.') + +from src.main_hadithapi import HadithAPIIngestionService + +def test_main_hadithapi(): + """Test the main HadithAPI ingestion service""" + print("=== Testing HadithAPI Ingestion Service ===\n") + + try: + # Initialize the service + print("1. Initializing HadithAPIIngestionService...") + service = HadithAPIIngestionService() + print("✓ Service initialized successfully\n") + + # Test 1: List available books + print("2. Testing book synchronization...") + book_mapping = service.sync_books_from_api() + print(f"✓ Found {len(book_mapping)} mapped books") + for book_slug, info in list(book_mapping.items())[:3]: # Show first 3 + print(f" - {book_slug}: {info['book_name']} ({info['hadiths_count']} hadiths)") + print() + + # Test 2: Test ingestion with limit + print("3. Testing limited ingestion (10 hadiths from Sahih Bukhari)...") + stats = service.ingest_collection( + book_slug='sahih-bukhari', + limit=10 + ) + print(f"✓ Ingestion completed with stats:") + print(f" Processed: {stats['processed']}") + print(f" Failed: {stats['failed']}") + print(f" Skipped: {stats['skipped']}\n") + + # Test 3: List books functionality + print("4. Testing book listing...") + print("\n=== Available Books ===\n") + for book_slug, info in book_mapping.items(): + print(f"Book Slug: {book_slug}") + print(f" Name: {info['book_name']}") + print(f" Hadiths: {info['hadiths_count']}") + print(f" Chapters: {info['chapters_count']}") + print() + + # Clean up + service.close() + print("=== All Tests Passed! 
===") + return True + + except Exception as e: + print(f"✗ Test failed with error: {e}") + import traceback + traceback.print_exc() + return False + +def test_command_line_args(): + """Test command line argument parsing""" + print("=== Testing Command Line Arguments ===\n") + + # We'll simulate command line arguments + import argparse + from src.main_hadithapi import main + + # Test --list-books argument + print("1. Testing --list-books argument...") + original_argv = sys.argv.copy() + + try: + sys.argv = ['main_hadithapi.py', '--list-books'] + # We won't actually run main() as it would exit, but we can check the parsing + parser = argparse.ArgumentParser(description="Ingest hadiths from HadithAPI.com") + parser.add_argument("--book-slug", help="Book slug (e.g., sahih-bukhari)") + parser.add_argument("--limit", type=int, help="Limit number of hadiths to ingest") + parser.add_argument("--list-books", action="store_true", help="List available books and exit") + + args = parser.parse_args(['--list-books']) + print(f"✓ Argument parsing successful: list_books={args.list_books}") + + # Test book-slug argument + args = parser.parse_args(['--book-slug', 'sahih-bukhari', '--limit', '5']) + print(f"✓ Argument parsing successful: book_slug={args.book_slug}, limit={args.limit}") + + print("✓ Command line argument parsing works correctly\n") + return True + + except Exception as e: + print(f"✗ Argument parsing failed: {e}") + return False + finally: + sys.argv = original_argv + +if __name__ == "__main__": + print("Starting tests for main_hadithapi.py...\n") + + # Test command line arguments + if not test_command_line_args(): + sys.exit(1) + + # Test main functionality + if not test_main_hadithapi(): + sys.exit(1) + + print("\n🎉 All tests passed successfully!") + sys.exit(0) +=== ./setup.py === + +=== ./build-and-push.sh === +#!/bin/bash +# build-and-push.sh + +set -e + +# Configuration +IMAGE_NAME="hadith-ingestion" +TAG="${1:-latest}" +DOCKER_REGISTRY="axxs" +REGISTRY="${DOCKER_REGISTRY:-}" + +echo "Building Docker image: ${IMAGE_NAME}:${TAG}" + +# Build image +docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile . + +# Tag for registry +docker tag ${IMAGE_NAME}:${TAG} ${REGISTRY}/${IMAGE_NAME}:${TAG} + +# Push to registry +echo "Pushing to registry: ${REGISTRY}" +docker push ${REGISTRY}/${IMAGE_NAME}:${TAG} + +echo "Done!" +=== ./.env === +# Database +# DATABASE_HOST=postgres.db.svc.cluster.local +DATABASE_HOST = pg.betelgeusebytes.io +DATABASE_PORT=5432 +DATABASE_NAME=hadith_db +DATABASE_USER=hadith_ingest +DATABASE_PASSWORD=hadith_ingest + +# HadithAPI.com +HADITHAPI_KEY=$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK + +# MinIO +MINIO_ENDPOINT=minio.storage.svc.cluster.local:9000 +MINIO_ACCESS_KEY=minioadmin +MINIO_SECRET_KEY=minioadmin + +# Services +TEI_URL=http://tei.ml.svc.cluster.local +QDRANT_URL=http://qdrant.vector.svc.cluster.local:6333 + +# Settings +LOG_LEVEL=INFO +API_RATE_LIMIT=30 +BATCH_SIZE=100 +=== ./build-hadithapi-ingestion.sh === +#!/bin/bash +# build-hadithapi-ingestion.sh + +set -e + +IMAGE_NAME="hadith-ingestion" +TAG="v1.0-hadithapi" + +echo "Building Docker image for HadithAPI.com ingestion..." + +# Build image +docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile . + +# Tag as latest +docker tag ${IMAGE_NAME}:${TAG} ${IMAGE_NAME}:latest + +# If you have a registry, push +# docker push your-registry/${IMAGE_NAME}:${TAG} + +echo "Build complete: ${IMAGE_NAME}:${TAG}" +=== ./combine.sh === +find . 
-type f -name "*.txt" -o -name "production" -o -name "*.py" -o -name "*.yaml" -o -name "Dockerfile" -o -name "*.sh" -o -name "*.env" ! -name "*.md" ! -name "*.xls" ! -name "*.xlsx"| while read file; do + echo "=== $file ===" >> combined.txt + cat "$file" >> combined.txt + echo "" >> combined.txt +done + +=== ./test_hadithapi.py === +#!/usr/bin/env python3 +""" +Quick test script for hadithapi_client.py +""" +import sys +from venv import logger +sys.path.insert(0, '/app') + +from src.api_clients.hadithapi_client import HadithAPIClient +from config.settings import settings + +def test_api_connection(): + """Test basic API connectivity""" + print("=== Testing HadithAPI Client ===\n") + + client = HadithAPIClient() + + # Test 1: Get books + print("Test 1: Fetching available books...") + try: + books = client.get_books() + print(f"✓ Success! Found {len(books)} books") + for book in books[:3]: # Show first 3 + print(f" - {book.get('bookName')} ({book.get('bookSlug')})") + print(f" Hadiths: {book.get('hadiths_count')}, Chapters: {book.get('chapters_count')}") + logger.info(f"Fetched {len(books)} books successfully") + + except Exception as e: + print(f"✗ Failed: {e}") + return False + + # Test 2: Get chapters for Sahih Bukhari + print("\nTest 2: Fetching chapters for Sahih Bukhari...") + try: + chapters = client.get_chapters('sahih-bukhari') + print(f"✓ Success! Found {len(chapters)} chapters") + if chapters: + print(f" First chapter: {chapters[0].get('chapterEnglish')}") + except Exception as e: + print(f"✗ Failed: {e}") + return False + + # Test 3: Fetch first page of hadiths + print("\nTest 3: Fetching first page of hadiths...") + book_id = None + try: + book = client.get_book_by_slug('sahih-bukhari') + if not book: + print("✗ Failed: Book 'sahih-bukhari' not found") + return False + book_id = book.get('id') + page_data = client.get_hadiths_page('sahih-bukhari', page=1, limit=5) + hadiths = page_data.get('hadiths', []) + print(f"✓ Success! Fetched {len(hadiths)} hadiths") + if hadiths: + first = hadiths[0] + print(f" First hadith number: {first.get('hadithNumber')}") + print(f" Arabic text (first 100 chars): {first.get('hadithArabic', '')[:100]}...") + except Exception as e: + print(f"✗ Failed: {e}") + return False + + if book_id is None: + print("✗ Failed: Book ID unavailable for iterator test") + return False + + # # Test 4: Test iterator (fetch 3 hadiths) + print("\nTest 4: Testing hadith iterator (3 hadiths)...") + try: + count = 0 + + for hadith in client.iter_all_hadiths_in_book(book_id='sahih-bukhari', book_slug='sahih-bukhari', batch_size=10): + count += 1 + print(f" Hadith #{hadith.get('hadithNumber')} is {hadith.get('englishNarrator')} and is {hadith.get('status')} ") + if count >= 3: + break + print(f"✓ Success! Iterator working correctly") + except Exception as e: + print(f"✗ Failed: {e}") + return False + + client.close() + print("\n=== All Tests Passed! ===") + return True + +if __name__ == "__main__": + success = test_api_connection() + sys.exit(0 if success else 1) +=== ./test-hadithapi-local.sh === +#!/bin/bash +# test-hadithapi-local.sh + +set -e + +echo "=== HadithAPI.com Integration Test ===" + +# 1. Test API connection +echo "Testing API connection..." +curl -s "https://hadithapi.com/api/books?apiKey=\$2y\$10\$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" | jq . + +# 2. Test database connection +echo -e "\nTesting database connection..." +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "SELECT COUNT(*) FROM collections;" + +# 3. 
List available books +echo -e "\nListing available books..." +python src/main_hadithapi.py --list-books + +# 4. Test ingestion (limited to 10 hadiths) +echo -e "\nRunning test ingestion (10 hadiths from Sahih Bukhari)..." +python src/main_hadithapi.py --book-slug sahih-muslim --limit 10 + +# 5. Verify data +echo -e "\nVerifying ingested data..." +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT + c.name_english, + c.abbreviation, + COUNT(h.id) as hadith_count, + COUNT(DISTINCT b.id) as book_count +FROM collections c +LEFT JOIN hadiths h ON c.id = h.collection_id +LEFT JOIN books b ON h.book_id = b.id +WHERE c.abbreviation = 'bukhari' +GROUP BY c.name_english, c.abbreviation; +" + +echo -e "\n=== Test Complete ===" +=== ./simple-pod.yaml === +apiVersion: v1 +kind: Pod +metadata: + name: hadith-ingestion-list-books + namespace: ml +spec: + restartPolicy: Never + containers: + - name: hadith-ingestion + image: axxs/hadith-ingestion:latest + # command: ["python"] + # args: ["/app/src/main_hadithapi.py", "--list-books"] + command: ["sh","-c","sleep infinity"] + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PORT + value: "5432" + - name: DATABASE_NAME + value: "hadith_db" + - name: DATABASE_USER + value: "hadith_ingest" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + - name: HADITHAPI_KEY + valueFrom: + secretKeyRef: + name: hadithapi-secret + key: api-key + - name: MINIO_ENDPOINT + value: "minio.storage.svc.cluster.local:9000" + - name: MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: access-key + - name: MINIO_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: secret-key + - name: LOG_LEVEL + value: "INFO" +=== ./argo/workflows/ingest-collection.yaml === +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: ingest-hadith-collection- + namespace: argo +spec: + entrypoint: ingest-pipeline + + # Arguments + arguments: + parameters: + - name: collection + value: "bukhari" + - name: limit + value: "0" # 0 means no limit + + # Service account with database access + serviceAccountName: argo-workflow + + # Templates + templates: + + # ======================================== + # Main pipeline + # ======================================== + - name: ingest-pipeline + steps: + - - name: ingest-hadiths + template: ingest + arguments: + parameters: + - name: collection + value: "{{workflow.parameters.collection}}" + - name: limit + value: "{{workflow.parameters.limit}}" + + - - name: generate-embeddings + template: generate-embeddings + arguments: + parameters: + - name: collection + value: "{{workflow.parameters.collection}}" + + - - name: index-qdrant + template: index-qdrant + arguments: + parameters: + - name: collection + value: "{{workflow.parameters.collection}}" + + # ======================================== + # Ingestion step + # ======================================== + - name: ingest + inputs: + parameters: + - name: collection + - name: limit + + container: + image: hadith-ingestion:latest + imagePullPolicy: Always + command: [python, /app/src/main_hadithapi.py] + args: + - "{{inputs.parameters.collection}}" + - "--limit={{inputs.parameters.limit}}" + + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PORT + value: "5432" + - name: DATABASE_NAME + value: "hadith_db" + - name: DATABASE_USER + value: "hadith_ingest" + - name: DATABASE_PASSWORD + valueFrom: + 
secretKeyRef: + name: hadith-db-secret + key: password + + - name: MINIO_ENDPOINT + value: "minio.storage.svc.cluster.local:9000" + - name: MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: access-key + - name: MINIO_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: secret-key + + - name: SUNNAH_API_KEY + valueFrom: + secretKeyRef: + name: sunnah-api-secret + key: api-key + + - name: LOG_LEVEL + value: "INFO" + - name: JOB_NAME + value: "{{workflow.name}}" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi + + # ======================================== + # Embedding generation step + # ======================================== + - name: generate-embeddings + inputs: + parameters: + - name: collection + + container: + image: hadith-embeddings:latest + imagePullPolicy: Always + command: [python, /app/src/embeddings/generator.py] + args: + - "--collection={{inputs.parameters.collection}}" + - "--batch-size=32" + + env: + - name: DATABASE_URL + value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: TEI_URL + value: "http://tei.ml.svc.cluster.local" + + - name: LOG_LEVEL + value: "INFO" + + resources: + requests: + cpu: 1 + memory: 2Gi + limits: + cpu: 4 + memory: 8Gi + + # ======================================== + # Qdrant indexing step + # ======================================== + - name: index-qdrant + inputs: + parameters: + - name: collection + + container: + image: hadith-qdrant-indexer:latest + imagePullPolicy: Always + command: [python, /app/index_qdrant.py] + args: + - "--collection={{inputs.parameters.collection}}" + - "--batch-size=100" + + env: + - name: DATABASE_URL + value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: QDRANT_URL + value: "http://qdrant.vector.svc.cluster.local:6333" + - name: QDRANT_COLLECTION + value: "hadith_embeddings" + + - name: LOG_LEVEL + value: "INFO" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi +=== ./argo/workflows/ingest-hadithapi.yaml === +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: ingest-hadithapi- + namespace: ml +spec: + entrypoint: ingest-pipeline + + arguments: + parameters: + - name: book-slug + value: "al-tirmidhi" + - name: limit + value: "0" # 0 means no limit + + serviceAccountName: argo-workflow + + templates: + + # ======================================== + # Main pipeline + # ======================================== + - name: ingest-pipeline + steps: + - - name: ingest-hadiths + template: ingest + arguments: + parameters: + - name: book-slug + value: "{{workflow.parameters.book-slug}}" + - name: limit + value: "{{workflow.parameters.limit}}" + + # ======================================== + # Ingestion step + # ======================================== + - name: ingest + inputs: + parameters: + - name: book-slug + - name: limit + + container: + image: axxs/hadith-ingestion:latest + imagePullPolicy: Always + command: [python, /app/src/main_hadithapi.py] + args: + - "--book-slug={{inputs.parameters.book-slug}}" + - "--limit={{inputs.parameters.limit}}" + + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PORT + value: "5432" + - name: 
DATABASE_NAME + value: "hadith_db" + - name: DATABASE_USER + value: "hadith_ingest" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: HADITHAPI_KEY + valueFrom: + secretKeyRef: + name: hadithapi-secret + key: api-key + + - name: MINIO_ENDPOINT + value: "minio.storage.svc.cluster.local:9000" + - name: MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: access-key + - name: MINIO_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: secret-key + + - name: LOG_LEVEL + value: "INFO" + - name: JOB_NAME + value: "{{workflow.name}}" + - name: API_RATE_LIMIT + value: "30" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi +--- +# Workflow to ingest ALL books +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: ingest-all-hadithapi- + namespace: ml +spec: + entrypoint: ingest-all-books + + serviceAccountName: argo-workflow + + arguments: + parameters: + - name: limit-per-book + value: "0" # 0 means no limit + + templates: + + # ======================================== + # Main pipeline - sequential processing + # ======================================== + - name: ingest-all-books + steps: + # Process each book sequentially to avoid rate limiting + - - name: sahih-bukhari + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sahih-bukhari" + + - - name: sahih-muslim + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sahih-muslim" + + - - name: sunan-abu-dawood + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "abu-dawood" + + - - name: jami-at-tirmidhi + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "al-tirmidhi" + + - - name: sunan-an-nasai + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sunan-nasai" + + - - name: sunan-ibn-e-majah + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "ibn-e-majah" + + - - name: musnad-ahmad + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "musnad-ahmad" + + + - - name: al-silsila-sahiha + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "al-silsila-sahiha" + + # ======================================== + # Book ingestion template + # ======================================== + - name: ingest-book + inputs: + parameters: + - name: book-slug + + container: + image: axxs/hadith-ingestion:latest + imagePullPolicy: Always + command: [python, /app/src/main_hadithapi.py] + args: + - "--book-slug={{inputs.parameters.book-slug}}" + - "--limit={{workflow.parameters.limit-per-book}}" + + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + - name: HADITHAPI_KEY + valueFrom: + secretKeyRef: + name: hadithapi-secret + key: api-key + - name: LOG_LEVEL + value: "INFO" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi +=== ./argo/workflows/generate-embeddings.yaml === +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: generate-embeddings- + namespace: ml +spec: + entrypoint: generate + serviceAccountName: argo-workflow + + arguments: + parameters: + - name: batch-size + value: "32" + + templates: + - name: generate + container: + image: hadith-ingestion:latest + command: [python, /app/src/embeddings/generator.py] + 
args: ["--batch-size={{workflow.parameters.batch-size}}"] + env: + - name: DATABASE_HOST + value: "pg.betelgeusebytes.io" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + resources: + requests: {cpu: 2, memory: 4Gi} + limits: {cpu: 4, memory: 8Gi} +=== ./src/database/__init__.py === + +=== ./src/database/connection.py === + +=== ./src/database/repository.py === +""" +Database repository for hadith data operations +""" +from typing import List, Dict, Any, Optional +import json +from uuid import UUID +import structlog +from sqlalchemy import create_engine, text, select, insert, update +from sqlalchemy.orm import sessionmaker, Session +from sqlalchemy.exc import IntegrityError +from config.settings import settings + +logger = structlog.get_logger() + + +class HadithRepository: + """Repository for hadith database operations""" + + def __init__(self, database_url: Optional[str] = None): + self.database_url = database_url or settings.DATABASE_URL + self.engine = create_engine(self.database_url, pool_pre_ping=True) + self.SessionLocal = sessionmaker(bind=self.engine) + + @staticmethod + def _coerce_uuid(value: Any) -> UUID: + if isinstance(value, UUID): + return value + return UUID(str(value)) + + def get_session(self) -> Session: + """Get database session""" + return self.SessionLocal() + + # ===== Collections ===== + + def get_collection_by_abbreviation(self, abbr: str) -> Optional[Dict[str, Any]]: + """Get collection by abbreviation""" + with self.get_session() as session: + query = text(""" + SELECT * FROM collections + WHERE abbreviation = :abbr + """) + result = session.execute(query, {"abbr": abbr}).fetchone() + + if result: + return dict(result._mapping) + return None + + def get_all_collections(self) -> List[Dict[str, Any]]: + """Get all collections""" + with self.get_session() as session: + query = text("SELECT * FROM collections ORDER BY name_english") + result = session.execute(query).fetchall() + return [dict(row._mapping) for row in result] + + def update_collection_count(self, collection_id: UUID, count: int): + """Update total hadith count for a collection""" + with self.get_session() as session: + query = text(""" + UPDATE collections + SET total_hadiths = :count, updated_at = NOW() + WHERE id = :id + """) + session.execute(query, {"id": str(collection_id), "count": count}) + session.commit() + + # ===== Books ===== + + def upsert_book( + self, + collection_id: UUID, + book_number: int, + name_english: Optional[str] = None, + name_arabic: Optional[str] = None, + metadata: Optional[Dict] = None + ) -> UUID: + """Insert or update a book""" + metadata_json = json.dumps(metadata or {}) + with self.get_session() as session: + query = text(""" + INSERT INTO books (collection_id, book_number, name_english, name_arabic, metadata) + VALUES (:collection_id, :book_number, :name_english, :name_arabic, :metadata) + ON CONFLICT (collection_id, book_number) + DO UPDATE SET + name_english = EXCLUDED.name_english, + name_arabic = EXCLUDED.name_arabic, + metadata = EXCLUDED.metadata + RETURNING id + """) + + result = session.execute(query, { + "collection_id": str(collection_id), + "book_number": book_number, + "name_english": name_english, + "name_arabic": name_arabic, + "metadata": metadata_json + }) + session.commit() + + return self._coerce_uuid(result.fetchone()[0]) + + def get_book(self, collection_id: UUID, book_number: int) -> Optional[Dict[str, Any]]: + """Get book by collection and book number""" + with self.get_session() as session: + 
query = text(""" + SELECT * FROM books + WHERE collection_id = :collection_id AND book_number = :book_number + """) + result = session.execute(query, { + "collection_id": str(collection_id), + "book_number": book_number + }).fetchone() + + if result: + return dict(result._mapping) + return None + + # ===== Hadiths ===== + + def upsert_hadith( + self, + collection_id: UUID, + hadith_number: int, + arabic_text: str, + book_id: Optional[UUID] = None, + english_text: Optional[str] = None, + urdu_text: Optional[str] = None, + grade: Optional[str] = None, + grade_source: Optional[str] = None, + chapter_name: Optional[str] = None, + source_id: Optional[str] = None, + source_url: Optional[str] = None, + source_metadata: Optional[Dict] = None + ) -> UUID: + """Insert or update a hadith""" + + with self.get_session() as session: + query = text(""" + INSERT INTO hadiths ( + collection_id, book_id, hadith_number, + arabic_text, english_text, urdu_text, + grade, grade_source, chapter_name, + source_id, source_url, source_metadata + ) + VALUES ( + :collection_id, :book_id, :hadith_number, + :arabic_text, :english_text, :urdu_text, + :grade, :grade_source, :chapter_name, + :source_id, :source_url, :source_metadata + ) + ON CONFLICT (collection_id, book_id, hadith_number) + DO UPDATE SET + arabic_text = EXCLUDED.arabic_text, + english_text = EXCLUDED.english_text, + urdu_text = EXCLUDED.urdu_text, + grade = EXCLUDED.grade, + grade_source = EXCLUDED.grade_source, + chapter_name = EXCLUDED.chapter_name, + source_url = EXCLUDED.source_url, + source_metadata = EXCLUDED.source_metadata, + updated_at = NOW() + RETURNING id + """) + metadata_json = json.dumps(source_metadata or {}) + + result = session.execute(query, { + "collection_id": str(collection_id), + "book_id": str(book_id) if book_id else None, + "hadith_number": hadith_number, + "arabic_text": arabic_text, + "english_text": english_text, + "urdu_text": urdu_text, + "grade": grade, + "grade_source": grade_source, + "chapter_name": chapter_name, + "source_id": source_id, + "source_url": source_url, + "source_metadata": metadata_json + }) + session.commit() + + return self._coerce_uuid(result.fetchone()[0]) + + def get_hadiths_without_embeddings( + self, + limit: int = 100, + collection_id: Optional[UUID] = None + ) -> List[Dict[str, Any]]: + """Get hadiths that need embedding generation""" + with self.get_session() as session: + if collection_id: + query = text(""" + SELECT * FROM hadiths + WHERE embedding_generated = FALSE + AND collection_id = :collection_id + ORDER BY created_at ASC + LIMIT :limit + """) + result = session.execute(query, { + "collection_id": str(collection_id), + "limit": limit + }).fetchall() + else: + query = text(""" + SELECT * FROM hadiths + WHERE embedding_generated = FALSE + ORDER BY created_at ASC + LIMIT :limit + """) + result = session.execute(query, {"limit": limit}).fetchall() + + return [dict(row._mapping) for row in result] + + def mark_embedding_generated(self, hadith_id: UUID, version: str = "v1"): + """Mark hadith as having embedding generated""" + with self.get_session() as session: + # Prepare the update query + query = text(""" + UPDATE hadiths + SET embedding_generated = TRUE, + embedding_version = :version, + updated_at = NOW() + WHERE id = :id + """) + # Pre-serialize parameters (keeping consistent with other methods that + # serialize payloads/configs before execution) + params = {"id": str(hadith_id), "version": version} + session.execute(query, {"id": str(hadith_id), "version": version}) + session.commit() + 
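+            # NOTE: the `params` dict built above is never passed to execute();
+            # the call rebuilds the same dict inline. Passing `params` (or
+            # dropping the variable) would remove the duplication.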
+ # ===== Ingestion Jobs ===== + + def create_ingestion_job( + self, + job_name: str, + job_type: str, + source_name: str, + config: Optional[Dict] = None + ) -> UUID: + """Create a new ingestion job""" + with self.get_session() as session: + query = text(""" + INSERT INTO ingestion_jobs (job_name, job_type, source_name, config, status, started_at) + VALUES (:job_name, :job_type, :source_name, :config, 'running', NOW()) + RETURNING id + """) + # serialize config as JSON for storage + result = session.execute(query, { + "job_name": job_name, + "job_type": job_type, + "source_name": source_name, + "config": json.dumps(config or {}) + }) + session.commit() + job_id = result.fetchone()[0] + return job_id if isinstance(job_id, UUID) else UUID(str(job_id)) + + def update_job_progress( + self, + job_id: UUID, + total: Optional[int] = None, + processed: Optional[int] = None, + failed: Optional[int] = None, + skipped: Optional[int] = None + ): + """Update job progress counters""" + with self.get_session() as session: + updates = [] + params = {"job_id": str(job_id)} + + if total is not None: + updates.append("total_records = :total") + params["total"] = total + if processed is not None: + updates.append("processed_records = :processed") + params["processed"] = processed + if failed is not None: + updates.append("failed_records = :failed") + params["failed"] = failed + if skipped is not None: + updates.append("skipped_records = :skipped") + params["skipped"] = skipped + + if updates: + query_str = f""" + UPDATE ingestion_jobs + SET {', '.join(updates)} + WHERE id = :job_id + """ + session.execute(text(query_str), params) + session.commit() + + def complete_job( + self, + job_id: UUID, + status: str = "success", + error_message: Optional[str] = None + ): + """Mark job as completed""" + with self.get_session() as session: + query = text(""" + UPDATE ingestion_jobs + SET status = :status, + completed_at = NOW(), + duration_seconds = EXTRACT(EPOCH FROM (NOW() - started_at)), + error_message = :error_message + WHERE id = :job_id + """) + session.execute(query, { + "job_id": str(job_id), + "status": status, + "error_message": error_message + }) + session.commit() + + def add_processing_log( + self, + job_id: UUID, + level: str, + message: str, + details: Optional[Dict] = None + ): + """Add a processing log entry""" + with self.get_session() as session: + query = text(""" + INSERT INTO processing_logs (job_id, log_level, message, details) + VALUES (:job_id, :level, :message, :details) + """) + details_json = json.dumps(details or {}) + session.execute(query, { + "job_id": str(job_id), + "level": level, + "message": message, + "details": details_json + }) + session.commit() + + # ===== Statistics ===== + + def get_collection_stats(self, collection_id: UUID) -> Dict[str, Any]: + """Get statistics for a collection""" + with self.get_session() as session: + query = text(""" + SELECT * FROM get_collection_statistics(:collection_id) + """) + result = session.execute(query, {"collection_id": str(collection_id)}).fetchone() + + if result: + return dict(result._mapping) + return {} +=== ./src/embeddings/__init__.py === + +=== ./src/embeddings/generator.py === +# Update: src/embeddings/generator.py +""" +Embedding generation service for hadith texts +""" +import asyncio +import httpx +from typing import List, Tuple, Optional +import psycopg2 +from qdrant_client import QdrantClient +from qdrant_client.models import Distance, VectorParams, PointStruct +import structlog +from tqdm import tqdm +import sys +import 
argparse + +from config.settings import settings + +logger = structlog.get_logger() + + +class EmbeddingGenerator: + def __init__(self, database_url: str, tei_url: str, qdrant_url: str, batch_size: int = 32): + self.database_url = database_url + self.tei_url = tei_url + self.qdrant_url = qdrant_url + self.batch_size = batch_size + self.http_client = httpx.AsyncClient(timeout=60.0) + self.qdrant = QdrantClient(url=qdrant_url) + + async def generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]: + """Generate embeddings using TEI""" + response = await self.http_client.post( + f"{self.tei_url}/embed", + json={"inputs": texts} + ) + response.raise_for_status() + return response.json() + + def create_collection(self, name: str = "hadith_embeddings"): + """Create Qdrant collection""" + try: + self.qdrant.get_collection(name) + except: + self.qdrant.create_collection( + collection_name=name, + vectors_config=VectorParams(size=1024, distance=Distance.COSINE) + ) + + async def process_batch(self, conn, hadiths: List[Tuple], collection: str): + """Process batch: generate embeddings & store""" + texts = [f"{h[1]} {h[2] or ''}" for h in hadiths] # arabic + english + embeddings = await self.generate_embeddings_batch(texts) + + points = [ + PointStruct( + id=str(h[0]), + vector=emb, + payload={"hadith_id": str(h[0]), "collection_id": str(h[4])} + ) + for h, emb in zip(hadiths, embeddings) + ] + + self.qdrant.upsert(collection_name=collection, points=points) + + # Mark completed + cursor = conn.cursor() + ids = [str(h[0]) for h in hadiths] + cursor.execute( + "UPDATE hadiths SET embedding_generated = TRUE, embedding_version = 'v1' WHERE id = ANY(%s)", + (ids,) + ) + conn.commit() + cursor.close() + + return len(points) + + async def generate_all(self, collection: str = "hadith_embeddings"): + """Generate embeddings for all hadiths""" + self.create_collection(collection) + conn = psycopg2.connect(self.database_url) + cursor = conn.cursor() + + cursor.execute("SELECT COUNT(*) FROM hadiths WHERE embedding_generated = FALSE") + total = cursor.fetchone()[0] + cursor.close() + + if total == 0: + print("All hadiths already have embeddings!") + return + + print(f"Generating embeddings for {total} hadiths...") + processed = 0 + + with tqdm(total=total) as pbar: + while True: + cursor = conn.cursor() + cursor.execute(""" + SELECT id, arabic_text, english_text, urdu_text, collection_id + FROM hadiths + WHERE embedding_generated = FALSE + LIMIT 1000 + """) + hadiths = cursor.fetchall() + cursor.close() + + if not hadiths: + break + + for i in range(0, len(hadiths), self.batch_size): + batch = hadiths[i:i+self.batch_size] + try: + count = await self.process_batch(conn, batch, collection) + processed += count + pbar.update(count) + except Exception as e: + logger.error("batch_failed", error=str(e)) + + conn.close() + print(f"\nCompleted! 
Generated {processed} embeddings.") + + async def close(self): + await self.http_client.aclose() + + +async def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--batch-size", type=int, default=32) + args = parser.parse_args() + + gen = EmbeddingGenerator( + database_url=settings.DATABASE_URL, + tei_url="http://tei.ml.svc.cluster.local", + qdrant_url="http://qdrant.vector.svc.cluster.local:6333", + batch_size=args.batch_size + ) + + try: + await gen.generate_all() + return 0 + except Exception as e: + logger.error("generation_failed", error=str(e)) + return 1 + finally: + await gen.close() + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) +=== ./src/main_hadithapi.py === +""" +Main ingestion script for fetching hadiths from HadithAPI.com +""" +import sys +from pathlib import Path +import argparse +from typing import Optional, Dict, Any +from uuid import UUID +import structlog + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from config.settings import settings +from src.api_clients.hadithapi_client import HadithAPIClient +from src.database.repository import HadithRepository +from src.processors.text_cleaner import ArabicTextProcessor, TextCleaner + +# Configure structured logging +structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer() + ], + wrapper_class=structlog.stdlib.BoundLogger, + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, +) + +logger = structlog.get_logger() + + +# Book slug to collection abbreviation mapping +BOOK_SLUG_MAPPING = { + 'sahih-bukhari': 'bukhari', + 'sahih-muslim': 'muslim', + 'abu-dawood': 'abudawud', + 'al-tirmidhi': 'tirmidhi', + 'sunan-nasai': 'nasai', + 'ibn-e-majah': 'ibnmajah', + 'muwatta-imam-malik': 'malik', + 'musnad-ahmad': 'ahmad', + 'sunan-ad-darimi': 'darimi', + 'mishkat': 'mishkat', + 'al-silsila-sahiha': 'al-silsila-sahiha' +} + + +class HadithAPIIngestionService: + """Service for ingesting hadiths from HadithAPI.com""" + + def __init__(self): + self.api_client = HadithAPIClient() + self.repo = HadithRepository() + self.text_processor = ArabicTextProcessor() + self.text_cleaner = TextCleaner() + + def sync_books_from_api(self) -> Dict[str, Any]: + """ + Sync book metadata from API to database + + Returns: + Dictionary mapping book_slug -> collection_id + """ + logger.info("syncing_books_from_api") + + # Get books from API + api_books = self.api_client.get_books() + + book_mapping = {} + # print(BOOK_SLUG_MAPPING) + for api_book in api_books: + book_slug = api_book.get('bookSlug') + # print(book_slug) + # Map to our collection abbreviation + collection_abbr = BOOK_SLUG_MAPPING.get(book_slug) + + if not collection_abbr: + logger.warning( + "unmapped_book", + book_slug=book_slug, + book_name=api_book.get('bookName') + ) + continue + # Get or verify collection exists in database + collection = self.repo.get_collection_by_abbreviation(collection_abbr) + + if not collection: + logger.warning( + "collection_not_in_db", + abbreviation=collection_abbr, + book_slug=book_slug + ) + continue + + collection_id = collection['id'] + if not isinstance(collection_id, UUID): + collection_id = UUID(str(collection_id)) + 
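+            # Each entry stores the DB collection_id alongside the API's own id
+            # and counts; ingest_collection() later reads collection_id and uses
+            # hadiths_count for its progress percentage.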
book_mapping[book_slug] = { + 'collection_id': collection_id, + 'book_id': api_book.get('id'), + 'book_name': api_book.get('bookName'), + 'hadiths_count': api_book.get('hadiths_count'), + 'chapters_count': api_book.get('chapters_count') + } + + logger.info( + "book_mapped", + book_slug=book_slug, + collection_abbr=collection_abbr, + hadiths_count=api_book.get('hadiths_count') + ) + + logger.info( + "books_synced", + total_books=len(book_mapping) + ) + + return book_mapping + + def ingest_collection( + self, + book_slug: str, + limit: Optional[int] = None + ) -> dict: + """ + Ingest entire collection from HadithAPI.com + + Args: + book_slug: Book slug identifier (e.g., 'sahih-bukhari') + limit: Optional limit on number of hadiths to ingest + + Returns: + Statistics dictionary + """ + logger.info( + "ingestion_started", + book_slug=book_slug, + limit=limit + ) + + # Get book mapping + book_mapping = self.sync_books_from_api() + logger.info("containing books", book_mapping=book_mapping) + logger.info("Book slugs", book_slug=book_slug) + if book_slug not in book_mapping: + logger.error( + "book_not_mapped", + book_slug=book_slug, + available_books=list(book_mapping.keys()) + ) + raise ValueError(f"Book '{book_slug}' not found or not mapped") + + book_info = book_mapping[book_slug] + collection_id = book_info['collection_id'] + # book_id = book_info['book_id'] + book_id = book_slug + + # Create ingestion job + job_id = self.repo.create_ingestion_job( + job_name=f"ingest_{book_slug}", + job_type="api_fetch", + source_name="hadithapi.com", + config={ + "book_slug": book_slug, + "book_id": book_id, + "limit": limit + } + ) + + logger.info( + "job_created", + job_id=str(job_id), + book_slug=book_slug, + expected_count=book_info.get('hadiths_count') + ) + + stats = { + "processed": 0, + "failed": 0, + "skipped": 0 + } + + try: + # Iterate through all hadiths in book + for hadith_data, chapter_data in self.api_client.iter_all_hadiths_in_book_with_chapters( + book_id=book_id, + book_slug=book_slug, + batch_size=100 + ): + # Check limit + if limit and stats["processed"] >= limit: + logger.info("limit_reached", limit=limit) + break + + try: + # Process and store hadith + self._process_and_store_hadith( + collection_id=collection_id, + hadith_data=hadith_data, + chapter_data=chapter_data + ) + + stats["processed"] += 1 + + # Update job progress every 100 hadiths + if stats["processed"] % 100 == 0: + self.repo.update_job_progress( + job_id=job_id, + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + logger.info( + "progress_update", + book_slug=book_slug, + processed=stats["processed"], + failed=stats["failed"], + percentage=round( + (stats["processed"] / int(book_info.get('hadiths_count', 1))) * 100, + 2 + ) if book_info.get('hadiths_count') else 0 + ) + + except Exception as e: + stats["failed"] += 1 + logger.error( + "hadith_processing_failed", + error=str(e), + hadith_number=hadith_data.get("hadithNumber"), + hadith_id=hadith_data.get("id") + ) + + self.repo.add_processing_log( + job_id=job_id, + level="ERROR", + message=f"Failed to process hadith: {str(e)}", + details={"hadith_data": hadith_data} + ) + + # Update final job progress + self.repo.update_job_progress( + job_id=job_id, + total=stats["processed"] + stats["failed"] + stats["skipped"], + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + # Mark job as complete + self.repo.complete_job(job_id=job_id, status="success") + + # Update collection count + 
self.repo.update_collection_count( + collection_id=collection_id, + count=stats["processed"] + ) + + logger.info( + "ingestion_completed", + book_slug=book_slug, + stats=stats + ) + + return stats + + except Exception as e: + logger.error( + "ingestion_failed", + book_slug=book_slug, + error=str(e), + exc_info=True + ) + + self.repo.complete_job( + job_id=job_id, + status="failed", + error_message=str(e) + ) + + raise + + def _process_and_store_hadith( + self, + collection_id: UUID, + hadith_data: dict, + chapter_data: Optional[dict] + ): + """Process and store a single hadith""" + + # Extract hadith number + hadith_number = hadith_data.get("hadithNumber") + if not hadith_number: + raise ValueError("Missing hadith number") + + # Convert to integer + try: + hadith_number = int(hadith_number) + except (ValueError, TypeError): + raise ValueError(f"Invalid hadith number: {hadith_number}") + + # Extract text in multiple languages + arabic_text = hadith_data.get("hadithArabic") + english_text = hadith_data.get("hadithEnglish") + urdu_text = hadith_data.get("hadithUrdu") + + if not arabic_text: + raise ValueError("Missing Arabic text") + # passed logger.warning("Arabic text extracted and validated", hadith_number=hadith_number) + # Clean texts + arabic_text = self.text_cleaner.clean_text(arabic_text) + if english_text: + english_text = self.text_cleaner.clean_text(english_text) + if urdu_text: + urdu_text = self.text_cleaner.clean_text(urdu_text) + + # Extract grade/status + grade = hadith_data.get("status") + + # Get or create chapter (book in our schema) + book_id = None + chapter_name = None + # logger.warning("Processing chapter data####", chapter_data.get('positional_args', {}).get('id')) + # logger.info("Processing chapter data####2####", chapter_data.get('id')) + if chapter_data: + chapter_id = chapter_data.get('id') + chapter_number = chapter_data.get('chapterNumber') + chapter_name_en = chapter_data.get('chapterEnglish') + chapter_name_ar = chapter_data.get('chapterArabic') + chapter_name = chapter_name_en + # print(chapter_number, chapter_name) + if chapter_number: + try: + chapter_number = int(chapter_number) + except (ValueError, TypeError): + chapter_number = chapter_id # Fallback to ID + + # Get or create book (chapter in HadithAPI = book in our schema) + existing_book = self.repo.get_book(collection_id, chapter_number) + # logger.warning("EXISTING BOOK : ", existing_book) + # logger.warning("Fetched or created book", collection_id=collection_id, chapter_number=chapter_number, chapter_name=chapter_name) + if not existing_book: + book_id = self.repo.upsert_book( + collection_id=collection_id, + book_number=chapter_number, + name_english=chapter_name_en, + name_arabic=chapter_name_ar, + metadata=chapter_data + ) + else: + # print(existing_book['id']) + existing_id = existing_book['id'] + if isinstance(existing_id, str): + existing_id = UUID(existing_id) + book_id = existing_id + + # Build source metadata + source_metadata = { + 'api_id': hadith_data.get('id'), + 'englishNarrator': hadith_data.get('englishNarrator'), + 'urduNarrator': hadith_data.get('urduNarrator'), + 'bookSlug': hadith_data.get('bookSlug'), + 'chapterId': hadith_data.get('chapterId'), + 'chapter': chapter_data + } + # logger.warning("Hadith metadata built", source_metadata) + # Store hadith + hadith_id = self.repo.upsert_hadith( + collection_id=collection_id, + book_id=book_id, + hadith_number=hadith_number, + arabic_text=arabic_text, + english_text=english_text, + urdu_text=urdu_text, + grade=grade, + 
grade_source="hadithapi.com", + chapter_name=chapter_name, + source_id=str(hadith_data.get('id', '')), + source_url=f"https://hadithapi.com/hadith/{hadith_data.get('id')}", + source_metadata=source_metadata + ) + + logger.debug( + "hadith_stored", + hadith_id=str(hadith_id), + hadith_number=hadith_number, + chapter_id=chapter_data.get('id') if chapter_data else None + ) + + def ingest_all_books(self, limit_per_book: Optional[int] = None) -> Dict[str, dict]: + """ + Ingest all available books + + Args: + limit_per_book: Optional limit per book + + Returns: + Dictionary of book_slug -> stats + """ + logger.info("ingesting_all_books", limit_per_book=limit_per_book) + + book_mapping = self.sync_books_from_api() + results = {} + + for book_slug in book_mapping.keys(): + logger.info("starting_book", book_slug=book_slug) + + try: + stats = self.ingest_collection( + book_slug=book_slug, + limit=limit_per_book + ) + results[book_slug] = {"status": "success", "stats": stats} + + except Exception as e: + logger.error( + "book_ingestion_failed", + book_slug=book_slug, + error=str(e) + ) + results[book_slug] = {"status": "failed", "error": str(e)} + + logger.info("all_books_completed", results=results) + return results + + def close(self): + """Close connections""" + self.api_client.close() + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser( + description="Ingest hadiths from HadithAPI.com" + ) + parser.add_argument( + "--book-slug", + help="Book slug (e.g., sahih-bukhari). If not provided, ingests all books." + ) + parser.add_argument( + "--limit", + type=int, + help="Limit number of hadiths to ingest per book" + ) + parser.add_argument( + "--list-books", + action="store_true", + help="List available books and exit" + ) + + args = parser.parse_args() + + try: + service = HadithAPIIngestionService() + + # List books mode + if args.list_books: + logger.info("listing_available_books") + book_mapping = service.sync_books_from_api() + + print("\n=== Available Books ===\n") + for book_slug, info in book_mapping.items(): + print(f"Book Slug: {book_slug}") + print(f" Name: {info['book_name']}") + print(f" Hadiths: {info['hadiths_count']}") + print(f" Chapters: {info['chapters_count']}") + print() + + service.close() + return 0 + + # Ingest mode + if args.book_slug: + logger.info( + "script_started", + book_slug=args.book_slug, + limit=args.limit + ) + + stats = service.ingest_collection( + book_slug=args.book_slug, + limit=args.limit + ) + + logger.info("script_completed", stats=stats) + + print(f"\n=== Ingestion completed for {args.book_slug} ===") + print(f"Processed: {stats['processed']}") + print(f"Failed: {stats['failed']}") + print(f"Skipped: {stats['skipped']}") + + else: + # Ingest all books + logger.info("script_started_all_books", limit_per_book=args.limit) + + results = service.ingest_all_books(limit_per_book=args.limit) + + print("\n=== All Books Ingestion Summary ===\n") + for book_slug, result in results.items(): + print(f"{book_slug}: {result['status']}") + if result['status'] == 'success': + stats = result['stats'] + print(f" Processed: {stats['processed']}") + print(f" Failed: {stats['failed']}") + else: + print(f" Error: {result['error']}") + print() + + service.close() + return 0 + + except Exception as e: + logger.error( + "script_failed", + error=str(e), + exc_info=True + ) + + print(f"\nIngestion failed: {str(e)}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) +=== ./src/__init__.py === + +=== ./src/utils/__init__.py === + +=== 
./src/utils/logger.py === + +=== ./src/utils/retry.py === + +=== ./src/processors/validator.py === + +=== ./src/processors/arabic_normalizer.py === + +=== ./src/processors/__init__.py === + +=== ./src/processors/text_cleaner.py === +""" +Text cleaning and normalization utilities +""" +import re +from typing import Optional +import unicodedata +import structlog + +logger = structlog.get_logger() + + +class ArabicTextProcessor: + """Process and normalize Arabic text""" + + # Arabic diacritics to remove + DIACRITICS = re.compile( + r'[\u064B-\u065F\u0670\u06D6-\u06DC\u06DF-\u06E8\u06EA-\u06ED]' + ) + + # Tatweel (elongation character) + TATWEEL = '\u0640' + + # Normalize Arabic letters + ALEF_VARIANTS = re.compile(r'[إأآا]') + ALEF_MAKSURA = 'ى' + YAA = 'ي' + TAA_MARBUTA = 'ة' + HAA = 'ه' + + @classmethod + def remove_diacritics(cls, text: str) -> str: + """Remove Arabic diacritics (tashkeel)""" + if not text: + return text + return cls.DIACRITICS.sub('', text) + + @classmethod + def remove_tatweel(cls, text: str) -> str: + """Remove tatweel (elongation) character""" + if not text: + return text + return text.replace(cls.TATWEEL, '') + + @classmethod + def normalize_alef(cls, text: str) -> str: + """Normalize all Alef variants to bare Alef""" + if not text: + return text + return cls.ALEF_VARIANTS.sub('ا', text) + + @classmethod + def normalize_yaa(cls, text: str) -> str: + """Normalize Alef Maksura to Yaa""" + if not text: + return text + return text.replace(cls.ALEF_MAKSURA, cls.YAA) + + @classmethod + def normalize_taa_marbuta(cls, text: str) -> str: + """Normalize Taa Marbuta to Haa""" + if not text: + return text + return text.replace(cls.TAA_MARBUTA, cls.HAA) + + @classmethod + def normalize_whitespace(cls, text: str) -> str: + """Normalize whitespace""" + if not text: + return text + # Replace multiple spaces with single space + text = re.sub(r'\s+', ' ', text) + # Trim + return text.strip() + + @classmethod + def normalize_full(cls, text: str) -> str: + """ + Apply full normalization: + - Remove diacritics + - Remove tatweel + - Normalize Alef variants + - Normalize Yaa + - Normalize Taa Marbuta + - Normalize whitespace + """ + if not text: + return text + + text = cls.remove_diacritics(text) + text = cls.remove_tatweel(text) + text = cls.normalize_alef(text) + text = cls.normalize_yaa(text) + text = cls.normalize_taa_marbuta(text) + text = cls.normalize_whitespace(text) + + return text + + @classmethod + def extract_sanad_matn(cls, text: str) -> tuple[Optional[str], Optional[str]]: + """ + Attempt to extract sanad (chain) and matn (text) from hadith + + Common patterns: + - حدثنا ... قال ... 
(sanad ends before reported speech) + - Simple heuristic: Split on first occurrence of قال or أن + + Returns: + Tuple of (sanad, matn) or (None, None) if cannot split + """ + if not text: + return None, None + + # Look for common sanad-matn separators + separators = [ + r'قال\s*رسول\s*الله', # "The Messenger of Allah said" + r'قال\s*النبي', # "The Prophet said" + r'عن\s*النبي', # "From the Prophet" + r'أن\s*رسول\s*الله', # "That the Messenger of Allah" + ] + + for pattern in separators: + match = re.search(pattern, text, re.IGNORECASE) + if match: + split_pos = match.start() + sanad = text[:split_pos].strip() + matn = text[split_pos:].strip() + + logger.debug( + "sanad_matn_extracted", + sanad_length=len(sanad), + matn_length=len(matn) + ) + + return sanad, matn + + # Could not split + logger.debug("sanad_matn_extraction_failed") + return None, None + + +class TextCleaner: + """General text cleaning utilities""" + + @staticmethod + def clean_html(text: str) -> str: + """Remove HTML tags""" + if not text: + return text + return re.sub(r'<[^>]+>', '', text) + + @staticmethod + def normalize_unicode(text: str) -> str: + """Normalize Unicode (NFC normalization)""" + if not text: + return text + return unicodedata.normalize('NFC', text) + + @staticmethod + def clean_text(text: str) -> str: + """Apply general cleaning""" + if not text: + return text + + # Remove HTML + text = TextCleaner.clean_html(text) + + # Normalize Unicode + text = TextCleaner.normalize_unicode(text) + + # Normalize whitespace + text = ArabicTextProcessor.normalize_whitespace(text) + + return text +=== ./src/api_clients/base_client.py === +""" +Base API client with retry logic and rate limiting +""" +import httpx +import time +from typing import Optional, Dict, Any +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential, + retry_if_exception_type +) +import structlog +from config.settings import settings + +logger = structlog.get_logger() + + +class BaseAPIClient: + """Base class for API clients with built-in retry and rate limiting""" + + def __init__( + self, + base_url: str, + api_key: Optional[str] = None, + rate_limit: int = 90, + timeout: int = 30 + ): + self.base_url = base_url.rstrip('/') + self.api_key = api_key + self.rate_limit = rate_limit + self.timeout = timeout + + # Rate limiting + self.request_times = [] + self.min_interval = 60.0 / rate_limit # seconds between requests + + # HTTP client + self.client = httpx.Client(timeout=timeout) + + logger.info( + "api_client_initialized", + base_url=base_url, + rate_limit=rate_limit + ) + + def _wait_for_rate_limit(self): + """Implement rate limiting""" + now = time.time() + + # Remove old timestamps (older than 1 minute) + self.request_times = [t for t in self.request_times if now - t < 60] + + # If we're at the limit, wait + if len(self.request_times) >= self.rate_limit: + sleep_time = 60 - (now - self.request_times[0]) + if sleep_time > 0: + logger.info( + "rate_limit_wait", + sleep_seconds=sleep_time, + requests_in_window=len(self.request_times) + ) + time.sleep(sleep_time) + self.request_times = [] + + # Add current timestamp + self.request_times.append(time.time()) + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=2, max=10), + retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), + reraise=True + ) + def _make_request( + self, + method: str, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None + ) -> Dict[str, Any]: + """Make HTTP request 
with retry logic""" + + # Rate limiting + self._wait_for_rate_limit() + + # Prepare headers + request_headers = headers or {} + if self.api_key: + request_headers['X-API-Key'] = self.api_key + + # Make request + url = f"{self.base_url}/{endpoint.lstrip('/')}" + + logger.debug( + "api_request", + method=method, + url=url, + params=params + ) + + response = self.client.request( + method=method, + url=url, + params=params, + headers=request_headers + ) + + response.raise_for_status() + + logger.debug( + "api_response", + status_code=response.status_code, + response_size=len(response.content) + ) + + return response.json() + + def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]: + """Make GET request""" + return self._make_request("GET", endpoint, params=params) + + def close(self): + """Close the HTTP client""" + self.client.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() +=== ./src/api_clients/hadith_one_client.py === + +=== ./src/api_clients/__init__.py === + +=== ./src/api_clients/hadithapi_client.py === +""" +Client for HadithAPI.com API +""" +from typing import List, Dict, Any, Optional, Generator, Tuple +import structlog +from .base_client import BaseAPIClient +from config.settings import settings + +logger = structlog.get_logger() + + +class HadithAPIClient(BaseAPIClient): + """Client for interacting with hadithapi.com API""" + + def __init__(self, api_key: Optional[str] = None): + super().__init__( + base_url="https://hadithapi.com/api", + api_key=api_key or settings.HADITHAPI_KEY, + rate_limit=30 # Conservative: 30 req/min + ) + + def _add_api_key(self, params: Optional[Dict] = None) -> Dict: + """Add API key to request parameters""" + params = params or {} + params['apiKey'] = self.api_key + return params + + def get_books(self) -> List[Dict[str, Any]]: + """ + Get list of all available books/collections + + Returns: + List of book dictionaries + """ + logger.info("fetching_books") + + params = self._add_api_key() + response = self.get("books", params=params) + + if response.get('status') != 200: + logger.error( + "api_error", + status=response.get('status'), + message=response.get('message') + ) + raise Exception(f"API Error: {response.get('message')}") + + books = response.get('books', []) + + + logger.info( + "books_fetched", + count=len(books) + ) + + return books + + def get_chapters(self, book_slug: str) -> List[Dict[str, Any]]: + """ + Get chapters for a specific book + + Args: + book_slug: Book slug identifier (e.g., 'sahih-bukhari') + + Returns: + List of chapter dictionaries + """ + logger.info( + "fetching_chapters", + book_slug=book_slug + ) + + params = self._add_api_key() + response = self.get(f"{book_slug}/chapters", params=params) + + if response.get('status') != 200: + logger.error( + "api_error", + status=response.get('status'), + message=response.get('message') + ) + raise Exception(f"API Error: {response.get('message')}") + + chapters = response.get('chapters', []) + + + logger.info( + "chapters_fetched", + book_slug=book_slug, + count=len(chapters) + ) + + return chapters + + def get_hadiths_page( + self, + book_id: int, + chapter_id: Optional[int] = None, + page: int = 1, + limit: int = 100 + ) -> Dict[str, Any]: + """ + Get a page of hadiths + + Args: + book_id: Book ID + chapter_id: Optional chapter ID to filter by + page: Page number (1-indexed) + limit: Results per page (max 100) + + Returns: + Response dictionary with hadiths and pagination info + """ + params = 
self._add_api_key({ + 'book': book_id, + 'page': page, + 'limit': min(limit, 100) # Enforce max limit + }) + + if chapter_id: + params['chapter'] = chapter_id + + logger.debug( + "fetching_hadiths_page", + book_id=book_id, + chapter_id=chapter_id, + page=page, + limit=limit + ) + + response = self.get("hadiths", params=params) + # logger.debug( + # "fetching_hadiths_page####", + # response=response + # ) + if response.get('status') != 200: + logger.error( + "api_error", + status=response.get('status'), + message=response.get('message') + ) + raise Exception(f"API Error: {response.get('message')}") + + return response.get('hadiths', {}) + + def iter_all_hadiths_in_book( + self, + book_id: int, + book_slug: str, + chapter_id: Optional[int] = None, + batch_size: int = 100 + ) -> Generator[Dict[str, Any], None, None]: + """ + Iterator that yields all hadiths in a book, handling pagination automatically + + Args: + book_id: Book ID + book_slug: Book slug for logging + chapter_id: Optional chapter ID to filter by + batch_size: Number of hadiths to fetch per request (max 100) + + Yields: + Individual hadith dictionaries + """ + page = 1 + total_fetched = 0 + + while True: + response_data = self.get_hadiths_page( + book_id=book_slug, + chapter_id=chapter_id, + page=page, + limit=batch_size + ) + + hadiths = response_data.get('data', []) + pagination = response_data.get('pagination', {}) + # logger.info( + # "book_complete", + # book_slug=book_slug, + # hadiths=hadiths, + # pagination=pagination, + # response = response_data + # ) + if not hadiths: + logger.info( + "book_complete", + book_slug=book_slug, + chapter_id=chapter_id, + total_hadiths=total_fetched + ) + break + + for hadith in hadiths: + yield hadith + total_fetched += 1 + + # Log progress + if total_fetched % 500 == 0: + logger.info( + "progress", + book_slug=book_slug, + fetched=total_fetched, + total=response_data.get('total', '?') + ) + + # Check if there are more pages + current_page = response_data.get('current_page', page) + last_page = response_data.get('last_page', 1) + + if current_page >= last_page: + logger.info( + "book_complete", + book_slug=book_slug, + total_hadiths=total_fetched, + total_pages=last_page + ) + break + + page += 1 + + def iter_all_hadiths_in_book_with_chapters( + self, + book_id: int, + book_slug: str, + batch_size: int = 100 + ) -> Generator[Tuple[Dict[str, Any], Optional[Dict[str, Any]]], None, None]: + """ + Iterator that yields all hadiths in a book, organized by chapter + + Args: + book_id: Book ID + book_slug: Book slug + batch_size: Number of hadiths to fetch per request + + Yields: + Tuple of (hadith_dict, chapter_dict or None) + """ + # First, get all chapters + try: + chapters = self.get_chapters(book_slug) + except Exception as e: + logger.warning( + "chapters_fetch_failed", + book_slug=book_slug, + error=str(e), + fallback="fetching_all_hadiths_without_chapter_filter" + ) + # Fallback: fetch all hadiths without chapter filter + for hadith in self.iter_all_hadiths_in_book( + book_id=book_id, + book_slug=book_slug, + batch_size=batch_size + ): + chapter_info = hadith.get('chapter') + yield hadith, chapter_info + return + + logger.info( + "starting_chapter_by_chapter_fetch", + book_slug=book_slug, + total_chapters=len(chapters) + ) + + # Process each chapter + for chapter in chapters: + # logger.warning("Processing chapter", chapter=chapter) + if book_slug in {'sahih-muslim','al-tirmidhi','al-silsila-sahiha','abu-dawood','sunan-nasai','ibn-e-majah','mishkat'}: + chapter_id = 
chapter.get('chapterNumber') + else: + chapter_id = chapter.get('id') + chapter_number = chapter.get('chapterNumber') + print(chapter_id, chapter_number, chapter.get('name')) + logger.info( + "fetching_chapter", + book_slug=book_slug, + chapter_id=chapter_id, + chapter_number=chapter_number + ) + + try: + for hadith in self.iter_all_hadiths_in_book( + book_id=book_id, + book_slug=book_slug, + chapter_id=chapter_id, + batch_size=batch_size + ): + yield hadith, chapter + except Exception as e: + logger.error( + "chapter_fetch_failed", + book_slug=book_slug, + chapter_id=chapter_id, + error=str(e) + ) + continue + + def get_book_by_slug(self, book_slug: str) -> Optional[Dict[str, Any]]: + """ + Get book details by slug + + Args: + book_slug: Book slug identifier + + Returns: + Book dictionary or None if not found + """ + books = self.get_books() + for book in books: + if book.get('bookSlug') == book_slug: + return book + return None +=== ./src/api_clients/sunnah_client.py === +""" +Client for Sunnah.com API +""" +from typing import List, Dict, Any, Optional, Generator +import structlog +from .base_client import BaseAPIClient +from config.settings import settings + +logger = structlog.get_logger() + + +class SunnahAPIClient(BaseAPIClient): + """Client for interacting with Sunnah.com API""" + + def __init__(self, api_key: Optional[str] = None): + super().__init__( + base_url=settings.SUNNAH_BASE_URL, + api_key=api_key or settings.SUNNAH_API_KEY, + rate_limit=settings.API_RATE_LIMIT + ) + + def get_collections(self) -> List[Dict[str, Any]]: + """ + Get list of all hadith collections + + Returns: + List of collection dictionaries + """ + logger.info("fetching_collections") + + response = self.get("collections") + collections = response.get("data", []) + + logger.info( + "collections_fetched", + count=len(collections) + ) + + return collections + + def get_collection_details(self, collection_name: str) -> Dict[str, Any]: + """ + Get details for a specific collection + + Args: + collection_name: Collection abbreviation (e.g., 'bukhari') + + Returns: + Collection details dictionary + """ + logger.info( + "fetching_collection_details", + collection=collection_name + ) + + response = self.get(f"collections/{collection_name}") + + return response + + def get_books(self, collection_name: str) -> List[Dict[str, Any]]: + """ + Get all books in a collection + + Args: + collection_name: Collection abbreviation + + Returns: + List of book dictionaries + """ + logger.info( + "fetching_books", + collection=collection_name + ) + + response = self.get(f"collections/{collection_name}/books") + books = response.get("data", []) + + logger.info( + "books_fetched", + collection=collection_name, + count=len(books) + ) + + return books + + def get_hadiths_in_book( + self, + collection_name: str, + book_number: int, + limit: int = 50, + page: int = 1 + ) -> Dict[str, Any]: + """ + Get hadiths in a specific book with pagination + + Args: + collection_name: Collection abbreviation + book_number: Book number + limit: Number of hadiths per page + page: Page number + + Returns: + Response with hadiths and pagination info + """ + logger.debug( + "fetching_hadiths", + collection=collection_name, + book=book_number, + page=page, + limit=limit + ) + + response = self.get( + f"collections/{collection_name}/books/{book_number}/hadiths", + params={"limit": limit, "page": page} + ) + + return response + + def iter_all_hadiths_in_book( + self, + collection_name: str, + book_number: int, + batch_size: int = 50 + ) -> 
Generator[Dict[str, Any], None, None]: + """ + Iterator that yields all hadiths in a book, handling pagination automatically + + Args: + collection_name: Collection abbreviation + book_number: Book number + batch_size: Number of hadiths to fetch per request + + Yields: + Individual hadith dictionaries + """ + page = 1 + total_fetched = 0 + + while True: + response = self.get_hadiths_in_book( + collection_name=collection_name, + book_number=book_number, + limit=batch_size, + page=page + ) + + hadiths = response.get("data", []) + + if not hadiths: + logger.info( + "book_complete", + collection=collection_name, + book=book_number, + total_hadiths=total_fetched + ) + break + + for hadith in hadiths: + yield hadith + total_fetched += 1 + + # Check if there are more pages + pagination = response.get("pagination", {}) + if page >= pagination.get("total_pages", 1): + break + + page += 1 + + def iter_all_hadiths_in_collection( + self, + collection_name: str, + batch_size: int = 50 + ) -> Generator[tuple[Dict[str, Any], int], None, None]: + """ + Iterator that yields all hadiths in a collection + + Args: + collection_name: Collection abbreviation + batch_size: Number of hadiths to fetch per request + + Yields: + Tuple of (hadith_dict, book_number) + """ + # First, get all books in the collection + books = self.get_books(collection_name) + + logger.info( + "starting_collection_fetch", + collection=collection_name, + total_books=len(books) + ) + + for book in books: + book_number = book.get("bookNumber") + + if not book_number: + logger.warning( + "book_missing_number", + book=book + ) + continue + + logger.info( + "fetching_book", + collection=collection_name, + book=book_number + ) + + try: + for hadith in self.iter_all_hadiths_in_book( + collection_name=collection_name, + book_number=int(book_number), + batch_size=batch_size + ): + yield hadith, int(book_number) + except Exception as e: + logger.error( + "book_fetch_failed", + collection=collection_name, + book=book_number, + error=str(e) + ) + continue + + def get_specific_hadith( + self, + collection_name: str, + book_number: int, + hadith_number: int + ) -> Dict[str, Any]: + """ + Get a specific hadith by its number + + Args: + collection_name: Collection abbreviation + book_number: Book number + hadith_number: Hadith number + + Returns: + Hadith dictionary + """ + response = self.get( + f"hadiths/collection/{collection_name}/{book_number}/{hadith_number}" + ) + + return response.get("data", {}) +=== ./src/main.py === +""" +Main ingestion script for fetching hadiths from Sunnah.com API +""" +import sys +import argparse +from typing import Optional +from uuid import UUID +import structlog +from config.settings import settings +from api_clients.sunnah_client import SunnahAPIClient +from database.repository import HadithRepository +from processors.text_cleaner import ArabicTextProcessor, TextCleaner + +# Configure structured logging +structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer() + ], + wrapper_class=structlog.stdlib.BoundLogger, + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, +) + +logger = structlog.get_logger() + + +class HadithIngestionService: + """Service for ingesting hadiths from Sunnah.com API""" + + def __init__(self): + 
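+        # One shared API client, repository handle and set of text-cleaning
+        # helpers are reused for every collection ingested by this process.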
self.api_client = SunnahAPIClient() + self.repo = HadithRepository() + self.text_processor = ArabicTextProcessor() + self.text_cleaner = TextCleaner() + + def ingest_collection( + self, + collection_abbr: str, + limit: Optional[int] = None + ) -> dict: + """ + Ingest entire collection from Sunnah.com API + + Args: + collection_abbr: Collection abbreviation (e.g., 'bukhari') + limit: Optional limit on number of hadiths to ingest + + Returns: + Statistics dictionary + """ + logger.info( + "ingestion_started", + collection=collection_abbr, + limit=limit + ) + + # Get collection from database + collection = self.repo.get_collection_by_abbreviation(collection_abbr) + if not collection: + logger.error( + "collection_not_found", + collection=collection_abbr + ) + raise ValueError(f"Collection '{collection_abbr}' not found in database") + + collection_id = UUID(collection['id']) + + # Create ingestion job + job_id = self.repo.create_ingestion_job( + job_name=f"ingest_{collection_abbr}", + job_type="api_fetch", + source_name="sunnah.com", + config={"collection": collection_abbr, "limit": limit} + ) + + logger.info( + "job_created", + job_id=str(job_id), + collection=collection_abbr + ) + + stats = { + "processed": 0, + "failed": 0, + "skipped": 0 + } + + try: + # Iterate through all hadiths in collection + for hadith_data, book_number in self.api_client.iter_all_hadiths_in_collection( + collection_name=collection_abbr, + batch_size=50 + ): + # Check limit + if limit and stats["processed"] >= limit: + logger.info("limit_reached", limit=limit) + break + + try: + # Process and store hadith + self._process_and_store_hadith( + collection_id=collection_id, + hadith_data=hadith_data, + book_number=book_number + ) + + stats["processed"] += 1 + + # Update job progress every 100 hadiths + if stats["processed"] % 100 == 0: + self.repo.update_job_progress( + job_id=job_id, + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + logger.info( + "progress_update", + processed=stats["processed"], + failed=stats["failed"] + ) + + except Exception as e: + stats["failed"] += 1 + logger.error( + "hadith_processing_failed", + error=str(e), + hadith_number=hadith_data.get("hadithNumber") + ) + + self.repo.add_processing_log( + job_id=job_id, + level="ERROR", + message=f"Failed to process hadith: {str(e)}", + details={"hadith_data": hadith_data} + ) + + # Update final job progress + self.repo.update_job_progress( + job_id=job_id, + total=stats["processed"] + stats["failed"] + stats["skipped"], + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + # Mark job as complete + self.repo.complete_job(job_id=job_id, status="success") + + # Update collection count + self.repo.update_collection_count( + collection_id=collection_id, + count=stats["processed"] + ) + + logger.info( + "ingestion_completed", + collection=collection_abbr, + stats=stats + ) + + return stats + + except Exception as e: + logger.error( + "ingestion_failed", + collection=collection_abbr, + error=str(e) + ) + + self.repo.complete_job( + job_id=job_id, + status="failed", + error_message=str(e) + ) + + raise + + def _process_and_store_hadith( + self, + collection_id: UUID, + hadith_data: dict, + book_number: int + ): + """Process and store a single hadith""" + + # Extract hadith number + hadith_number = hadith_data.get("hadithNumber") + if not hadith_number: + raise ValueError("Missing hadith number") + + # Extract text in multiple languages + hadith_texts = hadith_data.get("hadith", 
[]) + + arabic_text = None + english_text = None + urdu_text = None + grade = None + grade_source = None + chapter_name = None + + for text_entry in hadith_texts: + lang = text_entry.get("lang", "").lower() + body = text_entry.get("body") + + if not body: + continue + + # Clean text + body = self.text_cleaner.clean_text(body) + + if lang == "ar": + arabic_text = body + chapter_name = text_entry.get("chapterTitle") + + # Extract grade from Arabic entry + grades = text_entry.get("grades", []) + if grades: + grade = grades[0].get("grade") + grade_source = grades[0].get("name") + + elif lang == "en": + english_text = body + + # Extract grade from English entry if not found + if not grade: + grades = text_entry.get("grades", []) + if grades: + grade = grades[0].get("grade") + grade_source = grades[0].get("name") + + elif lang == "ur": + urdu_text = body + + if not arabic_text: + raise ValueError("Missing Arabic text") + + # Get or create book + book = self.repo.get_book(collection_id, book_number) + if not book: + # Extract book name from hadith data + book_name_en = None + book_name_ar = None + + for text_entry in hadith_texts: + lang = text_entry.get("lang", "").lower() + book_data = text_entry.get("book", [{}])[0] if text_entry.get("book") else {} + + if lang == "en" and book_data.get("name"): + book_name_en = book_data.get("name") + elif lang == "ar" and book_data.get("name"): + book_name_ar = book_data.get("name") + + book_id = self.repo.upsert_book( + collection_id=collection_id, + book_number=book_number, + name_english=book_name_en, + name_arabic=book_name_ar + ) + else: + book_id = UUID(book["id"]) + + # Store hadith + hadith_id = self.repo.upsert_hadith( + collection_id=collection_id, + book_id=book_id, + hadith_number=int(hadith_number), + arabic_text=arabic_text, + english_text=english_text, + urdu_text=urdu_text, + grade=grade, + grade_source=grade_source, + chapter_name=chapter_name, + source_id=str(hadith_data.get("id", "")), + source_url=hadith_data.get("reference", {}).get("link"), + source_metadata=hadith_data + ) + + logger.debug( + "hadith_stored", + hadith_id=str(hadith_id), + hadith_number=hadith_number, + book_number=book_number + ) + + def close(self): + """Close connections""" + self.api_client.close() + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser(description="Ingest hadiths from Sunnah.com API") + parser.add_argument( + "collection", + help="Collection abbreviation (e.g., bukhari, muslim)" + ) + parser.add_argument( + "--limit", + type=int, + help="Limit number of hadiths to ingest" + ) + + args = parser.parse_args() + + logger.info( + "script_started", + collection=args.collection, + limit=args.limit + ) + + try: + service = HadithIngestionService() + stats = service.ingest_collection( + collection_abbr=args.collection, + limit=args.limit + ) + + logger.info( + "script_completed", + stats=stats + ) + + print(f"\nIngestion completed successfully!") + print(f"Processed: {stats['processed']}") + print(f"Failed: {stats['failed']}") + print(f"Skipped: {stats['skipped']}") + + service.close() + + return 0 + + except Exception as e: + logger.error( + "script_failed", + error=str(e), + exc_info=True + ) + + print(f"\nIngestion failed: {str(e)}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/hadith-ingestion/scripts/benchmark_embeddings.py b/hadith-ingestion/scripts/benchmark_embeddings.py new file mode 100755 index 0000000..873a1ad --- /dev/null +++ b/hadith-ingestion/scripts/benchmark_embeddings.py 
@@ -0,0 +1,98 @@
+# Save as benchmark_embeddings.py
+import requests
+import time
+import statistics
+from typing import List, Dict
+
+TEI_URL = "http://tei.ml.svc.cluster.local"
+QDRANT_URL = "http://qdrant.vector.svc.cluster.local:6333"
+
+def benchmark_tei(num_requests: int = 100) -> Dict:
+    """Benchmark TEI embedding generation"""
+    test_text = "This is a test hadith about prayer and fasting"
+    times = []
+
+    print(f"Benchmarking TEI ({num_requests} requests)...")
+    for i in range(num_requests):
+        start = time.time()
+        response = requests.post(
+            f"{TEI_URL}/embed",
+            json={"inputs": test_text}
+        )
+        times.append(time.time() - start)
+
+        if (i + 1) % 10 == 0:
+            print(f"  Progress: {i + 1}/{num_requests}")
+
+    return {
+        'mean': statistics.mean(times),
+        'median': statistics.median(times),
+        'min': min(times),
+        'max': max(times),
+        'stdev': statistics.stdev(times) if len(times) > 1 else 0
+    }
+
+def benchmark_qdrant(num_queries: int = 100) -> Dict:
+    """Benchmark Qdrant search"""
+    # Get a sample embedding first
+    response = requests.post(
+        f"{TEI_URL}/embed",
+        json={"inputs": "test query"}
+    )
+    query_vector = response.json()[0]
+
+    times = []
+
+    print(f"\nBenchmarking Qdrant ({num_queries} searches)...")
+    for i in range(num_queries):
+        start = time.time()
+        response = requests.post(
+            f"{QDRANT_URL}/collections/hadith_embeddings/points/search",
+            json={
+                "vector": query_vector,
+                "limit": 10
+            }
+        )
+        times.append(time.time() - start)
+
+        if (i + 1) % 10 == 0:
+            print(f"  Progress: {i + 1}/{num_queries}")
+
+    return {
+        'mean': statistics.mean(times),
+        'median': statistics.median(times),
+        'min': min(times),
+        'max': max(times),
+        'stdev': statistics.stdev(times) if len(times) > 1 else 0
+    }
+
+# Run benchmarks
+print("=== PERFORMANCE BENCHMARK ===\n")
+
+tei_stats = benchmark_tei(100)
+qdrant_stats = benchmark_qdrant(100)
+
+print("\n" + "="*80)
+print("RESULTS")
+print("="*80)
+
+print("\nTEI Embedding Generation (per request):")
+print(f"  Mean: {tei_stats['mean']*1000:.2f}ms")
+print(f"  Median: {tei_stats['median']*1000:.2f}ms")
+print(f"  Min: {tei_stats['min']*1000:.2f}ms")
+print(f"  Max: {tei_stats['max']*1000:.2f}ms")
+print(f"  StdDev: {tei_stats['stdev']*1000:.2f}ms")
+
+print("\nQdrant Vector Search (per query):")
+print(f"  Mean: {qdrant_stats['mean']*1000:.2f}ms")
+print(f"  Median: {qdrant_stats['median']*1000:.2f}ms")
+print(f"  Min: {qdrant_stats['min']*1000:.2f}ms")
+print(f"  Max: {qdrant_stats['max']*1000:.2f}ms")
+print(f"  StdDev: {qdrant_stats['stdev']*1000:.2f}ms")
+
+# Expected performance targets
+print("\n" + "="*80)
+print("PERFORMANCE TARGETS")
+print("="*80)
+print("TEI Embedding: < 50ms (good), < 100ms (acceptable)")
+print("Qdrant Search: < 20ms (good), < 50ms (acceptable)")
\ No newline at end of file
diff --git a/hadith-ingestion/scripts/generate_embeddings.py b/hadith-ingestion/scripts/generate_embeddings.py
new file mode 100755
index 0000000..c266302
--- /dev/null
+++ b/hadith-ingestion/scripts/generate_embeddings.py
@@ -0,0 +1,324 @@
+#!/usr/bin/env python3
+"""
+Generate embeddings for all hadiths and store in Qdrant
+Updated with SSL verification disabled
+"""
+import requests
+import psycopg2
+from psycopg2.extras import execute_values
+
+import time
+import urllib3
+from typing import List, Dict, Tuple, Optional
+import logging
+from datetime import datetime
+# import uuid
+
+# Disable SSL warnings
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+# Configuration
+TEI_URL = "https://embeddings.betelgeusebytes.io"
+QDRANT_URL = 
"https://vector.betelgeusebytes.io" +DB_CONFIG = { + 'host': 'pg.betelgeusebytes.io', + 'port': 5432, + 'dbname': 'hadith_db', + 'user': 'hadith_ingest', + 'password': 'hadith_ingest' # UPDATE THIS +} + +# BATCH_SIZE = 8 # Process 32 hadiths at a time +BATCH_SIZE = 32 # Process 32 hadiths at a time +MAX_TEXT_LENGTH = 1500 # Truncate individual texts to avoid issues +COLLECTION_NAME = "hadith_embeddings" +VERIFY_SSL = False # Ignore SSL certificate verification + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def get_single_embedding(text: str, hadith_id: str, max_retries: int = 3) -> Optional[List[float]]: + """Get embedding for a single text with retries and length reduction""" + current_text = text + + for attempt in range(max_retries): + try: + response = requests.post( + f"{TEI_URL}/embed", + json={"inputs": [current_text]}, + timeout=60, + verify=VERIFY_SSL + ) + response.raise_for_status() + return response.json()[0] + except requests.exceptions.HTTPError as e: + if e.response.status_code == 413: + # Text still too large, reduce by 50% + new_length = len(current_text) // 2 + logger.warning(f"Hadith {hadith_id}: Text too large ({len(current_text)} chars), reducing to {new_length}") + current_text = current_text[:new_length] + "..." + if new_length < 100: # Don't go below 100 chars + logger.error(f"Hadith {hadith_id}: Cannot reduce text further") + return None + else: + logger.error(f"Hadith {hadith_id}: HTTP error {e.response.status_code}") + return None + except Exception as e: + logger.error(f"Hadith {hadith_id}: Error getting embedding: {e}") + if attempt < max_retries - 1: + time.sleep(1) + else: + return None + + return None + + +def get_embeddings_batch(texts: List[str], hadith_ids: List[str]) -> List[Optional[List[float]]]: + """Get embeddings for a batch of texts, fall back to individual if needed""" + try: + response = requests.post( + f"{TEI_URL}/embed", + json={"inputs": texts}, + timeout=60, + verify=VERIFY_SSL + ) + response.raise_for_status() + return response.json() + except requests.exceptions.HTTPError as e: + if e.response.status_code == 413 and len(texts) > 1: + # Batch too large, try individually + logger.warning(f"Batch too large, processing {len(texts)} texts individually...") + embeddings = [] + for text, hid in zip(texts, hadith_ids): + embedding = get_single_embedding(text, hid) + embeddings.append(embedding) + time.sleep(0.1) + return embeddings + else: + logger.error(f"Error getting embeddings: {e}") + return [None] * len(texts) + except Exception as e: + logger.error(f"Error getting embeddings: {e}") + return [None] * len(texts) + + +def upsert_to_qdrant(points: List[Dict]) -> bool: + """Upsert points to Qdrant""" + if not points: # Skip if no valid points + return True + + try: + response = requests.put( + f"{QDRANT_URL}/collections/{COLLECTION_NAME}/points", + json={"points": points}, + timeout=30, + verify=VERIFY_SSL + ) + response.raise_for_status() + return True + except Exception as e: + logger.error(f"Error upserting to Qdrant: {e}") + return False + + +def mark_embeddings_generated(conn, hadith_ids: List[str], failed_ids: List[str] = None) -> bool: + """Mark hadiths as having embeddings generated""" + try: + cur = conn.cursor() + + # Mark successful ones + if hadith_ids: + cur.execute(""" + UPDATE hadiths + SET embedding_generated = TRUE, + updated_at = CURRENT_TIMESTAMP + WHERE id IN ( + SELECT UNNEST(%s::text[])::uuid + ) + """, (hadith_ids,)) + + # 
Mark failed ones (so we can skip them in future runs) + if failed_ids: + cur.execute(""" + UPDATE hadiths + SET embedding_generated = TRUE, + updated_at = CURRENT_TIMESTAMP + WHERE id IN ( + SELECT UNNEST(%s::text[])::uuid + ) + """, (failed_ids,)) + logger.warning(f"Marked {len(failed_ids)} failed hadiths as processed to skip them") + + conn.commit() + cur.close() + return True + except Exception as e: + logger.error(f"Error updating database: {e}") + conn.rollback() + return False + + +def fetch_hadiths_batch(cur, offset: int, limit: int) -> List[Tuple]: + """Fetch a batch of hadiths without embeddings""" + cur.execute(""" + SELECT id, arabic_text, english_text, urdu_text, + collection_id, hadith_number + FROM hadiths + WHERE embedding_generated = FALSE + ORDER BY id + LIMIT %s OFFSET %s + """, (limit, offset)) + return cur.fetchall() + + +def create_combined_text(hadith: Tuple, max_length: int = MAX_TEXT_LENGTH) -> str: + """Create combined text for embedding, truncate if needed""" + id, arabic, english, urdu, coll_id, num = hadith + + parts = [] + if arabic: + parts.append(arabic) + if english: + parts.append(english) + + combined = " ".join(parts) if parts else "No text available" + + if len(combined) > max_length: + combined = combined[:max_length] + "..." + + return combined + + +def main(): + start_time = datetime.now() + logger.info("=" * 80) + logger.info("HADITH EMBEDDING GENERATION") + logger.info(f"Started at: {start_time}") + logger.info(f"Batch size: {BATCH_SIZE}") + logger.info(f"Max text length: {MAX_TEXT_LENGTH}") + logger.info(f"SSL Verification: {VERIFY_SSL}") + logger.info("=" * 80) + + logger.info("Connecting to database...") + conn = psycopg2.connect(**DB_CONFIG) + cur = conn.cursor() + + cur.execute("SELECT COUNT(*) FROM hadiths WHERE embedding_generated = FALSE") + total_hadiths = cur.fetchone()[0] + logger.info(f"Total hadiths to process: {total_hadiths:,}") + + if total_hadiths == 0: + logger.info("No hadiths to process!") + return + + estimated_time_mins = (total_hadiths / BATCH_SIZE) * 3 / 60 + logger.info(f"Estimated time: {estimated_time_mins:.1f} minutes") + logger.info("=" * 80) + + offset = 0 + processed = 0 + failed = 0 + skipped = 0 + last_report_pct = 0 + + while offset < total_hadiths: + batch_start = time.time() + + hadiths = fetch_hadiths_batch(cur, offset, BATCH_SIZE) + + if not hadiths: + break + + texts = [create_combined_text(h) for h in hadiths] + hadith_ids = [str(h[0]) for h in hadiths] + + try: + embeddings = get_embeddings_batch(texts, hadith_ids) + + # Separate successful and failed embeddings + successful_points = [] + successful_ids = [] + failed_ids = [] + + for i, (hadith_id, embedding) in enumerate(zip(hadith_ids, embeddings)): + if embedding is not None: + hadith = hadiths[i] + successful_points.append({ + "id": hadith_id, + "vector": embedding, + "payload": { + "collection_id": str(hadith[4]), + "hadith_number": hadith[5], + "has_arabic": bool(hadith[1]), + "has_english": bool(hadith[2]), + "has_urdu": bool(hadith[3]) + } + }) + successful_ids.append(hadith_id) + else: + failed_ids.append(hadith_id) + skipped += 1 + + # Process successful ones + if successful_points: + if upsert_to_qdrant(successful_points): + if mark_embeddings_generated(conn, successful_ids, failed_ids): + processed += len(successful_ids) + failed += len(failed_ids) + else: + logger.error(f"Failed to update database for batch at offset {offset}") + failed += len(hadiths) + else: + logger.error(f"Failed to upsert to Qdrant for batch at offset {offset}") + failed += 
len(hadiths) + else: + # All failed in this batch + mark_embeddings_generated(conn, [], failed_ids) + failed += len(failed_ids) + + progress_pct = ((processed + failed) / total_hadiths) * 100 + if progress_pct - last_report_pct >= 2: + batch_time = time.time() - batch_start + elapsed = (datetime.now() - start_time).total_seconds() / 60 + rate = (processed + failed) / elapsed if elapsed > 0 else 0 + remaining = (total_hadiths - processed - failed) / rate if rate > 0 else 0 + + logger.info( + f"Progress: {processed:,}/{total_hadiths:,} ({progress_pct:.1f}%) | " + f"Failed: {failed} | Skipped: {skipped} | Rate: {rate:.0f}/min | " + f"ETA: {remaining:.1f}min | Batch: {batch_time:.2f}s" + ) + last_report_pct = progress_pct + + time.sleep(0.2) + + except Exception as e: + logger.error(f"Error processing batch at offset {offset}: {e}") + failed += len(hadiths) + + offset += BATCH_SIZE + + cur.close() + conn.close() + + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() / 60 + + logger.info("=" * 80) + logger.info("EMBEDDING GENERATION COMPLETE") + logger.info(f"Started: {start_time}") + logger.info(f"Finished: {end_time}") + logger.info(f"Duration: {duration:.1f} minutes") + logger.info(f"Total hadiths: {total_hadiths:,}") + logger.info(f"Successfully processed: {processed:,}") + logger.info(f"Failed/Skipped: {failed} ({skipped} too long)") + logger.info(f"Success rate: {100 * processed / total_hadiths:.2f}%") + logger.info("=" * 80) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/hadith-ingestion/scripts/monitor_progress.sh b/hadith-ingestion/scripts/monitor_progress.sh new file mode 100755 index 0000000..1f39fc4 --- /dev/null +++ b/hadith-ingestion/scripts/monitor_progress.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +echo "=== EMBEDDING GENERATION MONITOR ===" +echo "" + +while true; do + clear + echo "=== EMBEDDING GENERATION PROGRESS ===" + date + echo "" + + # Database stats + echo "📊 Database Progress:" + PGPASSWORD=hadith_ingest psql -h pg.betelgeusebytes.io -U hadith_ingest -d hadith_db -t -c " + SELECT + 'Done: ' || COUNT(*) FILTER (WHERE embedding_generated = TRUE) || + ' | Remaining: ' || COUNT(*) FILTER (WHERE embedding_generated = FALSE) || + ' | Progress: ' || ROUND(100.0 * COUNT(*) FILTER (WHERE embedding_generated = TRUE) / COUNT(*), 2) || '%' + FROM hadiths;" 2>/dev/null || echo " (Database connection failed)" + + echo "" + echo "🔢 Qdrant Collection:" + QDRANT_COUNT=$(curl -k -s https://vector.betelgeusebytes.io/collections/hadith_embeddings 2>/dev/null | jq -r '.result.points_count // "N/A"') + echo " Points: $QDRANT_COUNT" + + echo "" + echo "⏰ Next update in 5 minutes... (Ctrl+C to stop)" + sleep 300 +done \ No newline at end of file diff --git a/hadith-ingestion/scripts/setup_embeddings.sh b/hadith-ingestion/scripts/setup_embeddings.sh new file mode 100755 index 0000000..cc4f411 --- /dev/null +++ b/hadith-ingestion/scripts/setup_embeddings.sh @@ -0,0 +1,69 @@ +#!/bin/bash +set -e + +echo "=== EMBEDDING PIPELINE SETUP ===" + +# 1. Check TEI service +echo -e "\n1. Checking TEI service..." +kubectl -n ml get pods -l app=tei +TEI_STATUS=$? + +if [ $TEI_STATUS -ne 0 ]; then + echo "❌ TEI pods not found!" + exit 1 +fi + +# 2. Test TEI endpoint +echo -e "\n2. Testing TEI endpoint..." +TEI_TEST=$(curl -k -X POST https://embeddings.betelgeusebytes.io/embed \ + -H "Content-Type: application/json" \ + -d '{"inputs": "test"}' | jq -r 'type') + +if [ "$TEI_TEST" != "array" ]; then + echo "❌ TEI not responding correctly!" 
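+  # (jq -r 'type' prints "array" when /embed returns its usual list of embedding vectors)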
+ echo "Response: $TEI_TEST" + exit 1 +fi +echo "✅ TEI is working" + +# 3. Check Qdrant +echo -e "\n3. Checking Qdrant..." +QDRANT_STATUS=$(curl -s -k https://vector.betelgeusebytes.io/collections | jq -r '.status') + +if [ "$QDRANT_STATUS" != "ok" ]; then + echo "❌ Qdrant not responding!" + exit 1 +fi +echo "✅ Qdrant is working" + +# 4. Check if collection exists +echo -e "\n4. Checking hadith_embeddings collection..." +COLLECTION_EXISTS=$(curl -s -k https://vector.betelgeusebytes.io/collections/hadith_embeddings | jq -r '.status // "missing"') + +if [ "$COLLECTION_EXISTS" == "missing" ]; then + echo "📝 Creating hadith_embeddings collection..." + curl -X -k PUT https://vector.betelgeusebytes.io/collections/hadith_embeddings \ + -H "Content-Type: application/json" \ + -d '{ + "vectors": { + "size": 1024, + "distance": "Cosine" + }, + "optimizers_config": { + "indexing_threshold": 10000 + } + }' + echo -e "\n✅ Collection created" +else + echo "✅ Collection exists" + POINT_COUNT=$(curl -s -k https://vector.betelgeusebytes.io/collections/hadith_embeddings | jq -r '.result.points_count') + echo " Current points: $POINT_COUNT" +fi + +# 5. Check database +echo -e "\n5. Checking database..." +HADITH_COUNT=$(psql -h pg.betelgeusebytes.io -U hadith_ingest -d hadith_db -t -c "SELECT COUNT(*) FROM hadiths;") +echo " Total hadiths: $HADITH_COUNT" + +echo -e "\n=== SETUP COMPLETE ✅ ===" +echo -e "\nReady to generate embeddings!" \ No newline at end of file diff --git a/hadith-ingestion/scripts/test_semantic_search.py b/hadith-ingestion/scripts/test_semantic_search.py new file mode 100755 index 0000000..2b1937f --- /dev/null +++ b/hadith-ingestion/scripts/test_semantic_search.py @@ -0,0 +1,93 @@ +# Save as test_semantic_search.py +import requests +import psycopg2 +from typing import List, Dict + +# Configuration +TEI_URL = "http://tei.ml.svc.cluster.local" +QDRANT_URL = "http://qdrant.vector.svc.cluster.local:6333" +DB_CONFIG = { + 'host': 'pg.betelgeusebytes.io', + 'port': 5432, + 'dbname': 'hadith_db', + 'user': 'hadith_ingest', + 'password': 'your_password' # Update this +} + +def get_embedding(text: str) -> List[float]: + """Get embedding from TEI service""" + response = requests.post( + f"{TEI_URL}/embed", + json={"inputs": text} + ) + return response.json()[0] + +def search_similar_hadiths(query: str, limit: int = 5) -> List[Dict]: + """Search for similar hadiths using semantic search""" + # Get query embedding + query_embedding = get_embedding(query) + + # Search in Qdrant + response = requests.post( + f"{QDRANT_URL}/collections/hadith_embeddings/points/search", + json={ + "vector": query_embedding, + "limit": limit, + "with_payload": True + } + ) + + results = response.json()['result'] + + # Get full hadith details from database + conn = psycopg2.connect(**DB_CONFIG) + cur = conn.cursor() + + hadith_ids = [r['id'] for r in results] + cur.execute(""" + SELECT h.id, h.hadith_number, c.name as collection, + h.arabic_text, h.english_text, h.grade + FROM hadiths h + JOIN collections c ON h.collection_id = c.id + WHERE h.id = ANY(%s) + """, (hadith_ids,)) + + hadiths = {row[0]: dict(zip(['id', 'number', 'collection', 'arabic', 'english', 'grade'], row)) + for row in cur.fetchall()} + + cur.close() + conn.close() + + # Combine results + return [ + { + **hadiths[r['id']], + 'similarity_score': r['score'] + } + for r in results if r['id'] in hadiths + ] + +# Test queries +test_queries = [ + "prayer times and importance", + "fasting in Ramadan", + "charity and helping the poor", + "truthfulness and 
honesty", + "parents and their rights" +] + +print("=== SEMANTIC SEARCH TEST ===\n") + +for query in test_queries: + print(f"\nQuery: '{query}'") + print("-" * 80) + + results = search_similar_hadiths(query, limit=3) + + for i, hadith in enumerate(results, 1): + print(f"\n{i}. [{hadith['collection']} #{hadith['number']}] (Score: {hadith['similarity_score']:.4f})") + print(f" Grade: {hadith['grade']}") + print(f" English: {hadith['english'][:200]}...") + print(f" Arabic: {hadith['arabic'][:100]}...") + +print("\n=== TEST COMPLETE ===") \ No newline at end of file diff --git a/hadith-ingestion/scripts/verify_embeddings.sh b/hadith-ingestion/scripts/verify_embeddings.sh new file mode 100755 index 0000000..1664d2e --- /dev/null +++ b/hadith-ingestion/scripts/verify_embeddings.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +echo "=== EMBEDDING VERIFICATION ===" +echo "" + +# 1. Database check +echo "1. Checking database..." +psql -h pg.betelgeusebytes.io -U hadith_ingest -d hadith_db -t -c " +SELECT + 'Total hadiths: ' || COUNT(*) || + ' | With embeddings: ' || COUNT(*) FILTER (WHERE embedding_generated = TRUE) || + ' | Missing: ' || COUNT(*) FILTER (WHERE embedding_generated = FALSE) +FROM hadiths;" + +# 2. Qdrant check +echo "" +echo "2. Checking Qdrant..." +QDRANT_COUNT=$(curl -k -s https://vector.betelgeusebytes.io/collections/hadith_embeddings | jq -r '.result.points_count') +echo "Qdrant vectors: $QDRANT_COUNT" + +# 3. Collection info +echo "" +echo "3. Collection details..." +curl -k -s https://vector.betelgeusebytes.io/collections/hadith_embeddings | jq '.result | {points_count, vectors_count, status, optimizer_status}' + +echo "" +echo "=== VERIFICATION COMPLETE ===" \ No newline at end of file