Configuring and Executing SageMaker Training Job

Overview

This guide will cover how to configure a SageMaker Training pipeline, execute a training job on SageMaker through the SageMaker Training API service, and access metrics and artifacts via MLflow.

In this guide, you will leverage 2 pipelines, a PySpark data delivery pipeline to upload data to an S3 bucket and a SageMaker training pipeline to train a model on AWS SageMaker.

This guide will utilize the Iris dataset for the example. The dataset consists of 50 samples from each of three species of Iris (Iris setosa, Iris virginica and Iris versicolor). Four features were measured from each sample: the length and the width of the sepals and petals, in centimeters. This guide will walk through training a simple neural network to predict the Iris species based on these measurements using the TensorFlow library.

Configuration

This section will cover configuring the AWS credentials and project setup. You will need to have access to Elastic Container Registry (ECR), Simple Storage Service (S3) bucket, and SageMaker on AWS.

AWS Setup

Please see the AWS Credentials and AWS Elastic Container Registry Setup section of the Machine Learning Pipelines page for instructions detailing the AWS credentials setup and ECR repository creation.

You will also need to create an S3 bucket that will store the data needed for SageMaker training.

You will need your AWS credentials throughout the configuration of the PySpark and SageMaker Training pipelines.

AWS Authentication Setup For Pyspark Pipelines

When constructing a PySpark pipeline to prepare data for SageMaker model training, you must configure AWS credentials so that the data can be written to AWS S3. In order to read from or write to AWS S3 from a data delivery pipeline, you must configure AWS credentials in your <project-name>/<project-name>-pipelines/<pyspark-pipeline-name-dashed>/src/<pyspark-pipeline-name-snake-cased>/resources/apps/<pyspark-pipeline-name-dashed>-base-values.yaml file. Specifically, within spec.hadoopConf, add the following configuration:

    fs.s3a.aws.credentials.provider: "com.amazonaws.auth.EnvironmentVariableCredentialsProvider"
    fs.s3a.endpoint: "s3.amazonaws.com"
    mapreduce.fileoutputcommitter.marksuccessfuljobs: "false"

In this code example, the configuration tells the Spark application that when it interacts with AWS S3 using the S3A filesystem, it should use the EnvironmentVariableCredentialsProvider to fetch the necessary AWS credentials from environment variables.

Ensure that your AWS credentials, specifically AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY are configured in the environment variables. Additionally, configure the AWS_SESSION_TOKEN environment variable if applicable.

In your <project-name>/<project-name>-pipelines/<pyspark-pipeline-name-dashed>/src/<pyspark-pipeline-name-snake-cased>/resources/apps/<pyspark-pipeline-name-dashed>-base-values.yaml file, you’ll need to set the environment variables for both the Spark driver and Spark executor. Please note that AWS credentials should never be committed to GitHub repositories. To avoid this, for both the executor environment (spec.executor.env) and the driver environment (spec.executor.env), you’ll need to enter the following information:

    AWS_ACCESS_KEY_ID: "<Your-Access-Key>"
    AWS_SECRET_ACCESS_KEY: "<Your-Secret-Key>"
    AWS_SESSION_TOKEN: "<Your-Session-Token-if-Applicable>"

Project Setup

After generating the aiSSEMBLE project, create a PySpark pipeline with a single step named "Ingest" and a Machine Learning pipeline with a single sagemaker-training step. Please see PySpark Data Delivery Pipeline Patterns for instructions on creating a PySpark pipeline and Machine Learning Pipeline Patterns for instructions on creating a Machine Learning pipeline.

Data Preparation and Uploading to S3

This section will cover prepping the Iris data for training by splitting the data into train, validate, test datasets and uploading the partitioned data to your S3 bucket.

Data Preparation

In order to prepare data for SageMaker training and upload data to S3, you must configure AWS Authentication. To start the data preparation, you will need to import the packages needed for loading and manipulating the data within <project-name>/<project-name>-pipelines/<pyspark-pipeline-name-dashed>/src/<pyspark-pipeline-name-snake-cased>/step/ingest.py. You will use the sklearn.datasets package to retrieve the data and the numpy package to apply data manipulation.

