From cf02b918b87012e299853e29080d3285bef2ab99 Mon Sep 17 00:00:00 2001
From: salahangal
Date: Sun, 23 Nov 2025 20:00:53 +0100
Subject: [PATCH] update mishkat ingestion

---
 hadith-ingestion/combined.txt           | 3229 -----------------
 hadith-ingestion/run-full-ingestion.sh  |    7 +-
 .../src/api_clients/hadithapi_client.py |    2 +-
 3 files changed, 5 insertions(+), 3233 deletions(-)
 delete mode 100644 hadith-ingestion/combined.txt

diff --git a/hadith-ingestion/combined.txt b/hadith-ingestion/combined.txt
deleted file mode 100644
index 7018741..0000000
--- a/hadith-ingestion/combined.txt
+++ /dev/null
@@ -1,3229 +0,0 @@
-=== ./run-full-ingestion.sh ===
-#!/bin/bash
-# run-full-ingestion.sh
-
-set -e
-
-echo "=== Starting Full HadithAPI Ingestion ==="
-
-# Book slug to collection abbreviation mapping
-# Books to ingest (in order)
-BOOKS=(
-    # "sahih-bukhari"
-    "sahih-muslim"
-    "abu-dawood"
-    "al-tirmidhi"
-    "ibn-e-majah"
-    "sunan-nasai"
-    "musnad-ahmad"
-    "al-silsila-sahiha"
-)
-
-for BOOK in "${BOOKS[@]}"; do
-    echo -e "\n========================================="
-    echo "Ingesting: $BOOK"
-    echo "========================================="
-
-    argo submit -n ml argo/workflows/ingest-hadithapi.yaml \
-        --parameter book-slug=$BOOK \
-        --parameter limit=0 \
-        --wait \
-        --log
-
-    echo "$BOOK completed!"
-
-    # Optional: add delay between books
-    sleep 10
-done
-
-echo -e "\n=== All Books Ingestion Complete ==="
-
-# Print summary
-kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
-SELECT
-    c.name_english,
-    c.abbreviation,
-    COUNT(h.id) as hadith_count,
-    COUNT(DISTINCT b.id) as chapter_count
-FROM collections c
-LEFT JOIN hadiths h ON c.id = h.collection_id
-LEFT JOIN books b ON h.book_id = b.id
-GROUP BY c.name_english, c.abbreviation
-ORDER BY hadith_count DESC;
-"
-=== ./create-secrets.sh ===
-#!/bin/bash
-# create-secrets.sh
-
-# Database secret
-kubectl -n ml create secret generic hadith-db-secret \
-    --from-literal=password='hadith_ingest' \
-    --dry-run=client -o yaml | kubectl apply -f -
-
-# HadithAPI secret (already public, but for consistency)
-kubectl -n ml create secret generic hadithapi-secret \
-    --from-literal=api-key='$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK' \
-    --dry-run=client -o yaml | kubectl apply -f -
-
-# MinIO secret
-kubectl -n ml create secret generic minio-secret \
-    --from-literal=access-key='minioadmin' \
-    --from-literal=secret-key='minioadmin' \
-    --dry-run=client -o yaml | kubectl apply -f -
-
-echo "Secrets created successfully"
-=== ./full-ingestion.sh ===
-#!/bin/bash
-# run-full-ingestion.sh
-
-set -e
-
-echo "=== Starting Full HadithAPI Ingestion ==="
-
-# Books to ingest (in order)
-BOOKS=(
-    "sahih-bukhari"
-    "sahih-muslim"
-    "sunan-abu-dawood"
-    "jami-at-tirmidhi"
-    "sunan-an-nasai"
-    "sunan-ibn-e-majah"
-)
-
-for BOOK in "${BOOKS[@]}"; do
-    echo -e "\n========================================="
-    echo "Ingesting: $BOOK"
-    echo "========================================="
-
-    argo submit -n argo argo/workflows/ingest-hadithapi.yaml \
-        --parameter book-slug=$BOOK \
-        --parameter limit=0 \
-        --wait \
-        --log
-
-    echo "$BOOK completed!"
-
-    # Optional: add delay between books
-    sleep 10
-done
-
-echo -e "\n=== All Books Ingestion Complete ==="
-
-# Print summary
-kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
-SELECT
-    c.name_english,
-    c.abbreviation,
-    COUNT(h.id) as hadith_count,
-    COUNT(DISTINCT b.id) as chapter_count
-FROM collections c
-LEFT JOIN hadiths h ON c.id = h.collection_id
-LEFT JOIN books b ON h.book_id = b.id
-GROUP BY c.name_english, c.abbreviation
-ORDER BY hadith_count DESC;
-"
-=== ./requirements.txt ===
-# Core dependencies
-python-dotenv==1.0.0
-pydantic==2.5.0
-pydantic-settings==2.1.0
-
-# HTTP clients
-httpx==0.25.2
-requests==2.31.0
-tenacity==8.2.3
-
-# Database
-psycopg2-binary==2.9.9
-sqlalchemy==2.0.23
-asyncpg==0.29.0
-
-# Data processing
-pandas==2.1.4
-numpy==1.26.2
-pyarabic==0.6.15
-arabic-reshaper==3.0.0
-
-# Validation
-jsonschema==4.20.0
-validators==0.22.0
-
-# Logging & Monitoring
-structlog==23.2.0
-prometheus-client==0.19.0
-
-# Cloud storage
-minio==7.2.0
-boto3==1.34.0
-
-# Task queue (optional)
-celery==5.3.4
-redis==5.0.1
-
-# Testing
-pytest==7.4.3
-pytest-asyncio==0.21.1
-pytest-cov==4.1.0
-faker==21.0.0
-=== ./config/__init__.py ===
-
-=== ./config/settings.py ===
-"""
-Configuration settings for hadith ingestion service
-"""
-from pydantic_settings import BaseSettings
-from typing import Optional
-import os
-
-
-class Settings(BaseSettings):
-    """Application settings loaded from environment variables"""
-
-    # Database
-    # DATABASE_HOST: str = "postgres.db.svc.cluster.local"
-    DATABASE_HOST: str = "pg.betelgeusebytes.io"
-    DATABASE_PORT: int = 5432
-    DATABASE_NAME: str = "hadith_db"
-    DATABASE_USER: str = "hadith_ingest"
-    DATABASE_PASSWORD: str = "hadith_ingest"
-
-    @property
-    def DATABASE_URL(self) -> str:
-        return (
-            f"postgresql://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}"
-            f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}"
-        )
-
-    @property
-    def ASYNC_DATABASE_URL(self) -> str:
-        return (
-            f"postgresql+asyncpg://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}"
-            f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}"
-        )
-
-    # MinIO / S3
-    MINIO_ENDPOINT: str = "minio.storage.svc.cluster.local:9000"
-    MINIO_ACCESS_KEY: str = "minioadmin"
-    MINIO_SECRET_KEY: str = "minioadmin"
-    MINIO_BUCKET_RAW: str = "hadith-raw-data"
-    MINIO_BUCKET_PROCESSED: str = "hadith-processed"
-    MINIO_SECURE: bool = False
-    SUNNAH_BASE_URL: str = "https://api.sunnah.com/v1"
-    HADITH_ONE_API_KEY: Optional[str] = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK"
-    # HadithAPI.com
-    HADITHAPI_KEY: str = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK"
-    HADITHAPI_BASE_URL: str = "https://hadithapi.com/api"
-    # Rate limiting
-    API_RATE_LIMIT: int = 30  # requests per minute
-    API_MAX_RETRIES: int = 3
-    API_RETRY_DELAY: int = 5  # seconds
-
-    # Processing
-    BATCH_SIZE: int = 100
-    MAX_WORKERS: int = 4
-
-    # TEI Service (for embeddings)
-    TEI_URL: str = "http://tei.ml.svc.cluster.local"
-    TEI_TIMEOUT: int = 30
-
-    # Qdrant
-    QDRANT_URL: str = "http://qdrant.db.svc.cluster.local:6333"
-    QDRANT_COLLECTION: str = "hadith_embeddings"
-
-    # Logging
-    LOG_LEVEL: str = "INFO"
-    LOG_FORMAT: str = "json"
-
-    # Job tracking
-    JOB_NAME: Optional[str] = None
-    JOB_TYPE: str = "api_fetch"
-
-    class Config:
-        env_file = ".env"
-        env_file_encoding = "utf-8"
-        case_sensitive = True
-
-
-# Global settings instance
-settings = Settings()
-=== ./Dockerfile ===
-FROM python:3.11-slim
-
-WORKDIR /app
-
-# Install system dependencies
-RUN apt-get update && apt-get install -y \
-    gcc \
-    postgresql-client \
-    curl \
-    && rm -rf /var/lib/apt/lists/*
-
-# Copy requirements
-COPY requirements.txt .
-
-# Install Python dependencies
-RUN pip install --no-cache-dir -r requirements.txt
-
-# Copy application code
-COPY config/ /app/config/
-COPY src/ /app/src/
-
-# Create non-root user
-RUN useradd -m -u 1000 hadith && chown -R hadith:hadith /app
-USER hadith
-
-# Set Python path
-ENV PYTHONPATH=/app
-
-# Default command
-CMD ["python", "/app/src/main_hadithapi.py"]
-=== ./tests/__init__.py ===
-
-=== ./tests/test_clients.py ===
-
-=== ./test-hadithapi-k8s.sh ===
-#!/bin/bash
-# test-hadithapi-k8s.sh
-
-set -e
-
-echo "=== Kubernetes HadithAPI Integration Test ==="
-
-# 1. Create secrets
-echo "Creating secrets..."
-#./create-secrets.sh
-
-# 2. Build and load image (if using local cluster)
-echo "Building Docker image..."
-#docker build -t hadith-ingestion:latest .
-
-# If using kind/minikube, load image
-# kind load docker-image hadith-ingestion:latest
-
-# 3. Submit test workflow (10 hadiths)
-echo "Submitting test workflow..."
-argo submit -n ml argo/workflows/ingest-hadithapi.yaml \
-    --parameter book-slug=sahih-bukhari \
-    --parameter limit=10 \
-    --wait \
-    --log
-
-# 4. Check workflow status
-echo -e "\nChecking workflow status..."
-argo list -n argo
-
-# 5. Verify data in database
-echo -e "\nVerifying data..."
-kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
-SELECT
-    c.name_english,
-    COUNT(h.id) as hadith_count,
-    MAX(h.created_at) as last_ingestion
-FROM collections c
-LEFT JOIN hadiths h ON c.id = h.collection_id
-WHERE c.abbreviation = 'bukhari'
-GROUP BY c.name_english;
-"
-
-echo -e "\n=== Test Complete ==="
-=== ./test_mainhadithapi.py ===
-#!/usr/bin/env python3
-"""
-Test script for main_hadithapi.py
-"""
-import sys
-import os
-sys.path.insert(0, '.')
-
-from src.main_hadithapi import HadithAPIIngestionService
-
-def test_main_hadithapi():
-    """Test the main HadithAPI ingestion service"""
-    print("=== Testing HadithAPI Ingestion Service ===\n")
-
-    try:
-        # Initialize the service
-        print("1. Initializing HadithAPIIngestionService...")
-        service = HadithAPIIngestionService()
-        print("✓ Service initialized successfully\n")
-
-        # Test 1: List available books
-        print("2. Testing book synchronization...")
-        book_mapping = service.sync_books_from_api()
-        print(f"✓ Found {len(book_mapping)} mapped books")
-        for book_slug, info in list(book_mapping.items())[:3]:  # Show first 3
-            print(f"  - {book_slug}: {info['book_name']} ({info['hadiths_count']} hadiths)")
-        print()
-
-        # Test 2: Test ingestion with limit
-        print("3. Testing limited ingestion (10 hadiths from Sahih Bukhari)...")
-        stats = service.ingest_collection(
-            book_slug='sahih-bukhari',
-            limit=10
-        )
-        print(f"✓ Ingestion completed with stats:")
-        print(f"  Processed: {stats['processed']}")
-        print(f"  Failed: {stats['failed']}")
-        print(f"  Skipped: {stats['skipped']}\n")
-
-        # Test 3: List books functionality
-        print("4. Testing book listing...")
-        print("\n=== Available Books ===\n")
-        for book_slug, info in book_mapping.items():
-            print(f"Book Slug: {book_slug}")
-            print(f"  Name: {info['book_name']}")
-            print(f"  Hadiths: {info['hadiths_count']}")
-            print(f"  Chapters: {info['chapters_count']}")
-            print()
-
-        # Clean up
-        service.close()
-        print("=== All Tests Passed! ===")
===") - return True - - except Exception as e: - print(f"✗ Test failed with error: {e}") - import traceback - traceback.print_exc() - return False - -def test_command_line_args(): - """Test command line argument parsing""" - print("=== Testing Command Line Arguments ===\n") - - # We'll simulate command line arguments - import argparse - from src.main_hadithapi import main - - # Test --list-books argument - print("1. Testing --list-books argument...") - original_argv = sys.argv.copy() - - try: - sys.argv = ['main_hadithapi.py', '--list-books'] - # We won't actually run main() as it would exit, but we can check the parsing - parser = argparse.ArgumentParser(description="Ingest hadiths from HadithAPI.com") - parser.add_argument("--book-slug", help="Book slug (e.g., sahih-bukhari)") - parser.add_argument("--limit", type=int, help="Limit number of hadiths to ingest") - parser.add_argument("--list-books", action="store_true", help="List available books and exit") - - args = parser.parse_args(['--list-books']) - print(f"✓ Argument parsing successful: list_books={args.list_books}") - - # Test book-slug argument - args = parser.parse_args(['--book-slug', 'sahih-bukhari', '--limit', '5']) - print(f"✓ Argument parsing successful: book_slug={args.book_slug}, limit={args.limit}") - - print("✓ Command line argument parsing works correctly\n") - return True - - except Exception as e: - print(f"✗ Argument parsing failed: {e}") - return False - finally: - sys.argv = original_argv - -if __name__ == "__main__": - print("Starting tests for main_hadithapi.py...\n") - - # Test command line arguments - if not test_command_line_args(): - sys.exit(1) - - # Test main functionality - if not test_main_hadithapi(): - sys.exit(1) - - print("\n🎉 All tests passed successfully!") - sys.exit(0) -=== ./setup.py === - -=== ./build-and-push.sh === -#!/bin/bash -# build-and-push.sh - -set -e - -# Configuration -IMAGE_NAME="hadith-ingestion" -TAG="${1:-latest}" -DOCKER_REGISTRY="axxs" -REGISTRY="${DOCKER_REGISTRY:-}" - -echo "Building Docker image: ${IMAGE_NAME}:${TAG}" - -# Build image -docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile . - -# Tag for registry -docker tag ${IMAGE_NAME}:${TAG} ${REGISTRY}/${IMAGE_NAME}:${TAG} - -# Push to registry -echo "Pushing to registry: ${REGISTRY}" -docker push ${REGISTRY}/${IMAGE_NAME}:${TAG} - -echo "Done!" -=== ./.env === -# Database -# DATABASE_HOST=postgres.db.svc.cluster.local -DATABASE_HOST = pg.betelgeusebytes.io -DATABASE_PORT=5432 -DATABASE_NAME=hadith_db -DATABASE_USER=hadith_ingest -DATABASE_PASSWORD=hadith_ingest - -# HadithAPI.com -HADITHAPI_KEY=$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK - -# MinIO -MINIO_ENDPOINT=minio.storage.svc.cluster.local:9000 -MINIO_ACCESS_KEY=minioadmin -MINIO_SECRET_KEY=minioadmin - -# Services -TEI_URL=http://tei.ml.svc.cluster.local -QDRANT_URL=http://qdrant.vector.svc.cluster.local:6333 - -# Settings -LOG_LEVEL=INFO -API_RATE_LIMIT=30 -BATCH_SIZE=100 -=== ./build-hadithapi-ingestion.sh === -#!/bin/bash -# build-hadithapi-ingestion.sh - -set -e - -IMAGE_NAME="hadith-ingestion" -TAG="v1.0-hadithapi" - -echo "Building Docker image for HadithAPI.com ingestion..." - -# Build image -docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile . - -# Tag as latest -docker tag ${IMAGE_NAME}:${TAG} ${IMAGE_NAME}:latest - -# If you have a registry, push -# docker push your-registry/${IMAGE_NAME}:${TAG} - -echo "Build complete: ${IMAGE_NAME}:${TAG}" -=== ./combine.sh === -find . 
-find . -type f -name "*.txt" -o -name "production" -o -name "*.py" -o -name "*.yaml" -o -name "Dockerfile" -o -name "*.sh" -o -name "*.env" ! -name "*.md" ! -name "*.xls" ! -name "*.xlsx"| while read file; do
-    echo "=== $file ===" >> combined.txt
-    cat "$file" >> combined.txt
-    echo "" >> combined.txt
-done
-
-=== ./test_hadithapi.py ===
-#!/usr/bin/env python3
-"""
-Quick test script for hadithapi_client.py
-"""
-import sys
-from venv import logger
-sys.path.insert(0, '/app')
-
-from src.api_clients.hadithapi_client import HadithAPIClient
-from config.settings import settings
-
-def test_api_connection():
-    """Test basic API connectivity"""
-    print("=== Testing HadithAPI Client ===\n")
-
-    client = HadithAPIClient()
-
-    # Test 1: Get books
-    print("Test 1: Fetching available books...")
-    try:
-        books = client.get_books()
-        print(f"✓ Success! Found {len(books)} books")
-        for book in books[:3]:  # Show first 3
-            print(f"  - {book.get('bookName')} ({book.get('bookSlug')})")
-            print(f"    Hadiths: {book.get('hadiths_count')}, Chapters: {book.get('chapters_count')}")
-        logger.info(f"Fetched {len(books)} books successfully")
-
-    except Exception as e:
-        print(f"✗ Failed: {e}")
-        return False
-
-    # Test 2: Get chapters for Sahih Bukhari
-    print("\nTest 2: Fetching chapters for Sahih Bukhari...")
-    try:
-        chapters = client.get_chapters('sahih-bukhari')
-        print(f"✓ Success! Found {len(chapters)} chapters")
-        if chapters:
-            print(f"  First chapter: {chapters[0].get('chapterEnglish')}")
-    except Exception as e:
-        print(f"✗ Failed: {e}")
-        return False
-
-    # Test 3: Fetch first page of hadiths
-    print("\nTest 3: Fetching first page of hadiths...")
-    book_id = None
-    try:
-        book = client.get_book_by_slug('sahih-bukhari')
-        if not book:
-            print("✗ Failed: Book 'sahih-bukhari' not found")
-            return False
-        book_id = book.get('id')
-        page_data = client.get_hadiths_page('sahih-bukhari', page=1, limit=5)
-        hadiths = page_data.get('hadiths', [])
-        print(f"✓ Success! Fetched {len(hadiths)} hadiths")
-        if hadiths:
-            first = hadiths[0]
-            print(f"  First hadith number: {first.get('hadithNumber')}")
-            print(f"  Arabic text (first 100 chars): {first.get('hadithArabic', '')[:100]}...")
-    except Exception as e:
-        print(f"✗ Failed: {e}")
-        return False
-
-    if book_id is None:
-        print("✗ Failed: Book ID unavailable for iterator test")
-        return False
-
-    # # Test 4: Test iterator (fetch 3 hadiths)
-    print("\nTest 4: Testing hadith iterator (3 hadiths)...")
-    try:
-        count = 0
-
-        for hadith in client.iter_all_hadiths_in_book(book_id='sahih-bukhari', book_slug='sahih-bukhari', batch_size=10):
-            count += 1
-            print(f"  Hadith #{hadith.get('hadithNumber')} is {hadith.get('englishNarrator')} and is {hadith.get('status')} ")
-            if count >= 3:
-                break
-        print(f"✓ Success! Iterator working correctly")
-    except Exception as e:
-        print(f"✗ Failed: {e}")
-        return False
-
-    client.close()
-    print("\n=== All Tests Passed! ===")
-    return True
-
-if __name__ == "__main__":
-    success = test_api_connection()
-    sys.exit(0 if success else 1)
-=== ./test-hadithapi-local.sh ===
-#!/bin/bash
-# test-hadithapi-local.sh
-
-set -e
-
-echo "=== HadithAPI.com Integration Test ==="
-
-# 1. Test API connection
-echo "Testing API connection..."
-curl -s "https://hadithapi.com/api/books?apiKey=\$2y\$10\$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" | jq .
-
-# 2. Test database connection
-echo -e "\nTesting database connection..."
-kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "SELECT COUNT(*) FROM collections;"
-
-# 3. List available books
-echo -e "\nListing available books..."
-python src/main_hadithapi.py --list-books
-
-# 4. Test ingestion (limited to 10 hadiths)
-echo -e "\nRunning test ingestion (10 hadiths from Sahih Bukhari)..."
-python src/main_hadithapi.py --book-slug sahih-bukhari --limit 10
-
-# 5. Verify data
-echo -e "\nVerifying ingested data..."
-kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
-SELECT
-    c.name_english,
-    c.abbreviation,
-    COUNT(h.id) as hadith_count,
-    COUNT(DISTINCT b.id) as book_count
-FROM collections c
-LEFT JOIN hadiths h ON c.id = h.collection_id
-LEFT JOIN books b ON h.book_id = b.id
-WHERE c.abbreviation = 'bukhari'
-GROUP BY c.name_english, c.abbreviation;
-"
-
-echo -e "\n=== Test Complete ==="
-=== ./simple-pod.yaml ===
-apiVersion: v1
-kind: Pod
-metadata:
-  name: hadith-ingestion-list-books
-  namespace: ml
-spec:
-  restartPolicy: Never
-  containers:
-    - name: hadith-ingestion
-      image: axxs/hadith-ingestion:latest
-      # command: ["python"]
-      # args: ["/app/src/main_hadithapi.py", "--list-books"]
-      command: ["sh","-c","sleep infinity"]
-      env:
-        - name: DATABASE_HOST
-          value: "postgres.db.svc.cluster.local"
-        - name: DATABASE_PORT
-          value: "5432"
-        - name: DATABASE_NAME
-          value: "hadith_db"
-        - name: DATABASE_USER
-          value: "hadith_ingest"
-        - name: DATABASE_PASSWORD
-          valueFrom:
-            secretKeyRef:
-              name: hadith-db-secret
-              key: password
-        - name: HADITHAPI_KEY
-          valueFrom:
-            secretKeyRef:
-              name: hadithapi-secret
-              key: api-key
-        - name: MINIO_ENDPOINT
-          value: "minio.storage.svc.cluster.local:9000"
-        - name: MINIO_ACCESS_KEY
-          valueFrom:
-            secretKeyRef:
-              name: minio-secret
-              key: access-key
-        - name: MINIO_SECRET_KEY
-          valueFrom:
-            secretKeyRef:
-              name: minio-secret
-              key: secret-key
-        - name: LOG_LEVEL
-          value: "INFO"
-=== ./argo/workflows/ingest-collection.yaml ===
-apiVersion: argoproj.io/v1alpha1
-kind: Workflow
-metadata:
-  generateName: ingest-hadith-collection-
-  namespace: argo
-spec:
-  entrypoint: ingest-pipeline
-
-  # Arguments
-  arguments:
-    parameters:
-      - name: collection
-        value: "bukhari"
-      - name: limit
-        value: "0"  # 0 means no limit
-
-  # Service account with database access
-  serviceAccountName: argo-workflow
-
-  # Templates
-  templates:
-
-    # ========================================
-    # Main pipeline
-    # ========================================
-    - name: ingest-pipeline
-      steps:
-        - - name: ingest-hadiths
-            template: ingest
-            arguments:
-              parameters:
-                - name: collection
-                  value: "{{workflow.parameters.collection}}"
-                - name: limit
-                  value: "{{workflow.parameters.limit}}"
-
-        - - name: generate-embeddings
-            template: generate-embeddings
-            arguments:
-              parameters:
-                - name: collection
-                  value: "{{workflow.parameters.collection}}"
-
-        - - name: index-qdrant
-            template: index-qdrant
-            arguments:
-              parameters:
-                - name: collection
-                  value: "{{workflow.parameters.collection}}"
-
-    # ========================================
-    # Ingestion step
-    # ========================================
-    - name: ingest
-      inputs:
-        parameters:
-          - name: collection
-          - name: limit
-
-      container:
-        image: hadith-ingestion:latest
-        imagePullPolicy: Always
-        command: [python, /app/src/main_hadithapi.py]
-        args:
-          - "{{inputs.parameters.collection}}"
-          - "--limit={{inputs.parameters.limit}}"
-
-        env:
-          - name: DATABASE_HOST
-            value: "postgres.db.svc.cluster.local"
-          - name: DATABASE_PORT
-            value: "5432"
-          - name: DATABASE_NAME
-            value: "hadith_db"
-          - name: DATABASE_USER
-            value: "hadith_ingest"
-          - name: DATABASE_PASSWORD
-            valueFrom:
-              secretKeyRef:
-                name: hadith-db-secret
-                key: password
-
-          - name: MINIO_ENDPOINT
-            value: "minio.storage.svc.cluster.local:9000"
-          - name: MINIO_ACCESS_KEY
-            valueFrom:
-              secretKeyRef:
-                name: minio-secret
-                key: access-key
-          - name: MINIO_SECRET_KEY
-            valueFrom:
-              secretKeyRef:
-                name: minio-secret
-                key: secret-key
-
-          - name: SUNNAH_API_KEY
-            valueFrom:
-              secretKeyRef:
-                name: sunnah-api-secret
-                key: api-key
-
-          - name: LOG_LEVEL
-            value: "INFO"
-          - name: JOB_NAME
-            value: "{{workflow.name}}"
-
-        resources:
-          requests:
-            cpu: 500m
-            memory: 1Gi
-          limits:
-            cpu: 2
-            memory: 4Gi
-
-    # ========================================
-    # Embedding generation step
-    # ========================================
-    - name: generate-embeddings
-      inputs:
-        parameters:
-          - name: collection
-
-      container:
-        image: hadith-embeddings:latest
-        imagePullPolicy: Always
-        command: [python, /app/src/embeddings/generator.py]
-        args:
-          - "--collection={{inputs.parameters.collection}}"
-          - "--batch-size=32"
-
-        env:
-          - name: DATABASE_URL
-            value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db"
-          - name: DATABASE_PASSWORD
-            valueFrom:
-              secretKeyRef:
-                name: hadith-db-secret
-                key: password
-
-          - name: TEI_URL
-            value: "http://tei.ml.svc.cluster.local"
-
-          - name: LOG_LEVEL
-            value: "INFO"
-
-        resources:
-          requests:
-            cpu: 1
-            memory: 2Gi
-          limits:
-            cpu: 4
-            memory: 8Gi
-
-    # ========================================
-    # Qdrant indexing step
-    # ========================================
-    - name: index-qdrant
-      inputs:
-        parameters:
-          - name: collection
-
-      container:
-        image: hadith-qdrant-indexer:latest
-        imagePullPolicy: Always
-        command: [python, /app/index_qdrant.py]
-        args:
-          - "--collection={{inputs.parameters.collection}}"
-          - "--batch-size=100"
-
-        env:
-          - name: DATABASE_URL
-            value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db"
-          - name: DATABASE_PASSWORD
-            valueFrom:
-              secretKeyRef:
-                name: hadith-db-secret
-                key: password
-
-          - name: QDRANT_URL
-            value: "http://qdrant.vector.svc.cluster.local:6333"
-          - name: QDRANT_COLLECTION
-            value: "hadith_embeddings"
-
-          - name: LOG_LEVEL
-            value: "INFO"
-
-        resources:
-          requests:
-            cpu: 500m
-            memory: 1Gi
-          limits:
-            cpu: 2
-            memory: 4Gi
-=== ./argo/workflows/ingest-hadithapi.yaml ===
-apiVersion: argoproj.io/v1alpha1
-kind: Workflow
-metadata:
-  generateName: ingest-hadithapi-
-  namespace: ml
-spec:
-  entrypoint: ingest-pipeline
-
-  arguments:
-    parameters:
-      - name: book-slug
-        value: "sahih-bukhari"
-      - name: limit
-        value: "0"  # 0 means no limit
-
-  serviceAccountName: argo-workflow
-
-  templates:
-
-    # ========================================
-    # Main pipeline
-    # ========================================
-    - name: ingest-pipeline
-      steps:
-        - - name: ingest-hadiths
-            template: ingest
-            arguments:
-              parameters:
-                - name: book-slug
-                  value: "{{workflow.parameters.book-slug}}"
-                - name: limit
-                  value: "{{workflow.parameters.limit}}"
-
-    # ========================================
-    # Ingestion step
-    # ========================================
-    - name: ingest
-      inputs:
-        parameters:
-          - name: book-slug
-          - name: limit
-
-      container:
-        image: axxs/hadith-ingestion:latest
-        imagePullPolicy: Always
-        command: [python, /app/src/main_hadithapi.py]
-        args:
-          - "--book-slug={{inputs.parameters.book-slug}}"
-          - "--limit={{inputs.parameters.limit}}"
-
-        env:
-          - name: DATABASE_HOST
-            value: "postgres.db.svc.cluster.local"
-          - name: DATABASE_PORT
-            value: "5432"
-          - name: DATABASE_NAME
-            value: "hadith_db"
-          - name: DATABASE_USER
-            value: "hadith_ingest"
-          - name: DATABASE_PASSWORD
-            valueFrom:
-              secretKeyRef:
-                name: hadith-db-secret
-                key: password
-
-          - name: HADITHAPI_KEY
-            valueFrom:
-              secretKeyRef:
-                name: hadithapi-secret
-                key: api-key
-
-          - name: MINIO_ENDPOINT
-            value: "minio.storage.svc.cluster.local:9000"
-          - name: MINIO_ACCESS_KEY
-            valueFrom:
-              secretKeyRef:
-                name: minio-secret
-                key: access-key
-          - name: MINIO_SECRET_KEY
-            valueFrom:
-              secretKeyRef:
-                name: minio-secret
-                key: secret-key
-
-          - name: LOG_LEVEL
-            value: "INFO"
-          - name: JOB_NAME
-            value: "{{workflow.name}}"
-          - name: API_RATE_LIMIT
-            value: "30"
-
-        resources:
-          requests:
-            cpu: 500m
-            memory: 1Gi
-          limits:
-            cpu: 2
-            memory: 4Gi
----
-# Workflow to ingest ALL books
-apiVersion: argoproj.io/v1alpha1
-kind: Workflow
-metadata:
-  generateName: ingest-all-hadithapi-
-  namespace: ml
-spec:
-  entrypoint: ingest-all-books
-
-  serviceAccountName: argo-workflow
-
-  arguments:
-    parameters:
-      - name: limit-per-book
-        value: "0"  # 0 means no limit
-
-  templates:
-
-    # ========================================
-    # Main pipeline - sequential processing
-    # ========================================
-    - name: ingest-all-books
-      steps:
-        # Process each book sequentially to avoid rate limiting
-        - - name: sahih-bukhari
-            template: ingest-book
-            arguments:
-              parameters:
-                - name: book-slug
-                  value: "sahih-bukhari"
-
-        - - name: sahih-muslim
-            template: ingest-book
-            arguments:
-              parameters:
-                - name: book-slug
-                  value: "sahih-muslim"
-
-        - - name: sunan-abu-dawood
-            template: ingest-book
-            arguments:
-              parameters:
-                - name: book-slug
-                  value: "abu-dawood"
-
-        - - name: jami-at-tirmidhi
-            template: ingest-book
-            arguments:
-              parameters:
-                - name: book-slug
-                  value: "al-tirmidhi"
-
-        - - name: sunan-an-nasai
-            template: ingest-book
-            arguments:
-              parameters:
-                - name: book-slug
-                  value: "sunan-nasai"
-
-        - - name: sunan-ibn-e-majah
-            template: ingest-book
-            arguments:
-              parameters:
-                - name: book-slug
-                  value: "ibn-e-majah"
-
-        - - name: musnad-ahmad
-            template: ingest-book
-            arguments:
-              parameters:
-                - name: book-slug
-                  value: "musnad-ahmad"
-
-
-        - - name: al-silsila-sahiha
-            template: ingest-book
-            arguments:
-              parameters:
-                - name: book-slug
-                  value: "al-silsila-sahiha"
-
-    # ========================================
-    # Book ingestion template
-    # ========================================
-    - name: ingest-book
-      inputs:
-        parameters:
-          - name: book-slug
-
-      container:
-        image: axxs/hadith-ingestion:latest
-        imagePullPolicy: Always
-        command: [python, /app/src/main_hadithapi.py]
-        args:
-          - "--book-slug={{inputs.parameters.book-slug}}"
-          - "--limit={{workflow.parameters.limit-per-book}}"
-
-        env:
-          - name: DATABASE_HOST
-            value: "postgres.db.svc.cluster.local"
-          - name: DATABASE_PASSWORD
-            valueFrom:
-              secretKeyRef:
-                name: hadith-db-secret
-                key: password
-          - name: HADITHAPI_KEY
-            valueFrom:
-              secretKeyRef:
-                name: hadithapi-secret
-                key: api-key
-          - name: LOG_LEVEL
-            value: "INFO"
-
-        resources:
-          requests:
-            cpu: 500m
-            memory: 1Gi
-          limits:
-            cpu: 2
-            memory: 4Gi
-=== ./src/database/__init__.py ===
-
-=== ./src/database/connection.py ===
-
-=== ./src/database/repository.py ===
-"""
-Database repository for hadith data operations
-"""
-from typing import List, Dict, Any, Optional
-import json
-from uuid import UUID
-import structlog
-from sqlalchemy import create_engine, text, select, insert, update
-from sqlalchemy.orm import sessionmaker, Session
-from sqlalchemy.exc import IntegrityError
-from config.settings import settings
-
-logger = structlog.get_logger()
-
-
-class HadithRepository:
-    """Repository for hadith database operations"""
-
-    def __init__(self, database_url: Optional[str] = None):
-        self.database_url = database_url or settings.DATABASE_URL
-        self.engine = create_engine(self.database_url, pool_pre_ping=True)
-        self.SessionLocal = sessionmaker(bind=self.engine)
-
-    @staticmethod
-    def _coerce_uuid(value: Any) -> UUID:
-        if isinstance(value, UUID):
-            return value
-        return UUID(str(value))
-
-    def get_session(self) -> Session:
-        """Get database session"""
-        return self.SessionLocal()
-
-    # ===== Collections =====
-
-    def get_collection_by_abbreviation(self, abbr: str) -> Optional[Dict[str, Any]]:
-        """Get collection by abbreviation"""
-        with self.get_session() as session:
-            query = text("""
-                SELECT * FROM collections
-                WHERE abbreviation = :abbr
-            """)
-            result = session.execute(query, {"abbr": abbr}).fetchone()
-
-            if result:
-                return dict(result._mapping)
-            return None
-
-    def get_all_collections(self) -> List[Dict[str, Any]]:
-        """Get all collections"""
-        with self.get_session() as session:
-            query = text("SELECT * FROM collections ORDER BY name_english")
-            result = session.execute(query).fetchall()
-            return [dict(row._mapping) for row in result]
-
-    def update_collection_count(self, collection_id: UUID, count: int):
-        """Update total hadith count for a collection"""
-        with self.get_session() as session:
-            query = text("""
-                UPDATE collections
-                SET total_hadiths = :count, updated_at = NOW()
-                WHERE id = :id
-            """)
-            session.execute(query, {"id": str(collection_id), "count": count})
-            session.commit()
-
-    # ===== Books =====
-
-    def upsert_book(
-        self,
-        collection_id: UUID,
-        book_number: int,
-        name_english: Optional[str] = None,
-        name_arabic: Optional[str] = None,
-        metadata: Optional[Dict] = None
-    ) -> UUID:
-        """Insert or update a book"""
-        metadata_json = json.dumps(metadata or {})
-        with self.get_session() as session:
-            query = text("""
-                INSERT INTO books (collection_id, book_number, name_english, name_arabic, metadata)
-                VALUES (:collection_id, :book_number, :name_english, :name_arabic, :metadata)
-                ON CONFLICT (collection_id, book_number)
-                DO UPDATE SET
-                    name_english = EXCLUDED.name_english,
-                    name_arabic = EXCLUDED.name_arabic,
-                    metadata = EXCLUDED.metadata
-                RETURNING id
-            """)
-
-            result = session.execute(query, {
-                "collection_id": str(collection_id),
-                "book_number": book_number,
-                "name_english": name_english,
-                "name_arabic": name_arabic,
-                "metadata": metadata_json
-            })
-            session.commit()
-
-            return self._coerce_uuid(result.fetchone()[0])
-
-    def get_book(self, collection_id: UUID, book_number: int) -> Optional[Dict[str, Any]]:
-        """Get book by collection and book number"""
-        with self.get_session() as session:
-            query = text("""
-                SELECT * FROM books
-                WHERE collection_id = :collection_id AND book_number = :book_number
-            """)
-            result = session.execute(query, {
-                "collection_id": str(collection_id),
-                "book_number": book_number
-            }).fetchone()
-
-            if result:
-                return dict(result._mapping)
-            return None
-
-    # ===== Hadiths =====
-
-    def upsert_hadith(
-        self,
-        collection_id: UUID,
-        hadith_number: int,
-        arabic_text: str,
-        book_id: Optional[UUID] = None,
-        english_text: Optional[str] = None,
-        urdu_text: Optional[str] = None,
-        grade: Optional[str] = None,
-        grade_source: Optional[str] = None,
-        chapter_name: Optional[str] = None,
-        source_id: Optional[str] = None,
-        source_url: Optional[str] = None,
-        source_metadata: Optional[Dict] = None
-    ) -> UUID:
-        """Insert or update a hadith"""
-
-        with self.get_session() as session:
-            query = text("""
-                INSERT INTO hadiths (
-                    collection_id, book_id, hadith_number,
-                    arabic_text, english_text, urdu_text,
-                    grade, grade_source, chapter_name,
-                    source_id, source_url, source_metadata
-                )
-                VALUES (
-                    :collection_id, :book_id, :hadith_number,
-                    :arabic_text, :english_text, :urdu_text,
-                    :grade, :grade_source, :chapter_name,
-                    :source_id, :source_url, :source_metadata
-                )
-                ON CONFLICT (collection_id, book_id, hadith_number)
-                DO UPDATE SET
-                    arabic_text = EXCLUDED.arabic_text,
-                    english_text = EXCLUDED.english_text,
-                    urdu_text = EXCLUDED.urdu_text,
-                    grade = EXCLUDED.grade,
-                    grade_source = EXCLUDED.grade_source,
-                    chapter_name = EXCLUDED.chapter_name,
-                    source_url = EXCLUDED.source_url,
-                    source_metadata = EXCLUDED.source_metadata,
-                    updated_at = NOW()
-                RETURNING id
-            """)
-            metadata_json = json.dumps(source_metadata or {})
-
-            result = session.execute(query, {
-                "collection_id": str(collection_id),
-                "book_id": str(book_id) if book_id else None,
-                "hadith_number": hadith_number,
-                "arabic_text": arabic_text,
-                "english_text": english_text,
-                "urdu_text": urdu_text,
-                "grade": grade,
-                "grade_source": grade_source,
-                "chapter_name": chapter_name,
-                "source_id": source_id,
-                "source_url": source_url,
-                "source_metadata": metadata_json
-            })
-            session.commit()
-
-            return self._coerce_uuid(result.fetchone()[0])
-
-    def get_hadiths_without_embeddings(
-        self,
-        limit: int = 100,
-        collection_id: Optional[UUID] = None
-    ) -> List[Dict[str, Any]]:
-        """Get hadiths that need embedding generation"""
-        with self.get_session() as session:
-            if collection_id:
-                query = text("""
-                    SELECT * FROM hadiths
-                    WHERE embedding_generated = FALSE
-                    AND collection_id = :collection_id
-                    ORDER BY created_at ASC
-                    LIMIT :limit
-                """)
-                result = session.execute(query, {
-                    "collection_id": str(collection_id),
-                    "limit": limit
-                }).fetchall()
-            else:
-                query = text("""
-                    SELECT * FROM hadiths
-                    WHERE embedding_generated = FALSE
-                    ORDER BY created_at ASC
-                    LIMIT :limit
-                """)
-                result = session.execute(query, {"limit": limit}).fetchall()
-
-            return [dict(row._mapping) for row in result]
-
-    def mark_embedding_generated(self, hadith_id: UUID, version: str = "v1"):
-        """Mark hadith as having embedding generated"""
-        with self.get_session() as session:
-            # Prepare the update query
-            query = text("""
-                UPDATE hadiths
-                SET embedding_generated = TRUE,
-                    embedding_version = :version,
-                    updated_at = NOW()
-                WHERE id = :id
-            """)
-            # Pre-serialize parameters (keeping consistent with other methods that
-            # serialize payloads/configs before execution)
-            params = {"id": str(hadith_id), "version": version}
-            session.execute(query, {"id": str(hadith_id), "version": version})
-            session.commit()
-
-    # ===== Ingestion Jobs =====
-
-    def create_ingestion_job(
-        self,
-        job_name: str,
-        job_type: str,
-        source_name: str,
-        config: Optional[Dict] = None
-    ) -> UUID:
-        """Create a new ingestion job"""
-        with self.get_session() as session:
-            query = text("""
-                INSERT INTO ingestion_jobs (job_name, job_type, source_name, config, status, started_at)
-                VALUES (:job_name, :job_type, :source_name, :config, 'running', NOW())
-                RETURNING id
-            """)
-            # serialize config as JSON for storage
-            result = session.execute(query, {
-                "job_name": job_name,
-                "job_type": job_type,
-                "source_name": source_name,
-                "config": json.dumps(config or {})
-            })
-            session.commit()
-            job_id = result.fetchone()[0]
-            return job_id if isinstance(job_id, UUID) else UUID(str(job_id))
-
-    def update_job_progress(
-        self,
-        job_id: UUID,
-        total: Optional[int] = None,
-        processed: Optional[int] = None,
-        failed: Optional[int] = None,
-        skipped: Optional[int] = None
-    ):
-        """Update job progress counters"""
-        with self.get_session() as session:
-            updates = []
-            params = {"job_id": str(job_id)}
-
-            if total is not None:
-                updates.append("total_records = :total")
-                params["total"] = total
-            if processed is not None:
-                updates.append("processed_records = :processed")
-                params["processed"] = processed
-            if failed is not None:
-                updates.append("failed_records = :failed")
-                params["failed"] = failed
-            if skipped is not None:
-                updates.append("skipped_records = :skipped")
-                params["skipped"] = skipped
-
-            if updates:
-                query_str = f"""
-                    UPDATE ingestion_jobs
-                    SET {', '.join(updates)}
-                    WHERE id = :job_id
-                """
-                session.execute(text(query_str), params)
-                session.commit()
-
-    def complete_job(
-        self,
-        job_id: UUID,
-        status: str = "success",
-        error_message: Optional[str] = None
-    ):
-        """Mark job as completed"""
-        with self.get_session() as session:
-            query = text("""
-                UPDATE ingestion_jobs
-                SET status = :status,
-                    completed_at = NOW(),
-                    duration_seconds = EXTRACT(EPOCH FROM (NOW() - started_at)),
-                    error_message = :error_message
-                WHERE id = :job_id
-            """)
-            session.execute(query, {
-                "job_id": str(job_id),
-                "status": status,
-                "error_message": error_message
-            })
-            session.commit()
-
-    def add_processing_log(
-        self,
-        job_id: UUID,
-        level: str,
-        message: str,
-        details: Optional[Dict] = None
-    ):
-        """Add a processing log entry"""
-        with self.get_session() as session:
-            query = text("""
-                INSERT INTO processing_logs (job_id, log_level, message, details)
-                VALUES (:job_id, :level, :message, :details)
-            """)
-            details_json = json.dumps(details or {})
-            session.execute(query, {
-                "job_id": str(job_id),
-                "level": level,
-                "message": message,
-                "details": details_json
-            })
-            session.commit()
-
-    # ===== Statistics =====
-
-    def get_collection_stats(self, collection_id: UUID) -> Dict[str, Any]:
-        """Get statistics for a collection"""
-        with self.get_session() as session:
-            query = text("""
-                SELECT * FROM get_collection_statistics(:collection_id)
-            """)
-            result = session.execute(query, {"collection_id": str(collection_id)}).fetchone()
-
-            if result:
-                return dict(result._mapping)
-            return {}
-=== ./src/embeddings/__init__.py ===
-
-=== ./src/embeddings/generator.py ===
-
-=== ./src/main_hadithapi.py ===
-"""
-Main ingestion script for fetching hadiths from HadithAPI.com
-"""
-import sys
-import argparse
-from typing import Optional, Dict, Any
-from uuid import UUID
-import structlog
-from config.settings import settings
-from src.api_clients.hadithapi_client import HadithAPIClient
-from src.database.repository import HadithRepository
-from src.processors.text_cleaner import ArabicTextProcessor, TextCleaner
-
-# Configure structured logging
-structlog.configure(
-    processors=[
-        structlog.stdlib.filter_by_level,
-        structlog.stdlib.add_logger_name,
-        structlog.stdlib.add_log_level,
-        structlog.processors.TimeStamper(fmt="iso"),
-        structlog.processors.StackInfoRenderer(),
-        structlog.processors.format_exc_info,
-        structlog.processors.JSONRenderer()
-    ],
-    wrapper_class=structlog.stdlib.BoundLogger,
-    context_class=dict,
-    logger_factory=structlog.stdlib.LoggerFactory(),
-    cache_logger_on_first_use=True,
-)
-
-logger = structlog.get_logger()
-
-
-# Book slug to collection abbreviation mapping
-BOOK_SLUG_MAPPING = {
-    'sahih-bukhari': 'bukhari',
-    'sahih-muslim': 'muslim',
-    'abu-dawood': 'abudawud',
-    'al-tirmidhi': 'tirmidhi',
-    'sunan-nasai': 'nasai',
-    'ibn-e-majah': 'ibnmajah',
-    'muwatta-imam-malik': 'malik',
-    'musnad-ahmad': 'ahmad',
-    'sunan-ad-darimi': 'darimi',
-    'mishkat': 'mishkat',
-    'al-silsila-sahiha': 'al-silsila-sahiha'
-}
-
-
-class HadithAPIIngestionService:
-    """Service for ingesting hadiths from HadithAPI.com"""
-
-    def __init__(self):
-        self.api_client = HadithAPIClient()
-        self.repo = HadithRepository()
-        self.text_processor = ArabicTextProcessor()
-        self.text_cleaner = TextCleaner()
-
-    def sync_books_from_api(self) -> Dict[str, Any]:
-        """
-        Sync book metadata from API to database
-
-        Returns:
-            Dictionary mapping book_slug -> collection_id
-        """
-        logger.info("syncing_books_from_api")
-
-        # Get books from API
-        api_books = self.api_client.get_books()
-
-        book_mapping = {}
-        # print(BOOK_SLUG_MAPPING)
-        for api_book in api_books:
-            book_slug = api_book.get('bookSlug')
-            # print(book_slug)
-            # Map to our collection abbreviation
-            collection_abbr = BOOK_SLUG_MAPPING.get(book_slug)
-
-            if not collection_abbr:
-                logger.warning(
-                    "unmapped_book",
-                    book_slug=book_slug,
-                    book_name=api_book.get('bookName')
-                )
-                continue
-            # Get or verify collection exists in database
-            collection = self.repo.get_collection_by_abbreviation(collection_abbr)
-
-            if not collection:
-                logger.warning(
-                    "collection_not_in_db",
-                    abbreviation=collection_abbr,
-                    book_slug=book_slug
-                )
-                continue
-
-            collection_id = collection['id']
-            if not isinstance(collection_id, UUID):
-                collection_id = UUID(str(collection_id))
-            book_mapping[book_slug] = {
-                'collection_id': collection_id,
-                'book_id': api_book.get('id'),
-                'book_name': api_book.get('bookName'),
-                'hadiths_count': api_book.get('hadiths_count'),
-                'chapters_count': api_book.get('chapters_count')
-            }
-
-            logger.info(
-                "book_mapped",
-                book_slug=book_slug,
-                collection_abbr=collection_abbr,
-                hadiths_count=api_book.get('hadiths_count')
-            )
-
-        logger.info(
-            "books_synced",
-            total_books=len(book_mapping)
-        )
-
-        return book_mapping
-
-    def ingest_collection(
-        self,
-        book_slug: str,
-        limit: Optional[int] = None
-    ) -> dict:
-        """
-        Ingest entire collection from HadithAPI.com
-
-        Args:
-            book_slug: Book slug identifier (e.g., 'sahih-bukhari')
-            limit: Optional limit on number of hadiths to ingest
-
-        Returns:
-            Statistics dictionary
-        """
-        logger.info(
-            "ingestion_started",
-            book_slug=book_slug,
-            limit=limit
-        )
-
-        # Get book mapping
-        book_mapping = self.sync_books_from_api()
-        logger.info("containing books", book_mapping=book_mapping)
-        logger.info("Book slugs", book_slug=book_slug)
-        if book_slug not in book_mapping:
-            logger.error(
-                "book_not_mapped",
-                book_slug=book_slug,
-                available_books=list(book_mapping.keys())
-            )
-            raise ValueError(f"Book '{book_slug}' not found or not mapped")
-
-        book_info = book_mapping[book_slug]
-        collection_id = book_info['collection_id']
-        # book_id = book_info['book_id']
-        book_id = book_slug
-
-        # Create ingestion job
-        job_id = self.repo.create_ingestion_job(
-            job_name=f"ingest_{book_slug}",
-            job_type="api_fetch",
-            source_name="hadithapi.com",
-            config={
-                "book_slug": book_slug,
-                "book_id": book_id,
-                "limit": limit
-            }
-        )
-
-        logger.info(
-            "job_created",
-            job_id=str(job_id),
-            book_slug=book_slug,
-            expected_count=book_info.get('hadiths_count')
-        )
-
-        stats = {
-            "processed": 0,
-            "failed": 0,
-            "skipped": 0
-        }
-
-        try:
-            # Iterate through all hadiths in book
-            for hadith_data, chapter_data in self.api_client.iter_all_hadiths_in_book_with_chapters(
-                book_id=book_id,
-                book_slug=book_slug,
-                batch_size=100
-            ):
-                # Check limit
-                if limit and stats["processed"] >= limit:
-                    logger.info("limit_reached", limit=limit)
-                    break
-
-                try:
-                    # Process and store hadith
-                    self._process_and_store_hadith(
-                        collection_id=collection_id,
-                        hadith_data=hadith_data,
-                        chapter_data=chapter_data
-                    )
-
-                    stats["processed"] += 1
-
-                    # Update job progress every 100 hadiths
-                    if stats["processed"] % 100 == 0:
-                        self.repo.update_job_progress(
-                            job_id=job_id,
-                            processed=stats["processed"],
-                            failed=stats["failed"],
-                            skipped=stats["skipped"]
-                        )
-
-                        logger.info(
-                            "progress_update",
-                            book_slug=book_slug,
-                            processed=stats["processed"],
-                            failed=stats["failed"],
-                            percentage=round(
-                                (stats["processed"] / int(book_info.get('hadiths_count', 1))) * 100,
-                                2
-                            ) if book_info.get('hadiths_count') else 0
-                        )
-
-                except Exception as e:
-                    stats["failed"] += 1
-                    logger.error(
-                        "hadith_processing_failed",
-                        error=str(e),
-                        hadith_number=hadith_data.get("hadithNumber"),
-                        hadith_id=hadith_data.get("id")
-                    )
-
-                    self.repo.add_processing_log(
-                        job_id=job_id,
-                        level="ERROR",
-                        message=f"Failed to process hadith: {str(e)}",
-                        details={"hadith_data": hadith_data}
-                    )
-
-            # Update final job progress
-            self.repo.update_job_progress(
-                job_id=job_id,
-                total=stats["processed"] + stats["failed"] + stats["skipped"],
-                processed=stats["processed"],
-                failed=stats["failed"],
-                skipped=stats["skipped"]
-            )
-
-            # Mark job as complete
-            self.repo.complete_job(job_id=job_id, status="success")
-
-            # Update collection count
-            self.repo.update_collection_count(
-                collection_id=collection_id,
-                count=stats["processed"]
-            )
-
-            logger.info(
-                "ingestion_completed",
-                book_slug=book_slug,
-                stats=stats
-            )
-
-            return stats
-
-        except Exception as e:
-            logger.error(
-                "ingestion_failed",
-                book_slug=book_slug,
-                error=str(e),
-                exc_info=True
-            )
-
-            self.repo.complete_job(
-                job_id=job_id,
-                status="failed",
-                error_message=str(e)
-            )
-
-            raise
-
-    def _process_and_store_hadith(
-        self,
-        collection_id: UUID,
-        hadith_data: dict,
-        chapter_data: Optional[dict]
-    ):
-        """Process and store a single hadith"""
-
-        # Extract hadith number
-        hadith_number = hadith_data.get("hadithNumber")
-        if not hadith_number:
-            raise ValueError("Missing hadith number")
-
-        # Convert to integer
-        try:
-            hadith_number = int(hadith_number)
-        except (ValueError, TypeError):
-            raise ValueError(f"Invalid hadith number: {hadith_number}")
-
-        # Extract text in multiple languages
-        arabic_text = hadith_data.get("hadithArabic")
-        english_text = hadith_data.get("hadithEnglish")
-        urdu_text = hadith_data.get("hadithUrdu")
-
-        if not arabic_text:
-            raise ValueError("Missing Arabic text")
-        # passed logger.warning("Arabic text extracted and validated", hadith_number=hadith_number)
-        # Clean texts
-        arabic_text = self.text_cleaner.clean_text(arabic_text)
-        if english_text:
-            english_text = self.text_cleaner.clean_text(english_text)
-        if urdu_text:
-            urdu_text = self.text_cleaner.clean_text(urdu_text)
-
-        # Extract grade/status
-        grade = hadith_data.get("status")
-
-        # Get or create chapter (book in our schema)
-        book_id = None
-        chapter_name = None
-        # logger.warning("Processing chapter data####", chapter_data.get('positional_args', {}).get('id'))
-        # logger.info("Processing chapter data####2####", chapter_data.get('id'))
-        if chapter_data:
-            chapter_id = chapter_data.get('id')
-            chapter_number = chapter_data.get('chapterNumber')
-            chapter_name_en = chapter_data.get('chapterEnglish')
-            chapter_name_ar = chapter_data.get('chapterArabic')
-            chapter_name = chapter_name_en
-            # print(chapter_number, chapter_name)
-            if chapter_number:
-                try:
-                    chapter_number = int(chapter_number)
-                except (ValueError, TypeError):
-                    chapter_number = chapter_id  # Fallback to ID
-
-            # Get or create book (chapter in HadithAPI = book in our schema)
-            existing_book = self.repo.get_book(collection_id, chapter_number)
-            # logger.warning("EXISTING BOOK : ", existing_book)
-            # logger.warning("Fetched or created book", collection_id=collection_id, chapter_number=chapter_number, chapter_name=chapter_name)
-            if not existing_book:
-                book_id = self.repo.upsert_book(
-                    collection_id=collection_id,
-                    book_number=chapter_number,
-                    name_english=chapter_name_en,
-                    name_arabic=chapter_name_ar,
-                    metadata=chapter_data
-                )
-            else:
-                # print(existing_book['id'])
-                existing_id = existing_book['id']
-                if isinstance(existing_id, str):
-                    existing_id = UUID(existing_id)
-                book_id = existing_id
-
-        # Build source metadata
-        source_metadata = {
-            'api_id': hadith_data.get('id'),
-            'englishNarrator': hadith_data.get('englishNarrator'),
-            'urduNarrator': hadith_data.get('urduNarrator'),
-            'bookSlug': hadith_data.get('bookSlug'),
-            'chapterId': hadith_data.get('chapterId'),
-            'chapter': chapter_data
-        }
-        # logger.warning("Hadith metadata built", source_metadata)
-        # Store hadith
-        hadith_id = self.repo.upsert_hadith(
-            collection_id=collection_id,
-            book_id=book_id,
-            hadith_number=hadith_number,
-            arabic_text=arabic_text,
-            english_text=english_text,
-            urdu_text=urdu_text,
-            grade=grade,
-            grade_source="hadithapi.com",
-            chapter_name=chapter_name,
-            source_id=str(hadith_data.get('id', '')),
-            source_url=f"https://hadithapi.com/hadith/{hadith_data.get('id')}",
-            source_metadata=source_metadata
-        )
-
-        logger.debug(
-            "hadith_stored",
-            hadith_id=str(hadith_id),
-            hadith_number=hadith_number,
-            chapter_id=chapter_data.get('id') if chapter_data else None
-        )
-
-    def ingest_all_books(self, limit_per_book: Optional[int] = None) -> Dict[str, dict]:
-        """
-        Ingest all available books
-
-        Args:
-            limit_per_book: Optional limit per book
-
-        Returns:
-            Dictionary of book_slug -> stats
-        """
-        logger.info("ingesting_all_books", limit_per_book=limit_per_book)
-
-        book_mapping = self.sync_books_from_api()
-        results = {}
-
-        for book_slug in book_mapping.keys():
-            logger.info("starting_book", book_slug=book_slug)
-
-            try:
-                stats = self.ingest_collection(
-                    book_slug=book_slug,
-                    limit=limit_per_book
-                )
-                results[book_slug] = {"status": "success", "stats": stats}
-
-            except Exception as e:
-                logger.error(
-                    "book_ingestion_failed",
-                    book_slug=book_slug,
-                    error=str(e)
-                )
-                results[book_slug] = {"status": "failed", "error": str(e)}
-
-        logger.info("all_books_completed", results=results)
-        return results
-
-    def close(self):
-        """Close connections"""
-        self.api_client.close()
-
-
-def main():
-    """Main entry point"""
-    parser = argparse.ArgumentParser(
-        description="Ingest hadiths from HadithAPI.com"
-    )
-    parser.add_argument(
-        "--book-slug",
-        help="Book slug (e.g., sahih-bukhari). If not provided, ingests all books."
-    )
-    parser.add_argument(
-        "--limit",
-        type=int,
-        help="Limit number of hadiths to ingest per book"
-    )
-    parser.add_argument(
-        "--list-books",
-        action="store_true",
-        help="List available books and exit"
-    )
-
-    args = parser.parse_args()
-
-    try:
-        service = HadithAPIIngestionService()
-
-        # List books mode
-        if args.list_books:
-            logger.info("listing_available_books")
-            book_mapping = service.sync_books_from_api()
-
-            print("\n=== Available Books ===\n")
-            for book_slug, info in book_mapping.items():
-                print(f"Book Slug: {book_slug}")
-                print(f"  Name: {info['book_name']}")
-                print(f"  Hadiths: {info['hadiths_count']}")
-                print(f"  Chapters: {info['chapters_count']}")
-                print()
-
-            service.close()
-            return 0
-
-        # Ingest mode
-        if args.book_slug:
-            logger.info(
-                "script_started",
-                book_slug=args.book_slug,
-                limit=args.limit
-            )
-
-            stats = service.ingest_collection(
-                book_slug=args.book_slug,
-                limit=args.limit
-            )
-
-            logger.info("script_completed", stats=stats)
-
-            print(f"\n=== Ingestion completed for {args.book_slug} ===")
-            print(f"Processed: {stats['processed']}")
-            print(f"Failed: {stats['failed']}")
-            print(f"Skipped: {stats['skipped']}")
-
-        else:
-            # Ingest all books
-            logger.info("script_started_all_books", limit_per_book=args.limit)
-
-            results = service.ingest_all_books(limit_per_book=args.limit)
-
-            print("\n=== All Books Ingestion Summary ===\n")
-            for book_slug, result in results.items():
-                print(f"{book_slug}: {result['status']}")
-                if result['status'] == 'success':
-                    stats = result['stats']
-                    print(f"  Processed: {stats['processed']}")
-                    print(f"  Failed: {stats['failed']}")
-                else:
-                    print(f"  Error: {result['error']}")
-                print()
-
-        service.close()
-        return 0
-
-    except Exception as e:
-        logger.error(
-            "script_failed",
-            error=str(e),
-            exc_info=True
-        )
-
-        print(f"\nIngestion failed: {str(e)}", file=sys.stderr)
-        return 1
-
-
-if __name__ == "__main__":
-    sys.exit(main())
-=== ./src/__init__.py ===
-
-=== ./src/utils/__init__.py ===
-
-=== ./src/utils/logger.py ===
-
-=== ./src/utils/retry.py ===
-
-=== ./src/processors/validator.py ===
-
-=== ./src/processors/arabic_normalizer.py ===
-
-=== ./src/processors/__init__.py ===
-
-=== ./src/processors/text_cleaner.py ===
-"""
-Text cleaning and normalization utilities
-"""
-import re
-from typing import Optional
-import unicodedata
-import structlog
-
-logger = structlog.get_logger()
-
-
-class ArabicTextProcessor:
-    """Process and normalize Arabic text"""
-
-    # Arabic diacritics to remove
-    DIACRITICS = re.compile(
-        r'[\u064B-\u065F\u0670\u06D6-\u06DC\u06DF-\u06E8\u06EA-\u06ED]'
-    )
-
-    # Tatweel (elongation character)
-    TATWEEL = '\u0640'
-
-    # Normalize Arabic letters
-    ALEF_VARIANTS = re.compile(r'[إأآا]')
-    ALEF_MAKSURA = 'ى'
-    YAA = 'ي'
-    TAA_MARBUTA = 'ة'
-    HAA = 'ه'
-
-    @classmethod
-    def remove_diacritics(cls, text: str) -> str:
-        """Remove Arabic diacritics (tashkeel)"""
-        if not text:
-            return text
-        return cls.DIACRITICS.sub('', text)
-
-    @classmethod
-    def remove_tatweel(cls, text: str) -> str:
-        """Remove tatweel (elongation) character"""
-        if not text:
-            return text
-        return text.replace(cls.TATWEEL, '')
-
-    @classmethod
-    def normalize_alef(cls, text: str) -> str:
-        """Normalize all Alef variants to bare Alef"""
-        if not text:
-            return text
-        return cls.ALEF_VARIANTS.sub('ا', text)
-
-    @classmethod
-    def normalize_yaa(cls, text: str) -> str:
-        """Normalize Alef Maksura to Yaa"""
-        if not text:
-            return text
-        return text.replace(cls.ALEF_MAKSURA, cls.YAA)
-
-    @classmethod
-    def normalize_taa_marbuta(cls, text: str) -> str:
-        """Normalize Taa Marbuta to Haa"""
-        if not text:
-            return text
-        return text.replace(cls.TAA_MARBUTA, cls.HAA)
-
-    @classmethod
-    def normalize_whitespace(cls, text: str) -> str:
-        """Normalize whitespace"""
-        if not text:
-            return text
-        # Replace multiple spaces with single space
-        text = re.sub(r'\s+', ' ', text)
-        # Trim
-        return text.strip()
-
-    @classmethod
-    def normalize_full(cls, text: str) -> str:
-        """
-        Apply full normalization:
-        - Remove diacritics
-        - Remove tatweel
-        - Normalize Alef variants
-        - Normalize Yaa
-        - Normalize Taa Marbuta
-        - Normalize whitespace
-        """
-        if not text:
-            return text
-
-        text = cls.remove_diacritics(text)
-        text = cls.remove_tatweel(text)
-        text = cls.normalize_alef(text)
-        text = cls.normalize_yaa(text)
-        text = cls.normalize_taa_marbuta(text)
-        text = cls.normalize_whitespace(text)
-
-        return text
-
-    @classmethod
-    def extract_sanad_matn(cls, text: str) -> tuple[Optional[str], Optional[str]]:
-        """
-        Attempt to extract sanad (chain) and matn (text) from hadith
-
-        Common patterns:
-        - حدثنا ... قال ... (sanad ends before reported speech)
-        - Simple heuristic: Split on first occurrence of قال or أن
-
-        Returns:
-            Tuple of (sanad, matn) or (None, None) if cannot split
-        """
-        if not text:
-            return None, None
-
-        # Look for common sanad-matn separators
-        separators = [
-            r'قال\s*رسول\s*الله',  # "The Messenger of Allah said"
-            r'قال\s*النبي',  # "The Prophet said"
-            r'عن\s*النبي',  # "From the Prophet"
-            r'أن\s*رسول\s*الله',  # "That the Messenger of Allah"
-        ]
-
-        for pattern in separators:
-            match = re.search(pattern, text, re.IGNORECASE)
-            if match:
-                split_pos = match.start()
-                sanad = text[:split_pos].strip()
-                matn = text[split_pos:].strip()
-
-                logger.debug(
-                    "sanad_matn_extracted",
-                    sanad_length=len(sanad),
-                    matn_length=len(matn)
-                )
-
-                return sanad, matn
-
-        # Could not split
-        logger.debug("sanad_matn_extraction_failed")
-        return None, None
-
-
-class TextCleaner:
-    """General text cleaning utilities"""
-
-    @staticmethod
-    def clean_html(text: str) -> str:
-        """Remove HTML tags"""
-        if not text:
-            return text
-        return re.sub(r'<[^>]+>', '', text)
-
-    @staticmethod
-    def normalize_unicode(text: str) -> str:
-        """Normalize Unicode (NFC normalization)"""
-        if not text:
-            return text
-        return unicodedata.normalize('NFC', text)
-
-    @staticmethod
-    def clean_text(text: str) -> str:
-        """Apply general cleaning"""
-        if not text:
-            return text
-
-        # Remove HTML
-        text = TextCleaner.clean_html(text)
-
-        # Normalize Unicode
-        text = TextCleaner.normalize_unicode(text)
-
-        # Normalize whitespace
-        text = ArabicTextProcessor.normalize_whitespace(text)
-
-        return text
-=== ./src/api_clients/base_client.py ===
-"""
-Base API client with retry logic and rate limiting
-"""
-import httpx
-import time
-from typing import Optional, Dict, Any
-from tenacity import (
-    retry,
-    stop_after_attempt,
-    wait_exponential,
-    retry_if_exception_type
-)
-import structlog
-from config.settings import settings
-
-logger = structlog.get_logger()
-
-
-class BaseAPIClient:
-    """Base class for API clients with built-in retry and rate limiting"""
-
-    def __init__(
-        self,
-        base_url: str,
-        api_key: Optional[str] = None,
-        rate_limit: int = 90,
-        timeout: int = 30
-    ):
-        self.base_url = base_url.rstrip('/')
-        self.api_key = api_key
-        self.rate_limit = rate_limit
-        self.timeout = timeout
-
-        # Rate limiting
-        self.request_times = []
-        self.min_interval = 60.0 / rate_limit  # seconds between requests
-
-        # HTTP client
-        self.client = httpx.Client(timeout=timeout)
-
-        logger.info(
-            "api_client_initialized",
-            base_url=base_url,
-            rate_limit=rate_limit
-        )
-
-    def _wait_for_rate_limit(self):
-        """Implement rate limiting"""
-        now = time.time()
-
-        # Remove old timestamps (older than 1 minute)
-        self.request_times = [t for t in self.request_times if now - t < 60]
-
-        # If we're at the limit, wait
-        if len(self.request_times) >= self.rate_limit:
-            sleep_time = 60 - (now - self.request_times[0])
-            if sleep_time > 0:
-                logger.info(
-                    "rate_limit_wait",
-                    sleep_seconds=sleep_time,
-                    requests_in_window=len(self.request_times)
-                )
-                time.sleep(sleep_time)
-                self.request_times = []
-
-        # Add current timestamp
-        self.request_times.append(time.time())
-
-    @retry(
-        stop=stop_after_attempt(3),
-        wait=wait_exponential(multiplier=1, min=2, max=10),
-        retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
-        reraise=True
-    )
-    def _make_request(
-        self,
-        method: str,
-        endpoint: str,
-        params: Optional[Dict[str, Any]] = None,
-        headers: Optional[Dict[str, str]] = None
-    ) -> Dict[str, Any]:
-        """Make HTTP request with retry logic"""
-
-        # Rate limiting
-        self._wait_for_rate_limit()
-
-        # Prepare headers
-        request_headers = headers or {}
-        if self.api_key:
-            request_headers['X-API-Key'] = self.api_key
-
-        # Make request
-        url = f"{self.base_url}/{endpoint.lstrip('/')}"
-
-        logger.debug(
-            "api_request",
-            method=method,
-            url=url,
-            params=params
-        )
-
-        response = self.client.request(
-            method=method,
-            url=url,
-            params=params,
-            headers=request_headers
-        )
-
-        response.raise_for_status()
-
-        logger.debug(
-            "api_response",
-            status_code=response.status_code,
-            response_size=len(response.content)
-        )
-
-        return response.json()
-
-    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
-        """Make GET request"""
-        return self._make_request("GET", endpoint, params=params)
-
-    def close(self):
-        """Close the HTTP client"""
-        self.client.close()
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.close()
-=== ./src/api_clients/hadith_one_client.py ===
-
-=== ./src/api_clients/__init__.py ===
-
-=== ./src/api_clients/hadithapi_client.py ===
-"""
-Client for HadithAPI.com API
-"""
-from typing import List, Dict, Any, Optional, Generator, Tuple
-import structlog
-from .base_client import BaseAPIClient
-from config.settings import settings
-
-logger = structlog.get_logger()
-
-
-class HadithAPIClient(BaseAPIClient):
-    """Client for interacting with hadithapi.com API"""
-
-    def __init__(self, api_key: Optional[str] = None):
-        super().__init__(
-            base_url="https://hadithapi.com/api",
-            api_key=api_key or settings.HADITHAPI_KEY,
-            rate_limit=30  # Conservative: 30 req/min
-        )
-
-    def _add_api_key(self, params: Optional[Dict] = None) -> Dict:
-        """Add API key to request parameters"""
-        params = params or {}
-        params['apiKey'] = self.api_key
-        return params
-
-    def get_books(self) -> List[Dict[str, Any]]:
-        """
-        Get list of all available books/collections
-
-        Returns:
-            List of book dictionaries
-        """
-        logger.info("fetching_books")
-
-        params = self._add_api_key()
-        response = self.get("books", params=params)
-
-        if response.get('status') != 200:
-            logger.error(
-                "api_error",
-                status=response.get('status'),
-                message=response.get('message')
-            )
-            raise Exception(f"API Error: {response.get('message')}")
-
-        books = response.get('books', [])
-
-
-        logger.info(
-            "books_fetched",
-            count=len(books)
-        )
-
-        return books
-
book_slug: str) -> List[Dict[str, Any]]: - """ - Get chapters for a specific book - - Args: - book_slug: Book slug identifier (e.g., 'sahih-bukhari') - - Returns: - List of chapter dictionaries - """ - logger.info( - "fetching_chapters", - book_slug=book_slug - ) - - params = self._add_api_key() - response = self.get(f"{book_slug}/chapters", params=params) - - if response.get('status') != 200: - logger.error( - "api_error", - status=response.get('status'), - message=response.get('message') - ) - raise Exception(f"API Error: {response.get('message')}") - - chapters = response.get('chapters', []) - - - logger.info( - "chapters_fetched", - book_slug=book_slug, - count=len(chapters) - ) - - return chapters - - def get_hadiths_page( - self, - book_id: int, - chapter_id: Optional[int] = None, - page: int = 1, - limit: int = 100 - ) -> Dict[str, Any]: - """ - Get a page of hadiths - - Args: - book_id: Book ID - chapter_id: Optional chapter ID to filter by - page: Page number (1-indexed) - limit: Results per page (max 100) - - Returns: - Response dictionary with hadiths and pagination info - """ - params = self._add_api_key({ - 'book': book_id, - 'page': page, - 'limit': min(limit, 100) # Enforce max limit - }) - - if chapter_id: - params['chapter'] = chapter_id - - logger.debug( - "fetching_hadiths_page", - book_id=book_id, - chapter_id=chapter_id, - page=page, - limit=limit - ) - - response = self.get("hadiths", params=params) - # logger.debug( - # "fetching_hadiths_page####", - # response=response - # ) - if response.get('status') != 200: - logger.error( - "api_error", - status=response.get('status'), - message=response.get('message') - ) - raise Exception(f"API Error: {response.get('message')}") - - return response.get('hadiths', {}) - - def iter_all_hadiths_in_book( - self, - book_id: int, - book_slug: str, - chapter_id: Optional[int] = None, - batch_size: int = 100 - ) -> Generator[Dict[str, Any], None, None]: - """ - Iterator that yields all hadiths in a book, handling pagination automatically - - Args: - book_id: Book ID - book_slug: Book slug for logging - chapter_id: Optional chapter ID to filter by - batch_size: Number of hadiths to fetch per request (max 100) - - Yields: - Individual hadith dictionaries - """ - page = 1 - total_fetched = 0 - - while True: - response_data = self.get_hadiths_page( - book_id=book_slug, - chapter_id=chapter_id, - page=page, - limit=batch_size - ) - - hadiths = response_data.get('data', []) - pagination = response_data.get('pagination', {}) - # logger.info( - # "book_complete", - # book_slug=book_slug, - # hadiths=hadiths, - # pagination=pagination, - # response = response_data - # ) - if not hadiths: - logger.info( - "book_complete", - book_slug=book_slug, - chapter_id=chapter_id, - total_hadiths=total_fetched - ) - break - - for hadith in hadiths: - yield hadith - total_fetched += 1 - - # Log progress - if total_fetched % 500 == 0: - logger.info( - "progress", - book_slug=book_slug, - fetched=total_fetched, - total=response_data.get('total', '?') - ) - - # Check if there are more pages - current_page = response_data.get('current_page', page) - last_page = response_data.get('last_page', 1) - - if current_page >= last_page: - logger.info( - "book_complete", - book_slug=book_slug, - total_hadiths=total_fetched, - total_pages=last_page - ) - break - - page += 1 - - def iter_all_hadiths_in_book_with_chapters( - self, - book_id: int, - book_slug: str, - batch_size: int = 100 - ) -> Generator[Tuple[Dict[str, Any], Optional[Dict[str, Any]]], None, None]: - """ - 
Iterator that yields all hadiths in a book, organized by chapter - - Args: - book_id: Book ID - book_slug: Book slug - batch_size: Number of hadiths to fetch per request - - Yields: - Tuple of (hadith_dict, chapter_dict or None) - """ - # First, get all chapters - try: - chapters = self.get_chapters(book_slug) - except Exception as e: - logger.warning( - "chapters_fetch_failed", - book_slug=book_slug, - error=str(e), - fallback="fetching_all_hadiths_without_chapter_filter" - ) - # Fallback: fetch all hadiths without chapter filter - for hadith in self.iter_all_hadiths_in_book( - book_id=book_id, - book_slug=book_slug, - batch_size=batch_size - ): - chapter_info = hadith.get('chapter') - yield hadith, chapter_info - return - - logger.info( - "starting_chapter_by_chapter_fetch", - book_slug=book_slug, - total_chapters=len(chapters) - ) - - # Process each chapter - for chapter in chapters: - chapter_id = chapter.get('id') - chapter_number = chapter.get('chapterNumber') - - logger.info( - "fetching_chapter", - book_slug=book_slug, - chapter_id=chapter_id, - chapter_number=chapter_number - ) - - try: - for hadith in self.iter_all_hadiths_in_book( - book_id=book_id, - book_slug=book_slug, - chapter_id=chapter_id, - batch_size=batch_size - ): - yield hadith, chapter - except Exception as e: - logger.error( - "chapter_fetch_failed", - book_slug=book_slug, - chapter_id=chapter_id, - error=str(e) - ) - continue - - def get_book_by_slug(self, book_slug: str) -> Optional[Dict[str, Any]]: - """ - Get book details by slug - - Args: - book_slug: Book slug identifier - - Returns: - Book dictionary or None if not found - """ - books = self.get_books() - for book in books: - if book.get('bookSlug') == book_slug: - return book - return None -=== ./src/api_clients/sunnah_client.py === -""" -Client for Sunnah.com API -""" -from typing import List, Dict, Any, Optional, Generator -import structlog -from .base_client import BaseAPIClient -from config.settings import settings - -logger = structlog.get_logger() - - -class SunnahAPIClient(BaseAPIClient): - """Client for interacting with Sunnah.com API""" - - def __init__(self, api_key: Optional[str] = None): - super().__init__( - base_url=settings.SUNNAH_BASE_URL, - api_key=api_key or settings.SUNNAH_API_KEY, - rate_limit=settings.API_RATE_LIMIT - ) - - def get_collections(self) -> List[Dict[str, Any]]: - """ - Get list of all hadith collections - - Returns: - List of collection dictionaries - """ - logger.info("fetching_collections") - - response = self.get("collections") - collections = response.get("data", []) - - logger.info( - "collections_fetched", - count=len(collections) - ) - - return collections - - def get_collection_details(self, collection_name: str) -> Dict[str, Any]: - """ - Get details for a specific collection - - Args: - collection_name: Collection abbreviation (e.g., 'bukhari') - - Returns: - Collection details dictionary - """ - logger.info( - "fetching_collection_details", - collection=collection_name - ) - - response = self.get(f"collections/{collection_name}") - - return response - - def get_books(self, collection_name: str) -> List[Dict[str, Any]]: - """ - Get all books in a collection - - Args: - collection_name: Collection abbreviation - - Returns: - List of book dictionaries - """ - logger.info( - "fetching_books", - collection=collection_name - ) - - response = self.get(f"collections/{collection_name}/books") - books = response.get("data", []) - - logger.info( - "books_fetched", - collection=collection_name, - count=len(books) - ) - - 
return books - - def get_hadiths_in_book( - self, - collection_name: str, - book_number: int, - limit: int = 50, - page: int = 1 - ) -> Dict[str, Any]: - """ - Get hadiths in a specific book with pagination - - Args: - collection_name: Collection abbreviation - book_number: Book number - limit: Number of hadiths per page - page: Page number - - Returns: - Response with hadiths and pagination info - """ - logger.debug( - "fetching_hadiths", - collection=collection_name, - book=book_number, - page=page, - limit=limit - ) - - response = self.get( - f"collections/{collection_name}/books/{book_number}/hadiths", - params={"limit": limit, "page": page} - ) - - return response - - def iter_all_hadiths_in_book( - self, - collection_name: str, - book_number: int, - batch_size: int = 50 - ) -> Generator[Dict[str, Any], None, None]: - """ - Iterator that yields all hadiths in a book, handling pagination automatically - - Args: - collection_name: Collection abbreviation - book_number: Book number - batch_size: Number of hadiths to fetch per request - - Yields: - Individual hadith dictionaries - """ - page = 1 - total_fetched = 0 - - while True: - response = self.get_hadiths_in_book( - collection_name=collection_name, - book_number=book_number, - limit=batch_size, - page=page - ) - - hadiths = response.get("data", []) - - if not hadiths: - logger.info( - "book_complete", - collection=collection_name, - book=book_number, - total_hadiths=total_fetched - ) - break - - for hadith in hadiths: - yield hadith - total_fetched += 1 - - # Check if there are more pages - pagination = response.get("pagination", {}) - if page >= pagination.get("total_pages", 1): - break - - page += 1 - - def iter_all_hadiths_in_collection( - self, - collection_name: str, - batch_size: int = 50 - ) -> Generator[tuple[Dict[str, Any], int], None, None]: - """ - Iterator that yields all hadiths in a collection - - Args: - collection_name: Collection abbreviation - batch_size: Number of hadiths to fetch per request - - Yields: - Tuple of (hadith_dict, book_number) - """ - # First, get all books in the collection - books = self.get_books(collection_name) - - logger.info( - "starting_collection_fetch", - collection=collection_name, - total_books=len(books) - ) - - for book in books: - book_number = book.get("bookNumber") - - if not book_number: - logger.warning( - "book_missing_number", - book=book - ) - continue - - logger.info( - "fetching_book", - collection=collection_name, - book=book_number - ) - - try: - for hadith in self.iter_all_hadiths_in_book( - collection_name=collection_name, - book_number=int(book_number), - batch_size=batch_size - ): - yield hadith, int(book_number) - except Exception as e: - logger.error( - "book_fetch_failed", - collection=collection_name, - book=book_number, - error=str(e) - ) - continue - - def get_specific_hadith( - self, - collection_name: str, - book_number: int, - hadith_number: int - ) -> Dict[str, Any]: - """ - Get a specific hadith by its number - - Args: - collection_name: Collection abbreviation - book_number: Book number - hadith_number: Hadith number - - Returns: - Hadith dictionary - """ - response = self.get( - f"hadiths/collection/{collection_name}/{book_number}/{hadith_number}" - ) - - return response.get("data", {}) -=== ./src/main.py === -""" -Main ingestion script for fetching hadiths from Sunnah.com API -""" -import sys -import argparse -from typing import Optional -from uuid import UUID -import structlog -from config.settings import settings -from api_clients.sunnah_client import 
SunnahAPIClient -from database.repository import HadithRepository -from processors.text_cleaner import ArabicTextProcessor, TextCleaner - -# Configure structured logging -structlog.configure( - processors=[ - structlog.stdlib.filter_by_level, - structlog.stdlib.add_logger_name, - structlog.stdlib.add_log_level, - structlog.processors.TimeStamper(fmt="iso"), - structlog.processors.StackInfoRenderer(), - structlog.processors.format_exc_info, - structlog.processors.JSONRenderer() - ], - wrapper_class=structlog.stdlib.BoundLogger, - context_class=dict, - logger_factory=structlog.stdlib.LoggerFactory(), - cache_logger_on_first_use=True, -) - -logger = structlog.get_logger() - - -class HadithIngestionService: - """Service for ingesting hadiths from Sunnah.com API""" - - def __init__(self): - self.api_client = SunnahAPIClient() - self.repo = HadithRepository() - self.text_processor = ArabicTextProcessor() - self.text_cleaner = TextCleaner() - - def ingest_collection( - self, - collection_abbr: str, - limit: Optional[int] = None - ) -> dict: - """ - Ingest entire collection from Sunnah.com API - - Args: - collection_abbr: Collection abbreviation (e.g., 'bukhari') - limit: Optional limit on number of hadiths to ingest - - Returns: - Statistics dictionary - """ - logger.info( - "ingestion_started", - collection=collection_abbr, - limit=limit - ) - - # Get collection from database - collection = self.repo.get_collection_by_abbreviation(collection_abbr) - if not collection: - logger.error( - "collection_not_found", - collection=collection_abbr - ) - raise ValueError(f"Collection '{collection_abbr}' not found in database") - - collection_id = UUID(collection['id']) - - # Create ingestion job - job_id = self.repo.create_ingestion_job( - job_name=f"ingest_{collection_abbr}", - job_type="api_fetch", - source_name="sunnah.com", - config={"collection": collection_abbr, "limit": limit} - ) - - logger.info( - "job_created", - job_id=str(job_id), - collection=collection_abbr - ) - - stats = { - "processed": 0, - "failed": 0, - "skipped": 0 - } - - try: - # Iterate through all hadiths in collection - for hadith_data, book_number in self.api_client.iter_all_hadiths_in_collection( - collection_name=collection_abbr, - batch_size=50 - ): - # Check limit - if limit and stats["processed"] >= limit: - logger.info("limit_reached", limit=limit) - break - - try: - # Process and store hadith - self._process_and_store_hadith( - collection_id=collection_id, - hadith_data=hadith_data, - book_number=book_number - ) - - stats["processed"] += 1 - - # Update job progress every 100 hadiths - if stats["processed"] % 100 == 0: - self.repo.update_job_progress( - job_id=job_id, - processed=stats["processed"], - failed=stats["failed"], - skipped=stats["skipped"] - ) - - logger.info( - "progress_update", - processed=stats["processed"], - failed=stats["failed"] - ) - - except Exception as e: - stats["failed"] += 1 - logger.error( - "hadith_processing_failed", - error=str(e), - hadith_number=hadith_data.get("hadithNumber") - ) - - self.repo.add_processing_log( - job_id=job_id, - level="ERROR", - message=f"Failed to process hadith: {str(e)}", - details={"hadith_data": hadith_data} - ) - - # Update final job progress - self.repo.update_job_progress( - job_id=job_id, - total=stats["processed"] + stats["failed"] + stats["skipped"], - processed=stats["processed"], - failed=stats["failed"], - skipped=stats["skipped"] - ) - - # Mark job as complete - self.repo.complete_job(job_id=job_id, status="success") - - # Update collection count - 
self.repo.update_collection_count( - collection_id=collection_id, - count=stats["processed"] - ) - - logger.info( - "ingestion_completed", - collection=collection_abbr, - stats=stats - ) - - return stats - - except Exception as e: - logger.error( - "ingestion_failed", - collection=collection_abbr, - error=str(e) - ) - - self.repo.complete_job( - job_id=job_id, - status="failed", - error_message=str(e) - ) - - raise - - def _process_and_store_hadith( - self, - collection_id: UUID, - hadith_data: dict, - book_number: int - ): - """Process and store a single hadith""" - - # Extract hadith number - hadith_number = hadith_data.get("hadithNumber") - if not hadith_number: - raise ValueError("Missing hadith number") - - # Extract text in multiple languages - hadith_texts = hadith_data.get("hadith", []) - - arabic_text = None - english_text = None - urdu_text = None - grade = None - grade_source = None - chapter_name = None - - for text_entry in hadith_texts: - lang = text_entry.get("lang", "").lower() - body = text_entry.get("body") - - if not body: - continue - - # Clean text - body = self.text_cleaner.clean_text(body) - - if lang == "ar": - arabic_text = body - chapter_name = text_entry.get("chapterTitle") - - # Extract grade from Arabic entry - grades = text_entry.get("grades", []) - if grades: - grade = grades[0].get("grade") - grade_source = grades[0].get("name") - - elif lang == "en": - english_text = body - - # Extract grade from English entry if not found - if not grade: - grades = text_entry.get("grades", []) - if grades: - grade = grades[0].get("grade") - grade_source = grades[0].get("name") - - elif lang == "ur": - urdu_text = body - - if not arabic_text: - raise ValueError("Missing Arabic text") - - # Get or create book - book = self.repo.get_book(collection_id, book_number) - if not book: - # Extract book name from hadith data - book_name_en = None - book_name_ar = None - - for text_entry in hadith_texts: - lang = text_entry.get("lang", "").lower() - book_data = text_entry.get("book", [{}])[0] if text_entry.get("book") else {} - - if lang == "en" and book_data.get("name"): - book_name_en = book_data.get("name") - elif lang == "ar" and book_data.get("name"): - book_name_ar = book_data.get("name") - - book_id = self.repo.upsert_book( - collection_id=collection_id, - book_number=book_number, - name_english=book_name_en, - name_arabic=book_name_ar - ) - else: - book_id = UUID(book["id"]) - - # Store hadith - hadith_id = self.repo.upsert_hadith( - collection_id=collection_id, - book_id=book_id, - hadith_number=int(hadith_number), - arabic_text=arabic_text, - english_text=english_text, - urdu_text=urdu_text, - grade=grade, - grade_source=grade_source, - chapter_name=chapter_name, - source_id=str(hadith_data.get("id", "")), - source_url=hadith_data.get("reference", {}).get("link"), - source_metadata=hadith_data - ) - - logger.debug( - "hadith_stored", - hadith_id=str(hadith_id), - hadith_number=hadith_number, - book_number=book_number - ) - - def close(self): - """Close connections""" - self.api_client.close() - - -def main(): - """Main entry point""" - parser = argparse.ArgumentParser(description="Ingest hadiths from Sunnah.com API") - parser.add_argument( - "collection", - help="Collection abbreviation (e.g., bukhari, muslim)" - ) - parser.add_argument( - "--limit", - type=int, - help="Limit number of hadiths to ingest" - ) - - args = parser.parse_args() - - logger.info( - "script_started", - collection=args.collection, - limit=args.limit - ) - - try: - service = HadithIngestionService() - 
stats = service.ingest_collection( - collection_abbr=args.collection, - limit=args.limit - ) - - logger.info( - "script_completed", - stats=stats - ) - - print(f"\nIngestion completed successfully!") - print(f"Processed: {stats['processed']}") - print(f"Failed: {stats['failed']}") - print(f"Skipped: {stats['skipped']}") - - service.close() - - return 0 - - except Exception as e: - logger.error( - "script_failed", - error=str(e), - exc_info=True - ) - - print(f"\nIngestion failed: {str(e)}", file=sys.stderr) - return 1 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/hadith-ingestion/run-full-ingestion.sh b/hadith-ingestion/run-full-ingestion.sh index d3136f9..5890cbb 100755 --- a/hadith-ingestion/run-full-ingestion.sh +++ b/hadith-ingestion/run-full-ingestion.sh @@ -10,12 +10,13 @@ echo "=== Starting Full HadithAPI Ingestion ===" BOOKS=( # "sahih-bukhari" # "sahih-muslim" - "abu-dawood" + # "abu-dawood" # "al-tirmidhi" - "ibn-e-majah" - "sunan-nasai" + # "ibn-e-majah" + # "sunan-nasai" # "musnad-ahmad" # "al-silsila-sahiha" + "mishkat" ) for BOOK in "${BOOKS[@]}"; do diff --git a/hadith-ingestion/src/api_clients/hadithapi_client.py b/hadith-ingestion/src/api_clients/hadithapi_client.py index b60cc7a..3d6ef19 100644 --- a/hadith-ingestion/src/api_clients/hadithapi_client.py +++ b/hadith-ingestion/src/api_clients/hadithapi_client.py @@ -265,7 +265,7 @@ class HadithAPIClient(BaseAPIClient): # Process each chapter for chapter in chapters: # logger.warning("Processing chapter", chapter=chapter) - if book_slug in {'sahih-muslim','al-tirmidhi','al-silsila-sahiha','abu-dawood','sunan-nasai','ibn-e-majah'}: + if book_slug in {'sahih-muslim','al-tirmidhi','al-silsila-sahiha','abu-dawood','sunan-nasai','ibn-e-majah','mishkat'}: chapter_id = chapter.get('chapterNumber') else: chapter_id = chapter.get('id')
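The hadithapi_client.py hunk above encodes a per-book quirk of hadithapi.com: for the slugs in the set, the hadiths endpoint filters chapters by chapterNumber, while the remaining books filter by the chapter row's id. Pulled out into a standalone helper the rule is easier to review; this is an illustrative sketch, not code in the repo, and the name resolve_chapter_id is hypothetical, but the slug set (now including mishkat) and the branching mirror the hunk exactly.

from typing import Any, Dict

# Slugs whose hadiths endpoint filters by chapterNumber; all other books
# are filtered by the chapter row's id (mirrors the hunk above).
CHAPTER_NUMBER_BOOKS = {
    'sahih-muslim', 'al-tirmidhi', 'al-silsila-sahiha',
    'abu-dawood', 'sunan-nasai', 'ibn-e-majah', 'mishkat',
}


def resolve_chapter_id(book_slug: str, chapter: Dict[str, Any]) -> Any:
    """Return the identifier the hadiths endpoint expects for this book."""
    if book_slug in CHAPTER_NUMBER_BOOKS:
        return chapter.get('chapterNumber')
    return chapter.get('id')


# e.g. resolve_chapter_id('mishkat', {'id': 7, 'chapterNumber': '1'}) -> '1'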
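With run-full-ingestion.sh now pointing at mishkat, the client can also be driven by hand as a smoke test before submitting the workflow. A minimal sketch, assuming src/ is on PYTHONPATH as in the Docker image, HADITHAPI_KEY is available via settings, and the /books payload carries the integer id field the ingestion code already relies on:

from api_clients.hadithapi_client import HadithAPIClient

with HadithAPIClient() as client:  # BaseAPIClient supports the context-manager protocol
    book = client.get_book_by_slug('mishkat')
    if book is None:
        raise SystemExit("mishkat not listed by /books")

    fetched = 0
    for hadith, chapter in client.iter_all_hadiths_in_book_with_chapters(
        book_id=int(book['id']),
        book_slug='mishkat',
        batch_size=100,
    ):
        fetched += 1
        if fetched >= 5:  # peek at the first few hadiths only
            break

print('fetched', fetched, 'hadiths from mishkat')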
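The retry policy on BaseAPIClient._make_request is easiest to verify in isolation: tenacity re-invokes the wrapped callable on httpx errors, sleeping roughly 2 s and then 4 s between the three attempts before re-raising. A self-contained sketch with a simulated transient failure; only httpx and tenacity from requirements.txt are needed, and flaky_call is a stand-in, not a function from the repo:

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

attempts = {"n": 0}


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
    reraise=True,
)
def flaky_call() -> str:
    # Fails twice, then succeeds -- stands in for a transient timeout or 5xx.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise httpx.TimeoutException("simulated timeout")
    return "ok"


print(flaky_call(), "after", attempts["n"], "attempts")  # -> ok after 3 attempts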
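The Arabic pipeline in processors/text_cleaner.py is likewise easiest to check end to end on a short isnad. A sketch with a made-up snippet: normalize_full strips the diacritics, after which extract_sanad_matn can split on the قال رسول الله separator (the exact output of the yaa normalization depends on normalize_yaa, which is defined earlier in that module):

from processors.text_cleaner import ArabicTextProcessor, TextCleaner

raw = "حَدَّثَنَا أَبُو بَكْرٍ قَالَ رَسُولُ اللَّهِ صَلَّى اللَّهُ عَلَيْهِ وَسَلَّمَ إِنَّمَا الأَعْمَالُ بِالنِّيَّاتِ"

clean = TextCleaner.clean_text(raw)                     # HTML strip + NFC + whitespace
normalized = ArabicTextProcessor.normalize_full(clean)  # diacritics, tatweel, alef/yaa/taa marbuta

sanad, matn = ArabicTextProcessor.extract_sanad_matn(normalized)
print("sanad:", sanad)  # e.g. "حدثنا ابو بكر"
print("matn:", matn)    # begins with "قال رسول الله ..."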
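Finally, the per-language branching in main.py's _process_and_store_hadith is driven entirely by the lang key on each entry of the hadith list, with the grade taken from the Arabic entry first and the English entry only as a fallback. A toy payload (field names mirror the code above; the values are invented) makes that fan-out concrete:

hadith_texts = [
    {"lang": "ar", "body": "نص عربي", "chapterTitle": "باب", "grades": [{"grade": "Sahih", "name": "Al-Albani"}]},
    {"lang": "en", "body": "English text", "grades": []},
    {"lang": "ur", "body": "اردو متن"},
]

arabic = english = urdu = grade = None
for entry in hadith_texts:
    lang, body = entry.get("lang", "").lower(), entry.get("body")
    if not body:
        continue
    if lang == "ar":
        arabic = body
        if entry.get("grades"):  # grade comes from the Arabic entry first
            grade = entry["grades"][0].get("grade")
    elif lang == "en":
        english = body
        if grade is None and entry.get("grades"):  # English grade only as fallback
            grade = entry["grades"][0].get("grade")
    elif lang == "ur":
        urdu = body

assert (arabic, english, urdu, grade) == ("نص عربي", "English text", "اردو متن", "Sahih")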