fix ingestion test errors

salahangal 2025-11-17 13:49:40 +01:00
parent 53cd6e2415
commit 737ac54c1b
23 changed files with 194 additions and 62 deletions

View File

@@ -1,5 +1,6 @@
 # Database
-DATABASE_HOST=postgres.db.svc.cluster.local
+# DATABASE_HOST=postgres.db.svc.cluster.local
+DATABASE_HOST=pg.betelgeusebytes.io
 DATABASE_PORT=5432
 DATABASE_NAME=hadith_db
 DATABASE_USER=hadith_ingest

View File

@@ -10,7 +10,8 @@ class Settings(BaseSettings):
     """Application settings loaded from environment variables"""

     # Database
-    DATABASE_HOST: str = "postgres.db.svc.cluster.local"
+    # DATABASE_HOST: str = "postgres.db.svc.cluster.local"
+    DATABASE_HOST: str = "pg.betelgeusebytes.io"
     DATABASE_PORT: int = 5432
     DATABASE_NAME: str = "hadith_db"
     DATABASE_USER: str = "hadith_ingest"
@@ -29,7 +30,7 @@ class Settings(BaseSettings):
             f"postgresql+asyncpg://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}"
             f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}"
         )

     # MinIO / S3
     MINIO_ENDPOINT: str = "minio.storage.svc.cluster.local:9000"
     MINIO_ACCESS_KEY: str = "minioadmin"
@@ -37,9 +38,6 @@
     MINIO_BUCKET_RAW: str = "hadith-raw-data"
     MINIO_BUCKET_PROCESSED: str = "hadith-processed"
     MINIO_SECURE: bool = False
-
-    # APIs
-    SUNNAH_API_KEY: Optional[str] = None
     SUNNAH_BASE_URL: str = "https://api.sunnah.com/v1"
     HADITH_ONE_API_KEY: Optional[str] = "$2y$10$nTJnyX3WUDoGmjKrKqSmbecANVsQWKyffmtp9fxmsQwR15DEv4mK"
     # HadithAPI.com
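Note: the class attributes above are only fallbacks. With pydantic's BaseSettings, values from the environment (including the .env entry changed in the first file) override the in-code defaults at load time, which is why both the .env file and the default were switched to pg.betelgeusebytes.io. A minimal sketch, assuming pydantic-settings v2 and an .env file in the working directory; names outside this diff are illustrative:

from pydantic_settings import BaseSettings, SettingsConfigDict

class DbSettings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")  # .env and env vars override the defaults below

    DATABASE_HOST: str = "pg.betelgeusebytes.io"  # fallback when neither env var nor .env sets it
    DATABASE_PORT: int = 5432
    DATABASE_NAME: str = "hadith_db"
    DATABASE_USER: str = "hadith_ingest"
    DATABASE_PASSWORD: str = ""

    @property
    def DATABASE_URL(self) -> str:
        return (
            f"postgresql+asyncpg://{self.DATABASE_USER}:{self.DATABASE_PASSWORD}"
            f"@{self.DATABASE_HOST}:{self.DATABASE_PORT}/{self.DATABASE_NAME}"
        )

# An exported DATABASE_HOST (or the .env entry) takes precedence over the class default.
print(DbSettings().DATABASE_URL)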

View File

@@ -2,6 +2,7 @@
 Database repository for hadith data operations
 """
 from typing import List, Dict, Any, Optional
+import json
 from uuid import UUID
 import structlog
 from sqlalchemy import create_engine, text, select, insert, update
@@ -14,31 +15,37 @@ logger = structlog.get_logger()
 class HadithRepository:
     """Repository for hadith database operations"""

     def __init__(self, database_url: Optional[str] = None):
         self.database_url = database_url or settings.DATABASE_URL
         self.engine = create_engine(self.database_url, pool_pre_ping=True)
         self.SessionLocal = sessionmaker(bind=self.engine)
+
+    @staticmethod
+    def _coerce_uuid(value: Any) -> UUID:
+        if isinstance(value, UUID):
+            return value
+        return UUID(str(value))

     def get_session(self) -> Session:
         """Get database session"""
         return self.SessionLocal()

     # ===== Collections =====
     def get_collection_by_abbreviation(self, abbr: str) -> Optional[Dict[str, Any]]:
         """Get collection by abbreviation"""
         with self.get_session() as session:
             query = text("""
                 SELECT * FROM collections
                 WHERE abbreviation = :abbr
             """)
             result = session.execute(query, {"abbr": abbr}).fetchone()
             if result:
                 return dict(result._mapping)
             return None

     def get_all_collections(self) -> List[Dict[str, Any]]:
         """Get all collections"""
         with self.get_session() as session:
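Note on _coerce_uuid: with SQLAlchemy and psycopg2, a uuid primary key returned by RETURNING id typically arrives as a uuid.UUID object rather than a string, and uuid.UUID() only accepts string/bytes/int input, so the previous UUID(result.fetchone()[0]) pattern could raise. The helper accepts either representation. A small illustrative check (values are made up):

from uuid import UUID

def coerce_uuid(value) -> UUID:
    """Mirror of the helper above: accept uuid.UUID or anything str()-able."""
    if isinstance(value, UUID):
        return value
    return UUID(str(value))

row_id = UUID("2b0c0b5e-8f4d-4f5e-9f5a-1f2e3d4c5b6a")  # as returned by a driver with native uuid support
assert coerce_uuid(row_id) == coerce_uuid(str(row_id)) == row_id
# UUID(row_id) itself would fail, since UUID() does not accept UUID instances.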
@@ -68,34 +75,35 @@ class HadithRepository:
         metadata: Optional[Dict] = None
     ) -> UUID:
         """Insert or update a book"""
+        metadata_json = json.dumps(metadata or {})
         with self.get_session() as session:
             query = text("""
                 INSERT INTO books (collection_id, book_number, name_english, name_arabic, metadata)
                 VALUES (:collection_id, :book_number, :name_english, :name_arabic, :metadata)
                 ON CONFLICT (collection_id, book_number)
                 DO UPDATE SET
                     name_english = EXCLUDED.name_english,
                     name_arabic = EXCLUDED.name_arabic,
                     metadata = EXCLUDED.metadata
                 RETURNING id
             """)
             result = session.execute(query, {
                 "collection_id": str(collection_id),
                 "book_number": book_number,
                 "name_english": name_english,
                 "name_arabic": name_arabic,
-                "metadata": metadata or {}
+                "metadata": metadata_json
             })
             session.commit()
-            return UUID(result.fetchone()[0])
+            return self._coerce_uuid(result.fetchone()[0])

     def get_book(self, collection_id: UUID, book_number: int) -> Optional[Dict[str, Any]]:
         """Get book by collection and book number"""
         with self.get_session() as session:
             query = text("""
                 SELECT * FROM books
                 WHERE collection_id = :collection_id AND book_number = :book_number
             """)
             result = session.execute(query, {
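The json.dumps calls added throughout this file address parameter binding for the jsonb columns (metadata, source_metadata, config, details): a raw text() statement carries no column type information, so a plain Python dict is handed to the DBAPI as-is, and psycopg2 has no default adapter for dict (it typically fails with "can't adapt type 'dict'"). Serializing to a JSON string lets PostgreSQL cast the parameter to jsonb. A hedged sketch of that approach and of an alternative that declares the bind type instead (the alternative is illustrative, not part of this commit):

import json
from sqlalchemy import text, bindparam
from sqlalchemy.dialects.postgresql import JSONB

metadata = {"source": "hadithapi", "chapters": 7}

# Approach used in this commit: serialize up front and let PostgreSQL cast the
# string parameter to jsonb on insert/update.
stmt_a = text("UPDATE books SET metadata = :metadata WHERE book_number = :n")
params_a = {"metadata": json.dumps(metadata), "n": 1}

# Alternative (illustrative): attach a JSONB bind type so SQLAlchemy serializes
# the dict itself; calling code can then keep passing plain dicts.
stmt_b = text("UPDATE books SET metadata = :metadata WHERE book_number = :n").bindparams(
    bindparam("metadata", type_=JSONB)
)
params_b = {"metadata": metadata, "n": 1}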
@@ -125,6 +133,7 @@ class HadithRepository:
         source_metadata: Optional[Dict] = None
     ) -> UUID:
         """Insert or update a hadith"""
         with self.get_session() as session:
             query = text("""
                 INSERT INTO hadiths (
@@ -152,7 +161,8 @@
                     updated_at = NOW()
                 RETURNING id
             """)
+            metadata_json = json.dumps(source_metadata or {})
             result = session.execute(query, {
                 "collection_id": str(collection_id),
                 "book_id": str(book_id) if book_id else None,
@@ -165,12 +175,12 @@
                 "chapter_name": chapter_name,
                 "source_id": source_id,
                 "source_url": source_url,
-                "source_metadata": source_metadata or {}
+                "source_metadata": metadata_json
             })
             session.commit()
-            return UUID(result.fetchone()[0])
+            return self._coerce_uuid(result.fetchone()[0])

     def get_hadiths_without_embeddings(
         self,
         limit: int = 100,
@@ -180,8 +190,8 @@
         with self.get_session() as session:
             if collection_id:
                 query = text("""
                     SELECT * FROM hadiths
                     WHERE embedding_generated = FALSE
                     AND collection_id = :collection_id
                     ORDER BY created_at ASC
                     LIMIT :limit
@@ -200,22 +210,26 @@
             result = session.execute(query, {"limit": limit}).fetchall()
             return [dict(row._mapping) for row in result]

     def mark_embedding_generated(self, hadith_id: UUID, version: str = "v1"):
         """Mark hadith as having embedding generated"""
         with self.get_session() as session:
+            # Prepare the update query
             query = text("""
                 UPDATE hadiths
                 SET embedding_generated = TRUE,
                     embedding_version = :version,
                     updated_at = NOW()
                 WHERE id = :id
             """)
-            session.execute(query, {"id": str(hadith_id), "version": version})
+            # Build the parameters once; they are plain strings, so no JSON
+            # serialization is needed here.
+            params = {"id": str(hadith_id), "version": version}
+            session.execute(query, params)
             session.commit()

     # ===== Ingestion Jobs =====
     def create_ingestion_job(
         self,
         job_name: str,
@@ -230,15 +244,16 @@
                 VALUES (:job_name, :job_type, :source_name, :config, 'running', NOW())
                 RETURNING id
             """)
+            # serialize config as JSON for storage
             result = session.execute(query, {
                 "job_name": job_name,
                 "job_type": job_type,
                 "source_name": source_name,
-                "config": config or {}
+                "config": json.dumps(config or {})
             })
             session.commit()
-            return UUID(result.fetchone()[0])
+            job_id = result.fetchone()[0]
+            return job_id if isinstance(job_id, UUID) else UUID(str(job_id))

     def update_job_progress(
         self,
@@ -297,7 +312,7 @@
                 "error_message": error_message
             })
             session.commit()

     def add_processing_log(
         self,
         job_id: UUID,
@@ -311,16 +326,17 @@
                 INSERT INTO processing_logs (job_id, log_level, message, details)
                 VALUES (:job_id, :level, :message, :details)
             """)
+            details_json = json.dumps(details or {})
             session.execute(query, {
                 "job_id": str(job_id),
                 "level": level,
                 "message": message,
-                "details": details or {}
+                "details": details_json
             })
             session.commit()

     # ===== Statistics =====
     def get_collection_stats(self, collection_id: UUID) -> Dict[str, Any]:
         """Get statistics for a collection"""
         with self.get_session() as session:
@@ -328,7 +344,7 @@
                 SELECT * FROM get_collection_statistics(:collection_id)
             """)
             result = session.execute(query, {"collection_id": str(collection_id)}).fetchone()
             if result:
                 return dict(result._mapping)
             return {}

View File

@@ -7,9 +7,9 @@ from typing import Optional, Dict, Any
 from uuid import UUID
 import structlog
 from config.settings import settings
-from api_clients.hadithapi_client import HadithAPIClient
-from database.repository import HadithRepository
-from processors.text_cleaner import ArabicTextProcessor, TextCleaner
+from src.api_clients.hadithapi_client import HadithAPIClient
+from src.database.repository import HadithRepository
+from src.processors.text_cleaner import ArabicTextProcessor, TextCleaner

 # Configure structured logging
 structlog.configure(
@@ -35,14 +35,15 @@ logger = structlog.get_logger()
 BOOK_SLUG_MAPPING = {
     'sahih-bukhari': 'bukhari',
     'sahih-muslim': 'muslim',
-    'sunan-abu-dawood': 'abudawud',
-    'jami-at-tirmidhi': 'tirmidhi',
-    'sunan-an-nasai': 'nasai',
-    'sunan-ibn-e-majah': 'ibnmajah',
+    'abu-dawood': 'abudawud',
+    'al-tirmidhi': 'tirmidhi',
+    'sunan-nasai': 'nasai',
+    'ibn-e-majah': 'ibnmajah',
     'muwatta-imam-malik': 'malik',
     'musnad-ahmad': 'ahmad',
     'sunan-ad-darimi': 'darimi',
-    'mishkat-al-masabih': 'mishkat'
+    'mishkat': 'mishkat',
+    'al-silsila-sahiha': 'al-silsila-sahiha'
 }
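The mapping keys above were apparently renamed to match the bookSlug values HadithAPI.com actually returns; any slug missing from BOOK_SLUG_MAPPING is skipped with a warning in the sync loop below. A hypothetical helper (not part of the commit) that surfaces unmapped slugs before a run:

# Excerpt of the mapping, just for this example.
BOOK_SLUG_MAPPING = {'sahih-bukhari': 'bukhari', 'abu-dawood': 'abudawud'}

def find_unmapped_slugs(api_books):
    """Return bookSlug values from the API payload that have no collection mapping."""
    return {b.get('bookSlug') for b in api_books} - set(BOOK_SLUG_MAPPING)

# With the excerpt above, 'al-tirmidhi' would be reported as unmapped:
print(find_unmapped_slugs([{'bookSlug': 'sahih-bukhari'}, {'bookSlug': 'al-tirmidhi'}]))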
@@ -66,12 +67,12 @@ class HadithAPIIngestionService:
         # Get books from API
         api_books = self.api_client.get_books()
         book_mapping = {}
+        # print(BOOK_SLUG_MAPPING)

         for api_book in api_books:
             book_slug = api_book.get('bookSlug')
+            # print(book_slug)

             # Map to our collection abbreviation
             collection_abbr = BOOK_SLUG_MAPPING.get(book_slug)
@@ -82,7 +83,6 @@ class HadithAPIIngestionService:
                     book_name=api_book.get('bookName')
                 )
                 continue

             # Get or verify collection exists in database
             collection = self.repo.get_collection_by_abbreviation(collection_abbr)
@@ -93,8 +93,10 @@
                     book_slug=book_slug
                 )
                 continue

-            collection_id = UUID(collection['id'])
+            collection_id = collection['id']
+            if not isinstance(collection_id, UUID):
+                collection_id = UUID(str(collection_id))

             book_mapping[book_slug] = {
                 'collection_id': collection_id,
                 'book_id': api_book.get('id'),
@@ -305,7 +307,7 @@
             if not arabic_text:
                 raise ValueError("Missing Arabic text")

-            # passed
+            logger.warning("Arabic text extracted and validated", hadith_number=hadith_number)

             # Clean texts
             arabic_text = self.text_cleaner.clean_text(arabic_text)
             if english_text:
@@ -319,14 +321,15 @@
             # Get or create chapter (book in our schema)
             book_id = None
             chapter_name = None
-            # logger.warning("Processing chapter data####", chapter_data.get('positional_args', {}).get('id'))
+            # logger.info("Processing chapter data####2####", chapter_data.get('id'))
             if chapter_data:
                 chapter_id = chapter_data.get('id')
                 chapter_number = chapter_data.get('chapterNumber')
                 chapter_name_en = chapter_data.get('chapterEnglish')
                 chapter_name_ar = chapter_data.get('chapterArabic')
                 chapter_name = chapter_name_en
+                # print(chapter_number, chapter_name)
                 if chapter_number:
                     try:
                         chapter_number = int(chapter_number)
@@ -335,7 +338,8 @@
                 # Get or create book (chapter in HadithAPI = book in our schema)
                 existing_book = self.repo.get_book(collection_id, chapter_number)
-                # logger.warning("EXISTING BOOK : ", existing_book)
+                # logger.warning("Fetched or created book", collection_id=collection_id, chapter_number=chapter_number, chapter_name=chapter_name)
                 if not existing_book:
                     book_id = self.repo.upsert_book(
                         collection_id=collection_id,
@@ -345,8 +349,12 @@
                         metadata=chapter_data
                     )
                 else:
-                    book_id = UUID(existing_book['id'])
+                    # print(existing_book['id'])
+                    existing_id = existing_book['id']
+                    if isinstance(existing_id, str):
+                        existing_id = UUID(existing_id)
+                    book_id = existing_id

             # Build source metadata
             source_metadata = {
                 'api_id': hadith_data.get('id'),
@@ -356,7 +364,7 @@
                 'chapterId': hadith_data.get('chapterId'),
                 'chapter': chapter_data
             }
+            # logger.warning("Hadith metadata built", source_metadata)

             # Store hadith
             hadith_id = self.repo.upsert_hadith(
                 collection_id=collection_id,

View File

@@ -0,0 +1,109 @@
+#!/usr/bin/env python3
+"""
+Test script for main_hadithapi.py
+"""
+import sys
+import os
+sys.path.insert(0, '.')
+
+from src.main_hadithapi import HadithAPIIngestionService
+
+
+def test_main_hadithapi():
+    """Test the main HadithAPI ingestion service"""
+    print("=== Testing HadithAPI Ingestion Service ===\n")
+
+    try:
+        # Initialize the service
+        print("1. Initializing HadithAPIIngestionService...")
+        service = HadithAPIIngestionService()
+        print("✓ Service initialized successfully\n")
+
+        # Test 1: List available books
+        print("2. Testing book synchronization...")
+        book_mapping = service.sync_books_from_api()
+        print(f"✓ Found {len(book_mapping)} mapped books")
+        for book_slug, info in list(book_mapping.items())[:3]:  # Show first 3
+            print(f" - {book_slug}: {info['book_name']} ({info['hadiths_count']} hadiths)")
+        print()
+
+        # Test 2: Test ingestion with limit
+        print("3. Testing limited ingestion (10 hadiths from Sahih Bukhari)...")
+        stats = service.ingest_collection(
+            book_slug='sahih-bukhari',
+            limit=10
+        )
+        print(f"✓ Ingestion completed with stats:")
+        print(f" Processed: {stats['processed']}")
+        print(f" Failed: {stats['failed']}")
+        print(f" Skipped: {stats['skipped']}\n")
+
+        # Test 3: List books functionality
+        print("4. Testing book listing...")
+        print("\n=== Available Books ===\n")
+        for book_slug, info in book_mapping.items():
+            print(f"Book Slug: {book_slug}")
+            print(f" Name: {info['book_name']}")
+            print(f" Hadiths: {info['hadiths_count']}")
+            print(f" Chapters: {info['chapters_count']}")
+            print()
+
+        # Clean up
+        service.close()
+        print("=== All Tests Passed! ===")
+        return True
+
+    except Exception as e:
+        print(f"✗ Test failed with error: {e}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+
+def test_command_line_args():
+    """Test command line argument parsing"""
+    print("=== Testing Command Line Arguments ===\n")
+
+    # We'll simulate command line arguments
+    import argparse
+    from src.main_hadithapi import main
+
+    # Test --list-books argument
+    print("1. Testing --list-books argument...")
+    original_argv = sys.argv.copy()
+    try:
+        sys.argv = ['main_hadithapi.py', '--list-books']
+        # We won't actually run main() as it would exit, but we can check the parsing
+        parser = argparse.ArgumentParser(description="Ingest hadiths from HadithAPI.com")
+        parser.add_argument("--book-slug", help="Book slug (e.g., sahih-bukhari)")
+        parser.add_argument("--limit", type=int, help="Limit number of hadiths to ingest")
+        parser.add_argument("--list-books", action="store_true", help="List available books and exit")
+
+        args = parser.parse_args(['--list-books'])
+        print(f"✓ Argument parsing successful: list_books={args.list_books}")
+
+        # Test book-slug argument
+        args = parser.parse_args(['--book-slug', 'sahih-bukhari', '--limit', '5'])
+        print(f"✓ Argument parsing successful: book_slug={args.book_slug}, limit={args.limit}")
+
+        print("✓ Command line argument parsing works correctly\n")
+        return True
+    except Exception as e:
+        print(f"✗ Argument parsing failed: {e}")
+        return False
+    finally:
+        sys.argv = original_argv
+
+
+if __name__ == "__main__":
+    print("Starting tests for main_hadithapi.py...\n")
+
+    # Test command line arguments
+    if not test_command_line_args():
+        sys.exit(1)
+
+    # Test main functionality
+    if not test_main_hadithapi():
+        sys.exit(1)
+
+    print("\n🎉 All tests passed successfully!")
+    sys.exit(0)
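The script above is a manual, print-driven check that needs a reachable database and the live HadithAPI.com endpoint. For CI, the argument-parsing part could also be expressed as plain pytest functions with no network or database dependency; a hypothetical sketch (not part of this commit), duplicating the parser definition used by the ingestion entry point:

import argparse
import pytest

def build_parser() -> argparse.ArgumentParser:
    # Same options as the script exercises above.
    parser = argparse.ArgumentParser(description="Ingest hadiths from HadithAPI.com")
    parser.add_argument("--book-slug", help="Book slug (e.g., sahih-bukhari)")
    parser.add_argument("--limit", type=int, help="Limit number of hadiths to ingest")
    parser.add_argument("--list-books", action="store_true", help="List available books and exit")
    return parser

def test_list_books_flag():
    args = build_parser().parse_args(['--list-books'])
    assert args.list_books is True

def test_book_slug_and_limit():
    args = build_parser().parse_args(['--book-slug', 'sahih-bukhari', '--limit', '5'])
    assert args.book_slug == 'sahih-bukhari'
    assert args.limit == 5

def test_limit_must_be_int():
    # argparse exits with an error when --limit is not an integer.
    with pytest.raises(SystemExit):
        build_parser().parse_args(['--limit', 'ten'])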