diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..e3d3557 Binary files /dev/null and b/.DS_Store differ diff --git a/hadith-ingestion/.DS_Store b/hadith-ingestion/.DS_Store new file mode 100644 index 0000000..c499c23 Binary files /dev/null and b/hadith-ingestion/.DS_Store differ diff --git a/hadith-ingestion/combine.sh b/hadith-ingestion/combine.sh new file mode 100755 index 0000000..fd9fa32 --- /dev/null +++ b/hadith-ingestion/combine.sh @@ -0,0 +1,5 @@ +find . -type f -name "*.txt" -o -name "production" -o -name "*.py" -o -name "*.yaml" -o -name "Dockerfile" -o -name "*.sh" -o -name "*.env" -o -name "*.md" ! -name "*.xls" ! -name "*.xlsx"| while read file; do + echo "=== $file ===" >> combined.txt + cat "$file" >> combined.txt + echo "" >> combined.txt +done diff --git a/hadith-ingestion/combined.txt b/hadith-ingestion/combined.txt new file mode 100644 index 0000000..f9b621c --- /dev/null +++ b/hadith-ingestion/combined.txt @@ -0,0 +1,3202 @@ +=== ./run-full-ingestion.sh === +#!/bin/bash +# run-full-ingestion.sh + +set -e + +echo "=== Starting Full HadithAPI Ingestion ===" + +# Books to ingest (in order) +BOOKS=( + "sahih-bukhari" + "sahih-muslim" + "sunan-abu-dawood" + "jami-at-tirmidhi" + "sunan-an-nasai" + "sunan-ibn-e-majah" +) + +for BOOK in "${BOOKS[@]}"; do + echo -e "\n=========================================" + echo "Ingesting: $BOOK" + echo "=========================================" + + argo submit -n argo argo/workflows/ingest-hadithapi.yaml \ + --parameter book-slug=$BOOK \ + --parameter limit=0 \ + --wait \ + --log + + echo "$BOOK completed!" + + # Optional: add delay between books + sleep 10 +done + +echo -e "\n=== All Books Ingestion Complete ===" + +# Print summary +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT + c.name_english, + c.abbreviation, + COUNT(h.id) as hadith_count, + COUNT(DISTINCT b.id) as chapter_count +FROM collections c +LEFT JOIN hadiths h ON c.id = h.collection_id +LEFT JOIN books b ON h.book_id = b.id +GROUP BY c.name_english, c.abbreviation +ORDER BY hadith_count DESC; +" +=== ./create-secrets.sh === +#!/bin/bash +# create-secrets.sh + +# Database secret +kubectl -n ml create secret generic hadith-db-secret \ + --from-literal=password='hadith_ingest' \ + --dry-run=client -o yaml | kubectl apply -f - + +# HadithAPI secret (already public, but for consistency) +kubectl -n ml create secret generic hadithapi-secret \ + --from-literal=api-key='$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK' \ + --dry-run=client -o yaml | kubectl apply -f - + +# MinIO secret +kubectl -n ml create secret generic minio-secret \ + --from-literal=access-key='minioadmin' \ + --from-literal=secret-key='minioadmin' \ + --dry-run=client -o yaml | kubectl apply -f - + +echo "Secrets created successfully" +=== ./requirements.txt === +# Core dependencies +python-dotenv==1.0.0 +pydantic==2.5.0 +pydantic-settings==2.1.0 + +# HTTP clients +httpx==0.25.2 +requests==2.31.0 +tenacity==8.2.3 + +# Database +psycopg2-binary==2.9.9 +sqlalchemy==2.0.23 +asyncpg==0.29.0 + +# Data processing +pandas==2.1.4 +numpy==1.26.2 +pyarabic==0.6.15 +arabic-reshaper==3.0.0 + +# Validation +jsonschema==4.20.0 +validators==0.22.0 + +# Logging & Monitoring +structlog==23.2.0 +prometheus-client==0.19.0 + +# Cloud storage +minio==7.2.0 +boto3==1.34.0 + +# Task queue (optional) +celery==5.3.4 +redis==5.0.1 + +# Testing +pytest==7.4.3 +pytest-asyncio==0.21.1 +pytest-cov==4.1.0 +faker==21.0.0 +=== ./config/__init__.py === + +=== 
./config/settings.py === +""" +Configuration settings for hadith ingestion service +""" +from pydantic_settings import BaseSettings +from typing import Optional +import os + + +class Settings(BaseSettings): + """Application settings loaded from environment variables""" + + # Database + DATABASE_HOST: str = "postgres.db.svc.cluster.local" + DATABASE_PORT: int = 5432 + DATABASE_NAME: str = "hadith_db" + DATABASE_USER: str = "hadith_ingest" + DATABASE_PASSWORD: str = "hadith_ingest" + + @property + def DATABASE_URL(self) -> str: + return ( + f"postgresql://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}" + f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}" + ) + + @property + def ASYNC_DATABASE_URL(self) -> str: + return ( + f"postgresql+asyncpg://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}" + f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}" + ) + + # MinIO / S3 + MINIO_ENDPOINT: str = "minio.storage.svc.cluster.local:9000" + MINIO_ACCESS_KEY: str = "minioadmin" + MINIO_SECRET_KEY: str = "minioadmin" + MINIO_BUCKET_RAW: str = "hadith-raw-data" + MINIO_BUCKET_PROCESSED: str = "hadith-processed" + MINIO_SECURE: bool = False + + # APIs + SUNNAH_API_KEY: Optional[str] = None + SUNNAH_BASE_URL: str = "https://api.sunnah.com/v1" + HADITH_ONE_API_KEY: Optional[str] = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" + # HadithAPI.com + HADITHAPI_KEY: str = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" + HADITHAPI_BASE_URL: str = "https://hadithapi.com/api" + # Rate limiting + API_RATE_LIMIT: int = 30 # requests per minute + API_MAX_RETRIES: int = 3 + API_RETRY_DELAY: int = 5 # seconds + + # Processing + BATCH_SIZE: int = 100 + MAX_WORKERS: int = 4 + + # TEI Service (for embeddings) + TEI_URL: str = "http://tei.ml.svc.cluster.local" + TEI_TIMEOUT: int = 30 + + # Qdrant + QDRANT_URL: str = "http://qdrant.db.svc.cluster.local:6333" + QDRANT_COLLECTION: str = "hadith_embeddings" + + # Logging + LOG_LEVEL: str = "INFO" + LOG_FORMAT: str = "json" + + # Job tracking + JOB_NAME: Optional[str] = None + JOB_TYPE: str = "api_fetch" + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + case_sensitive = True + + +# Global settings instance +settings = Settings() +=== ./Dockerfile === +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + postgresql-client \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements +COPY requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY config/ /app/config/ +COPY src/ /app/src/ + +# Create non-root user +RUN useradd -m -u 1000 hadith && chown -R hadith:hadith /app +USER hadith + +# Set Python path +ENV PYTHONPATH=/app + +# Default command +CMD ["python", "/app/src/main_hadithapi.py"] +=== ./tests/__init__.py === + +=== ./tests/test_clients.py === + +=== ./test-hadithapi-k8s.sh === +#!/bin/bash +# test-hadithapi-k8s.sh + +set -e + +echo "=== Kubernetes HadithAPI Integration Test ===" + +# 1. Create secrets +echo "Creating secrets..." +#./create-secrets.sh + +# 2. Build and load image (if using local cluster) +echo "Building Docker image..." +#docker build -t hadith-ingestion:latest . + +# If using kind/minikube, load image +# kind load docker-image hadith-ingestion:latest + +# 3. Submit test workflow (10 hadiths) +echo "Submitting test workflow..." 
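+# The limit=10 parameter keeps this run to a handful of hadiths so the pipeline
+# can be validated end-to-end in a few minutes; pass limit=0 for a full ingest.
+# The namespace given with -n must match the namespace declared in
+# argo/workflows/ingest-hadithapi.yaml (ml in this repo).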
+argo submit -n ml argo/workflows/ingest-hadithapi.yaml \ + --parameter book-slug=sahih-bukhari \ + --parameter limit=10 \ + --wait \ + --log + +# 4. Check workflow status +echo -e "\nChecking workflow status..." +argo list -n argo + +# 5. Verify data in database +echo -e "\nVerifying data..." +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT + c.name_english, + COUNT(h.id) as hadith_count, + MAX(h.created_at) as last_ingestion +FROM collections c +LEFT JOIN hadiths h ON c.id = h.collection_id +WHERE c.abbreviation = 'bukhari' +GROUP BY c.name_english; +" + +echo -e "\n=== Test Complete ===" +=== ./README.md === +# 🚀 HadithAPI.com Deployment - Quick Start + +## What You Got + +Three comprehensive guides: +1. **PHASE_2_IMPLEMENTATION_GUIDE.md** - Original guide with PostgreSQL schema +2. **HADITHAPI_INTEGRATION_GUIDE.md** - Complete HadithAPI.com implementation +3. **This summary** - Quick deployment steps + +## 📦 Complete Package Structure + +The HadithAPI guide includes everything you need: + +### Production-Ready Code +✅ **hadithapi_client.py** - Full API client with pagination and rate limiting +✅ **main_hadithapi.py** - Complete ingestion service +✅ **settings.py** - Configuration with your API key +✅ **Dockerfile** - Container image +✅ **Argo Workflows** - Kubernetes automation +✅ **Test scripts** - Validation and troubleshooting + +### Key Features +- ✅ Automatic pagination handling +- ✅ Rate limiting (30 req/min) +- ✅ Error handling and retries +- ✅ Progress tracking +- ✅ Structured logging +- ✅ Multi-language support (Arabic, English, Urdu) + +## 🎯 5-Minute Quick Start + +### 1. Database Setup (2 min) +```bash +# Use schema from PHASE_2_IMPLEMENTATION_GUIDE.md Section 1 +kubectl -n db exec -it postgres-0 -- psql -U app -d gitea + +# Copy all SQL from Section 1.2 through 1.6 +# This creates hadith_db with complete schema +``` + +### 2. Create Project Structure (1 min) +```bash +mkdir -p hadith-ingestion/{config,src/{api_clients,processors,database,utils},argo/workflows} +cd hadith-ingestion/ + +# Copy code from HADITHAPI_INTEGRATION_GUIDE.md: +# - Section 2.1 → src/api_clients/hadithapi_client.py +# - Section 4.1 → src/main_hadithapi.py +# - Section 5.1 → config/settings.py +# - Section 6.1 → Dockerfile +# - Section 6.4 → argo/workflows/ingest-hadithapi.yaml + +# Also copy from PHASE_2_IMPLEMENTATION_GUIDE.md: +# - Section 3.4 → src/api_clients/base_client.py +# - Section 3.6 → src/processors/text_cleaner.py +# - Section 3.7 → src/database/repository.py +``` + +### 3. Build & Deploy (2 min) +```bash +# Build image +docker build -t hadith-ingestion:latest . 
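+# If the cluster cannot pull the image from a registry (e.g. kind or minikube),
+# load it into the cluster first, e.g. `kind load docker-image hadith-ingestion:latest`,
+# or push it to the registry referenced by the workflow manifests.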
+ +# Create secrets +kubectl -n argo create secret generic hadith-db-secret \ + --from-literal=password='YOUR_PASSWORD' + +kubectl -n argo create secret generic hadithapi-secret \ + --from-literal=api-key='$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK' + +# Test with 10 hadiths +argo submit -n argo argo/workflows/ingest-hadithapi.yaml \ + --parameter book-slug=sahih-bukhari \ + --parameter limit=10 \ + --watch +``` + +## 📊 Expected Results + +### Available Collections +| Book | Hadiths | Time | +|------|---------|------| +| Sahih Bukhari | ~7,500 | 2-3h | +| Sahih Muslim | ~7,000 | 2-3h | +| Sunan Abu Dawood | ~5,000 | 1-2h | +| Jami` at-Tirmidhi | ~4,000 | 1-2h | +| Sunan an-Nasa'i | ~5,700 | 2h | +| Sunan Ibn Majah | ~4,300 | 1-2h | +| **TOTAL** | **~33,500** | **10-15h** | + +## 🔧 Key Differences from Sunnah.com + +| Feature | HadithAPI.com | Sunnah.com | +|---------|---------------|------------| +| **API Key** | ✅ Public (provided) | ❌ Requires PR | +| **Rate Limit** | Unknown (using 30/min) | 100/min | +| **Coverage** | 6 major books | 10+ books | +| **Languages** | Arabic, English, Urdu | Arabic, English | +| **Cost** | ✅ Free | Free | +| **Stability** | Good | Excellent | + +## 📝 Complete File Checklist + +Create these files from the guides: + +``` +hadith-ingestion/ +├── Dockerfile ✓ Section 6.1 +├── requirements.txt ✓ Phase 2 Section 3.2 +├── .env ✓ Section 5.2 +├── build-hadithapi-ingestion.sh ✓ Section 6.2 +├── create-secrets.sh ✓ Section 6.3 +├── test-hadithapi-local.sh ✓ Section 7.1 +├── test-hadithapi-k8s.sh ✓ Section 7.2 +├── run-full-ingestion.sh ✓ Section 7.3 +├── config/ +│ ├── __init__.py (empty file) +│ └── settings.py ✓ Section 5.1 +├── src/ +│ ├── __init__.py (empty file) +│ ├── main_hadithapi.py ✓ Section 4.1 +│ ├── api_clients/ +│ │ ├── __init__.py (empty file) +│ │ ├── base_client.py ✓ Phase 2 Sec 3.4 +│ │ └── hadithapi_client.py ✓ Section 2.1 +│ ├── processors/ +│ │ ├── __init__.py (empty file) +│ │ └── text_cleaner.py ✓ Phase 2 Sec 3.6 +│ ├── database/ +│ │ ├── __init__.py (empty file) +│ │ ├── connection.py (optional) +│ │ └── repository.py ✓ Phase 2 Sec 3.7 +│ └── utils/ +│ ├── __init__.py (empty file) +│ └── logger.py (optional) +└── argo/ + └── workflows/ + └── ingest-hadithapi.yaml ✓ Section 6.4 +``` + +## 🎬 Step-by-Step Execution + +### Day 1: Setup & Test (2-3 hours) +```bash +# 1. Create database schema +# 2. Set up project structure +# 3. Build Docker image +# 4. Create secrets +# 5. Run test with 10 hadiths +# 6. Verify data +``` + +### Day 2: Ingest Major Collections (10-15 hours) +```bash +# Ingest all 6 major collections sequentially +./run-full-ingestion.sh + +# Or manually one by one: +argo submit ... --parameter book-slug=sahih-bukhari +argo submit ... --parameter book-slug=sahih-muslim +# etc... +``` + +### Day 3: Validation & Next Steps +```bash +# 1. Verify data quality +# 2. Check statistics +# 3. Proceed to Phase 3 (ML model development) +``` + +## ✅ Verification Checklist + +After ingestion completes: + +```bash +# 1. Check total hadiths +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT COUNT(*) FROM hadiths; +" +# Expected: ~33,500 + +# 2. Check per collection +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT + c.name_english, + COUNT(h.id) as count +FROM collections c +LEFT JOIN hadiths h ON c.id = h.collection_id +WHERE c.abbreviation IN ('bukhari', 'muslim', 'abudawud', 'tirmidhi', 'nasai', 'ibnmajah') +GROUP BY c.name_english; +" + +# 3. 
Check for errors +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT * FROM ingestion_jobs +WHERE status = 'failed' +ORDER BY created_at DESC; +" +``` + +## 🐛 Common Issues & Solutions + +### Issue: Rate Limiting +``` +Error: 429 Too Many Requests +Solution: Already set to conservative 30/min +If still hitting limits, edit settings.py: + API_RATE_LIMIT = 20 +``` + +### Issue: Connection Timeout +``` +Error: Connection timeout to database +Solution: +1. Check PostgreSQL is running +2. Verify credentials in secrets +3. Test connection manually +``` + +### Issue: Missing Chapters +``` +Warning: chapters_fetch_failed +Solution: Script automatically falls back to fetching all hadiths +This is expected and not critical +``` + +## 📚 Documentation References + +All details in the comprehensive guides: + +1. **PHASE_2_IMPLEMENTATION_GUIDE.md** + - PostgreSQL schema (Section 1) + - Base utilities (Section 3) + - Database repository (Section 3.7) + +2. **HADITHAPI_INTEGRATION_GUIDE.md** + - API client (Section 2) + - Main ingestion service (Section 4) + - Deployment (Section 6) + - Testing (Section 7) + +## 🎯 Next Phase + +After Phase 2 completion: +→ **Phase 3: ML Model Development** + - Annotate sample hadiths (Label Studio) + - Train NER model + - Train relation extraction model + - Fine-tune LLM with LoRA + +## 💡 Pro Tips + +1. **Start Small**: Test with `--limit 10` first +2. **Monitor Progress**: Use `argo logs -n argo -f` +3. **Check Logs**: Structured JSON logs for easy debugging +4. **Backup Data**: Before major operations +5. **Rate Limiting**: Be conservative to avoid blocks + +## 🎉 Success Criteria + +Phase 2 is complete when: +- ✅ Database schema created +- ✅ 33,500+ hadiths ingested +- ✅ All 6 collections present +- ✅ No critical errors +- ✅ Data validated +- ✅ Ready for embedding generation + +--- + +**Estimated Total Time: 1-2 days** +**Difficulty: Intermediate** +**Prerequisites: Phase 1 completed (all core services running)** + +Ready to start? Begin with Section 1 of PHASE_2_IMPLEMENTATION_GUIDE.md! +=== ./setup.py === + +=== ./build-and-push.sh === +#!/bin/bash +# build-and-push.sh + +set -e + +# Configuration +IMAGE_NAME="hadith-ingestion" +TAG="${1:-latest}" +DOCKER_REGISTRY="axxs" +REGISTRY="${DOCKER_REGISTRY:-}" + +echo "Building Docker image: ${IMAGE_NAME}:${TAG}" + +# Build image +docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile . + +# Tag for registry +docker tag ${IMAGE_NAME}:${TAG} ${REGISTRY}/${IMAGE_NAME}:${TAG} + +# Push to registry +echo "Pushing to registry: ${REGISTRY}" +docker push ${REGISTRY}/${IMAGE_NAME}:${TAG} + +echo "Done!" +=== ./.env === +# Database +DATABASE_HOST=postgres.db.svc.cluster.local +DATABASE_PORT=5432 +DATABASE_NAME=hadith_db +DATABASE_USER=hadith_ingest +DATABASE_PASSWORD=hadith_ingest + +# HadithAPI.com +HADITHAPI_KEY=$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK + +# MinIO +MINIO_ENDPOINT=minio.storage.svc.cluster.local:9000 +MINIO_ACCESS_KEY=minioadmin +MINIO_SECRET_KEY=minioadmin + +# Services +TEI_URL=http://tei.ml.svc.cluster.local +QDRANT_URL=http://qdrant.vector.svc.cluster.local:6333 + +# Settings +LOG_LEVEL=INFO +API_RATE_LIMIT=30 +BATCH_SIZE=100 +=== ./build-hadithapi-ingestion.sh === +#!/bin/bash +# build-hadithapi-ingestion.sh + +set -e + +IMAGE_NAME="hadith-ingestion" +TAG="v1.0-hadithapi" + +echo "Building Docker image for HadithAPI.com ingestion..." + +# Build image +docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile . 
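+# The Argo workflow manifests pull axxs/hadith-ingestion:latest, so after a local
+# build either push the image there (see build-and-push.sh) or point the
+# manifests at your own registry.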
+ +# Tag as latest +docker tag ${IMAGE_NAME}:${TAG} ${IMAGE_NAME}:latest + +# If you have a registry, push +# docker push your-registry/${IMAGE_NAME}:${TAG} + +echo "Build complete: ${IMAGE_NAME}:${TAG}" +=== ./combine.sh === +find . -type f -name "*.txt" -o -name "production" -o -name "*.py" -o -name "*.yaml" -o -name "Dockerfile" -o -name "*.sh" -o -name "*.env" -o -name "*.md" ! -name "*.xls" ! -name "*.xlsx"| while read file; do + echo "=== $file ===" >> combined.txt + cat "$file" >> combined.txt + echo "" >> combined.txt +done + +=== ./test-hadithapi-local.sh === +#!/bin/bash +# test-hadithapi-local.sh + +set -e + +echo "=== HadithAPI.com Integration Test ===" + +# 1. Test API connection +echo "Testing API connection..." +curl -s "https://hadithapi.com/api/books?apiKey=\$2y\$10\$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" | jq . + +# 2. Test database connection +echo -e "\nTesting database connection..." +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "SELECT COUNT(*) FROM collections;" + +# 3. List available books +echo -e "\nListing available books..." +python src/main_hadithapi.py --list-books + +# 4. Test ingestion (limited to 10 hadiths) +echo -e "\nRunning test ingestion (10 hadiths from Sahih Bukhari)..." +python src/main_hadithapi.py --book-slug sahih-bukhari --limit 10 + +# 5. Verify data +echo -e "\nVerifying ingested data..." +kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c " +SELECT + c.name_english, + c.abbreviation, + COUNT(h.id) as hadith_count, + COUNT(DISTINCT b.id) as book_count +FROM collections c +LEFT JOIN hadiths h ON c.id = h.collection_id +LEFT JOIN books b ON h.book_id = b.id +WHERE c.abbreviation = 'bukhari' +GROUP BY c.name_english, c.abbreviation; +" + +echo -e "\n=== Test Complete ===" +=== ./simple-pod.yaml === +apiVersion: v1 +kind: Pod +metadata: + name: hadith-ingestion-list-books + namespace: ml +spec: + restartPolicy: Never + containers: + - name: hadith-ingestion + image: axxs/hadith-ingestion:latest + # command: ["python"] + # args: ["/app/src/main_hadithapi.py", "--list-books"] + command: ["sh","-c","sleep infinity"] + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PORT + value: "5432" + - name: DATABASE_NAME + value: "hadith_db" + - name: DATABASE_USER + value: "hadith_ingest" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + - name: HADITHAPI_KEY + valueFrom: + secretKeyRef: + name: hadithapi-secret + key: api-key + - name: MINIO_ENDPOINT + value: "minio.storage.svc.cluster.local:9000" + - name: MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: access-key + - name: MINIO_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: secret-key + - name: LOG_LEVEL + value: "INFO" +=== ./argo/workflows/ingest-collection.yaml === +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: ingest-hadith-collection- + namespace: argo +spec: + entrypoint: ingest-pipeline + + # Arguments + arguments: + parameters: + - name: collection + value: "bukhari" + - name: limit + value: "0" # 0 means no limit + + # Service account with database access + serviceAccountName: argo-workflow + + # Templates + templates: + + # ======================================== + # Main pipeline + # ======================================== + - name: ingest-pipeline + steps: + - - name: ingest-hadiths + template: ingest + arguments: + parameters: + - name: collection + value: 
"{{workflow.parameters.collection}}" + - name: limit + value: "{{workflow.parameters.limit}}" + + - - name: generate-embeddings + template: generate-embeddings + arguments: + parameters: + - name: collection + value: "{{workflow.parameters.collection}}" + + - - name: index-qdrant + template: index-qdrant + arguments: + parameters: + - name: collection + value: "{{workflow.parameters.collection}}" + + # ======================================== + # Ingestion step + # ======================================== + - name: ingest + inputs: + parameters: + - name: collection + - name: limit + + container: + image: hadith-ingestion:latest + imagePullPolicy: IfNotPresent + command: [python, /app/src/main.py] + args: + - "{{inputs.parameters.collection}}" + - "--limit={{inputs.parameters.limit}}" + + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PORT + value: "5432" + - name: DATABASE_NAME + value: "hadith_db" + - name: DATABASE_USER + value: "hadith_ingest" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: MINIO_ENDPOINT + value: "minio.storage.svc.cluster.local:9000" + - name: MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: access-key + - name: MINIO_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: secret-key + + - name: SUNNAH_API_KEY + valueFrom: + secretKeyRef: + name: sunnah-api-secret + key: api-key + + - name: LOG_LEVEL + value: "INFO" + - name: JOB_NAME + value: "{{workflow.name}}" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi + + # ======================================== + # Embedding generation step + # ======================================== + - name: generate-embeddings + inputs: + parameters: + - name: collection + + container: + image: hadith-embeddings:latest + imagePullPolicy: IfNotPresent + command: [python, /app/generate_embeddings.py] + args: + - "--collection={{inputs.parameters.collection}}" + - "--batch-size=32" + + env: + - name: DATABASE_URL + value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: TEI_URL + value: "http://tei.ml.svc.cluster.local" + + - name: LOG_LEVEL + value: "INFO" + + resources: + requests: + cpu: 1 + memory: 2Gi + limits: + cpu: 4 + memory: 8Gi + + # ======================================== + # Qdrant indexing step + # ======================================== + - name: index-qdrant + inputs: + parameters: + - name: collection + + container: + image: hadith-qdrant-indexer:latest + imagePullPolicy: IfNotPresent + command: [python, /app/index_qdrant.py] + args: + - "--collection={{inputs.parameters.collection}}" + - "--batch-size=100" + + env: + - name: DATABASE_URL + value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: QDRANT_URL + value: "http://qdrant.vector.svc.cluster.local:6333" + - name: QDRANT_COLLECTION + value: "hadith_embeddings" + + - name: LOG_LEVEL + value: "INFO" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi +=== ./argo/workflows/ingest-hadithapi.yaml === +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: ingest-hadithapi- + namespace: ml +spec: + entrypoint: ingest-pipeline + + 
arguments: + parameters: + - name: book-slug + value: "sahih-bukhari" + - name: limit + value: "0" # 0 means no limit + + serviceAccountName: argo-workflow + + templates: + + # ======================================== + # Main pipeline + # ======================================== + - name: ingest-pipeline + steps: + - - name: ingest-hadiths + template: ingest + arguments: + parameters: + - name: book-slug + value: "{{workflow.parameters.book-slug}}" + - name: limit + value: "{{workflow.parameters.limit}}" + + # ======================================== + # Ingestion step + # ======================================== + - name: ingest + inputs: + parameters: + - name: book-slug + - name: limit + + container: + image: axxs/hadith-ingestion:latest + imagePullPolicy: IfNotPresent + command: [python, /app/src/main_hadithapi.py] + args: + - "--book-slug={{inputs.parameters.book-slug}}" + - "--limit={{inputs.parameters.limit}}" + + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PORT + value: "5432" + - name: DATABASE_NAME + value: "hadith_db" + - name: DATABASE_USER + value: "hadith_ingest" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + + - name: HADITHAPI_KEY + valueFrom: + secretKeyRef: + name: hadithapi-secret + key: api-key + + - name: MINIO_ENDPOINT + value: "minio.storage.svc.cluster.local:9000" + - name: MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: access-key + - name: MINIO_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: secret-key + + - name: LOG_LEVEL + value: "INFO" + - name: JOB_NAME + value: "{{workflow.name}}" + - name: API_RATE_LIMIT + value: "30" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi +--- +# Workflow to ingest ALL books +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: ingest-all-hadithapi- + namespace: ml +spec: + entrypoint: ingest-all-books + + serviceAccountName: argo-workflow + + arguments: + parameters: + - name: limit-per-book + value: "0" # 0 means no limit + + templates: + + # ======================================== + # Main pipeline - sequential processing + # ======================================== + - name: ingest-all-books + steps: + # Process each book sequentially to avoid rate limiting + - - name: sahih-bukhari + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sahih-bukhari" + + - - name: sahih-muslim + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sahih-muslim" + + - - name: sunan-abu-dawood + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sunan-abu-dawood" + + - - name: jami-at-tirmidhi + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "jami-at-tirmidhi" + + - - name: sunan-an-nasai + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sunan-an-nasai" + + - - name: sunan-ibn-e-majah + template: ingest-book + arguments: + parameters: + - name: book-slug + value: "sunan-ibn-e-majah" + + # ======================================== + # Book ingestion template + # ======================================== + - name: ingest-book + inputs: + parameters: + - name: book-slug + + container: + image: axxs/hadith-ingestion:latest + imagePullPolicy: IfNotPresent + command: [python, /app/src/main_hadithapi.py] + args: + - "--book-slug={{inputs.parameters.book-slug}}" + - 
"--limit={{workflow.parameters.limit-per-book}}" + + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + - name: HADITHAPI_KEY + valueFrom: + secretKeyRef: + name: hadithapi-secret + key: api-key + - name: LOG_LEVEL + value: "INFO" + + resources: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2 + memory: 4Gi +=== ./src/database/__init__.py === + +=== ./src/database/connection.py === + +=== ./src/database/repository.py === +""" +Database repository for hadith data operations +""" +from typing import List, Dict, Any, Optional +from uuid import UUID +import structlog +from sqlalchemy import create_engine, text, select, insert, update +from sqlalchemy.orm import sessionmaker, Session +from sqlalchemy.exc import IntegrityError +from config.settings import settings + +logger = structlog.get_logger() + + +class HadithRepository: + """Repository for hadith database operations""" + + def __init__(self, database_url: Optional[str] = None): + self.database_url = database_url or settings.DATABASE_URL + self.engine = create_engine(self.database_url, pool_pre_ping=True) + self.SessionLocal = sessionmaker(bind=self.engine) + + def get_session(self) -> Session: + """Get database session""" + return self.SessionLocal() + + # ===== Collections ===== + + def get_collection_by_abbreviation(self, abbr: str) -> Optional[Dict[str, Any]]: + """Get collection by abbreviation""" + with self.get_session() as session: + query = text(""" + SELECT * FROM collections + WHERE abbreviation = :abbr + """) + result = session.execute(query, {"abbr": abbr}).fetchone() + + if result: + return dict(result._mapping) + return None + + def get_all_collections(self) -> List[Dict[str, Any]]: + """Get all collections""" + with self.get_session() as session: + query = text("SELECT * FROM collections ORDER BY name_english") + result = session.execute(query).fetchall() + return [dict(row._mapping) for row in result] + + def update_collection_count(self, collection_id: UUID, count: int): + """Update total hadith count for a collection""" + with self.get_session() as session: + query = text(""" + UPDATE collections + SET total_hadiths = :count, updated_at = NOW() + WHERE id = :id + """) + session.execute(query, {"id": str(collection_id), "count": count}) + session.commit() + + # ===== Books ===== + + def upsert_book( + self, + collection_id: UUID, + book_number: int, + name_english: Optional[str] = None, + name_arabic: Optional[str] = None, + metadata: Optional[Dict] = None + ) -> UUID: + """Insert or update a book""" + with self.get_session() as session: + query = text(""" + INSERT INTO books (collection_id, book_number, name_english, name_arabic, metadata) + VALUES (:collection_id, :book_number, :name_english, :name_arabic, :metadata) + ON CONFLICT (collection_id, book_number) + DO UPDATE SET + name_english = EXCLUDED.name_english, + name_arabic = EXCLUDED.name_arabic, + metadata = EXCLUDED.metadata + RETURNING id + """) + + result = session.execute(query, { + "collection_id": str(collection_id), + "book_number": book_number, + "name_english": name_english, + "name_arabic": name_arabic, + "metadata": metadata or {} + }) + session.commit() + + return UUID(result.fetchone()[0]) + + def get_book(self, collection_id: UUID, book_number: int) -> Optional[Dict[str, Any]]: + """Get book by collection and book number""" + with self.get_session() as session: + query = text(""" + SELECT * FROM books + WHERE collection_id = 
:collection_id AND book_number = :book_number + """) + result = session.execute(query, { + "collection_id": str(collection_id), + "book_number": book_number + }).fetchone() + + if result: + return dict(result._mapping) + return None + + # ===== Hadiths ===== + + def upsert_hadith( + self, + collection_id: UUID, + hadith_number: int, + arabic_text: str, + book_id: Optional[UUID] = None, + english_text: Optional[str] = None, + urdu_text: Optional[str] = None, + grade: Optional[str] = None, + grade_source: Optional[str] = None, + chapter_name: Optional[str] = None, + source_id: Optional[str] = None, + source_url: Optional[str] = None, + source_metadata: Optional[Dict] = None + ) -> UUID: + """Insert or update a hadith""" + with self.get_session() as session: + query = text(""" + INSERT INTO hadiths ( + collection_id, book_id, hadith_number, + arabic_text, english_text, urdu_text, + grade, grade_source, chapter_name, + source_id, source_url, source_metadata + ) + VALUES ( + :collection_id, :book_id, :hadith_number, + :arabic_text, :english_text, :urdu_text, + :grade, :grade_source, :chapter_name, + :source_id, :source_url, :source_metadata + ) + ON CONFLICT (collection_id, book_id, hadith_number) + DO UPDATE SET + arabic_text = EXCLUDED.arabic_text, + english_text = EXCLUDED.english_text, + urdu_text = EXCLUDED.urdu_text, + grade = EXCLUDED.grade, + grade_source = EXCLUDED.grade_source, + chapter_name = EXCLUDED.chapter_name, + source_url = EXCLUDED.source_url, + source_metadata = EXCLUDED.source_metadata, + updated_at = NOW() + RETURNING id + """) + + result = session.execute(query, { + "collection_id": str(collection_id), + "book_id": str(book_id) if book_id else None, + "hadith_number": hadith_number, + "arabic_text": arabic_text, + "english_text": english_text, + "urdu_text": urdu_text, + "grade": grade, + "grade_source": grade_source, + "chapter_name": chapter_name, + "source_id": source_id, + "source_url": source_url, + "source_metadata": source_metadata or {} + }) + session.commit() + + return UUID(result.fetchone()[0]) + + def get_hadiths_without_embeddings( + self, + limit: int = 100, + collection_id: Optional[UUID] = None + ) -> List[Dict[str, Any]]: + """Get hadiths that need embedding generation""" + with self.get_session() as session: + if collection_id: + query = text(""" + SELECT * FROM hadiths + WHERE embedding_generated = FALSE + AND collection_id = :collection_id + ORDER BY created_at ASC + LIMIT :limit + """) + result = session.execute(query, { + "collection_id": str(collection_id), + "limit": limit + }).fetchall() + else: + query = text(""" + SELECT * FROM hadiths + WHERE embedding_generated = FALSE + ORDER BY created_at ASC + LIMIT :limit + """) + result = session.execute(query, {"limit": limit}).fetchall() + + return [dict(row._mapping) for row in result] + + def mark_embedding_generated(self, hadith_id: UUID, version: str = "v1"): + """Mark hadith as having embedding generated""" + with self.get_session() as session: + query = text(""" + UPDATE hadiths + SET embedding_generated = TRUE, + embedding_version = :version, + updated_at = NOW() + WHERE id = :id + """) + session.execute(query, {"id": str(hadith_id), "version": version}) + session.commit() + + # ===== Ingestion Jobs ===== + + def create_ingestion_job( + self, + job_name: str, + job_type: str, + source_name: str, + config: Optional[Dict] = None + ) -> UUID: + """Create a new ingestion job""" + with self.get_session() as session: + query = text(""" + INSERT INTO ingestion_jobs (job_name, job_type, source_name, 
config, status, started_at) + VALUES (:job_name, :job_type, :source_name, :config, 'running', NOW()) + RETURNING id + """) + result = session.execute(query, { + "job_name": job_name, + "job_type": job_type, + "source_name": source_name, + "config": config or {} + }) + session.commit() + + return UUID(result.fetchone()[0]) + + def update_job_progress( + self, + job_id: UUID, + total: Optional[int] = None, + processed: Optional[int] = None, + failed: Optional[int] = None, + skipped: Optional[int] = None + ): + """Update job progress counters""" + with self.get_session() as session: + updates = [] + params = {"job_id": str(job_id)} + + if total is not None: + updates.append("total_records = :total") + params["total"] = total + if processed is not None: + updates.append("processed_records = :processed") + params["processed"] = processed + if failed is not None: + updates.append("failed_records = :failed") + params["failed"] = failed + if skipped is not None: + updates.append("skipped_records = :skipped") + params["skipped"] = skipped + + if updates: + query_str = f""" + UPDATE ingestion_jobs + SET {', '.join(updates)} + WHERE id = :job_id + """ + session.execute(text(query_str), params) + session.commit() + + def complete_job( + self, + job_id: UUID, + status: str = "success", + error_message: Optional[str] = None + ): + """Mark job as completed""" + with self.get_session() as session: + query = text(""" + UPDATE ingestion_jobs + SET status = :status, + completed_at = NOW(), + duration_seconds = EXTRACT(EPOCH FROM (NOW() - started_at)), + error_message = :error_message + WHERE id = :job_id + """) + session.execute(query, { + "job_id": str(job_id), + "status": status, + "error_message": error_message + }) + session.commit() + + def add_processing_log( + self, + job_id: UUID, + level: str, + message: str, + details: Optional[Dict] = None + ): + """Add a processing log entry""" + with self.get_session() as session: + query = text(""" + INSERT INTO processing_logs (job_id, log_level, message, details) + VALUES (:job_id, :level, :message, :details) + """) + session.execute(query, { + "job_id": str(job_id), + "level": level, + "message": message, + "details": details or {} + }) + session.commit() + + # ===== Statistics ===== + + def get_collection_stats(self, collection_id: UUID) -> Dict[str, Any]: + """Get statistics for a collection""" + with self.get_session() as session: + query = text(""" + SELECT * FROM get_collection_statistics(:collection_id) + """) + result = session.execute(query, {"collection_id": str(collection_id)}).fetchone() + + if result: + return dict(result._mapping) + return {} +=== ./src/embeddings/__init__.py === + +=== ./src/embeddings/generator.py === + +=== ./src/main_hadithapi.py === +""" +Main ingestion script for fetching hadiths from HadithAPI.com +""" +import sys +import argparse +from typing import Optional, Dict, Any +from uuid import UUID +import structlog +from config.settings import settings +from api_clients.hadithapi_client import HadithAPIClient +from database.repository import HadithRepository +from processors.text_cleaner import ArabicTextProcessor, TextCleaner + +# Configure structured logging +structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer() + ], + wrapper_class=structlog.stdlib.BoundLogger, + 
context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, +) + +logger = structlog.get_logger() + + +# Book slug to collection abbreviation mapping +BOOK_SLUG_MAPPING = { + 'sahih-bukhari': 'bukhari', + 'sahih-muslim': 'muslim', + 'sunan-abu-dawood': 'abudawud', + 'jami-at-tirmidhi': 'tirmidhi', + 'sunan-an-nasai': 'nasai', + 'sunan-ibn-e-majah': 'ibnmajah', + 'muwatta-imam-malik': 'malik', + 'musnad-ahmad': 'ahmad', + 'sunan-ad-darimi': 'darimi', + 'mishkat-al-masabih': 'mishkat' +} + + +class HadithAPIIngestionService: + """Service for ingesting hadiths from HadithAPI.com""" + + def __init__(self): + self.api_client = HadithAPIClient() + self.repo = HadithRepository() + self.text_processor = ArabicTextProcessor() + self.text_cleaner = TextCleaner() + + def sync_books_from_api(self) -> Dict[str, Any]: + """ + Sync book metadata from API to database + + Returns: + Dictionary mapping book_slug -> collection_id + """ + logger.info("syncing_books_from_api") + + # Get books from API + api_books = self.api_client.get_books() + + book_mapping = {} + + for api_book in api_books: + book_slug = api_book.get('bookSlug') + + # Map to our collection abbreviation + collection_abbr = BOOK_SLUG_MAPPING.get(book_slug) + + if not collection_abbr: + logger.warning( + "unmapped_book", + book_slug=book_slug, + book_name=api_book.get('bookName') + ) + continue + + # Get or verify collection exists in database + collection = self.repo.get_collection_by_abbreviation(collection_abbr) + + if not collection: + logger.warning( + "collection_not_in_db", + abbreviation=collection_abbr, + book_slug=book_slug + ) + continue + + collection_id = UUID(collection['id']) + book_mapping[book_slug] = { + 'collection_id': collection_id, + 'book_id': api_book.get('id'), + 'book_name': api_book.get('bookName'), + 'hadiths_count': api_book.get('hadiths_count'), + 'chapters_count': api_book.get('chapters_count') + } + + logger.info( + "book_mapped", + book_slug=book_slug, + collection_abbr=collection_abbr, + hadiths_count=api_book.get('hadiths_count') + ) + + logger.info( + "books_synced", + total_books=len(book_mapping) + ) + + return book_mapping + + def ingest_collection( + self, + book_slug: str, + limit: Optional[int] = None + ) -> dict: + """ + Ingest entire collection from HadithAPI.com + + Args: + book_slug: Book slug identifier (e.g., 'sahih-bukhari') + limit: Optional limit on number of hadiths to ingest + + Returns: + Statistics dictionary + """ + logger.info( + "ingestion_started", + book_slug=book_slug, + limit=limit + ) + + # Get book mapping + book_mapping = self.sync_books_from_api() + + if book_slug not in book_mapping: + logger.error( + "book_not_mapped", + book_slug=book_slug, + available_books=list(book_mapping.keys()) + ) + raise ValueError(f"Book '{book_slug}' not found or not mapped") + + book_info = book_mapping[book_slug] + collection_id = book_info['collection_id'] + book_id = book_info['book_id'] + + # Create ingestion job + job_id = self.repo.create_ingestion_job( + job_name=f"ingest_{book_slug}", + job_type="api_fetch", + source_name="hadithapi.com", + config={ + "book_slug": book_slug, + "book_id": book_id, + "limit": limit + } + ) + + logger.info( + "job_created", + job_id=str(job_id), + book_slug=book_slug, + expected_count=book_info.get('hadiths_count') + ) + + stats = { + "processed": 0, + "failed": 0, + "skipped": 0 + } + + try: + # Iterate through all hadiths in book + for hadith_data, chapter_data in 
self.api_client.iter_all_hadiths_in_book_with_chapters( + book_id=book_id, + book_slug=book_slug, + batch_size=100 + ): + # Check limit + if limit and stats["processed"] >= limit: + logger.info("limit_reached", limit=limit) + break + + try: + # Process and store hadith + self._process_and_store_hadith( + collection_id=collection_id, + hadith_data=hadith_data, + chapter_data=chapter_data + ) + + stats["processed"] += 1 + + # Update job progress every 100 hadiths + if stats["processed"] % 100 == 0: + self.repo.update_job_progress( + job_id=job_id, + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + logger.info( + "progress_update", + book_slug=book_slug, + processed=stats["processed"], + failed=stats["failed"], + percentage=round( + (stats["processed"] / int(book_info.get('hadiths_count', 1))) * 100, + 2 + ) if book_info.get('hadiths_count') else 0 + ) + + except Exception as e: + stats["failed"] += 1 + logger.error( + "hadith_processing_failed", + error=str(e), + hadith_number=hadith_data.get("hadithNumber"), + hadith_id=hadith_data.get("id") + ) + + self.repo.add_processing_log( + job_id=job_id, + level="ERROR", + message=f"Failed to process hadith: {str(e)}", + details={"hadith_data": hadith_data} + ) + + # Update final job progress + self.repo.update_job_progress( + job_id=job_id, + total=stats["processed"] + stats["failed"] + stats["skipped"], + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + # Mark job as complete + self.repo.complete_job(job_id=job_id, status="success") + + # Update collection count + self.repo.update_collection_count( + collection_id=collection_id, + count=stats["processed"] + ) + + logger.info( + "ingestion_completed", + book_slug=book_slug, + stats=stats + ) + + return stats + + except Exception as e: + logger.error( + "ingestion_failed", + book_slug=book_slug, + error=str(e), + exc_info=True + ) + + self.repo.complete_job( + job_id=job_id, + status="failed", + error_message=str(e) + ) + + raise + + def _process_and_store_hadith( + self, + collection_id: UUID, + hadith_data: dict, + chapter_data: Optional[dict] + ): + """Process and store a single hadith""" + + # Extract hadith number + hadith_number = hadith_data.get("hadithNumber") + if not hadith_number: + raise ValueError("Missing hadith number") + + # Convert to integer + try: + hadith_number = int(hadith_number) + except (ValueError, TypeError): + raise ValueError(f"Invalid hadith number: {hadith_number}") + + # Extract text in multiple languages + arabic_text = hadith_data.get("hadithArabic") + english_text = hadith_data.get("hadithEnglish") + urdu_text = hadith_data.get("hadithUrdu") + + if not arabic_text: + raise ValueError("Missing Arabic text") + + # Clean texts + arabic_text = self.text_cleaner.clean_text(arabic_text) + if english_text: + english_text = self.text_cleaner.clean_text(english_text) + if urdu_text: + urdu_text = self.text_cleaner.clean_text(urdu_text) + + # Extract grade/status + grade = hadith_data.get("status") + + # Get or create chapter (book in our schema) + book_id = None + chapter_name = None + + if chapter_data: + chapter_id = chapter_data.get('id') + chapter_number = chapter_data.get('chapterNumber') + chapter_name_en = chapter_data.get('chapterEnglish') + chapter_name_ar = chapter_data.get('chapterArabic') + chapter_name = chapter_name_en + + if chapter_number: + try: + chapter_number = int(chapter_number) + except (ValueError, TypeError): + chapter_number = chapter_id # Fallback to ID + + # Get 
or create book (chapter in HadithAPI = book in our schema) + existing_book = self.repo.get_book(collection_id, chapter_number) + + if not existing_book: + book_id = self.repo.upsert_book( + collection_id=collection_id, + book_number=chapter_number, + name_english=chapter_name_en, + name_arabic=chapter_name_ar, + metadata=chapter_data + ) + else: + book_id = UUID(existing_book['id']) + + # Build source metadata + source_metadata = { + 'api_id': hadith_data.get('id'), + 'englishNarrator': hadith_data.get('englishNarrator'), + 'urduNarrator': hadith_data.get('urduNarrator'), + 'bookSlug': hadith_data.get('bookSlug'), + 'chapterId': hadith_data.get('chapterId'), + 'chapter': chapter_data + } + + # Store hadith + hadith_id = self.repo.upsert_hadith( + collection_id=collection_id, + book_id=book_id, + hadith_number=hadith_number, + arabic_text=arabic_text, + english_text=english_text, + urdu_text=urdu_text, + grade=grade, + grade_source="hadithapi.com", + chapter_name=chapter_name, + source_id=str(hadith_data.get('id', '')), + source_url=f"https://hadithapi.com/hadith/{hadith_data.get('id')}", + source_metadata=source_metadata + ) + + logger.debug( + "hadith_stored", + hadith_id=str(hadith_id), + hadith_number=hadith_number, + chapter_id=chapter_data.get('id') if chapter_data else None + ) + + def ingest_all_books(self, limit_per_book: Optional[int] = None) -> Dict[str, dict]: + """ + Ingest all available books + + Args: + limit_per_book: Optional limit per book + + Returns: + Dictionary of book_slug -> stats + """ + logger.info("ingesting_all_books", limit_per_book=limit_per_book) + + book_mapping = self.sync_books_from_api() + results = {} + + for book_slug in book_mapping.keys(): + logger.info("starting_book", book_slug=book_slug) + + try: + stats = self.ingest_collection( + book_slug=book_slug, + limit=limit_per_book + ) + results[book_slug] = {"status": "success", "stats": stats} + + except Exception as e: + logger.error( + "book_ingestion_failed", + book_slug=book_slug, + error=str(e) + ) + results[book_slug] = {"status": "failed", "error": str(e)} + + logger.info("all_books_completed", results=results) + return results + + def close(self): + """Close connections""" + self.api_client.close() + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser( + description="Ingest hadiths from HadithAPI.com" + ) + parser.add_argument( + "--book-slug", + help="Book slug (e.g., sahih-bukhari). If not provided, ingests all books." 
+ ) + parser.add_argument( + "--limit", + type=int, + help="Limit number of hadiths to ingest per book" + ) + parser.add_argument( + "--list-books", + action="store_true", + help="List available books and exit" + ) + + args = parser.parse_args() + + try: + service = HadithAPIIngestionService() + + # List books mode + if args.list_books: + logger.info("listing_available_books") + book_mapping = service.sync_books_from_api() + + print("\n=== Available Books ===\n") + for book_slug, info in book_mapping.items(): + print(f"Book Slug: {book_slug}") + print(f" Name: {info['book_name']}") + print(f" Hadiths: {info['hadiths_count']}") + print(f" Chapters: {info['chapters_count']}") + print() + + service.close() + return 0 + + # Ingest mode + if args.book_slug: + logger.info( + "script_started", + book_slug=args.book_slug, + limit=args.limit + ) + + stats = service.ingest_collection( + book_slug=args.book_slug, + limit=args.limit + ) + + logger.info("script_completed", stats=stats) + + print(f"\n=== Ingestion completed for {args.book_slug} ===") + print(f"Processed: {stats['processed']}") + print(f"Failed: {stats['failed']}") + print(f"Skipped: {stats['skipped']}") + + else: + # Ingest all books + logger.info("script_started_all_books", limit_per_book=args.limit) + + results = service.ingest_all_books(limit_per_book=args.limit) + + print("\n=== All Books Ingestion Summary ===\n") + for book_slug, result in results.items(): + print(f"{book_slug}: {result['status']}") + if result['status'] == 'success': + stats = result['stats'] + print(f" Processed: {stats['processed']}") + print(f" Failed: {stats['failed']}") + else: + print(f" Error: {result['error']}") + print() + + service.close() + return 0 + + except Exception as e: + logger.error( + "script_failed", + error=str(e), + exc_info=True + ) + + print(f"\nIngestion failed: {str(e)}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) +=== ./src/__init__.py === + +=== ./src/utils/__init__.py === + +=== ./src/utils/logger.py === + +=== ./src/utils/retry.py === + +=== ./src/processors/validator.py === + +=== ./src/processors/arabic_normalizer.py === + +=== ./src/processors/__init__.py === + +=== ./src/processors/text_cleaner.py === +""" +Text cleaning and normalization utilities +""" +import re +from typing import Optional +import unicodedata +import structlog + +logger = structlog.get_logger() + + +class ArabicTextProcessor: + """Process and normalize Arabic text""" + + # Arabic diacritics to remove + DIACRITICS = re.compile( + r'[\u064B-\u065F\u0670\u06D6-\u06DC\u06DF-\u06E8\u06EA-\u06ED]' + ) + + # Tatweel (elongation character) + TATWEEL = '\u0640' + + # Normalize Arabic letters + ALEF_VARIANTS = re.compile(r'[إأآا]') + ALEF_MAKSURA = 'ى' + YAA = 'ي' + TAA_MARBUTA = 'ة' + HAA = 'ه' + + @classmethod + def remove_diacritics(cls, text: str) -> str: + """Remove Arabic diacritics (tashkeel)""" + if not text: + return text + return cls.DIACRITICS.sub('', text) + + @classmethod + def remove_tatweel(cls, text: str) -> str: + """Remove tatweel (elongation) character""" + if not text: + return text + return text.replace(cls.TATWEEL, '') + + @classmethod + def normalize_alef(cls, text: str) -> str: + """Normalize all Alef variants to bare Alef""" + if not text: + return text + return cls.ALEF_VARIANTS.sub('ا', text) + + @classmethod + def normalize_yaa(cls, text: str) -> str: + """Normalize Alef Maksura to Yaa""" + if not text: + return text + return text.replace(cls.ALEF_MAKSURA, cls.YAA) + + @classmethod + def 
normalize_taa_marbuta(cls, text: str) -> str: + """Normalize Taa Marbuta to Haa""" + if not text: + return text + return text.replace(cls.TAA_MARBUTA, cls.HAA) + + @classmethod + def normalize_whitespace(cls, text: str) -> str: + """Normalize whitespace""" + if not text: + return text + # Replace multiple spaces with single space + text = re.sub(r'\s+', ' ', text) + # Trim + return text.strip() + + @classmethod + def normalize_full(cls, text: str) -> str: + """ + Apply full normalization: + - Remove diacritics + - Remove tatweel + - Normalize Alef variants + - Normalize Yaa + - Normalize Taa Marbuta + - Normalize whitespace + """ + if not text: + return text + + text = cls.remove_diacritics(text) + text = cls.remove_tatweel(text) + text = cls.normalize_alef(text) + text = cls.normalize_yaa(text) + text = cls.normalize_taa_marbuta(text) + text = cls.normalize_whitespace(text) + + return text + + @classmethod + def extract_sanad_matn(cls, text: str) -> tuple[Optional[str], Optional[str]]: + """ + Attempt to extract sanad (chain) and matn (text) from hadith + + Common patterns: + - حدثنا ... قال ... (sanad ends before reported speech) + - Simple heuristic: Split on first occurrence of قال or أن + + Returns: + Tuple of (sanad, matn) or (None, None) if cannot split + """ + if not text: + return None, None + + # Look for common sanad-matn separators + separators = [ + r'قال\s*رسول\s*الله', # "The Messenger of Allah said" + r'قال\s*النبي', # "The Prophet said" + r'عن\s*النبي', # "From the Prophet" + r'أن\s*رسول\s*الله', # "That the Messenger of Allah" + ] + + for pattern in separators: + match = re.search(pattern, text, re.IGNORECASE) + if match: + split_pos = match.start() + sanad = text[:split_pos].strip() + matn = text[split_pos:].strip() + + logger.debug( + "sanad_matn_extracted", + sanad_length=len(sanad), + matn_length=len(matn) + ) + + return sanad, matn + + # Could not split + logger.debug("sanad_matn_extraction_failed") + return None, None + + +class TextCleaner: + """General text cleaning utilities""" + + @staticmethod + def clean_html(text: str) -> str: + """Remove HTML tags""" + if not text: + return text + return re.sub(r'<[^>]+>', '', text) + + @staticmethod + def normalize_unicode(text: str) -> str: + """Normalize Unicode (NFC normalization)""" + if not text: + return text + return unicodedata.normalize('NFC', text) + + @staticmethod + def clean_text(text: str) -> str: + """Apply general cleaning""" + if not text: + return text + + # Remove HTML + text = TextCleaner.clean_html(text) + + # Normalize Unicode + text = TextCleaner.normalize_unicode(text) + + # Normalize whitespace + text = ArabicTextProcessor.normalize_whitespace(text) + + return text +=== ./src/api_clients/base_client.py === +""" +Base API client with retry logic and rate limiting +""" +import httpx +import time +from typing import Optional, Dict, Any +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential, + retry_if_exception_type +) +import structlog +from config.settings import settings + +logger = structlog.get_logger() + + +class BaseAPIClient: + """Base class for API clients with built-in retry and rate limiting""" + + def __init__( + self, + base_url: str, + api_key: Optional[str] = None, + rate_limit: int = 90, + timeout: int = 30 + ): + self.base_url = base_url.rstrip('/') + self.api_key = api_key + self.rate_limit = rate_limit + self.timeout = timeout + + # Rate limiting + self.request_times = [] + self.min_interval = 60.0 / rate_limit # seconds between requests + + # HTTP client + 
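+        # A single persistent httpx.Client is kept for the lifetime of the
+        # ingestion run so keep-alive connections are reused across the many
+        # paginated requests; it is released in close().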
self.client = httpx.Client(timeout=timeout) + + logger.info( + "api_client_initialized", + base_url=base_url, + rate_limit=rate_limit + ) + + def _wait_for_rate_limit(self): + """Implement rate limiting""" + now = time.time() + + # Remove old timestamps (older than 1 minute) + self.request_times = [t for t in self.request_times if now - t < 60] + + # If we're at the limit, wait + if len(self.request_times) >= self.rate_limit: + sleep_time = 60 - (now - self.request_times[0]) + if sleep_time > 0: + logger.info( + "rate_limit_wait", + sleep_seconds=sleep_time, + requests_in_window=len(self.request_times) + ) + time.sleep(sleep_time) + self.request_times = [] + + # Add current timestamp + self.request_times.append(time.time()) + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=2, max=10), + retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)), + reraise=True + ) + def _make_request( + self, + method: str, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None + ) -> Dict[str, Any]: + """Make HTTP request with retry logic""" + + # Rate limiting + self._wait_for_rate_limit() + + # Prepare headers + request_headers = headers or {} + if self.api_key: + request_headers['X-API-Key'] = self.api_key + + # Make request + url = f"{self.base_url}/{endpoint.lstrip('/')}" + + logger.debug( + "api_request", + method=method, + url=url, + params=params + ) + + response = self.client.request( + method=method, + url=url, + params=params, + headers=request_headers + ) + + response.raise_for_status() + + logger.debug( + "api_response", + status_code=response.status_code, + response_size=len(response.content) + ) + + return response.json() + + def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]: + """Make GET request""" + return self._make_request("GET", endpoint, params=params) + + def close(self): + """Close the HTTP client""" + self.client.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() +=== ./src/api_clients/hadith_one_client.py === + +=== ./src/api_clients/__init__.py === + +=== ./src/api_clients/hadithapi_client.py === +""" +Client for HadithAPI.com API +""" +from typing import List, Dict, Any, Optional, Generator +import structlog +from .base_client import BaseAPIClient +from config.settings import settings + +logger = structlog.get_logger() + + +class HadithAPIClient(BaseAPIClient): + """Client for interacting with hadithapi.com API""" + + def __init__(self, api_key: Optional[str] = None): + super().__init__( + base_url="https://hadithapi.com/api", + api_key=api_key or settings.HADITHAPI_KEY, + rate_limit=30 # Conservative: 30 req/min + ) + + def _add_api_key(self, params: Optional[Dict] = None) -> Dict: + """Add API key to request parameters""" + params = params or {} + params['apiKey'] = self.api_key + return params + + def get_books(self) -> List[Dict[str, Any]]: + """ + Get list of all available books/collections + + Returns: + List of book dictionaries + """ + logger.info("fetching_books") + + params = self._add_api_key() + response = self.get("books", params=params) + + if response.get('status') != 200: + logger.error( + "api_error", + status=response.get('status'), + message=response.get('message') + ) + raise Exception(f"API Error: {response.get('message')}") + + books = response.get('data', []) + + logger.info( + "books_fetched", + count=len(books) + ) + + return books + + def get_chapters(self, book_slug: 
str) -> List[Dict[str, Any]]: + """ + Get chapters for a specific book + + Args: + book_slug: Book slug identifier (e.g., 'sahih-bukhari') + + Returns: + List of chapter dictionaries + """ + logger.info( + "fetching_chapters", + book_slug=book_slug + ) + + params = self._add_api_key() + response = self.get(f"{book_slug}/chapters", params=params) + + if response.get('status') != 200: + logger.error( + "api_error", + status=response.get('status'), + message=response.get('message') + ) + raise Exception(f"API Error: {response.get('message')}") + + chapters = response.get('data', []) + + logger.info( + "chapters_fetched", + book_slug=book_slug, + count=len(chapters) + ) + + return chapters + + def get_hadiths_page( + self, + book_id: int, + chapter_id: Optional[int] = None, + page: int = 1, + limit: int = 100 + ) -> Dict[str, Any]: + """ + Get a page of hadiths + + Args: + book_id: Book ID + chapter_id: Optional chapter ID to filter by + page: Page number (1-indexed) + limit: Results per page (max 100) + + Returns: + Response dictionary with hadiths and pagination info + """ + params = self._add_api_key({ + 'book': book_id, + 'page': page, + 'limit': min(limit, 100) # Enforce max limit + }) + + if chapter_id: + params['chapter'] = chapter_id + + logger.debug( + "fetching_hadiths_page", + book_id=book_id, + chapter_id=chapter_id, + page=page, + limit=limit + ) + + response = self.get("hadiths", params=params) + + if response.get('status') != 200: + logger.error( + "api_error", + status=response.get('status'), + message=response.get('message') + ) + raise Exception(f"API Error: {response.get('message')}") + + return response.get('data', {}) + + def iter_all_hadiths_in_book( + self, + book_id: int, + book_slug: str, + chapter_id: Optional[int] = None, + batch_size: int = 100 + ) -> Generator[Dict[str, Any], None, None]: + """ + Iterator that yields all hadiths in a book, handling pagination automatically + + Args: + book_id: Book ID + book_slug: Book slug for logging + chapter_id: Optional chapter ID to filter by + batch_size: Number of hadiths to fetch per request (max 100) + + Yields: + Individual hadith dictionaries + """ + page = 1 + total_fetched = 0 + + while True: + response_data = self.get_hadiths_page( + book_id=book_id, + chapter_id=chapter_id, + page=page, + limit=batch_size + ) + + hadiths = response_data.get('hadiths', []) + pagination = response_data.get('pagination', {}) + + if not hadiths: + logger.info( + "book_complete", + book_slug=book_slug, + chapter_id=chapter_id, + total_hadiths=total_fetched + ) + break + + for hadith in hadiths: + yield hadith + total_fetched += 1 + + # Log progress + if total_fetched % 500 == 0: + logger.info( + "progress", + book_slug=book_slug, + fetched=total_fetched, + total=pagination.get('total', '?') + ) + + # Check if there are more pages + current_page = pagination.get('current_page', page) + last_page = pagination.get('last_page', 1) + + if current_page >= last_page: + logger.info( + "book_complete", + book_slug=book_slug, + total_hadiths=total_fetched, + total_pages=last_page + ) + break + + page += 1 + + def iter_all_hadiths_in_book_with_chapters( + self, + book_id: int, + book_slug: str, + batch_size: int = 100 + ) -> Generator[tuple[Dict[str, Any], Optional[Dict[str, Any]]], None, None]: + """ + Iterator that yields all hadiths in a book, organized by chapter + + Args: + book_id: Book ID + book_slug: Book slug + batch_size: Number of hadiths to fetch per request + + Yields: + Tuple of (hadith_dict, chapter_dict or None) + """ + # First, get 
all chapters + try: + chapters = self.get_chapters(book_slug) + except Exception as e: + logger.warning( + "chapters_fetch_failed", + book_slug=book_slug, + error=str(e), + fallback="fetching_all_hadiths_without_chapter_filter" + ) + # Fallback: fetch all hadiths without chapter filter + for hadith in self.iter_all_hadiths_in_book( + book_id=book_id, + book_slug=book_slug, + batch_size=batch_size + ): + chapter_info = hadith.get('chapter') + yield hadith, chapter_info + return + + logger.info( + "starting_chapter_by_chapter_fetch", + book_slug=book_slug, + total_chapters=len(chapters) + ) + + # Process each chapter + for chapter in chapters: + chapter_id = chapter.get('id') + chapter_number = chapter.get('chapterNumber') + + logger.info( + "fetching_chapter", + book_slug=book_slug, + chapter_id=chapter_id, + chapter_number=chapter_number + ) + + try: + for hadith in self.iter_all_hadiths_in_book( + book_id=book_id, + book_slug=book_slug, + chapter_id=chapter_id, + batch_size=batch_size + ): + yield hadith, chapter + except Exception as e: + logger.error( + "chapter_fetch_failed", + book_slug=book_slug, + chapter_id=chapter_id, + error=str(e) + ) + continue + + def get_book_by_slug(self, book_slug: str) -> Optional[Dict[str, Any]]: + """ + Get book details by slug + + Args: + book_slug: Book slug identifier + + Returns: + Book dictionary or None if not found + """ + books = self.get_books() + for book in books: + if book.get('bookSlug') == book_slug: + return book + return None +=== ./src/api_clients/sunnah_client.py === +""" +Client for Sunnah.com API +""" +from typing import List, Dict, Any, Optional, Generator +import structlog +from .base_client import BaseAPIClient +from config.settings import settings + +logger = structlog.get_logger() + + +class SunnahAPIClient(BaseAPIClient): + """Client for interacting with Sunnah.com API""" + + def __init__(self, api_key: Optional[str] = None): + super().__init__( + base_url=settings.SUNNAH_BASE_URL, + api_key=api_key or settings.SUNNAH_API_KEY, + rate_limit=settings.API_RATE_LIMIT + ) + + def get_collections(self) -> List[Dict[str, Any]]: + """ + Get list of all hadith collections + + Returns: + List of collection dictionaries + """ + logger.info("fetching_collections") + + response = self.get("collections") + collections = response.get("data", []) + + logger.info( + "collections_fetched", + count=len(collections) + ) + + return collections + + def get_collection_details(self, collection_name: str) -> Dict[str, Any]: + """ + Get details for a specific collection + + Args: + collection_name: Collection abbreviation (e.g., 'bukhari') + + Returns: + Collection details dictionary + """ + logger.info( + "fetching_collection_details", + collection=collection_name + ) + + response = self.get(f"collections/{collection_name}") + + return response + + def get_books(self, collection_name: str) -> List[Dict[str, Any]]: + """ + Get all books in a collection + + Args: + collection_name: Collection abbreviation + + Returns: + List of book dictionaries + """ + logger.info( + "fetching_books", + collection=collection_name + ) + + response = self.get(f"collections/{collection_name}/books") + books = response.get("data", []) + + logger.info( + "books_fetched", + collection=collection_name, + count=len(books) + ) + + return books + + def get_hadiths_in_book( + self, + collection_name: str, + book_number: int, + limit: int = 50, + page: int = 1 + ) -> Dict[str, Any]: + """ + Get hadiths in a specific book with pagination + + Args: + collection_name: Collection 
abbreviation + book_number: Book number + limit: Number of hadiths per page + page: Page number + + Returns: + Response with hadiths and pagination info + """ + logger.debug( + "fetching_hadiths", + collection=collection_name, + book=book_number, + page=page, + limit=limit + ) + + response = self.get( + f"collections/{collection_name}/books/{book_number}/hadiths", + params={"limit": limit, "page": page} + ) + + return response + + def iter_all_hadiths_in_book( + self, + collection_name: str, + book_number: int, + batch_size: int = 50 + ) -> Generator[Dict[str, Any], None, None]: + """ + Iterator that yields all hadiths in a book, handling pagination automatically + + Args: + collection_name: Collection abbreviation + book_number: Book number + batch_size: Number of hadiths to fetch per request + + Yields: + Individual hadith dictionaries + """ + page = 1 + total_fetched = 0 + + while True: + response = self.get_hadiths_in_book( + collection_name=collection_name, + book_number=book_number, + limit=batch_size, + page=page + ) + + hadiths = response.get("data", []) + + if not hadiths: + logger.info( + "book_complete", + collection=collection_name, + book=book_number, + total_hadiths=total_fetched + ) + break + + for hadith in hadiths: + yield hadith + total_fetched += 1 + + # Check if there are more pages + pagination = response.get("pagination", {}) + if page >= pagination.get("total_pages", 1): + break + + page += 1 + + def iter_all_hadiths_in_collection( + self, + collection_name: str, + batch_size: int = 50 + ) -> Generator[tuple[Dict[str, Any], int], None, None]: + """ + Iterator that yields all hadiths in a collection + + Args: + collection_name: Collection abbreviation + batch_size: Number of hadiths to fetch per request + + Yields: + Tuple of (hadith_dict, book_number) + """ + # First, get all books in the collection + books = self.get_books(collection_name) + + logger.info( + "starting_collection_fetch", + collection=collection_name, + total_books=len(books) + ) + + for book in books: + book_number = book.get("bookNumber") + + if not book_number: + logger.warning( + "book_missing_number", + book=book + ) + continue + + logger.info( + "fetching_book", + collection=collection_name, + book=book_number + ) + + try: + for hadith in self.iter_all_hadiths_in_book( + collection_name=collection_name, + book_number=int(book_number), + batch_size=batch_size + ): + yield hadith, int(book_number) + except Exception as e: + logger.error( + "book_fetch_failed", + collection=collection_name, + book=book_number, + error=str(e) + ) + continue + + def get_specific_hadith( + self, + collection_name: str, + book_number: int, + hadith_number: int + ) -> Dict[str, Any]: + """ + Get a specific hadith by its number + + Args: + collection_name: Collection abbreviation + book_number: Book number + hadith_number: Hadith number + + Returns: + Hadith dictionary + """ + response = self.get( + f"hadiths/collection/{collection_name}/{book_number}/{hadith_number}" + ) + + return response.get("data", {}) +=== ./src/main.py === +""" +Main ingestion script for fetching hadiths from Sunnah.com API +""" +import sys +import argparse +from typing import Optional +from uuid import UUID +import structlog +from config.settings import settings +from api_clients.sunnah_client import SunnahAPIClient +from database.repository import HadithRepository +from processors.text_cleaner import ArabicTextProcessor, TextCleaner + +# Configure structured logging +structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, + 
structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer() + ], + wrapper_class=structlog.stdlib.BoundLogger, + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, +) + +logger = structlog.get_logger() + + +class HadithIngestionService: + """Service for ingesting hadiths from Sunnah.com API""" + + def __init__(self): + self.api_client = SunnahAPIClient() + self.repo = HadithRepository() + self.text_processor = ArabicTextProcessor() + self.text_cleaner = TextCleaner() + + def ingest_collection( + self, + collection_abbr: str, + limit: Optional[int] = None + ) -> dict: + """ + Ingest entire collection from Sunnah.com API + + Args: + collection_abbr: Collection abbreviation (e.g., 'bukhari') + limit: Optional limit on number of hadiths to ingest + + Returns: + Statistics dictionary + """ + logger.info( + "ingestion_started", + collection=collection_abbr, + limit=limit + ) + + # Get collection from database + collection = self.repo.get_collection_by_abbreviation(collection_abbr) + if not collection: + logger.error( + "collection_not_found", + collection=collection_abbr + ) + raise ValueError(f"Collection '{collection_abbr}' not found in database") + + collection_id = UUID(collection['id']) + + # Create ingestion job + job_id = self.repo.create_ingestion_job( + job_name=f"ingest_{collection_abbr}", + job_type="api_fetch", + source_name="sunnah.com", + config={"collection": collection_abbr, "limit": limit} + ) + + logger.info( + "job_created", + job_id=str(job_id), + collection=collection_abbr + ) + + stats = { + "processed": 0, + "failed": 0, + "skipped": 0 + } + + try: + # Iterate through all hadiths in collection + for hadith_data, book_number in self.api_client.iter_all_hadiths_in_collection( + collection_name=collection_abbr, + batch_size=50 + ): + # Check limit + if limit and stats["processed"] >= limit: + logger.info("limit_reached", limit=limit) + break + + try: + # Process and store hadith + self._process_and_store_hadith( + collection_id=collection_id, + hadith_data=hadith_data, + book_number=book_number + ) + + stats["processed"] += 1 + + # Update job progress every 100 hadiths + if stats["processed"] % 100 == 0: + self.repo.update_job_progress( + job_id=job_id, + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + logger.info( + "progress_update", + processed=stats["processed"], + failed=stats["failed"] + ) + + except Exception as e: + stats["failed"] += 1 + logger.error( + "hadith_processing_failed", + error=str(e), + hadith_number=hadith_data.get("hadithNumber") + ) + + self.repo.add_processing_log( + job_id=job_id, + level="ERROR", + message=f"Failed to process hadith: {str(e)}", + details={"hadith_data": hadith_data} + ) + + # Update final job progress + self.repo.update_job_progress( + job_id=job_id, + total=stats["processed"] + stats["failed"] + stats["skipped"], + processed=stats["processed"], + failed=stats["failed"], + skipped=stats["skipped"] + ) + + # Mark job as complete + self.repo.complete_job(job_id=job_id, status="success") + + # Update collection count + self.repo.update_collection_count( + collection_id=collection_id, + count=stats["processed"] + ) + + logger.info( + "ingestion_completed", + collection=collection_abbr, + stats=stats + ) + + return stats + + except Exception as e: + logger.error( + 
"ingestion_failed", + collection=collection_abbr, + error=str(e) + ) + + self.repo.complete_job( + job_id=job_id, + status="failed", + error_message=str(e) + ) + + raise + + def _process_and_store_hadith( + self, + collection_id: UUID, + hadith_data: dict, + book_number: int + ): + """Process and store a single hadith""" + + # Extract hadith number + hadith_number = hadith_data.get("hadithNumber") + if not hadith_number: + raise ValueError("Missing hadith number") + + # Extract text in multiple languages + hadith_texts = hadith_data.get("hadith", []) + + arabic_text = None + english_text = None + urdu_text = None + grade = None + grade_source = None + chapter_name = None + + for text_entry in hadith_texts: + lang = text_entry.get("lang", "").lower() + body = text_entry.get("body") + + if not body: + continue + + # Clean text + body = self.text_cleaner.clean_text(body) + + if lang == "ar": + arabic_text = body + chapter_name = text_entry.get("chapterTitle") + + # Extract grade from Arabic entry + grades = text_entry.get("grades", []) + if grades: + grade = grades[0].get("grade") + grade_source = grades[0].get("name") + + elif lang == "en": + english_text = body + + # Extract grade from English entry if not found + if not grade: + grades = text_entry.get("grades", []) + if grades: + grade = grades[0].get("grade") + grade_source = grades[0].get("name") + + elif lang == "ur": + urdu_text = body + + if not arabic_text: + raise ValueError("Missing Arabic text") + + # Get or create book + book = self.repo.get_book(collection_id, book_number) + if not book: + # Extract book name from hadith data + book_name_en = None + book_name_ar = None + + for text_entry in hadith_texts: + lang = text_entry.get("lang", "").lower() + book_data = text_entry.get("book", [{}])[0] if text_entry.get("book") else {} + + if lang == "en" and book_data.get("name"): + book_name_en = book_data.get("name") + elif lang == "ar" and book_data.get("name"): + book_name_ar = book_data.get("name") + + book_id = self.repo.upsert_book( + collection_id=collection_id, + book_number=book_number, + name_english=book_name_en, + name_arabic=book_name_ar + ) + else: + book_id = UUID(book["id"]) + + # Store hadith + hadith_id = self.repo.upsert_hadith( + collection_id=collection_id, + book_id=book_id, + hadith_number=int(hadith_number), + arabic_text=arabic_text, + english_text=english_text, + urdu_text=urdu_text, + grade=grade, + grade_source=grade_source, + chapter_name=chapter_name, + source_id=str(hadith_data.get("id", "")), + source_url=hadith_data.get("reference", {}).get("link"), + source_metadata=hadith_data + ) + + logger.debug( + "hadith_stored", + hadith_id=str(hadith_id), + hadith_number=hadith_number, + book_number=book_number + ) + + def close(self): + """Close connections""" + self.api_client.close() + + +def main(): + """Main entry point""" + parser = argparse.ArgumentParser(description="Ingest hadiths from Sunnah.com API") + parser.add_argument( + "collection", + help="Collection abbreviation (e.g., bukhari, muslim)" + ) + parser.add_argument( + "--limit", + type=int, + help="Limit number of hadiths to ingest" + ) + + args = parser.parse_args() + + logger.info( + "script_started", + collection=args.collection, + limit=args.limit + ) + + try: + service = HadithIngestionService() + stats = service.ingest_collection( + collection_abbr=args.collection, + limit=args.limit + ) + + logger.info( + "script_completed", + stats=stats + ) + + print(f"\nIngestion completed successfully!") + print(f"Processed: {stats['processed']}") + 
print(f"Failed: {stats['failed']}") + print(f"Skipped: {stats['skipped']}") + + service.close() + + return 0 + + except Exception as e: + logger.error( + "script_failed", + error=str(e), + exc_info=True + ) + + print(f"\nIngestion failed: {str(e)}", file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/hadith-ingestion/config/__pycache__/__init__.cpython-312.pyc b/hadith-ingestion/config/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..1a351fb Binary files /dev/null and b/hadith-ingestion/config/__pycache__/__init__.cpython-312.pyc differ diff --git a/hadith-ingestion/config/__pycache__/__init__.cpython-38.pyc b/hadith-ingestion/config/__pycache__/__init__.cpython-38.pyc new file mode 100644 index 0000000..ab9e668 Binary files /dev/null and b/hadith-ingestion/config/__pycache__/__init__.cpython-38.pyc differ diff --git a/hadith-ingestion/config/__pycache__/settings.cpython-312.pyc b/hadith-ingestion/config/__pycache__/settings.cpython-312.pyc new file mode 100644 index 0000000..97a1e31 Binary files /dev/null and b/hadith-ingestion/config/__pycache__/settings.cpython-312.pyc differ diff --git a/hadith-ingestion/config/__pycache__/settings.cpython-38.pyc b/hadith-ingestion/config/__pycache__/settings.cpython-38.pyc new file mode 100644 index 0000000..3ed3541 Binary files /dev/null and b/hadith-ingestion/config/__pycache__/settings.cpython-38.pyc differ diff --git a/hadith-ingestion/simple-pod.yaml b/hadith-ingestion/simple-pod.yaml new file mode 100644 index 0000000..8e5c67c --- /dev/null +++ b/hadith-ingestion/simple-pod.yaml @@ -0,0 +1,46 @@ +apiVersion: v1 +kind: Pod +metadata: + name: hadith-ingestion-list-books + namespace: ml +spec: + restartPolicy: Never + containers: + - name: hadith-ingestion + image: axxs/hadith-ingestion:latest + # command: ["python"] + # args: ["/app/src/main_hadithapi.py", "--list-books"] + command: ["sh","-c","sleep infinity"] + env: + - name: DATABASE_HOST + value: "postgres.db.svc.cluster.local" + - name: DATABASE_PORT + value: "5432" + - name: DATABASE_NAME + value: "hadith_db" + - name: DATABASE_USER + value: "hadith_ingest" + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: hadith-db-secret + key: password + - name: HADITHAPI_KEY + valueFrom: + secretKeyRef: + name: hadithapi-secret + key: api-key + - name: MINIO_ENDPOINT + value: "minio.storage.svc.cluster.local:9000" + - name: MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: access-key + - name: MINIO_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-secret + key: secret-key + - name: LOG_LEVEL + value: "INFO" \ No newline at end of file diff --git a/hadith-ingestion/src/.DS_Store b/hadith-ingestion/src/.DS_Store new file mode 100644 index 0000000..dd91efd Binary files /dev/null and b/hadith-ingestion/src/.DS_Store differ diff --git a/hadith-ingestion/src/__pycache__/__init__.cpython-312.pyc b/hadith-ingestion/src/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..77710c3 Binary files /dev/null and b/hadith-ingestion/src/__pycache__/__init__.cpython-312.pyc differ diff --git a/hadith-ingestion/src/__pycache__/__init__.cpython-38.pyc b/hadith-ingestion/src/__pycache__/__init__.cpython-38.pyc new file mode 100644 index 0000000..c0bdaed Binary files /dev/null and b/hadith-ingestion/src/__pycache__/__init__.cpython-38.pyc differ diff --git a/hadith-ingestion/src/api_clients/__pycache__/__init__.cpython-312.pyc b/hadith-ingestion/src/api_clients/__pycache__/__init__.cpython-312.pyc 
new file mode 100644 index 0000000..e256b74 Binary files /dev/null and b/hadith-ingestion/src/api_clients/__pycache__/__init__.cpython-312.pyc differ diff --git a/hadith-ingestion/src/api_clients/__pycache__/__init__.cpython-38.pyc b/hadith-ingestion/src/api_clients/__pycache__/__init__.cpython-38.pyc new file mode 100644 index 0000000..4b41161 Binary files /dev/null and b/hadith-ingestion/src/api_clients/__pycache__/__init__.cpython-38.pyc differ diff --git a/hadith-ingestion/src/api_clients/__pycache__/base_client.cpython-312.pyc b/hadith-ingestion/src/api_clients/__pycache__/base_client.cpython-312.pyc new file mode 100644 index 0000000..f900392 Binary files /dev/null and b/hadith-ingestion/src/api_clients/__pycache__/base_client.cpython-312.pyc differ diff --git a/hadith-ingestion/src/api_clients/__pycache__/base_client.cpython-38.pyc b/hadith-ingestion/src/api_clients/__pycache__/base_client.cpython-38.pyc new file mode 100644 index 0000000..1af3fc1 Binary files /dev/null and b/hadith-ingestion/src/api_clients/__pycache__/base_client.cpython-38.pyc differ diff --git a/hadith-ingestion/src/api_clients/__pycache__/hadithapi_client.cpython-312.pyc b/hadith-ingestion/src/api_clients/__pycache__/hadithapi_client.cpython-312.pyc new file mode 100644 index 0000000..8c6dce5 Binary files /dev/null and b/hadith-ingestion/src/api_clients/__pycache__/hadithapi_client.cpython-312.pyc differ diff --git a/hadith-ingestion/src/api_clients/__pycache__/hadithapi_client.cpython-38.pyc b/hadith-ingestion/src/api_clients/__pycache__/hadithapi_client.cpython-38.pyc new file mode 100644 index 0000000..9f740ed Binary files /dev/null and b/hadith-ingestion/src/api_clients/__pycache__/hadithapi_client.cpython-38.pyc differ diff --git a/hadith-ingestion/src/api_clients/hadithapi_client.py b/hadith-ingestion/src/api_clients/hadithapi_client.py index 02dee1f..d9e2e5a 100644 --- a/hadith-ingestion/src/api_clients/hadithapi_client.py +++ b/hadith-ingestion/src/api_clients/hadithapi_client.py @@ -1,7 +1,7 @@ """ Client for HadithAPI.com API """ -from typing import List, Dict, Any, Optional, Generator +from typing import List, Dict, Any, Optional, Generator, Tuple import structlog from .base_client import BaseAPIClient from config.settings import settings @@ -45,7 +45,8 @@ class HadithAPIClient(BaseAPIClient): ) raise Exception(f"API Error: {response.get('message')}") - books = response.get('data', []) + books = response.get('books', []) + logger.info( "books_fetched", @@ -80,7 +81,8 @@ class HadithAPIClient(BaseAPIClient): ) raise Exception(f"API Error: {response.get('message')}") - chapters = response.get('data', []) + chapters = response.get('chapters', []) + logger.info( "chapters_fetched", @@ -127,7 +129,10 @@ class HadithAPIClient(BaseAPIClient): ) response = self.get("hadiths", params=params) - + # logger.debug( + # "fetching_hadiths_page####", + # response=response + # ) if response.get('status') != 200: logger.error( "api_error", @@ -136,7 +141,7 @@ class HadithAPIClient(BaseAPIClient): ) raise Exception(f"API Error: {response.get('message')}") - return response.get('data', {}) + return response.get('hadiths', {}) def iter_all_hadiths_in_book( self, @@ -162,15 +167,21 @@ class HadithAPIClient(BaseAPIClient): while True: response_data = self.get_hadiths_page( - book_id=book_id, + book_id=book_slug, chapter_id=chapter_id, page=page, limit=batch_size ) - hadiths = response_data.get('hadiths', []) + hadiths = response_data.get('data', []) pagination = response_data.get('pagination', {}) - + # logger.info( + # 
"book_complete", + # book_slug=book_slug, + # hadiths=hadiths, + # pagination=pagination, + # response = response_data + # ) if not hadiths: logger.info( "book_complete", @@ -190,12 +201,12 @@ class HadithAPIClient(BaseAPIClient): "progress", book_slug=book_slug, fetched=total_fetched, - total=pagination.get('total', '?') + total=response_data.get('total', '?') ) # Check if there are more pages - current_page = pagination.get('current_page', page) - last_page = pagination.get('last_page', 1) + current_page = response_data.get('current_page', page) + last_page = response_data.get('last_page', 1) if current_page >= last_page: logger.info( @@ -213,15 +224,15 @@ class HadithAPIClient(BaseAPIClient): book_id: int, book_slug: str, batch_size: int = 100 - ) -> Generator[tuple[Dict[str, Any], Optional[Dict[str, Any]]], None, None]: + ) -> Generator[Tuple[Dict[str, Any], Optional[Dict[str, Any]]], None, None]: """ Iterator that yields all hadiths in a book, organized by chapter - + Args: book_id: Book ID book_slug: Book slug batch_size: Number of hadiths to fetch per request - + Yields: Tuple of (hadith_dict, chapter_dict or None) """ diff --git a/hadith-ingestion/src/main_hadithapi.py b/hadith-ingestion/src/main_hadithapi.py index 6cd61e4..19dc566 100644 --- a/hadith-ingestion/src/main_hadithapi.py +++ b/hadith-ingestion/src/main_hadithapi.py @@ -151,7 +151,8 @@ class HadithAPIIngestionService: book_info = book_mapping[book_slug] collection_id = book_info['collection_id'] - book_id = book_info['book_id'] + # book_id = book_info['book_id'] + book_id = book_slug # Create ingestion job job_id = self.repo.create_ingestion_job( diff --git a/hadith-ingestion/test_hadithapi.py b/hadith-ingestion/test_hadithapi.py new file mode 100644 index 0000000..0cd20c2 --- /dev/null +++ b/hadith-ingestion/test_hadithapi.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +""" +Quick test script for hadithapi_client.py +""" +import sys +from venv import logger +sys.path.insert(0, '/app') + +from src.api_clients.hadithapi_client import HadithAPIClient +from config.settings import settings + +def test_api_connection(): + """Test basic API connectivity""" + print("=== Testing HadithAPI Client ===\n") + + client = HadithAPIClient() + + # Test 1: Get books + print("Test 1: Fetching available books...") + try: + books = client.get_books() + print(f"✓ Success! Found {len(books)} books") + for book in books[:3]: # Show first 3 + print(f" - {book.get('bookName')} ({book.get('bookSlug')})") + print(f" Hadiths: {book.get('hadiths_count')}, Chapters: {book.get('chapters_count')}") + logger.info(f"Fetched {len(books)} books successfully") + + except Exception as e: + print(f"✗ Failed: {e}") + return False + + # Test 2: Get chapters for Sahih Bukhari + print("\nTest 2: Fetching chapters for Sahih Bukhari...") + try: + chapters = client.get_chapters('sahih-bukhari') + print(f"✓ Success! Found {len(chapters)} chapters") + if chapters: + print(f" First chapter: {chapters[0].get('chapterEnglish')}") + except Exception as e: + print(f"✗ Failed: {e}") + return False + + # Test 3: Fetch first page of hadiths + print("\nTest 3: Fetching first page of hadiths...") + book_id = None + try: + book = client.get_book_by_slug('sahih-bukhari') + if not book: + print("✗ Failed: Book 'sahih-bukhari' not found") + return False + book_id = book.get('id') + page_data = client.get_hadiths_page('sahih-bukhari', page=1, limit=5) + hadiths = page_data.get('hadiths', []) + print(f"✓ Success! 
Fetched {len(hadiths)} hadiths") + if hadiths: + first = hadiths[0] + print(f" First hadith number: {first.get('hadithNumber')}") + print(f" Arabic text (first 100 chars): {first.get('hadithArabic', '')[:100]}...") + except Exception as e: + print(f"✗ Failed: {e}") + return False + + if book_id is None: + print("✗ Failed: Book ID unavailable for iterator test") + return False + + # # Test 4: Test iterator (fetch 3 hadiths) + print("\nTest 4: Testing hadith iterator (3 hadiths)...") + try: + count = 0 + + for hadith in client.iter_all_hadiths_in_book(book_id='sahih-bukhari', book_slug='sahih-bukhari', batch_size=10): + count += 1 + print(f" Hadith #{hadith.get('hadithNumber')} is {hadith.get('englishNarrator')} and is {hadith.get('status')} ") + if count >= 3: + break + print(f"✓ Success! Iterator working correctly") + except Exception as e: + print(f"✗ Failed: {e}") + return False + + client.close() + print("\n=== All Tests Passed! ===") + return True + +if __name__ == "__main__": + success = test_api_connection() + sys.exit(0 if success else 1) \ No newline at end of file
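
The normalization chain in src/processors/text_cleaner.py removes diacritics and tatweel, unifies Alef, Yaa and Taa Marbuta variants, and collapses whitespace. A minimal standalone sketch of the same steps follows; the character constants are written as explicit Unicode escapes and are assumed to match the class constants defined earlier in that file, and the Yaa direction (Alef Maksura folded into Yaa) is an assumption.

import re

# Standard Unicode code points for the characters the processor normalizes.
DIACRITICS = re.compile(r'[\u064B-\u0652]')           # fathatan .. sukun
TATWEEL = '\u0640'                                    # kashida / elongation mark
ALEF_VARIANTS = {'\u0622': '\u0627',                  # alef madda        -> bare alef
                 '\u0623': '\u0627',                  # alef hamza above  -> bare alef
                 '\u0625': '\u0627'}                  # alef hamza below  -> bare alef
ALEF_MAKSURA, YAA = '\u0649', '\u064A'
TAA_MARBUTA, HAA = '\u0629', '\u0647'

def normalize_full(text: str) -> str:
    """Apply the same steps as ArabicTextProcessor.normalize_full."""
    if not text:
        return text
    text = DIACRITICS.sub('', text)                   # strip harakat
    text = text.replace(TATWEEL, '')                  # strip tatweel
    for variant, bare in ALEF_VARIANTS.items():
        text = text.replace(variant, bare)            # unify alef forms
    text = text.replace(ALEF_MAKSURA, YAA)            # alef maksura -> yaa
    text = text.replace(TAA_MARBUTA, HAA)             # taa marbuta  -> haa
    return re.sub(r'\s+', ' ', text).strip()          # collapse whitespace

if __name__ == "__main__":
    print(normalize_full("قَالَ رَسُولُ اللَّهِ"))    # -> قال رسول الله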
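
extract_sanad_matn is a heuristic split rather than a parser: it cuts at the first reported-speech marker and returns (None, None) when no marker is found. A short usage sketch, assuming the module is imported the same way main.py imports it (src/ on the path):

from processors.text_cleaner import ArabicTextProcessor

text = "حدثنا عبد الله قال رسول الله صلى الله عليه وسلم ..."
sanad, matn = ArabicTextProcessor.extract_sanad_matn(text)
# sanad -> "حدثنا عبد الله"
# matn  -> "قال رسول الله صلى الله عليه وسلم ..."

ArabicTextProcessor.extract_sanad_matn("نص بلا فاصل")
# -> (None, None): none of the separator patterns matched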
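
BaseAPIClient._wait_for_rate_limit keeps a sliding one-minute window of request timestamps and sleeps once the window is full. The same idea as a standalone helper (a sketch, not the production class):

import time

class SlidingWindowRateLimiter:
    """Allow at most `rate_limit` calls per rolling 60-second window."""

    def __init__(self, rate_limit: int = 30):
        self.rate_limit = rate_limit
        self.request_times: list[float] = []

    def wait(self) -> None:
        now = time.time()
        # Drop timestamps older than one minute.
        self.request_times = [t for t in self.request_times if now - t < 60]
        if len(self.request_times) >= self.rate_limit:
            # Sleep until the oldest request in the window expires.
            sleep_time = 60 - (now - self.request_times[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
            self.request_times = []
        self.request_times.append(time.time())

limiter = SlidingWindowRateLimiter(rate_limit=30)
for _ in range(3):
    limiter.wait()
    # ... make an HTTP request here ...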
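
_make_request wraps every call in tenacity's exponential backoff. A self-contained sketch with the same retry policy (3 attempts, 2-10 second exponential wait, retry only on httpx errors); the URL and the apiKey query parameter in the commented call are illustrative:

from typing import Any, Dict, Optional

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
    reraise=True,
)
def fetch_json(url: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """GET a JSON document, retrying transient HTTP failures."""
    with httpx.Client(timeout=30) as client:
        response = client.get(url, params=params)
        response.raise_for_status()   # 4xx/5xx raise httpx.HTTPStatusError
        return response.json()

# Illustrative call; hadithapi.com expects the key as an apiKey query parameter:
# books = fetch_json("https://hadithapi.com/api/books", params={"apiKey": "..."})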
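
The follow-up patch to hadithapi_client.py reads the book, chapter and hadith lists from the 'books', 'chapters' and 'hadiths' envelope keys and paginates off current_page/last_page. A minimal sketch of that pagination loop, assuming the response shape observed after the patch ({"hadiths": {"data": [...], "current_page": n, "last_page": m, "total": t}}); fetch_page stands in for HadithAPIClient.get_hadiths_page:

from typing import Any, Callable, Dict, Generator

def iter_hadiths(
    fetch_page: Callable[..., Dict[str, Any]],
    book_slug: str,
    batch_size: int = 100,
) -> Generator[Dict[str, Any], None, None]:
    """Yield every hadith in a book, one page at a time."""
    page = 1
    while True:
        envelope = fetch_page(book_slug, page=page, limit=batch_size)
        rows = envelope.get("data", [])
        if not rows:
            break                      # empty page: nothing left to fetch
        yield from rows
        current = envelope.get("current_page", page)
        last = envelope.get("last_page", 1)
        if current >= last:
            break                      # reached the final page
        page += 1

# Usage sketch:
# client = HadithAPIClient()
# for hadith in iter_hadiths(client.get_hadiths_page, "sahih-bukhari"):
#     process(hadith)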
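
Both entry points configure structlog with a JSON renderer, so each log line is a machine-parseable event with key-value context. A short usage sketch of the pattern used throughout the ingestion code (snake_case event names plus bound context):

import structlog

logger = structlog.get_logger()

# Bind context once, then emit snake_case events with key-value fields.
# With the JSON renderer configured in main.py, each call renders as a single
# JSON object, e.g. {"event": "ingestion_started", "collection": "bukhari", "limit": 100, ...}
log = logger.bind(collection="bukhari", job_type="api_fetch")
log.info("ingestion_started", limit=100)
log.warning("book_missing_number", book={"bookNumber": None})
log.error("hadith_processing_failed", hadith_number=42, error="Missing Arabic text")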
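
repository.py itself is not part of this diff, so the SQL behind upsert_hadith is not visible here. Purely as a hypothetical sketch of the idempotent write the ingestion loop depends on (re-running a job must not duplicate rows), assuming a unique constraint on (collection_id, hadith_number), which may differ from the real schema:

import uuid
import psycopg2

# Hypothetical: the real schema and repository implementation are not shown in this diff.
UPSERT_SQL = """
INSERT INTO hadiths (id, collection_id, book_id, hadith_number, arabic_text, english_text)
VALUES (%(id)s, %(collection_id)s, %(book_id)s, %(hadith_number)s, %(arabic_text)s, %(english_text)s)
ON CONFLICT (collection_id, hadith_number)
DO UPDATE SET arabic_text  = EXCLUDED.arabic_text,
              english_text = EXCLUDED.english_text
RETURNING id;
"""

def upsert_hadith(conn, row: dict) -> uuid.UUID:
    """Insert-or-update one hadith; safe to call again with the same data."""
    with conn.cursor() as cur:
        cur.execute(UPSERT_SQL, {"id": str(uuid.uuid4()), **row})
        hadith_id = cur.fetchone()[0]
    conn.commit()
    return hadith_id

# conn = psycopg2.connect(settings.DATABASE_URL)
# upsert_hadith(conn, {"collection_id": ..., "book_id": ..., "hadith_number": 1,
#                      "arabic_text": "...", "english_text": None})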
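
tests/test_clients.py is still empty and test_hadithapi.py exercises the live API, which needs network access and a valid key. A hedged sketch of an offline unit test that stubs the HTTP layer instead; it assumes the same environment and sys.path layout as test_hadithapi.py (repo root on the path), and the fake envelope mirrors the 'books' key used after the patch:

from unittest.mock import patch

from src.api_clients.hadithapi_client import HadithAPIClient

FAKE_BOOKS = {
    "status": 200,
    "books": [{"bookName": "Sahih Bukhari", "bookSlug": "sahih-bukhari"}],
}

def test_get_books_parses_envelope():
    # Stub the inherited GET helper so no real HTTP request is made.
    with patch.object(HadithAPIClient, "get", return_value=FAKE_BOOKS):
        client = HadithAPIClient(api_key="test-key")
        books = client.get_books()
    assert books[0]["bookSlug"] == "sahih-bukhari"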