MLOps への道② 〜SageMaker Model Building Pipelinesでモデルの学習・登録パイプラインを作る〜

この記事は、前回の MLOps への道① の続きです。前回の記事をまだお読みでなければ、先にご一読いただけると幸いです。

こんにちは! GIMLEチームの太田です。

まだまだ暑い日が続いておりますね (ーー゛)
私はというと、JAWS-UG京都 での発表も終わってホッとしたからか、少々夏バテ気味になっておりました……
読者の皆様におかれましては、無理をなされませんようお気をつけくださいね!

では、MLOps への道②、いってみたいと思います!

※本シリーズは「できるだけシンプルに」「End To Endで」継続的学習・デプロイ基盤の作成に至るまでの過程を実践してみよう、という想いのもとで執筆しております。扱うデータの内容や学習アルゴリズムの詳細、推論結果の解釈の仕方など、機械学習のディープな部分にはあまり踏み込まない予定ですので、ご留意いただければ幸いです

はじめに

前回の記事では、データの準備、学習、推論ということで、以下構成図の左下の部分を取り扱いました。

第二回目となる今回は、
アルゴリズムにデータを投入し、学習させて、学習済みモデルをModel Registryに登録するまでの流れを、
SageMaker Model Building Pipelineを使ってパイプラインにしてみます。

(前回執筆時点では、説明変数を増やしてもう一度分析をする予定だったのですが、JAWS-UG京都での発表内容も踏まえて内容変更しております)

構成図でいくと、左上の部分です。

実践の大まかな流れとしては、以下のようになります。

  1. データを準備する① 元データを取得して、SageMaker Studioで使えるようにする
  2. データを準備する② SageMaker Studioでのデータ準備
  3. 学習ステップを定義する
  4. モデル登録ステップを定義する
  5. 学習ステップとモデル登録ステップを使って、パイプラインを作成する
  6. パイプラインを実行してみる
  7. お片付け

本記事では、上記の 2. 〜 7. までを記載しています。
1. については、前回記事の「1. データを準備する①」 と同じ手順で作成したデータを使用しますので、そちらを参照いただければと思います。

前提となる知識・用語

前回記事の「前提となる知識・用語」 に加えて、以下の要素が登場します。

AWSに関すること

  • 使用するAWSのサービス
    • SageMaker
      • SageMaker Model Building Pipelines
      • SageMaker Model Registry

1. データを準備する① 元データを取得して、SageMaker Studioで使えるようにする

上述の通り、前回記事 を参照ください。

2. データを準備する② SageMaker Studioでのデータ準備

SageMaker Studioを使用して、データの準備をしていきます。
前回同様、SageMaker Studioのセットアップ方法・起動方法については割愛させていただきます。

2.1 使用するライブラリのインストール

今回使用するPythonライブラリは PyAthena のみなので、こちらをインストールします。

import sys
!{sys.executable} -m pip install PyAthena

実行結果(一部省略):

Successfully installed PyAthena-2.14.0

2.2 後の手順で使用する変数の定義

後続の手順で使用する共通の変数を定義しておきます。

# ライブラリをインポート
import sagemaker
import boto3

# SageMaker APIや、その他AWSサービスとのやり取りに使用するsagemaker.Sessionインスタンス
sm_sess = sagemaker.Session()

# boto3のSessionインスタンス
boto3_sess = boto3.Session()

# 学習用アルゴリズムへの入力データ/出力結果や、学習済みモデルを配置するS3バケット
# sagemaker.Session().default_bucket()を指定すると、"sagemaker-{region}-{aws-account-id}”という名前のS3バケットが使用される
bucket = sm_sess.default_bucket()

# SageMakerで使用するIAMロール
# ここでは、あらかじめ作成しておいたロールを指定
role = "arn:aws:iam::XXXXXXXXXXXX:role/cloud-ml-demo-sagemaker-execution-role"

# リージョン
region = boto3_sess.region_name

2.3 Athenaからのデータの取得と分割

前回と同じように、Athenaからデータを取得して、学習用データ・検証用データ・テスト用データに分けてpandasのデータフレームに格納します。
本記事で作成するパイプラインではテスト用データは使用していないのですが、一応確認のために変数定義して、表示しています。

# ライブラリをインポート
from pyathena import connect
import pandas as pd

# Athenaに接続
conn = connect(
    s3_staging_dir='s3://aws-athena-query-results-ap-northeast-1-XXXXXXXXXXXX/',
    region_name=region
)

# 学習用データを取得・表示
query_train = 'select "value", "year" from "e_stat_go_jp_minkan_kyuyo_db"."e_stat_go_jp_minkan_kyuyo_table" where year < 2013;'
df_train = pd.read_sql(query_train, conn)
print('学習用データ')
print(df_train)

# 検証用データを取得・表示
query_validation = 'select "value", "year" from "e_stat_go_jp_minkan_kyuyo_db"."e_stat_go_jp_minkan_kyuyo_table" where 2013 <= year and year < 2019;'
df_validation = pd.read_sql(query_validation, conn)
print('\n検証用データ')
print(df_validation)

