Experiments

Experiments are the basic building blocks of the ExperimentHub. It is a Protocol class and can be used freely to derive an Experiment, such as the ones below. For specific goals and roles, we recommend buildign custom experiments, which require 2 important things: the configuration files, and the models

Bases: Protocol

A class representing an experiment. This class is a protocol that defines the methods that an experiment should implement. Some functions must be overloaded by an implementation class, while others can be left as is. Can only handle sklearn.BaseEstimator class, and so the used model must be overloaded

Parameters:
  • mlflow_uri (str) –

    the URI of the MLflow server

  • model (any) –

    the model to be used in the experiment

  • data (DataFrame) –

    the training data -- allocated during training

  • new_data (DataFrame) –

    data that is not in the training set -- incoming data in prediction

  • prediction (DataFrame) –

    the prediction of the new data

  • metrics (DataFrame) –

    the metrics of the model

  • cfg (dict) –

    the configuration of the experiment

  • _model_registry (dict[str, type]) –

    a registry of model classes

  • _report_registry (dict[str, Figure]) –

    a registry of reports

  • run_id (str) –

    the ID of the run

  • experiment_id (str) –

    the ID of the experiment

  • name (str) –

    the name of the experiment

  • eda_report (str) –

    the EDA report

TODO: - proper logging - Add mmlw_estimator as a base class for the model - SPC chart - add support for data drift (eda) - add support for model drift (eda) - add support for model explainability (shap, lime, etc.) - add support for general data preprocessing (e.g. missing values, outliers, etc.) - add support for dim reduction (PCA, TSNE, etc.) - add support for feature selection (RFE, etc.) - dynamic model loading from specific folder/registry

Source code in framework\Experiment.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
class Experiment(Protocol):
    """A class representing an experiment. This class is a protocol that defines the methods that an experiment should implement.
    Some functions must be overloaded by an implementation class, while others can be left as is. Can only handle sklearn.BaseEstimator class, and so the used model must be overloaded

    Parameters:
        mlflow_uri (str): the URI of the MLflow server
        model (any): the model to be used in the experiment
        data (pd.DataFrame): the training data -- allocated during training
        new_data (pd.DataFrame): data that is not in the training set -- incoming data in prediction
        prediction (pd.DataFrame): the prediction of the new data
        metrics (pd.DataFrame): the metrics of the model
        cfg (dict): the configuration of the experiment
        _model_registry (dict[str, type]): a registry of model classes
        _report_registry (dict[str, go.Figure]): a registry of reports
        run_id (str): the ID of the run
        experiment_id (str): the ID of the experiment
        name (str): the name of the experiment
        eda_report (str): the EDA report



    TODO:
    - proper logging
    - Add mmlw_estimator as a base class for the model
    - SPC chart
    - add support for data drift (eda)
    - add support for model drift (eda)
    - add support for model explainability (shap, lime, etc.) 
    - add support for general data preprocessing (e.g. missing values, outliers, etc.)
    - add support for dim reduction (PCA, TSNE, etc.)
    - add support for feature selection (RFE, etc.)
    - dynamic model loading from specific folder/registry
    """

    mlflow_uri : str = None
    model : any = None
    data : pd.DataFrame = None
    new_data : pd.DataFrame = pd.DataFrame([])
    metrics : pd.DataFrame = None
    cfg : dict = None
    _model_registry : dict[str, type] = dict()
    _report_registry : dict[str, go.Figure] = dict()
    run_id : str = None
    experiment_id : str = None
    name : str = ""
    _eda_registry : dict[str, go.Figure] = dict()
    scheme = None
    ###retrain_window : int, metric : str, metric_threshold : float, higher_better : bool = False
    retrain_window : int = 0
    metric : str = None
    metric_threshold : float = 0.0
    higher_better : bool = False
    data_format : dict = None

    def __init__(self, cfg : dict, experiment_id:str, run_id:str) -> None:
        """Initialize the Experiment class. This class is a protocol that defines the methods that an experiment should implement.

        Parameters:
            cfg (dict): the configuration of the experiment
            experiment_id (str): the ID of the experiment
            run_id (str): the ID of the run
        """

        self.cfg = cfg
        self.experiment_id = experiment_id
        self.run_id = run_id

    def format_data(self, data, format : dict = None) -> pd.DataFrame:
        """This function will provide a function to recover data from nonstandard json as pd.Dataframe.
        Parameters:
            data : data in json format or DataFrame
            format (dict) : settings for formatting. If None, then default pd.read_json is used."""

        if format is None:
            if isinstance(data, pd.DataFrame):
                return data
            else:
                from io import StringIO
                data = pd.read_json(StringIO(data))
                return data
        else:
            if format.get("name", "pivot") == "pivot":
                data = json.loads(data)
                _id = format.get("id", "tsdata")
                mxlvl = format.get("max_level", 1)
                data = pd.json_normalize(data[_id], max_level = 1)
                column = format.get("columns", "target")
                ind = format.get("index", "date")
                vals = format.get("values", "value") 
                data = data.pivot(columns=column, index= ind, values=vals)
                data.columns.name = None
                data.reset_index(drop = False, inplace = True)
                return data
            else:
                raise Exception(f"Configuration contains faulty data formatting settings. Please make sure the setup keyword contaisn the necessary information. Settings are: {format}")

    ###ADD Overloadable methods for Experiment class.
    def setup(self, data:pd.DataFrame, *args, **kwargs):
        """This function sets up the experiment. It is called before training the model."""
        return NotImplementedError("Implement this in the child class.")

    def create_model(self, *args, **kwargs):
        """This function trains the model."""
        return NotImplementedError("Implement this in the child class.")

    def predict(self, data, *args, **kwargs):
        """This function predicts the target variable."""
        return NotImplementedError("Implement this in the child class.")

    def _score(self, y, y_hat) -> pd.DataFrame:
        """This function calculates the metrics of the model."""
        return NotImplementedError("Implement this in the child class.")

    def plot_model(self, plot:str) -> go.Figure:
        """This function plots the model.

        Parameters:
            plot (str): the plot name that is to be displayed

        Returns:
            go.Figure: the plot
        """
        if plot not in self.model._figs:
            raise ValueError(f"Plot {plot} not found in model.")
        return self.model._figs[plot]

    def save(self):
        """This function saves the experiment. It saves the model and the reports. Uses joblib to save the model and pickle to save the reports.
        """
        with mlflow.start_run(nested = True, experiment_id=self.experiment_id, run_id=self.run_id) as run:  

            repository = get_artifact_repository(run.info.artifact_uri)
            try:
                repository.delete_artifacts(mlflow.get_artifact_uri("experiment.pkl"))
                repository.delete_artifacts(mlflow.get_artifact_uri("metadata.yaml"))
                repository.delete_artifacts(mlflow.get_artifact_uri("reports"))
            except:
                pass

        experiment_to_be_saved = deepcopy(self)
        if isinstance(self.model, tf.keras.models.Model):
            self.model.save(os.path.join(os.getcwd(), "runs", self.run_id, "model.keras"))
            experiment_to_be_saved.model = None
            with mlflow.start_run(nested = True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
                mlflow.log_artifact(os.path.join(os.getcwd(),"runs", self.run_id, "model.keras"), "model")


        with open(os.path.join(os.getcwd(), "runs", self.run_id, "experiment.pkl"), "wb") as f:
            joblib.dump(experiment_to_be_saved, f)

        for k, v in self._report_registry.items():  

            with open(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}.json"), "w") as f:
                write_json(v, f)  #os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}.json") 

            with mlflow.start_run(nested = True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
                mlflow.log_artifact(os.path.join(os.getcwd(),"runs", self.run_id,"reports" , f"{k}.json"), "reports")

        for k,v in self._eda_registry.items():
            with open(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"eda_{k}.json"), "w") as f:
                write_json(v, f)
            with mlflow.start_run(nested = True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
                mlflow.log_artifact(os.path.join(os.getcwd(),"runs", self.run_id,"reports" , f"eda_{k}.json"), "reports")

        yaml.dump(self.cfg, open(os.path.join(os.getcwd(), "runs", self.run_id, "metadata.yaml"), "w"))

        with mlflow.start_run(nested = True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
                mlflow.log_artifact(os.path.join(os.getcwd(),"runs", self.run_id,  "experiment.pkl"), "")
                mlflow.log_artifact(os.path.join(os.getcwd(),"runs", self.run_id, "metadata.yaml"), "")


    def load(self, run_id:str) -> Experiment:
        """This function loads the experiment. It loads the model and the reports. Uses joblib to load the model and pickle to load the reports.

        Parameters:
            run_id (str): the ID of the run

        Returns:
            Experiment: the experiment
        """
        dst = os.path.join(os.getcwd(), "runs", run_id)
        mlflow.artifacts.download_artifacts(artifact_uri=f"runs:/{run_id}/experiment.pkl", dst_path= dst)
        exp = joblib.load(os.path.join(dst, "experiment.pkl")) 

        if os.path.exists(os.path.join(dst, "model.keras")):
            exp.model = tf.keras.models.load_model(os.path.join(dst, "model.keras"))

        mlflow.artifacts.download_artifacts(artifact_uri=f"runs:/{run_id}/reports", dst_path= dst)
        report_dir = os.path.join(dst, "reports")
        for file in os.listdir(report_dir):
            if file.endswith(".json") and file.startswith("eda_"):
                with open(os.path.join(report_dir, file), "rb") as f:
                        report = read_json(f)
                        key = file[4:].split(".")[0]
                        exp._eda_registry[key] = report  

            elif file.endswith(".json"):
                with open(os.path.join(report_dir, file), "rb") as f:
                     report = read_json(f)
                     exp._report_registry[file.split(".")[0]] = report

        ##find .pkl files in report  

        exp.model._figs = exp._report_registry
        exp.cfg = self.cfg
        exp.experiment_id = self.experiment_id
        exp.run_id = run_id

        ## load _report_registry
        ## assing it to model

        #exp.model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")
        return exp

    def convert_datetime(self, data : pd.DataFrame, format : str):
        # TODO: get and return column only...
        if np.issubdtype(data[self.ds].dtype, np.integer):
            data[self.ds] = pd.to_datetime(data[self.ds], unit = format)#, format = self.format .dt.strftime('%Y-%m-%d %H:%M:%S. %s') .astype(str), format = self.format
        elif data[self.ds].dtype == str:
            data[self.ds] = pd.to_datetime(data[self.ds], format = format)#, format = self.format .dt.strftime('%Y-%m-%d %H:%M:%S. %s') .astype(str), format = self.format
        else:
            logging.warn(f"Unknown type {data[self.ds].dtype}")    
            data[self.ds] = pd.to_datetime(data[self.ds])
        return data

    def eda(self): # interactions = None,
        """This function performs simple exploratory data analysis.

        Returns:
            None
        """
        print("Performing EDA...")
        ## add histograms
        self._eda_registry = dict()

        for col in self.data.columns:
            fig = go.Figure()
            fig.add_trace(go.Histogram(x=self.data[col], name=col))
            fig.update_layout(title_text=f"{col} Histogram")
            fig.update_xaxes(title_text=col)
            fig.update_yaxes(title_text="Count")
            self._eda_registry[col] = fig

        ## add correlation matrix
        labels = self.data.columns.to_list()
        labels.reverse()
        fig = go.Figure(data=go.Heatmap(z=self.data.corr(), x=labels, y=labels))
        self._eda_registry["correlation_matrix"] = fig    

        ## add missing values
        #fig = go.Figure(data=go.Heatmap(z=self.data.isnull().sum(), x=self.data.columns, y=self.data.columns))
        #self._eda_registry["missing_values"] = fig


    def retrain(self):
        if self.metric is not None:
            if self.higher_better:
               print(self.metrics.iloc[-1].loc[self.metric])

               return  self.metrics.iloc[-1].loc[self.metric] < self.metric_threshold
            return self.metrics.iloc[-1].loc[self.metric]> self.metric_threshold

        return False

    def spc(self):
        """This function creates a statistical process control chart. WIP."""
        pass

    def run(self, data:pd.DataFrame) -> None:
        """This function runs the experiment. It trains the model and performs exploratory data analysis. Handles everythin internally...

        Parameters:
            data (pd.DataFrame): the training data

        """

        if data is None:
            raise ValueError("Data not found. Please provide data for training.")

        if self.cfg is None:
            raise ValueError("No training configuration found in metadata. The interface is not properly configured.")

        ## for k in config run the function, with specified parameters...
        funcs = list(self.cfg.keys())
        funcs.remove("load_object")

        self.data = data
        for k in funcs:
            if k == "setup":
                self.setup(data)
                #self.eda(   )
            else:
                getattr(self, k)()


        #self.setup(data, experiment_id, run_id)
        #self.model = self.create_model()
        #self.eda() # -> add eda to figs
        #self.spc() # -> add spc to figs
        #self.spc_chart()
        return None

    def join_data(self):
        """Joins new data and previous train data."""
        ndata = pd.concat((self.data, self.new_data), axis=0)
        if self.retrain_window != 0 and self.retrain_window < ndata.shape[0]:
            ndata = ndata.iloc[-self.retrain_window:, :]


        return ndata

    def export(self) -> None:
        """This function exports the reports as HTML files and logs them to MLflow."""
        with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
            for k, v in self._eda_registry.items():
                v.write_html(os.path.join(os.getcwd(),"runs", self.run_id, "reports", f"eda_{k}_report.html"))
                mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"eda_{k}_report.html"), "reports")

            for k, v in self.model._figs.items(): 
                v.write_html(os.path.join(os.getcwd(),"runs", self.run_id, "reports", f"{k}_report.html"))
                mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}_report.html"), "reports")

    def get_eda_reports(self)   -> list[str]:
        """This function returns the IDs of the EDA reports that are available in the model.

        Returns:
            list[str]: the IDs of the reports
        """
        return list(self._eda_registry.keys())       

    def get_fig_types(self) -> list[str]:
        """This function returns the IDs of figures that are available in the model.

        Returns:
            list[str]: the IDs of figures
        """
        return list(self.model._figs.keys())

