initial repo creation

This commit is contained in:
salahangal 2025-11-14 10:15:41 +01:00
commit b059fcab6e
37 changed files with 2818 additions and 0 deletions

hadith-ingestion/.env Normal file
View File

@ -0,0 +1,23 @@
# Database
DATABASE_HOST=postgres.db.svc.cluster.local
DATABASE_PORT=5432
DATABASE_NAME=hadith_db
DATABASE_USER=hadith_ingest
DATABASE_PASSWORD=hadith_ingest
# HadithAPI.com
HADITHAPI_KEY=$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK
# MinIO
MINIO_ENDPOINT=minio.storage.svc.cluster.local:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
# Services
TEI_URL=http://tei.ml.svc.cluster.local
QDRANT_URL=http://qdrant.vector.svc.cluster.local:6333
# Settings
LOG_LEVEL=INFO
API_RATE_LIMIT=30
BATCH_SIZE=100

View File

@ -0,0 +1,30 @@
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY config/ /app/config/
COPY src/ /app/src/
# Create non-root user
RUN useradd -m -u 1000 hadith && chown -R hadith:hadith /app
USER hadith
# Set Python path
ENV PYTHONPATH=/app
# Default command
CMD ["python", "/app/src/main_hadithapi.py"]

View File

@ -0,0 +1,193 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: ingest-hadith-collection-
namespace: argo
spec:
entrypoint: ingest-pipeline
# Arguments
arguments:
parameters:
- name: collection
value: "bukhari"
- name: limit
value: "0" # 0 means no limit
# Service account with database access
serviceAccountName: argo-workflow
# Templates
templates:
# ========================================
# Main pipeline
# ========================================
- name: ingest-pipeline
steps:
- - name: ingest-hadiths
template: ingest
arguments:
parameters:
- name: collection
value: "{{workflow.parameters.collection}}"
- name: limit
value: "{{workflow.parameters.limit}}"
- - name: generate-embeddings
template: generate-embeddings
arguments:
parameters:
- name: collection
value: "{{workflow.parameters.collection}}"
- - name: index-qdrant
template: index-qdrant
arguments:
parameters:
- name: collection
value: "{{workflow.parameters.collection}}"
# ========================================
# Ingestion step
# ========================================
- name: ingest
inputs:
parameters:
- name: collection
- name: limit
container:
image: hadith-ingestion:latest
imagePullPolicy: IfNotPresent
command: [python, /app/src/main.py]
args:
- "{{inputs.parameters.collection}}"
- "--limit={{inputs.parameters.limit}}"
env:
- name: DATABASE_HOST
value: "postgres.db.svc.cluster.local"
- name: DATABASE_PORT
value: "5432"
- name: DATABASE_NAME
value: "hadith_db"
- name: DATABASE_USER
value: "hadith_ingest"
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
- name: MINIO_ENDPOINT
value: "minio.storage.svc.cluster.local:9000"
- name: MINIO_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: access-key
- name: MINIO_SECRET_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: secret-key
- name: SUNNAH_API_KEY
valueFrom:
secretKeyRef:
name: sunnah-api-secret
key: api-key
- name: LOG_LEVEL
value: "INFO"
- name: JOB_NAME
value: "{{workflow.name}}"
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 4Gi
# ========================================
# Embedding generation step
# ========================================
- name: generate-embeddings
inputs:
parameters:
- name: collection
container:
image: hadith-embeddings:latest
imagePullPolicy: IfNotPresent
command: [python, /app/generate_embeddings.py]
args:
- "--collection={{inputs.parameters.collection}}"
- "--batch-size=32"
env:
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
# DATABASE_PASSWORD is declared first so the $(DATABASE_PASSWORD) reference below expands
- name: DATABASE_URL
value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db"
- name: TEI_URL
value: "http://tei.ml.svc.cluster.local"
- name: LOG_LEVEL
value: "INFO"
resources:
requests:
cpu: 1
memory: 2Gi
limits:
cpu: 4
memory: 8Gi
# ========================================
# Qdrant indexing step
# ========================================
- name: index-qdrant
inputs:
parameters:
- name: collection
container:
image: hadith-qdrant-indexer:latest
imagePullPolicy: IfNotPresent
command: [python, /app/index_qdrant.py]
args:
- "--collection={{inputs.parameters.collection}}"
- "--batch-size=100"
env:
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
# DATABASE_PASSWORD is declared first so the $(DATABASE_PASSWORD) reference below expands
- name: DATABASE_URL
value: "postgresql://hadith_ingest:$(DATABASE_PASSWORD)@postgres.db.svc.cluster.local:5432/hadith_db"
- name: QDRANT_URL
value: "http://qdrant.vector.svc.cluster.local:6333"
- name: QDRANT_COLLECTION
value: "hadith_embeddings"
- name: LOG_LEVEL
value: "INFO"
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 4Gi
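
The generate-embeddings and index-qdrant steps reference images (hadith-embeddings, hadith-qdrant-indexer) whose code is not part of this commit. Below is a minimal Python sketch of what such a step might do, assuming the /embed endpoint of text-embeddings-inference (TEI) and the qdrant-client package; the function names and payload fields are illustrative, not the actual image contents.

"""
Hypothetical sketch of the generate-embeddings / index-qdrant steps.
Data flow: hadith rows without embeddings -> TEI /embed -> Qdrant upsert.
"""
import os

import httpx
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

TEI_URL = os.environ.get("TEI_URL", "http://tei.ml.svc.cluster.local")
QDRANT_URL = os.environ.get("QDRANT_URL", "http://qdrant.vector.svc.cluster.local:6333")
QDRANT_COLLECTION = os.environ.get("QDRANT_COLLECTION", "hadith_embeddings")


def embed_texts(texts: list[str]) -> list[list[float]]:
    # TEI exposes POST /embed, returning one vector per input string
    response = httpx.post(f"{TEI_URL}/embed", json={"inputs": texts}, timeout=30)
    response.raise_for_status()
    return response.json()


def index_batch(hadiths: list[dict]) -> None:
    # hadiths are rows from the hadiths table (id, arabic_text, hadith_number, ...)
    vectors = embed_texts([h["arabic_text"] for h in hadiths])
    client = QdrantClient(url=QDRANT_URL)
    client.upsert(
        collection_name=QDRANT_COLLECTION,
        points=[
            PointStruct(id=str(h["id"]), vector=v, payload={"hadith_number": h["hadith_number"]})
            for h, v in zip(hadiths, vectors)
        ],
    )

The real images presumably pull batches via HadithRepository.get_hadiths_without_embeddings and call mark_embedding_generated after indexing, but that code is not in this repository.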

View File

@ -0,0 +1,204 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: ingest-hadithapi-
namespace: argo
spec:
entrypoint: ingest-pipeline
arguments:
parameters:
- name: book-slug
value: "sahih-bukhari"
- name: limit
value: "0" # 0 means no limit
serviceAccountName: argo-workflow
templates:
# ========================================
# Main pipeline
# ========================================
- name: ingest-pipeline
steps:
- - name: ingest-hadiths
template: ingest
arguments:
parameters:
- name: book-slug
value: "{{workflow.parameters.book-slug}}"
- name: limit
value: "{{workflow.parameters.limit}}"
# ========================================
# Ingestion step
# ========================================
- name: ingest
inputs:
parameters:
- name: book-slug
- name: limit
container:
image: hadith-ingestion:latest
imagePullPolicy: IfNotPresent
command: [python, /app/src/main_hadithapi.py]
args:
- "--book-slug={{inputs.parameters.book-slug}}"
- "--limit={{inputs.parameters.limit}}"
env:
- name: DATABASE_HOST
value: "postgres.db.svc.cluster.local"
- name: DATABASE_PORT
value: "5432"
- name: DATABASE_NAME
value: "hadith_db"
- name: DATABASE_USER
value: "hadith_ingest"
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
- name: HADITHAPI_KEY
valueFrom:
secretKeyRef:
name: hadithapi-secret
key: api-key
- name: MINIO_ENDPOINT
value: "minio.storage.svc.cluster.local:9000"
- name: MINIO_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: access-key
- name: MINIO_SECRET_KEY
valueFrom:
secretKeyRef:
name: minio-secret
key: secret-key
- name: LOG_LEVEL
value: "INFO"
- name: JOB_NAME
value: "{{workflow.name}}"
- name: API_RATE_LIMIT
value: "30"
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 4Gi
---
# Workflow to ingest ALL books
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: ingest-all-hadithapi-
namespace: argo
spec:
entrypoint: ingest-all-books
serviceAccountName: argo-workflow
arguments:
parameters:
- name: limit-per-book
value: "0" # 0 means no limit
templates:
# ========================================
# Main pipeline - sequential processing
# ========================================
- name: ingest-all-books
steps:
# Process each book sequentially to avoid rate limiting
- - name: sahih-bukhari
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "sahih-bukhari"
- - name: sahih-muslim
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "sahih-muslim"
- - name: sunan-abu-dawood
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "sunan-abu-dawood"
- - name: jami-at-tirmidhi
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "jami-at-tirmidhi"
- - name: sunan-an-nasai
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "sunan-an-nasai"
- - name: sunan-ibn-e-majah
template: ingest-book
arguments:
parameters:
- name: book-slug
value: "sunan-ibn-e-majah"
# ========================================
# Book ingestion template
# ========================================
- name: ingest-book
inputs:
parameters:
- name: book-slug
container:
image: hadith-ingestion:latest
imagePullPolicy: IfNotPresent
command: [python, /app/src/main_hadithapi.py]
args:
- "--book-slug={{inputs.parameters.book-slug}}"
- "--limit={{workflow.parameters.limit-per-book}}"
env:
- name: DATABASE_HOST
value: "postgres.db.svc.cluster.local"
- name: DATABASE_PASSWORD
valueFrom:
secretKeyRef:
name: hadith-db-secret
key: password
- name: HADITHAPI_KEY
valueFrom:
secretKeyRef:
name: hadithapi-secret
key: api-key
- name: LOG_LEVEL
value: "INFO"
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 2
memory: 4Gi

View File

@ -0,0 +1,23 @@
#!/bin/bash
# build-and-push.sh
set -e
# Configuration
IMAGE_NAME="hadith-ingestion"
TAG="${1:-latest}"
REGISTRY="${DOCKER_REGISTRY:-localhost:5000}"
echo "Building Docker image: ${IMAGE_NAME}:${TAG}"
# Build image
docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile .
# Tag for registry
docker tag ${IMAGE_NAME}:${TAG} ${REGISTRY}/${IMAGE_NAME}:${TAG}
# Push to registry
echo "Pushing to registry: ${REGISTRY}"
docker push ${REGISTRY}/${IMAGE_NAME}:${TAG}
echo "Done!"

View File

@ -0,0 +1,20 @@
#!/bin/bash
# build-hadithapi-ingestion.sh
set -e
IMAGE_NAME="hadith-ingestion"
TAG="v1.0-hadithapi"
echo "Building Docker image for HadithAPI.com ingestion..."
# Build image
docker build -t ${IMAGE_NAME}:${TAG} -f Dockerfile .
# Tag as latest
docker tag ${IMAGE_NAME}:${TAG} ${IMAGE_NAME}:latest
# If you have a registry, push
# docker push your-registry/${IMAGE_NAME}:${TAG}
echo "Build complete: ${IMAGE_NAME}:${TAG}"

View File

@ -0,0 +1,80 @@
"""
Configuration settings for hadith ingestion service
"""
from pydantic_settings import BaseSettings
from typing import Optional
import os
class Settings(BaseSettings):
"""Application settings loaded from environment variables"""
# Database
DATABASE_HOST: str = "postgres.db.svc.cluster.local"
DATABASE_PORT: int = 5432
DATABASE_NAME: str = "hadith_db"
DATABASE_USER: str = "hadith_ingest"
DATABASE_PASSWORD: str = "hadith_ingest"
@property
def DATABASE_URL(self) -> str:
return (
f"postgresql://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}"
f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}"
)
@property
def ASYNC_DATABASE_URL(self) -> str:
return (
f"postgresql+asyncpg://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}"
f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}"
)
# MinIO / S3
MINIO_ENDPOINT: str = "minio.storage.svc.cluster.local:9000"
MINIO_ACCESS_KEY: str = "minioadmin"
MINIO_SECRET_KEY: str = "minioadmin"
MINIO_BUCKET_RAW: str = "hadith-raw-data"
MINIO_BUCKET_PROCESSED: str = "hadith-processed"
MINIO_SECURE: bool = False
# APIs
SUNNAH_API_KEY: Optional[str] = None
SUNNAH_BASE_URL: str = "https://api.sunnah.com/v1"
HADITH_ONE_API_KEY: Optional[str] = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK"
# HadithAPI.com
HADITHAPI_KEY: str = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK"
HADITHAPI_BASE_URL: str = "https://hadithapi.com/api"
# Rate limiting
API_RATE_LIMIT: int = 30 # requests per minute
API_MAX_RETRIES: int = 3
API_RETRY_DELAY: int = 5 # seconds
# Processing
BATCH_SIZE: int = 100
MAX_WORKERS: int = 4
# TEI Service (for embeddings)
TEI_URL: str = "http://tei.ml.svc.cluster.local"
TEI_TIMEOUT: int = 30
# Qdrant
QDRANT_URL: str = "http://qdrant.db.svc.cluster.local:6333"
QDRANT_COLLECTION: str = "hadith_embeddings"
# Logging
LOG_LEVEL: str = "INFO"
LOG_FORMAT: str = "json"
# Job tracking
JOB_NAME: Optional[str] = None
JOB_TYPE: str = "api_fetch"
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = True
# Global settings instance
settings = Settings()
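
A minimal usage sketch (not part of the service) showing how environment variables override these defaults and how the DATABASE_URL property is assembled; it assumes the repository root is on PYTHONPATH so that config.settings imports, as in the Dockerfile.

import os

# Override a field before constructing Settings; pydantic-settings reads the
# process environment (and .env) using case-sensitive field names.
os.environ["DATABASE_HOST"] = "localhost"

from config.settings import Settings

local = Settings()
print(local.DATABASE_URL)
# -> postgresql://hadith_ingest:hadith_ingest@localhost:5432/hadith_db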

View File

@ -0,0 +1,20 @@
#!/bin/bash
# create-secrets.sh
# Secrets must live in the argo namespace, where the workflow pods run
# Database secret
kubectl -n argo create secret generic hadith-db-secret \
--from-literal=password='hadith_ingest' \
--dry-run=client -o yaml | kubectl apply -f -
# HadithAPI secret (already public, but for consistency)
kubectl -n argo create secret generic hadithapi-secret \
--from-literal=api-key='$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK' \
--dry-run=client -o yaml | kubectl apply -f -
# MinIO secret
kubectl -n argo create secret generic minio-secret \
--from-literal=access-key='minioadmin' \
--from-literal=secret-key='minioadmin' \
--dry-run=client -o yaml | kubectl apply -f -
echo "Secrets created successfully"

View File

@ -0,0 +1,42 @@
# Core dependencies
python-dotenv==1.0.0
pydantic==2.5.0
pydantic-settings==2.1.0
# HTTP clients
httpx==0.25.2
requests==2.31.0
tenacity==8.2.3
# Database
psycopg2-binary==2.9.9
sqlalchemy==2.0.23
asyncpg==0.29.0
# Data processing
pandas==2.1.4
numpy==1.26.2
pyarabic==0.6.15
arabic-reshaper==3.0.0
# Validation
jsonschema==4.20.0
validators==0.22.0
# Logging & Monitoring
structlog==23.2.0
prometheus-client==0.19.0
# Cloud storage
minio==7.2.0
boto3==1.34.0
# Task queue (optional)
celery==5.3.4
redis==5.0.1
# Testing
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0
faker==21.0.0

View File

@ -0,0 +1,49 @@
#!/bin/bash
# run-full-ingestion.sh
set -e
echo "=== Starting Full HadithAPI Ingestion ==="
# Books to ingest (in order)
BOOKS=(
"sahih-bukhari"
"sahih-muslim"
"sunan-abu-dawood"
"jami-at-tirmidhi"
"sunan-an-nasai"
"sunan-ibn-e-majah"
)
for BOOK in "${BOOKS[@]}"; do
echo -e "\n========================================="
echo "Ingesting: $BOOK"
echo "========================================="
argo submit -n argo argo/workflows/ingest-hadithapi.yaml \
--parameter book-slug=$BOOK \
--parameter limit=0 \
--wait \
--log
echo "$BOOK completed!"
# Optional: add delay between books
sleep 10
done
echo -e "\n=== All Books Ingestion Complete ==="
# Print summary
kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
SELECT
c.name_english,
c.abbreviation,
COUNT(h.id) as hadith_count,
COUNT(DISTINCT b.id) as chapter_count
FROM collections c
LEFT JOIN hadiths h ON c.id = h.collection_id
LEFT JOIN books b ON h.book_id = b.id
GROUP BY c.name_english, c.abbreviation
ORDER BY hadith_count DESC;
"

View File

@ -0,0 +1,131 @@
"""
Base API client with retry logic and rate limiting
"""
import httpx
import time
from typing import Optional, Dict, Any
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
import structlog
from config.settings import settings
logger = structlog.get_logger()
class BaseAPIClient:
"""Base class for API clients with built-in retry and rate limiting"""
def __init__(
self,
base_url: str,
api_key: Optional[str] = None,
rate_limit: int = 90,
timeout: int = 30
):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.rate_limit = rate_limit
self.timeout = timeout
# Rate limiting
self.request_times = []
self.min_interval = 60.0 / rate_limit # seconds between requests
# HTTP client
self.client = httpx.Client(timeout=timeout)
logger.info(
"api_client_initialized",
base_url=base_url,
rate_limit=rate_limit
)
def _wait_for_rate_limit(self):
"""Implement rate limiting"""
now = time.time()
# Remove old timestamps (older than 1 minute)
self.request_times = [t for t in self.request_times if now - t < 60]
# If we're at the limit, wait
if len(self.request_times) >= self.rate_limit:
sleep_time = 60 - (now - self.request_times[0])
if sleep_time > 0:
logger.info(
"rate_limit_wait",
sleep_seconds=sleep_time,
requests_in_window=len(self.request_times)
)
time.sleep(sleep_time)
self.request_times = []
# Add current timestamp
self.request_times.append(time.time())
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
reraise=True
)
def _make_request(
self,
method: str,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""Make HTTP request with retry logic"""
# Rate limiting
self._wait_for_rate_limit()
# Prepare headers
request_headers = headers or {}
if self.api_key:
request_headers['X-API-Key'] = self.api_key
# Make request
url = f"{self.base_url}/{endpoint.lstrip('/')}"
logger.debug(
"api_request",
method=method,
url=url,
params=params
)
response = self.client.request(
method=method,
url=url,
params=params,
headers=request_headers
)
response.raise_for_status()
logger.debug(
"api_response",
status_code=response.status_code,
response_size=len(response.content)
)
return response.json()
def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
"""Make GET request"""
return self._make_request("GET", endpoint, params=params)
def close(self):
"""Close the HTTP client"""
self.client.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
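
A usage sketch of the base client (not part of the service): a concrete client only supplies base_url, api_key and rate_limit, and inherits retries, rate limiting and context-manager cleanup. The ExampleClient and the httpbin.org endpoint are purely illustrative.

from api_clients.base_client import BaseAPIClient

class ExampleClient(BaseAPIClient):
    """Hypothetical client used only to illustrate the base class."""
    def __init__(self):
        super().__init__(base_url="https://httpbin.org", rate_limit=10)

    def get_ip(self) -> dict:
        # Resolves to GET https://httpbin.org/ip via _make_request
        return self.get("ip")

with ExampleClient() as client:   # __exit__ closes the underlying httpx.Client
    print(client.get_ip())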

View File

@ -0,0 +1,297 @@
"""
Client for HadithAPI.com API
"""
from typing import List, Dict, Any, Optional, Generator
import structlog
from .base_client import BaseAPIClient
from config.settings import settings
logger = structlog.get_logger()
class HadithAPIClient(BaseAPIClient):
"""Client for interacting with hadithapi.com API"""
def __init__(self, api_key: Optional[str] = None):
super().__init__(
base_url="https://hadithapi.com/api",
api_key=api_key or settings.HADITHAPI_KEY,
rate_limit=30 # Conservative: 30 req/min
)
def _add_api_key(self, params: Optional[Dict] = None) -> Dict:
"""Add API key to request parameters"""
params = params or {}
params['apiKey'] = self.api_key
return params
def get_books(self) -> List[Dict[str, Any]]:
"""
Get list of all available books/collections
Returns:
List of book dictionaries
"""
logger.info("fetching_books")
params = self._add_api_key()
response = self.get("books", params=params)
if response.get('status') != 200:
logger.error(
"api_error",
status=response.get('status'),
message=response.get('message')
)
raise Exception(f"API Error: {response.get('message')}")
books = response.get('data', [])
logger.info(
"books_fetched",
count=len(books)
)
return books
def get_chapters(self, book_slug: str) -> List[Dict[str, Any]]:
"""
Get chapters for a specific book
Args:
book_slug: Book slug identifier (e.g., 'sahih-bukhari')
Returns:
List of chapter dictionaries
"""
logger.info(
"fetching_chapters",
book_slug=book_slug
)
params = self._add_api_key()
response = self.get(f"{book_slug}/chapters", params=params)
if response.get('status') != 200:
logger.error(
"api_error",
status=response.get('status'),
message=response.get('message')
)
raise Exception(f"API Error: {response.get('message')}")
chapters = response.get('data', [])
logger.info(
"chapters_fetched",
book_slug=book_slug,
count=len(chapters)
)
return chapters
def get_hadiths_page(
self,
book_id: int,
chapter_id: Optional[int] = None,
page: int = 1,
limit: int = 100
) -> Dict[str, Any]:
"""
Get a page of hadiths
Args:
book_id: Book ID
chapter_id: Optional chapter ID to filter by
page: Page number (1-indexed)
limit: Results per page (max 100)
Returns:
Response dictionary with hadiths and pagination info
"""
params = self._add_api_key({
'book': book_id,
'page': page,
'limit': min(limit, 100) # Enforce max limit
})
if chapter_id:
params['chapter'] = chapter_id
logger.debug(
"fetching_hadiths_page",
book_id=book_id,
chapter_id=chapter_id,
page=page,
limit=limit
)
response = self.get("hadiths", params=params)
if response.get('status') != 200:
logger.error(
"api_error",
status=response.get('status'),
message=response.get('message')
)
raise Exception(f"API Error: {response.get('message')}")
return response.get('data', {})
def iter_all_hadiths_in_book(
self,
book_id: int,
book_slug: str,
chapter_id: Optional[int] = None,
batch_size: int = 100
) -> Generator[Dict[str, Any], None, None]:
"""
Iterator that yields all hadiths in a book, handling pagination automatically
Args:
book_id: Book ID
book_slug: Book slug for logging
chapter_id: Optional chapter ID to filter by
batch_size: Number of hadiths to fetch per request (max 100)
Yields:
Individual hadith dictionaries
"""
page = 1
total_fetched = 0
while True:
response_data = self.get_hadiths_page(
book_id=book_id,
chapter_id=chapter_id,
page=page,
limit=batch_size
)
hadiths = response_data.get('hadiths', [])
pagination = response_data.get('pagination', {})
if not hadiths:
logger.info(
"book_complete",
book_slug=book_slug,
chapter_id=chapter_id,
total_hadiths=total_fetched
)
break
for hadith in hadiths:
yield hadith
total_fetched += 1
# Log progress
if total_fetched % 500 == 0:
logger.info(
"progress",
book_slug=book_slug,
fetched=total_fetched,
total=pagination.get('total', '?')
)
# Check if there are more pages
current_page = pagination.get('current_page', page)
last_page = pagination.get('last_page', 1)
if current_page >= last_page:
logger.info(
"book_complete",
book_slug=book_slug,
total_hadiths=total_fetched,
total_pages=last_page
)
break
page += 1
def iter_all_hadiths_in_book_with_chapters(
self,
book_id: int,
book_slug: str,
batch_size: int = 100
) -> Generator[tuple[Dict[str, Any], Optional[Dict[str, Any]]], None, None]:
"""
Iterator that yields all hadiths in a book, organized by chapter
Args:
book_id: Book ID
book_slug: Book slug
batch_size: Number of hadiths to fetch per request
Yields:
Tuple of (hadith_dict, chapter_dict or None)
"""
# First, get all chapters
try:
chapters = self.get_chapters(book_slug)
except Exception as e:
logger.warning(
"chapters_fetch_failed",
book_slug=book_slug,
error=str(e),
fallback="fetching_all_hadiths_without_chapter_filter"
)
# Fallback: fetch all hadiths without chapter filter
for hadith in self.iter_all_hadiths_in_book(
book_id=book_id,
book_slug=book_slug,
batch_size=batch_size
):
chapter_info = hadith.get('chapter')
yield hadith, chapter_info
return
logger.info(
"starting_chapter_by_chapter_fetch",
book_slug=book_slug,
total_chapters=len(chapters)
)
# Process each chapter
for chapter in chapters:
chapter_id = chapter.get('id')
chapter_number = chapter.get('chapterNumber')
logger.info(
"fetching_chapter",
book_slug=book_slug,
chapter_id=chapter_id,
chapter_number=chapter_number
)
try:
for hadith in self.iter_all_hadiths_in_book(
book_id=book_id,
book_slug=book_slug,
chapter_id=chapter_id,
batch_size=batch_size
):
yield hadith, chapter
except Exception as e:
logger.error(
"chapter_fetch_failed",
book_slug=book_slug,
chapter_id=chapter_id,
error=str(e)
)
continue
def get_book_by_slug(self, book_slug: str) -> Optional[Dict[str, Any]]:
"""
Get book details by slug
Args:
book_slug: Book slug identifier
Returns:
Book dictionary or None if not found
"""
books = self.get_books()
for book in books:
if book.get('bookSlug') == book_slug:
return book
return None
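
A usage sketch (assuming a valid HADITHAPI_KEY in the environment and network access): resolve a book by its slug, then stream a handful of hadiths through the paginating iterator.

from itertools import islice
from api_clients.hadithapi_client import HadithAPIClient

with HadithAPIClient() as client:
    book = client.get_book_by_slug("sahih-bukhari")
    if book:
        # Stop after five hadiths; the iterator handles pagination itself
        for hadith in islice(
            client.iter_all_hadiths_in_book(book_id=book["id"], book_slug=book["bookSlug"]),
            5,
        ):
            print(hadith.get("hadithNumber"), hadith.get("status"))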

View File

@ -0,0 +1,247 @@
"""
Client for Sunnah.com API
"""
from typing import List, Dict, Any, Optional, Generator
import structlog
from .base_client import BaseAPIClient
from config.settings import settings
logger = structlog.get_logger()
class SunnahAPIClient(BaseAPIClient):
"""Client for interacting with Sunnah.com API"""
def __init__(self, api_key: Optional[str] = None):
super().__init__(
base_url=settings.SUNNAH_BASE_URL,
api_key=api_key or settings.SUNNAH_API_KEY,
rate_limit=settings.API_RATE_LIMIT
)
def get_collections(self) -> List[Dict[str, Any]]:
"""
Get list of all hadith collections
Returns:
List of collection dictionaries
"""
logger.info("fetching_collections")
response = self.get("collections")
collections = response.get("data", [])
logger.info(
"collections_fetched",
count=len(collections)
)
return collections
def get_collection_details(self, collection_name: str) -> Dict[str, Any]:
"""
Get details for a specific collection
Args:
collection_name: Collection abbreviation (e.g., 'bukhari')
Returns:
Collection details dictionary
"""
logger.info(
"fetching_collection_details",
collection=collection_name
)
response = self.get(f"collections/{collection_name}")
return response
def get_books(self, collection_name: str) -> List[Dict[str, Any]]:
"""
Get all books in a collection
Args:
collection_name: Collection abbreviation
Returns:
List of book dictionaries
"""
logger.info(
"fetching_books",
collection=collection_name
)
response = self.get(f"collections/{collection_name}/books")
books = response.get("data", [])
logger.info(
"books_fetched",
collection=collection_name,
count=len(books)
)
return books
def get_hadiths_in_book(
self,
collection_name: str,
book_number: int,
limit: int = 50,
page: int = 1
) -> Dict[str, Any]:
"""
Get hadiths in a specific book with pagination
Args:
collection_name: Collection abbreviation
book_number: Book number
limit: Number of hadiths per page
page: Page number
Returns:
Response with hadiths and pagination info
"""
logger.debug(
"fetching_hadiths",
collection=collection_name,
book=book_number,
page=page,
limit=limit
)
response = self.get(
f"collections/{collection_name}/books/{book_number}/hadiths",
params={"limit": limit, "page": page}
)
return response
def iter_all_hadiths_in_book(
self,
collection_name: str,
book_number: int,
batch_size: int = 50
) -> Generator[Dict[str, Any], None, None]:
"""
Iterator that yields all hadiths in a book, handling pagination automatically
Args:
collection_name: Collection abbreviation
book_number: Book number
batch_size: Number of hadiths to fetch per request
Yields:
Individual hadith dictionaries
"""
page = 1
total_fetched = 0
while True:
response = self.get_hadiths_in_book(
collection_name=collection_name,
book_number=book_number,
limit=batch_size,
page=page
)
hadiths = response.get("data", [])
if not hadiths:
logger.info(
"book_complete",
collection=collection_name,
book=book_number,
total_hadiths=total_fetched
)
break
for hadith in hadiths:
yield hadith
total_fetched += 1
# Check if there are more pages
pagination = response.get("pagination", {})
if page >= pagination.get("total_pages", 1):
break
page += 1
def iter_all_hadiths_in_collection(
self,
collection_name: str,
batch_size: int = 50
) -> Generator[tuple[Dict[str, Any], int], None, None]:
"""
Iterator that yields all hadiths in a collection
Args:
collection_name: Collection abbreviation
batch_size: Number of hadiths to fetch per request
Yields:
Tuple of (hadith_dict, book_number)
"""
# First, get all books in the collection
books = self.get_books(collection_name)
logger.info(
"starting_collection_fetch",
collection=collection_name,
total_books=len(books)
)
for book in books:
book_number = book.get("bookNumber")
if not book_number:
logger.warning(
"book_missing_number",
book=book
)
continue
logger.info(
"fetching_book",
collection=collection_name,
book=book_number
)
try:
for hadith in self.iter_all_hadiths_in_book(
collection_name=collection_name,
book_number=int(book_number),
batch_size=batch_size
):
yield hadith, int(book_number)
except Exception as e:
logger.error(
"book_fetch_failed",
collection=collection_name,
book=book_number,
error=str(e)
)
continue
def get_specific_hadith(
self,
collection_name: str,
book_number: int,
hadith_number: int
) -> Dict[str, Any]:
"""
Get a specific hadith by its number
Args:
collection_name: Collection abbreviation
book_number: Book number
hadith_number: Hadith number
Returns:
Hadith dictionary
"""
response = self.get(
f"hadiths/collection/{collection_name}/{book_number}/{hadith_number}"
)
return response.get("data", {})
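
A usage sketch (assuming SUNNAH_API_KEY is set and api.sunnah.com is reachable). The shape of the returned dictionaries is whatever the API provides, so the example only prints raw entries.

from api_clients.sunnah_client import SunnahAPIClient

with SunnahAPIClient() as client:
    # List the first few collections as returned by the API
    for collection in client.get_collections()[:3]:
        print(collection)
    # Fetch a single hadith by collection / book / number
    print(client.get_specific_hadith("bukhari", book_number=1, hadith_number=1))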

View File

@ -0,0 +1,334 @@
"""
Database repository for hadith data operations
"""
from typing import List, Dict, Any, Optional
from uuid import UUID
import structlog
from sqlalchemy import create_engine, text, select, insert, update
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import IntegrityError
from config.settings import settings
logger = structlog.get_logger()
class HadithRepository:
"""Repository for hadith database operations"""
def __init__(self, database_url: Optional[str] = None):
self.database_url = database_url or settings.DATABASE_URL
self.engine = create_engine(self.database_url, pool_pre_ping=True)
self.SessionLocal = sessionmaker(bind=self.engine)
def get_session(self) -> Session:
"""Get database session"""
return self.SessionLocal()
# ===== Collections =====
def get_collection_by_abbreviation(self, abbr: str) -> Optional[Dict[str, Any]]:
"""Get collection by abbreviation"""
with self.get_session() as session:
query = text("""
SELECT * FROM collections
WHERE abbreviation = :abbr
""")
result = session.execute(query, {"abbr": abbr}).fetchone()
if result:
return dict(result._mapping)
return None
def get_all_collections(self) -> List[Dict[str, Any]]:
"""Get all collections"""
with self.get_session() as session:
query = text("SELECT * FROM collections ORDER BY name_english")
result = session.execute(query).fetchall()
return [dict(row._mapping) for row in result]
def update_collection_count(self, collection_id: UUID, count: int):
"""Update total hadith count for a collection"""
with self.get_session() as session:
query = text("""
UPDATE collections
SET total_hadiths = :count, updated_at = NOW()
WHERE id = :id
""")
session.execute(query, {"id": str(collection_id), "count": count})
session.commit()
# ===== Books =====
def upsert_book(
self,
collection_id: UUID,
book_number: int,
name_english: Optional[str] = None,
name_arabic: Optional[str] = None,
metadata: Optional[Dict] = None
) -> UUID:
"""Insert or update a book"""
with self.get_session() as session:
query = text("""
INSERT INTO books (collection_id, book_number, name_english, name_arabic, metadata)
VALUES (:collection_id, :book_number, :name_english, :name_arabic, :metadata)
ON CONFLICT (collection_id, book_number)
DO UPDATE SET
name_english = EXCLUDED.name_english,
name_arabic = EXCLUDED.name_arabic,
metadata = EXCLUDED.metadata
RETURNING id
""")
result = session.execute(query, {
"collection_id": str(collection_id),
"book_number": book_number,
"name_english": name_english,
"name_arabic": name_arabic,
"metadata": metadata or {}
})
session.commit()
return UUID(result.fetchone()[0])
def get_book(self, collection_id: UUID, book_number: int) -> Optional[Dict[str, Any]]:
"""Get book by collection and book number"""
with self.get_session() as session:
query = text("""
SELECT * FROM books
WHERE collection_id = :collection_id AND book_number = :book_number
""")
result = session.execute(query, {
"collection_id": str(collection_id),
"book_number": book_number
}).fetchone()
if result:
return dict(result._mapping)
return None
# ===== Hadiths =====
def upsert_hadith(
self,
collection_id: UUID,
hadith_number: int,
arabic_text: str,
book_id: Optional[UUID] = None,
english_text: Optional[str] = None,
urdu_text: Optional[str] = None,
grade: Optional[str] = None,
grade_source: Optional[str] = None,
chapter_name: Optional[str] = None,
source_id: Optional[str] = None,
source_url: Optional[str] = None,
source_metadata: Optional[Dict] = None
) -> UUID:
"""Insert or update a hadith"""
with self.get_session() as session:
query = text("""
INSERT INTO hadiths (
collection_id, book_id, hadith_number,
arabic_text, english_text, urdu_text,
grade, grade_source, chapter_name,
source_id, source_url, source_metadata
)
VALUES (
:collection_id, :book_id, :hadith_number,
:arabic_text, :english_text, :urdu_text,
:grade, :grade_source, :chapter_name,
:source_id, :source_url, :source_metadata
)
ON CONFLICT (collection_id, book_id, hadith_number)
DO UPDATE SET
arabic_text = EXCLUDED.arabic_text,
english_text = EXCLUDED.english_text,
urdu_text = EXCLUDED.urdu_text,
grade = EXCLUDED.grade,
grade_source = EXCLUDED.grade_source,
chapter_name = EXCLUDED.chapter_name,
source_url = EXCLUDED.source_url,
source_metadata = EXCLUDED.source_metadata,
updated_at = NOW()
RETURNING id
""")
result = session.execute(query, {
"collection_id": str(collection_id),
"book_id": str(book_id) if book_id else None,
"hadith_number": hadith_number,
"arabic_text": arabic_text,
"english_text": english_text,
"urdu_text": urdu_text,
"grade": grade,
"grade_source": grade_source,
"chapter_name": chapter_name,
"source_id": source_id,
"source_url": source_url,
"source_metadata": source_metadata or {}
})
session.commit()
return UUID(result.fetchone()[0])
def get_hadiths_without_embeddings(
self,
limit: int = 100,
collection_id: Optional[UUID] = None
) -> List[Dict[str, Any]]:
"""Get hadiths that need embedding generation"""
with self.get_session() as session:
if collection_id:
query = text("""
SELECT * FROM hadiths
WHERE embedding_generated = FALSE
AND collection_id = :collection_id
ORDER BY created_at ASC
LIMIT :limit
""")
result = session.execute(query, {
"collection_id": str(collection_id),
"limit": limit
}).fetchall()
else:
query = text("""
SELECT * FROM hadiths
WHERE embedding_generated = FALSE
ORDER BY created_at ASC
LIMIT :limit
""")
result = session.execute(query, {"limit": limit}).fetchall()
return [dict(row._mapping) for row in result]
def mark_embedding_generated(self, hadith_id: UUID, version: str = "v1"):
"""Mark hadith as having embedding generated"""
with self.get_session() as session:
query = text("""
UPDATE hadiths
SET embedding_generated = TRUE,
embedding_version = :version,
updated_at = NOW()
WHERE id = :id
""")
session.execute(query, {"id": str(hadith_id), "version": version})
session.commit()
# ===== Ingestion Jobs =====
def create_ingestion_job(
self,
job_name: str,
job_type: str,
source_name: str,
config: Optional[Dict] = None
) -> UUID:
"""Create a new ingestion job"""
with self.get_session() as session:
query = text("""
INSERT INTO ingestion_jobs (job_name, job_type, source_name, config, status, started_at)
VALUES (:job_name, :job_type, :source_name, :config, 'running', NOW())
RETURNING id
""")
result = session.execute(query, {
"job_name": job_name,
"job_type": job_type,
"source_name": source_name,
"config": config or {}
})
session.commit()
return UUID(result.fetchone()[0])
def update_job_progress(
self,
job_id: UUID,
total: Optional[int] = None,
processed: Optional[int] = None,
failed: Optional[int] = None,
skipped: Optional[int] = None
):
"""Update job progress counters"""
with self.get_session() as session:
updates = []
params = {"job_id": str(job_id)}
if total is not None:
updates.append("total_records = :total")
params["total"] = total
if processed is not None:
updates.append("processed_records = :processed")
params["processed"] = processed
if failed is not None:
updates.append("failed_records = :failed")
params["failed"] = failed
if skipped is not None:
updates.append("skipped_records = :skipped")
params["skipped"] = skipped
if updates:
query_str = f"""
UPDATE ingestion_jobs
SET {', '.join(updates)}
WHERE id = :job_id
"""
session.execute(text(query_str), params)
session.commit()
def complete_job(
self,
job_id: UUID,
status: str = "success",
error_message: Optional[str] = None
):
"""Mark job as completed"""
with self.get_session() as session:
query = text("""
UPDATE ingestion_jobs
SET status = :status,
completed_at = NOW(),
duration_seconds = EXTRACT(EPOCH FROM (NOW() - started_at)),
error_message = :error_message
WHERE id = :job_id
""")
session.execute(query, {
"job_id": str(job_id),
"status": status,
"error_message": error_message
})
session.commit()
def add_processing_log(
self,
job_id: UUID,
level: str,
message: str,
details: Optional[Dict] = None
):
"""Add a processing log entry"""
with self.get_session() as session:
query = text("""
INSERT INTO processing_logs (job_id, log_level, message, details)
VALUES (:job_id, :level, :message, :details)
""")
session.execute(query, {
"job_id": str(job_id),
"level": level,
"message": message,
"details": details or {}
})
session.commit()
# ===== Statistics =====
def get_collection_stats(self, collection_id: UUID) -> Dict[str, Any]:
"""Get statistics for a collection"""
with self.get_session() as session:
query = text("""
SELECT * FROM get_collection_statistics(:collection_id)
""")
result = session.execute(query, {"collection_id": str(collection_id)}).fetchone()
if result:
return dict(result._mapping)
return {}
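
A sketch of the call sequence the ingestion services drive through this repository: look up the collection, upsert the book, upsert the hadith, then flag it for embedding. It requires a reachable PostgreSQL instance with the hadith schema applied; the sample texts are illustrative.

from uuid import UUID
from database.repository import HadithRepository

repo = HadithRepository()
collection = repo.get_collection_by_abbreviation("bukhari")
if collection:
    collection_id = UUID(collection["id"])
    # Books and hadiths are idempotent upserts keyed on their natural numbers
    book_id = repo.upsert_book(collection_id, book_number=1, name_english="Revelation")
    hadith_id = repo.upsert_hadith(
        collection_id=collection_id,
        book_id=book_id,
        hadith_number=1,
        arabic_text="إنما الأعمال بالنيات",
        english_text="Actions are but by intentions.",
    )
    repo.mark_embedding_generated(hadith_id)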

View File

@ -0,0 +1,353 @@
"""
Main ingestion script for fetching hadiths from Sunnah.com API
"""
import sys
import argparse
from typing import Optional
from uuid import UUID
import structlog
from config.settings import settings
from api_clients.sunnah_client import SunnahAPIClient
from database.repository import HadithRepository
from processors.text_cleaner import ArabicTextProcessor, TextCleaner
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
class HadithIngestionService:
"""Service for ingesting hadiths from Sunnah.com API"""
def __init__(self):
self.api_client = SunnahAPIClient()
self.repo = HadithRepository()
self.text_processor = ArabicTextProcessor()
self.text_cleaner = TextCleaner()
def ingest_collection(
self,
collection_abbr: str,
limit: Optional[int] = None
) -> dict:
"""
Ingest entire collection from Sunnah.com API
Args:
collection_abbr: Collection abbreviation (e.g., 'bukhari')
limit: Optional limit on number of hadiths to ingest
Returns:
Statistics dictionary
"""
logger.info(
"ingestion_started",
collection=collection_abbr,
limit=limit
)
# Get collection from database
collection = self.repo.get_collection_by_abbreviation(collection_abbr)
if not collection:
logger.error(
"collection_not_found",
collection=collection_abbr
)
raise ValueError(f"Collection '{collection_abbr}' not found in database")
collection_id = UUID(collection['id'])
# Create ingestion job
job_id = self.repo.create_ingestion_job(
job_name=f"ingest_{collection_abbr}",
job_type="api_fetch",
source_name="sunnah.com",
config={"collection": collection_abbr, "limit": limit}
)
logger.info(
"job_created",
job_id=str(job_id),
collection=collection_abbr
)
stats = {
"processed": 0,
"failed": 0,
"skipped": 0
}
try:
# Iterate through all hadiths in collection
for hadith_data, book_number in self.api_client.iter_all_hadiths_in_collection(
collection_name=collection_abbr,
batch_size=50
):
# Check limit
if limit and stats["processed"] >= limit:
logger.info("limit_reached", limit=limit)
break
try:
# Process and store hadith
self._process_and_store_hadith(
collection_id=collection_id,
hadith_data=hadith_data,
book_number=book_number
)
stats["processed"] += 1
# Update job progress every 100 hadiths
if stats["processed"] % 100 == 0:
self.repo.update_job_progress(
job_id=job_id,
processed=stats["processed"],
failed=stats["failed"],
skipped=stats["skipped"]
)
logger.info(
"progress_update",
processed=stats["processed"],
failed=stats["failed"]
)
except Exception as e:
stats["failed"] += 1
logger.error(
"hadith_processing_failed",
error=str(e),
hadith_number=hadith_data.get("hadithNumber")
)
self.repo.add_processing_log(
job_id=job_id,
level="ERROR",
message=f"Failed to process hadith: {str(e)}",
details={"hadith_data": hadith_data}
)
# Update final job progress
self.repo.update_job_progress(
job_id=job_id,
total=stats["processed"] + stats["failed"] + stats["skipped"],
processed=stats["processed"],
failed=stats["failed"],
skipped=stats["skipped"]
)
# Mark job as complete
self.repo.complete_job(job_id=job_id, status="success")
# Update collection count
self.repo.update_collection_count(
collection_id=collection_id,
count=stats["processed"]
)
logger.info(
"ingestion_completed",
collection=collection_abbr,
stats=stats
)
return stats
except Exception as e:
logger.error(
"ingestion_failed",
collection=collection_abbr,
error=str(e)
)
self.repo.complete_job(
job_id=job_id,
status="failed",
error_message=str(e)
)
raise
def _process_and_store_hadith(
self,
collection_id: UUID,
hadith_data: dict,
book_number: int
):
"""Process and store a single hadith"""
# Extract hadith number
hadith_number = hadith_data.get("hadithNumber")
if not hadith_number:
raise ValueError("Missing hadith number")
# Extract text in multiple languages
hadith_texts = hadith_data.get("hadith", [])
arabic_text = None
english_text = None
urdu_text = None
grade = None
grade_source = None
chapter_name = None
for text_entry in hadith_texts:
lang = text_entry.get("lang", "").lower()
body = text_entry.get("body")
if not body:
continue
# Clean text
body = self.text_cleaner.clean_text(body)
if lang == "ar":
arabic_text = body
chapter_name = text_entry.get("chapterTitle")
# Extract grade from Arabic entry
grades = text_entry.get("grades", [])
if grades:
grade = grades[0].get("grade")
grade_source = grades[0].get("name")
elif lang == "en":
english_text = body
# Extract grade from English entry if not found
if not grade:
grades = text_entry.get("grades", [])
if grades:
grade = grades[0].get("grade")
grade_source = grades[0].get("name")
elif lang == "ur":
urdu_text = body
if not arabic_text:
raise ValueError("Missing Arabic text")
# Get or create book
book = self.repo.get_book(collection_id, book_number)
if not book:
# Extract book name from hadith data
book_name_en = None
book_name_ar = None
for text_entry in hadith_texts:
lang = text_entry.get("lang", "").lower()
book_data = text_entry.get("book", [{}])[0] if text_entry.get("book") else {}
if lang == "en" and book_data.get("name"):
book_name_en = book_data.get("name")
elif lang == "ar" and book_data.get("name"):
book_name_ar = book_data.get("name")
book_id = self.repo.upsert_book(
collection_id=collection_id,
book_number=book_number,
name_english=book_name_en,
name_arabic=book_name_ar
)
else:
book_id = UUID(book["id"])
# Store hadith
hadith_id = self.repo.upsert_hadith(
collection_id=collection_id,
book_id=book_id,
hadith_number=int(hadith_number),
arabic_text=arabic_text,
english_text=english_text,
urdu_text=urdu_text,
grade=grade,
grade_source=grade_source,
chapter_name=chapter_name,
source_id=str(hadith_data.get("id", "")),
source_url=hadith_data.get("reference", {}).get("link"),
source_metadata=hadith_data
)
logger.debug(
"hadith_stored",
hadith_id=str(hadith_id),
hadith_number=hadith_number,
book_number=book_number
)
def close(self):
"""Close connections"""
self.api_client.close()
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description="Ingest hadiths from Sunnah.com API")
parser.add_argument(
"collection",
help="Collection abbreviation (e.g., bukhari, muslim)"
)
parser.add_argument(
"--limit",
type=int,
help="Limit number of hadiths to ingest"
)
args = parser.parse_args()
logger.info(
"script_started",
collection=args.collection,
limit=args.limit
)
try:
service = HadithIngestionService()
stats = service.ingest_collection(
collection_abbr=args.collection,
limit=args.limit
)
logger.info(
"script_completed",
stats=stats
)
print(f"\nIngestion completed successfully!")
print(f"Processed: {stats['processed']}")
print(f"Failed: {stats['failed']}")
print(f"Skipped: {stats['skipped']}")
service.close()
return 0
except Exception as e:
logger.error(
"script_failed",
error=str(e),
exc_info=True
)
print(f"\nIngestion failed: {str(e)}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())
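
A programmatic equivalent of `python src/main.py bukhari --limit 10`, assuming src/ and the repository root are on PYTHONPATH as in the Dockerfile.

from main import HadithIngestionService

service = HadithIngestionService()
try:
    stats = service.ingest_collection(collection_abbr="bukhari", limit=10)
    print(stats)   # {'processed': ..., 'failed': ..., 'skipped': ...}
finally:
    service.close()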

View File

@ -0,0 +1,516 @@
"""
Main ingestion script for fetching hadiths from HadithAPI.com
"""
import sys
import argparse
from typing import Optional, Dict, Any
from uuid import UUID
import structlog
from config.settings import settings
from api_clients.hadithapi_client import HadithAPIClient
from database.repository import HadithRepository
from processors.text_cleaner import ArabicTextProcessor, TextCleaner
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
# Book slug to collection abbreviation mapping
BOOK_SLUG_MAPPING = {
'sahih-bukhari': 'bukhari',
'sahih-muslim': 'muslim',
'sunan-abu-dawood': 'abudawud',
'jami-at-tirmidhi': 'tirmidhi',
'sunan-an-nasai': 'nasai',
'sunan-ibn-e-majah': 'ibnmajah',
'muwatta-imam-malik': 'malik',
'musnad-ahmad': 'ahmad',
'sunan-ad-darimi': 'darimi',
'mishkat-al-masabih': 'mishkat'
}
class HadithAPIIngestionService:
"""Service for ingesting hadiths from HadithAPI.com"""
def __init__(self):
self.api_client = HadithAPIClient()
self.repo = HadithRepository()
self.text_processor = ArabicTextProcessor()
self.text_cleaner = TextCleaner()
def sync_books_from_api(self) -> Dict[str, Any]:
"""
Sync book metadata from API to database
Returns:
Dictionary mapping book_slug -> collection_id
"""
logger.info("syncing_books_from_api")
# Get books from API
api_books = self.api_client.get_books()
book_mapping = {}
for api_book in api_books:
book_slug = api_book.get('bookSlug')
# Map to our collection abbreviation
collection_abbr = BOOK_SLUG_MAPPING.get(book_slug)
if not collection_abbr:
logger.warning(
"unmapped_book",
book_slug=book_slug,
book_name=api_book.get('bookName')
)
continue
# Get or verify collection exists in database
collection = self.repo.get_collection_by_abbreviation(collection_abbr)
if not collection:
logger.warning(
"collection_not_in_db",
abbreviation=collection_abbr,
book_slug=book_slug
)
continue
collection_id = UUID(collection['id'])
book_mapping[book_slug] = {
'collection_id': collection_id,
'book_id': api_book.get('id'),
'book_name': api_book.get('bookName'),
'hadiths_count': api_book.get('hadiths_count'),
'chapters_count': api_book.get('chapters_count')
}
logger.info(
"book_mapped",
book_slug=book_slug,
collection_abbr=collection_abbr,
hadiths_count=api_book.get('hadiths_count')
)
logger.info(
"books_synced",
total_books=len(book_mapping)
)
return book_mapping
def ingest_collection(
self,
book_slug: str,
limit: Optional[int] = None
) -> dict:
"""
Ingest entire collection from HadithAPI.com
Args:
book_slug: Book slug identifier (e.g., 'sahih-bukhari')
limit: Optional limit on number of hadiths to ingest
Returns:
Statistics dictionary
"""
logger.info(
"ingestion_started",
book_slug=book_slug,
limit=limit
)
# Get book mapping
book_mapping = self.sync_books_from_api()
if book_slug not in book_mapping:
logger.error(
"book_not_mapped",
book_slug=book_slug,
available_books=list(book_mapping.keys())
)
raise ValueError(f"Book '{book_slug}' not found or not mapped")
book_info = book_mapping[book_slug]
collection_id = book_info['collection_id']
book_id = book_info['book_id']
# Create ingestion job
job_id = self.repo.create_ingestion_job(
job_name=f"ingest_{book_slug}",
job_type="api_fetch",
source_name="hadithapi.com",
config={
"book_slug": book_slug,
"book_id": book_id,
"limit": limit
}
)
logger.info(
"job_created",
job_id=str(job_id),
book_slug=book_slug,
expected_count=book_info.get('hadiths_count')
)
stats = {
"processed": 0,
"failed": 0,
"skipped": 0
}
try:
# Iterate through all hadiths in book
for hadith_data, chapter_data in self.api_client.iter_all_hadiths_in_book_with_chapters(
book_id=book_id,
book_slug=book_slug,
batch_size=100
):
# Check limit
if limit and stats["processed"] >= limit:
logger.info("limit_reached", limit=limit)
break
try:
# Process and store hadith
self._process_and_store_hadith(
collection_id=collection_id,
hadith_data=hadith_data,
chapter_data=chapter_data
)
stats["processed"] += 1
# Update job progress every 100 hadiths
if stats["processed"] % 100 == 0:
self.repo.update_job_progress(
job_id=job_id,
processed=stats["processed"],
failed=stats["failed"],
skipped=stats["skipped"]
)
logger.info(
"progress_update",
book_slug=book_slug,
processed=stats["processed"],
failed=stats["failed"],
percentage=round(
(stats["processed"] / int(book_info.get('hadiths_count', 1))) * 100,
2
) if book_info.get('hadiths_count') else 0
)
except Exception as e:
stats["failed"] += 1
logger.error(
"hadith_processing_failed",
error=str(e),
hadith_number=hadith_data.get("hadithNumber"),
hadith_id=hadith_data.get("id")
)
self.repo.add_processing_log(
job_id=job_id,
level="ERROR",
message=f"Failed to process hadith: {str(e)}",
details={"hadith_data": hadith_data}
)
# Update final job progress
self.repo.update_job_progress(
job_id=job_id,
total=stats["processed"] + stats["failed"] + stats["skipped"],
processed=stats["processed"],
failed=stats["failed"],
skipped=stats["skipped"]
)
# Mark job as complete
self.repo.complete_job(job_id=job_id, status="success")
# Update collection count
self.repo.update_collection_count(
collection_id=collection_id,
count=stats["processed"]
)
logger.info(
"ingestion_completed",
book_slug=book_slug,
stats=stats
)
return stats
except Exception as e:
logger.error(
"ingestion_failed",
book_slug=book_slug,
error=str(e),
exc_info=True
)
self.repo.complete_job(
job_id=job_id,
status="failed",
error_message=str(e)
)
raise
def _process_and_store_hadith(
self,
collection_id: UUID,
hadith_data: dict,
chapter_data: Optional[dict]
):
"""Process and store a single hadith"""
# Extract hadith number
hadith_number = hadith_data.get("hadithNumber")
if not hadith_number:
raise ValueError("Missing hadith number")
# Convert to integer
try:
hadith_number = int(hadith_number)
except (ValueError, TypeError):
raise ValueError(f"Invalid hadith number: {hadith_number}")
# Extract text in multiple languages
arabic_text = hadith_data.get("hadithArabic")
english_text = hadith_data.get("hadithEnglish")
urdu_text = hadith_data.get("hadithUrdu")
if not arabic_text:
raise ValueError("Missing Arabic text")
# Clean texts
arabic_text = self.text_cleaner.clean_text(arabic_text)
if english_text:
english_text = self.text_cleaner.clean_text(english_text)
if urdu_text:
urdu_text = self.text_cleaner.clean_text(urdu_text)
# Extract grade/status
grade = hadith_data.get("status")
# Get or create chapter (book in our schema)
book_id = None
chapter_name = None
if chapter_data:
chapter_id = chapter_data.get('id')
chapter_number = chapter_data.get('chapterNumber')
chapter_name_en = chapter_data.get('chapterEnglish')
chapter_name_ar = chapter_data.get('chapterArabic')
chapter_name = chapter_name_en
if chapter_number:
try:
chapter_number = int(chapter_number)
except (ValueError, TypeError):
chapter_number = chapter_id # Fallback to ID
# Get or create book (chapter in HadithAPI = book in our schema)
existing_book = self.repo.get_book(collection_id, chapter_number)
if not existing_book:
book_id = self.repo.upsert_book(
collection_id=collection_id,
book_number=chapter_number,
name_english=chapter_name_en,
name_arabic=chapter_name_ar,
metadata=chapter_data
)
else:
book_id = UUID(existing_book['id'])
# Build source metadata
source_metadata = {
'api_id': hadith_data.get('id'),
'englishNarrator': hadith_data.get('englishNarrator'),
'urduNarrator': hadith_data.get('urduNarrator'),
'bookSlug': hadith_data.get('bookSlug'),
'chapterId': hadith_data.get('chapterId'),
'chapter': chapter_data
}
# Store hadith
hadith_id = self.repo.upsert_hadith(
collection_id=collection_id,
book_id=book_id,
hadith_number=hadith_number,
arabic_text=arabic_text,
english_text=english_text,
urdu_text=urdu_text,
grade=grade,
grade_source="hadithapi.com",
chapter_name=chapter_name,
source_id=str(hadith_data.get('id', '')),
source_url=f"https://hadithapi.com/hadith/{hadith_data.get('id')}",
source_metadata=source_metadata
)
logger.debug(
"hadith_stored",
hadith_id=str(hadith_id),
hadith_number=hadith_number,
chapter_id=chapter_data.get('id') if chapter_data else None
)
def ingest_all_books(self, limit_per_book: Optional[int] = None) -> Dict[str, dict]:
"""
Ingest all available books
Args:
limit_per_book: Optional limit per book
Returns:
Dictionary of book_slug -> stats
"""
logger.info("ingesting_all_books", limit_per_book=limit_per_book)
book_mapping = self.sync_books_from_api()
results = {}
for book_slug in book_mapping.keys():
logger.info("starting_book", book_slug=book_slug)
try:
stats = self.ingest_collection(
book_slug=book_slug,
limit=limit_per_book
)
results[book_slug] = {"status": "success", "stats": stats}
except Exception as e:
logger.error(
"book_ingestion_failed",
book_slug=book_slug,
error=str(e)
)
results[book_slug] = {"status": "failed", "error": str(e)}
logger.info("all_books_completed", results=results)
return results
def close(self):
"""Close connections"""
self.api_client.close()
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description="Ingest hadiths from HadithAPI.com"
)
parser.add_argument(
"--book-slug",
help="Book slug (e.g., sahih-bukhari). If not provided, ingests all books."
)
parser.add_argument(
"--limit",
type=int,
help="Limit number of hadiths to ingest per book"
)
parser.add_argument(
"--list-books",
action="store_true",
help="List available books and exit"
)
args = parser.parse_args()
try:
service = HadithAPIIngestionService()
# List books mode
if args.list_books:
logger.info("listing_available_books")
book_mapping = service.sync_books_from_api()
print("\n=== Available Books ===\n")
for book_slug, info in book_mapping.items():
print(f"Book Slug: {book_slug}")
print(f" Name: {info['book_name']}")
print(f" Hadiths: {info['hadiths_count']}")
print(f" Chapters: {info['chapters_count']}")
print()
service.close()
return 0
# Ingest mode
if args.book_slug:
logger.info(
"script_started",
book_slug=args.book_slug,
limit=args.limit
)
stats = service.ingest_collection(
book_slug=args.book_slug,
limit=args.limit
)
logger.info("script_completed", stats=stats)
print(f"\n=== Ingestion completed for {args.book_slug} ===")
print(f"Processed: {stats['processed']}")
print(f"Failed: {stats['failed']}")
print(f"Skipped: {stats['skipped']}")
else:
# Ingest all books
logger.info("script_started_all_books", limit_per_book=args.limit)
results = service.ingest_all_books(limit_per_book=args.limit)
print("\n=== All Books Ingestion Summary ===\n")
for book_slug, result in results.items():
print(f"{book_slug}: {result['status']}")
if result['status'] == 'success':
stats = result['stats']
print(f" Processed: {stats['processed']}")
print(f" Failed: {stats['failed']}")
else:
print(f" Error: {result['error']}")
print()
service.close()
return 0
except Exception as e:
logger.error(
"script_failed",
error=str(e),
exc_info=True
)
print(f"\nIngestion failed: {str(e)}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())
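
A programmatic equivalent of `python src/main_hadithapi.py --list-books`, under the same PYTHONPATH assumptions as above.

from main_hadithapi import HadithAPIIngestionService

service = HadithAPIIngestionService()
try:
    # Maps HadithAPI book slugs onto the collections already present in the database
    for slug, info in service.sync_books_from_api().items():
        print(slug, info["book_name"], info["hadiths_count"])
finally:
    service.close()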

View File

@ -0,0 +1,173 @@
"""
Text cleaning and normalization utilities
"""
import re
from typing import Optional
import unicodedata
import structlog
logger = structlog.get_logger()
class ArabicTextProcessor:
"""Process and normalize Arabic text"""
# Arabic diacritics to remove
DIACRITICS = re.compile(
r'[\u064B-\u065F\u0670\u06D6-\u06DC\u06DF-\u06E8\u06EA-\u06ED]'
)
# Tatweel (elongation character)
TATWEEL = '\u0640'
# Normalize Arabic letters
ALEF_VARIANTS = re.compile(r'[إأآا]')
ALEF_MAKSURA = 'ى'
YAA = 'ي'
TAA_MARBUTA = 'ة'
HAA = 'ه'
@classmethod
def remove_diacritics(cls, text: str) -> str:
"""Remove Arabic diacritics (tashkeel)"""
if not text:
return text
return cls.DIACRITICS.sub('', text)
@classmethod
def remove_tatweel(cls, text: str) -> str:
"""Remove tatweel (elongation) character"""
if not text:
return text
return text.replace(cls.TATWEEL, '')
@classmethod
def normalize_alef(cls, text: str) -> str:
"""Normalize all Alef variants to bare Alef"""
if not text:
return text
return cls.ALEF_VARIANTS.sub('ا', text)
@classmethod
def normalize_yaa(cls, text: str) -> str:
"""Normalize Alef Maksura to Yaa"""
if not text:
return text
return text.replace(cls.ALEF_MAKSURA, cls.YAA)
@classmethod
def normalize_taa_marbuta(cls, text: str) -> str:
"""Normalize Taa Marbuta to Haa"""
if not text:
return text
return text.replace(cls.TAA_MARBUTA, cls.HAA)
@classmethod
def normalize_whitespace(cls, text: str) -> str:
"""Normalize whitespace"""
if not text:
return text
# Replace multiple spaces with single space
text = re.sub(r'\s+', ' ', text)
# Trim
return text.strip()
@classmethod
def normalize_full(cls, text: str) -> str:
"""
Apply full normalization:
- Remove diacritics
- Remove tatweel
- Normalize Alef variants
- Normalize Yaa
- Normalize Taa Marbuta
- Normalize whitespace
"""
if not text:
return text
text = cls.remove_diacritics(text)
text = cls.remove_tatweel(text)
text = cls.normalize_alef(text)
text = cls.normalize_yaa(text)
text = cls.normalize_taa_marbuta(text)
text = cls.normalize_whitespace(text)
return text
@classmethod
def extract_sanad_matn(cls, text: str) -> tuple[Optional[str], Optional[str]]:
"""
Attempt to extract sanad (chain) and matn (text) from hadith
Common patterns:
- حدثنا ... قال ... (sanad ends before reported speech)
- Simple heuristic: Split on first occurrence of قال or أن
Returns:
Tuple of (sanad, matn) or (None, None) if cannot split
"""
if not text:
return None, None
# Look for common sanad-matn separators
separators = [
r'قال\s*رسول\s*الله', # "The Messenger of Allah said"
r'قال\s*النبي', # "The Prophet said"
r'عن\s*النبي', # "From the Prophet"
r'أن\s*رسول\s*الله', # "That the Messenger of Allah"
]
for pattern in separators:
match = re.search(pattern, text, re.IGNORECASE)
if match:
split_pos = match.start()
sanad = text[:split_pos].strip()
matn = text[split_pos:].strip()
logger.debug(
"sanad_matn_extracted",
sanad_length=len(sanad),
matn_length=len(matn)
)
return sanad, matn
# Could not split
logger.debug("sanad_matn_extraction_failed")
return None, None
class TextCleaner:
"""General text cleaning utilities"""
@staticmethod
def clean_html(text: str) -> str:
"""Remove HTML tags"""
if not text:
return text
return re.sub(r'<[^>]+>', '', text)
@staticmethod
def normalize_unicode(text: str) -> str:
"""Normalize Unicode (NFC normalization)"""
if not text:
return text
return unicodedata.normalize('NFC', text)
@staticmethod
def clean_text(text: str) -> str:
"""Apply general cleaning"""
if not text:
return text
# Remove HTML
text = TextCleaner.clean_html(text)
# Normalize Unicode
text = TextCleaner.normalize_unicode(text)
# Normalize whitespace
text = ArabicTextProcessor.normalize_whitespace(text)
return text
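
A usage sketch of the two processors on a short, illustrative hadith fragment (the HTML wrapper and diacritics are added here for demonstration).

from processors.text_cleaner import ArabicTextProcessor, TextCleaner

raw = "<p>حَدَّثَنَا الْحُمَيْدِيُّ قَالَ رَسُولُ اللَّهِ إِنَّمَا الأَعْمَالُ بِالنِّيَّاتِ</p>"

clean = TextCleaner.clean_text(raw)                      # strip HTML, NFC, collapse whitespace
normalized = ArabicTextProcessor.normalize_full(clean)   # drop diacritics, unify alef/yaa/taa marbuta
sanad, matn = ArabicTextProcessor.extract_sanad_matn(normalized)

print(normalized)
print("sanad:", sanad)   # chain of narration, up to "قال رسول الله"
print("matn:", matn)     # reported text, from "قال رسول الله" onward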

View File

@ -0,0 +1,44 @@
#!/bin/bash
# test-hadithapi-k8s.sh
set -e
echo "=== Kubernetes HadithAPI Integration Test ==="
# 1. Create secrets
echo "Creating secrets..."
./create-secrets.sh
# 2. Build and load image (if using local cluster)
echo "Building Docker image..."
docker build -t hadith-ingestion:latest .
# If using kind/minikube, load image
# kind load docker-image hadith-ingestion:latest
# 3. Submit test workflow (10 hadiths)
echo "Submitting test workflow..."
argo submit -n argo argo/workflows/ingest-hadithapi.yaml \
--parameter book-slug=sahih-bukhari \
--parameter limit=10 \
--wait \
--log
# 4. Check workflow status
echo -e "\nChecking workflow status..."
argo list -n argo
# 5. Verify data in database
echo -e "\nVerifying data..."
kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
SELECT
c.name_english,
COUNT(h.id) as hadith_count,
MAX(h.created_at) as last_ingestion
FROM collections c
LEFT JOIN hadiths h ON c.id = h.collection_id
WHERE c.abbreviation = 'bukhari'
GROUP BY c.name_english;
"
echo -e "\n=== Test Complete ==="

View File

@ -0,0 +1,39 @@
#!/bin/bash
# test-hadithapi-local.sh
set -e
echo "=== HadithAPI.com Integration Test ==="
# 1. Test API connection
echo "Testing API connection..."
curl -s "https://hadithapi.com/api/books?apiKey=\$2y\$10\$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK" | jq .
# 2. Test database connection
echo -e "\nTesting database connection..."
kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "SELECT COUNT(*) FROM collections;"
# 3. List available books
echo -e "\nListing available books..."
python src/main_hadithapi.py --list-books
# 4. Test ingestion (limited to 10 hadiths)
echo -e "\nRunning test ingestion (10 hadiths from Sahih Bukhari)..."
python src/main_hadithapi.py --book-slug sahih-bukhari --limit 10
# 5. Verify data
echo -e "\nVerifying ingested data..."
kubectl -n db exec -it postgres-0 -- psql -U hadith_ingest -d hadith_db -c "
SELECT
c.name_english,
c.abbreviation,
COUNT(h.id) as hadith_count,
COUNT(DISTINCT b.id) as book_count
FROM collections c
LEFT JOIN hadiths h ON c.id = h.collection_id
LEFT JOIN books b ON h.book_id = b.id
WHERE c.abbreviation = 'bukhari'
GROUP BY c.name_english, c.abbreviation;
"
echo -e "\n=== Test Complete ==="
