ExperimentHub Plugin System
The ExperimentHub plugin system lets ExperimentHub delegate MLOps tracking and data streaming to interchangeable backends (MLflow and RabbitMQ by default). This document describes the plugin architecture, interfaces, configuration, and usage.
System Architecture
graph TD
A[ExperimentHub] --> B[Plugin System]
B --> C[MLOps Plugin]
B --> D[DataStream Plugin]
A --> F[Experiment Management]
F --> G[Training]
F --> H[Prediction]
F --> I[Retraining]
A --> J[Configuration System]
J --> K[Plugin Configuration]
J --> L[Experiment Configuration]
C --> M[MLflowPlugin]
D --> N[RabbitMQPlugin]
The ExperimentHub plugin system consists of three main components:
- Plugin Interfaces: Define the contract that plugin implementations must adhere to
- Plugin Implementations: Concrete implementations of the plugin interfaces
- Plugin Registry: Manages plugin registration and retrieval
Plugin Types
The ExperimentHub supports two main types of plugins:
1. MLOps Plugin
The MLOps plugin handles model tracking, versioning, and artifact storage. It provides methods for:
- Creating experiments
- Starting runs
- Logging artifacts
- Retrieving runs and experiments
Default implementation: MLflowPlugin
2. DataStream Plugin
The DataStream plugin handles data streaming and messaging. It provides methods for:
- Creating queues
- Sending and receiving data
- Flushing queues
Default implementation: RabbitMQPlugin
3. TaskQueue Plugin (REMOVED)
Note: The TaskQueue plugin functionality has been removed from ObservML. Training and prediction operations are now handled synchronously within the ExperimentHub. This simplifies the architecture and reduces dependencies while maintaining performance for most use cases.
Plugin Interfaces
All plugins must implement the Plugin Protocol defined in framework/plugins/base.py:
from typing import Any, Dict, Protocol, Tuple
class Plugin(Protocol):
    """Base interface for all plugins"""
    plugin_type: str
    def __init__(self, **kwargs) -> None:
        """Create the plugin from its configuration parameters"""
        ...
    def initialize(self) -> None:
        """Set up the plugin (called by register_plugin)"""
        ...
    def shutdown(self) -> None:
        """Clean up resources when shutting down"""
        ...
    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        """Check if the plugin is working correctly
        Returns:
            Tuple containing:
            - Boolean indicating if the plugin is healthy
            - Dictionary with additional health information and metrics
        """
        ...
Each plugin type then extends this base interface with additional methods specific to its functionality.
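Because Plugin is declared as a typing.Protocol, a class conforms by having the right members, not by inheriting from it; explicit inheritance (as the implementations below use) is optional. The sketch uses a hypothetical runtime_checkable copy named PluginLike, since calling isinstance against the original Plugin, which is not marked runtime_checkable, would raise TypeError. EchoPlugin is likewise illustrative. Note that a runtime isinstance check only verifies that the members exist, not their signatures.

```python
from typing import Any, Dict, Protocol, Tuple, runtime_checkable


@runtime_checkable
class PluginLike(Protocol):
    """Hypothetical runtime-checkable mirror of the Plugin protocol."""

    plugin_type: str

    def initialize(self) -> None: ...

    def shutdown(self) -> None: ...

    def health_check(self) -> Tuple[bool, Dict[str, Any]]: ...


class EchoPlugin:
    """Inherits from nothing, yet matches the protocol structurally."""

    plugin_type = "echo"

    def __init__(self, **kwargs: Any) -> None:
        self.config = kwargs
        self.ready = False

    def initialize(self) -> None:
        self.ready = True

    def shutdown(self) -> None:
        self.ready = False

    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        return self.ready, {"status": "ready" if self.ready else "not initialized"}


plugin = EchoPlugin(greeting="hi")
print(isinstance(plugin, PluginLike))  # True: all protocol members are present
print(isinstance(object(), PluginLike))  # False: no plugin_type, no methods
plugin.initialize()
print(plugin.health_check())  # (True, {'status': 'ready'})
```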
MLOps Plugin Interface
class MLOpsPlugin(Plugin, Protocol):
    """Interface for MLOps plugins"""
    plugin_type = "mlops"
    def set_tracking_uri(self, uri: str) -> None:
        """Set tracking URI"""
        ...
    def create_experiment(self, name: str) -> str:
        """Create a new experiment"""
        ...
    def start_run(self, experiment_id: str) -> str:
        """Start a new run"""
        ...
    def log_artifact(self, path: str, artifact_path: str) -> None:
        """Log an artifact"""
        ...
    def get_run(self, run_id: str) -> Any:
        """Get a run by ID"""
        ...
    def get_experiment_by_name(self, name: str) -> Any:
        """Get an experiment by name"""
        ...
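Because the interface fixes only method names and signatures, a lightweight test double can stand in for MLflowPlugin in unit tests. The InMemoryMLOpsPlugin below is a hypothetical sketch, not part of the framework. Since log_artifact takes no run ID, it assumes artifacts attach to the most recently started run, similar to MLflow's notion of an active run, and get_run and get_experiment_by_name return plain dicts rather than MLflow objects.

```python
import uuid
from typing import Any, Dict, Optional, Tuple


class InMemoryMLOpsPlugin:
    """Hypothetical in-memory MLOpsPlugin for unit tests (no tracking server)."""

    plugin_type = "mlops"

    def __init__(self, **kwargs: Any) -> None:
        self.tracking_uri = kwargs.get("tracking_uri", "memory://")
        self.experiments: Dict[str, Dict[str, Any]] = {}  # experiment_id -> record
        self.runs: Dict[str, Dict[str, Any]] = {}  # run_id -> record
        self.active_run_id: Optional[str] = None

    def initialize(self) -> None:
        self.set_tracking_uri(self.tracking_uri)

    def shutdown(self) -> None:
        self.active_run_id = None

    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        return True, {"status": "connected", "uri": self.tracking_uri,
                      "experiments": len(self.experiments), "runs": len(self.runs)}

    def set_tracking_uri(self, uri: str) -> None:
        self.tracking_uri = uri

    def create_experiment(self, name: str) -> str:
        if self.get_experiment_by_name(name) is not None:
            raise ValueError(f"Experiment {name!r} already exists")
        experiment_id = str(len(self.experiments))
        self.experiments[experiment_id] = {"experiment_id": experiment_id, "name": name}
        return experiment_id

    def start_run(self, experiment_id: str) -> str:
        if experiment_id not in self.experiments:
            raise KeyError(f"Unknown experiment {experiment_id!r}")
        run_id = uuid.uuid4().hex
        self.runs[run_id] = {"run_id": run_id, "experiment_id": experiment_id, "artifacts": []}
        self.active_run_id = run_id  # assumption: artifacts go to the latest run
        return run_id

    def log_artifact(self, path: str, artifact_path: str) -> None:
        if self.active_run_id is None:
            raise RuntimeError("No active run; call start_run first")
        self.runs[self.active_run_id]["artifacts"].append((path, artifact_path))

    def get_run(self, run_id: str) -> Any:
        return self.runs.get(run_id)

    def get_experiment_by_name(self, name: str) -> Any:
        return next((e for e in self.experiments.values() if e["name"] == name), None)


plugin = InMemoryMLOpsPlugin()
plugin.initialize()
exp_id = plugin.create_experiment("churn-model")
run_id = plugin.start_run(exp_id)
plugin.log_artifact("model.pkl", "models")
print(plugin.get_run(run_id)["artifacts"])  # [('model.pkl', 'models')]
```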
DataStream Plugin Interface
class DataStreamPlugin(Plugin, Protocol):
    """Interface for data stream plugins"""
    plugin_type = "datastream"
    def connect(self, **kwargs) -> None:
        """Connect to the data stream service"""
        ...
    def disconnect(self) -> None:
        """Disconnect from the data stream service"""
        ...
    def create_queue(self, queue_name: str) -> None:
        """Create a queue"""
        ...
    def pull_data(self, queue: str) -> Any:
        """Pull data from a queue"""
        ...
    def flush_queue(self, queue: str) -> None:
        """Flush a queue"""
        ...
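The same approach works for the data stream interface. The InMemoryDataStreamPlugin below is a hypothetical in-process sketch backed by collections.deque, useful in tests where no broker is running. The push_data method is also hypothetical: the interface excerpt above shows no sending method, so its name and signature are assumptions. create_queue is idempotent, in the spirit of RabbitMQ, where re-declaring an existing queue with the same arguments is a no-op.

```python
from collections import deque
from typing import Any, Deque, Dict, Optional, Tuple


class InMemoryDataStreamPlugin:
    """Hypothetical in-process DataStreamPlugin for tests (no broker)."""

    plugin_type = "datastream"

    def __init__(self, **kwargs: Any) -> None:
        self.queues: Dict[str, Deque[Any]] = {}
        self.connected = False

    def initialize(self) -> None:
        self.connect()

    def shutdown(self) -> None:
        self.disconnect()

    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        return self.connected, {
            "status": "connected" if self.connected else "disconnected",
            "queue_depths": {name: len(q) for name, q in self.queues.items()},
        }

    def connect(self, **kwargs: Any) -> None:
        self.connected = True

    def disconnect(self) -> None:
        self.connected = False

    def create_queue(self, queue_name: str) -> None:
        # Idempotent: re-creating an existing queue keeps its contents
        self.queues.setdefault(queue_name, deque())

    def push_data(self, queue: str, data: Any) -> None:
        # Hypothetical sender; not part of the interface excerpt above
        self.queues[queue].append(data)

    def pull_data(self, queue: str) -> Optional[Any]:
        # FIFO; returns None when the queue is empty instead of blocking
        q = self.queues[queue]
        return q.popleft() if q else None

    def flush_queue(self, queue: str) -> None:
        # Drop all pending messages
        self.queues[queue].clear()


stream = InMemoryDataStreamPlugin()
stream.initialize()
stream.create_queue("predictions")
stream.push_data("predictions", {"id": 1})
stream.push_data("predictions", {"id": 2})
print(stream.pull_data("predictions"))  # {'id': 1}
stream.flush_queue("predictions")
print(stream.pull_data("predictions"))  # None
```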
TaskQueue Plugin Interface (REMOVED)
Note: The TaskQueue plugin interface has been removed from ObservML. This interface is no longer supported as task queue functionality has been replaced with synchronous processing.
Plugin Implementations
MLflowPlugin
The MLflowPlugin is an implementation of the MLOpsPlugin interface that uses MLflow for model tracking and versioning.
from typing import Any, Dict, Tuple
from mlflow.tracking import MlflowClient
class MLflowPlugin(MLOpsPlugin):
    """MLflow implementation of MLOpsPlugin"""
    plugin_type = "mlops"
    def __init__(self, mlflow_uri: str, **kwargs):
        self.mlflow_uri = mlflow_uri
        self.client = None
    def initialize(self) -> None:
        """Initialize the plugin"""
        self.set_tracking_uri(self.mlflow_uri)
        self.client = MlflowClient(tracking_uri=self.mlflow_uri)
    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        """Check if MLflow plugin is working correctly"""
        try:
            # Cheap round trip to the tracking server; MlflowClient.list_experiments()
            # was removed in MLflow 2.x in favor of search_experiments()
            self.client.search_experiments(max_results=1)
            return True, {"status": "connected", "uri": self.mlflow_uri}
        except Exception as e:
            return False, {"status": "error", "message": str(e), "uri": self.mlflow_uri}
    # ... other methods ...
RabbitMQPlugin
The RabbitMQPlugin is an implementation of the DataStreamPlugin interface that uses RabbitMQ for data streaming and messaging.
import time
from typing import Any, Dict, Tuple
class RabbitMQPlugin(DataStreamPlugin):
    """RabbitMQ implementation of DataStreamPlugin"""
    plugin_type = "datastream"
    def __init__(self, host: str, port: str|int, username: str, password: str, **kwargs):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.connection = None
        self.channel = None
    def initialize(self) -> None:
        """Initialize the plugin"""
        self.connect()
    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        """Check if RabbitMQ plugin is working correctly"""
        try:
            # Reconnect if the connection was never opened or has dropped
            if self.connection is None or self.connection.is_closed:
                self.connect()
            # Round-trip test: declare and immediately delete a throwaway queue
            test_queue = f"health_check_{int(time.time())}"
            self.channel.queue_declare(queue=test_queue, durable=False)
            self.channel.queue_delete(queue=test_queue)
            return True, {
                "status": "connected",
                "host": self.host,
                "port": self.port
            }
        except Exception as e:
            return False, {
                "status": "error",
                "message": str(e),
                "host": self.host,
                "port": self.port
            }
    # ... other methods ...
CeleryPlugin (REMOVED)
Note: The CeleryPlugin implementation has been removed from ObservML as task queue functionality is no longer supported. The system now processes training and prediction operations synchronously for improved simplicity and reliability.
Plugin Registration and Retrieval
Plugins are registered with the ExperimentHub using the register_plugin method:
def register_plugin(self, plugin: Plugin) -> None:
    """Register a plugin.
    Args:
        plugin: The plugin to register.
    """
    self.plugins[plugin.plugin_type] = plugin
    plugin.initialize()
Plugins can be retrieved using the get_plugin method:
def get_plugin(self, plugin_type: str) -> Optional[Plugin]:
    """Get a plugin by type.
    Args:
        plugin_type: The type of plugin to get.
    Returns:
        The plugin, or None if not found.
    """
    return self.plugins.get(plugin_type)
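Because register_plugin stores plugins in a dict keyed by plugin_type, there is at most one plugin per type, and registering a second plugin of the same type silently replaces the first. The sketch below reproduces the two methods in a hypothetical MiniHub with a stub plugin so this behavior can be run in isolation.

```python
from typing import Any, Dict, Optional, Tuple


class MiniHub:
    """Hypothetical stand-in reproducing register_plugin/get_plugin from above."""

    def __init__(self) -> None:
        self.plugins: Dict[str, Any] = {}

    def register_plugin(self, plugin: Any) -> None:
        self.plugins[plugin.plugin_type] = plugin  # one slot per type: last wins
        plugin.initialize()

    def get_plugin(self, plugin_type: str) -> Optional[Any]:
        return self.plugins.get(plugin_type)


class StubPlugin:
    def __init__(self, plugin_type: str, name: str) -> None:
        self.plugin_type = plugin_type
        self.name = name
        self.initialized = False

    def initialize(self) -> None:
        self.initialized = True

    def shutdown(self) -> None:
        self.initialized = False

    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        return self.initialized, {"name": self.name}


hub = MiniHub()
first = StubPlugin("mlops", "first")
second = StubPlugin("mlops", "second")
hub.register_plugin(first)
hub.register_plugin(second)
print(hub.get_plugin("mlops").name)  # second
print(hub.get_plugin("datastream"))  # None
print(first.initialized)  # True: replaced but never shut down
```

The replaced plugin has already been initialized, and register_plugin never shuts it down. If it holds connections, call its shutdown method yourself before replacing it.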
Plugin Health Checks
All plugins must implement a health_check method that verifies whether the plugin is working correctly. It returns a tuple containing:
- A boolean indicating if the plugin is healthy
- A dictionary with additional health information and metrics
The ExperimentHub provides a check_plugin_health method that checks the health of all registered plugins:
def check_plugin_health(self) -> Dict[str, Dict[str, Any]]:
    """Check the health of all registered plugins
    Returns:
        Dictionary with plugin health information
    """
    health_info = {}
    for plugin_type, plugin in self.plugins.items():
        is_healthy, details = plugin.health_check()
        health_info[plugin_type] = {
            "healthy": is_healthy,
            "details": details
        }
    return health_info
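The loop above relies on every health_check honoring the contract and catching its own exceptions; if one raises, the whole call fails and the other plugins' status is lost. The hypothetical free-function variant below adds a per-plugin guard and an all_healthy roll-up, for example to back a readiness endpoint. Both are design suggestions, not part of ExperimentHub.

```python
from typing import Any, Dict, Tuple


def check_plugin_health(plugins: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Like ExperimentHub.check_plugin_health, but tolerant of health checks that raise."""
    health_info: Dict[str, Dict[str, Any]] = {}
    for plugin_type, plugin in plugins.items():
        try:
            is_healthy, details = plugin.health_check()
        except Exception as e:
            # One misbehaving plugin should not hide the status of the others
            is_healthy, details = False, {"status": "error", "message": f"health_check raised: {e}"}
        health_info[plugin_type] = {"healthy": is_healthy, "details": details}
    return health_info


def all_healthy(health_info: Dict[str, Dict[str, Any]]) -> bool:
    # Note: all() on an empty dict is True, so "no plugins" counts as healthy
    return all(entry["healthy"] for entry in health_info.values())


class UpPlugin:
    plugin_type = "mlops"

    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        return True, {"status": "connected"}


class BrokenPlugin:
    plugin_type = "datastream"

    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        raise ConnectionError("broker unreachable")


info = check_plugin_health({"mlops": UpPlugin(), "datastream": BrokenPlugin()})
print(info["datastream"])
# {'healthy': False, 'details': {'status': 'error', 'message': 'health_check raised: broker unreachable'}}
print(all_healthy(info))  # False
```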
Plugin Configuration
Plugins are configured using the hub_config.yaml file:
# Plugin configurations
plugins:
  # MLOps plugin configuration
  mlops:
    enabled: true
    type: "mlflow"  # Which implementation to use
    config:
      mlflow_uri: "http://localhost:5000"
  # Data stream plugin configuration
  datastream:
    enabled: true
    type: "rabbitmq"
    config:
      host: "localhost"
      port: 5672
      username: "guest"
      password: "guest"
  # Task queue plugin configuration - REMOVED
  # The task queue functionality has been removed from ObservML
  # Training and prediction operations are now handled synchronously
The ExperimentHub initializes plugins based on this configuration:
def _init_plugins_from_config(self, plugin_config: dict) -> None:
    """Initialize plugins from configuration
    Args:
        plugin_config: Plugin configuration dictionary
    """
    # Initialize MLOps plugin
    if plugin_config.get("mlops", {}).get("enabled", False):
        mlops_config = plugin_config["mlops"]
        if mlops_config["type"] == "mlflow":
            from framework.plugins.mlflow_plugin import MLflowPlugin
            mlops_plugin = MLflowPlugin(**mlops_config["config"])
            self.register_plugin(mlops_plugin)
    # Initialize DataStream plugin
    if plugin_config.get("datastream", {}).get("enabled", False):
        datastream_config = plugin_config["datastream"]
        if datastream_config["type"] == "rabbitmq":
            from framework.plugins.rabbitmq_plugin import RabbitMQPlugin
            datastream_plugin = RabbitMQPlugin(**datastream_config["config"])
            self.register_plugin(datastream_plugin)
    # TaskQueue plugin removed - no longer supported
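The if-chain above grows by one branch per backend and silently ignores an unrecognized type value. One alternative, sketched here as a suggestion rather than the framework's code, is a lookup table from (section, type) to constructor that fails loudly on unknown types. FakeMLflowPlugin and FakeRabbitMQPlugin are hypothetical stand-ins so the sketch runs without MLflow or RabbitMQ; each built plugin would then be passed to register_plugin.

```python
from typing import Any, Callable, Dict, List, Tuple


class FakeMLflowPlugin:
    """Stand-in for MLflowPlugin so the sketch runs without MLflow installed."""
    plugin_type = "mlops"

    def __init__(self, mlflow_uri: str, **kwargs: Any) -> None:
        self.mlflow_uri = mlflow_uri


class FakeRabbitMQPlugin:
    """Stand-in for RabbitMQPlugin."""
    plugin_type = "datastream"

    def __init__(self, host: str, port: int, username: str, password: str, **kwargs: Any) -> None:
        self.host, self.port = host, port


# (config section, "type" value) -> plugin constructor
PLUGIN_FACTORIES: Dict[Tuple[str, str], Callable[..., Any]] = {
    ("mlops", "mlflow"): FakeMLflowPlugin,
    ("datastream", "rabbitmq"): FakeRabbitMQPlugin,
}


def build_plugins(plugin_config: Dict[str, Any]) -> List[Any]:
    """Instantiate every enabled plugin; raise on unknown types."""
    plugins: List[Any] = []
    for section, settings in plugin_config.items():
        if not settings.get("enabled", False):
            continue
        factory = PLUGIN_FACTORIES.get((section, settings.get("type")))
        if factory is None:
            raise ValueError(f"Unsupported {section} plugin type: {settings.get('type')!r}")
        plugins.append(factory(**settings.get("config", {})))
    return plugins


config = {
    "mlops": {"enabled": True, "type": "mlflow",
              "config": {"mlflow_uri": "http://localhost:5000"}},
    "datastream": {"enabled": False, "type": "rabbitmq", "config": {}},
}
built = build_plugins(config)
print([type(p).__name__ for p in built])  # ['FakeMLflowPlugin']
```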
Creating Custom Plugins
You can create custom plugins by implementing the appropriate plugin interface. Here's an example of creating a custom MLOps plugin:
from framework.plugins.mlops import MLOpsPlugin
from typing import Any, Dict, Tuple
class CustomMLOpsPlugin(MLOpsPlugin):
    """Custom implementation of MLOpsPlugin"""
    plugin_type = "mlops"
    def __init__(self, custom_param: str, **kwargs):
        self.custom_param = custom_param
    def initialize(self) -> None:
        """Initialize the plugin"""
        # Custom initialization logic
        pass
    def shutdown(self) -> None:
        """Clean up resources"""
        # Custom cleanup logic
        pass
    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        """Check if plugin is working correctly"""
        try:
            # Custom health check logic
            return True, {"status": "connected", "custom_param": self.custom_param}
        except Exception as e:
            return False, {"status": "error", "message": str(e)}
    # Implement other MLOpsPlugin methods
    # ...
To use your custom plugin, you can register it with the ExperimentHub:
from framework.ExperimentHub import ExperimentHub
# Create ExperimentHub
hub = ExperimentHub()
# Create and register custom plugin
custom_plugin = CustomMLOpsPlugin(custom_param="value")
hub.register_plugin(custom_plugin)
Plugin Lifecycle
Plugins go through the following lifecycle:
- Instantiation: The plugin is created with configuration parameters
- Registration: The plugin is registered with the ExperimentHub
- Initialization: The initialize method is called to set up the plugin (register_plugin does this immediately after storing the plugin)
- Usage: The plugin is used by the ExperimentHub
- Shutdown: The shutdown method is called to clean up resources
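This document does not show how the hub triggers the shutdown step. A hypothetical shutdown_all could look like the sketch below: it tears plugins down in reverse registration order (relying on dicts preserving insertion order) and keeps going if one plugin's shutdown raises. LifecycleHub and TracingPlugin are illustrative names that record each lifecycle call so the ordering is visible.

```python
from typing import Any, Dict, List, Tuple

events: List[str] = []  # records lifecycle calls in order


class TracingPlugin:
    """Hypothetical plugin that logs each lifecycle step."""

    def __init__(self, plugin_type: str, **kwargs: Any) -> None:
        self.plugin_type = plugin_type
        events.append(f"construct:{plugin_type}")

    def initialize(self) -> None:
        events.append(f"initialize:{self.plugin_type}")

    def shutdown(self) -> None:
        events.append(f"shutdown:{self.plugin_type}")

    def health_check(self) -> Tuple[bool, Dict[str, Any]]:
        return True, {}


class LifecycleHub:
    """Hypothetical hub with register_plugin as documented plus shutdown_all."""

    def __init__(self) -> None:
        self.plugins: Dict[str, Any] = {}

    def register_plugin(self, plugin: Any) -> None:
        self.plugins[plugin.plugin_type] = plugin
        plugin.initialize()

    def shutdown_all(self) -> None:
        # Dicts keep insertion order; reverse it so later plugins go first
        for plugin in reversed(list(self.plugins.values())):
            try:
                plugin.shutdown()
            except Exception as e:  # keep shutting down the rest
                events.append(f"shutdown-failed:{plugin.plugin_type}:{e}")
        self.plugins.clear()


hub = LifecycleHub()
hub.register_plugin(TracingPlugin("mlops"))
hub.register_plugin(TracingPlugin("datastream"))
hub.shutdown_all()
print(events)
# ['construct:mlops', 'initialize:mlops', 'construct:datastream', 'initialize:datastream', 'shutdown:datastream', 'shutdown:mlops']
```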
Best Practices
When working with plugins, follow these best practices:
Error Handling
Plugins should handle errors gracefully and provide meaningful error messages:
def health_check(self) -> Tuple[bool, Dict[str, Any]]:
    """Check if plugin is working correctly"""
    try:
        # Health check logic
        return True, {"status": "connected"}
    except ConnectionError as e:
        return False, {"status": "error", "message": f"Connection error: {e}"}
    except TimeoutError as e:
        return False, {"status": "error", "message": f"Timeout error: {e}"}
    except Exception as e:
        return False, {"status": "error", "message": f"Unexpected error: {e}"}
Resource Management
Plugins should clean up resources in the shutdown method:
import logging
def shutdown(self) -> None:
    """Clean up resources"""
    if self.connection is not None:
        try:
            self.connection.close()
        except Exception as e:
            logging.error("Error closing connection: %s", e)
        finally:
            # Drop the reference even if close() failed
            self.connection = None
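When a plugin is used outside the hub, for example in a script or a test, a context manager guarantees that shutdown runs even if the body raises. plugin_session is a hypothetical helper built on contextlib.contextmanager; ConnPlugin reuses the shutdown method above with a fake connection object.

```python
import logging
from contextlib import contextmanager
from typing import Any, Iterator

logger = logging.getLogger(__name__)


@contextmanager
def plugin_session(plugin: Any) -> Iterator[Any]:
    """Hypothetical helper: initialize, yield, and always shut the plugin down."""
    plugin.initialize()
    try:
        yield plugin
    finally:
        try:
            plugin.shutdown()
        except Exception:
            # Log instead of masking the exception raised in the with-body
            logger.exception("Error shutting down %s plugin", plugin.plugin_type)


class FakeConnection:
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class ConnPlugin:
    """Uses the shutdown method from this section with a fake connection."""

    plugin_type = "datastream"

    def __init__(self) -> None:
        self.connection = None

    def initialize(self) -> None:
        self.connection = FakeConnection()

    def shutdown(self) -> None:
        if self.connection is not None:
            try:
                self.connection.close()
            except Exception as e:
                logger.error("Error closing connection: %s", e)
            finally:
                self.connection = None


plugin = ConnPlugin()
conn_ref = None
try:
    with plugin_session(plugin) as p:
        conn_ref = p.connection
        raise RuntimeError("simulated failure during an experiment")
except RuntimeError:
    pass
print(conn_ref.closed, plugin.connection)  # True None
```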
Configuration
Plugins should accept configuration parameters in the constructor:
def __init__(self, host: str, port: int, username: str, password: str, **kwargs):
    self.host = host
    self.port = port
    self.username = username
    self.password = password
    self.timeout = kwargs.get("timeout", 30)
    self.connection = None
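Accepting **kwargs keeps plugins tolerant of extra keys in hub_config.yaml, but it also swallows typos such as "timout" without any signal. The hypothetical ConfiguredPlugin below keeps the constructor shape shown above and adds two optional checks: a warning for unrecognized options, and coercion of port to int. The coercion is an assumption motivated by RabbitMQPlugin's signature accepting str|int, which suggests ports may arrive as strings.

```python
import warnings
from typing import Any, Union


class ConfiguredPlugin:
    """Hypothetical plugin: the constructor pattern above plus light validation."""

    plugin_type = "datastream"
    KNOWN_OPTIONS = {"timeout"}  # optional keys this plugin understands

    def __init__(self, host: str, port: Union[int, str], username: str,
                 password: str, **kwargs: Any) -> None:
        unknown = set(kwargs) - self.KNOWN_OPTIONS
        if unknown:
            # Without this, a typo such as "timout" would be ignored silently
            warnings.warn(f"Ignoring unknown plugin options: {sorted(unknown)}")
        self.host = host
        self.port = int(port)  # accept "5672" as well as 5672
        self.username = username
        self.password = password
        self.timeout = kwargs.get("timeout", 30)
        self.connection = None


plugin = ConfiguredPlugin(host="localhost", port="5672",
                          username="guest", password="guest", timeout=60)
print(plugin.port, plugin.timeout)  # 5672 60
```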
Health Checks
Plugins should implement thorough health checks:
def health_check(self) -> Tuple[bool, Dict[str, Any]]:
    """Check if plugin is working correctly"""
    # connect(), ping(), is_connected() and connection.id are placeholders for
    # whatever the underlying client library actually provides
    try:
        # Check connection
        if self.connection is None or not self.connection.is_connected():
            self.connect()
        # Check functionality
        self.ping()
        # Return health information
        return True, {
            "status": "connected",
            "host": self.host,
            "port": self.port,
            "connection_id": self.connection.id
        }
    except Exception as e:
        return False, {
            "status": "error",
            "message": str(e),
            "host": self.host,
            "port": self.port
        }
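The health_check contract allows "additional health information and metrics" in the details dictionary. A hypothetical helper, timed_health_check, wraps any probe callable (such as a ping) and records its latency in milliseconds using time.perf_counter, alongside whatever context fields the caller passes in.

```python
import time
from typing import Any, Callable, Dict, Tuple


def timed_health_check(probe: Callable[[], Any], **context: Any) -> Tuple[bool, Dict[str, Any]]:
    """Run a probe (e.g. a ping) and report its latency next to the usual fields."""
    start = time.perf_counter()
    try:
        probe()
    except Exception as e:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return False, {"status": "error", "message": str(e),
                       "latency_ms": round(elapsed_ms, 3), **context}
    elapsed_ms = (time.perf_counter() - start) * 1000
    return True, {"status": "connected", "latency_ms": round(elapsed_ms, 3), **context}


def refuse() -> None:
    # Simulates a backend that is down
    raise ConnectionError("connection refused")


ok, info = timed_health_check(lambda: None, host="localhost", port=5672)
print(ok, info["status"], info["host"])  # True connected localhost
print(timed_health_check(refuse, host="localhost")[0])  # False
```

Inside a plugin, the body of health_check could then reduce to something like return timed_health_check(self.ping, host=self.host, port=self.port), where ping is again a placeholder for the client library's own call.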
Documentation
Document the purpose, configuration, and usage of your plugins:
class CustomPlugin(Plugin):
    """Custom plugin for integrating with Example Service.
    This plugin provides integration with Example Service, allowing
    the ExperimentHub to use Example Service for specific functionality.
    Configuration:
        api_key: API key for authenticating with Example Service
        host: Hostname of the Example Service API
        port: Port of the Example Service API
        timeout: Timeout for API requests (default: 30 seconds)
    Usage:
        plugin = CustomPlugin(
            api_key="your-api-key",
            host="api.example.com",
            port=443,
            timeout=60
        )
        hub.register_plugin(plugin)
    """
    # ...
Troubleshooting
Plugin Initialization Errors
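If a plugin fails to initialize, check the plugin configuration and make sure all required parameters are provided. The config block is passed directly to the plugin constructor, so a missing required key (for example mlflow_uri) raises a TypeError. Errors raised inside initialize, such as a failed connection attempt, propagate out of register_plugin.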
Plugin Health Check Failures
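If a plugin health check fails, inspect the details dictionary returned by health_check (also reported under "details" by check_plugin_health). The built-in plugins put the exception text in its "message" field.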
Missing Plugins
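If get_plugin returns None for a plugin type, check hub_config.yaml: the plugin's section must set enabled: true and a type that _init_plugins_from_config recognizes (mlflow for mlops, rabbitmq for datastream). Sections with an unrecognized type are skipped without an error.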