__init__(cfg, experiment_id, run_id)

Initialize the Experiment class. This class is a protocol that defines the methods that an experiment should implement.

Parameters:
  • cfg (dict) –

    the configuration of the experiment

  • experiment_id (str) –

    the ID of the experiment

  • run_id (str) –

    the ID of the run

Source code in framework\Experiment.py
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(self, cfg : dict, experiment_id:str, run_id:str) -> None:
    """Initialize the Experiment class. This class is a protocol that defines the methods that an experiment should implement.

    Parameters:
        cfg (dict): the configuration of the experiment
        experiment_id (str): the ID of the experiment
        run_id (str): the ID of the run
    """

    self.cfg = cfg
    self.experiment_id = experiment_id
    self.run_id = run_id

create_model(*args, **kwargs)

This function trains the model.

Source code in framework\Experiment.py
122
123
124
def create_model(self, *args, **kwargs):
    """This function trains the model."""
    return NotImplementedError("Implement this in the child class.")

eda()

This function performs simple exploratory data analysis.

Returns:
  • None

Source code in framework\Experiment.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def eda(self): # interactions = None,
    """This function performs simple exploratory data analysis.

    Returns:
        None
    """
    print("Performing EDA...")
    ## add histograms
    self._eda_registry = dict()

    for col in self.data.columns:
        fig = go.Figure()
        fig.add_trace(go.Histogram(x=self.data[col], name=col))
        fig.update_layout(title_text=f"{col} Histogram")
        fig.update_xaxes(title_text=col)
        fig.update_yaxes(title_text="Count")
        self._eda_registry[col] = fig

    ## add correlation matrix
    labels = self.data.columns.to_list()
    labels.reverse()
    fig = go.Figure(data=go.Heatmap(z=self.data.corr(), x=labels, y=labels))
    self._eda_registry["correlation_matrix"] = fig    

export()

This function exports the reports as HTML files and logs them to MLflow.

Source code in framework\Experiment.py
332
333
334
335
336
337
338
339
340
341
def export(self) -> None:
    """This function exports the reports as HTML files and logs them to MLflow."""
    with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
        for k, v in self._eda_registry.items():
            v.write_html(os.path.join(os.getcwd(),"runs", self.run_id, "reports", f"eda_{k}_report.html"))
            mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"eda_{k}_report.html"), "reports")

        for k, v in self.model._figs.items(): 
            v.write_html(os.path.join(os.getcwd(),"runs", self.run_id, "reports", f"{k}_report.html"))
            mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}_report.html"), "reports")

format_data(data, format=None)

This function will provide a function to recover data from nonstandard json as pd.Dataframe. Parameters: data : data in json format or DataFrame format (dict) : settings for formatting. If None, then default pd.read_json is used.

Source code in framework\Experiment.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def format_data(self, data, format : dict = None) -> pd.DataFrame:
    """This function will provide a function to recover data from nonstandard json as pd.Dataframe.
    Parameters:
        data : data in json format or DataFrame
        format (dict) : settings for formatting. If None, then default pd.read_json is used."""

    if format is None:
        if isinstance(data, pd.DataFrame):
            return data
        else:
            from io import StringIO
            data = pd.read_json(StringIO(data))
            return data
    else:
        if format.get("name", "pivot") == "pivot":
            data = json.loads(data)
            _id = format.get("id", "tsdata")
            mxlvl = format.get("max_level", 1)
            data = pd.json_normalize(data[_id], max_level = 1)
            column = format.get("columns", "target")
            ind = format.get("index", "date")
            vals = format.get("values", "value") 
            data = data.pivot(columns=column, index= ind, values=vals)
            data.columns.name = None
            data.reset_index(drop = False, inplace = True)
            return data
        else:
            raise Exception(f"Configuration contains faulty data formatting settings. Please make sure the setup keyword contaisn the necessary information. Settings are: {format}")

get_eda_reports()

This function returns the IDs of the EDA reports that are available in the model.

Returns:
  • list[str]

    list[str]: the IDs of the reports

Source code in framework\Experiment.py
343
344
345
346
347
348
349
def get_eda_reports(self)   -> list[str]:
    """This function returns the IDs of the EDA reports that are available in the model.

    Returns:
        list[str]: the IDs of the reports
    """
    return list(self._eda_registry.keys())       

get_fig_types()

This function returns the IDs of figures that are available in the model.

Returns:
  • list[str]

    list[str]: the IDs of figures

Source code in framework\Experiment.py
351
352
353
354
355
356
357
def get_fig_types(self) -> list[str]:
    """This function returns the IDs of figures that are available in the model.

    Returns:
        list[str]: the IDs of figures
    """
    return list(self.model._figs.keys())

join_data()

Joins new data and previous train data.

Source code in framework\Experiment.py
323
324
325
326
327
328
329
330
def join_data(self):
    """Joins new data and previous train data."""
    ndata = pd.concat((self.data, self.new_data), axis=0)
    if self.retrain_window != 0 and self.retrain_window < ndata.shape[0]:
        ndata = ndata.iloc[-self.retrain_window:, :]


    return ndata

load(run_id)

This function loads the experiment. It loads the model and the reports. Uses joblib to load the model and pickle to load the reports.

Parameters:
  • run_id (str) –

    the ID of the run

Returns:
Source code in framework\Experiment.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def load(self, run_id:str) -> Experiment:
    """This function loads the experiment. It loads the model and the reports. Uses joblib to load the model and pickle to load the reports.

    Parameters:
        run_id (str): the ID of the run

    Returns:
        Experiment: the experiment
    """
    dst = os.path.join(os.getcwd(), "runs", run_id)
    mlflow.artifacts.download_artifacts(artifact_uri=f"runs:/{run_id}/experiment.pkl", dst_path= dst)
    exp = joblib.load(os.path.join(dst, "experiment.pkl")) 

    if os.path.exists(os.path.join(dst, "model.keras")):
        exp.model = tf.keras.models.load_model(os.path.join(dst, "model.keras"))

    mlflow.artifacts.download_artifacts(artifact_uri=f"runs:/{run_id}/reports", dst_path= dst)
    report_dir = os.path.join(dst, "reports")
    for file in os.listdir(report_dir):
        if file.endswith(".json") and file.startswith("eda_"):
            with open(os.path.join(report_dir, file), "rb") as f:
                    report = read_json(f)
                    key = file[4:].split(".")[0]
                    exp._eda_registry[key] = report  

        elif file.endswith(".json"):
            with open(os.path.join(report_dir, file), "rb") as f:
                 report = read_json(f)
                 exp._report_registry[file.split(".")[0]] = report

    ##find .pkl files in report  

    exp.model._figs = exp._report_registry
    exp.cfg = self.cfg
    exp.experiment_id = self.experiment_id
    exp.run_id = run_id

    ## load _report_registry
    ## assing it to model

    #exp.model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")
    return exp

plot_model(plot)

This function plots the model.

Parameters:
  • plot (str) –

    the plot name that is to be displayed

Returns:
  • Figure

    go.Figure: the plot

