見出し画像

分散型GPUクラウドのIOCloudの使い方(ジョブ実行編:ファッション MNIST で PyTorch モデルをトレーニングする: Jupyter Notebook)

IO Cloud ユーザーは、Visual Studio Code と Jupyter Notebook を使用してクラスター上でジョブを実行できます。

機械学習 (ML) は、データに基づく意思決定を可能にし、複雑なタスクを自動化することで、さまざまな分野に革命をもたらしました。ML モデルがより高度になるにつれて、これらのモデルのトレーニングとデプロイに必要な計算能力が大幅に増加します。相互接続されたコンピューターが連携して動作するクラスターで機械学習ジョブを実行することは、これらの要求を満たすための重要なアプローチになっています。このドキュメントでは、効率的でスケーラブルな ML ワークフローを確保するために、クラスターで機械学習ジョブを実行する利点、セットアップ、およびベスト プラクティスの概要を示します。

クラスター上で ML ジョブを実行する利点

  1. 強化された計算能力
    クラスターは、複数のマシンの計算リソースを集約し、大規模な ML タスクを処理するために必要な能力を提供します。これにより、単一のマシンでは実行不可能な膨大なデータセットで複雑なモデルをトレーニングできるようになります。

  2. スケーラビリティ
    クラスターは、ML ジョブのニーズに基づいてリソースを拡大または縮小する機能を提供します。この柔軟性により、効率的なリソース管理とコストの最適化が可能になり、ML ワークフローのさまざまな段階でリソースが適切に割り当てられるようになります。

  3. 並列処理
    タスクをクラスター内の複数のノードに分散することで、ML ジョブを並列に実行でき、トレーニングと推論に必要な時間を大幅に短縮できます。この並列処理は、ML アルゴリズムの反復的で計算集約的な性質を処理するために不可欠です。

  4. フォールト トレランスと信頼性
    クラスターはフォールト トレランスを考慮して設計されています。ハードウェア障害やその他の問題が発生した場合、ワークロードを他のノード間で再分散できるため、ダウンタイムが最小限に抑えられ、ML ジョブ実行の信頼性が確保されます。

ファッション MNIST で PyTorch モデルをトレーニングする: Jupyter Notebook

Ray の分散コンピューティング機能を使用して、トレーニング ワークロードを複数のワーカーに分散するジョブをクラスターで実行する方法について説明します。これにより、トレーニング プロセスを並列化でき、全体的なトレーニング時間を短縮できる可能性があります。以下の手順では、Jupyter Notebook を使用して Fashion MNIST ジョブで PyTorch モデルのトレーニングを実行します。
Jupyter Notebook の詳細については、ドキュメントを参照してください。
Jupyter Notebook を使用してこのジョブを実行するには、以下の手順に従います。

1.クラスターの展開が完了したら、「クラスターの表示」に移動します。

2.クラスターの詳細ページで、IDE パスワードをコピーし、Jupyter Notebookをクリックします。

3.Jupyterパスワード フィールドにコピーしたIDE パスワードを入力します。

4.新しい Python ノートブックを作成するには、[新規]をクリックします。

5.「新規」ドロップダウンで、「Python 3」を選択します。新しいタブが起動します。

6.以下のコードサンプルをセルに入力し、「実行」をクリックします。

import os
from typing import Dict

import torch
from filelock import FileLock
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.transforms import Normalize, ToTensor
from tqdm import tqdm

import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def get_dataloaders(batch_size):
    transform = transforms.Compose([ToTensor(), Normalize((0.5,), (0.5,))])

    with FileLock(os.path.expanduser("~/data.lock")):
        training_data = datasets.FashionMNIST(
            root="~/data",
            train=True,
            download=True,
            transform=transform,
        )

        test_data = datasets.FashionMNIST(
            root="~/data",
            train=False,
            download=True,
            transform=transform,
        )

    train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_data, batch_size=batch_size)

    return train_dataloader, test_dataloader


class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def train_func_per_worker(config: Dict):
    lr = config["lr"]
    epochs = config["epochs"]
    batch_size = config["batch_size_per_worker"]

    train_dataloader, test_dataloader = get_dataloaders(batch_size=batch_size)

    train_dataloader = ray.train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = ray.train.torch.prepare_data_loader(test_dataloader)

    model = NeuralNetwork()

    model = ray.train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    # Model training loop
    for epoch in range(epochs):
        if ray.train.get_context().get_world_size() > 1:
            train_dataloader.sampler.set_epoch(epoch)

        model.train()
        for X, y in tqdm(train_dataloader, desc=f"Train Epoch {epoch}"):
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        test_loss, num_correct, num_total = 0, 0, 0
        with torch.no_grad():
            for X, y in tqdm(test_dataloader, desc=f"Test Epoch {epoch}"):
                pred = model(X)
                loss = loss_fn(pred, y)

                test_loss += loss.item()
                num_total += y.shape[0]
                num_correct += (pred.argmax(1) == y).sum().item()

        test_loss /= len(test_dataloader)
        accuracy = num_correct / num_total

        ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy})


