import os import requests import json import asyncio from pathlib import Path import time # Configuration OPENWEBUI_URL = "http://192.168.1.2:31028" OLLAMA_BASE_URL = "http://192.168.1.2:30068" # Ollama instance on TrueNAS API_KEY = "sk-609c9c4e941b487389a12b675742e288" # Placeholder USERNAME = "rushabh.techie@gmail.com" PASSWORD = "Rushabh%1" SOURCE_DIRECTORY = "/mnt/storage.rushg.me/data/z5/rushg.me" KNOWLEDGE_BASE_NAME = "rushg-me-kb" session = requests.Session() def login_and_get_cookie(): """ Logs into OpenWebUI and returns the session cookie. """ print("Attempting to log in to OpenWebUI...") login_url = f"{OPENWEBUI_URL}/api/auth/login" # Common login endpoint headers = {"Content-Type": "application/json"} payload = {"email": USERNAME, "password": PASSWORD} try: response = session.post(login_url, headers=headers, json=payload, timeout=60) response.raise_for_status() print("Login successful.") return response.cookies except requests.exceptions.RequestException as e: print(f"Login failed: {e}") if response is not None: print(f"Response content: {response.text}") raise def upload_file(file_path): """ Uploads a single file to OpenWebUI using the session cookie. """ upload_url = f"{OPENWEBUI_URL}/api/v1/files" try: with open(file_path, "rb") as f: files = {"file": (os.path.basename(file_path), f, "application/octet-stream")} response = session.post(upload_url, files=files, timeout=300) response.raise_for_status() file_info = response.json() print(f"Successfully uploaded {file_path}: {file_info.get('filename')} (ID: {file_info.get('id')})") return file_info except requests.exceptions.RequestException as e: print(f"Failed to upload {file_path}: {e}") if response is not None: print(f"Response content: {response.text}") return None def get_uploaded_files(): """ Retrieves a list of all files uploaded to OpenWebUI. """ files_url = f"{OPENWEBUI_URL}/api/v1/files" try: response = session.get(files_url, timeout=60) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"Failed to retrieve uploaded files: {e}") if response is not None: print(f"Response content: {response.text}") return [] def create_knowledge_base(kb_name): """ Creates a new knowledge base in OpenWebUI. """ kb_create_url = f"{OPENWEBUI_URL}/api/v1/knowledge" headers = {"Content-Type": "application/json"} payload = {"name": kb_name} try: response = session.post(kb_create_url, headers=headers, json=payload, timeout=60) response.raise_for_status() kb_info = response.json() print(f"Successfully created Knowledge Base '{kb_name}': (ID: {kb_info.get('id')})") return kb_info except requests.exceptions.RequestException as e: print(f"Failed to create Knowledge Base '{kb_name}': {e}") if response is not None: print(f"Response content: {response.text}") return None def add_file_to_knowledge_base(kb_id, file_id): """ Adds a file to a specified knowledge base. """ add_file_url = f"{OPENWEBUI_URL}/api/v1/knowledge/{kb_id}/file/add" headers = {"Content-Type": "application/json"} payload = {"file_id": file_id} try: response = session.post(add_file_url, headers=headers, json=payload, timeout=60) response.raise_for_status() print(f"Successfully added file {file_id} to KB {kb_id}") return True except requests.exceptions.RequestException as e: print(f"Failed to add file {file_id} to KB {kb_id}: {e}") if response is not None: print(f"Response content: {response.text}") return False def get_knowledge_bases(): """ Retrieves a list of all knowledge bases. """ kb_list_url = f"{OPENWEBUI_URL}/api/v1/knowledge" try: response = session.get(kb_list_url, timeout=60) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"Failed to retrieve knowledge bases: {e}") if response is not None: print(f"Response content: {response.text}") return [] def main(): # Login to get session cookie try: login_and_get_cookie() except Exception: print("Exiting due to login failure.") return # Check for existing knowledge base existing_kbs = get_knowledge_bases() rushg_me_kb_id = None for kb in existing_kbs: if kb.get("name") == KNOWLEDGE_BASE_NAME: rushg_me_kb_id = kb.get("id") print(f"Found existing Knowledge Base '{KNOWLEDGE_BASE_NAME}' (ID: {rushg_me_kb_id}).") break if not rushg_me_kb_id: # Create knowledge base kb_info = create_knowledge_base(KNOWLEDGE_BASE_NAME) if not kb_info: print("Failed to create Knowledge Base. Exiting.") return rushg_me_kb_id = kb_info.get("id") print(f"Using Knowledge Base ID: {rushg_me_kb_id}") # Collect all file paths files_to_upload = [] for root, _, files in os.walk(SOURCE_DIRECTORY): for file in files: file_path = os.path.join(root, file) files_to_upload.append(file_path) # Upload files and store their IDs uploaded_file_ids = [] uploaded_file_names = set() # Get already uploaded files to avoid re-uploading existing_uploaded_files = get_uploaded_files() for f in existing_uploaded_files: uploaded_file_names.add(f.get("filename")) for file_path in files_to_upload: if os.path.basename(file_path) in uploaded_file_names: print(f"Skipping {file_path}, already uploaded.") continue file_info = upload_file(file_path) if file_info: uploaded_file_ids.append(file_info["id"]) # Small delay to avoid overwhelming the server time.sleep(0.1) # Attach uploaded files to the knowledge base print(f"Attempting to attach {len(uploaded_file_ids)} new files to KB {rushg_me_kb_id}...") for file_id in uploaded_file_ids: add_file_to_knowledge_base(rushg_me_kb_id, file_id) time.sleep(0.1) # Small delay print("\nOpenWebUI RAG setup complete. Knowledge Base populated.") print(f"Knowledge Base '{KNOWLEDGE_BASE_NAME}' ID: {rushg_me_kb_id}") print("You can now use this ID in your chat completion requests with OpenWebUI.") if __name__ == "__main__": main()