Source code in framework\Experiment.py
134
135
136
137
138
139
140
141
142
143
144
145
def plot_model(self, plot:str) -> go.Figure:
    """This function plots the model.

    Parameters:
        plot (str): the plot name that is to be displayed

    Returns:
        go.Figure: the plot
    """
    if plot not in self.model._figs:
        raise ValueError(f"Plot {plot} not found in model.")
    return self.model._figs[plot]

predict(data, *args, **kwargs)

This function predicts the target variable.

Source code in framework\Experiment.py
126
127
128
def predict(self, data, *args, **kwargs):
    """This function predicts the target variable."""
    return NotImplementedError("Implement this in the child class.")

run(data)

This function runs the experiment. It trains the model and performs exploratory data analysis. Handles everythin internally...

Parameters:
  • data (DataFrame) –

    the training data

Source code in framework\Experiment.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def run(self, data:pd.DataFrame) -> None:
    """This function runs the experiment. It trains the model and performs exploratory data analysis. Handles everythin internally...

    Parameters:
        data (pd.DataFrame): the training data

    """

    if data is None:
        raise ValueError("Data not found. Please provide data for training.")

    if self.cfg is None:
        raise ValueError("No training configuration found in metadata. The interface is not properly configured.")

    ## for k in config run the function, with specified parameters...
    funcs = list(self.cfg.keys())
    funcs.remove("load_object")

    self.data = data
    for k in funcs:
        if k == "setup":
            self.setup(data)
            #self.eda(   )
        else:
            getattr(self, k)()


    #self.setup(data, experiment_id, run_id)
    #self.model = self.create_model()
    #self.eda() # -> add eda to figs
    #self.spc() # -> add spc to figs
    #self.spc_chart()
    return None

save()

This function saves the experiment. It saves the model and the reports. Uses joblib to save the model and pickle to save the reports.

