=== ./run-full-ingestion.sh ===
#!/bin/bash
# run-full-ingestion.sh
set -e
echo "=== Starting Full HadithAPI Ingestion ==="
# Books to ingest (in order)
BOOKS=(
# "sahih-bukhari"
# "sahih-muslim"
# "abu-dawood"
# "al-tirmidhi"
# "ibn-e-majah"
# "sunan-nasai"
# "musnad-ahmad"
# "al-silsila-sahiha"
"mishkat"
)
for BOOK in "${BOOKS[@]}"; do
echo -e "\n========================================="
echo "Ingesting: $BOOK"
echo "========================================="
argo submit -n ml argo/workflows/ingest-hadithapi.yaml \
--parameter book-slug=$BOOK \
--parameter limit=0 \
--wait \
--log
echo "$BOOK completed!"
# Optional: add delay between books
sleep 10
done
echo -e "\n=== All Books Ingestion Complete ==="
# Print summary
kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
SELECT
c.name_english,
c.abbreviation,
COUNT(h.id) as hadith_count,
COUNT(DISTINCT b.id) as chapter_count
FROM collections c
LEFT JOIN hadiths h ON c.id = h.collection_id
LEFT JOIN books b ON h.book_id = b.id
GROUP BY c.name_english, c.abbreviation
ORDER BY hadith_count DESC;
"
=== ./create-secrets.sh ===
#!/bin/bash
# create-secrets.sh
# Database secret
kubectl -n ml create secret generic hadith-db-secret \
--from-literal=password='hadith_ingest' \
--dry-run=client -o yaml | kubectl apply -f -
# HadithAPI secret (already public, but for consistency)
kubectl -n ml create secret generic hadithapi-secret \
--from-literal=api-key='$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK' \
--dry-run=client -o yaml | kubectl apply -f -
# MinIO secret
kubectl -n ml create secret generic minio-secret \
--from-literal=access-key='minioadmin' \
--from-literal=secret-key='minioadmin' \
--dry-run=client -o yaml | kubectl apply -f -
echo "Secrets created successfully"
=== ./full-ingestion.sh ===
#!/bin/bash
# full-ingestion.sh
set -e
echo "=== Starting Full HadithAPI Ingestion ==="
# Books to ingest (in order)
BOOKS=(
"sahih-bukhari"
"sahih-muslim"
"sunan-abu-dawood"
"jami-at-tirmidhi"
"sunan-an-nasai"
"sunan-ibn-e-majah"
)
for BOOK in "${BOOKS[@]}"; do
echo -e "\n========================================="
echo "Ingesting: $BOOK"
echo "========================================="
argo submit -n ml argo/workflows/ingest-hadithapi.yaml \
--parameter book-slug=$BOOK \
--parameter limit=0 \
--wait \
--log
echo "$BOOK completed!"
# Optional: add delay between books
sleep 10
done
echo -e "\n=== All Books Ingestion Complete ==="
# Print summary
kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
SELECT
c.name_english,
c.abbreviation,
COUNT(h.id) as hadith_count,
COUNT(DISTINCT b.id) as chapter_count
FROM collections c
LEFT JOIN hadiths h ON c.id = h.collection_id
LEFT JOIN books b ON h.book_id = b.id
GROUP BY c.name_english, c.abbreviation
ORDER BY hadith_count DESC;
"
=== ./requirements.txt ===
# Core dependencies
python-dotenv==1.0.0
pydantic==2.5.0
pydantic-settings==2.1.0
# HTTP clients
httpx==0.25.2
requests==2.31.0
tenacity==8.2.3
# Database
psycopg2-binary==2.9.9
sqlalchemy==2.0.23
asyncpg==0.29.0
# Data processing
pandas==2.1.4
numpy==1.26.2
pyarabic==0.6.15
arabic-reshaper==3.0.0
# Validation
jsonschema==4.20.0
validators==0.22.0
# Logging & Monitoring
structlog==23.2.0
prometheus-client==0.19.0
# Cloud storage
minio==7.2.0
boto3==1.34.0
# Task queue (optional)
celery==5.3.4
redis==5.0.1
# Testing
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0
faker==21.0.0
qdrant-client==1.7.0
tqdm==4.66.1
=== ./config/__init__.py ===
=== ./config/settings.py ===
"""
Configuration settings for hadith ingestion service
"""
from pydantic_settings import BaseSettings
from typing import Optional
import os
class Settings(BaseSettings):
"""Application settings loaded from environment variables"""
# Database
# DATABASE_HOST: str = "postgres.db.svc.cluster.local"
DATABASE_HOST: str = "pg.betelgeusebytes.io"
DATABASE_PORT: int = 5432
DATABASE_NAME: str = "hadith_db"
DATABASE_USER: str = "hadith_ingest"
DATABASE_PASSWORD: str = "hadith_ingest"
@property
def DATABASE_URL(self) -> str:
return (
f"postgresql://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}"
f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}"
)
@property
def ASYNC_DATABASE_URL(self) -> str:
return (
f"postgresql+asyncpg://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}"
f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}"
)
# MinIO / S3
MINIO_ENDPOINT: str = "minio.storage.svc.cluster.local:9000"
MINIO_ACCESS_KEY: str = "minioadmin"
MINIO_SECRET_KEY: str = "minioadmin"
MINIO_BUCKET_RAW: str = "hadith-raw-data"
MINIO_BUCKET_PROCESSED: str = "hadith-processed"
MINIO_SECURE: bool = False
SUNNAH_BASE_URL: str = "https://api.sunnah.com/v1"
HADITH_ONE_API_KEY: Optional[str] = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK"
# HadithAPI.com
HADITHAPI_KEY: str = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK"
HADITHAPI_BASE_URL: str = "https://hadithapi.com/api"
# Rate limiting
API_RATE_LIMIT: int = 30 # requests per minute
API_MAX_RETRIES: int = 3
API_RETRY_DELAY: int = 5 # seconds
# Processing
BATCH_SIZE: int = 100
MAX_WORKERS: int = 4
# TEI Service (for embeddings)
TEI_URL: str = "http://tei.ml.svc.cluster.local"
TEI_TIMEOUT: int = 30
# Qdrant
QDRANT_URL: str = "http://qdrant.db.svc.cluster.local:6333"
QDRANT_COLLECTION: str = "hadith_embeddings"
# Logging
LOG_LEVEL: str = "INFO"
LOG_FORMAT: str = "json"
# Job tracking
JOB_NAME: Optional[str] = None
JOB_TYPE: str = "api_fetch"
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = True
# Global settings instance
settings = Settings()
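
# Minimal usage sketch (illustrative, not used by the pipeline): running this
# module directly prints a few assembled settings so .env overrides can be
# sanity-checked without starting an ingestion run.
if __name__ == "__main__":
    print("DATABASE_HOST:", settings.DATABASE_HOST)
    print("DATABASE_URL:", settings.DATABASE_URL)
    print("QDRANT_URL:", settings.QDRANT_URL)
    print("TEI_URL:", settings.TEI_URL)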
=== ./Dockerfile ===
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY config/ /app/config/
COPY src/ /app/src/
# Create non-root user
RUN useradd -m -u 1000 hadith && chown -R hadith:hadith /app
USER hadith
# Set Python path
ENV PYTHONPATH=/app
# Default command
CMD ["python", "/app/src/main_hadithapi.py"]
=== ./tests/__init__.py ===
=== ./tests/test_clients.py ===
=== ./test-hadithapi-k8s.sh ===
#!/bin/bash
# test-hadithapi-k8s.sh
set -e
echo "=== Kubernetes HadithAPI Integration Test ==="
# 1. Create secrets
echo "Creating secrets..."
#./create-secrets.sh
# 2. Build and load image (if using local cluster)
echo "Building Docker image..."
#docker build -t hadith-ingestion:latest .
# If using kind/minikube, load image
# kind load docker-image hadith-ingestion:latest
# 3. Submit test workflow (10 hadiths)
echo "Submitting test workflow..."
argo submit -n ml argo/workflows/ingest-hadithapi.yaml \
--parameter book-slug=sahih-muslim \
--parameter limit=10 \
--wait \
--log
# 4. Check workflow status
echo -e "\nChecking workflow status..."
argo list -n ml
# 5. Verify data in database
echo -e "\nVerifying data..."
kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
SELECT
c.name_english,
COUNT(h.id) as hadith_count,
MAX(h.created_at) as last_ingestion
FROM collections c
LEFT JOIN hadiths h ON c.id = h.collection_id
WHERE c.abbreviation = 'muslim'
GROUP BY c.name_english;
"
echo -e "\n=== Test Complete ==="
=== ./test_mainhadithapi.py ===
#!/usr/bin/env python3
"""
Test script for main_hadithapi.py
"""
import sys
import os
sys.path.insert(0, '.')
from src.main_hadithapi import HadithAPIIngestionService
def test_main_hadithapi():
"""Test the main HadithAPI ingestion service"""
print("=== Testing HadithAPI Ingestion Service ===\n")
try:
# Initialize the service
print("1. Initializing HadithAPIIngestionService...")
service = HadithAPIIngestionService()
print("✓ Service initialized successfully\n")
# Test 1: List available books
print("2. Testing book synchronization...")
book_mapping = service.sync_books_from_api()
print(f"✓ Found {len(book_mapping)} mapped books")
for book_slug, info in list(book_mapping.items())[:3]: # Show first 3
print(f" - {book_slug}: {info['book_name']} ({info['hadiths_count']} hadiths)")
print()
# Test 2: Test ingestion with limit
print("3. Testing limited ingestion (10 hadiths from Sahih Bukhari)...")
stats = service.ingest_collection(
book_slug='sahih-bukhari',
limit=10
)
print(f"✓ Ingestion completed with stats:")
print(f" Processed: {stats['processed']}")
print(f" Failed: {stats['failed']}")
print(f" Skipped: {stats['skipped']}\n")
# Test 3: List books functionality
print("4. Testing book listing...")
print("\n=== Available Books ===\n")
for book_slug, info in book_mapping.items():
print(f"Book Slug: {book_slug}")
print(f" Name: {info['book_name']}")
print(f" Hadiths: {info['hadiths_count']}")
print(f" Chapters: {info['chapters_count']}")
print()
# Clean up
service.close()
print("=== All Tests Passed! ===")
return True
except Exception as e:
print(f"✗ Test failed with error: {e}")
import traceback
traceback.print_exc()
return False
def test_command_line_args():
"""Test command line argument parsing"""
print("=== Testing Command Line Arguments ===\n")
# We'll simulate command line arguments
import argparse
from src.main_hadithapi import main
# Test --list-books argument
print("1. Testing --list-books argument...")
original_argv = sys.argv.copy()
try:
sys.argv = ['main_hadithapi.py', '--list-books']
# We won't actually run main() as it would exit, but we can check the parsing
parser = argparse.ArgumentParser(description="Ingest hadiths from HadithAPI.com")
parser.add_argument("--book-slug", help="Book slug (e.g., sahih-bukhari)")
parser.add_argument("--limit", type=int, help="Limit number of hadiths to ingest")
parser.add_argument("--list-books", action="store_true", help="List available books and exit")
args = parser.parse_args(['--list-books'])
print(f"✓ Argument parsing successful: list_books={args.list_books}")
# Test book-slug argument
args = parser.parse_args(['--book-slug', 'sahih-bukhari', '--limit', '5'])
print(f"✓ Argument parsing successful: book_slug={args.book_slug}, limit={args.limit}")
print("✓ Command line argument parsing works correctly\n")
return True
except Exception as e:
print(f"✗ Argument parsing failed: {e}")
return False
finally:
sys.argv = original_argv
if __name__ == "__main__":
print("Starting tests for main_hadithapi.py...\n")
# Test command line arguments
if not test_command_line_args():
sys.exit(1)
# Test main functionality
if not test_main_hadithapi():
sys.exit(1)
print("\n🎉 All tests passed successfully!")
sys.exit(0)
=== ./setup.py ===
=== ./build-and-push.sh ===
#!/bin/bash
# build-and-push.sh
set -e
# Configuration
IMAGE_NAME="hadith-ingestion"
TAG="${1:-latest}"
DOCKER_REGISTRY="axxs"
REGISTRY="${DOCKER_REGISTRY:-}"
echo "Building Docker image: ${IMAGE_NAME}:${TAG}"
# Build image
docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile .
# Tag for registry
docker tag ${IMAGE_NAME}:${TAG} ${REGISTRY}/${IMAGE_NAME}:${TAG}
# Push to registry
echo "Pushing to registry: ${REGISTRY}"
docker push ${REGISTRY}/${IMAGE_NAME}:${TAG}
echo "Done!"
=== ./.env ===
# Database
# DATABASE_HOST=postgres.db.svc.cluster.local
DATABASE_HOST=pg.betelgeusebytes.io
DATABASE_PORT=5432
DATABASE_NAME=hadith_db
DATABASE_USER=hadith_ingest
DATABASE_PASSWORD=hadith_ingest
# HadithAPI.com
HADITHAPI_KEY=$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK
# MinIO
MINIO_ENDPOINT=minio.storage.svc.cluster.local:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
# Services
TEI_URL=http://tei.ml.svc.cluster.local
QDRANT_URL=http://qdrant.vector.svc.cluster.local:6333
# Settings
LOG_LEVEL=INFO
API_RATE_LIMIT=30
BATCH_SIZE=100
=== ./build-hadithapi-ingestion.sh ===
#!/bin/bash
# build-hadithapi-ingestion.sh
set -e
IMAGE_NAME="hadith-ingestion"
TAG="v1.0-hadithapi"
echo "Building Docker image for HadithAPI.com ingestion..."
# Build image
docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile .
# Tag as latest
docker tag ${IMAGE_NAME}:${TAG} ${IMAGE_NAME}:latest
# If you have a registry, push
# docker push your-registry/${IMAGE_NAME}:${TAG}
echo "Build complete: ${IMAGE_NAME}:${TAG}"
=== ./combine.sh ===
find . -type f \( -name "*.txt" -o -name "production" -o -name "*.py" -o -name "*.yaml" -o -name "Dockerfile" -o -name "*.sh" -o -name "*.env" \) ! -name "*.md" ! -name "*.xls" ! -name "*.xlsx" | while read -r file; do
echo "=== $file ===" >> combined.txt
cat "$file" >> combined.txt
echo "" >> combined.txt
done
=== ./test_hadithapi.py ===
#!/usr/bin/env python3
"""
Quick test script for hadithapi_client.py
"""
import sys
import structlog

logger = structlog.get_logger()
sys.path.insert(0, '/app')
from src.api_clients.hadithapi_client import HadithAPIClient
from config.settings import settings
def test_api_connection():
"""Test basic API connectivity"""
print("=== Testing HadithAPI Client ===\n")
client = HadithAPIClient()
# Test 1: Get books
print("Test 1: Fetching available books...")
try:
books = client.get_books()
print(f"✓ Success! Found {len(books)} books")
for book in books[:3]: # Show first 3
print(f" - {book.get('bookName')} ({book.get('bookSlug')})")
print(f" Hadiths: {book.get('hadiths_count')}, Chapters: {book.get('chapters_count')}")
logger.info(f"Fetched {len(books)} books successfully")
except Exception as e:
print(f"✗ Failed: {e}")
return False
# Test 2: Get chapters for Sahih Bukhari
print("\nTest 2: Fetching chapters for Sahih Bukhari...")
try:
chapters = client.get_chapters('sahih-bukhari')
print(f"✓ Success! Found {len(chapters)} chapters")
if chapters:
print(f" First chapter: {chapters[0].get('chapterEnglish')}")
except Exception as e:
print(f"✗ Failed: {e}")
return False
# Test 3: Fetch first page of hadiths
print("\nTest 3: Fetching first page of hadiths...")
book_id = None
try:
book = client.get_book_by_slug('sahih-bukhari')
if not book:
print("✗ Failed: Book 'sahih-bukhari' not found")
return False
book_id = book.get('id')
page_data = client.get_hadiths_page('sahih-bukhari', page=1, limit=5)
hadiths = page_data.get('hadiths', [])
print(f"✓ Success! Fetched {len(hadiths)} hadiths")
if hadiths:
first = hadiths[0]
print(f" First hadith number: {first.get('hadithNumber')}")
print(f" Arabic text (first 100 chars): {first.get('hadithArabic', '')[:100]}...")
except Exception as e:
print(f"✗ Failed: {e}")
return False
if book_id is None:
print("✗ Failed: Book ID unavailable for iterator test")
return False
    # Test 4: Test iterator (fetch 3 hadiths)
print("\nTest 4: Testing hadith iterator (3 hadiths)...")
try:
count = 0
for hadith in client.iter_all_hadiths_in_book(book_id='sahih-bukhari', book_slug='sahih-bukhari', batch_size=10):
count += 1
print(f" Hadith #{hadith.get('hadithNumber')} is {hadith.get('englishNarrator')} and is {hadith.get('status')} ")
if count >= 3:
break
print(f"✓ Success! Iterator working correctly")
except Exception as e:
print(f"✗ Failed: {e}")
return False
client.close()
print("\n=== All Tests Passed! ===")
return True
if __name__ == "__main__":
success = test_api_connection()
sys.exit(0 if success else 1)
=== ./test-hadithapi-local.sh ===
#!/bin/bash
# test-hadithapi-local.sh
set -e
echo "=== HadithAPI.com Integration Test ==="
# 1. Test API connection
echo "Testing API connection..."
curl -s "https://hadithapi.com/api/books?apiKey=\$2y\$10\$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" | jq .
# 2. Test database connection
echo -e "\nTesting database connection..."
kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "SELECT COUNT(*) FROM collections;"
# 3. List available books
echo -e "\nListing available books..."
python src/main_hadithapi.py --list-books
# 4. Test ingestion (limited to 10 hadiths)
echo -e "\nRunning test ingestion (10 hadiths from Sahih Bukhari)..."
python src/main_hadithapi.py --book-slug sahih-muslim --limit 10
# 5. Verify data
echo -e "\nVerifying ingested data..."
kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
SELECT
c.name_english,
c.abbreviation,
COUNT(h.id) as hadith_count,
COUNT(DISTINCT b.id) as book_count
FROM collections c
LEFT JOIN hadiths h ON c.id = h.collection_id
LEFT JOIN books b ON h.book_id = b.id
WHERE c.abbreviation = 'muslim'
GROUP BY c.name_english, c.abbreviation;
"
echo -e "\n=== Test Complete ==="
=== ./simple-pod.yaml ===
apiVersion: v1
kind: Pod
metadata:
name: hadith-ingestion-list-books
namespace: ml
spec:
restartPolicy: Never
containers:
- name: hadith-ingestion
image: axxs/hadith-ingestion:latest
# command: ["python"]
# args: ["/app/src/main_hadithapi.py", "--list-books"]
command: ["sh","-c","sleep infinity"]
env:
- name: DATABASE_HOST
value: "postgres.db.svc.cluster.local"
- name: DATABASE_PORT
value: "5432"
- name: DATABASE_NAME
value: "hadith_db"
- name: DATABASE_USER
value: "hadith_ingest"
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
- name: HADITHAPI_KEY
valueFrom:
secretKeyRef:
name: hadithapi-secret
key: api-key
- name: MINIO_ENDPOINT
value: "minio.storage.svc.cluster.local:9000"
- name: MINIO_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: access-key
- name: MINIO_SECRET_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: secret-key
- name: LOG_LEVEL
value: "INFO"
=== ./argo/workflows/ingest-collection.yaml ===
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: ingest-hadith-collection-
namespace: argo
spec:
entrypoint: ingest-pipeline
# Arguments
arguments:
parameters:
- name: collection
value: "bukhari"
- name: limit
value: "0" # 0 means no limit
# Service account with database access
serviceAccountName: argo-workflow
# Templates
templates:
# ========================================
# Main pipeline
# ========================================
- name: ingest-pipeline
steps:
- - name: ingest-hadiths
template: ingest
arguments:
parameters:
- name: collection
value: "{{workflow.parameters.collection}}"
- name: limit
value: "{{workflow.parameters.limit}}"
- - name: generate-embeddings
template: generate-embeddings
arguments:
parameters:
- name: collection
value: "{{workflow.parameters.collection}}"
- - name: index-qdrant
template: index-qdrant
arguments:
parameters:
- name: collection
value: "{{workflow.parameters.collection}}"
# ========================================
# Ingestion step
# ========================================
- name: ingest
inputs:
parameters:
- name: collection
- name: limit
container:
image: hadith-ingestion:latest
imagePullPolicy: Always
command: [python, /app/src/main_hadithapi.py]
args:
- "{{inputs.parameters.collection}}"
- "--limit={{inputs.parameters.limit}}"
env:
- name: DATABASE_HOST
value: "postgres.db.svc.cluster.local"
- name: DATABASE_PORT
value: "5432"
- name: DATABASE_NAME
value: "hadith_db"
- name: DATABASE_USER
value: "hadith_ingest"
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
- name: MINIO_ENDPOINT
value: "minio.storage.svc.cluster.local:9000"
- name: MINIO_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: access-key
- name: MINIO_SECRET_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: secret-key
- name: SUNNAH_API_KEY
valueFrom:
secretKeyRef:
name: sunnah-api-secret
key: api-key
- name: LOG_LEVEL
value: "INFO"
- name: JOB_NAME
value: "{{workflow.name}}"
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 4Gi
# ========================================
# Embedding generation step
# ========================================
- name: generate-embeddings
inputs:
parameters:
- name: collection
container:
image: hadith-embeddings:latest
imagePullPolicy: Always
command: [python, /app/src/embeddings/generator.py]
args:
- "--collection={{inputs.parameters.collection}}"
- "--batch-size=32"
env:
- name: DATABASE_URL
value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db"
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
- name: TEI_URL
value: "http://tei.ml.svc.cluster.local"
- name: LOG_LEVEL
value: "INFO"
resources:
requests:
cpu: 1
memory: 2Gi
limits:
cpu: 4
memory: 8Gi
# ========================================
# Qdrant indexing step
# ========================================
- name: index-qdrant
inputs:
parameters:
- name: collection
container:
image: hadith-qdrant-indexer:latest
imagePullPolicy: Always
command: [python, /app/index_qdrant.py]
args:
- "--collection={{inputs.parameters.collection}}"
- "--batch-size=100"
env:
- name: DATABASE_URL
value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db"
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
- name: QDRANT_URL
value: "http://qdrant.vector.svc.cluster.local:6333"
- name: QDRANT_COLLECTION
value: "hadith_embeddings"
- name: LOG_LEVEL
value: "INFO"
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 4Gi
=== ./argo/workflows/ingest-hadithapi.yaml ===
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: ingest-hadithapi-
namespace: ml
spec:
entrypoint: ingest-pipeline
arguments:
parameters:
- name: book-slug
value: "al-tirmidhi"
- name: limit
value: "0" # 0 means no limit
serviceAccountName: argo-workflow
templates:
# ========================================
# Main pipeline
# ========================================
- name: ingest-pipeline
steps:
- - name: ingest-hadiths
template: ingest
arguments:
parameters:
- name: book-slug
value: "{{workflow.parameters.book-slug}}"
- name: limit
value: "{{workflow.parameters.limit}}"
# ========================================
# Ingestion step
# ========================================
- name: ingest
inputs:
parameters:
- name: book-slug
- name: limit
container:
image: axxs/hadith-ingestion:latest
imagePullPolicy: Always
command: [python, /app/src/main_hadithapi.py]
args:
- "--book-slug={{inputs.parameters.book-slug}}"
- "--limit={{inputs.parameters.limit}}"
env:
- name: DATABASE_HOST
value: "postgres.db.svc.cluster.local"
- name: DATABASE_PORT
value: "5432"
- name: DATABASE_NAME
value: "hadith_db"
- name: DATABASE_USER
value: "hadith_ingest"
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
- name: HADITHAPI_KEY
valueFrom:
secretKeyRef:
name: hadithapi-secret
key: api-key
- name: MINIO_ENDPOINT
value: "minio.storage.svc.cluster.local:9000"
- name: MINIO_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: access-key
- name: MINIO_SECRET_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: secret-key
- name: LOG_LEVEL
value: "INFO"
- name: JOB_NAME
value: "{{workflow.name}}"
- name: API_RATE_LIMIT
value: "30"
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 4Gi
---
# Workflow to ingest ALL books
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: ingest-all-hadithapi-
namespace: ml
spec:
entrypoint: ingest-all-books
serviceAccountName: argo-workflow
arguments:
parameters:
- name: limit-per-book
value: "0" # 0 means no limit
templates:
# ========================================
# Main pipeline - sequential processing
# ========================================
- name: ingest-all-books
steps:
# Process each book sequentially to avoid rate limiting
- - name: sahih-bukhari
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "sahih-bukhari"
- - name: sahih-muslim
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "sahih-muslim"
- - name: sunan-abu-dawood
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "abu-dawood"
- - name: jami-at-tirmidhi
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "al-tirmidhi"
- - name: sunan-an-nasai
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "sunan-nasai"
- - name: sunan-ibn-e-majah
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "ibn-e-majah"
- - name: musnad-ahmad
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "musnad-ahmad"
- - name: al-silsila-sahiha
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "al-silsila-sahiha"
# ========================================
# Book ingestion template
# ========================================
- name: ingest-book
inputs:
parameters:
- name: book-slug
container:
image: axxs/hadith-ingestion:latest
imagePullPolicy: Always
command: [python, /app/src/main_hadithapi.py]
args:
- "--book-slug={{inputs.parameters.book-slug}}"
- "--limit={{workflow.parameters.limit-per-book}}"
env:
- name: DATABASE_HOST
value: "postgres.db.svc.cluster.local"
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
- name: HADITHAPI_KEY
valueFrom:
secretKeyRef:
name: hadithapi-secret
key: api-key
- name: LOG_LEVEL
value: "INFO"
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 4Gi
=== ./argo/workflows/generate-embeddings.yaml ===
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: generate-embeddings-
namespace: ml
spec:
entrypoint: generate
serviceAccountName: argo-workflow
arguments:
parameters:
- name: batch-size
value: "32"
templates:
- name: generate
container:
image: hadith-ingestion:latest
command: [python, /app/src/embeddings/generator.py]
args: ["--batch-size={{workflow.parameters.batch-size}}"]
env:
- name: DATABASE_HOST
value: "pg.betelgeusebytes.io"
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
resources:
requests: {cpu: 2, memory: 4Gi}
limits: {cpu: 4, memory: 8Gi}
=== ./src/database/__init__.py ===
=== ./src/database/connection.py ===
=== ./src/database/repository.py ===
"""
Database repository for hadith data operations
"""
from typing import List, Dict, Any, Optional
import json
from uuid import UUID
import structlog
from sqlalchemy import create_engine, text, select, insert, update
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import IntegrityError
from config.settings import settings
logger = structlog.get_logger()
class HadithRepository:
"""Repository for hadith database operations"""
def __init__(self, database_url: Optional[str] = None):
self.database_url = database_url or settings.DATABASE_URL
self.engine = create_engine(self.database_url, pool_pre_ping=True)
self.SessionLocal = sessionmaker(bind=self.engine)
@staticmethod
def _coerce_uuid(value: Any) -> UUID:
if isinstance(value, UUID):
return value
return UUID(str(value))
def get_session(self) -> Session:
"""Get database session"""
return self.SessionLocal()
# ===== Collections =====
def get_collection_by_abbreviation(self, abbr: str) -> Optional[Dict[str, Any]]:
"""Get collection by abbreviation"""
with self.get_session() as session:
query = text("""
SELECT * FROM collections
WHERE abbreviation = :abbr
""")
result = session.execute(query, {"abbr": abbr}).fetchone()
if result:
return dict(result._mapping)
return None
def get_all_collections(self) -> List[Dict[str, Any]]:
"""Get all collections"""
with self.get_session() as session:
query = text("SELECT * FROM collections ORDER BY name_english")
result = session.execute(query).fetchall()
return [dict(row._mapping) for row in result]
def update_collection_count(self, collection_id: UUID, count: int):
"""Update total hadith count for a collection"""
with self.get_session() as session:
query = text("""
UPDATE collections
SET total_hadiths = :count, updated_at = NOW()
WHERE id = :id
""")
session.execute(query, {"id": str(collection_id), "count": count})
session.commit()
# ===== Books =====
def upsert_book(
self,
collection_id: UUID,
book_number: int,
name_english: Optional[str] = None,
name_arabic: Optional[str] = None,
metadata: Optional[Dict] = None
) -> UUID:
"""Insert or update a book"""
metadata_json = json.dumps(metadata or {})
with self.get_session() as session:
query = text("""
INSERT INTO books (collection_id, book_number, name_english, name_arabic, metadata)
VALUES (:collection_id, :book_number, :name_english, :name_arabic, :metadata)
ON CONFLICT (collection_id, book_number)
DO UPDATE SET
name_english = EXCLUDED.name_english,
name_arabic = EXCLUDED.name_arabic,
metadata = EXCLUDED.metadata
RETURNING id
""")
result = session.execute(query, {
"collection_id": str(collection_id),
"book_number": book_number,
"name_english": name_english,
"name_arabic": name_arabic,
"metadata": metadata_json
})
session.commit()
return self._coerce_uuid(result.fetchone()[0])
def get_book(self, collection_id: UUID, book_number: int) -> Optional[Dict[str, Any]]:
"""Get book by collection and book number"""
with self.get_session() as session:
query = text("""
SELECT * FROM books
WHERE collection_id = :collection_id AND book_number = :book_number
""")
result = session.execute(query, {
"collection_id": str(collection_id),
"book_number": book_number
}).fetchone()
if result:
return dict(result._mapping)
return None
# ===== Hadiths =====
def upsert_hadith(
self,
collection_id: UUID,
hadith_number: int,
arabic_text: str,
book_id: Optional[UUID] = None,
english_text: Optional[str] = None,
urdu_text: Optional[str] = None,
grade: Optional[str] = None,
grade_source: Optional[str] = None,
chapter_name: Optional[str] = None,
source_id: Optional[str] = None,
source_url: Optional[str] = None,
source_metadata: Optional[Dict] = None
) -> UUID:
"""Insert or update a hadith"""
with self.get_session() as session:
query = text("""
INSERT INTO hadiths (
collection_id, book_id, hadith_number,
arabic_text, english_text, urdu_text,
grade, grade_source, chapter_name,
source_id, source_url, source_metadata
)
VALUES (
:collection_id, :book_id, :hadith_number,
:arabic_text, :english_text, :urdu_text,
:grade, :grade_source, :chapter_name,
:source_id, :source_url, :source_metadata
)
ON CONFLICT (collection_id, book_id, hadith_number)
DO UPDATE SET
arabic_text = EXCLUDED.arabic_text,
english_text = EXCLUDED.english_text,
urdu_text = EXCLUDED.urdu_text,
grade = EXCLUDED.grade,
grade_source = EXCLUDED.grade_source,
chapter_name = EXCLUDED.chapter_name,
source_url = EXCLUDED.source_url,
source_metadata = EXCLUDED.source_metadata,
updated_at = NOW()
RETURNING id
""")
metadata_json = json.dumps(source_metadata or {})
result = session.execute(query, {
"collection_id": str(collection_id),
"book_id": str(book_id) if book_id else None,
"hadith_number": hadith_number,
"arabic_text": arabic_text,
"english_text": english_text,
"urdu_text": urdu_text,
"grade": grade,
"grade_source": grade_source,
"chapter_name": chapter_name,
"source_id": source_id,
"source_url": source_url,
"source_metadata": metadata_json
})
session.commit()
return self._coerce_uuid(result.fetchone()[0])
def get_hadiths_without_embeddings(
self,
limit: int = 100,
collection_id: Optional[UUID] = None
) -> List[Dict[str, Any]]:
"""Get hadiths that need embedding generation"""
with self.get_session() as session:
if collection_id:
query = text("""
SELECT * FROM hadiths
WHERE embedding_generated = FALSE
AND collection_id = :collection_id
ORDER BY created_at ASC
LIMIT :limit
""")
result = session.execute(query, {
"collection_id": str(collection_id),
"limit": limit
}).fetchall()
else:
query = text("""
SELECT * FROM hadiths
WHERE embedding_generated = FALSE
ORDER BY created_at ASC
LIMIT :limit
""")
result = session.execute(query, {"limit": limit}).fetchall()
return [dict(row._mapping) for row in result]
def mark_embedding_generated(self, hadith_id: UUID, version: str = "v1"):
"""Mark hadith as having embedding generated"""
with self.get_session() as session:
# Prepare the update query
query = text("""
UPDATE hadiths
SET embedding_generated = TRUE,
embedding_version = :version,
updated_at = NOW()
WHERE id = :id
""")
            params = {"id": str(hadith_id), "version": version}
            session.execute(query, params)
session.commit()
# ===== Ingestion Jobs =====
def create_ingestion_job(
self,
job_name: str,
job_type: str,
source_name: str,
config: Optional[Dict] = None
) -> UUID:
"""Create a new ingestion job"""
with self.get_session() as session:
query = text("""
INSERT INTO ingestion_jobs (job_name, job_type, source_name, config, status, started_at)
VALUES (:job_name, :job_type, :source_name, :config, 'running', NOW())
RETURNING id
""")
# serialize config as JSON for storage
result = session.execute(query, {
"job_name": job_name,
"job_type": job_type,
"source_name": source_name,
"config": json.dumps(config or {})
})
session.commit()
job_id = result.fetchone()[0]
return job_id if isinstance(job_id, UUID) else UUID(str(job_id))
def update_job_progress(
self,
job_id: UUID,
total: Optional[int] = None,
processed: Optional[int] = None,
failed: Optional[int] = None,
skipped: Optional[int] = None
):
"""Update job progress counters"""
with self.get_session() as session:
updates = []
params = {"job_id": str(job_id)}
if total is not None:
updates.append("total_records = :total")
params["total"] = total
if processed is not None:
updates.append("processed_records = :processed")
params["processed"] = processed
if failed is not None:
updates.append("failed_records = :failed")
params["failed"] = failed
if skipped is not None:
updates.append("skipped_records = :skipped")
params["skipped"] = skipped
if updates:
query_str = f"""
UPDATE ingestion_jobs
SET {', '.join(updates)}
WHERE id = :job_id
"""
session.execute(text(query_str), params)
session.commit()
def complete_job(
self,
job_id: UUID,
status: str = "success",
error_message: Optional[str] = None
):
"""Mark job as completed"""
with self.get_session() as session:
query = text("""
UPDATE ingestion_jobs
SET status = :status,
completed_at = NOW(),
duration_seconds = EXTRACT(EPOCH FROM (NOW() - started_at)),
error_message = :error_message
WHERE id = :job_id
""")
session.execute(query, {
"job_id": str(job_id),
"status": status,
"error_message": error_message
})
session.commit()
def add_processing_log(
self,
job_id: UUID,
level: str,
message: str,
details: Optional[Dict] = None
):
"""Add a processing log entry"""
with self.get_session() as session:
query = text("""
INSERT INTO processing_logs (job_id, log_level, message, details)
VALUES (:job_id, :level, :message, :details)
""")
details_json = json.dumps(details or {})
session.execute(query, {
"job_id": str(job_id),
"level": level,
"message": message,
"details": details_json
})
session.commit()
# ===== Statistics =====
def get_collection_stats(self, collection_id: UUID) -> Dict[str, Any]:
"""Get statistics for a collection"""
with self.get_session() as session:
query = text("""
SELECT * FROM get_collection_statistics(:collection_id)
""")
result = session.execute(query, {"collection_id": str(collection_id)}).fetchone()
if result:
return dict(result._mapping)
return {}
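
# Usage sketch (assumes the hadith_db schema and seeded collections table
# already exist; the 'bukhari' abbreviation mirrors BOOK_SLUG_MAPPING elsewhere
# in this repo):
if __name__ == "__main__":
    repo = HadithRepository()
    collection = repo.get_collection_by_abbreviation("bukhari")
    if collection:
        print(repo.get_collection_stats(collection["id"]))
    else:
        print("Collection 'bukhari' not seeded yet")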
=== ./src/embeddings/__init__.py ===
=== ./src/embeddings/generator.py ===
"""
Embedding generation service for hadith texts
"""
import asyncio
import httpx
from typing import List, Tuple, Optional
import psycopg2
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import structlog
from tqdm import tqdm
import sys
import argparse
from config.settings import settings
logger = structlog.get_logger()
class EmbeddingGenerator:
def __init__(self, database_url: str, tei_url: str, qdrant_url: str, batch_size: int = 32):
self.database_url = database_url
self.tei_url = tei_url
self.qdrant_url = qdrant_url
self.batch_size = batch_size
self.http_client = httpx.AsyncClient(timeout=60.0)
self.qdrant = QdrantClient(url=qdrant_url)
async def generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
"""Generate embeddings using TEI"""
response = await self.http_client.post(
f"{self.tei_url}/embed",
json={"inputs": texts}
)
response.raise_for_status()
return response.json()
def create_collection(self, name: str = "hadith_embeddings"):
"""Create Qdrant collection"""
try:
self.qdrant.get_collection(name)
        except Exception:
self.qdrant.create_collection(
collection_name=name,
vectors_config=VectorParams(size=1024, distance=Distance.COSINE)
)
async def process_batch(self, conn, hadiths: List[Tuple], collection: str):
"""Process batch: generate embeddings & store"""
texts = [f"{h[1]} {h[2] or ''}" for h in hadiths] # arabic + english
embeddings = await self.generate_embeddings_batch(texts)
points = [
PointStruct(
id=str(h[0]),
vector=emb,
payload={"hadith_id": str(h[0]), "collection_id": str(h[4])}
)
for h, emb in zip(hadiths, embeddings)
]
self.qdrant.upsert(collection_name=collection, points=points)
# Mark completed
cursor = conn.cursor()
ids = [str(h[0]) for h in hadiths]
cursor.execute(
"UPDATE hadiths SET embedding_generated = TRUE, embedding_version = 'v1' WHERE id = ANY(%s)",
(ids,)
)
conn.commit()
cursor.close()
return len(points)
async def generate_all(self, collection: str = "hadith_embeddings"):
"""Generate embeddings for all hadiths"""
self.create_collection(collection)
conn = psycopg2.connect(self.database_url)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM hadiths WHERE embedding_generated = FALSE")
total = cursor.fetchone()[0]
cursor.close()
if total == 0:
print("All hadiths already have embeddings!")
return
print(f"Generating embeddings for {total} hadiths...")
processed = 0
with tqdm(total=total) as pbar:
while True:
cursor = conn.cursor()
cursor.execute("""
SELECT id, arabic_text, english_text, urdu_text, collection_id
FROM hadiths
WHERE embedding_generated = FALSE
LIMIT 1000
""")
hadiths = cursor.fetchall()
cursor.close()
if not hadiths:
break
for i in range(0, len(hadiths), self.batch_size):
batch = hadiths[i:i+self.batch_size]
try:
count = await self.process_batch(conn, batch, collection)
processed += count
pbar.update(count)
except Exception as e:
logger.error("batch_failed", error=str(e))
conn.close()
print(f"\nCompleted! Generated {processed} embeddings.")
async def close(self):
await self.http_client.aclose()
async def main():
parser = argparse.ArgumentParser()
parser.add_argument("--batch-size", type=int, default=32)
args = parser.parse_args()
gen = EmbeddingGenerator(
database_url=settings.DATABASE_URL,
tei_url="http://tei.ml.svc.cluster.local",
qdrant_url="http://qdrant.vector.svc.cluster.local:6333",
batch_size=args.batch_size
)
try:
await gen.generate_all()
return 0
except Exception as e:
logger.error("generation_failed", error=str(e))
return 1
finally:
await gen.close()
if __name__ == "__main__":
sys.exit(asyncio.run(main()))
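
# Invocation sketch (matches the args used in argo/workflows/generate-embeddings.yaml):
#   python /app/src/embeddings/generator.py --batch-size=32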
=== ./src/main_hadithapi.py ===
"""
Main ingestion script for fetching hadiths from HadithAPI.com
"""
import sys
from pathlib import Path
import argparse
from typing import Optional, Dict, Any
from uuid import UUID
import structlog
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from config.settings import settings
from src.api_clients.hadithapi_client import HadithAPIClient
from src.database.repository import HadithRepository
from src.processors.text_cleaner import ArabicTextProcessor, TextCleaner
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
# Book slug to collection abbreviation mapping
BOOK_SLUG_MAPPING = {
'sahih-bukhari': 'bukhari',
'sahih-muslim': 'muslim',
'abu-dawood': 'abudawud',
'al-tirmidhi': 'tirmidhi',
'sunan-nasai': 'nasai',
'ibn-e-majah': 'ibnmajah',
'muwatta-imam-malik': 'malik',
'musnad-ahmad': 'ahmad',
'sunan-ad-darimi': 'darimi',
'mishkat': 'mishkat',
'al-silsila-sahiha': 'al-silsila-sahiha'
}
class HadithAPIIngestionService:
"""Service for ingesting hadiths from HadithAPI.com"""
def __init__(self):
self.api_client = HadithAPIClient()
self.repo = HadithRepository()
self.text_processor = ArabicTextProcessor()
self.text_cleaner = TextCleaner()
def sync_books_from_api(self) -> Dict[str, Any]:
"""
Sync book metadata from API to database
Returns:
Dictionary mapping book_slug -> collection_id
"""
logger.info("syncing_books_from_api")
# Get books from API
api_books = self.api_client.get_books()
book_mapping = {}
# print(BOOK_SLUG_MAPPING)
for api_book in api_books:
book_slug = api_book.get('bookSlug')
# print(book_slug)
# Map to our collection abbreviation
collection_abbr = BOOK_SLUG_MAPPING.get(book_slug)
if not collection_abbr:
logger.warning(
"unmapped_book",
book_slug=book_slug,
book_name=api_book.get('bookName')
)
continue
# Get or verify collection exists in database
collection = self.repo.get_collection_by_abbreviation(collection_abbr)
if not collection:
logger.warning(
"collection_not_in_db",
abbreviation=collection_abbr,
book_slug=book_slug
)
continue
collection_id = collection['id']
if not isinstance(collection_id, UUID):
collection_id = UUID(str(collection_id))
book_mapping[book_slug] = {
'collection_id': collection_id,
'book_id': api_book.get('id'),
'book_name': api_book.get('bookName'),
'hadiths_count': api_book.get('hadiths_count'),
'chapters_count': api_book.get('chapters_count')
}
logger.info(
"book_mapped",
book_slug=book_slug,
collection_abbr=collection_abbr,
hadiths_count=api_book.get('hadiths_count')
)
logger.info(
"books_synced",
total_books=len(book_mapping)
)
return book_mapping
def ingest_collection(
self,
book_slug: str,
limit: Optional[int] = None
) -> dict:
"""
Ingest entire collection from HadithAPI.com
Args:
book_slug: Book slug identifier (e.g., 'sahih-bukhari')
limit: Optional limit on number of hadiths to ingest
Returns:
Statistics dictionary
"""
logger.info(
"ingestion_started",
book_slug=book_slug,
limit=limit
)
# Get book mapping
book_mapping = self.sync_books_from_api()
logger.info("containing books", book_mapping=book_mapping)
logger.info("Book slugs", book_slug=book_slug)
if book_slug not in book_mapping:
logger.error(
"book_not_mapped",
book_slug=book_slug,
available_books=list(book_mapping.keys())
)
raise ValueError(f"Book '{book_slug}' not found or not mapped")
book_info = book_mapping[book_slug]
collection_id = book_info['collection_id']
# book_id = book_info['book_id']
book_id = book_slug
# Create ingestion job
job_id = self.repo.create_ingestion_job(
job_name=f"ingest_{book_slug}",
job_type="api_fetch",
source_name="hadithapi.com",
config={
"book_slug": book_slug,
"book_id": book_id,
"limit": limit
}
)
logger.info(
"job_created",
job_id=str(job_id),
book_slug=book_slug,
expected_count=book_info.get('hadiths_count')
)
stats = {
"processed": 0,
"failed": 0,
"skipped": 0
}
try:
# Iterate through all hadiths in book
for hadith_data, chapter_data in self.api_client.iter_all_hadiths_in_book_with_chapters(
book_id=book_id,
book_slug=book_slug,
batch_size=100
):
# Check limit
if limit and stats["processed"] >= limit:
logger.info("limit_reached", limit=limit)
break
try:
# Process and store hadith
self._process_and_store_hadith(
collection_id=collection_id,
hadith_data=hadith_data,
chapter_data=chapter_data
)
stats["processed"] += 1
# Update job progress every 100 hadiths
if stats["processed"] % 100 == 0:
self.repo.update_job_progress(
job_id=job_id,
processed=stats["processed"],
failed=stats["failed"],
skipped=stats["skipped"]
)
logger.info(
"progress_update",
book_slug=book_slug,
processed=stats["processed"],
failed=stats["failed"],
percentage=round(
(stats["processed"] / int(book_info.get('hadiths_count', 1))) * 100,
2
) if book_info.get('hadiths_count') else 0
)
except Exception as e:
stats["failed"] += 1
logger.error(
"hadith_processing_failed",
error=str(e),
hadith_number=hadith_data.get("hadithNumber"),
hadith_id=hadith_data.get("id")
)
self.repo.add_processing_log(
job_id=job_id,
level="ERROR",
message=f"Failed to process hadith: {str(e)}",
details={"hadith_data": hadith_data}
)
# Update final job progress
self.repo.update_job_progress(
job_id=job_id,
total=stats["processed"] + stats["failed"] + stats["skipped"],
processed=stats["processed"],
failed=stats["failed"],
skipped=stats["skipped"]
)
# Mark job as complete
self.repo.complete_job(job_id=job_id, status="success")
# Update collection count
self.repo.update_collection_count(
collection_id=collection_id,
count=stats["processed"]
)
logger.info(
"ingestion_completed",
book_slug=book_slug,
stats=stats
)
return stats
except Exception as e:
logger.error(
"ingestion_failed",
book_slug=book_slug,
error=str(e),
exc_info=True
)
self.repo.complete_job(
job_id=job_id,
status="failed",
error_message=str(e)
)
raise
def _process_and_store_hadith(
self,
collection_id: UUID,
hadith_data: dict,
chapter_data: Optional[dict]
):
"""Process and store a single hadith"""
# Extract hadith number
hadith_number = hadith_data.get("hadithNumber")
if not hadith_number:
raise ValueError("Missing hadith number")
# Convert to integer
try:
hadith_number = int(hadith_number)
except (ValueError, TypeError):
raise ValueError(f"Invalid hadith number: {hadith_number}")
# Extract text in multiple languages
arabic_text = hadith_data.get("hadithArabic")
english_text = hadith_data.get("hadithEnglish")
urdu_text = hadith_data.get("hadithUrdu")
if not arabic_text:
raise ValueError("Missing Arabic text")
# passed logger.warning("Arabic text extracted and validated", hadith_number=hadith_number)
# Clean texts
arabic_text = self.text_cleaner.clean_text(arabic_text)
if english_text:
english_text = self.text_cleaner.clean_text(english_text)
if urdu_text:
urdu_text = self.text_cleaner.clean_text(urdu_text)
# Extract grade/status
grade = hadith_data.get("status")
# Get or create chapter (book in our schema)
book_id = None
chapter_name = None
# logger.warning("Processing chapter data####", chapter_data.get('positional_args', {}).get('id'))
# logger.info("Processing chapter data####2####", chapter_data.get('id'))
if chapter_data:
chapter_id = chapter_data.get('id')
chapter_number = chapter_data.get('chapterNumber')
chapter_name_en = chapter_data.get('chapterEnglish')
chapter_name_ar = chapter_data.get('chapterArabic')
chapter_name = chapter_name_en
# print(chapter_number, chapter_name)
if chapter_number:
try:
chapter_number = int(chapter_number)
except (ValueError, TypeError):
chapter_number = chapter_id # Fallback to ID
# Get or create book (chapter in HadithAPI = book in our schema)
existing_book = self.repo.get_book(collection_id, chapter_number)
# logger.warning("EXISTING BOOK : ", existing_book)
# logger.warning("Fetched or created book", collection_id=collection_id, chapter_number=chapter_number, chapter_name=chapter_name)
if not existing_book:
book_id = self.repo.upsert_book(
collection_id=collection_id,
book_number=chapter_number,
name_english=chapter_name_en,
name_arabic=chapter_name_ar,
metadata=chapter_data
)
else:
# print(existing_book['id'])
existing_id = existing_book['id']
if isinstance(existing_id, str):
existing_id = UUID(existing_id)
book_id = existing_id
# Build source metadata
source_metadata = {
'api_id': hadith_data.get('id'),
'englishNarrator': hadith_data.get('englishNarrator'),
'urduNarrator': hadith_data.get('urduNarrator'),
'bookSlug': hadith_data.get('bookSlug'),
'chapterId': hadith_data.get('chapterId'),
'chapter': chapter_data
}
# logger.warning("Hadith metadata built", source_metadata)
# Store hadith
hadith_id = self.repo.upsert_hadith(
collection_id=collection_id,
book_id=book_id,
hadith_number=hadith_number,
arabic_text=arabic_text,
english_text=english_text,
urdu_text=urdu_text,
grade=grade,
grade_source="hadithapi.com",
chapter_name=chapter_name,
source_id=str(hadith_data.get('id', '')),
source_url=f"https://hadithapi.com/hadith/{hadith_data.get('id')}",
source_metadata=source_metadata
)
logger.debug(
"hadith_stored",
hadith_id=str(hadith_id),
hadith_number=hadith_number,
chapter_id=chapter_data.get('id') if chapter_data else None
)
def ingest_all_books(self, limit_per_book: Optional[int] = None) -> Dict[str, dict]:
"""
Ingest all available books
Args:
limit_per_book: Optional limit per book
Returns:
Dictionary of book_slug -> stats
"""
logger.info("ingesting_all_books", limit_per_book=limit_per_book)
book_mapping = self.sync_books_from_api()
results = {}
for book_slug in book_mapping.keys():
logger.info("starting_book", book_slug=book_slug)
try:
stats = self.ingest_collection(
book_slug=book_slug,
limit=limit_per_book
)
results[book_slug] = {"status": "success", "stats": stats}
except Exception as e:
logger.error(
"book_ingestion_failed",
book_slug=book_slug,
error=str(e)
)
results[book_slug] = {"status": "failed", "error": str(e)}
logger.info("all_books_completed", results=results)
return results
def close(self):
"""Close connections"""
self.api_client.close()
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description="Ingest hadiths from HadithAPI.com"
)
parser.add_argument(
"--book-slug",
help="Book slug (e.g., sahih-bukhari). If not provided, ingests all books."
)
parser.add_argument(
"--limit",
type=int,
help="Limit number of hadiths to ingest per book"
)
parser.add_argument(
"--list-books",
action="store_true",
help="List available books and exit"
)
args = parser.parse_args()
try:
service = HadithAPIIngestionService()
# List books mode
if args.list_books:
logger.info("listing_available_books")
book_mapping = service.sync_books_from_api()
print("\n=== Available Books ===\n")
for book_slug, info in book_mapping.items():
print(f"Book Slug: {book_slug}")
print(f" Name: {info['book_name']}")
print(f" Hadiths: {info['hadiths_count']}")
print(f" Chapters: {info['chapters_count']}")
print()
service.close()
return 0
# Ingest mode
if args.book_slug:
logger.info(
"script_started",
book_slug=args.book_slug,
limit=args.limit
)
stats = service.ingest_collection(
book_slug=args.book_slug,
limit=args.limit
)
logger.info("script_completed", stats=stats)
print(f"\n=== Ingestion completed for {args.book_slug} ===")
print(f"Processed: {stats['processed']}")
print(f"Failed: {stats['failed']}")
print(f"Skipped: {stats['skipped']}")
else:
# Ingest all books
logger.info("script_started_all_books", limit_per_book=args.limit)
results = service.ingest_all_books(limit_per_book=args.limit)
print("\n=== All Books Ingestion Summary ===\n")
for book_slug, result in results.items():
print(f"{book_slug}: {result['status']}")
if result['status'] == 'success':
stats = result['stats']
print(f" Processed: {stats['processed']}")
print(f" Failed: {stats['failed']}")
else:
print(f" Error: {result['error']}")
print()
service.close()
return 0
except Exception as e:
logger.error(
"script_failed",
error=str(e),
exc_info=True
)
print(f"\nIngestion failed: {str(e)}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())
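
# Invocation sketches (same forms exercised by the test scripts and Argo workflows):
#   python src/main_hadithapi.py --list-books
#   python src/main_hadithapi.py --book-slug sahih-bukhari --limit 10
#   python src/main_hadithapi.py            # no --book-slug: ingest all mapped books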
=== ./src/__init__.py ===
=== ./src/utils/__init__.py ===
=== ./src/utils/logger.py ===
=== ./src/utils/retry.py ===
=== ./src/processors/validator.py ===
=== ./src/processors/arabic_normalizer.py ===
=== ./src/processors/__init__.py ===
=== ./src/processors/text_cleaner.py ===
"""
Text cleaning and normalization utilities
"""
import re
from typing import Optional
import unicodedata
import structlog
logger = structlog.get_logger()
class ArabicTextProcessor:
"""Process and normalize Arabic text"""
# Arabic diacritics to remove
DIACRITICS = re.compile(
r'[\u064B-\u065F\u0670\u06D6-\u06DC\u06DF-\u06E8\u06EA-\u06ED]'
)
# Tatweel (elongation character)
TATWEEL = '\u0640'
# Normalize Arabic letters
ALEF_VARIANTS = re.compile(r'[إأآا]')
ALEF_MAKSURA = 'ى'
YAA = 'ي'
TAA_MARBUTA = 'ة'
HAA = 'ه'
@classmethod
def remove_diacritics(cls, text: str) -> str:
"""Remove Arabic diacritics (tashkeel)"""
if not text:
return text
return cls.DIACRITICS.sub('', text)
@classmethod
def remove_tatweel(cls, text: str) -> str:
"""Remove tatweel (elongation) character"""
if not text:
return text
return text.replace(cls.TATWEEL, '')
@classmethod
def normalize_alef(cls, text: str) -> str:
"""Normalize all Alef variants to bare Alef"""
if not text:
return text
return cls.ALEF_VARIANTS.sub('ا', text)
@classmethod
def normalize_yaa(cls, text: str) -> str:
"""Normalize Alef Maksura to Yaa"""
if not text:
return text
return text.replace(cls.ALEF_MAKSURA, cls.YAA)
@classmethod
def normalize_taa_marbuta(cls, text: str) -> str:
"""Normalize Taa Marbuta to Haa"""
if not text:
return text
return text.replace(cls.TAA_MARBUTA, cls.HAA)
@classmethod
def normalize_whitespace(cls, text: str) -> str:
"""Normalize whitespace"""
if not text:
return text
# Replace multiple spaces with single space
text = re.sub(r'\s+', ' ', text)
# Trim
return text.strip()
@classmethod
def normalize_full(cls, text: str) -> str:
"""
Apply full normalization:
- Remove diacritics
- Remove tatweel
- Normalize Alef variants
- Normalize Yaa
- Normalize Taa Marbuta
- Normalize whitespace
"""
if not text:
return text
text = cls.remove_diacritics(text)
text = cls.remove_tatweel(text)
text = cls.normalize_alef(text)
text = cls.normalize_yaa(text)
text = cls.normalize_taa_marbuta(text)
text = cls.normalize_whitespace(text)
return text
@classmethod
def extract_sanad_matn(cls, text: str) -> tuple[Optional[str], Optional[str]]:
"""
Attempt to extract sanad (chain) and matn (text) from hadith
Common patterns:
- حدثنا ... قال ... (sanad ends before reported speech)
- Simple heuristic: Split on first occurrence of قال or أن
Returns:
Tuple of (sanad, matn) or (None, None) if cannot split
"""
if not text:
return None, None
# Look for common sanad-matn separators
separators = [
r'قال\s*رسول\s*الله', # "The Messenger of Allah said"
r'قال\s*النبي', # "The Prophet said"
r'عن\s*النبي', # "From the Prophet"
r'أن\s*رسول\s*الله', # "That the Messenger of Allah"
]
for pattern in separators:
match = re.search(pattern, text, re.IGNORECASE)
if match:
split_pos = match.start()
sanad = text[:split_pos].strip()
matn = text[split_pos:].strip()
logger.debug(
"sanad_matn_extracted",
sanad_length=len(sanad),
matn_length=len(matn)
)
return sanad, matn
# Could not split
logger.debug("sanad_matn_extraction_failed")
return None, None
class TextCleaner:
"""General text cleaning utilities"""
@staticmethod
def clean_html(text: str) -> str:
"""Remove HTML tags"""
if not text:
return text
return re.sub(r'<[^>]+>', '', text)
@staticmethod
def normalize_unicode(text: str) -> str:
"""Normalize Unicode (NFC normalization)"""
if not text:
return text
return unicodedata.normalize('NFC', text)
@staticmethod
def clean_text(text: str) -> str:
"""Apply general cleaning"""
if not text:
return text
# Remove HTML
text = TextCleaner.clean_html(text)
# Normalize Unicode
text = TextCleaner.normalize_unicode(text)
# Normalize whitespace
text = ArabicTextProcessor.normalize_whitespace(text)
return text
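
# Quick demonstration (runs only when this module is executed directly; the
# sample phrase is illustrative): normalize_full strips tashkeel and tatweel
# and unifies Alef/Yaa/Taa Marbuta variants, while clean_text removes HTML and
# applies NFC normalization.
if __name__ == "__main__":
    sample = "إِنَّمَا الأَعْمَالُ بِالنِّيَّاتِ"
    print(ArabicTextProcessor.normalize_full(sample))
    print(TextCleaner.clean_text("<p>Actions  are  but  by  intentions</p>"))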
=== ./src/api_clients/base_client.py ===
"""
Base API client with retry logic and rate limiting
"""
import httpx
import time
from typing import Optional, Dict, Any
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
import structlog
from config.settings import settings
logger = structlog.get_logger()
class BaseAPIClient:
"""Base class for API clients with built-in retry and rate limiting"""
def __init__(
self,
base_url: str,
api_key: Optional[str] = None,
rate_limit: int = 90,
timeout: int = 30
):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.rate_limit = rate_limit
self.timeout = timeout
# Rate limiting
self.request_times = []
self.min_interval = 60.0 / rate_limit # seconds between requests
# HTTP client
self.client = httpx.Client(timeout=timeout)
logger.info(
"api_client_initialized",
base_url=base_url,
rate_limit=rate_limit
)
def _wait_for_rate_limit(self):
"""Implement rate limiting"""
now = time.time()
# Remove old timestamps (older than 1 minute)
self.request_times = [t for t in self.request_times if now - t < 60]
# If we're at the limit, wait
if len(self.request_times) >= self.rate_limit:
sleep_time = 60 - (now - self.request_times[0])
if sleep_time > 0:
logger.info(
"rate_limit_wait",
sleep_seconds=sleep_time,
requests_in_window=len(self.request_times)
)
time.sleep(sleep_time)
self.request_times = []
# Add current timestamp
self.request_times.append(time.time())
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
reraise=True
)
def _make_request(
self,
method: str,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""Make HTTP request with retry logic"""
# Rate limiting
self._wait_for_rate_limit()
# Prepare headers
request_headers = headers or {}
if self.api_key:
request_headers['X-API-Key'] = self.api_key
# Make request
url = f"{self.base_url}/{endpoint.lstrip('/')}"
logger.debug(
"api_request",
method=method,
url=url,
params=params
)
response = self.client.request(
method=method,
url=url,
params=params,
headers=request_headers
)
response.raise_for_status()
logger.debug(
"api_response",
status_code=response.status_code,
response_size=len(response.content)
)
return response.json()
def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
"""Make GET request"""
return self._make_request("GET", endpoint, params=params)
def close(self):
"""Close the HTTP client"""
self.client.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
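# --- Illustrative usage sketch (not part of the ingestion pipeline) ---
# Minimal example of the context-manager interface; the base URL and endpoint
# below are placeholders, not a real service.
if __name__ == "__main__":
    with BaseAPIClient(base_url="https://api.example.com", rate_limit=30) as client:
        # Each .get() first waits on the sliding-window rate limiter, then
        # retries up to 3 times with exponential backoff on HTTP/timeout errors.
        payload = client.get("health", params={"verbose": "true"})
        print(payload)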
=== ./src/api_clients/hadith_one_client.py ===
=== ./src/api_clients/__init__.py ===
=== ./src/api_clients/hadithapi_client.py ===
"""
Client for HadithAPI.com API
"""
from typing import List, Dict, Any, Optional, Generator, Tuple
import structlog
from .base_client import BaseAPIClient
from config.settings import settings
logger = structlog.get_logger()
class HadithAPIClient(BaseAPIClient):
"""Client for interacting with hadithapi.com API"""
def __init__(self, api_key: Optional[str] = None):
super().__init__(
base_url="https://hadithapi.com/api",
api_key=api_key or settings.HADITHAPI_KEY,
rate_limit=30 # Conservative: 30 req/min
)
def _add_api_key(self, params: Optional[Dict] = None) -> Dict:
"""Add API key to request parameters"""
params = params or {}
params['apiKey'] = self.api_key
return params
def get_books(self) -> List[Dict[str, Any]]:
"""
Get list of all available books/collections
Returns:
List of book dictionaries
"""
logger.info("fetching_books")
params = self._add_api_key()
response = self.get("books", params=params)
if response.get('status') != 200:
logger.error(
"api_error",
status=response.get('status'),
message=response.get('message')
)
raise Exception(f"API Error: {response.get('message')}")
books = response.get('books', [])
logger.info(
"books_fetched",
count=len(books)
)
return books
def get_chapters(self, book_slug: str) -> List[Dict[str, Any]]:
"""
Get chapters for a specific book
Args:
book_slug: Book slug identifier (e.g., 'sahih-bukhari')
Returns:
List of chapter dictionaries
"""
logger.info(
"fetching_chapters",
book_slug=book_slug
)
params = self._add_api_key()
response = self.get(f"{book_slug}/chapters", params=params)
if response.get('status') != 200:
logger.error(
"api_error",
status=response.get('status'),
message=response.get('message')
)
raise Exception(f"API Error: {response.get('message')}")
chapters = response.get('chapters', [])
logger.info(
"chapters_fetched",
book_slug=book_slug,
count=len(chapters)
)
return chapters
def get_hadiths_page(
self,
book_id: int,
chapter_id: Optional[int] = None,
page: int = 1,
limit: int = 100
) -> Dict[str, Any]:
"""
Get a page of hadiths
Args:
book_id: Book ID
chapter_id: Optional chapter ID to filter by
page: Page number (1-indexed)
limit: Results per page (max 100)
Returns:
Response dictionary with hadiths and pagination info
"""
params = self._add_api_key({
'book': book_id,
'page': page,
'limit': min(limit, 100) # Enforce max limit
})
if chapter_id:
params['chapter'] = chapter_id
logger.debug(
"fetching_hadiths_page",
book_id=book_id,
chapter_id=chapter_id,
page=page,
limit=limit
)
response = self.get("hadiths", params=params)
if response.get('status') != 200:
logger.error(
"api_error",
status=response.get('status'),
message=response.get('message')
)
raise Exception(f"API Error: {response.get('message')}")
return response.get('hadiths', {})
def iter_all_hadiths_in_book(
self,
book_id: int,
book_slug: str,
chapter_id: Optional[int] = None,
batch_size: int = 100
) -> Generator[Dict[str, Any], None, None]:
"""
Iterator that yields all hadiths in a book, handling pagination automatically
Args:
book_id: Book ID
book_slug: Book slug for logging
chapter_id: Optional chapter ID to filter by
batch_size: Number of hadiths to fetch per request (max 100)
Yields:
Individual hadith dictionaries
"""
page = 1
total_fetched = 0
while True:
            # Note: the hadithapi.com hadiths endpoint filters by book slug, so the
            # slug (not the numeric book id) is passed as the 'book' parameter here.
            response_data = self.get_hadiths_page(
                book_id=book_slug,
                chapter_id=chapter_id,
                page=page,
                limit=batch_size
            )
hadiths = response_data.get('data', [])
pagination = response_data.get('pagination', {})
if not hadiths:
logger.info(
"book_complete",
book_slug=book_slug,
chapter_id=chapter_id,
total_hadiths=total_fetched
)
break
for hadith in hadiths:
yield hadith
total_fetched += 1
# Log progress
if total_fetched % 500 == 0:
logger.info(
"progress",
book_slug=book_slug,
fetched=total_fetched,
total=response_data.get('total', '?')
)
# Check if there are more pages
current_page = response_data.get('current_page', page)
last_page = response_data.get('last_page', 1)
if current_page >= last_page:
logger.info(
"book_complete",
book_slug=book_slug,
total_hadiths=total_fetched,
total_pages=last_page
)
break
page += 1
def iter_all_hadiths_in_book_with_chapters(
self,
book_id: int,
book_slug: str,
batch_size: int = 100
) -> Generator[Tuple[Dict[str, Any], Optional[Dict[str, Any]]], None, None]:
"""
Iterator that yields all hadiths in a book, organized by chapter
Args:
book_id: Book ID
book_slug: Book slug
batch_size: Number of hadiths to fetch per request
Yields:
Tuple of (hadith_dict, chapter_dict or None)
"""
# First, get all chapters
try:
chapters = self.get_chapters(book_slug)
except Exception as e:
logger.warning(
"chapters_fetch_failed",
book_slug=book_slug,
error=str(e),
fallback="fetching_all_hadiths_without_chapter_filter"
)
# Fallback: fetch all hadiths without chapter filter
for hadith in self.iter_all_hadiths_in_book(
book_id=book_id,
book_slug=book_slug,
batch_size=batch_size
):
chapter_info = hadith.get('chapter')
yield hadith, chapter_info
return
logger.info(
"starting_chapter_by_chapter_fetch",
book_slug=book_slug,
total_chapters=len(chapters)
)
# Process each chapter
for chapter in chapters:
            # These collections are filtered by chapterNumber on the hadiths
            # endpoint; the others use the chapter's internal id.
if book_slug in {'sahih-muslim','al-tirmidhi','al-silsila-sahiha','abu-dawood','sunan-nasai','ibn-e-majah','mishkat'}:
chapter_id = chapter.get('chapterNumber')
else:
chapter_id = chapter.get('id')
chapter_number = chapter.get('chapterNumber')
            logger.info(
                "fetching_chapter",
                book_slug=book_slug,
                chapter_id=chapter_id,
                chapter_number=chapter_number,
                chapter_name=chapter.get('name')
            )
try:
for hadith in self.iter_all_hadiths_in_book(
book_id=book_id,
book_slug=book_slug,
chapter_id=chapter_id,
batch_size=batch_size
):
yield hadith, chapter
except Exception as e:
logger.error(
"chapter_fetch_failed",
book_slug=book_slug,
chapter_id=chapter_id,
error=str(e)
)
continue
def get_book_by_slug(self, book_slug: str) -> Optional[Dict[str, Any]]:
"""
Get book details by slug
Args:
book_slug: Book slug identifier
Returns:
Book dictionary or None if not found
"""
books = self.get_books()
for book in books:
if book.get('bookSlug') == book_slug:
return book
return None
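# --- Illustrative usage sketch (not part of the ingestion pipeline) ---
# Minimal example of walking one book chapter by chapter. Assumes
# settings.HADITHAPI_KEY is configured; the 'id' and 'hadithNumber' keys are
# assumptions about the API response shape (only 'bookSlug' and
# 'chapterNumber' are read elsewhere in this client).
if __name__ == "__main__":
    with HadithAPIClient() as client:
        book = client.get_book_by_slug("sahih-bukhari")
        if book:
            for hadith, chapter in client.iter_all_hadiths_in_book_with_chapters(
                book_id=book.get("id"),
                book_slug=book.get("bookSlug"),
                batch_size=100
            ):
                print(hadith.get("hadithNumber"), (chapter or {}).get("chapterNumber"))
                break  # stop after the first hadith in this sketch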
=== ./src/api_clients/sunnah_client.py ===
"""
Client for Sunnah.com API
"""
from typing import List, Dict, Any, Optional, Generator
import structlog
from .base_client import BaseAPIClient
from config.settings import settings
logger = structlog.get_logger()
class SunnahAPIClient(BaseAPIClient):
"""Client for interacting with Sunnah.com API"""
def __init__(self, api_key: Optional[str] = None):
super().__init__(
base_url=settings.SUNNAH_BASE_URL,
api_key=api_key or settings.SUNNAH_API_KEY,
rate_limit=settings.API_RATE_LIMIT
)
def get_collections(self) -> List[Dict[str, Any]]:
"""
Get list of all hadith collections
Returns:
List of collection dictionaries
"""
logger.info("fetching_collections")
response = self.get("collections")
collections = response.get("data", [])
logger.info(
"collections_fetched",
count=len(collections)
)
return collections
def get_collection_details(self, collection_name: str) -> Dict[str, Any]:
"""
Get details for a specific collection
Args:
collection_name: Collection abbreviation (e.g., 'bukhari')
Returns:
Collection details dictionary
"""
logger.info(
"fetching_collection_details",
collection=collection_name
)
response = self.get(f"collections/{collection_name}")
return response
def get_books(self, collection_name: str) -> List[Dict[str, Any]]:
"""
Get all books in a collection
Args:
collection_name: Collection abbreviation
Returns:
List of book dictionaries
"""
logger.info(
"fetching_books",
collection=collection_name
)
response = self.get(f"collections/{collection_name}/books")
books = response.get("data", [])
logger.info(
"books_fetched",
collection=collection_name,
count=len(books)
)
return books
def get_hadiths_in_book(
self,
collection_name: str,
book_number: int,
limit: int = 50,
page: int = 1
) -> Dict[str, Any]:
"""
Get hadiths in a specific book with pagination
Args:
collection_name: Collection abbreviation
book_number: Book number
limit: Number of hadiths per page
page: Page number
Returns:
Response with hadiths and pagination info
"""
logger.debug(
"fetching_hadiths",
collection=collection_name,
book=book_number,
page=page,
limit=limit
)
response = self.get(
f"collections/{collection_name}/books/{book_number}/hadiths",
params={"limit": limit, "page": page}
)
return response
def iter_all_hadiths_in_book(
self,
collection_name: str,
book_number: int,
batch_size: int = 50
) -> Generator[Dict[str, Any], None, None]:
"""
Iterator that yields all hadiths in a book, handling pagination automatically
Args:
collection_name: Collection abbreviation
book_number: Book number
batch_size: Number of hadiths to fetch per request
Yields:
Individual hadith dictionaries
"""
page = 1
total_fetched = 0
while True:
response = self.get_hadiths_in_book(
collection_name=collection_name,
book_number=book_number,
limit=batch_size,
page=page
)
hadiths = response.get("data", [])
if not hadiths:
logger.info(
"book_complete",
collection=collection_name,
book=book_number,
total_hadiths=total_fetched
)
break
for hadith in hadiths:
yield hadith
total_fetched += 1
# Check if there are more pages
pagination = response.get("pagination", {})
if page >= pagination.get("total_pages", 1):
break
page += 1
def iter_all_hadiths_in_collection(
self,
collection_name: str,
batch_size: int = 50
) -> Generator[tuple[Dict[str, Any], int], None, None]:
"""
Iterator that yields all hadiths in a collection
Args:
collection_name: Collection abbreviation
batch_size: Number of hadiths to fetch per request
Yields:
Tuple of (hadith_dict, book_number)
"""
# First, get all books in the collection
books = self.get_books(collection_name)
logger.info(
"starting_collection_fetch",
collection=collection_name,
total_books=len(books)
)
for book in books:
book_number = book.get("bookNumber")
if not book_number:
logger.warning(
"book_missing_number",
book=book
)
continue
logger.info(
"fetching_book",
collection=collection_name,
book=book_number
)
try:
for hadith in self.iter_all_hadiths_in_book(
collection_name=collection_name,
book_number=int(book_number),
batch_size=batch_size
):
yield hadith, int(book_number)
except Exception as e:
logger.error(
"book_fetch_failed",
collection=collection_name,
book=book_number,
error=str(e)
)
continue
def get_specific_hadith(
self,
collection_name: str,
book_number: int,
hadith_number: int
) -> Dict[str, Any]:
"""
Get a specific hadith by its number
Args:
collection_name: Collection abbreviation
book_number: Book number
hadith_number: Hadith number
Returns:
Hadith dictionary
"""
response = self.get(
f"hadiths/collection/{collection_name}/{book_number}/{hadith_number}"
)
return response.get("data", {})
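# --- Illustrative usage sketch (not part of the ingestion pipeline) ---
# Minimal example of streaming a collection. Assumes SUNNAH_API_KEY and
# SUNNAH_BASE_URL are configured in settings; 'hadithNumber' mirrors the field
# that src/main.py reads when storing hadiths.
if __name__ == "__main__":
    with SunnahAPIClient() as client:
        for hadith, book_number in client.iter_all_hadiths_in_collection(
            collection_name="bukhari",
            batch_size=50
        ):
            print(book_number, hadith.get("hadithNumber"))
            break  # stop after the first hadith in this sketch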
=== ./src/main.py ===
"""
Main ingestion script for fetching hadiths from Sunnah.com API
"""
import sys
import argparse
from typing import Optional
from uuid import UUID
import structlog
from config.settings import settings
from api_clients.sunnah_client import SunnahAPIClient
from database.repository import HadithRepository
from processors.text_cleaner import ArabicTextProcessor, TextCleaner
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
class HadithIngestionService:
"""Service for ingesting hadiths from Sunnah.com API"""
def __init__(self):
self.api_client = SunnahAPIClient()
self.repo = HadithRepository()
self.text_processor = ArabicTextProcessor()
self.text_cleaner = TextCleaner()
def ingest_collection(
self,
collection_abbr: str,
limit: Optional[int] = None
) -> dict:
"""
Ingest entire collection from Sunnah.com API
Args:
collection_abbr: Collection abbreviation (e.g., 'bukhari')
limit: Optional limit on number of hadiths to ingest
Returns:
Statistics dictionary
"""
logger.info(
"ingestion_started",
collection=collection_abbr,
limit=limit
)
# Get collection from database
collection = self.repo.get_collection_by_abbreviation(collection_abbr)
if not collection:
logger.error(
"collection_not_found",
collection=collection_abbr
)
raise ValueError(f"Collection '{collection_abbr}' not found in database")
collection_id = UUID(collection['id'])
# Create ingestion job
job_id = self.repo.create_ingestion_job(
job_name=f"ingest_{collection_abbr}",
job_type="api_fetch",
source_name="sunnah.com",
config={"collection": collection_abbr, "limit": limit}
)
logger.info(
"job_created",
job_id=str(job_id),
collection=collection_abbr
)
stats = {
"processed": 0,
"failed": 0,
"skipped": 0
}
try:
# Iterate through all hadiths in collection
for hadith_data, book_number in self.api_client.iter_all_hadiths_in_collection(
collection_name=collection_abbr,
batch_size=50
):
# Check limit
if limit and stats["processed"] >= limit:
logger.info("limit_reached", limit=limit)
break
try:
# Process and store hadith
self._process_and_store_hadith(
collection_id=collection_id,
hadith_data=hadith_data,
book_number=book_number
)
stats["processed"] += 1
# Update job progress every 100 hadiths
if stats["processed"] % 100 == 0:
self.repo.update_job_progress(
job_id=job_id,
processed=stats["processed"],
failed=stats["failed"],
skipped=stats["skipped"]
)
logger.info(
"progress_update",
processed=stats["processed"],
failed=stats["failed"]
)
except Exception as e:
stats["failed"] += 1
logger.error(
"hadith_processing_failed",
error=str(e),
hadith_number=hadith_data.get("hadithNumber")
)
self.repo.add_processing_log(
job_id=job_id,
level="ERROR",
message=f"Failed to process hadith: {str(e)}",
details={"hadith_data": hadith_data}
)
# Update final job progress
self.repo.update_job_progress(
job_id=job_id,
total=stats["processed"] + stats["failed"] + stats["skipped"],
processed=stats["processed"],
failed=stats["failed"],
skipped=stats["skipped"]
)
# Mark job as complete
self.repo.complete_job(job_id=job_id, status="success")
# Update collection count
self.repo.update_collection_count(
collection_id=collection_id,
count=stats["processed"]
)
logger.info(
"ingestion_completed",
collection=collection_abbr,
stats=stats
)
return stats
except Exception as e:
logger.error(
"ingestion_failed",
collection=collection_abbr,
error=str(e)
)
self.repo.complete_job(
job_id=job_id,
status="failed",
error_message=str(e)
)
raise
def _process_and_store_hadith(
self,
collection_id: UUID,
hadith_data: dict,
book_number: int
):
"""Process and store a single hadith"""
# Extract hadith number
hadith_number = hadith_data.get("hadithNumber")
if not hadith_number:
raise ValueError("Missing hadith number")
# Extract text in multiple languages
hadith_texts = hadith_data.get("hadith", [])
arabic_text = None
english_text = None
urdu_text = None
grade = None
grade_source = None
chapter_name = None
for text_entry in hadith_texts:
lang = text_entry.get("lang", "").lower()
body = text_entry.get("body")
if not body:
continue
# Clean text
body = self.text_cleaner.clean_text(body)
if lang == "ar":
arabic_text = body
chapter_name = text_entry.get("chapterTitle")
# Extract grade from Arabic entry
grades = text_entry.get("grades", [])
if grades:
grade = grades[0].get("grade")
grade_source = grades[0].get("name")
elif lang == "en":
english_text = body
# Extract grade from English entry if not found
if not grade:
grades = text_entry.get("grades", [])
if grades:
grade = grades[0].get("grade")
grade_source = grades[0].get("name")
elif lang == "ur":
urdu_text = body
if not arabic_text:
raise ValueError("Missing Arabic text")
# Get or create book
book = self.repo.get_book(collection_id, book_number)
if not book:
# Extract book name from hadith data
book_name_en = None
book_name_ar = None
for text_entry in hadith_texts:
lang = text_entry.get("lang", "").lower()
book_data = text_entry.get("book", [{}])[0] if text_entry.get("book") else {}
if lang == "en" and book_data.get("name"):
book_name_en = book_data.get("name")
elif lang == "ar" and book_data.get("name"):
book_name_ar = book_data.get("name")
book_id = self.repo.upsert_book(
collection_id=collection_id,
book_number=book_number,
name_english=book_name_en,
name_arabic=book_name_ar
)
else:
book_id = UUID(book["id"])
# Store hadith
hadith_id = self.repo.upsert_hadith(
collection_id=collection_id,
book_id=book_id,
hadith_number=int(hadith_number),
arabic_text=arabic_text,
english_text=english_text,
urdu_text=urdu_text,
grade=grade,
grade_source=grade_source,
chapter_name=chapter_name,
source_id=str(hadith_data.get("id", "")),
source_url=hadith_data.get("reference", {}).get("link"),
source_metadata=hadith_data
)
logger.debug(
"hadith_stored",
hadith_id=str(hadith_id),
hadith_number=hadith_number,
book_number=book_number
)
def close(self):
"""Close connections"""
self.api_client.close()
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description="Ingest hadiths from Sunnah.com API")
parser.add_argument(
"collection",
help="Collection abbreviation (e.g., bukhari, muslim)"
)
parser.add_argument(
"--limit",
type=int,
help="Limit number of hadiths to ingest"
)
args = parser.parse_args()
logger.info(
"script_started",
collection=args.collection,
limit=args.limit
)
try:
service = HadithIngestionService()
stats = service.ingest_collection(
collection_abbr=args.collection,
limit=args.limit
)
logger.info(
"script_completed",
stats=stats
)
print(f"\nIngestion completed successfully!")
print(f"Processed: {stats['processed']}")
print(f"Failed: {stats['failed']}")
print(f"Skipped: {stats['skipped']}")
service.close()
return 0
except Exception as e:
logger.error(
"script_failed",
error=str(e),
exc_info=True
)
print(f"\nIngestion failed: {str(e)}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())
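# Example invocations (illustrative; assume the database schema is loaded and
# the Sunnah.com API credentials are configured via config/settings):
#   python src/main.py bukhari --limit 100   # smoke test on the first 100 hadiths
#   python src/main.py muslim                # full ingestion of Sahih Muslim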