MiniKFでReusableなComponentを作成

前回MiniKFでとりあえずPipeline作成してみたけど
次はReusableなComponentを作成してみる
基本前回と同じで「ローカルからGCSにcsvアップロード」「GCSのcsvをBQにロードする」の二つを実現。
Componentに変更を加えなくても、引数GCSのバケット名等指定すればいろんな場所にデータ移行できればいいなという目論見

ディレクトリ構成

以下構成で作成
本当はcomponentsディレクトリ作成して、「GCSからBQへロード」みたいな抽象度でcomponent作成して行った方が再利用性高そう

.
├── load_from_gcs_to_bq
│   ├── Dockerfile
│   ├── README.md
│   ├── build_image.sh
│   ├── component.yaml
│   ├── requirements.txt
│   └── src
│       └── main.py
├── pipeline.py
├── pipeline.tar.gz
├── upload_from_local_to_gcs
│   ├── Dockerfile
│   ├── README.md
│   ├── build_image.sh
│   ├── component.yaml
│   ├── requirements.txt
│   └── src
│       ├── data
│       │   ├── calendar.csv
│       │   ├── sales_train_validation.csv
│       │   ├── sample_submission.csv
│       │   └── sell_prices.csv
│       └── main.py
└── upload_pipeline.sh

GCSへのアップロード

アップロードソースコードの作成
アップロードするGCSバケット名とディレクトリを引数から受け取る
upload_from_local_to_gcs/src/main.py

from google.cloud import storage
import argparse
import os
from os import listdir


def main():
   parser = argparse.ArgumentParser(description='ML Trainer')
   parser.add_argument('--input-bucket-name', type=str, help='Google Cloud Storage bucket name for input', required=True)
   parser.add_argument('--input-bucket-dir', type=str, help='Google Cloud Storage bucket dir for input', required=True)
   args = parser.parse_args()
   print('bucket name: ', args.input_bucket_name)
   print('bucket dir: ', args.input_bucket_dir)

   storage_client = storage.Client()
   bucket = storage_client.bucket(args.input_bucket_name)
   destination_blob_dir = args.input_bucket_dir
   source_dir = os.path.join(os.path.dirname(__file__), 'data')
   for file in listdir(source_dir):
       destination_blob_name = f"{destination_blob_dir}/{file}"
       blob = bucket.blob(destination_blob_name)
       blob.upload_from_filename(os.path.join(source_dir, file))

   with open('/tmp/output_bucket_name.txt', 'w') as f:
       f.write(args.input_bucket_name)

   with open('/tmp/output_bucket_dir.txt', 'w') as f:
       f.write(args.input_bucket_dir)


if __name__ == '__main__':
   main()

コンテナにしてアップロード
ソースコードを詰めるだけのコンテナ作成
upload_from_local_to_gcs/Dockerfile

FROM python:3.8.2
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY ./src /pipelines/component/src

以下シェルスクリプトを実行してコンテナ(0.1.0)をGCRにプッシュ
[project_id]は各GCPプロジェクトのものに変更してくだされ

#!/bin/sh -e

if [ $# -lt 1 ]; then
 cat << EOS
 image_tag argument is required.
 ex) sh build_image.sh 0.1.0
EOS
 exit
fi

IMAGE_TAG="$1"

image_name=asia.gcr.io/[project_id]/kubeflow/components/upload_from_local_to_gcs
full_image_name=${image_name}:${IMAGE_TAG}

cd "$(dirname "$0")"

docker build -t "$full_image_name" .
docker push "$full_image_name"

# Output the strict image name (which contains the sha256 image digest)
docker inspect --format="{{index .RepoDigests 0}}" "${full_image_name}"

component.yamlの作成
componentをyamlで定義しPipelineから呼び出せるようにする
1. fileOutputsにソースコードで吐き出したパスを指定
2. outputsで指定するとPipelineでoutputをして取り出せる
3. inputsで指定したものをソースの引数に渡す
upload_from_local_to_gcs/component.yaml

name: upload to gcs
description: upload csv from local to gcs
inputs:
 - {name: input_bucket_name, type: String, description: 'GCS bucket name for input'}
 - {name: input_bucket_dir, type: String, description: 'GCS bucket dir for input'}
outputs:
 - {name: output_bucket_name, type: String, description: 'GCS bucket name for output'}
 - {name: output_bucket_dir, type: String, description: 'GCS bucket dir for output'}
implementation:
 container:
   image: asia.gcr.io/[project_id]/kubeflow/components/upload_from_local_to_gcs:0.1.0
   command: [python, /pipelines/component/src/main.py]
   args: [
     --input-bucket-name, {inputValue: input_bucket_name},
     --input-bucket-dir, {inputValue: input_bucket_dir}
   ]
   fileOutputs:
     output_bucket_name: /tmp/output_bucket_name.txt
     output_bucket_dir: /tmp/output_bucket_dir.txt

GCSからBQへのロード

BQロードソースコードの作成
読み込むGCSバケット名とディレクトリを引数(前項のoutput)から取得
load_from_gcs_to_bq/src/main.py

import argparse
from google.cloud import storage
from google.cloud import bigquery