Source code in framework\Experiment.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def save(self):
    """This function saves the experiment. It saves the model and the reports. Uses joblib to save the model and pickle to save the reports.
    """
    with mlflow.start_run(nested = True, experiment_id=self.experiment_id, run_id=self.run_id) as run:  

        repository = get_artifact_repository(run.info.artifact_uri)
        try:
            repository.delete_artifacts(mlflow.get_artifact_uri("experiment.pkl"))
            repository.delete_artifacts(mlflow.get_artifact_uri("metadata.yaml"))
            repository.delete_artifacts(mlflow.get_artifact_uri("reports"))
        except:
            pass

    experiment_to_be_saved = deepcopy(self)
    if isinstance(self.model, tf.keras.models.Model):
        self.model.save(os.path.join(os.getcwd(), "runs", self.run_id, "model.keras"))
        experiment_to_be_saved.model = None
        with mlflow.start_run(nested = True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
            mlflow.log_artifact(os.path.join(os.getcwd(),"runs", self.run_id, "model.keras"), "model")


    with open(os.path.join(os.getcwd(), "runs", self.run_id, "experiment.pkl"), "wb") as f:
        joblib.dump(experiment_to_be_saved, f)

    for k, v in self._report_registry.items():  

        with open(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}.json"), "w") as f:
            write_json(v, f)  #os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}.json") 

        with mlflow.start_run(nested = True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
            mlflow.log_artifact(os.path.join(os.getcwd(),"runs", self.run_id,"reports" , f"{k}.json"), "reports")

    for k,v in self._eda_registry.items():
        with open(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"eda_{k}.json"), "w") as f:
            write_json(v, f)
        with mlflow.start_run(nested = True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
            mlflow.log_artifact(os.path.join(os.getcwd(),"runs", self.run_id,"reports" , f"eda_{k}.json"), "reports")

    yaml.dump(self.cfg, open(os.path.join(os.getcwd(), "runs", self.run_id, "metadata.yaml"), "w"))

    with mlflow.start_run(nested = True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
            mlflow.log_artifact(os.path.join(os.getcwd(),"runs", self.run_id,  "experiment.pkl"), "")
            mlflow.log_artifact(os.path.join(os.getcwd(),"runs", self.run_id, "metadata.yaml"), "")

setup(data, *args, **kwargs)

This function sets up the experiment. It is called before training the model.

Source code in framework\Experiment.py
118
119
120
def setup(self, data:pd.DataFrame, *args, **kwargs):
    """This function sets up the experiment. It is called before training the model."""
    return NotImplementedError("Implement this in the child class.")

spc()

This function creates a statistical process control chart. WIP.

Source code in framework\Experiment.py
285
286
287
def spc(self):
    """This function creates a statistical process control chart. WIP."""
    pass

FaultIsolationExperiment

Bases: Experiment

This experiment file is used for fault isolation experiments. It is a subclass of the Experiment class in the framework.Experiment module. It is used to create, train, and predict using fault isolation models. Heavily relies on classification algorithms.

Implemented models:

  • Decision Tree : 'models.fault_isolation.DecisionTree'

  • Random Forest : 'models.fault_isolation.RandomForest'

  • Naive Bayes : 'models.fault_isolation.NaiveBayes'

  • HMM : 'models.fault_isolation.HMM'

  • Markov Chain : 'models.fault_isolation.MarkovChain'

Inherits from Experiment Protocol class from framework.Experiment.

If the experiment is overloaded, and new functions are added, one can call it in the system by adding the function name to the cfg file with relevant parameters. For example, if a new function 'new_function' is added to the system, the cfg file should have the following structure: cfg file should have the following structure:

load_object:
    module: framework.FaultIsolation
    name: FaultIsolationExperiment
setup:
    datetime_column: str The column that contains the datetime information.
    target: str The target column for the experiment. (output column)
...
eda:
create_model:
    model: str The model to be used for training.
    params: dict The parameters to be used for the model.
new_function:
    param1: str The first parameter for the function.
    param2: str The second parameter for the function.
...
Source code in framework\FaultIsolation.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class FaultIsolationExperiment(Experiment):
    """This experiment file is used for fault isolation experiments.
    It is a subclass of the Experiment class in the framework.Experiment module. It is used to create, train, and predict using fault isolation models.
    Heavily relies on classification algorithms.

    Implemented models:

    - Decision Tree : 'models.fault_isolation.DecisionTree'

    - Random Forest :  'models.fault_isolation.RandomForest'

    - Naive Bayes : 'models.fault_isolation.NaiveBayes'

    - HMM :  'models.fault_isolation.HMM'

    - Markov Chain :  'models.fault_isolation.MarkovChain'

    Inherits from Experiment Protocol class from framework.Experiment.


    If the experiment is overloaded, and new functions are added, one can call it in the system by adding the function name to the cfg file with relevant parameters.
    For example, if a new function 'new_function' is added to the system, the cfg file should have the following structure:
    cfg file should have the following structure:


    ```
    load_object:
        module: framework.FaultIsolation
        name: FaultIsolationExperiment
    setup:
        datetime_column: str The column that contains the datetime information.
        target: str The target column for the experiment. (output column)
    ...
    eda:
    create_model:
        model: str The model to be used for training.
        params: dict The parameters to be used for the model.
    new_function:
        param1: str The first parameter for the function.
        param2: str The second parameter for the function.
    ...
    ```
    """

    def __init__(self, cfg:dict, experiment_id:str, run_id:str, * args, **kwargs) -> None:
        """Initialize the model registry. Currently cannot be changed after initialization. In future versions, we will allow for dynamic model loading through configuration files.

        Parameters:

            cfg (dict): The configuration file for the experiment.
            experiment_id (str): The experiment id for the experiment.
            run_id (str): The run id for the experiment.
        """

        self.cfg = cfg
        self.experiment_id = experiment_id
        self.run_id = run_id
        self._model_registry["dt"] = DecisionTreeModel # decision tree
        self._model_registry["rf"] = RandomForestModel # random forest
        self._model_registry["nb"] = NaiveBayesModel # Naive Bayes
        self._model_registry["bn"] = BayesNet # Bayes Net!
        self._model_registry["hmm"] = HMM # hidden markov model
        self._model_registry["mc"] = MarkovChainModel# Markov Chain

    def setup(self, data:str) -> pd.DataFrame:
        """Setup the data for training and prediction. This function is called before training the model. 
        Parameters:

            data (str): The data to be used for training and prediction in json format.

        Returns:
            data (pd.DataFrame): The data set up for training and prediction.
        """
        cfg = self.cfg["setup"]

        self.data_format =  cfg.get("format", None)
        data = self.format_data(data, self.data_format)

        self.data = data
        print(self.data)

        self.ds = cfg.get("datetime_column", None)
        self.format = cfg.get("datetime_format", None)
        self.predict_window = cfg.get("predict_window", 0)
        if self.predict_window is None:
            self.predict_window = 0
        if self.predict_window < 0:
            self.predict_window *= -1
        retrain_cfg = cfg.get("retrain", None)

        if retrain_cfg is None or len(retrain_cfg) == 0:
            self.retrain_window = 0
            self.metric = None
            self.metric_threshold = 0.0
            self.higher_better = True
        else:
            self.retrain_window = retrain_cfg.get("retrain_window", 0)
            self.metric = retrain_cfg.get("metric", None)
            self.metric_threshold = retrain_cfg.get("metric_threshold", 0.0)
            self.higher_better = retrain_cfg.get("higher_better", True)


        self.target = cfg["target"]
        # self.target = self.target.replace(" ", "_")   # replace spaces with underscores
        # self.target = self.target.replace("//", "") 
        # self.target = self.target.replace("/", "") 
        # self.target = self.target.replace("(", "")
        # self.target = self.target.replace(")", "")
        # self.target = self.target.replace("\\", "")
        # self.target = self.target.replace(".", "")


        ## TODO: Abstrct data VC -- integrate dvc or other data versioning tools.
        if self.ds is not None:
            self.data = self.convert_datetime(self.data, format = self.format)
            self.data.set_index(self.ds, inplace=True)

        self.data.rename(columns={self.target : "target"}, inplace=True)

        self.data["target"] = self.data["target"].astype("category")
        self.input_scheme = self.data.columns.to_list()
        self.input_scheme.remove("target")

        return data

    def create_model(self, *args, **kwargs) -> any: 
        """Create the model using the configuration file. The model is trained on the data set up in the 'setup' function.

        Returns: 
            (any) The trained model."""


        model = self.cfg["create_model"]["model"]
        params = self.cfg["create_model"].get("params", None)
        if params is None:
            params = dict()

        if self.data is None:
            raise ValueError("Data not found. Please use 'setup' to setup the data first.")

        try:
            model_class = self._model_registry[model]
        except KeyError:
            raise ValueError(f"Model {model} not found in model registry. Please check configuration file. Available models are {self._model_registry.keys()}.")
        self.model = model_class(**params).fit(self.data)
        self._report_registry = self.model._figs
        y = self.model.predict(self.data)
        try:
            self.metrics = self._score(self.data["target"], y["y_pred"])
            self._report_registry["metrics"] = px.line(self.metrics, x = self.metrics.index, y = self.metrics.columns, markers=True)
        except Exception as e:
            self.metrics = pd.DataFrame([np.NaN, np.NaN, np.NaN, np.NaN], index = ["Accuracy", "F1", "Precision", "Recall"]).transpose()
            logging.warn(f"Error calculating metrics: {e}. Model {self.model.__class__.__name__} may not have y_pred as output.")

        with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
            mlflow.set_tag("model", model)
            for k, v in self.model._figs.items():
                v.write_html(os.path.join(os.getcwd(),"runs", self.run_id, "reports", f"{k}.html"))
                mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}.html"), "reports")
            for k, v in params.items():
                mlflow.log_param(k, v)

        return self.model


    def predict(self, data:str):
        """Predict using the trained model. The model should be trained before calling this function.

        Parameters:
            data (str): The data to be used for prediction in json format.

        Returns:
            (pd.DataFrame) The predictions made by the model."""
        data = self.format_data(data, self.data_format)
        if self.model is None:
            raise ValueError("Model not found. Please train the model first with 'create_model'.")  
        X = data.copy()
        if self.ds is not None:
            X = self.convert_datetime(X, format = self.format)
            X.set_index(self.ds, inplace=True)

        if self.target in X.columns:
            X.rename(columns={self.target : "target"}, inplace=True)
        y = self.model.predict(X)

        metrics = self._score(X["target"], y["y_pred"])
        self.metrics = pd.concat([self.metrics, metrics], axis=0, ignore_index=True)
        self.new_data = pd.concat([self.new_data, y], axis=0)

        if self.predict_window > self.new_data.shape[0] :
            self.model.update_predict(self.new_data, reset_fig = True, update_fig = True)
        else:
            self.model.update_predict(self.new_data.iloc[-self.predict_window:, :], reset_fig = True, update_fig = True) 

        self._report_registry["metrics"] = px.line(self.metrics, x = self.metrics.index, y = self.metrics.columns, markers=True)
        return y, metrics

    def _score(self, y, y_hat):
        """Score the model using the predictions. This function is called after the predictions are made.
        Parameters:
            y (pd.Series): The actual values.
            y_hat (pd.Series): The predicted values.

        Returns:
            (pd.DataFrame) The metrics calculated for the model."""


        if self.model is None:
            raise ValueError("Model not found. Please train the model first with 'create_model'.")
        if y is None:
            raise ValueError("No data found for scoring. Please provide data for scoring.")
        if y_hat is None:
            raise ValueError("No predictions found for scoring. Please provide predictions for scoring.")

        acc = accuracy_score(y, y_hat)
        f1 = f1_score(y, y_hat)
        prec = precision_score(y, y_hat)
        rec = recall_score(y, y_hat)
        return pd.DataFrame([acc, f1, prec, rec], index=["Accuracy", "F1", "Precision", "Recall"]).transpose()

__init__(cfg, experiment_id, run_id, *args, **kwargs)

Initialize the model registry. Currently cannot be changed after initialization. In future versions, we will allow for dynamic model loading through configuration files.

Parameters:

cfg (dict): The configuration file for the experiment.
experiment_id (str): The experiment id for the experiment.
run_id (str): The run id for the experiment.
Source code in framework\FaultIsolation.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(self, cfg:dict, experiment_id:str, run_id:str, * args, **kwargs) -> None:
    """Initialize the model registry. Currently cannot be changed after initialization. In future versions, we will allow for dynamic model loading through configuration files.

    Parameters:

        cfg (dict): The configuration file for the experiment.
        experiment_id (str): The experiment id for the experiment.
        run_id (str): The run id for the experiment.
    """

    self.cfg = cfg
    self.experiment_id = experiment_id
    self.run_id = run_id
    self._model_registry["dt"] = DecisionTreeModel # decision tree
    self._model_registry["rf"] = RandomForestModel # random forest
    self._model_registry["nb"] = NaiveBayesModel # Naive Bayes
    self._model_registry["bn"] = BayesNet # Bayes Net!
    self._model_registry["hmm"] = HMM # hidden markov model
    self._model_registry["mc"] = MarkovChainModel# Markov Chain

create_model(*args, **kwargs)

Create the model using the configuration file. The model is trained on the data set up in the 'setup' function.

Returns:
  • any

    (any) The trained model.

Source code in framework\FaultIsolation.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def create_model(self, *args, **kwargs) -> any: 
    """Create the model using the configuration file. The model is trained on the data set up in the 'setup' function.

    Returns: 
        (any) The trained model."""


    model = self.cfg["create_model"]["model"]
    params = self.cfg["create_model"].get("params", None)
    if params is None:
        params = dict()

    if self.data is None:
        raise ValueError("Data not found. Please use 'setup' to setup the data first.")

    try:
        model_class = self._model_registry[model]
    except KeyError:
        raise ValueError(f"Model {model} not found in model registry. Please check configuration file. Available models are {self._model_registry.keys()}.")
    self.model = model_class(**params).fit(self.data)
    self._report_registry = self.model._figs
    y = self.model.predict(self.data)
    try:
        self.metrics = self._score(self.data["target"], y["y_pred"])
        self._report_registry["metrics"] = px.line(self.metrics, x = self.metrics.index, y = self.metrics.columns, markers=True)
    except Exception as e:
        self.metrics = pd.DataFrame([np.NaN, np.NaN, np.NaN, np.NaN], index = ["Accuracy", "F1", "Precision", "Recall"]).transpose()
        logging.warn(f"Error calculating metrics: {e}. Model {self.model.__class__.__name__} may not have y_pred as output.")

    with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
        mlflow.set_tag("model", model)
        for k, v in self.model._figs.items():
            v.write_html(os.path.join(os.getcwd(),"runs", self.run_id, "reports", f"{k}.html"))
            mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}.html"), "reports")
        for k, v in params.items():
            mlflow.log_param(k, v)

    return self.model

predict(data)

Predict using the trained model. The model should be trained before calling this function.

Parameters:
  • data (str) –

    The data to be used for prediction in json format.

Returns:
  • (pd.DataFrame) The predictions made by the model.

Source code in framework\FaultIsolation.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def predict(self, data:str):
    """Predict using the trained model. The model should be trained before calling this function.

    Parameters:
        data (str): The data to be used for prediction in json format.

    Returns:
        (pd.DataFrame) The predictions made by the model."""
    data = self.format_data(data, self.data_format)
    if self.model is None:
        raise ValueError("Model not found. Please train the model first with 'create_model'.")  
    X = data.copy()
    if self.ds is not None:
        X = self.convert_datetime(X, format = self.format)
        X.set_index(self.ds, inplace=True)

    if self.target in X.columns:
        X.rename(columns={self.target : "target"}, inplace=True)
    y = self.model.predict(X)

    metrics = self._score(X["target"], y["y_pred"])
    self.metrics = pd.concat([self.metrics, metrics], axis=0, ignore_index=True)
    self.new_data = pd.concat([self.new_data, y], axis=0)

    if self.predict_window > self.new_data.shape[0] :
        self.model.update_predict(self.new_data, reset_fig = True, update_fig = True)
    else:
        self.model.update_predict(self.new_data.iloc[-self.predict_window:, :], reset_fig = True, update_fig = True) 

    self._report_registry["metrics"] = px.line(self.metrics, x = self.metrics.index, y = self.metrics.columns, markers=True)
    return y, metrics

setup(data)

Setup the data for training and prediction. This function is called before training the model. Parameters:

data (str): The data to be used for training and prediction in json format.
Returns:
  • data( DataFrame ) –

    The data set up for training and prediction.

Source code in framework\FaultIsolation.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def setup(self, data:str) -> pd.DataFrame:
    """Setup the data for training and prediction. This function is called before training the model. 
    Parameters:

        data (str): The data to be used for training and prediction in json format.

    Returns:
        data (pd.DataFrame): The data set up for training and prediction.
    """
    cfg = self.cfg["setup"]

    self.data_format =  cfg.get("format", None)
    data = self.format_data(data, self.data_format)

    self.data = data
    print(self.data)

    self.ds = cfg.get("datetime_column", None)
    self.format = cfg.get("datetime_format", None)
    self.predict_window = cfg.get("predict_window", 0)
    if self.predict_window is None:
        self.predict_window = 0
    if self.predict_window < 0:
        self.predict_window *= -1
    retrain_cfg = cfg.get("retrain", None)

    if retrain_cfg is None or len(retrain_cfg) == 0:
        self.retrain_window = 0
        self.metric = None
        self.metric_threshold = 0.0
        self.higher_better = True
    else:
        self.retrain_window = retrain_cfg.get("retrain_window", 0)
        self.metric = retrain_cfg.get("metric", None)
        self.metric_threshold = retrain_cfg.get("metric_threshold", 0.0)
        self.higher_better = retrain_cfg.get("higher_better", True)


    self.target = cfg["target"]
    # self.target = self.target.replace(" ", "_")   # replace spaces with underscores
    # self.target = self.target.replace("//", "") 
    # self.target = self.target.replace("/", "") 
    # self.target = self.target.replace("(", "")
    # self.target = self.target.replace(")", "")
    # self.target = self.target.replace("\\", "")
    # self.target = self.target.replace(".", "")


    ## TODO: Abstrct data VC -- integrate dvc or other data versioning tools.
    if self.ds is not None:
        self.data = self.convert_datetime(self.data, format = self.format)
        self.data.set_index(self.ds, inplace=True)

    self.data.rename(columns={self.target : "target"}, inplace=True)

    self.data["target"] = self.data["target"].astype("category")
    self.input_scheme = self.data.columns.to_list()
    self.input_scheme.remove("target")

    return data

FaultDetectionExperiment

Bases: Experiment

Source code in framework\FaultDetection.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class FaultDetectionExperiment(Experiment):
    def __init__(self, cfg:dict, experiment_id:str, run_id:str, *args, **kwargs) -> None:
        """Initialize the model registry. Currently cannot be changed after initialization. In future versions, we will allow for dynamic model loading through configuration files.
        This experiment file is used for fault detection experiments. 
        It is a subclass of the Experiment class in the framework.Experiment module. It is used to create, train, and predict using fault detection models.
        Heavily relies on outlier detection/clustering algorithms.

        Parameters:
            cfg: dict The configuration file for the experiment.
            experiment_id: str The experiment id for the experiment.
            run_id: str The run id for the experiment.

        Implemented models:
        - DBSCAN :  'models.fault_detection.DBSCANAnomalyDetection'
        - Elliptic Envelope  'models.fault_detection.EllipticEnvelopeAnomalyDetection'
        - Isolation Forest  'models.fault_detection.IsolationForestAnomaly'
        - PCA 'models.fault_detection.PCAAnomalyDetection'

        Inherits from Experiment Protocol class from framework.Experiment. 
        If the experiment is overloaded, and new functions are added, one can call it in the system by adding the function name to the cfg file with relevant parameters.
        For example, if a new function 'new_function' is added to the system, the cfg file should have the following structure:
        cfg file should have the following structure:
        ```
        load_object:
            module: framework.FaultIsolation
            name: FaultIsolationExperiment
        setup:
            datetime_column: str The column that contains the datetime information.
            target: str The target column for the experiment. (output column)
        ...
        eda:
        create_model:
            model: str The model to be used for training.
            params: dict The parameters to be used for the model.
        new_function:
            param1: str The first parameter for the function.
            param2: str The second parameter for the function.

        ```
        The function will then be called in the system by calling the 'run' function.
        experiment.run(data, experiment_id, run_id)

        """

        self.cfg = cfg
        self.experiment_id = experiment_id
        self.run_id = run_id
        self._model_registry["dbscan"] = DBSCANAnomalyDetection
        self._model_registry["ee"] = EllipticEnvelopeAnomalyDetection
        self._model_registry["iforest"] = IsolationForestAnomaly
        self._model_registry["pca"] = PCAAnomalyDetection


    def setup(self, data:str) -> pd.DataFrame:
        """Setup the data for training and prediction. This function is called before any other function to set self.data that can be used in any other function. Also returns the data if need be.

        Parameters:
            data (str): The data to be used for training and prediction.

        Returns:
            pd.DataFrame The data that was set for the experiment.
        """
        cfg = self.cfg["setup"]
        self.ds = cfg.get("datetime_column", None)
        self.format = cfg.get("datetime_format", None)
        self.data_format =  cfg.get("format", None)
        data = self.format_data(data, self.data_format)

        self.predict_window = cfg.get("predict_window", 0)
        if self.predict_window is None:
            self.predict_window = 0
        if self.predict_window < 0:
            self.predict_window *= -1

        self.data = data
        if self.ds is not None:
            self.data = self.convert_datetime(self.data, format = self.format)
            self.data.set_index(self.ds, inplace=True) 

        return data

    def create_model(self, *args, **kwargs):
        """Create the model using the configuration file. The model is trained on the data set up in the 'setup' function.
        Returns:
            any The trained model.

        """
        model = self.cfg["create_model"]["model"]
        params = self.cfg["create_model"].get("params", None)
        if params is None:
            params = dict()

        if self.data is None:
            raise ValueError("Data not found. Please use 'setup' to setup the data first.")

        try:
            model_class = self._model_registry[model]
        except KeyError:
            raise ValueError(f"Model {model} not found in model registry. Please check configuration file. Available models are {self._model_registry.keys()}.")

        self.model = model_class(**params).fit(self.data)
        self._report_registry = self.model._figs

        with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
            mlflow.set_tag("model", model)

            for k, v in self.model._figs.items():
                v.write_html(os.path.join(os.getcwd(),"runs", self.run_id, "reports", f"{k}.html"))
                mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}.html"), "reports")

            #mlflow.sklearn.log_model(self.model, "model")
            for k, v in params.items():
                mlflow.log_param(k, v)

        return self.model


    def predict(self, data:str) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Predict using the trained model. The model should be trained before calling this function.   
        Parameters:
            data (str): The data to be used for prediction.

        Returns:
            Tuple[pd.DataFrame, pd.DataFrame] The predictions made by the model and the metrics calculated for the model.
        """
        data = self.format_data(data, self.data_format)
        if self.model is None:
            raise ValueError("Model not found. Please train the model first with 'create_model'.")  
        X = data.copy()

        if self.ds is not None:
            X = self.convert_datetime(X, format = self.format)
            X.set_index(self.ds, inplace=True)

        y = self.model.predict(X)
        self.new_data = pd.concat([self.new_data, y], axis=0)
        if self.predict_window > self.new_data.shape[0] :
            self.model.update_predict(self.new_data, reset_fig = True, update_fig = True)
        else:
            self.model.update_predict(self.new_data.iloc[-self.predict_window:, :], reset_fig = True, update_fig = True) 

        return y, pd.DataFrame([])

    def _score(self, y, y_hat):
        """Score the model using the predictions. This function is called after the predictions are made.
        Parameters:
            y: pd.Series The actual values.
            y_hat: pd.Series The predicted values.

        Returns:
            pd.DataFrame The metrics calculated for the model.
        """
        logging.log("Scoring function not implemented.")
        return pd.DataFrame([])

__init__(cfg, experiment_id, run_id, *args, **kwargs)

Initialize the model registry. Currently cannot be changed after initialization. In future versions, we will allow for dynamic model loading through configuration files. This experiment file is used for fault detection experiments. It is a subclass of the Experiment class in the framework.Experiment module. It is used to create, train, and predict using fault detection models. Heavily relies on outlier detection/clustering algorithms.

Parameters:
  • cfg (dict) –

    dict The configuration file for the experiment.

  • experiment_id (str) –

    str The experiment id for the experiment.

  • run_id (str) –

    str The run id for the experiment.

Implemented models: - DBSCAN : 'models.fault_detection.DBSCANAnomalyDetection' - Elliptic Envelope 'models.fault_detection.EllipticEnvelopeAnomalyDetection' - Isolation Forest 'models.fault_detection.IsolationForestAnomaly' - PCA 'models.fault_detection.PCAAnomalyDetection'

Inherits from Experiment Protocol class from framework.Experiment. If the experiment is overloaded, and new functions are added, one can call it in the system by adding the function name to the cfg file with relevant parameters. For example, if a new function 'new_function' is added to the system, the cfg file should have the following structure: cfg file should have the following structure:

load_object:
    module: framework.FaultIsolation
    name: FaultIsolationExperiment
setup:
    datetime_column: str The column that contains the datetime information.
    target: str The target column for the experiment. (output column)
...
eda:
create_model:
    model: str The model to be used for training.
    params: dict The parameters to be used for the model.
new_function:
    param1: str The first parameter for the function.
    param2: str The second parameter for the function.

The function will then be called in the system by calling the 'run' function. experiment.run(data, experiment_id, run_id)

Source code in framework\FaultDetection.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(self, cfg:dict, experiment_id:str, run_id:str, *args, **kwargs) -> None:
    """Initialize the model registry. Currently cannot be changed after initialization. In future versions, we will allow for dynamic model loading through configuration files.
    This experiment file is used for fault detection experiments. 
    It is a subclass of the Experiment class in the framework.Experiment module. It is used to create, train, and predict using fault detection models.
    Heavily relies on outlier detection/clustering algorithms.

    Parameters:
        cfg: dict The configuration file for the experiment.
        experiment_id: str The experiment id for the experiment.
        run_id: str The run id for the experiment.

    Implemented models:
    - DBSCAN :  'models.fault_detection.DBSCANAnomalyDetection'
    - Elliptic Envelope  'models.fault_detection.EllipticEnvelopeAnomalyDetection'
    - Isolation Forest  'models.fault_detection.IsolationForestAnomaly'
    - PCA 'models.fault_detection.PCAAnomalyDetection'

    Inherits from Experiment Protocol class from framework.Experiment. 
    If the experiment is overloaded, and new functions are added, one can call it in the system by adding the function name to the cfg file with relevant parameters.
    For example, if a new function 'new_function' is added to the system, the cfg file should have the following structure:
    cfg file should have the following structure:
    ```
    load_object:
        module: framework.FaultIsolation
        name: FaultIsolationExperiment
    setup:
        datetime_column: str The column that contains the datetime information.
        target: str The target column for the experiment. (output column)
    ...
    eda:
    create_model:
        model: str The model to be used for training.
        params: dict The parameters to be used for the model.
    new_function:
        param1: str The first parameter for the function.
        param2: str The second parameter for the function.

    ```
    The function will then be called in the system by calling the 'run' function.
    experiment.run(data, experiment_id, run_id)

    """

    self.cfg = cfg
    self.experiment_id = experiment_id
    self.run_id = run_id
    self._model_registry["dbscan"] = DBSCANAnomalyDetection
    self._model_registry["ee"] = EllipticEnvelopeAnomalyDetection
    self._model_registry["iforest"] = IsolationForestAnomaly
    self._model_registry["pca"] = PCAAnomalyDetection

create_model(*args, **kwargs)

Create the model using the configuration file. The model is trained on the data set up in the 'setup' function. Returns: any The trained model.

Source code in framework\FaultDetection.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def create_model(self, *args, **kwargs):
    """Create the model using the configuration file. The model is trained on the data set up in the 'setup' function.
    Returns:
        any The trained model.

    """
    model = self.cfg["create_model"]["model"]
    params = self.cfg["create_model"].get("params", None)
    if params is None:
        params = dict()

    if self.data is None:
        raise ValueError("Data not found. Please use 'setup' to setup the data first.")

    try:
        model_class = self._model_registry[model]
    except KeyError:
        raise ValueError(f"Model {model} not found in model registry. Please check configuration file. Available models are {self._model_registry.keys()}.")

    self.model = model_class(**params).fit(self.data)
    self._report_registry = self.model._figs

    with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
        mlflow.set_tag("model", model)

        for k, v in self.model._figs.items():
            v.write_html(os.path.join(os.getcwd(),"runs", self.run_id, "reports", f"{k}.html"))
            mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}.html"), "reports")

        #mlflow.sklearn.log_model(self.model, "model")
        for k, v in params.items():
            mlflow.log_param(k, v)

    return self.model

predict(data)

Predict using the trained model. The model should be trained before calling this function.
Parameters: data (str): The data to be used for prediction.

Returns:
  • Tuple[DataFrame, DataFrame]

    Tuple[pd.DataFrame, pd.DataFrame] The predictions made by the model and the metrics calculated for the model.

Source code in framework\FaultDetection.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def predict(self, data:str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Predict using the trained model. The model should be trained before calling this function.   
    Parameters:
        data (str): The data to be used for prediction.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame] The predictions made by the model and the metrics calculated for the model.
    """
    data = self.format_data(data, self.data_format)
    if self.model is None:
        raise ValueError("Model not found. Please train the model first with 'create_model'.")  
    X = data.copy()

    if self.ds is not None:
        X = self.convert_datetime(X, format = self.format)
        X.set_index(self.ds, inplace=True)

    y = self.model.predict(X)
    self.new_data = pd.concat([self.new_data, y], axis=0)
    if self.predict_window > self.new_data.shape[0] :
        self.model.update_predict(self.new_data, reset_fig = True, update_fig = True)
    else:
        self.model.update_predict(self.new_data.iloc[-self.predict_window:, :], reset_fig = True, update_fig = True) 

    return y, pd.DataFrame([])