ingest.py
from sklearn.datasets import load_iris
import numpy as np
You will need to include the scikit-learn, numpy, and pandas packages within: <project-name>/<project-name>-pipelines/<pyspark-pipeline-name-dashed>/pyproject.toml

For this guides’s example, within the provided execute_step_impl function, you will need to include the logic to load the Iris data, concat the feature columns and target column along the second axis, and split the data into train, validation, and test datasets using 80% of the data for train, 10% of the data for validation, and 10% of the data for test.

ingest.py
def execute_step_impl(self) -> None:
    """
    This method performs the business logic of this step.
    """
    iris = load_iris()
    df = self.spark.createDataFrame(np.c_[iris.data, iris.target], iris.feature_names + ["species"])
    train_df, val_df, test_df = df.randomSplit([0.8, 0.1, 0.1])

    # Write train, val, and test datasets as .csv files to S3:
    train_df.write.format("csv").option("header", "true").mode("overwrite").save(
        f"s3a://{S3_BUCKET}/{SOME_PREFIX}/train"
    )
    val_df.write.format("csv").option("header", "true").mode("overwrite").save(
        f"s3a://{S3_BUCKET}/{SOME_PREFIX}/val"
    )
    test_df.write.format("csv").option("header", "true").mode("overwrite").save(
        f"s3a://{S3_BUCKET}/{SOME_PREFIX}/test"
    )

The S3_BUCKET and SOME_PREFIX variables must be configured within the script to point to the S3 bucket and folder within that bucket where your data will be written.

Training Your Model

This section will cover configuring the provided functions for the training job. You will fit your neural network model with the training data, apply hyperparameters, and save the model to an S3 bucket.

Train Model

To train the model, you will need to first import the TensorFlow and Pandas packages within <project-name>/<project-name>-pipelines/<machine-learning-pipeline-name-dashed>/<sagemaker-step-name-dashed>/src/<sagemaker-step-name-snake-cased>/<training_name>.py.

import tensorflow as tf
import pandas as pd
You will need to include the tensorflow and pandas packages within: <project-name>/<project-name>-pipelines/<machine-learning-pipeline-name-dashed>/<sagemaker-step-name-dashed>/pyproject.toml

You will then need to load the train, validation, and test data within their respective functions. For this guide, we’ll leverage a Pandas dataframe to load the data within the load_train_data function:

def load_train_data(train_data_dir):
    """Load training data from file(s) here
    Args:
        train_data_dir: training data file(s) will be available in this directory
    Returns:
        loaded training data
    """
    all_files = os.listdir(train_data_dir)
    dfs = []

    for filename in all_files:
        filepath = os.path.join(train_data_dir, filename)
        df = pd.read_csv(filepath, index_col=None, header=0)
        dfs.append(df)

    train_data = pd.concat(dfs, axis=0, ignore_index=True)
    return train_data

Again, please note that you will need to implement similar logic for the load_validation_data and load_test_data functions.

To leverage the validation dataset, you can set the validation_data parameter in the model.fit function to evaluate the model on the validation dataset after each pass over the training set. Within our train_model function, we define our neural network model and kick off training:

def train_model(train_data, validation_data, hyperparameters):
    """Train model based on user-provided hyperparameters, evaluate against validation dataset after each epoch
    Args:
        train_data: loaded training dataset
        validation_data: loaded validation dataset
        hyperparameters: dictionary containing user-provided hyperparameters
    Returns:
        trained model
    """
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Dense(10, activation="relu"),
            tf.keras.layers.Dense(10, activation="relu"),
            tf.keras.layers.Dense(3, activation="softmax"),
        ]
    )

    model.compile(
        optimizer=hyperparameters["optimizer"],
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )

    model.fit(
        train_data.drop("species", axis=1),
        pd.get_dummies(train_data["species"]),
        batch_size=50,
        epochs=100,
        validation_data=(
            validation_data.drop("species", axis=1),
            pd.get_dummies(validation_data["species"]),
        ),
    )
    return model

