From b059fcab6ec6642b0d46288a4111cf772b0bd339 Mon Sep 17 00:00:00 2001 From: salahangal Date: Fri, 14 Nov 2025 10:15:41 +0100 Subject: [PATCH] initial repo creation --- hadith-ingestion/.env | 23 + hadith-ingestion/Dockerfile | 30 + hadith-ingestion/README.md | 0 .../argo/workflows/ingest-collection.yaml | 193 +++++++ .../argo/workflows/ingest-hadithapi.yaml | 204 +++++++ hadith-ingestion/build-and-push.sh | 23 + hadith-ingestion/build-hadithapi-ingestion.sh | 20 + hadith-ingestion/config/__init__.py | 0 hadith-ingestion/config/settings.py | 80 +++ hadith-ingestion/create-secrets.sh | 20 + hadith-ingestion/requirements.txt | 42 ++ hadith-ingestion/run-full-ingestion.sh | 49 ++ hadith-ingestion/setup.py | 0 hadith-ingestion/src/__init__.py | 0 hadith-ingestion/src/api_clients/__init__.py | 0 .../src/api_clients/base_client.py | 131 +++++ .../src/api_clients/hadith_one_client.py | 0 .../src/api_clients/hadithapi_client.py | 297 ++++++++++ .../src/api_clients/sunnah_client.py | 247 +++++++++ hadith-ingestion/src/database/__init__.py | 0 hadith-ingestion/src/database/connection.py | 0 hadith-ingestion/src/database/repository.py | 334 ++++++++++++ hadith-ingestion/src/embeddings/__init__.py | 0 hadith-ingestion/src/embeddings/generator.py | 0 hadith-ingestion/src/main.py | 353 ++++++++++++ hadith-ingestion/src/main_hadithapi.py | 516 ++++++++++++++++++ hadith-ingestion/src/processors/__init__.py | 0 .../src/processors/arabic_normalizer.py | 0 .../src/processors/text_cleaner.py | 173 ++++++ hadith-ingestion/src/processors/validator.py | 0 hadith-ingestion/src/utils/__init__.py | 0 hadith-ingestion/src/utils/logger.py | 0 hadith-ingestion/src/utils/retry.py | 0 hadith-ingestion/test-hadithapi-k8s.sh | 44 ++ hadith-ingestion/test-hadithapi-local.sh | 39 ++ hadith-ingestion/tests/__init__.py | 0 hadith-ingestion/tests/test_clients.py | 0 37 files changed, 2818 insertions(+) create mode 100644 hadith-ingestion/.env create mode 100644 hadith-ingestion/Dockerfile create mode 100644 hadith-ingestion/README.md create mode 100644 hadith-ingestion/argo/workflows/ingest-collection.yaml create mode 100644 hadith-ingestion/argo/workflows/ingest-hadithapi.yaml create mode 100755 hadith-ingestion/build-and-push.sh create mode 100755 hadith-ingestion/build-hadithapi-ingestion.sh create mode 100644 hadith-ingestion/config/__init__.py create mode 100644 hadith-ingestion/config/settings.py create mode 100755 hadith-ingestion/create-secrets.sh create mode 100644 hadith-ingestion/requirements.txt create mode 100755 hadith-ingestion/run-full-ingestion.sh create mode 100644 hadith-ingestion/setup.py create mode 100644 hadith-ingestion/src/__init__.py create mode 100644 hadith-ingestion/src/api_clients/__init__.py create mode 100644 hadith-ingestion/src/api_clients/base_client.py create mode 100644 hadith-ingestion/src/api_clients/hadith_one_client.py create mode 100644 hadith-ingestion/src/api_clients/hadithapi_client.py create mode 100644 hadith-ingestion/src/api_clients/sunnah_client.py create mode 100644 hadith-ingestion/src/database/__init__.py create mode 100644 hadith-ingestion/src/database/connection.py create mode 100644 hadith-ingestion/src/database/repository.py create mode 100644 hadith-ingestion/src/embeddings/__init__.py create mode 100644 hadith-ingestion/src/embeddings/generator.py create mode 100644 hadith-ingestion/src/main.py create mode 100644 hadith-ingestion/src/main_hadithapi.py create mode 100644 hadith-ingestion/src/processors/__init__.py create mode 100644 
hadith-ingestion/src/processors/arabic_normalizer.py create mode 100644 hadith-ingestion/src/processors/text_cleaner.py create mode 100644 hadith-ingestion/src/processors/validator.py create mode 100644 hadith-ingestion/src/utils/__init__.py create mode 100644 hadith-ingestion/src/utils/logger.py create mode 100644 hadith-ingestion/src/utils/retry.py create mode 100755 hadith-ingestion/test-hadithapi-k8s.sh create mode 100755 hadith-ingestion/test-hadithapi-local.sh create mode 100644 hadith-ingestion/tests/__init__.py create mode 100644 hadith-ingestion/tests/test_clients.py diff --git a/hadith-ingestion/.env b/hadith-ingestion/.env new file mode 100644 index 0000000..db27494 --- /dev/null +++ b/hadith-ingestion/.env @@ -0,0 +1,23 @@ +# Database +DATABASE_HOST=postgres.db.svc.cluster.local +DATABASE_PORT=5432 +DATABASE_NAME=hadith_db +DATABASE_USER=hadith_ingest +DATABASE_PASSWORD=hadith_ingest + +# HadithAPI.com +HADITHAPI_KEY=$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK + +# MinIO +MINIO_ENDPOINT=minio.storage.svc.cluster.local:9000 +MINIO_ACCESS_KEY=minioadmin +MINIO_SECRET_KEY=minioadmin + +# Services +TEI_URL=http://tei.ml.svc.cluster.local +QDRANT_URL=http://qdrant.vector.svc.cluster.local:6333 + +# Settings +LOG_LEVEL=INFO +API_RATE_LIMIT=30 +BATCH_SIZE=100 \ No newline at end of file diff --git a/hadith-ingestion/Dockerfile b/hadith-ingestion/Dockerfile new file mode 100644 index 0000000..8eef601 --- /dev/null +++ b/hadith-ingestion/Dockerfile @@ -0,0 +1,30 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + postgresql-client \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements +COPY requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY config/ /app/config/ +COPY src/ /app/src/ + +# Create non-root user +RUN useradd -m -u 1000 hadith && chown -R hadith:hadith /app +USER hadith + +# Set Python path +ENV PYTHONPATH=/app + +# Default command +CMD ["python", "/app/src/main_hadithapi.py"] \ No newline at end of file diff --git a/hadith-ingestion/README.md b/hadith-ingestion/README.md new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/argo/workflows/ingest-collection.yaml b/hadith-ingestion/argo/workflows/ingest-collection.yaml new file mode 100644 index 0000000..90927f9 --- /dev/null +++ b/hadith-ingestion/argo/workflows/ingest-collection.yaml @@ -0,0 +1,193 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: ingest-hadith-collection- + namespace: argo +spec: + entrypoint: ingest-pipeline + + # Arguments + arguments: + parameters: + - name: collection + value: "bukhari" + - name: limit + value: "0" # 0 means no limit + + # Service account with database access + serviceAccountName: argo-workflow + + # Templates + templates: + + # ======================================== + # Main pipeline + # ======================================== + - name: ingest-pipeline + steps: + - - name: ingest-hadiths + template: ingest + arguments: + parameters: + - name: collection + value: "{{workflow.parameters.collection}}" + - name: limit + value: "{{workflow.parameters.limit}}" + + - - name: generate-embeddings + template: generate-embeddings + arguments: + parameters: + - name: collection + value: "{{workflow.parameters.collection}}" + + - - name: index-qdrant + template: index-qdrant + arguments: + parameters: + - name: collection + value: 
"{{workflow.parameters.collection}}" + + # ======================================== + # Ingestion step + # ======================================== + - name: ingest + inputs: + parameters: + - name: collection + - name: limit + + container: + image: hadith-ingestion:latest + imagePullPolicy: IfNotPresent + command: [python, /app/src/main.py] + args: + - "{{inputs.parameters.collection}}" + - "--limit={{inputs.parameters.limit}}" + + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PORT + value: "5432" + - name: DATABASE_NAME + value: "hadith_db" + - name: DATABASE_USER + value: "hadith_ingest" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: MINIO_ENDPOINT + value: "minio.storage.svc.cluster.local:9000" + - name: MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: access-key + - name: MINIO_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: secret-key + + - name: SUNNAH_API_KEY + valueFrom: + secretKeyRef: + name: sunnah-api-secret + key: api-key + + - name: LOG_LEVEL + value: "INFO" + - name: JOB_NAME + value: "{{workflow.name}}" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi + + # ======================================== + # Embedding generation step + # ======================================== + - name: generate-embeddings + inputs: + parameters: + - name: collection + + container: + image: hadith-embeddings:latest + imagePullPolicy: IfNotPresent + command: [python, /app/generate_embeddings.py] + args: + - "--collection={{inputs.parameters.collection}}" + - "--batch-size=32" + + env: + - name: DATABASE_URL + value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: TEI_URL + value: "http://tei.ml.svc.cluster.local" + + - name: LOG_LEVEL + value: "INFO" + + resources: + requests: + cpu: 1 + memory: 2Gi + limits: + cpu: 4 + memory: 8Gi + + # ======================================== + # Qdrant indexing step + # ======================================== + - name: index-qdrant + inputs: + parameters: + - name: collection + + container: + image: hadith-qdrant-indexer:latest + imagePullPolicy: IfNotPresent + command: [python, /app/index_qdrant.py] + args: + - "--collection={{inputs.parameters.collection}}" + - "--batch-size=100" + + env: + - name: DATABASE_URL + value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: QDRANT_URL + value: "http://qdrant.vector.svc.cluster.local:6333" + - name: QDRANT_COLLECTION + value: "hadith_embeddings" + + - name: LOG_LEVEL + value: "INFO" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi \ No newline at end of file diff --git a/hadith-ingestion/argo/workflows/ingest-hadithapi.yaml b/hadith-ingestion/argo/workflows/ingest-hadithapi.yaml new file mode 100644 index 0000000..2cd6d43 --- /dev/null +++ b/hadith-ingestion/argo/workflows/ingest-hadithapi.yaml @@ -0,0 +1,204 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: ingest-hadithapi- + namespace: argo +spec: + entrypoint: ingest-pipeline + + arguments: + parameters: + - name: book-slug + value: "sahih-bukhari" + - name: limit + value: "0" # 0 means no limit + + 
serviceAccountName: argo-workflow + + templates: + + # ======================================== + # Main pipeline + # ======================================== + - name: ingest-pipeline + steps: + - - name: ingest-hadiths + template: ingest + arguments: + parameters: + - name: book-slug + value: "{{workflow.parameters.book-slug}}" + - name: limit + value: "{{workflow.parameters.limit}}" + + # ======================================== + # Ingestion step + # ======================================== + - name: ingest + inputs: + parameters: + - name: book-slug + - name: limit + + container: + image: hadith-ingestion:latest + imagePullPolicy: IfNotPresent + command: [python, /app/src/main_hadithapi.py] + args: + - "--book-slug={{inputs.parameters.book-slug}}" + - "--limit={{inputs.parameters.limit}}" + + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PORT + value: "5432" + - name: DATABASE_NAME + value: "hadith_db" + - name: DATABASE_USER + value: "hadith_ingest" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: HADITHAPI_KEY + valueFrom: + secretKeyRef: + name: hadithapi-secret + key: api-key + + - name: MINIO_ENDPOINT + value: "minio.storage.svc.cluster.local:9000" + - name: MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: access-key + - name: MINIO_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: secret-key + + - name: LOG_LEVEL + value: "INFO" + - name: JOB_NAME + value: "{{workflow.name}}" + - name: API_RATE_LIMIT + value: "30" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi +--- +# Workflow to ingest ALL books +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: ingest-all-hadithapi- + namespace: argo +spec: + entrypoint: ingest-all-books + + serviceAccountName: argo-workflow + + arguments: + parameters: + - name: limit-per-book + value: "0" # 0 means no limit + + templates: + + # ======================================== + # Main pipeline - sequential processing + # ======================================== + - name: ingest-all-books + steps: + # Process each book sequentially to avoid rate limiting + - - name: sahih-bukhari + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sahih-bukhari" + + - - name: sahih-muslim + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sahih-muslim" + + - - name: sunan-abu-dawood + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sunan-abu-dawood" + + - - name: jami-at-tirmidhi + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "jami-at-tirmidhi" + + - - name: sunan-an-nasai + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sunan-an-nasai" + + - - name: sunan-ibn-e-majah + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sunan-ibn-e-majah" + + # ======================================== + # Book ingestion template + # ======================================== + - name: ingest-book + inputs: + parameters: + - name: book-slug + + container: + image: hadith-ingestion:latest + imagePullPolicy: IfNotPresent + command: [python, /app/src/main_hadithapi.py] + args: + - "--book-slug={{inputs.parameters.book-slug}}" + - "--limit={{workflow.parameters.limit-per-book}}" + + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PASSWORD + valueFrom: + 
secretKeyRef: + name: hadith-db-secret + key: password + - name: HADITHAPI_KEY + valueFrom: + secretKeyRef: + name: hadithapi-secret + key: api-key + - name: LOG_LEVEL + value: "INFO" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi \ No newline at end of file diff --git a/hadith-ingestion/build-and-push.sh b/hadith-ingestion/build-and-push.sh new file mode 100755 index 0000000..e1667c7 --- /dev/null +++ b/hadith-ingestion/build-and-push.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# build-and-push.sh + +set -e + +# Configuration +IMAGE_NAME="hadith-ingestion" +TAG="${1:-latest}" +REGISTRY="${DOCKER_REGISTRY:-localhost:5000}" + +echo "Building Docker image: ${IMAGE_NAME}:${TAG}" + +# Build image +docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile . + +# Tag for registry +docker tag ${IMAGE_NAME}:${TAG} ${REGISTRY}/${IMAGE_NAME}:${TAG} + +# Push to registry +echo "Pushing to registry: ${REGISTRY}" +docker push ${REGISTRY}/${IMAGE_NAME}:${TAG} + +echo "Done!" \ No newline at end of file diff --git a/hadith-ingestion/build-hadithapi-ingestion.sh b/hadith-ingestion/build-hadithapi-ingestion.sh new file mode 100755 index 0000000..73f78d1 --- /dev/null +++ b/hadith-ingestion/build-hadithapi-ingestion.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# build-hadithapi-ingestion.sh + +set -e + +IMAGE_NAME="hadith-ingestion" +TAG="v1.0-hadithapi" + +echo "Building Docker image for HadithAPI.com ingestion..." + +# Build image +docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile . + +# Tag as latest +docker tag ${IMAGE_NAME}:${TAG} ${IMAGE_NAME}:latest + +# If you have a registry, push +# docker push your-registry/${IMAGE_NAME}:${TAG} + +echo "Build complete: ${IMAGE_NAME}:${TAG}" \ No newline at end of file diff --git a/hadith-ingestion/config/__init__.py b/hadith-ingestion/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/config/settings.py b/hadith-ingestion/config/settings.py new file mode 100644 index 0000000..e92bafd --- /dev/null +++ b/hadith-ingestion/config/settings.py @@ -0,0 +1,80 @@ +""" +Configuration settings for hadith ingestion service +""" +from pydantic_settings import BaseSettings +from typing import Optional +import os + + +class Settings(BaseSettings): + """Application settings loaded from environment variables""" + + # Database + DATABASE_HOST: str = "postgres.db.svc.cluster.local" + DATABASE_PORT: int = 5432 + DATABASE_NAME: str = "hadith_db" + DATABASE_USER: str = "hadith_ingest" + DATABASE_PASSWORD: str = "hadith_ingest" + + @property + def DATABASE_URL(self) -> str: + return ( + f"postgresql://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}" + f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}" + ) + + @property + def ASYNC_DATABASE_URL(self) -> str: + return ( + f"postgresql+asyncpg://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}" + f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}" + ) + + # MinIO / S3 + MINIO_ENDPOINT: str = "minio.storage.svc.cluster.local:9000" + MINIO_ACCESS_KEY: str = "minioadmin" + MINIO_SECRET_KEY: str = "minioadmin" + MINIO_BUCKET_RAW: str = "hadith-raw-data" + MINIO_BUCKET_PROCESSED: str = "hadith-processed" + MINIO_SECURE: bool = False + + # APIs + SUNNAH_API_KEY: Optional[str] = None + SUNNAH_BASE_URL: str = "https://api.sunnah.com/v1" + HADITH_ONE_API_KEY: Optional[str] = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" + # HadithAPI.com + HADITHAPI_KEY: str = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" + HADITHAPI_BASE_URL: str = 
"https://hadithapi.com/api" + # Rate limiting + API_RATE_LIMIT: int = 30 # requests per minute + API_MAX_RETRIES: int = 3 + API_RETRY_DELAY: int = 5 # seconds + + # Processing + BATCH_SIZE: int = 100 + MAX_WORKERS: int = 4 + + # TEI Service (for embeddings) + TEI_URL: str = "http://tei.ml.svc.cluster.local" + TEI_TIMEOUT: int = 30 + + # Qdrant + QDRANT_URL: str = "http://qdrant.db.svc.cluster.local:6333" + QDRANT_COLLECTION: str = "hadith_embeddings" + + # Logging + LOG_LEVEL: str = "INFO" + LOG_FORMAT: str = "json" + + # Job tracking + JOB_NAME: Optional[str] = None + JOB_TYPE: str = "api_fetch" + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + case_sensitive = True + + +# Global settings instance +settings = Settings() \ No newline at end of file diff --git a/hadith-ingestion/create-secrets.sh b/hadith-ingestion/create-secrets.sh new file mode 100755 index 0000000..b4ea95c --- /dev/null +++ b/hadith-ingestion/create-secrets.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# create-secrets.sh + +# Database secret +kubectl -n ml create secret generic hadith-db-secret \ + --from-literal=password='hadith_ingest' \ + --dry-run=client -o yaml | kubectl apply -f - + +# HadithAPI secret (already public, but for consistency) +kubectl -n ml create secret generic hadithapi-secret \ + --from-literal=api-key='$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK' \ + --dry-run=client -o yaml | kubectl apply -f - + +# MinIO secret +kubectl -n ml create secret generic minio-secret \ + --from-literal=access-key='minioadmin' \ + --from-literal=secret-key='minioadmin' \ + --dry-run=client -o yaml | kubectl apply -f - + +echo "Secrets created successfully" \ No newline at end of file diff --git a/hadith-ingestion/requirements.txt b/hadith-ingestion/requirements.txt new file mode 100644 index 0000000..211a2c9 --- /dev/null +++ b/hadith-ingestion/requirements.txt @@ -0,0 +1,42 @@ +# Core dependencies +python-dotenv==1.0.0 +pydantic==2.5.0 +pydantic-settings==2.1.0 + +# HTTP clients +httpx==0.25.2 +requests==2.31.0 +tenacity==8.2.3 + +# Database +psycopg2-binary==2.9.9 +sqlalchemy==2.0.23 +asyncpg==0.29.0 + +# Data processing +pandas==2.1.4 +numpy==1.26.2 +pyarabic==0.6.15 +arabic-reshaper==3.0.0 + +# Validation +jsonschema==4.20.0 +validators==0.22.0 + +# Logging & Monitoring +structlog==23.2.0 +prometheus-client==0.19.0 + +# Cloud storage +minio==7.2.0 +boto3==1.34.0 + +# Task queue (optional) +celery==5.3.4 +redis==5.0.1 + +# Testing +pytest==7.4.3 +pytest-asyncio==0.21.1 +pytest-cov==4.1.0 +faker==21.0.0 \ No newline at end of file diff --git a/hadith-ingestion/run-full-ingestion.sh b/hadith-ingestion/run-full-ingestion.sh new file mode 100755 index 0000000..2906b7e --- /dev/null +++ b/hadith-ingestion/run-full-ingestion.sh @@ -0,0 +1,49 @@ +#!/bin/bash +# run-full-ingestion.sh + +set -e + +echo "=== Starting Full HadithAPI Ingestion ===" + +# Books to ingest (in order) +BOOKS=( + "sahih-bukhari" + "sahih-muslim" + "sunan-abu-dawood" + "jami-at-tirmidhi" + "sunan-an-nasai" + "sunan-ibn-e-majah" +) + +for BOOK in "${BOOKS[@]}"; do + echo -e "\n=========================================" + echo "Ingesting: $BOOK" + echo "=========================================" + + argo submit -n argo argo/workflows/ingest-hadithapi.yaml \ + --parameter book-slug=$BOOK \ + --parameter limit=0 \ + --wait \ + --log + + echo "$BOOK completed!" 
+ + # Optional: add delay between books + sleep 10 +done + +echo -e "\n=== All Books Ingestion Complete ===" + +# Print summary +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT + c.name_english, + c.abbreviation, + COUNT(h.id) as hadith_count, + COUNT(DISTINCT b.id) as chapter_count +FROM collections c +LEFT JOIN hadiths h ON c.id = h.collection_id +LEFT JOIN books b ON h.book_id = b.id +GROUP BY c.name_english, c.abbreviation +ORDER BY hadith_count DESC; +" \ No newline at end of file diff --git a/hadith-ingestion/setup.py b/hadith-ingestion/setup.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/__init__.py b/hadith-ingestion/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/api_clients/__init__.py b/hadith-ingestion/src/api_clients/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/api_clients/base_client.py b/hadith-ingestion/src/api_clients/base_client.py new file mode 100644 index 0000000..63fb725 --- /dev/null +++ b/hadith-ingestion/src/api_clients/base_client.py @@ -0,0 +1,131 @@ +""" +Base API client with retry logic and rate limiting +""" +import httpx +import time +from typing import Optional, Dict, Any +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential, + retry_if_exception_type +) +import structlog +from config.settings import settings + +logger = structlog.get_logger() + + +class BaseAPIClient: + """Base class for API clients with built-in retry and rate limiting""" + + def __init__( + self, + base_url: str, + api_key: Optional[str] = None, + rate_limit: int = 90, + timeout: int = 30 + ): + self.base_url = base_url.rstrip('/') + self.api_key = api_key + self.rate_limit = rate_limit + self.timeout = timeout + + # Rate limiting + self.request_times = [] + self.min_interval = 60.0 / rate_limit # seconds between requests + + # HTTP client + self.client = httpx.Client(timeout=timeout) + + logger.info( + "api_client_initialized", + base_url=base_url, + rate_limit=rate_limit + ) + + def _wait_for_rate_limit(self): + """Implement rate limiting""" + now = time.time() + + # Remove old timestamps (older than 1 minute) + self.request_times = [t for t in self.request_times if now - t < 60] + + # If we're at the limit, wait + if len(self.request_times) >= self.rate_limit: + sleep_time = 60 - (now - self.request_times[0]) + if sleep_time > 0: + logger.info( + "rate_limit_wait", + sleep_seconds=sleep_time, + requests_in_window=len(self.request_times) + ) + time.sleep(sleep_time) + self.request_times = [] + + # Add current timestamp + self.request_times.append(time.time()) + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=2, max=10), + retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), + reraise=True + ) + def _make_request( + self, + method: str, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None + ) -> Dict[str, Any]: + """Make HTTP request with retry logic""" + + # Rate limiting + self._wait_for_rate_limit() + + # Prepare headers + request_headers = headers or {} + if self.api_key: + request_headers['X-API-Key'] = self.api_key + + # Make request + url = f"{self.base_url}/{endpoint.lstrip('/')}" + + logger.debug( + "api_request", + method=method, + url=url, + params=params + ) + + response = self.client.request( + method=method, + url=url, + params=params, + headers=request_headers + ) + + 
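+        # raise_for_status() raises httpx.HTTPStatusError, a subclass of
+        # httpx.HTTPError, so non-2xx responses trigger the tenacity retry
+        # above (3 attempts, exponential backoff) before the error is re-raised.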
response.raise_for_status() + + logger.debug( + "api_response", + status_code=response.status_code, + response_size=len(response.content) + ) + + return response.json() + + def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]: + """Make GET request""" + return self._make_request("GET", endpoint, params=params) + + def close(self): + """Close the HTTP client""" + self.client.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() \ No newline at end of file diff --git a/hadith-ingestion/src/api_clients/hadith_one_client.py b/hadith-ingestion/src/api_clients/hadith_one_client.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/api_clients/hadithapi_client.py b/hadith-ingestion/src/api_clients/hadithapi_client.py new file mode 100644 index 0000000..02dee1f --- /dev/null +++ b/hadith-ingestion/src/api_clients/hadithapi_client.py @@ -0,0 +1,297 @@ +""" +Client for HadithAPI.com API +""" +from typing import List, Dict, Any, Optional, Generator +import structlog +from .base_client import BaseAPIClient +from config.settings import settings + +logger = structlog.get_logger() + + +class HadithAPIClient(BaseAPIClient): + """Client for interacting with hadithapi.com API""" + + def __init__(self, api_key: Optional[str] = None): + super().__init__( + base_url="https://hadithapi.com/api", + api_key=api_key or settings.HADITHAPI_KEY, + rate_limit=30 # Conservative: 30 req/min + ) + + def _add_api_key(self, params: Optional[Dict] = None) -> Dict: + """Add API key to request parameters""" + params = params or {} + params['apiKey'] = self.api_key + return params + + def get_books(self) -> List[Dict[str, Any]]: + """ + Get list of all available books/collections + + Returns: + List of book dictionaries + """ + logger.info("fetching_books") + + params = self._add_api_key() + response = self.get("books", params=params) + + if response.get('status') != 200: + logger.error( + "api_error", + status=response.get('status'), + message=response.get('message') + ) + raise Exception(f"API Error: {response.get('message')}") + + books = response.get('data', []) + + logger.info( + "books_fetched", + count=len(books) + ) + + return books + + def get_chapters(self, book_slug: str) -> List[Dict[str, Any]]: + """ + Get chapters for a specific book + + Args: + book_slug: Book slug identifier (e.g., 'sahih-bukhari') + + Returns: + List of chapter dictionaries + """ + logger.info( + "fetching_chapters", + book_slug=book_slug + ) + + params = self._add_api_key() + response = self.get(f"{book_slug}/chapters", params=params) + + if response.get('status') != 200: + logger.error( + "api_error", + status=response.get('status'), + message=response.get('message') + ) + raise Exception(f"API Error: {response.get('message')}") + + chapters = response.get('data', []) + + logger.info( + "chapters_fetched", + book_slug=book_slug, + count=len(chapters) + ) + + return chapters + + def get_hadiths_page( + self, + book_id: int, + chapter_id: Optional[int] = None, + page: int = 1, + limit: int = 100 + ) -> Dict[str, Any]: + """ + Get a page of hadiths + + Args: + book_id: Book ID + chapter_id: Optional chapter ID to filter by + page: Page number (1-indexed) + limit: Results per page (max 100) + + Returns: + Response dictionary with hadiths and pagination info + """ + params = self._add_api_key({ + 'book': book_id, + 'page': page, + 'limit': min(limit, 100) # Enforce max limit + }) + + if chapter_id: + params['chapter'] = chapter_id + + 
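+        # Pagination metadata (current_page/last_page/total) comes back in the
+        # response body; iter_all_hadiths_in_book uses it to walk every page,
+        # so most callers should use the iterator rather than this method.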
logger.debug( + "fetching_hadiths_page", + book_id=book_id, + chapter_id=chapter_id, + page=page, + limit=limit + ) + + response = self.get("hadiths", params=params) + + if response.get('status') != 200: + logger.error( + "api_error", + status=response.get('status'), + message=response.get('message') + ) + raise Exception(f"API Error: {response.get('message')}") + + return response.get('data', {}) + + def iter_all_hadiths_in_book( + self, + book_id: int, + book_slug: str, + chapter_id: Optional[int] = None, + batch_size: int = 100 + ) -> Generator[Dict[str, Any], None, None]: + """ + Iterator that yields all hadiths in a book, handling pagination automatically + + Args: + book_id: Book ID + book_slug: Book slug for logging + chapter_id: Optional chapter ID to filter by + batch_size: Number of hadiths to fetch per request (max 100) + + Yields: + Individual hadith dictionaries + """ + page = 1 + total_fetched = 0 + + while True: + response_data = self.get_hadiths_page( + book_id=book_id, + chapter_id=chapter_id, + page=page, + limit=batch_size + ) + + hadiths = response_data.get('hadiths', []) + pagination = response_data.get('pagination', {}) + + if not hadiths: + logger.info( + "book_complete", + book_slug=book_slug, + chapter_id=chapter_id, + total_hadiths=total_fetched + ) + break + + for hadith in hadiths: + yield hadith + total_fetched += 1 + + # Log progress + if total_fetched % 500 == 0: + logger.info( + "progress", + book_slug=book_slug, + fetched=total_fetched, + total=pagination.get('total', '?') + ) + + # Check if there are more pages + current_page = pagination.get('current_page', page) + last_page = pagination.get('last_page', 1) + + if current_page >= last_page: + logger.info( + "book_complete", + book_slug=book_slug, + total_hadiths=total_fetched, + total_pages=last_page + ) + break + + page += 1 + + def iter_all_hadiths_in_book_with_chapters( + self, + book_id: int, + book_slug: str, + batch_size: int = 100 + ) -> Generator[tuple[Dict[str, Any], Optional[Dict[str, Any]]], None, None]: + """ + Iterator that yields all hadiths in a book, organized by chapter + + Args: + book_id: Book ID + book_slug: Book slug + batch_size: Number of hadiths to fetch per request + + Yields: + Tuple of (hadith_dict, chapter_dict or None) + """ + # First, get all chapters + try: + chapters = self.get_chapters(book_slug) + except Exception as e: + logger.warning( + "chapters_fetch_failed", + book_slug=book_slug, + error=str(e), + fallback="fetching_all_hadiths_without_chapter_filter" + ) + # Fallback: fetch all hadiths without chapter filter + for hadith in self.iter_all_hadiths_in_book( + book_id=book_id, + book_slug=book_slug, + batch_size=batch_size + ): + chapter_info = hadith.get('chapter') + yield hadith, chapter_info + return + + logger.info( + "starting_chapter_by_chapter_fetch", + book_slug=book_slug, + total_chapters=len(chapters) + ) + + # Process each chapter + for chapter in chapters: + chapter_id = chapter.get('id') + chapter_number = chapter.get('chapterNumber') + + logger.info( + "fetching_chapter", + book_slug=book_slug, + chapter_id=chapter_id, + chapter_number=chapter_number + ) + + try: + for hadith in self.iter_all_hadiths_in_book( + book_id=book_id, + book_slug=book_slug, + chapter_id=chapter_id, + batch_size=batch_size + ): + yield hadith, chapter + except Exception as e: + logger.error( + "chapter_fetch_failed", + book_slug=book_slug, + chapter_id=chapter_id, + error=str(e) + ) + continue + + def get_book_by_slug(self, book_slug: str) -> Optional[Dict[str, Any]]: + """ + 
Get book details by slug + + Args: + book_slug: Book slug identifier + + Returns: + Book dictionary or None if not found + """ + books = self.get_books() + for book in books: + if book.get('bookSlug') == book_slug: + return book + return None \ No newline at end of file diff --git a/hadith-ingestion/src/api_clients/sunnah_client.py b/hadith-ingestion/src/api_clients/sunnah_client.py new file mode 100644 index 0000000..d806e56 --- /dev/null +++ b/hadith-ingestion/src/api_clients/sunnah_client.py @@ -0,0 +1,247 @@ +""" +Client for Sunnah.com API +""" +from typing import List, Dict, Any, Optional, Generator +import structlog +from .base_client import BaseAPIClient +from config.settings import settings + +logger = structlog.get_logger() + + +class SunnahAPIClient(BaseAPIClient): + """Client for interacting with Sunnah.com API""" + + def __init__(self, api_key: Optional[str] = None): + super().__init__( + base_url=settings.SUNNAH_BASE_URL, + api_key=api_key or settings.SUNNAH_API_KEY, + rate_limit=settings.API_RATE_LIMIT + ) + + def get_collections(self) -> List[Dict[str, Any]]: + """ + Get list of all hadith collections + + Returns: + List of collection dictionaries + """ + logger.info("fetching_collections") + + response = self.get("collections") + collections = response.get("data", []) + + logger.info( + "collections_fetched", + count=len(collections) + ) + + return collections + + def get_collection_details(self, collection_name: str) -> Dict[str, Any]: + """ + Get details for a specific collection + + Args: + collection_name: Collection abbreviation (e.g., 'bukhari') + + Returns: + Collection details dictionary + """ + logger.info( + "fetching_collection_details", + collection=collection_name + ) + + response = self.get(f"collections/{collection_name}") + + return response + + def get_books(self, collection_name: str) -> List[Dict[str, Any]]: + """ + Get all books in a collection + + Args: + collection_name: Collection abbreviation + + Returns: + List of book dictionaries + """ + logger.info( + "fetching_books", + collection=collection_name + ) + + response = self.get(f"collections/{collection_name}/books") + books = response.get("data", []) + + logger.info( + "books_fetched", + collection=collection_name, + count=len(books) + ) + + return books + + def get_hadiths_in_book( + self, + collection_name: str, + book_number: int, + limit: int = 50, + page: int = 1 + ) -> Dict[str, Any]: + """ + Get hadiths in a specific book with pagination + + Args: + collection_name: Collection abbreviation + book_number: Book number + limit: Number of hadiths per page + page: Page number + + Returns: + Response with hadiths and pagination info + """ + logger.debug( + "fetching_hadiths", + collection=collection_name, + book=book_number, + page=page, + limit=limit + ) + + response = self.get( + f"collections/{collection_name}/books/{book_number}/hadiths", + params={"limit": limit, "page": page} + ) + + return response + + def iter_all_hadiths_in_book( + self, + collection_name: str, + book_number: int, + batch_size: int = 50 + ) -> Generator[Dict[str, Any], None, None]: + """ + Iterator that yields all hadiths in a book, handling pagination automatically + + Args: + collection_name: Collection abbreviation + book_number: Book number + batch_size: Number of hadiths to fetch per request + + Yields: + Individual hadith dictionaries + """ + page = 1 + total_fetched = 0 + + while True: + response = self.get_hadiths_in_book( + collection_name=collection_name, + book_number=book_number, + limit=batch_size, + 
page=page + ) + + hadiths = response.get("data", []) + + if not hadiths: + logger.info( + "book_complete", + collection=collection_name, + book=book_number, + total_hadiths=total_fetched + ) + break + + for hadith in hadiths: + yield hadith + total_fetched += 1 + + # Check if there are more pages + pagination = response.get("pagination", {}) + if page >= pagination.get("total_pages", 1): + break + + page += 1 + + def iter_all_hadiths_in_collection( + self, + collection_name: str, + batch_size: int = 50 + ) -> Generator[tuple[Dict[str, Any], int], None, None]: + """ + Iterator that yields all hadiths in a collection + + Args: + collection_name: Collection abbreviation + batch_size: Number of hadiths to fetch per request + + Yields: + Tuple of (hadith_dict, book_number) + """ + # First, get all books in the collection + books = self.get_books(collection_name) + + logger.info( + "starting_collection_fetch", + collection=collection_name, + total_books=len(books) + ) + + for book in books: + book_number = book.get("bookNumber") + + if not book_number: + logger.warning( + "book_missing_number", + book=book + ) + continue + + logger.info( + "fetching_book", + collection=collection_name, + book=book_number + ) + + try: + for hadith in self.iter_all_hadiths_in_book( + collection_name=collection_name, + book_number=int(book_number), + batch_size=batch_size + ): + yield hadith, int(book_number) + except Exception as e: + logger.error( + "book_fetch_failed", + collection=collection_name, + book=book_number, + error=str(e) + ) + continue + + def get_specific_hadith( + self, + collection_name: str, + book_number: int, + hadith_number: int + ) -> Dict[str, Any]: + """ + Get a specific hadith by its number + + Args: + collection_name: Collection abbreviation + book_number: Book number + hadith_number: Hadith number + + Returns: + Hadith dictionary + """ + response = self.get( + f"hadiths/collection/{collection_name}/{book_number}/{hadith_number}" + ) + + return response.get("data", {}) \ No newline at end of file diff --git a/hadith-ingestion/src/database/__init__.py b/hadith-ingestion/src/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/database/connection.py b/hadith-ingestion/src/database/connection.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/database/repository.py b/hadith-ingestion/src/database/repository.py new file mode 100644 index 0000000..85e8b0b --- /dev/null +++ b/hadith-ingestion/src/database/repository.py @@ -0,0 +1,334 @@ +""" +Database repository for hadith data operations +""" +from typing import List, Dict, Any, Optional +from uuid import UUID +import structlog +from sqlalchemy import create_engine, text, select, insert, update +from sqlalchemy.orm import sessionmaker, Session +from sqlalchemy.exc import IntegrityError +from config.settings import settings + +logger = structlog.get_logger() + + +class HadithRepository: + """Repository for hadith database operations""" + + def __init__(self, database_url: Optional[str] = None): + self.database_url = database_url or settings.DATABASE_URL + self.engine = create_engine(self.database_url, pool_pre_ping=True) + self.SessionLocal = sessionmaker(bind=self.engine) + + def get_session(self) -> Session: + """Get database session""" + return self.SessionLocal() + + # ===== Collections ===== + + def get_collection_by_abbreviation(self, abbr: str) -> Optional[Dict[str, Any]]: + """Get collection by abbreviation""" + with self.get_session() as session: + query = 
text(""" + SELECT * FROM collections + WHERE abbreviation = :abbr + """) + result = session.execute(query, {"abbr": abbr}).fetchone() + + if result: + return dict(result._mapping) + return None + + def get_all_collections(self) -> List[Dict[str, Any]]: + """Get all collections""" + with self.get_session() as session: + query = text("SELECT * FROM collections ORDER BY name_english") + result = session.execute(query).fetchall() + return [dict(row._mapping) for row in result] + + def update_collection_count(self, collection_id: UUID, count: int): + """Update total hadith count for a collection""" + with self.get_session() as session: + query = text(""" + UPDATE collections + SET total_hadiths = :count, updated_at = NOW() + WHERE id = :id + """) + session.execute(query, {"id": str(collection_id), "count": count}) + session.commit() + + # ===== Books ===== + + def upsert_book( + self, + collection_id: UUID, + book_number: int, + name_english: Optional[str] = None, + name_arabic: Optional[str] = None, + metadata: Optional[Dict] = None + ) -> UUID: + """Insert or update a book""" + with self.get_session() as session: + query = text(""" + INSERT INTO books (collection_id, book_number, name_english, name_arabic, metadata) + VALUES (:collection_id, :book_number, :name_english, :name_arabic, :metadata) + ON CONFLICT (collection_id, book_number) + DO UPDATE SET + name_english = EXCLUDED.name_english, + name_arabic = EXCLUDED.name_arabic, + metadata = EXCLUDED.metadata + RETURNING id + """) + + result = session.execute(query, { + "collection_id": str(collection_id), + "book_number": book_number, + "name_english": name_english, + "name_arabic": name_arabic, + "metadata": metadata or {} + }) + session.commit() + + return UUID(result.fetchone()[0]) + + def get_book(self, collection_id: UUID, book_number: int) -> Optional[Dict[str, Any]]: + """Get book by collection and book number""" + with self.get_session() as session: + query = text(""" + SELECT * FROM books + WHERE collection_id = :collection_id AND book_number = :book_number + """) + result = session.execute(query, { + "collection_id": str(collection_id), + "book_number": book_number + }).fetchone() + + if result: + return dict(result._mapping) + return None + + # ===== Hadiths ===== + + def upsert_hadith( + self, + collection_id: UUID, + hadith_number: int, + arabic_text: str, + book_id: Optional[UUID] = None, + english_text: Optional[str] = None, + urdu_text: Optional[str] = None, + grade: Optional[str] = None, + grade_source: Optional[str] = None, + chapter_name: Optional[str] = None, + source_id: Optional[str] = None, + source_url: Optional[str] = None, + source_metadata: Optional[Dict] = None + ) -> UUID: + """Insert or update a hadith""" + with self.get_session() as session: + query = text(""" + INSERT INTO hadiths ( + collection_id, book_id, hadith_number, + arabic_text, english_text, urdu_text, + grade, grade_source, chapter_name, + source_id, source_url, source_metadata + ) + VALUES ( + :collection_id, :book_id, :hadith_number, + :arabic_text, :english_text, :urdu_text, + :grade, :grade_source, :chapter_name, + :source_id, :source_url, :source_metadata + ) + ON CONFLICT (collection_id, book_id, hadith_number) + DO UPDATE SET + arabic_text = EXCLUDED.arabic_text, + english_text = EXCLUDED.english_text, + urdu_text = EXCLUDED.urdu_text, + grade = EXCLUDED.grade, + grade_source = EXCLUDED.grade_source, + chapter_name = EXCLUDED.chapter_name, + source_url = EXCLUDED.source_url, + source_metadata = EXCLUDED.source_metadata, + updated_at = NOW() + 
RETURNING id + """) + + result = session.execute(query, { + "collection_id": str(collection_id), + "book_id": str(book_id) if book_id else None, + "hadith_number": hadith_number, + "arabic_text": arabic_text, + "english_text": english_text, + "urdu_text": urdu_text, + "grade": grade, + "grade_source": grade_source, + "chapter_name": chapter_name, + "source_id": source_id, + "source_url": source_url, + "source_metadata": source_metadata or {} + }) + session.commit() + + return UUID(result.fetchone()[0]) + + def get_hadiths_without_embeddings( + self, + limit: int = 100, + collection_id: Optional[UUID] = None + ) -> List[Dict[str, Any]]: + """Get hadiths that need embedding generation""" + with self.get_session() as session: + if collection_id: + query = text(""" + SELECT * FROM hadiths + WHERE embedding_generated = FALSE + AND collection_id = :collection_id + ORDER BY created_at ASC + LIMIT :limit + """) + result = session.execute(query, { + "collection_id": str(collection_id), + "limit": limit + }).fetchall() + else: + query = text(""" + SELECT * FROM hadiths + WHERE embedding_generated = FALSE + ORDER BY created_at ASC + LIMIT :limit + """) + result = session.execute(query, {"limit": limit}).fetchall() + + return [dict(row._mapping) for row in result] + + def mark_embedding_generated(self, hadith_id: UUID, version: str = "v1"): + """Mark hadith as having embedding generated""" + with self.get_session() as session: + query = text(""" + UPDATE hadiths + SET embedding_generated = TRUE, + embedding_version = :version, + updated_at = NOW() + WHERE id = :id + """) + session.execute(query, {"id": str(hadith_id), "version": version}) + session.commit() + + # ===== Ingestion Jobs ===== + + def create_ingestion_job( + self, + job_name: str, + job_type: str, + source_name: str, + config: Optional[Dict] = None + ) -> UUID: + """Create a new ingestion job""" + with self.get_session() as session: + query = text(""" + INSERT INTO ingestion_jobs (job_name, job_type, source_name, config, status, started_at) + VALUES (:job_name, :job_type, :source_name, :config, 'running', NOW()) + RETURNING id + """) + result = session.execute(query, { + "job_name": job_name, + "job_type": job_type, + "source_name": source_name, + "config": config or {} + }) + session.commit() + + return UUID(result.fetchone()[0]) + + def update_job_progress( + self, + job_id: UUID, + total: Optional[int] = None, + processed: Optional[int] = None, + failed: Optional[int] = None, + skipped: Optional[int] = None + ): + """Update job progress counters""" + with self.get_session() as session: + updates = [] + params = {"job_id": str(job_id)} + + if total is not None: + updates.append("total_records = :total") + params["total"] = total + if processed is not None: + updates.append("processed_records = :processed") + params["processed"] = processed + if failed is not None: + updates.append("failed_records = :failed") + params["failed"] = failed + if skipped is not None: + updates.append("skipped_records = :skipped") + params["skipped"] = skipped + + if updates: + query_str = f""" + UPDATE ingestion_jobs + SET {', '.join(updates)} + WHERE id = :job_id + """ + session.execute(text(query_str), params) + session.commit() + + def complete_job( + self, + job_id: UUID, + status: str = "success", + error_message: Optional[str] = None + ): + """Mark job as completed""" + with self.get_session() as session: + query = text(""" + UPDATE ingestion_jobs + SET status = :status, + completed_at = NOW(), + duration_seconds = EXTRACT(EPOCH FROM (NOW() - 
started_at)), + error_message = :error_message + WHERE id = :job_id + """) + session.execute(query, { + "job_id": str(job_id), + "status": status, + "error_message": error_message + }) + session.commit() + + def add_processing_log( + self, + job_id: UUID, + level: str, + message: str, + details: Optional[Dict] = None + ): + """Add a processing log entry""" + with self.get_session() as session: + query = text(""" + INSERT INTO processing_logs (job_id, log_level, message, details) + VALUES (:job_id, :level, :message, :details) + """) + session.execute(query, { + "job_id": str(job_id), + "level": level, + "message": message, + "details": details or {} + }) + session.commit() + + # ===== Statistics ===== + + def get_collection_stats(self, collection_id: UUID) -> Dict[str, Any]: + """Get statistics for a collection""" + with self.get_session() as session: + query = text(""" + SELECT * FROM get_collection_statistics(:collection_id) + """) + result = session.execute(query, {"collection_id": str(collection_id)}).fetchone() + + if result: + return dict(result._mapping) + return {} \ No newline at end of file diff --git a/hadith-ingestion/src/embeddings/__init__.py b/hadith-ingestion/src/embeddings/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/embeddings/generator.py b/hadith-ingestion/src/embeddings/generator.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/main.py b/hadith-ingestion/src/main.py new file mode 100644 index 0000000..1448f3a --- /dev/null +++ b/hadith-ingestion/src/main.py @@ -0,0 +1,353 @@ +""" +Main ingestion script for fetching hadiths from Sunnah.com API +""" +import sys +import argparse +from typing import Optional +from uuid import UUID +import structlog +from config.settings import settings +from api_clients.sunnah_client import SunnahAPIClient +from database.repository import HadithRepository +from processors.text_cleaner import ArabicTextProcessor, TextCleaner + +# Configure structured logging +structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer() + ], + wrapper_class=structlog.stdlib.BoundLogger, + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, +) + +logger = structlog.get_logger() + + +class HadithIngestionService: + """Service for ingesting hadiths from Sunnah.com API""" + + def __init__(self): + self.api_client = SunnahAPIClient() + self.repo = HadithRepository() + self.text_processor = ArabicTextProcessor() + self.text_cleaner = TextCleaner() + + def ingest_collection( + self, + collection_abbr: str, + limit: Optional[int] = None + ) -> dict: + """ + Ingest entire collection from Sunnah.com API + + Args: + collection_abbr: Collection abbreviation (e.g., 'bukhari') + limit: Optional limit on number of hadiths to ingest + + Returns: + Statistics dictionary + """ + logger.info( + "ingestion_started", + collection=collection_abbr, + limit=limit + ) + + # Get collection from database + collection = self.repo.get_collection_by_abbreviation(collection_abbr) + if not collection: + logger.error( + "collection_not_found", + collection=collection_abbr + ) + raise ValueError(f"Collection '{collection_abbr}' not found in database") + + collection_id = UUID(collection['id']) + + # Create ingestion job 
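+        # The job row tracks progress counters and final status via
+        # HadithRepository.update_job_progress / complete_job. Note: the config
+        # dict is bound as a plain query parameter; depending on the driver
+        # setup, a dict may need json.dumps() before binding to a jsonb column.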
+ job_id = self.repo.create_ingestion_job( + job_name=f"ingest_{collection_abbr}", + job_type="api_fetch", + source_name="sunnah.com", + config={"collection": collection_abbr, "limit": limit} + ) + + logger.info( + "job_created", + job_id=str(job_id), + collection=collection_abbr + ) + + stats = { + "processed": 0, + "failed": 0, + "skipped": 0 + } + + try: + # Iterate through all hadiths in collection + for hadith_data, book_number in self.api_client.iter_all_hadiths_in_collection( + collection_name=collection_abbr, + batch_size=50 + ): + # Check limit + if limit and stats["processed"] >= limit: + logger.info("limit_reached", limit=limit) + break + + try: + # Process and store hadith + self._process_and_store_hadith( + collection_id=collection_id, + hadith_data=hadith_data, + book_number=book_number + ) + + stats["processed"] += 1 + + # Update job progress every 100 hadiths + if stats["processed"] % 100 == 0: + self.repo.update_job_progress( + job_id=job_id, + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + logger.info( + "progress_update", + processed=stats["processed"], + failed=stats["failed"] + ) + + except Exception as e: + stats["failed"] += 1 + logger.error( + "hadith_processing_failed", + error=str(e), + hadith_number=hadith_data.get("hadithNumber") + ) + + self.repo.add_processing_log( + job_id=job_id, + level="ERROR", + message=f"Failed to process hadith: {str(e)}", + details={"hadith_data": hadith_data} + ) + + # Update final job progress + self.repo.update_job_progress( + job_id=job_id, + total=stats["processed"] + stats["failed"] + stats["skipped"], + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + # Mark job as complete + self.repo.complete_job(job_id=job_id, status="success") + + # Update collection count + self.repo.update_collection_count( + collection_id=collection_id, + count=stats["processed"] + ) + + logger.info( + "ingestion_completed", + collection=collection_abbr, + stats=stats + ) + + return stats + + except Exception as e: + logger.error( + "ingestion_failed", + collection=collection_abbr, + error=str(e) + ) + + self.repo.complete_job( + job_id=job_id, + status="failed", + error_message=str(e) + ) + + raise + + def _process_and_store_hadith( + self, + collection_id: UUID, + hadith_data: dict, + book_number: int + ): + """Process and store a single hadith""" + + # Extract hadith number + hadith_number = hadith_data.get("hadithNumber") + if not hadith_number: + raise ValueError("Missing hadith number") + + # Extract text in multiple languages + hadith_texts = hadith_data.get("hadith", []) + + arabic_text = None + english_text = None + urdu_text = None + grade = None + grade_source = None + chapter_name = None + + for text_entry in hadith_texts: + lang = text_entry.get("lang", "").lower() + body = text_entry.get("body") + + if not body: + continue + + # Clean text + body = self.text_cleaner.clean_text(body) + + if lang == "ar": + arabic_text = body + chapter_name = text_entry.get("chapterTitle") + + # Extract grade from Arabic entry + grades = text_entry.get("grades", []) + if grades: + grade = grades[0].get("grade") + grade_source = grades[0].get("name") + + elif lang == "en": + english_text = body + + # Extract grade from English entry if not found + if not grade: + grades = text_entry.get("grades", []) + if grades: + grade = grades[0].get("grade") + grade_source = grades[0].get("name") + + elif lang == "ur": + urdu_text = body + + if not arabic_text: + raise ValueError("Missing 
Arabic text") + + # Get or create book + book = self.repo.get_book(collection_id, book_number) + if not book: + # Extract book name from hadith data + book_name_en = None + book_name_ar = None + + for text_entry in hadith_texts: + lang = text_entry.get("lang", "").lower() + book_data = text_entry.get("book", [{}])[0] if text_entry.get("book") else {} + + if lang == "en" and book_data.get("name"): + book_name_en = book_data.get("name") + elif lang == "ar" and book_data.get("name"): + book_name_ar = book_data.get("name") + + book_id = self.repo.upsert_book( + collection_id=collection_id, + book_number=book_number, + name_english=book_name_en, + name_arabic=book_name_ar + ) + else: + book_id = UUID(book["id"]) + + # Store hadith + hadith_id = self.repo.upsert_hadith( + collection_id=collection_id, + book_id=book_id, + hadith_number=int(hadith_number), + arabic_text=arabic_text, + english_text=english_text, + urdu_text=urdu_text, + grade=grade, + grade_source=grade_source, + chapter_name=chapter_name, + source_id=str(hadith_data.get("id", "")), + source_url=hadith_data.get("reference", {}).get("link"), + source_metadata=hadith_data + ) + + logger.debug( + "hadith_stored", + hadith_id=str(hadith_id), + hadith_number=hadith_number, + book_number=book_number + ) + + def close(self): + """Close connections""" + self.api_client.close() + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser(description="Ingest hadiths from Sunnah.com API") + parser.add_argument( + "collection", + help="Collection abbreviation (e.g., bukhari, muslim)" + ) + parser.add_argument( + "--limit", + type=int, + help="Limit number of hadiths to ingest" + ) + + args = parser.parse_args() + + logger.info( + "script_started", + collection=args.collection, + limit=args.limit + ) + + try: + service = HadithIngestionService() + stats = service.ingest_collection( + collection_abbr=args.collection, + limit=args.limit + ) + + logger.info( + "script_completed", + stats=stats + ) + + print(f"\nIngestion completed successfully!") + print(f"Processed: {stats['processed']}") + print(f"Failed: {stats['failed']}") + print(f"Skipped: {stats['skipped']}") + + service.close() + + return 0 + + except Exception as e: + logger.error( + "script_failed", + error=str(e), + exc_info=True + ) + + print(f"\nIngestion failed: {str(e)}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/hadith-ingestion/src/main_hadithapi.py b/hadith-ingestion/src/main_hadithapi.py new file mode 100644 index 0000000..6cd61e4 --- /dev/null +++ b/hadith-ingestion/src/main_hadithapi.py @@ -0,0 +1,516 @@ +""" +Main ingestion script for fetching hadiths from HadithAPI.com +""" +import sys +import argparse +from typing import Optional, Dict, Any +from uuid import UUID +import structlog +from config.settings import settings +from api_clients.hadithapi_client import HadithAPIClient +from database.repository import HadithRepository +from processors.text_cleaner import ArabicTextProcessor, TextCleaner + +# Configure structured logging +structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer() + ], + wrapper_class=structlog.stdlib.BoundLogger, + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + 
cache_logger_on_first_use=True, +) + +logger = structlog.get_logger() + + +# Book slug to collection abbreviation mapping +BOOK_SLUG_MAPPING = { + 'sahih-bukhari': 'bukhari', + 'sahih-muslim': 'muslim', + 'sunan-abu-dawood': 'abudawud', + 'jami-at-tirmidhi': 'tirmidhi', + 'sunan-an-nasai': 'nasai', + 'sunan-ibn-e-majah': 'ibnmajah', + 'muwatta-imam-malik': 'malik', + 'musnad-ahmad': 'ahmad', + 'sunan-ad-darimi': 'darimi', + 'mishkat-al-masabih': 'mishkat' +} + + +class HadithAPIIngestionService: + """Service for ingesting hadiths from HadithAPI.com""" + + def __init__(self): + self.api_client = HadithAPIClient() + self.repo = HadithRepository() + self.text_processor = ArabicTextProcessor() + self.text_cleaner = TextCleaner() + + def sync_books_from_api(self) -> Dict[str, Any]: + """ + Sync book metadata from API to database + + Returns: + Dictionary mapping book_slug -> collection_id + """ + logger.info("syncing_books_from_api") + + # Get books from API + api_books = self.api_client.get_books() + + book_mapping = {} + + for api_book in api_books: + book_slug = api_book.get('bookSlug') + + # Map to our collection abbreviation + collection_abbr = BOOK_SLUG_MAPPING.get(book_slug) + + if not collection_abbr: + logger.warning( + "unmapped_book", + book_slug=book_slug, + book_name=api_book.get('bookName') + ) + continue + + # Get or verify collection exists in database + collection = self.repo.get_collection_by_abbreviation(collection_abbr) + + if not collection: + logger.warning( + "collection_not_in_db", + abbreviation=collection_abbr, + book_slug=book_slug + ) + continue + + collection_id = UUID(collection['id']) + book_mapping[book_slug] = { + 'collection_id': collection_id, + 'book_id': api_book.get('id'), + 'book_name': api_book.get('bookName'), + 'hadiths_count': api_book.get('hadiths_count'), + 'chapters_count': api_book.get('chapters_count') + } + + logger.info( + "book_mapped", + book_slug=book_slug, + collection_abbr=collection_abbr, + hadiths_count=api_book.get('hadiths_count') + ) + + logger.info( + "books_synced", + total_books=len(book_mapping) + ) + + return book_mapping + + def ingest_collection( + self, + book_slug: str, + limit: Optional[int] = None + ) -> dict: + """ + Ingest entire collection from HadithAPI.com + + Args: + book_slug: Book slug identifier (e.g., 'sahih-bukhari') + limit: Optional limit on number of hadiths to ingest + + Returns: + Statistics dictionary + """ + logger.info( + "ingestion_started", + book_slug=book_slug, + limit=limit + ) + + # Get book mapping + book_mapping = self.sync_books_from_api() + + if book_slug not in book_mapping: + logger.error( + "book_not_mapped", + book_slug=book_slug, + available_books=list(book_mapping.keys()) + ) + raise ValueError(f"Book '{book_slug}' not found or not mapped") + + book_info = book_mapping[book_slug] + collection_id = book_info['collection_id'] + book_id = book_info['book_id'] + + # Create ingestion job + job_id = self.repo.create_ingestion_job( + job_name=f"ingest_{book_slug}", + job_type="api_fetch", + source_name="hadithapi.com", + config={ + "book_slug": book_slug, + "book_id": book_id, + "limit": limit + } + ) + + logger.info( + "job_created", + job_id=str(job_id), + book_slug=book_slug, + expected_count=book_info.get('hadiths_count') + ) + + stats = { + "processed": 0, + "failed": 0, + "skipped": 0 + } + + try: + # Iterate through all hadiths in book + for hadith_data, chapter_data in self.api_client.iter_all_hadiths_in_book_with_chapters( + book_id=book_id, + book_slug=book_slug, + batch_size=100 + ): + 
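+            # iter_all_hadiths_in_book_with_chapters yields (hadith, chapter)
+            # pairs chapter by chapter and falls back to an unfiltered fetch
+            # when the chapter list cannot be retrieved (see hadithapi_client).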
# Check limit + if limit and stats["processed"] >= limit: + logger.info("limit_reached", limit=limit) + break + + try: + # Process and store hadith + self._process_and_store_hadith( + collection_id=collection_id, + hadith_data=hadith_data, + chapter_data=chapter_data + ) + + stats["processed"] += 1 + + # Update job progress every 100 hadiths + if stats["processed"] % 100 == 0: + self.repo.update_job_progress( + job_id=job_id, + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + logger.info( + "progress_update", + book_slug=book_slug, + processed=stats["processed"], + failed=stats["failed"], + percentage=round( + (stats["processed"] / int(book_info.get('hadiths_count', 1))) * 100, + 2 + ) if book_info.get('hadiths_count') else 0 + ) + + except Exception as e: + stats["failed"] += 1 + logger.error( + "hadith_processing_failed", + error=str(e), + hadith_number=hadith_data.get("hadithNumber"), + hadith_id=hadith_data.get("id") + ) + + self.repo.add_processing_log( + job_id=job_id, + level="ERROR", + message=f"Failed to process hadith: {str(e)}", + details={"hadith_data": hadith_data} + ) + + # Update final job progress + self.repo.update_job_progress( + job_id=job_id, + total=stats["processed"] + stats["failed"] + stats["skipped"], + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + # Mark job as complete + self.repo.complete_job(job_id=job_id, status="success") + + # Update collection count + self.repo.update_collection_count( + collection_id=collection_id, + count=stats["processed"] + ) + + logger.info( + "ingestion_completed", + book_slug=book_slug, + stats=stats + ) + + return stats + + except Exception as e: + logger.error( + "ingestion_failed", + book_slug=book_slug, + error=str(e), + exc_info=True + ) + + self.repo.complete_job( + job_id=job_id, + status="failed", + error_message=str(e) + ) + + raise + + def _process_and_store_hadith( + self, + collection_id: UUID, + hadith_data: dict, + chapter_data: Optional[dict] + ): + """Process and store a single hadith""" + + # Extract hadith number + hadith_number = hadith_data.get("hadithNumber") + if not hadith_number: + raise ValueError("Missing hadith number") + + # Convert to integer + try: + hadith_number = int(hadith_number) + except (ValueError, TypeError): + raise ValueError(f"Invalid hadith number: {hadith_number}") + + # Extract text in multiple languages + arabic_text = hadith_data.get("hadithArabic") + english_text = hadith_data.get("hadithEnglish") + urdu_text = hadith_data.get("hadithUrdu") + + if not arabic_text: + raise ValueError("Missing Arabic text") + + # Clean texts + arabic_text = self.text_cleaner.clean_text(arabic_text) + if english_text: + english_text = self.text_cleaner.clean_text(english_text) + if urdu_text: + urdu_text = self.text_cleaner.clean_text(urdu_text) + + # Extract grade/status + grade = hadith_data.get("status") + + # Get or create chapter (book in our schema) + book_id = None + chapter_name = None + + if chapter_data: + chapter_id = chapter_data.get('id') + chapter_number = chapter_data.get('chapterNumber') + chapter_name_en = chapter_data.get('chapterEnglish') + chapter_name_ar = chapter_data.get('chapterArabic') + chapter_name = chapter_name_en + + if chapter_number: + try: + chapter_number = int(chapter_number) + except (ValueError, TypeError): + chapter_number = chapter_id # Fallback to ID + + # Get or create book (chapter in HadithAPI = book in our schema) + existing_book = self.repo.get_book(collection_id, 
chapter_number) + + if not existing_book: + book_id = self.repo.upsert_book( + collection_id=collection_id, + book_number=chapter_number, + name_english=chapter_name_en, + name_arabic=chapter_name_ar, + metadata=chapter_data + ) + else: + book_id = UUID(existing_book['id']) + + # Build source metadata + source_metadata = { + 'api_id': hadith_data.get('id'), + 'englishNarrator': hadith_data.get('englishNarrator'), + 'urduNarrator': hadith_data.get('urduNarrator'), + 'bookSlug': hadith_data.get('bookSlug'), + 'chapterId': hadith_data.get('chapterId'), + 'chapter': chapter_data + } + + # Store hadith + hadith_id = self.repo.upsert_hadith( + collection_id=collection_id, + book_id=book_id, + hadith_number=hadith_number, + arabic_text=arabic_text, + english_text=english_text, + urdu_text=urdu_text, + grade=grade, + grade_source="hadithapi.com", + chapter_name=chapter_name, + source_id=str(hadith_data.get('id', '')), + source_url=f"https://hadithapi.com/hadith/{hadith_data.get('id')}", + source_metadata=source_metadata + ) + + logger.debug( + "hadith_stored", + hadith_id=str(hadith_id), + hadith_number=hadith_number, + chapter_id=chapter_data.get('id') if chapter_data else None + ) + + def ingest_all_books(self, limit_per_book: Optional[int] = None) -> Dict[str, dict]: + """ + Ingest all available books + + Args: + limit_per_book: Optional limit per book + + Returns: + Dictionary of book_slug -> stats + """ + logger.info("ingesting_all_books", limit_per_book=limit_per_book) + + book_mapping = self.sync_books_from_api() + results = {} + + for book_slug in book_mapping.keys(): + logger.info("starting_book", book_slug=book_slug) + + try: + stats = self.ingest_collection( + book_slug=book_slug, + limit=limit_per_book + ) + results[book_slug] = {"status": "success", "stats": stats} + + except Exception as e: + logger.error( + "book_ingestion_failed", + book_slug=book_slug, + error=str(e) + ) + results[book_slug] = {"status": "failed", "error": str(e)} + + logger.info("all_books_completed", results=results) + return results + + def close(self): + """Close connections""" + self.api_client.close() + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser( + description="Ingest hadiths from HadithAPI.com" + ) + parser.add_argument( + "--book-slug", + help="Book slug (e.g., sahih-bukhari). If not provided, ingests all books." 
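+        # e.g. --book-slug sahih-bukhari (see BOOK_SLUG_MAPPING for valid slugs);
+        # full invocation: python src/main_hadithapi.py --book-slug sahih-bukhari --limit 10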
+ ) + parser.add_argument( + "--limit", + type=int, + help="Limit number of hadiths to ingest per book" + ) + parser.add_argument( + "--list-books", + action="store_true", + help="List available books and exit" + ) + + args = parser.parse_args() + + try: + service = HadithAPIIngestionService() + + # List books mode + if args.list_books: + logger.info("listing_available_books") + book_mapping = service.sync_books_from_api() + + print("\n=== Available Books ===\n") + for book_slug, info in book_mapping.items(): + print(f"Book Slug: {book_slug}") + print(f" Name: {info['book_name']}") + print(f" Hadiths: {info['hadiths_count']}") + print(f" Chapters: {info['chapters_count']}") + print() + + service.close() + return 0 + + # Ingest mode + if args.book_slug: + logger.info( + "script_started", + book_slug=args.book_slug, + limit=args.limit + ) + + stats = service.ingest_collection( + book_slug=args.book_slug, + limit=args.limit + ) + + logger.info("script_completed", stats=stats) + + print(f"\n=== Ingestion completed for {args.book_slug} ===") + print(f"Processed: {stats['processed']}") + print(f"Failed: {stats['failed']}") + print(f"Skipped: {stats['skipped']}") + + else: + # Ingest all books + logger.info("script_started_all_books", limit_per_book=args.limit) + + results = service.ingest_all_books(limit_per_book=args.limit) + + print("\n=== All Books Ingestion Summary ===\n") + for book_slug, result in results.items(): + print(f"{book_slug}: {result['status']}") + if result['status'] == 'success': + stats = result['stats'] + print(f" Processed: {stats['processed']}") + print(f" Failed: {stats['failed']}") + else: + print(f" Error: {result['error']}") + print() + + service.close() + return 0 + + except Exception as e: + logger.error( + "script_failed", + error=str(e), + exc_info=True + ) + + print(f"\nIngestion failed: {str(e)}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/hadith-ingestion/src/processors/__init__.py b/hadith-ingestion/src/processors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/processors/arabic_normalizer.py b/hadith-ingestion/src/processors/arabic_normalizer.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/processors/text_cleaner.py b/hadith-ingestion/src/processors/text_cleaner.py new file mode 100644 index 0000000..fa3cf66 --- /dev/null +++ b/hadith-ingestion/src/processors/text_cleaner.py @@ -0,0 +1,173 @@ +""" +Text cleaning and normalization utilities +""" +import re +from typing import Optional +import unicodedata +import structlog + +logger = structlog.get_logger() + + +class ArabicTextProcessor: + """Process and normalize Arabic text""" + + # Arabic diacritics to remove + DIACRITICS = re.compile( + r'[\u064B-\u065F\u0670\u06D6-\u06DC\u06DF-\u06E8\u06EA-\u06ED]' + ) + + # Tatweel (elongation character) + TATWEEL = '\u0640' + + # Normalize Arabic letters + ALEF_VARIANTS = re.compile(r'[إأآا]') + ALEF_MAKSURA = 'ى' + YAA = 'ي' + TAA_MARBUTA = 'ة' + HAA = 'ه' + + @classmethod + def remove_diacritics(cls, text: str) -> str: + """Remove Arabic diacritics (tashkeel)""" + if not text: + return text + return cls.DIACRITICS.sub('', text) + + @classmethod + def remove_tatweel(cls, text: str) -> str: + """Remove tatweel (elongation) character""" + if not text: + return text + return text.replace(cls.TATWEEL, '') + + @classmethod + def normalize_alef(cls, text: str) -> str: + """Normalize all Alef variants to bare Alef""" + if not 
text: + return text + return cls.ALEF_VARIANTS.sub('ا', text) + + @classmethod + def normalize_yaa(cls, text: str) -> str: + """Normalize Alef Maksura to Yaa""" + if not text: + return text + return text.replace(cls.ALEF_MAKSURA, cls.YAA) + + @classmethod + def normalize_taa_marbuta(cls, text: str) -> str: + """Normalize Taa Marbuta to Haa""" + if not text: + return text + return text.replace(cls.TAA_MARBUTA, cls.HAA) + + @classmethod + def normalize_whitespace(cls, text: str) -> str: + """Normalize whitespace""" + if not text: + return text + # Replace multiple spaces with single space + text = re.sub(r'\s+', ' ', text) + # Trim + return text.strip() + + @classmethod + def normalize_full(cls, text: str) -> str: + """ + Apply full normalization: + - Remove diacritics + - Remove tatweel + - Normalize Alef variants + - Normalize Yaa + - Normalize Taa Marbuta + - Normalize whitespace + """ + if not text: + return text + + text = cls.remove_diacritics(text) + text = cls.remove_tatweel(text) + text = cls.normalize_alef(text) + text = cls.normalize_yaa(text) + text = cls.normalize_taa_marbuta(text) + text = cls.normalize_whitespace(text) + + return text + + @classmethod + def extract_sanad_matn(cls, text: str) -> tuple[Optional[str], Optional[str]]: + """ + Attempt to extract sanad (chain) and matn (text) from hadith + + Common patterns: + - حدثنا ... قال ... (sanad ends before reported speech) + - Simple heuristic: Split on first occurrence of قال or أن + + Returns: + Tuple of (sanad, matn) or (None, None) if cannot split + """ + if not text: + return None, None + + # Look for common sanad-matn separators + separators = [ + r'قال\s*رسول\s*الله', # "The Messenger of Allah said" + r'قال\s*النبي', # "The Prophet said" + r'عن\s*النبي', # "From the Prophet" + r'أن\s*رسول\s*الله', # "That the Messenger of Allah" + ] + + for pattern in separators: + match = re.search(pattern, text, re.IGNORECASE) + if match: + split_pos = match.start() + sanad = text[:split_pos].strip() + matn = text[split_pos:].strip() + + logger.debug( + "sanad_matn_extracted", + sanad_length=len(sanad), + matn_length=len(matn) + ) + + return sanad, matn + + # Could not split + logger.debug("sanad_matn_extraction_failed") + return None, None + + +class TextCleaner: + """General text cleaning utilities""" + + @staticmethod + def clean_html(text: str) -> str: + """Remove HTML tags""" + if not text: + return text + return re.sub(r'<[^>]+>', '', text) + + @staticmethod + def normalize_unicode(text: str) -> str: + """Normalize Unicode (NFC normalization)""" + if not text: + return text + return unicodedata.normalize('NFC', text) + + @staticmethod + def clean_text(text: str) -> str: + """Apply general cleaning""" + if not text: + return text + + # Remove HTML + text = TextCleaner.clean_html(text) + + # Normalize Unicode + text = TextCleaner.normalize_unicode(text) + + # Normalize whitespace + text = ArabicTextProcessor.normalize_whitespace(text) + + return text \ No newline at end of file diff --git a/hadith-ingestion/src/processors/validator.py b/hadith-ingestion/src/processors/validator.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/utils/__init__.py b/hadith-ingestion/src/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/utils/logger.py b/hadith-ingestion/src/utils/logger.py new file mode 100644 index 0000000..e69de29 diff --git a/hadith-ingestion/src/utils/retry.py b/hadith-ingestion/src/utils/retry.py new file mode 100644 index 0000000..e69de29 diff --git 
a/hadith-ingestion/test-hadithapi-k8s.sh b/hadith-ingestion/test-hadithapi-k8s.sh
new file mode 100755
index 0000000..5db8a63
--- /dev/null
+++ b/hadith-ingestion/test-hadithapi-k8s.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+# test-hadithapi-k8s.sh
+
+set -e
+
+echo "=== Kubernetes HadithAPI Integration Test ==="
+
+# 1. Create secrets
+echo "Creating secrets..."
+./create-secrets.sh
+
+# 2. Build and load image (if using local cluster)
+echo "Building Docker image..."
+docker build -t hadith-ingestion:latest .
+
+# If using kind/minikube, load image
+# kind load docker-image hadith-ingestion:latest
+
+# 3. Submit test workflow (10 hadiths) to the argo namespace,
+# matching the Workflow metadata and the status check below
+echo "Submitting test workflow..."
+argo submit -n argo argo/workflows/ingest-hadithapi.yaml \
+  --parameter book-slug=sahih-bukhari \
+  --parameter limit=10 \
+  --wait \
+  --log
+
+# 4. Check workflow status
+echo -e "\nChecking workflow status..."
+argo list -n argo
+
+# 5. Verify data in database
+echo -e "\nVerifying data..."
+kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
+SELECT
+    c.name_english,
+    COUNT(h.id) as hadith_count,
+    MAX(h.created_at) as last_ingestion
+FROM collections c
+LEFT JOIN hadiths h ON c.id = h.collection_id
+WHERE c.abbreviation = 'bukhari'
+GROUP BY c.name_english;
+"
+
+echo -e "\n=== Test Complete ==="
\ No newline at end of file
diff --git a/hadith-ingestion/test-hadithapi-local.sh b/hadith-ingestion/test-hadithapi-local.sh
new file mode 100755
index 0000000..aa01df5
--- /dev/null
+++ b/hadith-ingestion/test-hadithapi-local.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+# test-hadithapi-local.sh
+
+set -e
+
+echo "=== HadithAPI.com Integration Test ==="
+
+# 1. Test API connection
+echo "Testing API connection..."
+curl -s "https://hadithapi.com/api/books?apiKey=\$2y\$10\$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" | jq .
+
+# 2. Test database connection
+echo -e "\nTesting database connection..."
+kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "SELECT COUNT(*) FROM collections;"
+
+# 3. List available books
+echo -e "\nListing available books..."
+python src/main_hadithapi.py --list-books
+
+# 4. Test ingestion (limited to 10 hadiths)
+echo -e "\nRunning test ingestion (10 hadiths from Sahih Bukhari)..."
+python src/main_hadithapi.py --book-slug sahih-bukhari --limit 10
+
+# 5. Verify data
+echo -e "\nVerifying ingested data..."
+kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
+SELECT
+    c.name_english,
+    c.abbreviation,
+    COUNT(h.id) as hadith_count,
+    COUNT(DISTINCT b.id) as book_count
+FROM collections c
+LEFT JOIN hadiths h ON c.id = h.collection_id
+LEFT JOIN books b ON h.book_id = b.id
+WHERE c.abbreviation = 'bukhari'
+GROUP BY c.name_english, c.abbreviation;
+"
+
+echo -e "\n=== Test Complete ==="
\ No newline at end of file
diff --git a/hadith-ingestion/tests/__init__.py b/hadith-ingestion/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/hadith-ingestion/tests/test_clients.py b/hadith-ingestion/tests/test_clients.py
new file mode 100644
index 0000000..e69de29
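Usage sketch for the text processors above (illustrative, not part of the patch):
assuming the repo root is on PYTHONPATH as in the Dockerfile, the cleaning and
normalization pipeline composes as below; the sample string is invented.

    from src.processors.text_cleaner import ArabicTextProcessor, TextCleaner

    raw = "<b>حَدَّثَنَا أَبُو بَكْرٍ قَالَ رَسُولُ اللَّهِ الدِّينُ النَّصِيحَةُ</b>"

    # General cleaning: strip HTML tags, NFC-normalize, collapse whitespace
    clean = TextCleaner.clean_text(raw)

    # Full Arabic normalization: drop tashkeel and tatweel, unify Alef,
    # Yaa, and Taa Marbuta variants
    normalized = ArabicTextProcessor.normalize_full(clean)

    # Heuristic sanad/matn split; returns (None, None) when no separator
    # pattern such as "qala rasul Allah" is found in the text
    sanad, matn = ArabicTextProcessor.extract_sanad_matn(normalized)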