setup(data)

Setup the data for training and prediction. This function is called before any other function to set self.data that can be used in any other function. Also returns the data if need be.

Parameters:
  • data (str) –

    The data to be used for training and prediction.

Returns:
  • DataFrame

    pd.DataFrame The data that was set for the experiment.

Source code in framework\FaultDetection.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def setup(self, data:str) -> pd.DataFrame:
    """Setup the data for training and prediction. This function is called before any other function to set self.data that can be used in any other function. Also returns the data if need be.

    Parameters:
        data (str): The data to be used for training and prediction.

    Returns:
        pd.DataFrame The data that was set for the experiment.
    """
    cfg = self.cfg["setup"]
    self.ds = cfg.get("datetime_column", None)
    self.format = cfg.get("datetime_format", None)
    self.data_format =  cfg.get("format", None)
    data = self.format_data(data, self.data_format)

    self.predict_window = cfg.get("predict_window", 0)
    if self.predict_window is None:
        self.predict_window = 0
    if self.predict_window < 0:
        self.predict_window *= -1

    self.data = data
    if self.ds is not None:
        self.data = self.convert_datetime(self.data, format = self.format)
        self.data.set_index(self.ds, inplace=True) 

    return data

TimeSeriesAnomalyExperiment

Bases: Experiment

