# Building Azure ML Pipelines using the Azure Machine Learning SDK

The Azure Machine Learning SDK is a powerful tool that allows data scientists and AI developers to interact with Azure Machine Learning services in any Python environment. With this SDK, users can manage datasets, train models using cloud resources, and deploy trained models as web services.

In this article, we will explore the process of building a pipeline for training and modeling using the Azure ML SDK.

## Log in to Workspace

To get started, we need to log in to the workspace using the Azure ML Python SDK. This requires authentication with Azure. The code snippet below imports the necessary packages and loads the workspace from the saved config file:

```
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))
```

## Default Datastore

Datastores in Azure ML allow users to connect data from various Azure Storage services to their Azure ML Workspace. In our case, we will be using the default datastore, which is attached to a storage account created when provisioning the Azure ML Workspace. The code snippet below sets the default datastore:

```
# Get the default datastore for the workspace
datastore = ws.get_default_datastore()

print('The default datastore has been saved to a variable.')
```

## Select Compute

A compute cluster is required to process the tasks during each step of the pipeline. In our case, a compute cluster has already been created. The code snippet below selects the existing compute cluster:

```
# Select the existing compute cluster target
from azureml.core.compute import ComputeTarget

cpu_cluster = ComputeTarget(workspace=ws, name="automl-compute")

print("Found compute cluster!")
```
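
If the cluster does not already exist in your workspace, it can be provisioned from the SDK before running the pipeline. The snippet below is a minimal sketch, assuming a `STANDARD_DS3_V2` VM size and a maximum of two nodes; adjust both to your workload.

```
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException

try:
    # Reuse the cluster if it already exists
    cpu_cluster = ComputeTarget(workspace=ws, name="automl-compute")
    print("Found existing compute cluster.")
except ComputeTargetException:
    # Otherwise provision a small CPU cluster (VM size and node count are assumptions)
    compute_config = AmlCompute.provisioning_configuration(vm_size="STANDARD_DS3_V2", max_nodes=2)
    cpu_cluster = ComputeTarget.create(ws, "automl-compute", compute_config)
    cpu_cluster.wait_for_completion(show_output=True)
```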

## Building an Azure ML Pipeline

Azure ML Pipelines allow us to split up machine learning workflows into different steps. This modular approach allows for easier collaboration among team members and the ability to use different compute resources for different steps. In our example, we will build a pipeline with the following steps for training an Iris classification model:

1. Ingest Iris data from a URL
2. Preprocess Iris data and split it into test and training samples
3. Train the model using the preprocessed data
4. Evaluate the model and determine the accuracy
5. Deploy the model as a web service

By splitting up the workflow into different pipeline steps, we can easily scale the workflow to handle larger datasets and allow multiple team members to manage separate parts of the workflow.

## Creating the Source Directories

Each pipeline step requires its own Python script to perform the desired actions. These script files and any dependencies are stored in source directories. It is best practice to give each step its own source directory so that each step's snapshot stays small. The code snippet below creates the necessary source directories:

```
import os

# Create a source directory for each pipeline step
source_directory_ingest = "data_dependency_run_ingest"
source_directory_preprocess = "data_dependency_run_preprocess"
source_directory_train = "data_dependency_run_train"
source_directory_evaluate = "data_dependency_run_evaluate"
source_directory_deploy = "data_dependency_run_deploy"

for source_directory in [source_directory_ingest, source_directory_preprocess,
                         source_directory_train, source_directory_evaluate,
                         source_directory_deploy]:
    os.makedirs(source_directory, exist_ok=True)

print('The source directories have been created.')
```

## Creating Scripts in the Source Directories

Now that the source directories are created, we need to create a script for each pipeline step and place it in its respective source directory. Each script accepts command-line arguments that are used to pass directory paths between the steps. The sections below describe what each script does, together with a minimal sketch of what it might contain.

### Ingest Script

The `ingest.py` script downloads the Iris data from a URL and saves it to a folder on the datastore.
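
The original script isn't reproduced here, so the following is a minimal sketch of what `ingest.py` might look like; the `--iris_data_dir` and `--urls` argument names match the pipeline step defined later, while the `iris.csv` file name is an assumption.

```
import argparse
import os
import urllib.request

# Parse the directory and URL arguments passed in by the pipeline step
parser = argparse.ArgumentParser()
parser.add_argument("--iris_data_dir", type=str, required=True)
parser.add_argument("--urls", type=str, required=True)
args = parser.parse_args()

# Create the output directory on the mounted datastore and download the CSV into it
os.makedirs(args.iris_data_dir, exist_ok=True)
urllib.request.urlretrieve(args.urls, os.path.join(args.iris_data_dir, "iris.csv"))
```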

### Preprocess Script

The `preprocess.py` script takes the ingested Iris data, preprocesses it, and splits it into separate train and test sets.
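
A minimal sketch of `preprocess.py`, assuming the ingested file is named `iris.csv` and an 80/20 train/test split; the argument names match the preprocessing step defined later, while the file names and split ratio are assumptions.

```
import argparse
import os
import pandas as pd
from sklearn.model_selection import train_test_split

parser = argparse.ArgumentParser()
parser.add_argument("--iris_data_dir", type=str, required=True)
parser.add_argument("--train_dir", type=str, required=True)
parser.add_argument("--test_dir", type=str, required=True)
args = parser.parse_args()

# Load the ingested data and split it into train and test sets
df = pd.read_csv(os.path.join(args.iris_data_dir, "iris.csv"))
train, test = train_test_split(df, test_size=0.2, random_state=42)

# Write the splits to the output directories provided by the pipeline
os.makedirs(args.train_dir, exist_ok=True)
os.makedirs(args.test_dir, exist_ok=True)
train.to_csv(os.path.join(args.train_dir, "train.csv"), index=False)
test.to_csv(os.path.join(args.test_dir, "test.csv"), index=False)
```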

### Train Script

The `train.py` script takes the preprocessed data and trains the model.
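
A minimal sketch of `train.py`; the classifier choice (logistic regression), the assumption that the label is the last column, and the `model.pkl` file name are illustrative, while the argument names match the training step defined later.

```
import argparse
import os
import joblib
import pandas as pd
from sklearn.linear_model import LogisticRegression

parser = argparse.ArgumentParser()
parser.add_argument("--train_dir", type=str, required=True)
parser.add_argument("--output_dir", type=str, required=True)
args = parser.parse_args()

# Load the training split; the label is assumed to be the last column
train = pd.read_csv(os.path.join(args.train_dir, "train.csv"))
X, y = train.iloc[:, :-1], train.iloc[:, -1]

# Fit a simple classifier (any scikit-learn estimator would work here)
model = LogisticRegression(max_iter=1000).fit(X, y)

# Persist the trained model to the output directory for the next steps
os.makedirs(args.output_dir, exist_ok=True)
joblib.dump(model, os.path.join(args.output_dir, "model.pkl"))
```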

### Evaluation Script

The `evaluate.py` script evaluates the trained model and determines its accuracy.
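
A minimal sketch of `evaluate.py`, under the same file-name assumptions as the sketches above; it writes the accuracy to a small JSON file in the accuracy output directory.

```
import argparse
import json
import os
import joblib
import pandas as pd
from sklearn.metrics import accuracy_score

parser = argparse.ArgumentParser()
parser.add_argument("--model_dir", type=str, required=True)
parser.add_argument("--test_dir", type=str, required=True)
parser.add_argument("--accuracy_dir", type=str, required=True)
args = parser.parse_args()

# Load the trained model and the held-out test split
model = joblib.load(os.path.join(args.model_dir, "model.pkl"))
test = pd.read_csv(os.path.join(args.test_dir, "test.csv"))
X_test, y_test = test.iloc[:, :-1], test.iloc[:, -1]

# Score the model and record the accuracy for the deploy step
accuracy = accuracy_score(y_test, model.predict(X_test))
os.makedirs(args.accuracy_dir, exist_ok=True)
with open(os.path.join(args.accuracy_dir, "accuracy.json"), "w") as f:
    json.dump({"accuracy": accuracy}, f)
```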

### Deploy Script

The `deploy.py` script deploys the trained model as a web service endpoint.
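
A minimal sketch of `deploy.py`; the registered model name and the conda packages are assumptions, while the service name matches the one queried in the endpoint test at the end of this article.

```
import argparse
import os
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.model import InferenceConfig, Model
from azureml.core.run import Run
from azureml.core.webservice import AciWebservice

parser = argparse.ArgumentParser()
parser.add_argument("--model_dir", type=str, required=True)
parser.add_argument("--accuracy_dir", type=str, required=True)
parser.add_argument("--test_dir", type=str, required=True)
args = parser.parse_args()

# Get the workspace from the submitted pipeline run's context
ws = Run.get_context().experiment.workspace

# Register the model produced by the training step
model = Model.register(workspace=ws,
                       model_path=os.path.join(args.model_dir, "model.pkl"),
                       model_name="iris-classification")

# Build a scoring environment and deploy the model to Azure Container Instances
env = Environment("iris-scoring-env")
env.python.conda_dependencies = CondaDependencies.create(
    conda_packages=["scikit-learn==0.24.2", "pandas==0.25.3"])
inference_config = InferenceConfig(entry_script="score.py", environment=env)
deployment_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)

service = Model.deploy(ws, "iris-classification-service", [model], inference_config, deployment_config)
service.wait_for_deployment(show_output=True)
```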

### Score Script

The `score.py` script runs when the web service is deployed. It contains the logic for retrieving the registered model and running predictions against incoming data.
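
A minimal sketch of `score.py`, following the standard `init()`/`run()` entry-script pattern; it assumes the model was saved as `model.pkl` and that requests arrive in the `{"data": [...]}` shape used in the endpoint test below.

```
import json
import os
import joblib
import numpy as np

def init():
    # Runs once when the service starts: load the registered model from disk
    global model
    model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "model.pkl")
    model = joblib.load(model_path)

def run(raw_data):
    # Runs for every request: parse the measurements and return the predicted class
    data = np.array(json.loads(raw_data)["data"]).reshape(1, -1)
    prediction = model.predict(data)
    return json.dumps({"prediction": prediction.tolist()})
```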

## Passing Data Between Pipeline Steps

Pipeline steps can have input and output data. For our pipeline, we will pass data between steps using PipelineData objects, which represent intermediate data that one step writes and a later step consumes, together with a DataReference that points at existing data on the datastore. In our case, the first step of the pipeline ingests the Iris data from a URL and outputs it to a directory on the default datastore. This directory is then passed to the preprocessing step as an input. The code snippet below creates the necessary PipelineData objects:

```
from azureml.pipeline.core import PipelineData
from azureml.data.data_reference import DataReference

# Get a reference to the default datastore
datastore_reference = DataReference(datastore, mode="mount")

# Create the PipelineData object for the ingested Iris data
iris_data_dir = PipelineData(
    name="iris_data_dir",
    pipeline_output_name="iris_data_dir",
    datastore=datastore_reference.datastore,
    output_mode="mount",
    is_directory=True)

print('The iris data PipelineData object has been created.')

# Create the PipelineData objects for the remaining steps
train_dir = PipelineData(
    name="train_dir",
    pipeline_output_name="train_dir",
    datastore=datastore_reference.datastore,
    output_mode="mount",
    is_directory=True)

output_dir = PipelineData(
    name="output_dir",
    pipeline_output_name="outputdir",
    datastore=datastore_reference.datastore,
    output_mode="mount",
    is_directory=True)

accuracy_dir = PipelineData(
    name="accuracy_dir",
    pipeline_output_name="accuracydir",
    datastore=datastore_reference.datastore,
    output_mode="mount",
    is_directory=True)

model_dir = PipelineData(
    name="model_dir",
    pipeline_output_name="modeldir",
    datastore=datastore_reference.datastore,
    output_mode="mount",
    is_directory=True)

test_dir = PipelineData(
    name="test_dir",
    pipeline_output_name="test_dir",
    datastore=datastore_reference.datastore,
    output_mode="mount",
    is_directory=True)

print('The remaining PipelineData objects have been created.')
```

## Set up RunConfiguration

The RunConfiguration object contains the environment information needed to submit a training run in the experiment. For our run, the scikit-learn and pandas packages need to be available during the experiment run. The code snippet below configures the RunConfiguration:

```
from azureml.core.runconfig import RunConfiguration, DockerConfiguration
from azureml.core.environment import CondaDependencies

# Configure the conda dependencies for the run
conda_dep = CondaDependencies()
conda_dep.add_conda_package("scikit-learn==0.24.2")
conda_dep.add_conda_package("pandas==0.25.3")
docker_configuration = DockerConfiguration(use_docker=False)
run_config = RunConfiguration(conda_dependencies=conda_dep)
run_config.docker = docker_configuration

print('Run configuration has been created.')
```

## Defining the Pipeline Steps

Once the necessary PipelineData objects and RunConfiguration are set up, we can define our pipeline steps. We will be using the PythonScriptStep, which allows us to execute Python scripts as pipeline steps. The code snippet below defines the ingestion and preprocessing steps:

```
import os
from azureml.pipeline.steps import PythonScriptStep

# The URL for the Iris data that will be ingested in the first step of the pipeline
url = "https://gist.githubusercontent.com/cristofima/b4deb0c8435d919d769f0a9a57f740a0/raw/37b1fa4c5ffb2dd4b23d9f958f35de96fc57e71c/iris.csv"

# Pipeline steps
ingestion_step = PythonScriptStep(
    script_name="ingest.py",
    arguments=['--iris_data_dir', iris_data_dir, '--urls', url],
    inputs=[datastore_reference],
    outputs=[iris_data_dir],
    compute_target=cpu_cluster,
    source_directory=source_directory_ingest,
    runconfig=run_config,
    allow_reuse=True
)

preprocess_step = PythonScriptStep(
    script_name="preprocess.py",
    arguments=['--iris_data_dir', iris_data_dir, '--train_dir', train_dir, '--test_dir', test_dir],
    inputs=[iris_data_dir],
    outputs=[train_dir, test_dir],
    compute_target=cpu_cluster,
    source_directory=source_directory_preprocess,
    runconfig=run_config,
    allow_reuse=True
)

print('The ingestion and preprocess steps have been created.')
```

Add another step for the training:

```
# Create the training pipeline step
train_step = PythonScriptStep(
    script_name="train.py",
    arguments=['--train_dir', train_dir, '--output_dir', model_dir],
    inputs=[train_dir],
    outputs=[model_dir],
    compute_target=cpu_cluster,
    source_directory=source_directory_train,
    runconfig=run_config,
    allow_reuse=False
)

print('The training step has been created.')
```

Create the remaining steps for evaluating and deploying the model:

```
# Create the evaluate and deploy pipeline steps
evaluate_step = PythonScriptStep(
    script_name="evaluate.py",
    arguments=['--model_dir', model_dir, '--test_dir', test_dir, '--accuracy_dir', accuracy_dir],
    inputs=[test_dir, model_dir],
    outputs=[accuracy_dir],
    compute_target=cpu_cluster,
    source_directory=source_directory_evaluate,
    runconfig=run_config,
    allow_reuse=True
)

deploy_step = PythonScriptStep(
    script_name="deploy.py",
    arguments=['--model_dir', model_dir, '--accuracy_dir', accuracy_dir, '--test_dir', test_dir],
    inputs=[test_dir, accuracy_dir, model_dir],
    outputs=[output_dir],
    compute_target=cpu_cluster,
    source_directory=source_directory_deploy,
    runconfig=run_config,
    allow_reuse=True
)

print('The evaluate and deploy steps have been created.')
```

## Run Pipeline

Now that our pipeline steps are defined, we can submit the pipeline for execution. The code snippet below submits the pipeline:

```
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

# Submit the pipeline
print('Submitting pipeline ...')
pipeline = Pipeline(workspace=ws, steps=[ingestion_step, preprocess_step, train_step, evaluate_step, deploy_step])
pipeline_run = Experiment(ws, 'iris_pipeline').submit(pipeline)
```

To view the progress of the pipeline, you can either follow the link to Azure Machine Learning studio that is printed when the run is submitted, or display the RunDetails widget in the Jupyter Notebook:

```
# Show run details
from azureml.widgets import RunDetails

RunDetails(pipeline_run).show()
```

## Test the Endpoint

After the web service is deployed, it can be tested by sending a series of sepal and petal measurements to the URI. The web service will take the data, run predictions using the trained model, and return the classification prediction. The code snippet below demonstrates a simple test:

```
import urllib.request, urllib.error
import json
from azureml.core.webservice import Webservice

# Iris petal and sepal measurements
rawdata = {"data": [6.7, 3.0, 5.2, 2.3]}

# Get the scoring URL of the web service
service = Webservice(workspace=ws, name="iris-classification-service")
url = service.scoring_uri

# Send data to the web service
body = str.encode(json.dumps(rawdata))

# If key-based authentication is enabled on the service, append the key after 'Bearer '
headers = {'Content-Type': 'application/json', 'Authorization': ('Bearer ')}
req = urllib.request.Request(url, body, headers)

try:
    response = urllib.request.urlopen(req)
    result = response.read()
    print(result)

except urllib.error.HTTPError as error:
    print("The request failed with status code: " + str(error.code))
    print(error.info())
```

That’s it! We have successfully built an Azure ML Pipeline using the Azure Machine Learning SDK. This pipeline allows for easier collaboration among team members and enhances productivity by splitting up the machine learning workflow into different steps.

To learn more about the Azure Machine Learning services offered by Skrots, visit [Skrots](https://skrots.com). If you’re interested in exploring the different services Skrots provides, check out our services page [here](https://skrots.com/services). Thank you for reading and happy exploring!
