This guide provides best practices and patterns for integrating the Vedaya API into your applications, covering authentication, error handling, performance optimization, and production deployment.

Working with Optional Authentication

The API currently accepts dummy keys or works without authentication:
import requests
from openai import OpenAI

# Option 1: No authentication (works for most operations)
response = requests.get("https://vedaya-kge.fly.dev/documents/pipeline_status")

# Option 2: Dummy key with OpenAI SDK
client = OpenAI(
    api_key="sk-dummy",  # Works without real key
    base_url="https://vedaya-kge.fly.dev/v1"
)

# Option 3: Optional authentication pattern
API_KEY = "sk-your-api-key"  # Can be None or a dummy key
headers = {"Content-Type": "application/json"}
if API_KEY and API_KEY not in ("sk-dummy", "sk-mock-dummy-key"):
    headers["Authorization"] = f"Bearer {API_KEY}"

Best Practices for Production

When authentication becomes required in production:
  1. Use environment variables - Store keys securely instead of hardcoding them
  2. Implement fallback patterns - Handle both authenticated and unauthenticated modes
  3. Use separate keys per environment - Keep development and production keys distinct
  4. Monitor usage - Track API usage even without authentication (a minimal tracking sketch follows the example below)
Example with environment variables:
import os
from openai import OpenAI

# Fallback pattern for optional auth
API_KEY = os.getenv("VEDAYA_API_KEY", "sk-dummy")

client = OpenAI(
    api_key=API_KEY,
    base_url="https://vedaya-kge.fly.dev/v1"
)
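
Best practice 4, usage monitoring, can start with simple in-process counting. A minimal sketch (the tracked_get wrapper and logger name are illustrative, not part of the Vedaya API):
import logging
from collections import Counter

import requests

logger = logging.getLogger("vedaya.usage")
usage_counter = Counter()

def tracked_get(url, **kwargs):
    # Count calls per endpoint so usage stays visible even without authentication
    usage_counter[url] += 1
    logger.info("GET %s (call #%d)", url, usage_counter[url])
    return requests.get(url, **kwargs)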

Error Handling

The Vedaya API uses standard HTTP status codes and returns detailed error messages to help troubleshoot issues.

Common Status Codes

  • 200: Success
  • 400: Bad Request - Check your request parameters
  • 401: Unauthorized - Invalid or missing API key
  • 404: Not Found - Resource doesn’t exist
  • 429: Too Many Requests - Rate limit exceeded
  • 500: Internal Server Error - Something went wrong on our side

Implementing Robust Error Handling

import requests

API_KEY = "sk-your-api-key"  # Optional; see the authentication section above

def make_api_call(endpoint, method='GET', data=None, files=None):
    url = f"https://vedaya-kge.fly.dev{endpoint}"
    headers = {
        'Authorization': f'Bearer {API_KEY}'
    }
    
    try:
        if method == 'GET':
            response = requests.get(url, headers=headers, params=data, timeout=30)
        elif method == 'POST':
            # requests builds a multipart body when files are given,
            # so send JSON only when there are no file uploads
            if files:
                response = requests.post(url, headers=headers, data=data, files=files, timeout=30)
            else:
                response = requests.post(url, headers=headers, json=data, timeout=30)
        else:
            # Fail fast instead of referencing an unbound `response` below
            raise ValueError(f"Unsupported HTTP method: {method}")
        
        # Raise an HTTPError for 4xx/5xx responses
        response.raise_for_status()
        return response.json()
    
    except requests.exceptions.HTTPError as http_err:
        # Handle HTTP errors
        if response.status_code == 401:
            print("Authentication error. Check your API key.")
        elif response.status_code == 429:
            print("Rate limit exceeded. Please retry after some time.")
        else:
            print(f"HTTP error occurred: {http_err}")
            print(f"Response: {response.text}")
        return None
    
    except requests.exceptions.ConnectionError:
        print("Connection error. Please check your internet connection.")
        return None
    
    except requests.exceptions.Timeout:
        print("Request timed out. Please try again.")
        return None
    
    except requests.exceptions.RequestException as err:
        print(f"An error occurred: {err}")
        return None

Rate Limiting and Throttling

Vedaya implements rate limiting to ensure fair usage of the API. When you exceed the rate limit, you’ll receive a 429 Too Many Requests response.

Best Practices for Handling Rate Limits

  1. Implement exponential backoff - Gradually increase wait time between retries (see the example below)
  2. Cache responses when possible - Reduce the number of API calls (a caching sketch follows the backoff example)
  3. Batch requests - Process multiple documents in a single request
  4. Monitor your usage - Keep track of your API usage to avoid hitting limits
Example of exponential backoff with jitter:
import time
import random
import requests

headers = {"Content-Type": "application/json"}  # Add an Authorization header if using a real key

def call_api_with_backoff(endpoint, max_retries=5):
    retries = 0
    while retries < max_retries:
        response = requests.get(f"https://vedaya-kge.fly.dev{endpoint}", headers=headers, timeout=30)
        
        if response.status_code == 200:
            return response.json()
        
        if response.status_code == 429:
            # Apply exponential backoff with jitter
            sleep_time = (2 ** retries) + random.random()
            print(f"Rate limit exceeded. Retrying in {sleep_time:.2f} seconds...")
            time.sleep(sleep_time)
            retries += 1
        else:
            # Handle other errors
            print(f"Error: {response.status_code}")
            print(response.text)
            break
    
    return None
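
Best practice 2, response caching, cuts request volume for endpoints whose responses change slowly. A minimal time-based cache sketch (the cached_get helper and the 30-second TTL are illustrative choices, not part of the API):
import time
import requests

_cache = {}  # url -> (expiry_timestamp, parsed_json)

def cached_get(url, headers=None, ttl=30):
    # Serve the stored response while it is younger than ttl seconds
    now = time.time()
    if url in _cache and _cache[url][0] > now:
        return _cache[url][1]
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    data = response.json()
    _cache[url] = (now + ttl, data)
    return data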

Document Processing Workflow

The Vedaya API processes documents quickly (typically in seconds). Here’s the correct workflow:

Upload and Process Documents

import requests
import time

API_BASE_URL = "https://vedaya-kge.fly.dev"
API_KEY = "sk-your-api-key"  # Optional

headers = {"Content-Type": "application/json"}
if API_KEY and API_KEY not in ("sk-dummy", "sk-mock-dummy-key"):
    headers["Authorization"] = f"Bearer {API_KEY}"

def process_documents(texts):
    # Step 1: Upload text documents (primary method)
    response = requests.post(
        f"{API_BASE_URL}/documents/texts",
        headers=headers,
        json={
            "texts": texts,
            "file_sources": [f"doc_{i}.txt" for i in range(len(texts))]
        }
    )
    
    if response.status_code != 200:
        print(f"Upload failed: {response.status_code}")
        return False
    
    print("✅ Documents uploaded successfully")
    
    # Step 2: Poll for processing status (usually completes in seconds)
    for i in range(30):  # Max 60 seconds polling
        status = requests.get(
            f"{API_BASE_URL}/documents/pipeline_status",
            headers=headers
        ).json()
        
        if not status.get('busy', False):
            print("✅ Processing complete!")
            return True
        
        print(f"  Status: {status.get('latest_message', 'Processing...')}")
        time.sleep(2)
    
    return False

# Example usage
documents = [
    "Your first document content here.",
    "Your second document content here."
]
process_documents(documents)

Performance Optimization

Optimizing Document Processing

  1. Optimize file sizes - Compress images, remove unnecessary content
  2. Chunk appropriately - Adjust chunk size based on your content type
  3. Batch process documents - Process multiple documents in parallel (see the sketch after this list)
  4. Use appropriate PDF extractors - Choose the right tool for your document type
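
Item 3, parallel batch processing, can be sketched with a thread pool. This assumes the process_documents helper from the workflow section; the batch size and worker count are illustrative, and if the server pipeline is shared, concurrent batches may still be processed sequentially on the server side:
from concurrent.futures import ThreadPoolExecutor

def process_in_batches(all_texts, batch_size=10, max_workers=4):
    # Split the corpus into batches and upload them concurrently
    batches = [all_texts[i:i + batch_size] for i in range(0, len(all_texts), batch_size)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(process_documents, batches))
    return all(results)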

Optimizing Queries

  1. Be specific with queries - More specific queries yield better results
  2. Adjust top_k parameter - Only retrieve as many chunks as needed
  3. Cache common queries - Store results for frequently asked questions
  4. Use vector filtering - When available, filter by metadata to improve relevance

Webhook Integration

For production applications, consider using webhooks to receive notifications when asynchronous processes complete:
from flask import Flask, request, jsonify

app = Flask(__name__)

def verify_webhook_signature(request):
    # Placeholder check: replace with your provider's scheme, e.g. comparing
    # an HMAC of the raw request body against a signature header
    return True

@app.route('/webhook/document-processed', methods=['POST'])
def document_processed_webhook():
    data = request.json
    
    # Verify webhook signature (implement your own security check)
    if not verify_webhook_signature(request):
        return jsonify({"status": "error", "message": "Invalid signature"}), 401
    
    # Process the webhook data
    file_id = data['file_id']
    status = data['status']
    
    if status == 'completed':
        # Document processing completed successfully
        # Trigger your business logic here
        pass
    elif status == 'failed':
        # Document processing failed
        # Implement error handling
        error_message = data.get('error', 'Unknown error')
        print(f"Processing failed for file {file_id}: {error_message}")
    
    return jsonify({"status": "success"})

if __name__ == '__main__':
    # debug=True is for local testing only; run behind a production WSGI server in deployment
    app.run(debug=True, port=5000)

Testing and Debugging

Testing Strategies

  1. Create a test suite - Automated tests for different API operations (a minimal example follows this list)
  2. Use mock data - Test with consistent sample documents
  3. Test error scenarios - Ensure your app handles API errors gracefully
  4. Perform integration testing - Test the complete workflow
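
Strategy 1 can start small: a single unit test that mocks the HTTP layer. A minimal pytest-style sketch, assuming the process_documents helper from the workflow section is importable:
from unittest.mock import MagicMock, patch

def test_upload_failure_returns_false():
    # process_documents should report failure when the upload endpoint errors
    with patch("requests.post") as mock_post:
        mock_post.return_value = MagicMock(status_code=500)
        assert process_documents(["sample text"]) is False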

Debugging Tools

  1. API request logging - Log all API requests and responses (see the session-hook sketch below)
  2. Response inspection - Examine response bodies for error details
  3. Status polling - Check process status for asynchronous operations
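
Request logging (tool 1) is straightforward with a requests session hook: every call made through the session is recorded automatically. A minimal sketch:
import logging
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("vedaya.http")

def log_response(response, *args, **kwargs):
    # Response hooks run after every request made through the session
    logger.info("%s %s -> %s", response.request.method, response.url, response.status_code)

session = requests.Session()
session.hooks["response"].append(log_response)

# All calls through `session` are now logged, e.g.:
# session.get("https://vedaya-kge.fly.dev/documents/pipeline_status")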

Production Deployment Checklist

Before deploying to production:
  1. Security audit - Ensure API keys are securely stored
  2. Error handling - Implement comprehensive error handling
  3. Rate limit management - Add backoff and retry logic
  4. Monitoring - Set up alerts for API failures
  5. Backups - Plan for data backup and recovery
  6. Scalability - Design for increased load
  7. Testing - Perform thorough testing of all API integrations

Querying the Knowledge Base

The primary method for querying is through the OpenAI-compatible interface:
from openai import OpenAI

def create_vedaya_client():
    """Create a Vedaya client using OpenAI SDK"""
    return OpenAI(
        api_key="sk-dummy",  # Works without real authentication
        base_url="https://vedaya-kge.fly.dev/v1"
    )

def query_with_mode(question, mode="vedaya-hybrid"):
    """Query using different RAG modes"""
    client = create_vedaya_client()
    
    response = client.chat.completions.create(
        model=mode,  # vedaya-hybrid, vedaya-naive, vedaya-local, vedaya-global
        messages=[{"role": "user", "content": question}],
        temperature=0.7,
        max_tokens=500
    )
    
    return response.choices[0].message.content

# Examples of different modes
general_answer = query_with_mode("What are the main topics?", "vedaya-hybrid")
entity_answer = query_with_mode("Find all companies mentioned", "vedaya-local")
relationship_answer = query_with_mode("How are concepts connected?", "vedaya-global")

Integration Best Practices by Framework

Node.js/Express with OpenAI SDK

const OpenAI = require('openai');
const axios = require('axios');
require('dotenv').config();

// Create Vedaya client
const vedaya = new OpenAI({
  apiKey: process.env.VEDAYA_API_KEY || 'sk-dummy',
  baseURL: 'https://vedaya-kge.fly.dev/v1'
});

// Query function
async function queryKnowledgeBase(question, mode = 'vedaya-hybrid') {
  try {
    const response = await vedaya.chat.completions.create({
      model: mode,
      messages: [{ role: 'user', content: question }],
      temperature: 0.7,
      max_tokens: 500
    });
    
    return response.choices[0].message.content;
  } catch (error) {
    console.error('Query error:', error);
    
    // Fallback to a direct HTTP request via axios
    const httpResponse = await axios.post(
      'https://vedaya-kge.fly.dev/v1/chat/completions',
      {
        model: mode,
        messages: [{ role: 'user', content: question }],
        temperature: 0.7,
        max_tokens: 500
      }
    );
    
    return httpResponse.data.choices[0].message.content;
  }
}

module.exports = { vedaya, queryKnowledgeBase };

Python/FastAPI with Complete Workflow

import os
from fastapi import FastAPI, HTTPException
from openai import OpenAI
import requests
import time
from typing import List, Optional

app = FastAPI()

class VedayaClient:
    def __init__(self, api_key: Optional[str] = None):
        self.base_url = "https://vedaya-kge.fly.dev"
        self.api_key = api_key or "sk-dummy"
        
        # OpenAI client for queries
        self.openai_client = OpenAI(
            api_key=self.api_key,
            base_url=f"{self.base_url}/v1"
        )
        
        # Headers for direct API calls
        self.headers = {"Content-Type": "application/json"}
        if self.api_key and self.api_key != "sk-dummy":
            self.headers["Authorization"] = f"Bearer {self.api_key}"
    
    def upload_documents(self, texts: List[str]) -> bool:
        """Upload text documents"""
        response = requests.post(
            f"{self.base_url}/documents/texts",
            headers=self.headers,
            json={
                "texts": texts,
                "file_sources": [f"doc_{i}.txt" for i in range(len(texts))]
            }
        )
        return response.status_code == 200
    
    def wait_for_processing(self, max_wait: int = 60) -> bool:
        """Wait for document processing to complete"""
        for i in range(max_wait // 2):
            status = requests.get(
                f"{self.base_url}/documents/pipeline_status",
                headers=self.headers
            ).json()
            
            if not status.get('busy', False):
                return True
            
            time.sleep(2)
        
        return False
    
    def query(self, question: str, mode: str = "vedaya-hybrid") -> str:
        """Query the knowledge base"""
        try:
            response = self.openai_client.chat.completions.create(
                model=mode,
                messages=[{"role": "user", "content": question}],
                temperature=0.7,
                max_tokens=500
            )
            return response.choices[0].message.content
        except Exception as e:
            # Fallback to direct HTTP
            response = requests.post(
                f"{self.base_url}/v1/chat/completions",
                headers=self.headers,
                json={
                    "model": mode,
                    "messages": [{"role": "user", "content": question}],
                    "temperature": 0.7,
                    "max_tokens": 500
                }
            )
            if response.status_code == 200:
                return response.json()['choices'][0]['message']['content']
            raise HTTPException(status_code=response.status_code, detail=response.text)

# Initialize client
vedaya = VedayaClient(os.getenv("VEDAYA_API_KEY"))

@app.post("/upload")
async def upload_documents(texts: List[str]):
    """Upload documents endpoint"""
    if not vedaya.upload_documents(texts):
        raise HTTPException(status_code=500, detail="Upload failed")
    
    if not vedaya.wait_for_processing():
        raise HTTPException(status_code=500, detail="Processing timeout")
    
    return {"status": "success", "message": "Documents processed"}

@app.get("/query")
async def query_knowledge(question: str, mode: str = "vedaya-hybrid"):
    """Query endpoint"""
    return {"answer": vedaya.query(question, mode)}

Complete Working Example

Here’s a production-ready implementation with all best practices:
import os
import time
import logging
from typing import List, Optional, Dict, Any
from openai import OpenAI
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class VedayaAPIClient:
    """Production-ready Vedaya API client"""
    
    def __init__(self, api_key: Optional[str] = None):
        self.base_url = "https://vedaya-kge.fly.dev"
        self.api_key = api_key or os.getenv("VEDAYA_API_KEY", "sk-dummy")
        
        # Initialize OpenAI client for queries
        self.openai_client = OpenAI(
            api_key=self.api_key,
            base_url=f"{self.base_url}/v1"
        )
        
        # Headers for direct API calls
        self.headers = self._get_headers()
    
    def _get_headers(self) -> Dict[str, str]:
        """Get headers with optional authentication"""
        headers = {"Content-Type": "application/json"}
        if self.api_key and self.api_key not in ["sk-dummy", "sk-mock-dummy-key"]:
            headers["Authorization"] = f"Bearer {self.api_key}"
        return headers
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    def upload_documents(self, texts: List[str], sources: Optional[List[str]] = None) -> bool:
        """Upload documents with retry logic"""
        if not sources:
            sources = [f"doc_{i}.txt" for i in range(len(texts))]
        
        logger.info(f"Uploading {len(texts)} documents")
        response = requests.post(
            f"{self.base_url}/documents/texts",
            headers=self.headers,
            json={"texts": texts, "file_sources": sources}
        )
        
        if response.status_code == 200:
            logger.info("Documents uploaded successfully")
            return True
        
        logger.error(f"Upload failed: {response.status_code} - {response.text}")
        return False
    
    def wait_for_processing(self, timeout: int = 60) -> bool:
        """Wait for document processing with timeout"""
        logger.info("Waiting for document processing...")
        end_time = time.time() + timeout
        
        while time.time() < end_time:
            try:
                status = requests.get(
                    f"{self.base_url}/documents/pipeline_status",
                    headers=self.headers,
                    timeout=10
                ).json()
                
                if not status.get('busy', False):
                    logger.info("Processing complete")
                    return True
                
                logger.debug(f"Status: {status.get('latest_message', 'Processing...')}")
                time.sleep(2)
            
            except Exception as e:
                logger.warning(f"Status check error: {e}")
                time.sleep(5)
        
        logger.error("Processing timeout")
        return False
    
    def query(self, 
             question: str, 
             mode: str = "vedaya-hybrid",
             temperature: float = 0.7,
             max_tokens: int = 500) -> str:
        """Query with fallback to HTTP"""
        logger.info(f"Querying: {question[:50]}... (mode: {mode})")
        
        try:
            # Try OpenAI SDK first
            response = self.openai_client.chat.completions.create(
                model=mode,
                messages=[{"role": "user", "content": question}],
                temperature=temperature,
                max_tokens=max_tokens
            )
            return response.choices[0].message.content
        
        except Exception as e:
            logger.warning(f"OpenAI SDK failed: {e}, falling back to HTTP")
            
            # Fallback to direct HTTP
            response = requests.post(
                f"{self.base_url}/v1/chat/completions",
                headers=self.headers,
                json={
                    "model": mode,
                    "messages": [{"role": "user", "content": question}],
                    "temperature": temperature,
                    "max_tokens": max_tokens
                },
                timeout=60
            )
            
            if response.status_code == 200:
                return response.json()['choices'][0]['message']['content']
            
            logger.error(f"Query failed: {response.status_code}")
            raise Exception(f"Query failed: {response.text}")
    
    def complete_workflow(self, texts: List[str], query: str) -> str:
        """Complete workflow: upload, process, query"""
        # Upload documents
        if not self.upload_documents(texts):
            raise Exception("Document upload failed")
        
        # Wait for processing
        if not self.wait_for_processing():
            raise Exception("Document processing timeout")
        
        # Query the knowledge base
        return self.query(query)

# Usage example
if __name__ == "__main__":
    client = VedayaAPIClient()
    
    # Upload and query
    documents = [
        "Machine learning is a subset of artificial intelligence.",
        "Deep learning uses neural networks with multiple layers."
    ]
    
    answer = client.complete_workflow(
        texts=documents,
        query="What is machine learning?"
    )
    
    print(f"Answer: {answer}")