Class for interacting with time series models. Plays a similar role to pycaret experiment. Deals with model training, predicting, metrics and serialization. Inherits from Experiment Protocol class from framework.Experiment. Implemented models: - Autoencoder : 'models.time_series_analysis.Autoencoder' - LSTM : 'models.time_series_analysis.LSTM' - Prophet : 'models.time_series_analysis.Prophet' - SSA : 'models.time_series_analysis.SSA' - ARIMA : 'models.time_series_analysis.ARIMA' - Exponential Smoothing : 'models.time_series_analysis.ExponentialSmoothing'

If the experiment is overloaded, and new functions are added, one can call it in the system by adding the function name to the cfg file with relevant parameters. For example, if a new function 'new_function' is added to the system, the cfg file should have the following structure: cfg file should have the following structure:

load_object:
    module: framework.TimeSeriesAnalysis
    name: TimeSeriesAnomalyExperiment
setup:
    ds : str The column that contains the datetime information.
    y : str The target column for the experiment. (output column)
...
create_model:
    model: str The model to be used for training.
    params: dict The parameters to be used for the model.
...
new_function: # e.g. posterior reconciliation of results.
    param1: str The first parameter for the function.
    param2: str The second parameter for the function.
Source code in framework\TimeSeriesAnalysis.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
class TimeSeriesAnomalyExperiment(Experiment):
    """Class for interacting with time series models. Plays a similar role to pycaret experiment. Deals with model training, predicting, metrics and serialization.
    Inherits from Experiment Protocol class from framework.Experiment.
    Implemented models:
    - Autoencoder : 'models.time_series_analysis.Autoencoder'
    - LSTM :  'models.time_series_analysis.LSTM'
    - Prophet :  'models.time_series_analysis.Prophet'
    - SSA : 'models.time_series_analysis.SSA'
    - ARIMA : 'models.time_series_analysis.ARIMA'
    - Exponential Smoothing : 'models.time_series_analysis.ExponentialSmoothing'


    If the experiment is overloaded, and new functions are added, one can call it in the system by adding the function name to the cfg file with relevant parameters.
    For example, if a new function 'new_function' is added to the system, the cfg file should have the following structure:
    cfg file should have the following structure:

    ```
    load_object:
        module: framework.TimeSeriesAnalysis
        name: TimeSeriesAnomalyExperiment
    setup:
        ds : str The column that contains the datetime information.
        y : str The target column for the experiment. (output column)
    ...
    create_model:
        model: str The model to be used for training.
        params: dict The parameters to be used for the model.
    ...
    new_function: # e.g. posterior reconciliation of results.
        param1: str The first parameter for the function.
        param2: str The second parameter for the function.
    ```
    """

    def __init__(self, cfg:dict, experiment_id:str, run_id:str) -> None:
        """Initialize the model registry. Currently cannot be changed after initialization. In future versions, we will allow for dynamic model loading through configuration files.
        This experiment function is used for loading basic time series anomaly detection models.
        Parameters:
            cfg: dict The configuration file for the experiment.
            experiment_id: str The experiment id for the experiment.
            run_id: str The run id for the experiment.
        """ 

        self.cfg = cfg
        self.experiment_id = experiment_id
        self.run_id = run_id

        self._model_registry["ae"] = Autoencoder
        self._model_registry["lstm"] = LSTM
        self._model_registry["prophet"] = ProphetAnomalyDetection
        self._model_registry["ssa"] = SSAAnomalyDetection
        self._model_registry["arima"] = ARIMAAnomalyDetector
        self._model_registry["es"] = ExponentialSmoothingAnomaly


    def setup(self, data:str, *args, **kwargs):
        """Setup the data for training and prediction. This function is called before training the model. In the future, it will also be used to preprocess the data and prepare it for training.
        Parameters:
            data: pd.DataFrame The data to be used for training and prediction.

        Returns:
            pd.DataFrame The data after processing. This data is used for training and prediction.


        Description:

        - The setup function is used to prepare the data for training and prediction. It is called before the model is trained.
        - The function renames the columns to 'ds' and 'y' for consistency.
        - The function logs the target and datestamp columns to mlflow.
        - The function returns the data after processing.
        """
        cfg = self.cfg["setup"]

        self.data_format =  cfg.get("format", None)
        data = self.format_data(data, self.data_format)

        self.ds = cfg["datetime_column"]
        self.target = cfg["target"]
        self.format = cfg["datetime_format"]
        self.predict_window = cfg.get("predict_window", 0)
        if self.predict_window is None:
            self.predict_window = 0
        if self.predict_window < 0:
            self.predict_window *= -1

        retrain_cfg = cfg.get("retrain", None)

        if retrain_cfg is None or len(retrain_cfg) == 0:
            self.retrain_window = 0
            self.metric = None
            self.metric_threshold = 0.0
            self.higher_better = True
        else:
            self.retrain_window = retrain_cfg.get("retrain_window", 0)
            self.metric = retrain_cfg.get("metric", None)
            self.metric_threshold = retrain_cfg.get("metric_threshold", 0.0)
            self.higher_better = retrain_cfg.get("higher_better", True)


        if self.ds not in data.columns:
            raise ValueError("Datestamp column not found in data. Configuration file must specify datestamp column as ds.")
        if self.target not in data.columns:
            raise ValueError("Target column not found in data. Configuration file must specify target column as target.")


        data = self.convert_datetime(data, format= self.format)
        data.rename(columns = {self.ds : "ds", self.target : "y"}, inplace=True)
        self.data = data.loc[:, ["ds", "y"]]
        self.input_scheme = []
        with mlflow.start_run(nested= True, experiment_id=self.experiment_id) as run:
            mlflow.log_param("target", self.target)
            mlflow.log_param("datetime_column", self.ds)


        return data

    def create_model(self, *args, **kwargs):
        """Create the model using the configuration file. The model is trained on the data set up in the 'setup' function.
        Returns:
            any The trained model.

        Description:    
        - The function creates the model using the configuration file.
        - The function logs the model and parameters to mlflow.
        - The function logs the metrics to mlflow.
        - The function saves the model to disk.
        - The function returns the trained model.
        """

        model = self.cfg["create_model"]["model"]
        params = self.cfg["create_model"].get("params", None)
        if params is None:
            params = dict()
        if self.data is None:
            raise ValueError("Data not found. Please use 'setup' to setup the data first.")
        try:
            model_class = self._model_registry[model]
        except KeyError:
            raise ValueError(f"Model {model} not found in model registry. Please check configuration file. Available models are {self._model_registry.keys()}.")

        self.model = model_class(**params).fit(self.data)
        self._report_registry = self.model._figs
        y = self.model.predict(self.data)

        try:
            self.metrics = self._score(self.data[self.target], y["y_pred"])
            self._report_registry["metrics"] = px.line(self.metrics, x = self.metrics.index, y = self.metrics.columns, markers=True)

        except Exception as e:
            self.metrics = pd.DataFrame([np.NaN, np.NaN, np.NaN, np.NaN], index = ["MSE", "MAE", "R2", "MAPE"]).transpose()
            logging.warn(f"Error calculating metrics: {e}. Model {self.model} may not have y_pred as output.")

        with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
            mlflow.set_tag("model", model)
            #mlflow.sklearn.save_model(self.model, f"runs/{self.run_id}/model")

            for k, v in params.items():
                mlflow.log_param(k, v)
            for key, value in self.metrics.to_dict().items():
                mlflow.log_metric(key, value[0])

            for k,v in self._report_registry.items():
                v.write_html(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}_report.html"))
                mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}_report.html"), "reports")

        return self.model


    def predict(self, data:str):
        """Predict using the trained model. The model should be trained before calling this function.
        Parameters:
            data str The data to be used for prediction.

        Returns:
            pd.DataFrame The predictions made by the model.

        Description:
        - The function predicts using the trained model.
        - The function updates the predict figures with the new data.
        - The function calculates the metrics for the model.
        - The function logs the metrics to mlflow.
        - The function saves the reports to disk.
        - The function logs the reports to mlflow.
        - The function returns the predictions and metrics.
        """
        data = self.format_data(data, self.data_format)
        if self.model is None:
            raise ValueError("Model not found. Please train the model first with 'create_model'.")  

        data.rename(columns = {self.ds : "ds", self.target : "y"}, inplace=True)
        data = self.convert_datetime(data, format = self.format)
        data = data.loc[:, ["ds", "y"]]
        y = self.model.predict(data)
        self.new_data = pd.concat([self.new_data, y], axis=0)

        if self.predict_window > self.new_data.shape[0] :
            self.model.update_predict(self.new_data, reset_fig = True, update_fig = True)
        else:
            self.model.update_predict(self.new_data.iloc[-self.predict_window:, :], reset_fig = True, update_fig = True) 
        if "y_pred" not in y.columns:
            metrics = pd.DataFrame([np.NaN, np.NaN, np.NaN, np.NaN], index = ["MSE", "MAE", "R2", "MAPE"]).transpose()
            logging.warn("y_pred not found in model output. Please make sure the model has a 'predict' method that returns a DataFrame with 'y_pred' column.")
        else:
            metrics = self._score(data[self.target], y["y_pred"])


        self.metrics = pd.concat([self.metrics, metrics], axis=0, ignore_index=True)
        self._report_registry["metrics"] = px.line(self.metrics, x = self.metrics.index, y = self.metrics.columns, markers=True)

        #self.spc_chart(update_fig=True)
        self._report_registry["predict"].write_html(os.path.join(os.getcwd(), "runs", self.run_id, "reports","predict_report.html"))
        self._report_registry["metrics"].write_html(os.path.join(os.getcwd(), "runs", self.run_id, "reports", "metrics_report.html"))
        with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
            mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", "predict_report.html"), "reports")
            mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", "metrics_report.html"), "reports")

        return y, metrics


    def _score(self, y, y_hat):
        """Calculate the metrics for the model.
        Parameters:
            y: pd.Series The true values.
            y_hat: pd.Series The predicted values.

        Returns:
            pd.DataFrame The metrics for the model.

        Description:

        - The function calculates the metrics for the model.
        - The function returns:
            - Mean Squared Error
            - Mean Absolute Error
            - R2 Score
            - Mean Absolute Percentage Error
        """


        assert isinstance(y, pd.Series) or isinstance(y, pd.DataFrame), "y must be a pandas Series or DataFrame."
        assert isinstance(y_hat, pd.Series) or isinstance(y_hat, pd.DataFrame), "y_hat must be a pandas Series or DataFrame."

        if isinstance(y, pd.Series):
            y = y.to_frame()
        if isinstance(y_hat, pd.Series):
            y_hat = y_hat.to_frame()

        assert y.shape == y_hat.shape, "y and y_hat must have the same shape."   

        mse = mean_squared_error(y, y_hat)
        mae = mean_absolute_error(y, y_hat)
        r2 = r2_score(y, y_hat)
        mape = mean_absolute_percentage_error(y, y_hat)

        return pd.DataFrame([mse, mae, r2, mape],index = ["MSE", "MAE", "R2", "MAPE"]).transpose()

