ObServML REST API Reference
This comprehensive guide covers the ObServML REST API, framework architecture, and the dynamic configuration-to-function mapping system that powers the platform.
Table of Contents
- API Overview
- Complete REST API Reference
- Framework Architecture
- Function-Config Mapping
- Configuration-Driven Workflows
- Advanced Usage Patterns
- Error Handling
- Client Libraries
API Overview
The ObServML REST API provides a comprehensive interface for managing machine learning experiments through a microservices architecture. Built with FastAPI, it enables:
- Experiment Lifecycle Management: Create, train, save, load, and delete experiments
- Real-time Predictions: Make predictions on streaming data via RabbitMQ
- Visualization: Access plots and EDA figures in JSON format
- MLOps Integration: Automatic model versioning and tracking with MLflow
- Configuration-Driven Operations: Dynamic function execution based on YAML configurations
Base URL
http://localhost:8010
Architecture Components
- ExperimentHub: Central orchestrator managing all experiments
- RabbitMQ: Message broker for data streaming and communication
- MLflow: Model registry and experiment tracking
- FastAPI: REST API framework with automatic documentation
Complete REST API Reference
The following table presents all available REST API endpoints as documented in the research paper and current implementation:
| Endpoint | HTTP Method | Description |
|---|---|---|
| / | GET | Default path. Returns a "Hello World" message |
| /health | GET | Check the health of all plugins (MLflow, RabbitMQ) |
| /available_experiments | GET | Get available experiment types and their configurations |
| /flush/{queue} | POST | Removes all data from the specified RabbitMQ queue |
| /create_experiment/{name}/{experiment_type} | POST | Create a new experiment of the specified type |
| /{name}/train | POST | Starts training for the specified experiment, setting up the training configuration. Requires data in the Rabbit queue and a configuration file |
| /{name}/load | POST | Loads an experiment by its name |
| /{name}/load/{run_id} | POST | Loads an experiment using the specified run ID from MLflow |
| /{name}/save | POST | Saves the current experiment to the MLflow server |
| /{name}/predict | POST | Initiates a prediction call for the specified experiment. Requires a request and data in the RabbitMQ queue {name} |
| /{name}/retrain | POST | Starts retraining an experiment as requested |
| /{name}/stop_training | POST | Stop the training process for an experiment without removing it |
| /{name}/delete | POST | Remove an experiment from memory |
| /{name}/plot/{plot_name} | GET | Returns a plot (figure) for the specified experiment and plot name in JSON |
| /{name}/plot_eda/{plot_name} | GET | Returns the EDA plot (figure) for the specified experiment and plot name in JSON |
| /{name}/train_data | GET | Returns training data for the specified experiment in JSON |
| /{name}/cfg | GET | Returns the configuration details of the specified experiment |
| /experiments | GET | Lists all experiment names and available figure names |
| /{name}/run_id | GET | Returns the run ID (MLflow) of the specified experiment |
| /{name}/exp_id | GET | Returns the experiment ID (MLflow) of the specified experiment |
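To make the path templates in the table concrete, the sketch below fills them in for a given experiment. The `ENDPOINTS` dictionary and the `build_request` helper are illustrative names and not part of ObServML; only a subset of the routes is listed, and the base URL is the one given earlier.

```python
from urllib.parse import quote

BASE_URL = "http://localhost:8010"

# Subset of the routes from the table: short name -> (HTTP method, path template).
ENDPOINTS = {
    "health": ("GET", "/health"),
    "train": ("POST", "/{name}/train"),
    "load_run": ("POST", "/{name}/load/{run_id}"),
    "predict": ("POST", "/{name}/predict"),
    "plot": ("GET", "/{name}/plot/{plot_name}"),
    "flush": ("POST", "/flush/{queue}"),
}


def build_request(endpoint: str, base_url: str = BASE_URL, **params: str) -> tuple[str, str]:
    """Return (method, url) for a named endpoint, URL-encoding each path parameter."""
    method, template = ENDPOINTS[endpoint]
    encoded = {key: quote(str(value), safe="") for key, value in params.items()}
    return method, base_url.rstrip("/") + template.format(**encoded)


print(build_request("plot", name="pump_iforest", plot_name="outliers"))
```

A missing path parameter raises `KeyError` from `str.format`, which surfaces a malformed call before any request is sent.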
Framework Architecture
ObServML uses a configuration-driven architecture where YAML configuration files dynamically control experiment behavior. The core principle is:
Each YAML configuration key corresponds to a method in the Experiment class
Core Components
1. Experiment Class (Abstract Base)
The Experiment class serves as the foundation for all experiment types:
class Experiment(Protocol):
    """Abstract base class for all experiments"""

    # Core attributes
    model: any = None
    data: pd.DataFrame = None
    cfg: dict = None
    _model_registry: dict[str, type] = dict()
    _report_registry: dict[str, go.Figure] = dict()
    _eda_registry: dict[str, go.Figure] = dict()

    # MLflow integration
    run_id: str = None
    experiment_id: str = None
    mlflow_uri: str = None
2. Dynamic Function Execution
The run() method implements the core configuration-to-function mapping:
def run(self, data: pd.DataFrame) -> None:
    """Dynamically execute functions based on configuration keys"""
    # Extract function names from config (excluding load_object)
    funcs = list(self.cfg.keys())
    funcs.remove("load_object")
    self.data = data
    for k in funcs:
        if k == "setup":
            self.setup(data)
        else:
            # Dynamic function call using getattr
            getattr(self, k)()
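The dispatch loop above can be tried in isolation. The sketch below uses a hypothetical `ToyExperiment` class with stub methods (plain dictionaries stand in for the DataFrame and the parsed YAML) to show that methods run in the order their keys appear in the configuration and that `load_object` is skipped.

```python
class ToyExperiment:
    """Hypothetical stand-in for an Experiment subclass; records which methods ran."""

    def __init__(self, cfg: dict):
        self.cfg = cfg
        self.data = None
        self.calls: list[str] = []

    def setup(self, data) -> None:
        self.calls.append("setup")

    def eda(self) -> None:
        self.calls.append("eda")

    def create_model(self) -> None:
        self.calls.append("create_model")

    def run(self, data) -> None:
        # Mirrors the run() method shown above: every key except
        # "load_object" names a method, called in configuration order.
        funcs = [key for key in self.cfg if key != "load_object"]
        self.data = data
        for key in funcs:
            if key == "setup":
                self.setup(data)
            else:
                getattr(self, key)()


cfg = {
    "load_object": {"module": "framework.FaultDetection", "name": "FaultDetectionExperiment"},
    "setup": {"datetime_column": "ds"},
    "eda": None,
    "create_model": {"model": "iforest"},
}
exp = ToyExperiment(cfg)
exp.run(data={"ds": [], "value": []})
print(exp.calls)  # ['setup', 'eda', 'create_model']
```

A configuration key with no matching method raises `AttributeError` at the `getattr` call, so a misspelled key fails at the point where it would have been executed.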
3. Experiment Types
ObServML provides four main experiment types, each inheriting from the base Experiment class:
- TimeSeriesAnalysis: For forecasting and temporal anomaly detection
- FaultDetection: For unsupervised anomaly detection
- FaultIsolation: For supervised classification and root cause analysis
- ProcessMining: For workflow and sequence analysis
Function-Config Mapping
The configuration-to-function mapping system enables dynamic experiment execution without code changes. Here's how it works:
Core Framework Functions
Required Functions (Must be implemented by experiment types)
| Function | Config Key | Purpose | Parameters |
|---|---|---|---|
| setup() | setup | Data preparation and experiment initialization | data: pd.DataFrame, **kwargs |
| create_model() | create_model | Model training and configuration | **kwargs |
| predict() | predict | Prediction on new data | data: pd.DataFrame, **kwargs |
| _score() | N/A | Calculate model metrics | y, y_hat |
Optional Functions (Can be included in config)
| Function | Config Key | Purpose | Parameters |
|---|---|---|---|
| eda() | eda | Exploratory data analysis | None |
| retrain() | retrain | Model retraining logic | None |
| format_data() | format | Custom data formatting | data, format: dict |
Built-in Functions (Always available)
| Function | Purpose | Usage |
|---|---|---|
| save() | Save experiment to MLflow | Called via API endpoint |
| load() | Load experiment from MLflow | Called via API endpoint |
| plot_model() | Retrieve model visualizations | Called via API endpoint |
| export() | Export reports as HTML | Called internally |
| join_data() | Combine training and new data | Called during retraining |
Configuration Structure
Every experiment configuration follows this structure:
load_object:
  module: framework.{ExperimentType} # Python module path
  name: {ExperimentType}Experiment # Class name
setup: # Maps to setup() function
  datetime_column: "timestamp"
  target: "target_variable"
  # Additional setup parameters...
eda: # Maps to eda() function
  # EDA configuration (empty = use defaults)
create_model: # Maps to create_model() function
  model: "model_name"
  params:
    # Model-specific parameters
# Additional custom functions can be added
custom_preprocessing: # Maps to custom_preprocessing() function
  param1: value1
  param2: value2
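The same structure can be assembled in Python before being sent as a JSON request body. The `make_config` helper below is a hypothetical convenience and not part of ObServML; it fills in the `load_object` section from the naming pattern shown above and relies on Python dictionaries preserving insertion order, since the key order determines the execution order.

```python
import json


def make_config(experiment_type: str, setup: dict, create_model: dict, eda: bool = True) -> dict:
    """Assemble a config dict in the structure shown above (hypothetical helper)."""
    cfg = {
        "load_object": {
            "module": f"framework.{experiment_type}",
            "name": f"{experiment_type}Experiment",
        },
        "setup": setup,
    }
    if eda:
        cfg["eda"] = {}  # empty section = use defaults
    cfg["create_model"] = create_model
    return cfg


cfg = make_config(
    "FaultDetection",
    setup={"datetime_column": "ds"},
    create_model={"model": "iforest", "params": {"n_estimators": 100}},
)
print(json.dumps(cfg, indent=2))
```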
Parameter Passing
Parameters from the configuration are passed to functions using Python's **kwargs mechanism:
# Configuration
setup:
  datetime_column: "timestamp"
  target: "value"
  predict_window: 1000
# Function call (automatically generated)
self.setup(data, datetime_column="timestamp", target="value", predict_window=1000)
Configuration-Driven Workflows
1. Training Workflow
graph TD
    A["POST /{name}/train"] --> B[Parse Configuration]
    B --> C[Create Experiment Instance]
    C --> D[Load Data from RabbitMQ]
    D --> E["Execute run() method"]
    E --> F[Dynamic Function Calls]
    F --> G["setup()"]
    F --> H["eda()"]
    F --> I["create_model()"]
    I --> J[Save to MLflow]
    J --> K[Return Response]
2. Prediction Workflow
graph TD
    A["POST /{name}/predict"] --> B[Load Experiment]
    B --> C[Get Data from RabbitMQ]
    C --> D["Call predict() method"]
    D --> E[Update Visualizations]
    E --> F[Return Results]
3. Configuration Processing
When a training request is received:
- Configuration Parsing: YAML config is loaded and validated
- Dynamic Import: Experiment class is loaded using the load_object specification
- Instance Creation: Experiment instance is created with config
- Function Mapping: Config keys are mapped to class methods
- Sequential Execution: Functions are called in order based on config structure
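The dynamic import and function mapping steps can be sketched with the standard library's `importlib`. The helper names `load_experiment_class` and `resolve_methods` are assumptions made for this example, and because the `framework` package is not available in a standalone snippet, `collections.Counter` stands in for an experiment class.

```python
import importlib


def load_experiment_class(load_object: dict) -> type:
    """Import load_object['module'] and return the class named load_object['name']."""
    module = importlib.import_module(load_object["module"])
    return getattr(module, load_object["name"])


def resolve_methods(instance, cfg: dict) -> list:
    """Map every config key except load_object to a bound method, in config order."""
    methods = []
    for key in cfg:
        if key == "load_object":
            continue
        method = getattr(instance, key, None)
        if not callable(method):
            raise AttributeError(f"config key {key!r} has no matching method")
        methods.append((key, method))
    return methods


# Stand-in config: a stdlib class plays the role of the experiment class.
cfg = {
    "load_object": {"module": "collections", "name": "Counter"},
    "update": {},
    "most_common": {},
}
cls = load_experiment_class(cfg["load_object"])
instance = cls()
names = [name for name, _ in resolve_methods(instance, cfg)]
print(cls.__name__, names)
```

Resolving all keys before calling any of them is a design choice of this sketch: it reports an unknown key before training work has started, whereas a plain loop would fail midway.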
Advanced Usage Patterns
1. Multi-Model Training
Note: Data must already have been sent to the RabbitMQ queue before the curl request can start the training process.
Train multiple models with different configurations:
# Train Isolation Forest
curl -X POST "http://localhost:8010/pump_iforest/train" \
-H "Content-Type: application/json" \
-d '{
"load_object": {
"module": "framework.FaultDetection",
"name": "FaultDetectionExperiment"
},
"setup": {"datetime_column": "ds"},
"eda": {},
"create_model": {
"model": "iforest",
"params": {"n_estimators": 100, "contamination": "auto"}
}
}'
# Train PCA model
curl -X POST "http://localhost:8010/pump_pca/train" \
-H "Content-Type: application/json" \
-d '{
"load_object": {
"module": "framework.FaultDetection",
"name": "FaultDetectionExperiment"
},
"setup": {"datetime_column": "ds"},
"create_model": {
"model": "pca",
"params": {"n_components": 0.95, "alpha": 0.05}
}
}'
2. Automated Retraining Pipeline
Configure automatic retraining based on performance metrics:
setup:
  datetime_column: "ds"
  target: "fault_type"
retrain:
  retrain_window: 5000
  metric: "Accuracy"
  metric_threshold: 0.85
  higher_better: true
create_model:
  model: "dt"
  params: {}
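One plausible reading of the `retrain` fields is a threshold check on the monitored metric. The function below is only a sketch of that reading: the name `needs_retraining`, the treatment of a score exactly at the threshold, and the way `higher_better` flips the comparison are assumptions, not the framework's documented behavior.

```python
def needs_retraining(score: float, retrain_cfg: dict) -> bool:
    """Return True when the score is on the wrong side of metric_threshold.

    Assumption: a score exactly equal to the threshold does not trigger retraining.
    """
    threshold = retrain_cfg["metric_threshold"]
    if retrain_cfg.get("higher_better", True):
        return score < threshold  # e.g. accuracy dropped below the threshold
    return score > threshold  # e.g. an error metric rose above the threshold


retrain_cfg = {
    "retrain_window": 5000,
    "metric": "Accuracy",
    "metric_threshold": 0.85,
    "higher_better": True,
}
print(needs_retraining(0.80, retrain_cfg))  # True
print(needs_retraining(0.90, retrain_cfg))  # False
```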
3. Custom Function Integration
Add custom preprocessing functions:
load_object:
  module: framework.FaultIsolation
  name: FaultIsolationExperiment
setup:
  datetime_column: "ds"
  target: "output"
custom_preprocessing: # Custom function
  normalize: true
  remove_outliers: true
  outlier_threshold: 3
eda:
create_model:
  model: "rf"
  params:
    n_estimators: 100
4. Batch Processing
Process multiple experiments in sequence:
#!/bin/bash
# Batch training script
experiments=("pump_iforest" "pump_pca" "pump_dbscan")
models=("iforest" "pca" "dbscan")
for i in "${!experiments[@]}"; do
    echo "Training ${experiments[$i]} with ${models[$i]}"
    curl -X POST "http://localhost:8010/${experiments[$i]}/train" \
        -H "Content-Type: application/json" \
        -d "@configs/pump/${models[$i]}.yaml"
    # Wait for training to complete
    sleep 30
    # Make initial prediction
    curl -X POST "http://localhost:8010/${experiments[$i]}/predict"
done
Error Handling
Common Error Scenarios
1. Configuration Errors
{
"detail": "Configuration validation failed: missing required field 'target'"
}
Solution: Ensure all required configuration fields are present.
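A client-side check can catch missing fields before the request is sent. This is a minimal sketch: the `missing_fields` helper is hypothetical, and the `REQUIRED` mapping is an assumption chosen for illustration, since the fields that are actually required depend on the experiment type.

```python
# Assumed requirements for illustration only; adjust per experiment type.
REQUIRED = {
    "load_object": ["module", "name"],
    "setup": ["target"],
}


def missing_fields(cfg: dict, required: dict = REQUIRED) -> list[str]:
    """Return missing sections and 'section.field' paths, in declaration order."""
    missing = []
    for section, fields in required.items():
        if section not in cfg:
            missing.append(section)
            continue
        present = cfg[section] or {}  # an empty YAML section parses to None
        missing.extend(f"{section}.{field}" for field in fields if field not in present)
    return missing


cfg = {
    "load_object": {"module": "framework.FaultIsolation", "name": "FaultIsolationExperiment"},
    "setup": {"datetime_column": "ds"},
}
print(missing_fields(cfg))  # ['setup.target']
```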
2. Experiment Not Found
{
"detail": "Experiment 'nonexistent_model' not found"
}
Solution: Check experiment name and ensure it has been created/loaded.
3. Model Training Failures
{
"detail": "Model training failed: insufficient data"
}
Solution: Verify data is available in RabbitMQ queue and meets model requirements.
4. Plugin Health Issues
{
  "mlops": {
    "healthy": false,
    "details": {
      "error": "Connection refused to MLflow server"
    }
  }
}
Solution: Check MLflow server status and connectivity.
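A health response of this shape can be reduced to a short list of failing plugins. The sketch assumes that every top-level key is a plugin name carrying a `healthy` flag, as in the example above; the `rabbit` key in the sample payload and the `unhealthy_plugins` helper are made up for illustration.

```python
def unhealthy_plugins(health: dict) -> dict[str, str]:
    """Map each unhealthy plugin name to its reported error, or 'unknown'."""
    problems = {}
    for plugin, status in health.items():
        if not status.get("healthy", False):
            details = status.get("details") or {}
            problems[plugin] = details.get("error", "unknown")
    return problems


health = {
    "mlops": {
        "healthy": False,
        "details": {"error": "Connection refused to MLflow server"},
    },
    "rabbit": {"healthy": True, "details": {}},  # hypothetical second plugin entry
}
print(unhealthy_plugins(health))
```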
Error Recovery Patterns
import time

import requests


def robust_api_call(url, max_retries=3, backoff_factor=2):
    """Make API calls with retry logic and exponential backoff"""
    for attempt in range(max_retries):
        try:
            response = requests.post(url, timeout=30)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException:
            if attempt == max_retries - 1:
                # Out of retries: re-raise the last error with its original traceback
                raise
            wait_time = backoff_factor ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)
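The waiting times follow directly from `backoff_factor ** attempt`. The helper below, whose name `backoff_schedule` is introduced only for this example, lists the waits between attempts for a given setting, which is useful for estimating the worst-case delay before the final error is raised.

```python
def backoff_schedule(max_retries: int = 3, backoff_factor: float = 2) -> list:
    """Waits (in seconds) between attempts; no wait follows the final attempt."""
    return [backoff_factor ** attempt for attempt in range(max_retries - 1)]


print(backoff_schedule())            # [1, 2]
print(backoff_schedule(5))           # [1, 2, 4, 8]
print(sum(backoff_schedule(5)))      # 15
```

With the defaults, three failed attempts add 3 seconds of waiting on top of up to 30 seconds of request timeout per attempt.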
Client Libraries
ObServML Python Client
The ObServML Python Client is available as observml_client.py in the project root directory. This comprehensive client combines the functionality from api_commands.py with enhanced error handling, type hints, and modern Python practices.
Key Features:
- Enhanced Error Handling: Automatic retry logic with exponential backoff
- Type Hints: Full type annotations for better IDE support
- RabbitMQ Integration: Automatic data posting to message queues
- File Format Support: Load data from .xlsx, .csv, .json, .pkl files
- Backward Compatibility: Legacy functions for existing code
- Comprehensive Logging: Built-in logging for debugging
- Monitoring Support: Continuous experiment monitoring capabilities
Installation:
# Import the client
from observml_client import ObServMLClient
# Or use legacy functions for backward compatibility
from observml_client import train, predict, load_experiment
Python Client Usage
import requests
import json
from typing import Dict, Any, Optional
class ObServMLClient:
    """Comprehensive Python client for ObServML API"""

    def __init__(self, base_url: str = "http://localhost:8010"):
        self.base_url = base_url.rstrip('/')

    def health_check(self) -> Dict[str, Any]:
        """Check system health"""
        response = requests.get(f"{self.base_url}/health")
        response.raise_for_status()
        return response.json()

    def get_available_experiments(self) -> Dict[str, Any]:
        """Get available experiment types"""
        response = requests.get(f"{self.base_url}/available_experiments")
        response.raise_for_status()
        return response.json()

    def create_experiment(self, name: str, experiment_type: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new experiment"""
        response = requests.post(
            f"{self.base_url}/create_experiment/{name}/{experiment_type}",
            json=config
        )
        response.raise_for_status()
        return response.json()

    def train_experiment(self, name: str, config: Dict[str, Any]) -> str:
        """Train an experiment"""
        response = requests.post(f"{self.base_url}/{name}/train", json=config)
        response.raise_for_status()
        return response.text

    def predict(self, name: str) -> str:
        """Make predictions"""
        response = requests.post(f"{self.base_url}/{name}/predict")
        response.raise_for_status()
        return response.text

    def load_experiment(self, name: str, run_id: Optional[str] = None) -> str:
        """Load an experiment"""
        if run_id:
            url = f"{self.base_url}/{name}/load/{run_id}"
        else:
            url = f"{self.base_url}/{name}/load"
        response = requests.post(url)
        response.raise_for_status()
        return response.text

    def save_experiment(self, name: str) -> str:
        """Save an experiment"""
        response = requests.post(f"{self.base_url}/{name}/save")
        response.raise_for_status()
        return response.text

    def get_plot(self, name: str, plot_name: str) -> Dict[str, Any]:
        """Get a plot from an experiment"""
        response = requests.get(f"{self.base_url}/{name}/plot/{plot_name}")
        response.raise_for_status()
        return response.json()

    def get_eda_plot(self, name: str, plot_name: str) -> Dict[str, Any]:
        """Get an EDA plot from an experiment"""
        response = requests.get(f"{self.base_url}/{name}/plot_eda/{plot_name}")
        response.raise_for_status()
        return response.json()

    def get_configuration(self, name: str) -> Dict[str, Any]:
        """Get experiment configuration"""
        response = requests.get(f"{self.base_url}/{name}/cfg")
        response.raise_for_status()
        return response.json()

    def get_experiments(self) -> Dict[str, Any]:
        """Get all experiments and their available plots"""
        response = requests.get(f"{self.base_url}/experiments")
        response.raise_for_status()
        return response.json()

    def flush_queue(self, queue_name: str) -> str:
        """Flush a RabbitMQ queue"""
        response = requests.post(f"{self.base_url}/flush/{queue_name}")
        response.raise_for_status()
        return response.text

    def retrain_experiment(self, name: str) -> str:
        """Retrain an experiment"""
        response = requests.post(f"{self.base_url}/{name}/retrain")
        response.raise_for_status()
        return response.text

    def delete_experiment(self, name: str) -> Dict[str, Any]:
        """Delete an experiment from memory"""
        response = requests.post(f"{self.base_url}/{name}/delete")
        response.raise_for_status()
        return response.json()

# Usage example
client = ObServMLClient()
# Check system health
health = client.health_check()
print(f"System health: {json.dumps(health, indent=2)}")
# Create and train a fault detection experiment
config = {
"load_object": {
"module": "framework.FaultDetection",
"name": "FaultDetectionExperiment"
},
"setup": {
"datetime_column": "ds"
},
"eda": {},
"create_model": {
"model": "iforest",
"params": {
"n_estimators": 100,
"contamination": "auto"
}
}
}
result = client.train_experiment("pump_anomaly", config)
print(f"Training result: {result}")
# Make predictions
prediction = client.predict("pump_anomaly")
print(f"Prediction result: {prediction}")
# Get anomaly plot
plot = client.get_plot("pump_anomaly", "outliers")
print(f"Plot keys: {list(plot.keys())}")