# テスト用データを取得・表示
query_test = 'select "value", "year" from "e_stat_go_jp_minkan_kyuyo_db"."e_stat_go_jp_minkan_kyuyo_table" where 2019 <= year;'
df_test = pd.read_sql(query_test, conn)
print('\nテスト用データ')
print(df_test)

出力結果:

学習用データ
      value  year
0   4078000  1994
1   4107000  1995
2   4118000  1996
3   4183000  1997
4   4185000  1998
5   4032000  1999
6   4082000  2000
7   4001000  2001
8   3887000  2002
9   3753000  2003
10  3766000  2004
11  3710000  2005
12  3670000  2006
13  3672000  2007
14  3652000  2008
15  3502000  2009
16  3547000  2010
17  3583000  2011
18  3521000  2012

検証用データ
     value  year
0  3595000  2013
1  3614000  2014
2  3612000  2015
3  3562000  2016
4  3671000  2017
5  3716000  2018

テスト用データ
     value  year
0  3879000  2019
1  3701000  2020

2.4 学習用データと検証用データをS3バケットに配置する

次は、学習用と検証用に分けておいたデータをS3バケットに配置します。
その後、S3に配置したデータから、SageMakerのSDKを通じて学習用アルゴリズムへ入力するためのデータを作成します。

# S3バケットのプレフィックス
s3_prefix = "sagemaker/aota_liner_learner_6"

# 学習用データのS3上の場所
s3_train_data = f"s3://{bucket}/{s3_prefix}/train"
print(f"training files will be taken from: {s3_train_data}")

# 検証用データのS3上の場所
s3_validation_data = f"s3://{bucket}/{s3_prefix}/validation"
print(f"validation files will be taken from: {s3_validation_data}")

# 出力されるデータ(モデルなど)のS3上の場所
output_location = f"s3://{bucket}/{s3_prefix}/output"
print(f"training artifacts output location: {output_location}")

# S3上に学習用データと検証用データを配置
df_train.to_csv(f"{s3_train_data}/minkan_kyuyo.csv", index=False, header=False)
df_validation.to_csv(f"{s3_validation_data}/minkan_kyuyo.csv", index=False, header=False)

# 上記でS3に配置したデータから、SageMakerのSDKを通じて学習用アルゴリズムへ入力するためのデータを作成
# 入力データを、sagemaker.inputs.TrainingInputという形式(Pythonのクラス)で作成

# 学習用データ
train_data = sagemaker.inputs.TrainingInput(
    s3_train_data,
    distribution="FullyReplicated",
    content_type="text/csv",
    s3_data_type="S3Prefix",
    record_wrapping=None,
    compression=None,
)
print(train_data.config)

# 検証用データ
validation_data = sagemaker.inputs.TrainingInput(
    s3_data=s3_validation_data ,
    distribution="FullyReplicated",
    content_type="text/csv",
    s3_data_type="S3Prefix",
    record_wrapping=None,
    compression=None,
)
print(validation_data.config)

出力結果:

training files will be taken from: s3://sagemaker-ap-northeast-1-XXXXXXXXXXXX/sagemaker/aota_liner_learner_6/train
validation files will be taken from: s3://sagemaker-ap-northeast-1-XXXXXXXXXXXX/sagemaker/aota_liner_learner_6/validation
training artifacts output location: s3://sagemaker-ap-northeast-1-XXXXXXXXXXXX/sagemaker/aota_liner_learner_6/output
{'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix', 'S3Uri': 's3://sagemaker-ap-northeast-1-XXXXXXXXXXXX/sagemaker/aota_liner_learner_6/train', 'S3DataDistributionType': 'FullyReplicated'}}, 'ContentType': 'text/csv'}
{'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix', 'S3Uri': 's3://sagemaker-ap-northeast-1-XXXXXXXXXXXX/sagemaker/aota_liner_learner_6/validation', 'S3DataDistributionType': 'FullyReplicated'}}, 'ContentType': 'text/csv'}

3. 学習ステップを定義する

今回も、SageMakerの組み込みアルゴリズムから、線形学習アルゴリズムを使っていきます。
まずは、コンテナイメージURIの取得から。

3.1 SageMakerの線形学習アルゴリズムのコンテナイメージURIを取得する

# ライブラリをインポート
from sagemaker.image_uris import retrieve

# sagemaker.image_uris.retrieveを使用して、線形学習アルゴリズム(linear-learner)のコンテナイメージURIを取得
container = retrieve("linear-learner", region, version="1")
print(container)

出力結果:

351501993468.dkr.ecr.ap-northeast-1.amazonaws.com/linear-learner:1

続いて、Estimatorインスタンスの作成と、ハイパーパラメータの設定をします。

3.2 線形学習アルゴリズムの設定をする