__init__(cfg, experiment_id, run_id)

Initialize the model registry. Currently cannot be changed after initialization. In future versions, we will allow for dynamic model loading through configuration files. This experiment function is used for loading basic time series anomaly detection models. Parameters: cfg: dict The configuration file for the experiment. experiment_id: str The experiment id for the experiment. run_id: str The run id for the experiment.

Source code in framework\TimeSeriesAnalysis.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def __init__(self, cfg:dict, experiment_id:str, run_id:str) -> None:
    """Initialize the model registry. Currently cannot be changed after initialization. In future versions, we will allow for dynamic model loading through configuration files.
    This experiment function is used for loading basic time series anomaly detection models.
    Parameters:
        cfg: dict The configuration file for the experiment.
        experiment_id: str The experiment id for the experiment.
        run_id: str The run id for the experiment.
    """ 

    self.cfg = cfg
    self.experiment_id = experiment_id
    self.run_id = run_id

    self._model_registry["ae"] = Autoencoder
    self._model_registry["lstm"] = LSTM
    self._model_registry["prophet"] = ProphetAnomalyDetection
    self._model_registry["ssa"] = SSAAnomalyDetection
    self._model_registry["arima"] = ARIMAAnomalyDetector
    self._model_registry["es"] = ExponentialSmoothingAnomaly

create_model(*args, **kwargs)

Create the model using the configuration file. The model is trained on the data set up in the 'setup' function. Returns: any The trained model.

Description:
- The function creates the model using the configuration file. - The function logs the model and parameters to mlflow. - The function logs the metrics to mlflow. - The function saves the model to disk. - The function returns the trained model.

Source code in framework\TimeSeriesAnalysis.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def create_model(self, *args, **kwargs):
    """Create the model using the configuration file. The model is trained on the data set up in the 'setup' function.
    Returns:
        any The trained model.

    Description:    
    - The function creates the model using the configuration file.
    - The function logs the model and parameters to mlflow.
    - The function logs the metrics to mlflow.
    - The function saves the model to disk.
    - The function returns the trained model.
    """

    model = self.cfg["create_model"]["model"]
    params = self.cfg["create_model"].get("params", None)
    if params is None:
        params = dict()
    if self.data is None:
        raise ValueError("Data not found. Please use 'setup' to setup the data first.")
    try:
        model_class = self._model_registry[model]
    except KeyError:
        raise ValueError(f"Model {model} not found in model registry. Please check configuration file. Available models are {self._model_registry.keys()}.")

    self.model = model_class(**params).fit(self.data)
    self._report_registry = self.model._figs
    y = self.model.predict(self.data)

    try:
        self.metrics = self._score(self.data[self.target], y["y_pred"])
        self._report_registry["metrics"] = px.line(self.metrics, x = self.metrics.index, y = self.metrics.columns, markers=True)

    except Exception as e:
        self.metrics = pd.DataFrame([np.NaN, np.NaN, np.NaN, np.NaN], index = ["MSE", "MAE", "R2", "MAPE"]).transpose()
        logging.warn(f"Error calculating metrics: {e}. Model {self.model} may not have y_pred as output.")

    with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
        mlflow.set_tag("model", model)
        #mlflow.sklearn.save_model(self.model, f"runs/{self.run_id}/model")

        for k, v in params.items():
            mlflow.log_param(k, v)
        for key, value in self.metrics.to_dict().items():
            mlflow.log_metric(key, value[0])

        for k,v in self._report_registry.items():
            v.write_html(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}_report.html"))
            mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}_report.html"), "reports")

    return self.model

predict(data)

Predict using the trained model. The model should be trained before calling this function. Parameters: data str The data to be used for prediction.

Returns:
  • pd.DataFrame The predictions made by the model.

Description: - The function predicts using the trained model. - The function updates the predict figures with the new data. - The function calculates the metrics for the model. - The function logs the metrics to mlflow. - The function saves the reports to disk. - The function logs the reports to mlflow. - The function returns the predictions and metrics.

Source code in framework\TimeSeriesAnalysis.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def predict(self, data:str):
    """Predict using the trained model. The model should be trained before calling this function.
    Parameters:
        data str The data to be used for prediction.

    Returns:
        pd.DataFrame The predictions made by the model.

    Description:
    - The function predicts using the trained model.
    - The function updates the predict figures with the new data.
    - The function calculates the metrics for the model.
    - The function logs the metrics to mlflow.
    - The function saves the reports to disk.
    - The function logs the reports to mlflow.
    - The function returns the predictions and metrics.
    """
    data = self.format_data(data, self.data_format)
    if self.model is None:
        raise ValueError("Model not found. Please train the model first with 'create_model'.")  

    data.rename(columns = {self.ds : "ds", self.target : "y"}, inplace=True)
    data = self.convert_datetime(data, format = self.format)
    data = data.loc[:, ["ds", "y"]]
    y = self.model.predict(data)
    self.new_data = pd.concat([self.new_data, y], axis=0)

    if self.predict_window > self.new_data.shape[0] :
        self.model.update_predict(self.new_data, reset_fig = True, update_fig = True)
    else:
        self.model.update_predict(self.new_data.iloc[-self.predict_window:, :], reset_fig = True, update_fig = True) 
    if "y_pred" not in y.columns:
        metrics = pd.DataFrame([np.NaN, np.NaN, np.NaN, np.NaN], index = ["MSE", "MAE", "R2", "MAPE"]).transpose()
        logging.warn("y_pred not found in model output. Please make sure the model has a 'predict' method that returns a DataFrame with 'y_pred' column.")
    else:
        metrics = self._score(data[self.target], y["y_pred"])


    self.metrics = pd.concat([self.metrics, metrics], axis=0, ignore_index=True)
    self._report_registry["metrics"] = px.line(self.metrics, x = self.metrics.index, y = self.metrics.columns, markers=True)

    #self.spc_chart(update_fig=True)
    self._report_registry["predict"].write_html(os.path.join(os.getcwd(), "runs", self.run_id, "reports","predict_report.html"))
    self._report_registry["metrics"].write_html(os.path.join(os.getcwd(), "runs", self.run_id, "reports", "metrics_report.html"))
    with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
        mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", "predict_report.html"), "reports")
        mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", "metrics_report.html"), "reports")

    return y, metrics

setup(data, *args, **kwargs)

Setup the data for training and prediction. This function is called before training the model. In the future, it will also be used to preprocess the data and prepare it for training. Parameters: data: pd.DataFrame The data to be used for training and prediction.

Returns:
  • pd.DataFrame The data after processing. This data is used for training and prediction.

Description:

  • The setup function is used to prepare the data for training and prediction. It is called before the model is trained.
  • The function renames the columns to 'ds' and 'y' for consistency.
  • The function logs the target and datestamp columns to mlflow.
  • The function returns the data after processing.
Source code in framework\TimeSeriesAnalysis.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def setup(self, data:str, *args, **kwargs):
    """Setup the data for training and prediction. This function is called before training the model. In the future, it will also be used to preprocess the data and prepare it for training.
    Parameters:
        data: pd.DataFrame The data to be used for training and prediction.

    Returns:
        pd.DataFrame The data after processing. This data is used for training and prediction.


    Description:

    - The setup function is used to prepare the data for training and prediction. It is called before the model is trained.
    - The function renames the columns to 'ds' and 'y' for consistency.
    - The function logs the target and datestamp columns to mlflow.
    - The function returns the data after processing.
    """
    cfg = self.cfg["setup"]

    self.data_format =  cfg.get("format", None)
    data = self.format_data(data, self.data_format)

    self.ds = cfg["datetime_column"]
    self.target = cfg["target"]
    self.format = cfg["datetime_format"]
    self.predict_window = cfg.get("predict_window", 0)
    if self.predict_window is None:
        self.predict_window = 0
    if self.predict_window < 0:
        self.predict_window *= -1

    retrain_cfg = cfg.get("retrain", None)

    if retrain_cfg is None or len(retrain_cfg) == 0:
        self.retrain_window = 0
        self.metric = None
        self.metric_threshold = 0.0
        self.higher_better = True
    else:
        self.retrain_window = retrain_cfg.get("retrain_window", 0)
        self.metric = retrain_cfg.get("metric", None)
        self.metric_threshold = retrain_cfg.get("metric_threshold", 0.0)
        self.higher_better = retrain_cfg.get("higher_better", True)


    if self.ds not in data.columns:
        raise ValueError("Datestamp column not found in data. Configuration file must specify datestamp column as ds.")
    if self.target not in data.columns:
        raise ValueError("Target column not found in data. Configuration file must specify target column as target.")


    data = self.convert_datetime(data, format= self.format)
    data.rename(columns = {self.ds : "ds", self.target : "y"}, inplace=True)
    self.data = data.loc[:, ["ds", "y"]]
    self.input_scheme = []
    with mlflow.start_run(nested= True, experiment_id=self.experiment_id) as run:
        mlflow.log_param("target", self.target)
        mlflow.log_param("datetime_column", self.ds)


    return data

