ObServML REST API Reference

This comprehensive guide covers the ObServML REST API, framework architecture, and the dynamic configuration-to-function mapping system that powers the platform.

Table of Contents

  1. API Overview
  2. Complete REST API Reference
  3. Framework Architecture
  4. Function-Config Mapping
  5. Configuration-Driven Workflows
  6. Advanced Usage Patterns
  7. Error Handling
  8. Client Libraries

API Overview

The ObServML REST API provides a comprehensive interface for managing machine learning experiments through a microservices architecture. Built with FastAPI, it enables:

  • Experiment Lifecycle Management: Create, train, save, load, and delete experiments
  • Real-time Predictions: Make predictions on streaming data via RabbitMQ
  • Visualization: Access plots and EDA figures in JSON format
  • MLOps Integration: Automatic model versioning and tracking with MLflow
  • Configuration-Driven Operations: Dynamic function execution based on YAML configurations

Base URL

http://localhost:8010

Architecture Components

  • ExperimentHub: Central orchestrator managing all experiments
  • RabbitMQ: Message broker for data streaming and communication
  • MLflow: Model registry and experiment tracking
  • FastAPI: REST API framework with automatic documentation

Complete REST API Reference

The following table presents all available REST API endpoints as documented in the research paper and current implementation:

Endpoint | HTTP Method | Description
---------|-------------|------------
/ | GET | Default path; returns a "Hello World" message
/health | GET | Checks the health of all plugins (MLflow, RabbitMQ)
/available_experiments | GET | Returns the available experiment types and their configurations
/flush/{queue} | POST | Removes all data from the specified RabbitMQ queue
/create_experiment/{name}/{experiment_type} | POST | Creates a new experiment of the specified type
/{name}/train | POST | Starts training the specified experiment with the supplied configuration. Requires data in the RabbitMQ queue and a configuration in the request body
/{name}/load | POST | Loads an experiment by its name
/{name}/load/{run_id} | POST | Loads an experiment using the specified MLflow run ID
/{name}/save | POST | Saves the current experiment to the MLflow server
/{name}/predict | POST | Runs a prediction for the specified experiment. Requires data in the RabbitMQ queue {name}
/{name}/retrain | POST | Starts retraining the specified experiment
/{name}/stop_training | POST | Stops the training process for an experiment without removing it
/{name}/delete | POST | Removes an experiment from memory
/{name}/plot/{plot_name} | GET | Returns the named plot (figure) for the specified experiment as JSON
/{name}/plot_eda/{plot_name} | GET | Returns the named EDA plot (figure) for the specified experiment as JSON
/{name}/train_data | GET | Returns the training data of the specified experiment as JSON
/{name}/cfg | GET | Returns the configuration of the specified experiment
/experiments | GET | Lists all experiment names and their available figure names
/{name}/run_id | GET | Returns the MLflow run ID of the specified experiment
/{name}/exp_id | GET | Returns the MLflow experiment ID of the specified experiment
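The path parameters in the table ({name}, {experiment_type}, {run_id}, {plot_name}, {queue}) are plain URL segments, so an experiment name containing a space or a slash must be percent-encoded. Below is a minimal sketch of building request URLs safely. The ENDPOINTS dict and the build_request helper are hypothetical and are not part of ObServML.

```python
from urllib.parse import quote

BASE_URL = "http://localhost:8010"

# (HTTP method, path template) for a few endpoints from the table
ENDPOINTS = {
    "health": ("GET", "/health"),
    "create": ("POST", "/create_experiment/{name}/{experiment_type}"),
    "train": ("POST", "/{name}/train"),
    "load_run": ("POST", "/{name}/load/{run_id}"),
    "plot": ("GET", "/{name}/plot/{plot_name}"),
}


def build_request(endpoint: str, base_url: str = BASE_URL, **params: str) -> tuple[str, str]:
    """Return (method, url) with every path parameter percent-encoded."""
    method, template = ENDPOINTS[endpoint]
    # safe="" also encodes "/", so a value cannot add extra path segments
    encoded = {key: quote(str(value), safe="") for key, value in params.items()}
    # str.format raises KeyError if a required path parameter is missing
    return method, base_url.rstrip("/") + template.format(**encoded)
```

For example, build_request("plot", name="pump iforest", plot_name="outliers") gives ("GET", "http://localhost:8010/pump%20iforest/plot/outliers").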

Framework Architecture

ObServML uses a configuration-driven architecture in which YAML configuration files control experiment behavior at runtime. The core principle is:

Each top-level YAML configuration key (except load_object) corresponds to a method of the Experiment class

Core Components

1. Experiment Class (Abstract Base)

The Experiment class serves as the foundation for all experiment types:

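from typing import Any, Optional, Protocol

import pandas as pd
import plotly.graph_objects as go


class Experiment(Protocol):
    """Base protocol shared by all experiment types"""

    # Core attributes
    model: Any = None
    data: Optional[pd.DataFrame] = None
    cfg: Optional[dict] = None

    # Registries of available models and generated figures; these are
    # class-level dicts, shared by all instances unless reassigned per instance
    _model_registry: dict[str, type] = dict()
    _report_registry: dict[str, go.Figure] = dict()
    _eda_registry: dict[str, go.Figure] = dict()

    # MLflow integration
    run_id: Optional[str] = None
    experiment_id: Optional[str] = None
    mlflow_uri: Optional[str] = None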

2. Dynamic Function Execution

The run() method implements the core configuration-to-function mapping:

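def run(self, data: pd.DataFrame) -> None:
    """Dynamically execute functions based on configuration keys"""

    # Every top-level key except load_object names a method to call,
    # in the order the keys appear in the config
    funcs = [k for k in self.cfg if k != "load_object"]

    self.data = data
    for k in funcs:
        # Empty YAML sections (e.g. "eda:") load as None; treat them as no kwargs
        kwargs = self.cfg[k] or {}
        if k == "setup":
            # setup() is the step that receives the raw data
            self.setup(data, **kwargs)
        else:
            # Dynamic function call using getattr; config values become kwargs
            getattr(self, k)(**kwargs)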
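The following self-contained toy mirrors the dispatch rule. It assumes that each config section is forwarded as keyword arguments, as described under Parameter Passing. ToyExperiment is hypothetical and only records which methods were called, and with what arguments.

```python
class ToyExperiment:
    """Hypothetical stand-in for an Experiment subclass; records each call."""

    def __init__(self, cfg: dict):
        self.cfg = cfg
        self.data = None
        self.calls: list[tuple[str, dict]] = []

    def setup(self, data, **kwargs):
        self.calls.append(("setup", kwargs))

    def eda(self, **kwargs):
        self.calls.append(("eda", kwargs))

    def create_model(self, **kwargs):
        self.calls.append(("create_model", kwargs))

    def run(self, data) -> None:
        self.data = data
        # Every key except load_object names a method, called in config order
        for key in [k for k in self.cfg if k != "load_object"]:
            kwargs = self.cfg[key] or {}  # an empty YAML section ("eda:") loads as None
            if key == "setup":
                self.setup(data, **kwargs)
            else:
                getattr(self, key)(**kwargs)


cfg = {
    "load_object": {"module": "framework.FaultDetection", "name": "FaultDetectionExperiment"},
    "setup": {"datetime_column": "ds"},
    "eda": None,
    "create_model": {"model": "iforest", "params": {"n_estimators": 100}},
}
exp = ToyExperiment(cfg)
exp.run(data=[])
print([name for name, _ in exp.calls])  # ['setup', 'eda', 'create_model']
```

A config key without a matching method raises AttributeError only when the loop reaches it. By then, the earlier steps have already run.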

3. Experiment Types

ObServML provides four main experiment types, each inheriting from the base Experiment class:

  • TimeSeriesAnalysis: For forecasting and temporal anomaly detection
  • FaultDetection: For unsupervised anomaly detection
  • FaultIsolation: For supervised classification and root cause analysis
  • ProcessMining: For workflow and sequence analysis

Function-Config Mapping

The configuration-to-function mapping system enables dynamic experiment execution without code changes. Here's how it works:

Core Framework Functions

Required Functions (Must be implemented by experiment types)

Function | Config Key | Purpose | Parameters
---------|------------|---------|-----------
setup() | setup | Data preparation and experiment initialization | data: pd.DataFrame, **kwargs
create_model() | create_model | Model training and configuration | **kwargs
predict() | predict | Prediction on new data | data: pd.DataFrame, **kwargs
_score() | N/A | Calculate model metrics | y, y_hat

Optional Functions (Can be included in config)

Function | Config Key | Purpose | Parameters
---------|------------|---------|-----------
eda() | eda | Exploratory data analysis | None
retrain() | retrain | Model retraining logic | None
format_data() | format | Custom data formatting | data, format: dict

Built-in Functions (Always available)

Function | Purpose | Usage
---------|---------|------
save() | Save experiment to MLflow | Called via API endpoint
load() | Load experiment from MLflow | Called via API endpoint
plot_model() | Retrieve model visualizations | Called via API endpoint
export() | Export reports as HTML | Called internally
join_data() | Combine training and new data | Called during retraining

Configuration Structure

Every experiment configuration follows this structure:

load_object:
  module: framework.{ExperimentType}    # Python module path
  name: {ExperimentType}Experiment      # Class name

setup:                                  # Maps to setup() function
  datetime_column: "timestamp"
  target: "target_variable"
  # Additional setup parameters...

eda:                                    # Maps to eda() function
  # EDA configuration (empty = use defaults)

create_model:                           # Maps to create_model() function
  model: "model_name"
  params:
    # Model-specific parameters

# Additional custom functions can be added
custom_preprocessing:                   # Maps to custom_preprocessing() function
  param1: value1
  param2: value2
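Every key other than load_object is looked up as a method name. As a result, a misspelled key fails only when run() reaches it. Below is a minimal pre-flight check, assuming the experiment class has already been imported. validate_config is a hypothetical helper, not part of ObServML.

```python
def validate_config(cfg: dict, experiment_cls: type) -> list[str]:
    """Return the problems found in an experiment config (an empty list if none)."""
    problems = []
    spec = cfg.get("load_object")
    # load_object drives the dynamic import, so it needs both fields
    if not isinstance(spec, dict) or not all(k in spec for k in ("module", "name")):
        problems.append("load_object must define 'module' and 'name'")
    for key in cfg:
        if key == "load_object":
            continue
        # Every other key is dispatched with getattr, so it must name a method
        if not callable(getattr(experiment_cls, key, None)):
            problems.append(f"config key '{key}' has no matching method")
    return problems
```

For example, running validate_config on the custom_preprocessing config flags that key unless the class named in load_object defines a custom_preprocessing() method.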

Parameter Passing

Parameters from the configuration are passed to functions using Python's **kwargs mechanism:

# Configuration
setup:
  datetime_column: "timestamp"
  target: "value"
  predict_window: 1000

# Function call (automatically generated)
self.setup(data, datetime_column="timestamp", target="value", predict_window=1000)

Configuration-Driven Workflows

1. Training Workflow

graph TD
    A["POST /{name}/train"] --> B[Parse Configuration]
    B --> C[Create Experiment Instance]
    C --> D[Load Data from RabbitMQ]
    D --> E["Execute run() method"]
    E --> F[Dynamic Function Calls]
    F --> G["setup()"]
    F --> H["eda()"]
    F --> I["create_model()"]
    I --> J[Save to MLflow]
    J --> K[Return Response]

2. Prediction Workflow

graph TD
    A["POST /{name}/predict"] --> B[Load Experiment]
    B --> C[Get Data from RabbitMQ]
    C --> D["Call predict() method"]
    D --> E[Update Visualizations]
    E --> F[Return Results]

3. Configuration Processing

When a training request is received:

  1. Configuration Parsing: YAML config is loaded and validated
  2. Dynamic Import: Experiment class is loaded using load_object specification
  3. Instance Creation: Experiment instance is created with config
  4. Function Mapping: Config keys are mapped to class methods
  5. Sequential Execution: Functions are called in the order their keys appear in the configuration (load_object excluded)
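Step 2 (Dynamic Import) can be done with the standard importlib module. Below is a minimal sketch, assuming load_object always carries both module and name. load_experiment_class is a hypothetical helper. The sketch does not show how ObServML constructs the instance afterwards.

```python
import importlib


def load_experiment_class(cfg: dict) -> type:
    """Resolve load_object.module and load_object.name to a class object."""
    spec = cfg["load_object"]
    # Raises ModuleNotFoundError if the module path is wrong
    module = importlib.import_module(spec["module"])  # e.g. "framework.FaultDetection"
    # Raises AttributeError if the module has no attribute with that name
    return getattr(module, spec["name"])  # e.g. "FaultDetectionExperiment"
```

When the framework package is importable, {"load_object": {"module": "framework.FaultDetection", "name": "FaultDetectionExperiment"}} resolves to that class. The same call works for any importable module, such as {"module": "collections", "name": "OrderedDict"}.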

Advanced Usage Patterns

1. Multi-Model Training

Train multiple models with different configurations. Note: the training data must already be in the RabbitMQ queue before the curl request is sent; otherwise the training process cannot start.

# Train Isolation Forest
curl -X POST "http://localhost:8010/pump_iforest/train" \
  -H "Content-Type: application/json" \
  -d '{
    "load_object": {
      "module": "framework.FaultDetection",
      "name": "FaultDetectionExperiment"
    },
    "setup": {"datetime_column": "ds"},
    "eda": {},
    "create_model": {
      "model": "iforest",
      "params": {"n_estimators": 100, "contamination": "auto"}
    }
  }'

# Train PCA model
curl -X POST "http://localhost:8010/pump_pca/train" \
  -H "Content-Type: application/json" \
  -d '{
    "load_object": {
      "module": "framework.FaultDetection", 
      "name": "FaultDetectionExperiment"
    },
    "setup": {"datetime_column": "ds"},
    "create_model": {
      "model": "pca",
      "params": {"n_components": 0.95, "alpha": 0.05}
    }
  }'
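The two requests differ only in their create_model section. When training many variants, building each request body from a shared base config avoids copy-paste drift. This sketch uses only the standard library. BASE_CONFIG, MODELS, and make_config are illustrative names.

```python
import copy
import json

BASE_CONFIG = {
    "load_object": {"module": "framework.FaultDetection", "name": "FaultDetectionExperiment"},
    "setup": {"datetime_column": "ds"},
}

# Model name -> create_model params, taken from the two curl examples
MODELS = {
    "iforest": {"n_estimators": 100, "contamination": "auto"},
    "pca": {"n_components": 0.95, "alpha": 0.05},
}


def make_config(model: str, params: dict) -> dict:
    """Copy the shared base config and add a model-specific create_model section."""
    cfg = copy.deepcopy(BASE_CONFIG)  # deep copy so configs never share nested dicts
    cfg["create_model"] = {"model": model, "params": params}
    return cfg


for model, params in MODELS.items():
    body = json.dumps(make_config(model, params))
    # POST `body` to /pump_{model}/train, e.g. with curl or the Python client
    print(f"pump_{model}: {body}")
```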

2. Automated Retraining Pipeline

Configure automatic retraining based on a performance metric by adding a retrain block to the setup section:

setup:
  datetime_column: "ds"
  target: "fault_type"
  retrain:
    retrain_window: 5000
    metric: "Accuracy"
    metric_threshold: 0.85
    higher_better: true

create_model:
  model: "dt"
  params: {}
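The field names in the retrain block suggest a threshold rule, but their exact semantics are not specified here. The following sketch uses an assumed interpretation: retrain when the metric is on the wrong side of metric_threshold (with the direction set by higher_better), using the most recent retrain_window samples. Both helpers are hypothetical.

```python
def needs_retraining(metric_value: float, metric_threshold: float, higher_better: bool) -> bool:
    """Assumed rule: True when the metric is on the wrong side of metric_threshold."""
    if higher_better:
        # e.g. Accuracy: retrain once it falls below the threshold
        return metric_value < metric_threshold
    # e.g. an error metric: retrain once it rises above the threshold
    return metric_value > metric_threshold


def retrain_slice(rows: list, retrain_window: int) -> list:
    """Assumed meaning of retrain_window: keep only the most recent samples."""
    start = max(len(rows) - retrain_window, 0)
    return rows[start:]


# With the YAML config: Accuracy 0.82 < 0.85, so retraining would be triggered
print(needs_retraining(0.82, metric_threshold=0.85, higher_better=True))  # True
```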

3. Custom Function Integration

Add custom preprocessing functions:

load_object:
  module: framework.FaultIsolation
  name: FaultIsolationExperiment

setup:
  datetime_column: "ds"
  target: "output"

custom_preprocessing:          # Custom function
  normalize: true
  remove_outliers: true
  outlier_threshold: 3

eda:

create_model:
  model: "rf"
  params:
    n_estimators: 100
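For custom_preprocessing to run, the class named in load_object must define a method with exactly that name, and that method must accept the YAML fields as keyword arguments. The sketch below is hypothetical in several ways:

  • FaultIsolationExperimentStub stands in for the real class.
  • Data is a plain list of floats instead of a DataFrame.
  • The z-score outlier rule is one possible reading of outlier_threshold.

```python
import statistics


class FaultIsolationExperimentStub:
    """Hypothetical stand-in for framework.FaultIsolation.FaultIsolationExperiment."""

    def __init__(self, values: list[float]):
        # The real framework keeps a pd.DataFrame in self.data; a list keeps this runnable
        self.data = values


class CustomFaultIsolation(FaultIsolationExperimentStub):
    def custom_preprocessing(self, normalize: bool = False, remove_outliers: bool = False,
                             outlier_threshold: float = 3.0) -> None:
        """Name matches the config key; keyword arguments match the YAML fields."""
        values = list(self.data)
        if remove_outliers and len(values) > 1:
            mean, std = statistics.mean(values), statistics.pstdev(values)
            if std > 0:
                # Keep points within outlier_threshold standard deviations of the mean
                values = [v for v in values if abs(v - mean) / std <= outlier_threshold]
        if normalize and len(values) > 1:
            mean, std = statistics.mean(values), statistics.pstdev(values)
            if std > 0:
                # Rescale to zero mean and unit (population) standard deviation
                values = [(v - mean) / std for v in values]
        self.data = values
```

The dispatch loop would then call it as exp.custom_preprocessing(normalize=True, remove_outliers=True, outlier_threshold=3).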

4. Batch Processing

Process multiple experiments in sequence:

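#!/bin/bash
# Batch training script

experiments=("pump_iforest" "pump_pca" "pump_dbscan")
models=("iforest" "pca" "dbscan")

for i in "${!experiments[@]}"; do
  echo "Training ${experiments[$i]} with ${models[$i]}"

  # The endpoint expects a JSON body, so convert the YAML config first (requires PyYAML)
  python3 -c 'import sys, json, yaml; print(json.dumps(yaml.safe_load(sys.stdin)))' \
    < "configs/pump/${models[$i]}.yaml" > "/tmp/${models[$i]}.json"

  curl -X POST "http://localhost:8010/${experiments[$i]}/train" \
    -H "Content-Type: application/json" \
    -d @"/tmp/${models[$i]}.json"

  # Fixed wait for training to complete; adjust to the data size
  sleep 30

  # Make initial prediction (requires data in the RabbitMQ queue for this experiment)
  curl -X POST "http://localhost:8010/${experiments[$i]}/predict"
done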

Error Handling

Common Error Scenarios

1. Configuration Errors

{
  "detail": "Configuration validation failed: missing required field 'target'"
}

Solution: Ensure all required configuration fields are present.

2. Experiment Not Found

{
  "detail": "Experiment 'nonexistent_model' not found"
}

Solution: Check experiment name and ensure it has been created/loaded.

3. Model Training Failures

{
  "detail": "Model training failed: insufficient data"
}

Solution: Verify data is available in RabbitMQ queue and meets model requirements.

4. Plugin Health Issues

{
  "mlops": {
    "healthy": false,
    "details": {
      "error": "Connection refused to MLflow server"
    }
  }
}

Solution: Check MLflow server status and connectivity.
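A client can turn a /health response into a short summary of failing plugins. This sketch makes two assumptions. First, every top-level entry follows the {"healthy": ..., "details": ...} shape in the example response. Second, a missing healthy flag counts as unhealthy. unhealthy_plugins is a hypothetical helper.

```python
def unhealthy_plugins(health: dict) -> dict[str, dict]:
    """Map each plugin reported as unhealthy to its details (an empty dict if none given)."""
    return {
        plugin: status.get("details", {})
        for plugin, status in health.items()
        # Skip non-dict entries; treat a missing "healthy" flag as unhealthy
        if isinstance(status, dict) and not status.get("healthy", False)
    }
```

With the Python client, unhealthy_plugins(client.health_check()) returns an empty dict when every plugin reports healthy.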

Error Recovery Patterns

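import time

import requests


def robust_api_call(url, max_retries=3, backoff_factor=2):
    """POST to an ObServML endpoint, retrying with exponential backoff"""
    for attempt in range(max_retries):
        try:
            response = requests.post(url, timeout=30)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            status = getattr(e.response, "status_code", None)
            # Client errors (4xx, e.g. an unknown experiment) will not succeed on retry;
            # connection errors, timeouts, and 5xx are retried until attempts run out
            if (status is not None and status < 500) or attempt == max_retries - 1:
                raise
            wait_time = backoff_factor ** attempt  # 1s, 2s, 4s, ... for backoff_factor=2
            print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)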

Client Libraries

ObServML Python Client

The ObServML Python Client is available as observml_client.py in the project root directory. This comprehensive client combines the functionality from api_commands.py with enhanced error handling, type hints, and modern Python practices.

Key Features:

  • Enhanced Error Handling: Automatic retry logic with exponential backoff
  • Type Hints: Full type annotations for better IDE support
  • RabbitMQ Integration: Automatic data posting to message queues
  • File Format Support: Load data from .xlsx, .csv, .json, .pkl files
  • Backward Compatibility: Legacy functions for existing code
  • Comprehensive Logging: Built-in logging for debugging
  • Monitoring Support: Continuous experiment monitoring capabilities

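Importing the client (observml_client.py is a module in the project root, so it must be on the Python path, for example by running Python from the project root):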

# Import the client
from observml_client import ObServMLClient

# Or use legacy functions for backward compatibility
from observml_client import train, predict, load_experiment

Python Client Usage

import requests
import json
from typing import Dict, Any, Optional

class ObServMLClient:
    """Comprehensive Python client for ObServML API"""

    def __init__(self, base_url: str = "http://localhost:8010"):
        self.base_url = base_url.rstrip('/')

    def health_check(self) -> Dict[str, Any]:
        """Check system health"""
        response = requests.get(f"{self.base_url}/health")
        response.raise_for_status()
        return response.json()

    def get_available_experiments(self) -> Dict[str, Any]:
        """Get available experiment types"""
        response = requests.get(f"{self.base_url}/available_experiments")
        response.raise_for_status()
        return response.json()

    def create_experiment(self, name: str, experiment_type: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new experiment"""
        response = requests.post(
            f"{self.base_url}/create_experiment/{name}/{experiment_type}",
            json=config
        )
        response.raise_for_status()
        return response.json()

    def train_experiment(self, name: str, config: Dict[str, Any]) -> str:
        """Train an experiment"""
        response = requests.post(f"{self.base_url}/{name}/train", json=config)
        response.raise_for_status()
        return response.text

    def predict(self, name: str) -> str:
        """Make predictions"""
        response = requests.post(f"{self.base_url}/{name}/predict")
        response.raise_for_status()
        return response.text

    def load_experiment(self, name: str, run_id: Optional[str] = None) -> str:
        """Load an experiment"""
        if run_id:
            url = f"{self.base_url}/{name}/load/{run_id}"
        else:
            url = f"{self.base_url}/{name}/load"

        response = requests.post(url)
        response.raise_for_status()
        return response.text

    def save_experiment(self, name: str) -> str:
        """Save an experiment"""
        response = requests.post(f"{self.base_url}/{name}/save")
        response.raise_for_status()
        return response.text

    def get_plot(self, name: str, plot_name: str) -> Dict[str, Any]:
        """Get a plot from an experiment"""
        response = requests.get(f"{self.base_url}/{name}/plot/{plot_name}")
        response.raise_for_status()
        return response.json()

    def get_eda_plot(self, name: str, plot_name: str) -> Dict[str, Any]:
        """Get an EDA plot from an experiment"""
        response = requests.get(f"{self.base_url}/{name}/plot_eda/{plot_name}")
        response.raise_for_status()
        return response.json()

    def get_configuration(self, name: str) -> Dict[str, Any]:
        """Get experiment configuration"""
        response = requests.get(f"{self.base_url}/{name}/cfg")
        response.raise_for_status()
        return response.json()

    def get_experiments(self) -> Dict[str, Any]:
        """Get all experiments and their available plots"""
        response = requests.get(f"{self.base_url}/experiments")
        response.raise_for_status()
        return response.json()

    def flush_queue(self, queue_name: str) -> str:
        """Flush a RabbitMQ queue"""
        response = requests.post(f"{self.base_url}/flush/{queue_name}")
        response.raise_for_status()
        return response.text

    def retrain_experiment(self, name: str) -> str:
        """Retrain an experiment"""
        response = requests.post(f"{self.base_url}/{name}/retrain")
        response.raise_for_status()
        return response.text

    def delete_experiment(self, name: str) -> Dict[str, Any]:
        """Delete an experiment from memory"""
        response = requests.post(f"{self.base_url}/{name}/delete")
        response.raise_for_status()
        return response.json()

# Usage example
client = ObServMLClient()

# Check system health
health = client.health_check()
print(f"System health: {json.dumps(health, indent=2)}")

# Create and train a fault detection experiment
config = {
    "load_object": {
        "module": "framework.FaultDetection",
        "name": "FaultDetectionExperiment"
    },
    "setup": {
        "datetime_column": "ds"
    },
    "eda": {},
    "create_model": {
        "model": "iforest",
        "params": {
            "n_estimators": 100,
            "contamination": "auto"
        }
    }
}

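# Train (the training data must already be in the RabbitMQ queue)
result = client.train_experiment("pump_anomaly", config)
print(f"Training result: {result}")

# Make predictions (requires new data in the RabbitMQ queue "pump_anomaly")
prediction = client.predict("pump_anomaly")
print(f"Prediction result: {prediction}")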

# Get anomaly plot
plot = client.get_plot("pump_anomaly", "outliers")
print(f"Plot keys: {list(plot.keys())}")