import os
import time
import logging
from typing import List, Optional, Dict, Any
from openai import OpenAI
import requests
from tenacity import retry, stop_after_attempt, wait_exponential
# NOTE(review): configuring the root logger at import time is a side effect
# that library modules usually avoid (it can override the host app's logging
# config) — confirm this module is meant to be run as an app, not imported.
logging.basicConfig(level=logging.INFO)
# Module-level logger, per stdlib convention (named after this module).
logger = logging.getLogger(__name__)
class VedayaAPIClient:
    """Production-ready client for the Vedaya knowledge-graph API.

    Wraps two access paths:
      * the OpenAI-compatible ``/v1`` endpoint via the ``openai`` SDK
        (with a raw-HTTP fallback), and
      * the direct document-management endpoints via ``requests``.
    """

    def __init__(self, api_key: Optional[str] = None, request_timeout: float = 30.0):
        """Initialize the client.

        Args:
            api_key: Vedaya API key. Falls back to the ``VEDAYA_API_KEY``
                environment variable, then to the ``"sk-dummy"`` placeholder.
            request_timeout: Per-request timeout in seconds for direct HTTP
                calls. Prevents the client from hanging indefinitely on an
                unresponsive server (and lets tenacity's retry actually fire
                on a stalled connection).
        """
        self.base_url = "https://vedaya-kge.fly.dev"
        self.api_key = api_key or os.getenv("VEDAYA_API_KEY", "sk-dummy")
        self.request_timeout = request_timeout
        # OpenAI-compatible client used for chat-completion queries.
        self.openai_client = OpenAI(
            api_key=self.api_key,
            base_url=f"{self.base_url}/v1",
        )
        # Headers reused by every direct requests call.
        self.headers = self._get_headers()

    def _get_headers(self) -> Dict[str, str]:
        """Build request headers, attaching a Bearer token only for real keys.

        The placeholder keys ("sk-dummy", "sk-mock-dummy-key") are treated as
        "no authentication" so anonymous access still works.
        """
        headers: Dict[str, str] = {"Content-Type": "application/json"}
        if self.api_key and self.api_key not in ["sk-dummy", "sk-mock-dummy-key"]:
            headers["Authorization"] = f"Bearer {self.api_key}"
        return headers

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    def upload_documents(self, texts: List[str], sources: Optional[List[str]] = None) -> bool:
        """Upload text documents, retrying (up to 3 attempts) on exceptions.

        Note: a non-200 response returns False and is NOT retried — only
        raised exceptions (e.g. connection errors, timeouts) trigger tenacity.

        Args:
            texts: Document bodies to ingest.
            sources: Optional file names, one per text. Auto-generated
                ("doc_0.txt", ...) when omitted or empty.

        Returns:
            True on HTTP 200, False otherwise.
        """
        if not sources:
            sources = [f"doc_{i}.txt" for i in range(len(texts))]
        if len(sources) != len(texts):
            # A silent mismatch would misattribute documents server-side.
            logger.error(
                "sources length (%d) does not match texts length (%d)",
                len(sources), len(texts),
            )
            return False
        logger.info("Uploading %d documents", len(texts))
        response = requests.post(
            f"{self.base_url}/documents/texts",
            headers=self.headers,
            json={"texts": texts, "file_sources": sources},
            timeout=self.request_timeout,  # avoid hanging forever on a dead server
        )
        if response.status_code == 200:
            logger.info("Documents uploaded successfully")
            return True
        logger.error("Upload failed: %s - %s", response.status_code, response.text)
        return False

    def wait_for_processing(self, timeout: int = 60) -> bool:
        """Poll the pipeline status until processing finishes or *timeout* elapses.

        Args:
            timeout: Overall wait budget in seconds (distinct from the
                per-request ``request_timeout``).

        Returns:
            True when the pipeline reports not busy; False on overall timeout.
        """
        logger.info("Waiting for document processing...")
        end_time = time.time() + timeout
        while time.time() < end_time:
            try:
                status = requests.get(
                    f"{self.base_url}/documents/pipeline_status",
                    headers=self.headers,
                    timeout=self.request_timeout,  # a hung poll must not eat the whole budget
                ).json()
                if not status.get('busy', False):
                    logger.info("Processing complete")
                    return True
                logger.debug("Status: %s", status.get('latest_message', 'Processing...'))
                time.sleep(2)
            except Exception as e:
                # Transient poll failures are expected while the server works;
                # back off a little longer and keep polling until the deadline.
                logger.warning("Status check error: %s", e)
                time.sleep(5)
        logger.error("Processing timeout")
        return False

    def query(self,
              question: str,
              mode: str = "vedaya-hybrid",
              temperature: float = 0.7,
              max_tokens: int = 500) -> str:
        """Query the knowledge base, falling back to raw HTTP if the SDK fails.

        Args:
            question: Natural-language question to ask.
            mode: Model/mode name passed as the ``model`` field.
            temperature: Sampling temperature.
            max_tokens: Completion token cap.

        Returns:
            The assistant message content.

        Raises:
            Exception: If both the SDK call and the HTTP fallback fail.
        """
        logger.info("Querying: %s... (mode: %s)", question[:50], mode)
        try:
            # Try OpenAI SDK first
            response = self.openai_client.chat.completions.create(
                model=mode,
                messages=[{"role": "user", "content": question}],
                temperature=temperature,
                max_tokens=max_tokens,
            )
            return response.choices[0].message.content
        except Exception as e:
            logger.warning("OpenAI SDK failed: %s, falling back to HTTP", e)
            # Fallback to direct HTTP against the same OpenAI-compatible route.
            response = requests.post(
                f"{self.base_url}/v1/chat/completions",
                headers=self.headers,
                json={
                    "model": mode,
                    "messages": [{"role": "user", "content": question}],
                    "temperature": temperature,
                    "max_tokens": max_tokens,
                },
                timeout=self.request_timeout,  # fallback must not hang either
            )
            if response.status_code == 200:
                return response.json()['choices'][0]['message']['content']
            logger.error("Query failed: %s", response.status_code)
            raise Exception(f"Query failed: {response.text}")

    def complete_workflow(self, texts: List[str], query: str) -> str:
        """Upload *texts*, wait for ingestion, then answer *query*.

        Raises:
            Exception: If the upload fails or processing times out.
        """
        # Upload documents
        if not self.upload_documents(texts):
            raise Exception("Document upload failed")
        # Wait for processing
        if not self.wait_for_processing():
            raise Exception("Document processing timeout")
        # Query the knowledge base
        return self.query(query)
# Usage example
if __name__ == "__main__":
    # Build a client (key comes from VEDAYA_API_KEY if set), push two short
    # documents through the full pipeline, and ask a question about them.
    api_client = VedayaAPIClient()
    sample_texts = [
        "Machine learning is a subset of artificial intelligence.",
        "Deep learning uses neural networks with multiple layers.",
    ]
    result = api_client.complete_workflow(
        texts=sample_texts,
        query="What is machine learning?",
    )
    print(f"Answer: {result}")