# Path: /astoria_rag_hub/scripts/pg_vector_manual_ingest.py
# Filename: pg_vector_manual_ingest.py
import os
import sys
import json
import logging
import math

import requests
from dotenv import load_dotenv

# Add the project root to the Python path so the 'app' package can be imported
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from app.rag_components.data_loader import load_documents_from_directory

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def manual_ingest():
    """
    Loads, splits, embeds, and manually uploads documents using the requests library.
    """
    load_dotenv()

    # --- 1. Load documents from files ---
    logging.info("Loading documents from directory...")
    documents = load_documents_from_directory('data/maritime_history')
    if not documents:
        logging.warning("No documents found. Exiting.")
        return
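    # Assumption: load_documents_from_directory is the project's own loader and
    # returns LangChain Document objects (page_content + metadata); the splitting
    # and payload steps below rely on that interface.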
    # --- 2. Split documents into smaller chunks ---
    logging.info("Splitting documents...")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    chunks = text_splitter.split_documents(documents)
    logging.info(f"Split {len(documents)} documents into {len(chunks)} chunks.")
    # --- 3. Create embeddings for each chunk ---
    logging.info("Initializing embedding model...")
    model_name = "sentence-transformers/all-MiniLM-L6-v2"
    embeddings_model = HuggingFaceEmbeddings(model_name=model_name)
    logging.info("Creating embeddings for all chunks...")
    all_embeddings = embeddings_model.embed_documents([chunk.page_content for chunk in chunks])
    logging.info("Embeddings created.")
    # --- 4. Prepare data payloads for upload ---
    payloads = []
    for i, chunk in enumerate(chunks):
        payloads.append({
            "content": chunk.page_content,
            "metadata": chunk.metadata,
            "embedding": all_embeddings[i]
        })
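    # The payload keys must match the column names of the target table. The
    # table definition is not part of this script; an assumed schema is roughly:
    #   create table documents (
    #     id bigserial primary key,
    #     content text,
    #     metadata jsonb,
    #     embedding vector(384)
    #   );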
    # --- 5. Upload data in batches using requests ---
    logging.info("Preparing to upload data in batches...")
    url = os.getenv("SUPABASE_URL")
    key = os.getenv("SUPABASE_KEY")
    if not url or not key:
        logging.error("SUPABASE_URL and SUPABASE_KEY must be set in the environment. Exiting.")
        return
    api_url = f"{url}/rest/v1/documents"  # Target the 'documents' table
    headers = {
        "apikey": key,
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
        "Prefer": "return=representation"
    }
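    # Supabase exposes tables through PostgREST: 'apikey' plus the Bearer token
    # authenticate the request, and 'Prefer: return=representation' asks the
    # server to echo the inserted rows back in the response body.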
    batch_size = 50
    num_batches = math.ceil(len(payloads) / batch_size)
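    # Batching keeps each request body small: every row carries a 384-float
    # embedding, so posting all chunks at once could hit request-size limits.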
    for i in range(0, len(payloads), batch_size):
        batch_num = (i // batch_size) + 1
        batch = payloads[i:i + batch_size]
        logging.info(f"Uploading batch {batch_num}/{num_batches}...")
        try:
            # POSTing a JSON array to a PostgREST table endpoint inserts one row per element
            response = requests.post(api_url, headers=headers, data=json.dumps(batch), timeout=60)
            response.raise_for_status()  # Raises an error for bad status codes (4xx or 5xx)
            logging.info(f"Batch {batch_num} uploaded successfully. Status: {response.status_code}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to upload batch {batch_num}. Error: {e}")
            break  # Stop on failure; rows from earlier batches remain inserted
    else:
        # The else branch runs only if the loop finished without a break, so the
        # success message is not logged after a failed batch.
        logging.info("✅ Manual ingestion process completed!")
if __name__ == "__main__":
    manual_ingest()
# end-of-file