ProcessMiningExperiment

Bases: Experiment

This experiment file is used for process mining experiments. It is a subclass of the Experiment class in the framework.Experiment module. It is used to create, train, and predict using process mining models. Heavily relies on sequence mining algorithms.

Implemented models: - Apriori : 'models.spmf.Apriori' - CMSPAM : 'models.spmf.CM_SPAM' - TopKRules :'models.spmf.TopKRules' - Heuristics Miner : 'models.spmf.HeuristicsMiner' Inherits from Experiment Protocol class from framework.Experiment. If the experiment is overloaded, and new functions are added, one can call it in the system by adding the function name to the cfg file with relevant parameters. For example, if a new function 'new_function' is added to the system, the cfg file should have the following structure: cfg file should have the following structure:

load_object:
    module: framework.ProcessMining    
    name: ProcessMiningExperiment
setup:
...
create_model:
    model: str The model to be used for training.
    params: dict The parameters to be used for the model.
new_function:
    param1: str The first parameter for the function.
    param2: str The second parameter for the function.
...
Source code in framework\ProcessMining.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class ProcessMiningExperiment(Experiment):
    """ This experiment file is used for process mining experiments.
    It is a subclass of the Experiment class in the framework.Experiment module. It is used to create, train, and predict using process mining models.
    Heavily relies on sequence mining algorithms.

    Implemented models:
    - Apriori : 'models.spmf.Apriori'
    - CMSPAM :  'models.spmf.CM_SPAM'
    - TopKRules :'models.spmf.TopKRules'
    - Heuristics Miner : 'models.spmf.HeuristicsMiner'
    Inherits from Experiment Protocol class from framework.Experiment.
    If the experiment is overloaded, and new functions are added, one can call it in the system by adding the function name to the cfg file with relevant parameters.
    For example, if a new function 'new_function' is added to the system, the cfg file should have the following structure:
    cfg file should have the following structure:
    ```
    load_object:
        module: framework.ProcessMining    
        name: ProcessMiningExperiment
    setup:
    ...
    create_model:
        model: str The model to be used for training.
        params: dict The parameters to be used for the model.
    new_function:
        param1: str The first parameter for the function.
        param2: str The second parameter for the function.
    ...
    ```


    """


    def __init__(self, cfg:dict,  experiment_id:str, run_id:str, * args, **kwargs) -> None:
        """Initialize the model registry. Currently cannot be changed after initialization.
        Parameters:
            cfg: dict The configuration file for the experiment.
            experiment_id: str The experiment id for the experiment.
            run_id: str The run id for the experiment.
        """

        self.cfg = cfg
        self.experiment_id = experiment_id
        self.run_id = run_id
        self._model_registry["apriori"] = Apriori
        self._model_registry["cmspam"] = CMSPAM
        self._model_registry["topk"] = TopKRules
        self._model_registry["heuristics"] = HeuristicsMiner


    def setup(self, data:pd.DataFrame):
        """Setup the data for training and prediction. This function is called before training the model. Data must be in pandas DataFrame format - can be a columns, with several rows. 

        Parameters:
            data: pd.DataFrame The data to be used for training and prediction.

        Returns:
            pd.DataFrame The data after processing. This data is used for training and prediction    """

        cfg = self.cfg["setup"]
        self.data_format =  cfg.get("format", None)
        data = self.format_data(data, self.data_format)
        # col_names = dict("Start_timestamp" : "start:timestamp",
        #                 "End_timestamp" : "time:timestamp",
        #                 "Event" : "concept:name",
        #                 "Case_id" : "case:concept:name",
        #                 "Resource" : "org:resource",
        #                 "Ordered" : "Ordered",
        #                 "Completed" : "Completed",
        #                 "Rejected" : "Rejected",
        #                 "MRB" : "MRB",
        #                 "Part" : "Part")
        #data.rename(columns=col_names, inplace=True)

        #TODO : excavate a log ...

        self.data = data#.drop(columns=[self.ds])
        return data

    def create_model(self, *args, **kwargs)->any:
        """Create the model using the configuration file. The model is trained on the data set up in the 'setup' function.
        Returns:
            any The trained model.
        """
        model = self.cfg["create_model"]["model"]
        params = self.cfg["create_model"].get("params", None)
        if params is None:
            params = dict()

        if self.data is None:
            raise ValueError("Data not found. Please use 'setup' to setup the data first.")

        try:
            model_class = self._model_registry[model]
        except KeyError:
            raise ValueError(f"Model {model} not found in model registry. Please check configuration file. Available models are {self._model_registry.keys()}.")

        ## data.to_csv()

        self.model = model_class(**params).fit(self.data)
        self._report_registry = self.model._figs

        with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
            mlflow.set_tag("model", model)
            for k, v in self.model._figs.items():
                v.write_html(os.path.join(os.getcwd(),"runs", self.run_id, "reports", f"{k}.html"))
                mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}.html"), "reports")


            for k, v in params.items():
                mlflow.log_param(k, v)

        return self.model


    def predict(self, data:pd.DataFrame):
        """Predict using the trained model. The model should be trained before calling this function.
        Parameters:
            data: pd.DataFrame The data to be used for prediction.

        Returns:
            pd.DataFrame The predictions made by the model.
        """
        if self.model is None:
            raise ValueError("Model not found. Please train the model first with 'create_model'.")  
        X = data.copy()
        y = self.model.predict(X)
        return y, pd.DataFrame([])

    def _score(self, y, y_hat):
        """Score the model using the predictions. This function is called after the predictions are made.
        Parameters:
            y: pd.Series The actual values.
            y_hat: pd.Series The predicted values.
        Returns:
            pd.DataFrame The scores for the model."""

        if self.model is None:
            raise ValueError("Model not found. Please train the model first with 'create_model'.")
        if y is None:
            raise ValueError("No data found for scoring. Please provide data for scoring.")
        if y_hat is None:
            raise ValueError("No predictions found for scoring. Please provide predictions for scoring.")

        return pd.DataFrame([])

__init__(cfg, experiment_id, run_id, *args, **kwargs)

Initialize the model registry. Currently cannot be changed after initialization. Parameters: cfg: dict The configuration file for the experiment. experiment_id: str The experiment id for the experiment. run_id: str The run id for the experiment.

Source code in framework\ProcessMining.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(self, cfg:dict,  experiment_id:str, run_id:str, * args, **kwargs) -> None:
    """Initialize the model registry. Currently cannot be changed after initialization.
    Parameters:
        cfg: dict The configuration file for the experiment.
        experiment_id: str The experiment id for the experiment.
        run_id: str The run id for the experiment.
    """

    self.cfg = cfg
    self.experiment_id = experiment_id
    self.run_id = run_id
    self._model_registry["apriori"] = Apriori
    self._model_registry["cmspam"] = CMSPAM
    self._model_registry["topk"] = TopKRules
    self._model_registry["heuristics"] = HeuristicsMiner

create_model(*args, **kwargs)

Create the model using the configuration file. The model is trained on the data set up in the 'setup' function. Returns: any The trained model.

Source code in framework\ProcessMining.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def create_model(self, *args, **kwargs)->any:
    """Create the model using the configuration file. The model is trained on the data set up in the 'setup' function.
    Returns:
        any The trained model.
    """
    model = self.cfg["create_model"]["model"]
    params = self.cfg["create_model"].get("params", None)
    if params is None:
        params = dict()

    if self.data is None:
        raise ValueError("Data not found. Please use 'setup' to setup the data first.")

    try:
        model_class = self._model_registry[model]
    except KeyError:
        raise ValueError(f"Model {model} not found in model registry. Please check configuration file. Available models are {self._model_registry.keys()}.")

    ## data.to_csv()

    self.model = model_class(**params).fit(self.data)
    self._report_registry = self.model._figs

    with mlflow.start_run(nested=True, experiment_id=self.experiment_id, run_id=self.run_id) as run:
        mlflow.set_tag("model", model)
        for k, v in self.model._figs.items():
            v.write_html(os.path.join(os.getcwd(),"runs", self.run_id, "reports", f"{k}.html"))
            mlflow.log_artifact(os.path.join(os.getcwd(), "runs", self.run_id, "reports", f"{k}.html"), "reports")


        for k, v in params.items():
            mlflow.log_param(k, v)

    return self.model

predict(data)

Predict using the trained model. The model should be trained before calling this function. Parameters: data: pd.DataFrame The data to be used for prediction.

Returns:
  • pd.DataFrame The predictions made by the model.

Source code in framework\ProcessMining.py
134
135
136
137
138
139
140
141
142
143
144
145
146
def predict(self, data:pd.DataFrame):
    """Predict using the trained model. The model should be trained before calling this function.
    Parameters:
        data: pd.DataFrame The data to be used for prediction.

    Returns:
        pd.DataFrame The predictions made by the model.
    """
    if self.model is None:
        raise ValueError("Model not found. Please train the model first with 'create_model'.")  
    X = data.copy()
    y = self.model.predict(X)
    return y, pd.DataFrame([])

setup(data)

Setup the data for training and prediction. This function is called before training the model. Data must be in pandas DataFrame format - can be a columns, with several rows.

Parameters:
  • data (DataFrame) –

    pd.DataFrame The data to be used for training and prediction.

Returns:
  • pd.DataFrame The data after processing. This data is used for training and prediction

Source code in framework\ProcessMining.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def setup(self, data:pd.DataFrame):
    """Setup the data for training and prediction. This function is called before training the model. Data must be in pandas DataFrame format - can be a columns, with several rows. 

    Parameters:
        data: pd.DataFrame The data to be used for training and prediction.

    Returns:
        pd.DataFrame The data after processing. This data is used for training and prediction    """

    cfg = self.cfg["setup"]
    self.data_format =  cfg.get("format", None)
    data = self.format_data(data, self.data_format)
    # col_names = dict("Start_timestamp" : "start:timestamp",
    #                 "End_timestamp" : "time:timestamp",
    #                 "Event" : "concept:name",
    #                 "Case_id" : "case:concept:name",
    #                 "Resource" : "org:resource",
    #                 "Ordered" : "Ordered",
    #                 "Completed" : "Completed",
    #                 "Rejected" : "Rejected",
    #                 "MRB" : "MRB",
    #                 "Part" : "Part")
    #data.rename(columns=col_names, inplace=True)

    #TODO : excavate a log ...

    self.data = data#.drop(columns=[self.ds])
    return data