def train_fashion_mnist(num_workers=2, use_gpu=False):
    global_batch_size = 32

    train_config = {
        "lr": 1e-3,
        "epochs": 10,
        "batch_size_per_worker": global_batch_size // num_workers,
    }

    # Configure computation resources
    scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)

    # Initialize a Ray TorchTrainer
    trainer = TorchTrainer(
        train_loop_per_worker=train_func_per_worker,
        train_loop_config=train_config,
        scaling_config=scaling_config,
    )

    result = trainer.fit()
    print(f"Training result: {result}")

7.以下のエラーが表示された場合:

2024-05-16 15:42:45,780	INFO util.py:154 -- Outdated packages:
  ipywidgets==7.8.1 found, needs ipywidgets>=8
Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2024-05-16 15:42:46,495	INFO util.py:154 -- Outdated packages:
  ipywidgets==7.8.1 found, needs ipywidgets>=8
Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2024-05-16 15:42:46,528	INFO util.py:154 -- Outdated packages:
  ipywidgets==7.8.1 found, needs ipywidgets>=8
Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.

次に以下を実行します:

pip install -U ipywidgets

8.ツールバーで「カーネル」をクリックし、ドロップダウンから「カーネルの再起動」を選択します。これにより、パッケージが更新されます。

9.新しいセルに以下の Python コードを入力します。[実行]をクリックします。

train_fashion_mnist(num_workers=2, use_gpu=True)

10.以下はジョブの出力です。

11.出力の一番下までスクロールすると、トレーニング結果が表示されます。

Training result: Result(
  metrics={'loss': 0.3572742183404133, 'accuracy': 0.8728},
  path='/home/ray/ray_results/TorchTrainer_2024-05-17_18-55-55/TorchTrainer_c3725_00000_0_2024-05-17_18-55-55',
  filesystem='local',
  checkpoint=None
)

12.クラスターに戻ります。クラスターの詳細ページで、IDE パスワードをコピーし、「Ray Dashboard」をクリックします。

13.パスワード フィールドにパスワードを入力します。[すべてのジョブを表示] をクリックします。ここで、ジョブが実行中であることを確認できます。

14.io.net でこれを確認することもできます。 [クラスター] > [クラスターを選択] > [IO ワーカー] > [ジョブ]の順にクリックします。

Mac(Appleチップ)で同じコードを実行する場合

pipでライブラリのインストールが必要ですが追って記載します。

import os
from typing import Dict

import torch
from filelock import FileLock
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.transforms import Normalize, ToTensor
from tqdm import tqdm

import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def get_dataloaders(batch_size):
    transform = transforms.Compose([ToTensor(), Normalize((0.5,), (0.5,))])

    with FileLock(os.path.expanduser("~/data.lock")):
        training_data = datasets.FashionMNIST(
            root="~/data",
            train=True,
            download=True,
            transform=transform,
        )

        test_data = datasets.FashionMNIST(
            root="~/data",
            train=False,
            download=True,
            transform=transform,
        )

    train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_data, batch_size=batch_size)

    return train_dataloader, test_dataloader


class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def train_func_per_worker(config: Dict):
    lr = config["lr"]
    epochs = config["epochs"]
    batch_size = config["batch_size_per_worker"]

    train_dataloader, test_dataloader = get_dataloaders(batch_size=batch_size)

    model = NeuralNetwork()

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    for epoch in range(epochs):
        model.train()
        for X, y in tqdm(train_dataloader, desc=f"Train Epoch {epoch}"):
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        test_loss, num_correct, num_total = 0, 0, 0
        with torch.no_grad():
            for X, y in tqdm(test_dataloader, desc=f"Test Epoch {epoch}"):
                pred = model(X)
                loss = loss_fn(pred, y)

                test_loss += loss.item()
                num_total += y.shape[0]
                num_correct += (pred.argmax(1) == y).sum().item()

        test_loss /= len(test_dataloader)
        accuracy = num_correct / num_total

        ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy})


def train_fashion_mnist(num_workers=2):
    global_batch_size = 32

    train_config = {
        "lr": 1e-3,
        "epochs": 10,
        "batch_size_per_worker": global_batch_size // num_workers,
    }

    scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=False)  # use_gpuをFalseに設定

    trainer = TorchTrainer(
        train_loop_per_worker=train_func_per_worker,
        train_loop_config=train_config,
        scaling_config=scaling_config,
    )

    result = trainer.fit()
    print(f"Training result: {result}")


if __name__ == "__main__":
    train_fashion_mnist(num_workers=2)  # 実行時にuse_gpuパラメータを削除


この記事が気に入ったらサポートをしてみませんか?