def main():
   parser = argparse.ArgumentParser(description='From gcs to bq load')
   parser.add_argument('--input-bucket-name', type=str, help='Google Cloud Storage bucket name', required=True)
   parser.add_argument('--input-bucket-dir', type=str, help='Google Cloud Storage bucket dir', required=True)
   parser.add_argument('--input-bq-dataset', type=str, help='BQ target dataset', required=True)
   args = parser.parse_args()
   print('bucket name: ', args.input_bucket_name)
   print('bucket dir: ', args.input_bucket_dir)
   print('bq dataset: ', args.input_bq_dataset)

   storage_client = storage.Client()
   blobs = storage_client.list_blobs(
       args.input_bucket_name, prefix=args.input_bucket_dir
   )

   client = bigquery.Client()
   dataset_id = args.input_bq_dataset

   dataset_ref = client. dataset(dataset_id)
   job_config = bigquery.LoadJobConfig()
   job_config.autodetect = True
   job_config.skip_leading_rows = 1
   job_config.source_format = bigquery.SourceFormat.CSV
   base_uri = f"gs://{args.input_bucket_name}"
   for blob in blobs:
       num = blob.name.count('/')
       string = blob.name.split('/')[num]
       string_list = string.split('.')
       load_job = client.load_table_from_uri(
           f"{base_uri}/{blob.name}", dataset_ref.table(string_list[0]), job_config=job_config
       )
       load_job.result()  # Waits for table load to complete.

   with open('/tmp/output_dataset.txt', 'w') as f:
       f.write(dataset_id)


if __name__ == '__main__':
   main()

コンテナにしてアップロード
ソースコードを詰めるだけのコンテナ作成
load_from_gcs_to_bq/Dockerfile

FROM python:3.8.2
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY ./src /pipelines/component/src

以下シェルスクリプトを実行してコンテナ(0.1.0)をGCRにプッシュ
[project_id]は各GCPプロジェクトのものに変更してくだされload_from_gcs_to_bq/build_image.sh

#!/bin/sh

if [ $# -lt 1 ]; then
 cat << EOS
 image_tag argument is required.
 ex) sh build_image.sh 0.1.0
EOS
 exit
fi

IMAGE_TAG="$1"

image_name=asia.gcr.io/[project_id]/kubeflow/components/load_from_gcs_to_bq
full_image_name=${image_name}:${IMAGE_TAG}

cd "$(dirname "$0")"

docker build -t "$full_image_name" .
docker push "$full_image_name"

component.yamlの作成
componentをyamlで定義しPipelineから呼び出せるようにする
1. fileOutputsにソースコードで吐き出したパスを指定
2. outputsで指定するとPipelineでoutputをして取り出せる
3. inputsで指定したものをソースの引数に渡す
upload_from_local_to_gcs/component.yaml

name: m5 forecasting
description: m5 forecasting pipeline
inputs:
 - {name: input_bucket_name, type: String, description: 'GCS bucket name for input'}
 - {name: input_bucket_dir, type: String, description: 'GCS bucket dir for input'}
 - {name: input_bq_dataset, type: String, description: 'Target BQ dataset'}
outputs:
 - {name: output_bq_dataset, type: String, description: 'BQ dataset name for output'}
implementation:
 container:
   image: asia.gcr.io/[project_id]/kubeflow/components/load_from_gcs_to_bq:0.1.0
   command: [python, /pipelines/component/src/main.py]
   args: [
     --input-bucket-name, {inputValue: input_bucket_name},
     --input-bucket-dir, {inputValue: input_bucket_dir},
     --input-bq-dataset, {inputValue: input_bq_dataset}
   ]
   fileOutputs:
     output_bq_dataset: /tmp/output_dataset.txt

Pipelineの作成
細かいとこは前回の記事を参照したいとこだが
load_component_from_fileで作成したyamlファイルを指定して、あとはcomponent間のinputとoutputを出したり入れたり調整する感じ

import kfp
from kfp import dsl
from kfp import gcp
import kubernetes
import os


base_dir = os.path.dirname(__file__)

upload_from_local_to_gcs_op = kfp.components.load_component_from_file(
   os.path.join(base_dir, 'upload_from_local_to_gcs/component.yaml'))

load_from_gcs_to_bq = kfp.components.load_component_from_file(
   os.path.join(base_dir, 'load_from_gcs_to_bq/component.yaml'))


@dsl.pipeline(
   name='m5 forecasting pipeline',
   description='A pipeline fro m5 forecasting.'
)
def m5_forecasting_pipeline(bucket_name, bucket_dir, bq_dataset):
   dsl.get_pipeline_conf().set_image_pull_secrets([kubernetes.client.V1LocalObjectReference(name="regcred")])
   upload_from_local_to_gcs_task = upload_from_local_to_gcs_op(
       input_bucket_name=bucket_name,
       input_bucket_dir=bucket_dir,
   ).apply(gcp.use_gcp_secret())

   load_from_gcs_to_bq(
       input_bucket_name=upload_from_local_to_gcs_task.outputs['output_bucket_name'],
       input_bucket_dir=upload_from_local_to_gcs_task.outputs['output_bucket_dir'],
       input_bq_dataset=bq_dataset,
   ).apply(gcp.use_gcp_secret()).after(upload_from_local_to_gcs_task)


if __name__ == '__main__':
   kfp.Client().create_run_from_pipeline_func(m5_forecasting_pipeline, arguments={})

検証

例の如くpipelineをコンパイル

dsl-compile --py pipeline.py --output pipeline.tar.gz

KubeflowのUIからアップロードしてexperiments, run作成して実行

画像1

こんな感じでパイプラインが成功すればOK

画像2

まとめ

パイプラインのアップロードがUIからなのが面倒
SDKにアップロードするメソッドはありそうだけど、ClientでMiniKFの認証を突破する術がわからぬ・・・トークン渡せそうだけど、userとpasswodだからなぁ・・・
https://github.com/kubeflow/pipelines/blob/62e33c711b12ebeb60d5fb393e9484f2b297bcac/sdk/python/kfp/_client.py#L675

この辺のissueで何かしら回答が出ることを祈って一旦忘れることにする
https://github.com/kubeflow/pipelines/issues/3611


いいなと思ったら応援しよう!