Evaluate Model

To test the model after training completes, we will first import some helper functions from the scikit-learn package. We’ll use them to evaluate the accuracy score and F1 score achieved by the trained model on the test dataset.

import numpy as np
import mlflow.keras
from sklearn.metrics import accuracy_score, f1_score
You will need to include the scikit-learn and numpy packages within: <project-name>/<project-name>-pipelines/<machine-learning-pipeline-name-dashed>/<sagemaker-step-name-dashed>/pyproject.toml

You then need to modify the test_model function to predict on the test dataset and print out the metrics. Your function should resemble:

def test_model(model, test_data):
    """Evaluate model against test data
    Args:
        test_data: loaded test dataset
    Returns:
        resulting metrics
    """
    y_pred = model.predict(test_data.drop("species", axis=1))

    test_acc = accuracy_score(test_data["species"], np.argmax(y_pred, axis=1))
    test_f1 = f1_score(test_data["species"], np.argmax(y_pred, axis=1))

    print(f"test_accuracy={test_acc};", flush=True)
    print(f"test_f1={test_f1};", flush=True)

Save Model

To save the model, we’ll utilize the MLflow Model wrapper. This is not explicitly required, but the benefits of this approach are detailed in the Notes/Updates column of Table 3. The mlflow package is already imported within the provide training template.

In the save_model function, we’ll use the the mlflow.keras flavor to save out the model to the required location:

def save_model(model, output_dir):
    """Save model artifacts to disk
    Args:
        model: trained model
        output_dir: directory in which to save model artifacts
    """
    mlflow.keras.save_model(model, output_dir)

Utilizing SageMaker Training API Service

This section will cover how to use the SageMaker Training API service to start a SageMaker Training Job. Please see SageMaker Model Training API for all available routes.

Before you can access the API Service, there are 2 areas where AWS Credentials will need to be updated:

  • <project-name>/<project-name>-deploy/src/main/resources/apps/model-training-api-SageMaker/values.yaml

  • <project-name>/<project-name>-deploy/src/main/resources/apps/mlflow-ui/values.yaml.

After the credentials are added, you are ready to deploy your project. You will need to run the command ./mvnw clean install for your image to be pushed to ECR. To deploy the local components of your project, run the command tilt up.

Submitting a SageMaker Training Job

To submit a SageMaker Training Job, you will need to set the parameters of the POST request body. The required parameters can be found under the POST /sagemaker-training-jobs from the SageMaker Model Training API documentation.

Using a cURL request, your request should resemble:

curl --location 'http://127.0.0.1:5001/sagemaker-training-jobs' \
--header 'Content-Type: application/json' \
--data '{
    "image_uri": "<image_URI>",
    "hyperparameters": {
        "optimizer": "adam"
    },
    "instance_type": "<instance_type>",
    "bucket": "<S3_bucket_URI>",
    "prefix": "<path_to_data>",
    "metric_definitions": {
        "Loss": "loss: (.*?) ",
        "Validation Loss": "val_loss: (.*?) ",
        "Test Accuracy": "test_accuracy=(.*?);",
        "Test F1": "test_f1=(.*?);"
    }
}'

For this guide, the hyperparameters parameter will contain the optimizer hyperparameters for the model and the metric_definitions parameter should be set to a RegEx patterns that correctly parse the metrics output we defined in our train_model function.

After running this cURL command, the model training job name should be returned, which can subsequently be used to view the status of the job.

Viewing Status

To see the status of the SageMaker training job, you will need to submit a GET request with the model training job name using the cURL command:

curl --location 'http://127.0.0.1:5001/<model_training_job_name>'

Viewing Metrics and Artifacts on MLflow

To view the metrics, you can visit the MLflow UI by opening up 127.0.0.1:5005 in your browser. The model training job will show up in the Experiments tab. Clicking into the Experiment Run will allow you to view the metrics during t raining, and ultimately the resulting model artifacts generated by SageMaker Training Job.