update ingestion for sahih-muslim

salahangal 2025-11-17 17:57:49 +01:00
parent 7fdcb1417d
commit 4a14036b01
14 changed files with 623 additions and 401 deletions

BIN
.DS_Store vendored

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,31 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: generate-embeddings-
  namespace: ml
spec:
  entrypoint: generate
  serviceAccountName: argo-workflow
  arguments:
    parameters:
      - name: batch-size
        value: "32"
  templates:
    - name: generate
      container:
        image: hadith-ingestion:latest
        command: [python, /app/src/embeddings/generator.py]
        args: ["--batch-size={{workflow.parameters.batch-size}}"]
        env:
          - name: DATABASE_HOST
            value: "pg.betelgeusebytes.io"
          - name: DATABASE_PASSWORD
            valueFrom:
              secretKeyRef:
                name: hadith-db-secret
                key: password
        resources:
          requests: {cpu: 2, memory: 4Gi}
          limits: {cpu: 4, memory: 8Gi}
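
The new workflow is a plain parameterized Argo Workflow, so it can be exercised directly from the CLI. A minimal sketch, assuming the manifest is saved as argo/workflows/generate-embeddings.yaml (the file path is not shown on this page):

# Submit the embedding workflow, overriding the default batch size of "32",
# and watch it until completion.
argo submit -n ml argo/workflows/generate-embeddings.yaml \
  --parameter batch-size=64 \
  --watch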

View File

@@ -59,8 +59,8 @@ spec:
       container:
         image: hadith-ingestion:latest
-        imagePullPolicy: IfNotPresent
-        command: [python, /app/src/main.py]
+        imagePullPolicy: Always
+        command: [python, /app/src/main_hadithapi.py]
         args:
           - "{{inputs.parameters.collection}}"
           - "--limit={{inputs.parameters.limit}}"
@@ -122,8 +122,8 @@ spec:
       container:
         image: hadith-embeddings:latest
-        imagePullPolicy: IfNotPresent
-        command: [python, /app/generate_embeddings.py]
+        imagePullPolicy: Always
+        command: [python, /app/src/embeddings/generator.py]
         args:
           - "--collection={{inputs.parameters.collection}}"
           - "--batch-size=32"
@@ -161,7 +161,7 @@ spec:
       container:
         image: hadith-qdrant-indexer:latest
-        imagePullPolicy: IfNotPresent
+        imagePullPolicy: Always
         command: [python, /app/index_qdrant.py]
         args:
           - "--collection={{inputs.parameters.collection}}"

View File

@@ -1,4 +1,4 @@
-find . -type f -name "*.txt" -o -name "production" -o -name "*.py" -o -name "*.yaml" -o -name "Dockerfile" -o -name "*.sh" -o -name "*.env" -o -name "*.md" ! -name "*.xls" ! -name "*.xlsx" | while read file; do
+find . -type f -name "*.txt" -o -name "production" -o -name "*.py" -o -name "*.yaml" -o -name "Dockerfile" -o -name "*.sh" -o -name "*.env" ! -name "*.md" ! -name "*.xls" ! -name "*.xlsx" | while read file; do
   echo "=== $file ===" >> combined.txt
   cat "$file" >> combined.txt
   echo "" >> combined.txt

File diff suppressed because it is too large

View File

@@ -40,3 +40,9 @@ pytest==7.4.3
 pytest-asyncio==0.21.1
 pytest-cov==4.1.0
 faker==21.0.0
+httpx==0.25.2
+qdrant-client==1.7.0
+tqdm==4.66.1
+asyncpg==0.29.0
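
A quick smoke test that the newly pinned client libraries install and import cleanly together (a fresh virtualenv is assumed):

pip install -r requirements.txt
python -c "import httpx, qdrant_client, tqdm, asyncpg; print('deps ok')"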

View File

@@ -8,14 +8,14 @@ echo "=== Starting Full HadithAPI Ingestion ==="
 # Book slug to collection abbreviation mapping
 # Books to ingest (in order)
 BOOKS=(
-  "sahih-bukhari"
+  # "sahih-bukhari"
   "sahih-muslim"
-  "abu-dawood"
-  "al-tirmidhi"
-  "ibn-e-majah"
-  "sunan-nasai"
-  "musnad-ahmad"
-  "al-silsila-sahiha"
+  # "abu-dawood"
+  # "al-tirmidhi"
+  # "ibn-e-majah"
+  # "sunan-nasai"
+  # "musnad-ahmad"
+  # "al-silsila-sahiha"
 )

 for BOOK in "${BOOKS[@]}"; do
@@ -23,7 +23,7 @@ for BOOK in "${BOOKS[@]}"; do
   echo "Ingesting: $BOOK"
   echo "========================================="
-  argo submit -n argo argo/workflows/ingest-hadithapi.yaml \
+  argo submit -n ml argo/workflows/ingest-hadithapi.yaml \
     --parameter book-slug=$BOOK \
     --parameter limit=0 \
     --wait \
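
Because each submission uses --wait, the loop ingests one book at a time; the current run can be followed from another shell:

# @latest is the Argo CLI alias for the most recently submitted workflow
argo list -n ml
argo logs -n ml @latest --follow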

View File

@@ -264,6 +264,10 @@ class HadithAPIClient(BaseAPIClient):
         # Process each chapter
         for chapter in chapters:
-            chapter_id = chapter.get('id')
+            # logger.warning("Processing chapter", chapter=chapter)
+            if book_slug == 'sahih-muslim':
+                chapter_id = chapter.get('chapterNumber')
+            else:
+                chapter_id = chapter.get('id')
             chapter_number = chapter.get('chapterNumber')

View File

@@ -0,0 +1,148 @@
# Update: src/embeddings/generator.py
"""
Embedding generation service for hadith texts
"""
import argparse
import asyncio
import sys
from typing import List, Tuple

import httpx
import psycopg2
import structlog
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from tqdm import tqdm

from config.settings import settings

logger = structlog.get_logger()


class EmbeddingGenerator:
    def __init__(self, database_url: str, tei_url: str, qdrant_url: str, batch_size: int = 32):
        self.database_url = database_url
        self.tei_url = tei_url
        self.qdrant_url = qdrant_url
        self.batch_size = batch_size
        self.http_client = httpx.AsyncClient(timeout=60.0)
        self.qdrant = QdrantClient(url=qdrant_url)

    async def generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a batch of texts via the TEI /embed endpoint"""
        response = await self.http_client.post(
            f"{self.tei_url}/embed",
            json={"inputs": texts}
        )
        response.raise_for_status()
        return response.json()

    def create_collection(self, name: str = "hadith_embeddings"):
        """Create the Qdrant collection if it does not exist yet"""
        try:
            self.qdrant.get_collection(name)
        except Exception:
            self.qdrant.create_collection(
                collection_name=name,
                vectors_config=VectorParams(size=1024, distance=Distance.COSINE)
            )

    async def process_batch(self, conn, hadiths: List[Tuple], collection: str):
        """Process one batch: generate embeddings, upsert to Qdrant, mark rows done"""
        # Concatenate Arabic and English text; either column may be NULL
        texts = [f"{h[1] or ''} {h[2] or ''}".strip() for h in hadiths]
        embeddings = await self.generate_embeddings_batch(texts)
        points = [
            PointStruct(
                id=str(h[0]),
                vector=emb,
                payload={"hadith_id": str(h[0]), "collection_id": str(h[4])}
            )
            for h, emb in zip(hadiths, embeddings)
        ]
        self.qdrant.upsert(collection_name=collection, points=points)

        # Mark the batch completed so it drops out of the work queue
        cursor = conn.cursor()
        ids = [str(h[0]) for h in hadiths]
        cursor.execute(
            "UPDATE hadiths SET embedding_generated = TRUE, embedding_version = 'v1' WHERE id = ANY(%s)",
            (ids,)
        )
        conn.commit()
        cursor.close()
        return len(points)

    async def generate_all(self, collection: str = "hadith_embeddings"):
        """Generate embeddings for all hadiths that do not have one yet"""
        self.create_collection(collection)
        conn = psycopg2.connect(self.database_url)
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM hadiths WHERE embedding_generated = FALSE")
        total = cursor.fetchone()[0]
        cursor.close()
        if total == 0:
            print("All hadiths already have embeddings!")
            return

        print(f"Generating embeddings for {total} hadiths...")
        processed = 0
        with tqdm(total=total) as pbar:
            while True:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT id, arabic_text, english_text, urdu_text, collection_id
                    FROM hadiths
                    WHERE embedding_generated = FALSE
                    LIMIT 1000
                """)
                hadiths = cursor.fetchall()
                cursor.close()
                if not hadiths:
                    break

                made_progress = False
                for i in range(0, len(hadiths), self.batch_size):
                    batch = hadiths[i:i + self.batch_size]
                    try:
                        count = await self.process_batch(conn, batch, collection)
                        processed += count
                        made_progress = True
                        pbar.update(count)
                    except Exception as e:
                        logger.error("batch_failed", error=str(e))

                # Failed rows stay embedding_generated = FALSE and would be
                # re-fetched forever; bail out if a whole pass made no progress.
                if not made_progress:
                    break

        conn.close()
        print(f"\nCompleted! Generated {processed} embeddings.")

    async def close(self):
        await self.http_client.aclose()


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--batch-size", type=int, default=32)
    args = parser.parse_args()
    gen = EmbeddingGenerator(
        database_url=settings.DATABASE_URL,
        tei_url="http://tei.ml.svc.cluster.local",
        qdrant_url="http://qdrant.vector.svc.cluster.local:6333",
        batch_size=args.batch_size
    )
    try:
        await gen.generate_all()
        return 0
    except Exception as e:
        logger.error("generation_failed", error=str(e))
        return 1
    finally:
        await gen.close()


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
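
Before a full run, the TEI endpoint can be smoke-tested with the same request shape the generator sends, and the script invoked with a small batch size; the cluster DNS names below are the ones hard-coded in main():

# One-off embedding request against the TEI service
curl -s http://tei.ml.svc.cluster.local/embed \
  -H 'Content-Type: application/json' \
  -d '{"inputs": ["test sentence"]}'

# Then run the generator itself with a conservative batch size
python /app/src/embeddings/generator.py --batch-size 16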

View File

@@ -2,10 +2,16 @@
 Main ingestion script for fetching hadiths from HadithAPI.com
 """
 import sys
+from pathlib import Path
 import argparse
 from typing import Optional, Dict, Any
 from uuid import UUID
 import structlog
+
+PROJECT_ROOT = Path(__file__).resolve().parents[1]
+if str(PROJECT_ROOT) not in sys.path:
+    sys.path.insert(0, str(PROJECT_ROOT))
+
 from config.settings import settings
 from src.api_clients.hadithapi_client import HadithAPIClient
 from src.database.repository import HadithRepository

View File

@@ -19,7 +19,7 @@ echo "Building Docker image..."
 # 3. Submit test workflow (10 hadiths)
 echo "Submitting test workflow..."
 argo submit -n ml argo/workflows/ingest-hadithapi.yaml \
-  --parameter book-slug=sahih-bukhari \
+  --parameter book-slug=sahih-muslim \
   --parameter limit=10 \
   --wait \
   --log

View File

@@ -19,7 +19,7 @@ python src/main_hadithapi.py --list-books
 # 4. Test ingestion (limited to 10 hadiths)
-echo -e "\nRunning test ingestion (10 hadiths from Sahih Bukhari)..."
-python src/main_hadithapi.py --book-slug sahih-bukhari --limit 10
+echo -e "\nRunning test ingestion (10 hadiths from Sahih Muslim)..."
+python src/main_hadithapi.py --book-slug sahih-muslim --limit 10

 # 5. Verify data
 echo -e "\nVerifying ingested data..."