# 学習アルゴリズムを実行するEstimator(推定器)を作成
linear = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    output_path=output_location,
    sagemaker_session=sess,
)

# 最低限必要なハイパーパラメータを設定
linear.set_hyperparameters(
    predictor_type="regressor",
    mini_batch_size=10,
)

ここまでは、前回とほぼ同じですね。
ではここから、パイプラインの作成に入っていきます!

3.3 学習ステップを定義する

今回作成するのは、学習 → モデル登録の2ステップを組み合わせたパイプラインでした。
まずは、学習ステップから定義していきます。

# ライブラリをインポート
from sagemaker.workflow.steps import TrainingStep

# 学習ステップを定義
training_step = TrainingStep(
    name="Train",
    estimator=linear,
    inputs={
        "train": train_data,
        "validation": validation_data,
    }
)

print(training_step)

ステップの名前・Estimatorインスタンス・学習のインプットデータを指定して、学習ステップを定義しています。

4. モデル登録ステップを定義する

次に、学習が終わったモデルを登録するステップを定義します。
流れとしては、
モデル自体の定義 → モデルを登録するステップの定義
の順でコードを書いています。

# ライブラリをインポート
import time
from sagemaker.model import Model
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_step import ModelStep

timestamp = int(time.time())

# Model Registryに作成するモデルグループの名前 
model_package_group_name = f"aota-linear-learner-test-pipeline2"

# モデルの名前
model_name = "aota-linear-learner-pipeline-test-model-{}".format(timestamp)

# モデルを定義
model = Model(
    name=model_name,
    image_uri=container,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=PipelineSession(),
    role=role,
)

# モデル登録ステップを定義
register_step = ModelStep(
    name="ModelStep",
    step_args=model.register(
        content_types=["application/jsonlines"],
        response_types=["application/jsonlines"],
        inference_instances=["ml.m4.xlarge"],
        transform_instances=["ml.m4.xlarge"],
        model_package_group_name=model_package_group_name
    )
)

print(register_step)

モデル自体の定義では、学習ステップから学習済みモデルの情報を受け取るように設定。
モデル登録ステップの定義では、定義したモデルの register() メソッドを呼び出して、モデルを登録するように設定しています。

5. 学習ステップとモデル登録ステップを使って、パイプラインを作成する

上記で定義した学習ステップとモデル登録ステップを指定して、パイプラインを作成します。

from sagemaker.workflow.pipeline import Pipeline

# パイプラインの名前
pipeline_name = "aota-linear-learner-pipeline-test3"

# パイプラインの定義
pipeline = Pipeline(
    name=pipeline_name,
    steps=[training_step, register_step],
)

# パイプラインの作成
response = pipeline.create(role_arn=role)

print(response["PipelineArn"])

ここは比較的単純ですね。
パイプラインの名前と定義したステップを設定してパイプラインを定義し、create() メソッドを呼び出して作成します。

作成したパイプラインは、SageMaker Studioの、SageMaker resources - Pipelines から確認できます。

今回作成したパイプラインはこちら ↓

では、このパイプラインを実行してみましょう!

6. パイプラインを実行してみる

start() メソッドを実行することで、パイプラインを実行できます。

pipeline.start()

実行が進んでいる様子 ↓

実行完了! ↓

では、Model Registryに学習済みモデルが登録されているか、確認してみます。
Model Registryは、SageMaker Studioの、SageMaker resources - Model Registry から確認できます。

バージョン1として、学習済みモデルが登録されていました!

7. お片付け

パイプラインを作成して、動かすことができました。
今回は一旦ここまでにして、お片付けに入っていきます。

※本記事の内容を見て一緒に実践してくださっている方、ありがとうございます! お片付けの手順は必要に応じて実施ください。

7.1 パイプラインを削除する

delete() メソッドを実行すると、パイプラインが削除できます。

pipeline.delete()

7.2 Model Registryからモデルグループを削除する

モデルグループを削除するには、モデルグループからモデルパッケージ(先ほどパイプラインで登録した学習済みモデル)を削除しておく必要があります。
以下のコードで、モデルパッケージの削除 → モデルグループの削除 を実行します。

sm_client = boto3.client('sagemaker')

# モデルグループから、モデルパッケージを削除する

model_package_group = sm_client.list_model_packages(
        ModelPackageGroupName=model_package_group_name,
    )

model_packages = model_package_group.get('ModelPackageSummaryList')

if model_packages:
    for mp in model_packages:
        sm_client.delete_model_package(
            ModelPackageName=mp['ModelPackageArn']
        )
        time.sleep(1)
        
# モデルグループを削除する

sm_client.delete_model_package_group(
    ModelPackageGroupName=model_package_group_name
)

おわりに

今回の実践はここまでとなります。
徐々に ML "Ops" の領域に入ってきている気がしますね。

次回は、構成図の右半分、「CodePipelineを使用した、学習済みモデルを継続してデプロイする仕組みの構築」をやっていこうと思います!

MLOps への道 記事一覧