見出し画像

Pythonのsalabimで工場シミュレーション - 前編

Pythonの離散事象シミュレーション用のライブラリ「salabim」を用いて簡単な工場のシミュレーションを行ってみました。
 ⇒前編: 工場シミュレータの作成と計画生産シミュレーション
  後編: 強化学習エージェントを組み合わせた自動生産指示にトライ

#python #salabim #離散事象シミュレーション #simpy


salabimってなに?

salabimはPythonの離散事象シミュレーション(discrete-event-simulation)用のライブラリです。アニメーションやグラフなどで結果を可視化する機能が充実しています。

https://github.com/salabim/salabim

2024年7月現在も精力的にリリースが続けられているようです。

公式のドキュメントやサンプルが大変充実しているのでそちらもぜひ参照ください。

この分野ではSimpyが比較的有名なようで日本語の記事もいくつか見つかりましたが、salabimに関しては2024年7月現在日本語の記事は見つけられませんでした。

私も最初はSimpyを使っていたのですが、salabimは簡単に可視化ができて楽しいので是非紹介したいと思いこのメモを書いた次第です。

シミュレーションする工場の概要図

今回シミュレーションする工場の概要図

subassyラインが2つ、assyラインが1つのシンプルな工場です。
生産ラインはジョブショップラインではなくトランスファライン(ロット生産)を想定しています。

Partロケには部品が無尽蔵にあるものとし、subassyラインは部材を待つことなく加工が行えるものとします。対して、assyラインは生産品番の部材となるsubassyがsubassy完成品ロケに十分数ない場合ひたすら待ち続けます。

生産品番が前ロットの生産品番と異なる場合は指定の段取替え時間だけライン(先頭)が停止するものとします。構成をシンプルにするために不良やライン故障停止の発生は考慮しないものとします。

顧客トラックは指定したインターバル時間ごとにassyライン完成品ロケからランダムに1品番だけ指定数持ち出します。この際、持ち出し品番がロケに不足している場合は欠品クレームカウンタをインクリメントして何もせずに帰るものとします。

全てのラインはあらかじめ決められた生産計画に従って生産する品番と数量が設定されるものとします。

各機能の実装

今回の実装内容の解説をしながら可能な限りsalabimの使い方についても説明していきます。

コード全体は本ページの末尾に付記します。
salabimのバージョンは23.2.0(yield)を使用しました。

環境

salabimを含めた必要なモジュールをインポートし、
main先頭でシミュレーションのベースとなる環境を生成しています。

この際、isdefault_env=True(デフォルトでTrue)と宣言されることでこの環境が「デフォルト環境」として扱われ、この後で(envを指定せずに)宣言された種々のオブジェクトはデフォルト環境に属するものとして生成されます。

traceは環境内で発生するイベントの情報を標準出力に出力させるかさせないかのオプションです。デバッグのときはTrueとしておくことをお勧めします 。

# 必要なモジュールのインポート
import salabim as sim
import pandas as pd
import random
from pathlib import Path

# yieldを使用するモード
sim.yieldless(False)
env = sim.Environment(trace=TRACE, isdefault_env=True)

マスタファイル

各マスタファイルは実行ファイルと同じ階層にmasterフォルダを作り、その中に保管するものとします。

【品番マスタファイル】
part, subassy, assyごとに下に示すようなcsvファイルを作成し、各品番の部品構成などの情報をまとめました。
 pn: 品番
 color: 画面表示される際の色(詳細はsalabimの公式ドキュメント参照)
 capacity: ロケの容量
 init_claimed: ロケ初期減量値(ロケ初期値 = capacity - init_claimed)
 ct: 加工サイクルタイム
 c1_pn, c1_num...: 部品構成

//part_master.csv
pn,color,capacity,init_claimed,ct,c1_pn,c1_num,c2_pn,c2_num,c3_pn,c3_num
PART_A,red,,,,,,,,,
PART_B,blue,,,,,,,,,
PART_C,green,,,,,,,,,

//subassy_master.csv
pn,color,capacity,init_claimed,ct,c1_pn,c1_num,c2_pn,c2_num,c3_pn,c3_num
SUB_A,red,1000,990,5,PART_A,1,,,,
SUB_B,blue,1000,950,5,PART_B,1,,,,
SUB_C,green,1000,950,5,PART_C,1,,,,

//assy_master.csv
pn,color,capacity,init_claimed,ct,c1_pn,c1_num,c2_pn,c2_num,c3_pn,c3_num
ASSY_AB,purple,1000,950,10,SUB_A,1,SUB_B,1,,
ASSY_AC,darkorange,1000,950,10,SUB_A,1,SUB_C,1,,

assyラインの部品待ちを発生させたく、あえてSUB_Aの初期個数を少なく(1000 - 990 = 10台)設定しています。

【生産計画マスタファイル】
全ライン分の生産計画を設定するファイルです。同じくcsvで作成します。
 line: ライン名
 line_no: ラインNo.
 in_process_order: 生産順
 prod_pn: 生産品番
 prod_qty: 生産数量

//prod_plan.csv
line,line_no,in_process_order,prod_pn,prod_qty
line_sub_1,0,0,SUB_A,10
line_sub_1,0,1,SUB_B,10
line_sub_1,0,2,SUB_C,10
line_sub_2,1,0,SUB_C,10
line_sub_2,1,1,SUB_B,10
line_sub_2,1,2,SUB_A,10
line_assy_1,2,0,ASSY_AB,5
line_assy_1,2,1,ASSY_AC,5
line_assy_1,2,2,ASSY_AB,5

これらのcsvファイルをpandasのread_csvでDataFrame(df)として取得し、ロケやラインを生成する際に活用していきます。

ロケ

部品や製品のロケをsalabimのResourceを使用して作成します。

salabimのResourceは大きく分けて2つの使い方があり、anonymous=True/Falseで切り替えることができます。

anonymous=Falseとした場合は同時利用可能な数に制限のあるものを表していて、simpyのResource(名前が同じで紛らわしい)に近い使い方ができます。

anonymous=Trueの場合はなにかを貯めておくタンクのようなイメージで、simpyのContainerに似た働きをします。

部品や製品を在庫として貯めておくロケとしては後者の使い方が適しているためそちらを活用します。品番マスタdfを読み込み、pnをキーとした辞書として各ロケ群を環境内に生成します。

# ロケを生成する処理
def generate_locas(mdf: pd.DataFrame) -> dict:
    locas = {
        x["pn"]: sim.Resource(name=x["pn"],
                              anonymous=True,
                              capacity=x["capacity"],
                              initial_claimed_quantity=x["init_claimed"])
        for _, x in mdf[["pn", "capacity", "init_claimed"]].iterrows()
    }
    return locas

生産ライン

salabimでは環境内で動作するオブジェクトを定義する際にはsim.Componentを継承したクラスを作成することが推奨されています。
作成したクラスの"process"メソッドの中身が動作内容になります。(processのないクラスを作成することも可)

また、コンストラクタとしては__init__は使わずにsetupを使うことも推奨されています。このあたりは公式ドキュメントのComponentの項にまとめられているので是非参照ください。

生産ラインクラスのコードは下記の通りです。
(途中で出てくるwork_obj, line_to_locaは後述)

# 生産ライン
class product_line(sim.Component):
    def setup(self, linename, mdf, linecapacity, lt, lotsize, initpn, initqty, prodpn, partlocas, locas):
        # ライン名
        self._name = linename
        # 生産品番マスタdf
        self.mdf = mdf
        # ラインリードタイム
        self.lt = lt
        # ラインロットサイズ
        self.lotsize = lotsize
        # ライン内容量
        self.linecapacity = linecapacity  # (> LT / CT)
        # 加工中キュー
        self.in_q = sim.Queue("in_line", capacity=self.linecapacity)
        # 排出ステーション
        self.out_st = sim.Resource("out_st", anonymous=True, capacity=self.linecapacity, initial_claimed_quantity=self.linecapacity)
        # 部品ロケ
        self.partlocas = partlocas
        # 完成品ロケ
        self.locas = locas
        # 初期生産品番
        self.initpn = initpn
        # 前回生産品番(初期値はinitpn)
        self.prepn = initpn
        # 現在生産品番
        self.prodpn = prodpn
        # 現在生産数量(初期値はinitqty)
        self.prodqty = initqty

    # 生産品番と生産数量の登録(1lot固定の場合はprodqtyをself.lotsizeにする)
    def set_assypn_and_qty(self, prodpn, prodqty):
        self.prodpn = prodpn
        self.prodqty = prodqty  # self.lotsize

    def process(self):
        # initpnを生産している状態からスタート
        self.prodpn = self.initpn
        self.set_mode("operation")
        while True:
            # 生産するassy品番とその情報一式
            pn = self.prodpn
            pninfo = self.mdf[self.mdf["pn"] == pn].head(1)

            # 生産する品番の部品品番・必要個数と, ct, colorを取得
            parts = [(pninfo[f"c{i}_pn"].values[0], pninfo[f"c{i}_num"].values[0])
                     for i in range(1, 4) if not pd.isnull(pninfo[f"c{i}_pn"].values[0])]
            ct = pninfo["ct"].values[0]
            color = pninfo["color"].values[0]

            # 生産数量を取得
            qty = int(self.prodqty)

            # 生産数量分の部材を部材ロケから投入STへ取り出す(足りない場合は待ち続ける)
            self.set_mode("wait_for_parts")
            for part, num in parts:
                yield self.get((self.partlocas[part], int(num) * qty))
            self.set_mode("operation")

            # 品番が異なる場合は段取り時間待つ
            if pn != self.prepn:
                self.set_mode("dandori")
                yield self.hold(15)
                self.set_mode("operation")

            # CTごとに1個ずつワークオブジェクトを生成して加工中キューへ入れる
            for _ in range(qty):
                yield self.hold(ct)
                # ワークは生成されてすぐに加工中キューに入りラインlt待って排出STに入る
                work_obj(lt=self.lt, pn=pn, color=color, in_q=self.in_q, out_st=self.out_st)

            # 全て加工中キューへ入れたら排出ステーションからロケへ渡すプロセスを起動し次の生産指示を取得しにいく
            line_to_loca(line=self, num=qty, pn=pn)

            # 生産した品番を先回生産品番として保持
            self.prepn = pn

            # 生産指示待ちリストへ自ラインを追加し次の生産指示があるまで待機
            self.env.order_waiting_lines.append(self)
            yield self.passivate()

コードだけだと動きがわかりづらいので簡単な絵を用意しました。

生産ラインの動き

生産ラインを投入ST, ライン内, 排出STの3つの領域に分けて考えます。

投入STでは部品ロケから必要数部品を確保し、CT毎に1つずつワーク(work_obj)を生成します。生成されたワークはすぐにライン内(加工中キュー)に入り、ltかけてラインを通過し排出STに入ります。
※この動作はwork_objのprocessに記述してあります。

1ロット分のワークが全てライン内を抜けて排出STに入ったら、まとめて完成品ロケに移載(line_to_loca)します。その間にもラインに切れ目が生じないように次の生産指示を取得しにいき、ワークの投入が行われるようにします。

品番切り替え時の段取停止中は、投入STのみが止まりライン内や排出STからの移載は動作し続けます。

コードはやや煩雑になってしまいましたが、
実際のトランスファラインのワークの流れを可能な限り再現しようとするとこのような形になりました。

ワーク

ワーククラスのコードは下記の通りです。

上記生産ラインの項で説明したワークの動作がprocessメソッド内に記述されています。

また、ワーク1つ1つを画面に表示するためにanimation_objectsでの設定も行っています。設定ファイルに記述した色の長方形内に、品番とラインを抜ける時刻が表示されるようにしています。

# ワーク
class work_obj(sim.Component):
    def setup(self, lt, pn, color, in_q: sim.Queue, out_st: sim.Resource):
        # リードタイムはワーク生成時にラインリードタイムが与えられる
        self.lt = lt
        self.pn = pn
        self.in_q = in_q
        self.out_st = out_st
        self.color = color

    def animation_objects(self, id):
        ao0 = sim.AnimateRectangle((-45, -5, 50, 20),
                                   text=self.pn + ": " +
                                        self.text(self.env.now() + self.lt),
                                   arg=self,
                                   fillcolor=self.color)
        return 45, 30, ao0

    def text(self, t):
        return f"{t:0.1f}"

    def process(self):
        self.enter(self.in_q)
        yield self.hold(self.lt)
        self.leave(self.in_q)
        yield self.put((self.out_st, 1))

ラインからロケへの移載

ロケへの払出しクラスのコードは下記の通りです。
動作は上記までで説明および下記コード内コメントの通りなので説明は省略します。

# 生産ラインの排出STから完成品ロケに移すプロセス
class line_to_loca(sim.Component):
    def setup(self, line: product_line, num, pn):
        self.line = line
        self.out_st = line.out_st
        self.locas = line.locas
        self.num = num
        self.pn = pn

    def process(self):
        # 仕掛けた数が全て排出STに入るまで待つ
        yield self.get((self.out_st, self.num))

        # どのロケに入れるかを振り分ける
        putloca = self.locas[self.pn]

        # 全て排出STに入った時点で加工中キューが空なら生産終了扱いにする
        if self.line.in_q.length.value == 0:
            self.line.set_mode("prod_end")

        # 仕掛けた数が全て排出STに入ったらその数を完成ロケに入れる
        # 不良率を乗算するなど仕掛け数より完成品を減らす場合はこのタイミングで行う
        # 排出STから完成ロケに入るまでの遅延時間も同じくここで設定する
        yield self.put((putloca, self.num))

顧客

顧客を表すクラスのコードです。

顧客は指定したインターバル時間ごとにAssyロケからAssy_ABかAssy_ACのどちらか1品番を指定個数だけ持ち出します。欲しい品番・数量がロケに無い場合は欠品クレームカウント変数をインクリメントして帰ります。

# 完成品ロケから定期的に製品を持ち出すプロセス
class customer(sim.Component):
    def setup(self, locas, mdf, interval, buyqty, chpn_list=None, chwt_list=None):
        self.locas = locas
        self.mdf = mdf
        self.interval = interval
        self.buyqty = buyqty
        self.chpn_list = chpn_list
        self.chwt_list = chwt_list
        self.buypn = None
        self.buyt = None

    def process(self):
        # intervalごとにランダムな品番を1種類buyqty個持ち出す
        while True:
            yield self.hold(self.interval)
            # 重みづけ無しか有りで品番をランダムに選ぶ
            if self.chpn_list == None or self.chwt_list == None:
                getpn_n = random.randint(0, self.mdf.shape[0] - 1)
                getpn = self.mdf["pn"][getpn_n]
            else:
                getpn_n = random.choices(self.chpn_list, k=1, weights=self.chwt_list)[0]
                getpn = self.mdf["pn"][getpn_n]

            # 表示用に品番と時刻を保持
            self.buypn = getpn
            self.buyt = self.env.now()

            # 欲しい品番&数量がロケに無い場合は欠品クレームカウントを+1してgetせずに顧客は帰る
            if self.locas[getpn].available_quantity.value < self.buyqty:
                self.env.claim_count += 1
            else:
                yield self.get((self.locas[getpn], self.buyqty))

画面表示

今回のシミュレータでは画面表示アイテムとして
AnimateQueue, AnimateMonitor, AnimateTextを使用しています。
表示座標の(x, y)を指定するのがやや面倒です。

AnimateQueue, AnimateTextは設定に迷うところはあまりなかったのですが
AnimateMoniterの縦軸のスケール調整で少しつまづきました。
グラフの縦方向の値はheightの値が基準となるみたいで、グラフの縦の長さを100[pix] & 最大値を80[台]にしたい場合は
height=100, vertical_scale=1.25(⇒ 100/1.25=80)のように記述する必要があります。

sim.AnimateMonitor(assylocas["ASSY_AB"].available_quantity,
                   labels=[0, 20, 40, 60, 80],
                   linewidth=3, x=700, y=10, width=300, height=100,
                   horizontal_scale=1, vertical_scale=1.25)

メインループ

シミュレーションを実行する部分です。
env.order_waiting_linesに指示待ちラインが入ってくるまで時刻を1ずつ進め、ここにラインが入って来たら次の生産指示を与えるようなループとなっています。

後編で行う強化学習を見据えた実装となっています。

# 指示待ちラインが発生するまで初期条件でシミュレーション実行
while len(env.order_waiting_lines) == 0:
    env.run(till=env.now() + 1)

# 継続実行ループ
while env.now() < SIMTIME:
    # 生産指示待ちラインを一つ取り出す
    order_line: product_line = env.order_waiting_lines.pop(0)

    # 指示待ちラインの生産品番を仕掛順リストから一つ取り出す
    if prod_plans[order_line._name] == []:
        # 仕掛順リストが空の場合は何もせずpassiveのまま
        # その間もライン内(加工中キュー)のワークはそのまま流動されることに注意
        order_line.set_mode("no_order(ope.)")
    else:
        # 生産指示待ちラインの生産品番と生産数量を更新してactivate
        next_plan = prod_plans[order_line._name].pop(0)
        prodpn = next_plan[0]
        prodqty = next_plan[1]
        order_line.set_assypn_and_qty(prodpn=prodpn, prodqty=prodqty)
        order_line.activate()

    while len(env.order_waiting_lines) == 0:
        env.run(till=env.now() + 1)
        if env.now() > SIMTIME:
            break

シミュレーションの実行

いよいよ実行させます。

実行中の画面

こんな感じでアニメーション用の画面が立ち上がり動きます。
たのしいです^p^
是非動かしてみてください。

画面右上には現在時刻が表示されます。
また、画面左上のメニューから時間の進み方の速さを変えたり途中終了させたりすることができます。

シミュレーション結果の確認

シミュレーション結果ログは様々な形で取得できますがas_dataframe()でpandasのDataFrameとして取得できる機能が便利でした。
salabimのComponentはデフォルトのログ取得機能が手厚く、特に自分でログを取得する機能を書かなくても最低限のログは得ることができるのが良いですね。

もちろん、思い通りのログを残したい場合は自分で細かく記述したほうが良いかと思います。

# as_dataframeで結果をdfとして取得しcsvとして出力
line_assy_1.mode.as_dataframe().to_csv(EXE_DIR / "modelog_line_assy_1.csv")

たとえば上記で得られるdataframeはこんな感じで、modeの時刻変化が追えます。お好みでさらにこれをプロットしてもよいと思います。

,t,product_line.2.mode.x
0,0.0,operation
1,50.0,operation
2,100.0,wait_for_parts
3,150.0,dandori
4,165.0,operation
5,215.0,dandori
6,230.0,operation
7,280.0,no_order(ope.)
8,380.0,prod_end

後編へ

後編では強化学習のエージェントと組み合わせて自動で生産指示を行わせることにトライします。


参考資料

salabim公式ドキュメント

GitHub - salabim

コード全体

import salabim as sim
import pandas as pd
import random
from pathlib import Path
sim.yieldless(False)


# ワーク
class work_obj(sim.Component):
    def setup(self, lt, pn, color, in_q: sim.Queue, out_st: sim.Resource):
        # リードタイムはワーク生成時にラインリードタイムが与えられる
        self.lt = lt
        self.pn = pn
        self.in_q = in_q
        self.out_st = out_st
        self.color = color

    def animation_objects(self, id):
        ao0 = sim.AnimateRectangle((-45, -5, 50, 20),
                                   text=self.pn + ": " + self.text(self.env.now() + self.lt),
                                   arg=self,
                                   fillcolor=self.color)
        return 45, 30, ao0

    def text(self, t):
        return f"{t:0.1f}"

    def process(self):
        self.enter(self.in_q)
        yield self.hold(self.lt)
        self.leave(self.in_q)
        yield self.put((self.out_st, 1))


# 生産ライン
class product_line(sim.Component):
    def setup(self, linename, mdf, linecapacity, lt, lotsize, initpn, initqty, prodpn, partlocas, locas):
        # ライン名
        self._name = linename
        # 生産品番マスタdf
        self.mdf = mdf
        # ラインリードタイム
        self.lt = lt
        # ラインロットサイズ
        self.lotsize = lotsize
        # ライン内容量
        self.linecapacity = linecapacity  # (> LT / CT)
        # 加工中キュー
        self.in_q = sim.Queue("in_line", capacity=self.linecapacity)
        # 排出ステーション
        self.out_st = sim.Resource("out_st", anonymous=True, capacity=self.linecapacity, initial_claimed_quantity=self.linecapacity)
        # 部品ロケ
        self.partlocas = partlocas
        # 完成品ロケ
        self.locas = locas
        # 初期生産品番
        self.initpn = initpn
        # 前回生産品番(初期値はinitpn)
        self.prepn = initpn
        # 現在生産品番
        self.prodpn = prodpn
        # 現在生産数量(初期値はinitqty)
        self.prodqty = initqty

    # 生産品番と生産数量の登録(1lot固定の場合はprodqtyをself.lotsizeにする)
    def set_assypn_and_qty(self, prodpn, prodqty):
        self.prodpn = prodpn
        self.prodqty = prodqty  # self.lotsize

    def process(self):
        # initpnを生産している状態からスタート
        self.prodpn = self.initpn
        self.set_mode("operation")
        while True:
            # 生産するassy品番とその情報一式
            pn = self.prodpn
            pninfo = self.mdf[self.mdf["pn"] == pn].head(1)

            # 生産する品番の部品品番・必要個数と, ct, colorを取得
            parts = [(pninfo[f"c{i}_pn"].values[0], pninfo[f"c{i}_num"].values[0])
                     for i in range(1, 4) if not pd.isnull(pninfo[f"c{i}_pn"].values[0])]
            ct = pninfo["ct"].values[0]
            color = pninfo["color"].values[0]

            # 生産数量を取得
            qty = int(self.prodqty)

            # 生産数量分の部材を部材ロケから投入STへ取り出す(足りない場合は待ち続ける)
            self.set_mode("wait_for_parts")
            for part, num in parts:
                yield self.get((self.partlocas[part], int(num) * qty))
            self.set_mode("operation")

            # 品番が異なる場合は段取り時間待つ
            if pn != self.prepn:
                self.set_mode("dandori")
                yield self.hold(15)
                self.set_mode("operation")

            # CTごとに1個ずつワークオブジェクトを生成して加工中キューへ入れる
            for _ in range(qty):
                yield self.hold(ct)
                # ワークは生成されてすぐに加工中キューに入りラインlt待って排出STに入る
                work_obj(lt=self.lt, pn=pn, color=color, in_q=self.in_q, out_st=self.out_st)

            # 全て加工中キューへ入れたら排出ステーションからロケへ渡すプロセスを起動し次の生産指示を取得しにいく
            line_to_loca(line=self, num=qty, pn=pn)

            # 生産した品番を先回生産品番として保持
            self.prepn = pn

            # 生産指示待ちリストへ自ラインを追加し次の生産指示があるまで待機
            self.env.order_waiting_lines.append(self)
            yield self.passivate()


# 生産ラインの排出STから完成品ロケに移すプロセス
class line_to_loca(sim.Component):
    def setup(self, line: product_line, num, pn):
        self.line = line
        self.out_st = line.out_st
        self.locas = line.locas
        self.num = num
        self.pn = pn

    def process(self):
        # 仕掛けた数が全て排出STに入るまで待つ
        yield self.get((self.out_st, self.num))

        # どのロケに入れるかを振り分ける
        putloca = self.locas[self.pn]

        # 全て排出STに入った時点で加工中キューが空なら生産終了扱いにする
        if self.line.in_q.length.value == 0:
            self.line.set_mode("prod_end")

        # 仕掛けた数が全て排出STに入ったらその数を完成ロケに入れる
        # 不良率を乗算するなど仕掛け数より完成品を減らす場合はこのタイミングで行う
        # 排出STから完成ロケに入るまでの遅延時間も同じくここで設定する
        yield self.put((putloca, self.num))


# 完成品ロケから定期的に製品を持ち出すプロセス
class customer(sim.Component):
    def setup(self, locas, mdf, interval, buyqty, chpn_list=None, chwt_list=None):
        self.locas = locas
        self.mdf = mdf
        self.interval = interval
        self.buyqty = buyqty
        self.chpn_list = chpn_list
        self.chwt_list = chwt_list
        self.buypn = None
        self.buyt = None

    def process(self):
        # intervalごとにランダムな品番を1種類buyqty個持ち出す
        while True:
            yield self.hold(self.interval)
            # 重みづけ無しか有りで品番をランダムに選ぶ
            if self.chpn_list == None or self.chwt_list == None:
                getpn_n = random.randint(0, self.mdf.shape[0] - 1)
                getpn = self.mdf["pn"][getpn_n]
            else:
                getpn_n = random.choices(self.chpn_list, k=1, weights=self.chwt_list)[0]
                getpn = self.mdf["pn"][getpn_n]

            # 表示用に品番と時刻を保持
            self.buypn = getpn
            self.buyt = self.env.now()

            # 欲しい品番&数量がロケに無い場合は欠品クレームカウントを+1してgetせずに顧客は帰る
            if self.locas[getpn].available_quantity.value < self.buyqty:
                self.env.claim_count += 1
            else:
                yield self.get((self.locas[getpn], self.buyqty))


# ロケを生成する処理
def generate_locas(mdf: pd.DataFrame) -> dict:
    locas = {
        x["pn"]: sim.Resource(name=x["pn"],
                              anonymous=True,
                              capacity=x["capacity"],
                              initial_claimed_quantity=x["init_claimed"])
        for _, x in mdf[["pn", "capacity", "init_claimed"]].iterrows()
    }
    return locas


# 仕掛計画をdfからリストにソートする処理
def get_sorted_prodplans(plan_mdf: pd.DataFrame, linenames: list):
    prod_plans = {
        linename: plan_mdf[["prod_pn", "prod_qty"]][plan_mdf["line"] == linename].values.tolist()
        for linename in linenames
    }
    return prod_plans


def main() -> None:
    env = sim.Environment(trace=TRACE, isdefault_env=True)
    env.animate(ANIMATE)
    env.order_waiting_lines = []
    env.claim_count = 0

    # 各種マスタの取得
    part_mdf = pd.read_csv(MASTER_DIR / "part_master.csv")
    subassy_mdf = pd.read_csv(MASTER_DIR / "subassy_master.csv")
    assy_mdf = pd.read_csv(MASTER_DIR / "assy_master.csv")
    plan_mdf = pd.read_csv(MASTER_DIR / "prod_plan.csv")
    linenames = ["line_sub_1", "line_sub_2", "line_assy_1"]

    # 仕掛計画をソートして取得
    prod_plans = get_sorted_prodplans(plan_mdf, linenames)

    # ロケ生成
    partlocas = generate_locas(part_mdf)
    sublocas = generate_locas(subassy_mdf)
    assylocas = generate_locas(assy_mdf)

    # ライン生成
    line_sub_1 = product_line(linename=linenames[0], mdf=subassy_mdf,
                              linecapacity=30, lt=100,
                              lotsize=10, initpn="SUB_A", prodpn=None, initqty=10,
                              partlocas=partlocas, locas=sublocas)
    line_sub_2 = product_line(linename=linenames[1], mdf=subassy_mdf,
                              linecapacity=30, lt=100,
                              lotsize=10, initpn="SUB_B", prodpn=None, initqty=10,
                              partlocas=partlocas, locas=sublocas)
    line_assy_1 = product_line(linename=linenames[2], mdf=assy_mdf,
                              linecapacity=30, lt=100,
                              lotsize=5, initpn="ASSY_AB", prodpn=None, initqty=5,
                              partlocas=sublocas, locas=assylocas)

    # 顧客生成
    customer_0 = customer(locas=assylocas,mdf=assy_mdf, interval=50, buyqty=10)

    # ライン状態を表す画面表示アイテムの生成
    sim.AnimateQueue(line_sub_1.in_q, x=100, y=150, direction="n", title="subassy-line-1")
    sim.AnimateText(text=lambda: "STATUS: " + line_sub_1.mode.value, x=50, y=100)
    sim.AnimateText(text=lambda: "OUT_ST: " + str(line_sub_1.out_st.available_quantity.value), x=50, y=80)
    sim.AnimateText(text=lambda: "PROD_PN: " + line_sub_1.prodpn, x=50, y=60)

    sim.AnimateQueue(line_sub_2.in_q, x=300, y=150, direction="n", title="subassy-line-2")
    sim.AnimateText(text=lambda: "STATUS: " + line_sub_2.mode.value, x=250, y=100)
    sim.AnimateText(text=lambda: "OUT_ST: " + str(line_sub_2.out_st.available_quantity.value), x=250, y=80)
    sim.AnimateText(text=lambda: "PROD_PN: " + line_sub_2.prodpn, x=250, y=60)

    sim.AnimateQueue(line_assy_1.in_q, x=500, y=150, direction="n", title="assy-line-1")
    sim.AnimateText(text=lambda: "STATUS: " + line_assy_1.mode.value, x=450, y=100)
    sim.AnimateText(text=lambda: "OUT_ST: " + str(line_assy_1.out_st.available_quantity.value), x=450, y=80)
    sim.AnimateText(text=lambda: "PROD_PN: " + line_assy_1.prodpn, x=450, y=60)

    # ロケ状態を表す画面表示アイテムの生成
    sim.AnimateText(text=lambda: "ASSY_AB: " + str(assylocas["ASSY_AB"].available_quantity.value), x=900, y=110)
    sim.AnimateMonitor(assylocas["ASSY_AB"].available_quantity, labels=[0, 20, 40, 60, 80],
                       linewidth=3, x=700, y=10, width=300, height=100,
                       horizontal_scale=1, vertical_scale=1.25)
    sim.AnimateText(text=lambda: "ASSY_AC: " + str(assylocas["ASSY_AC"].available_quantity.value), x=900, y=230)
    sim.AnimateMonitor(assylocas["ASSY_AC"].available_quantity, labels=[0, 20, 40, 60, 80],
                       linewidth=3, x=700, y=130, width=300, height=100,
                       horizontal_scale=1, vertical_scale=1.25)
    sim.AnimateText(text=lambda: "SUB_A: " + str(sublocas["SUB_A"].available_quantity.value), x=900, y=400)
    sim.AnimateMonitor(sublocas["SUB_A"].available_quantity, labels=[0, 20, 40, 60, 80],
                       linewidth=3, x=700, y=300, width=300, height=100,
                       horizontal_scale=1, vertical_scale=1.25)
    sim.AnimateText(text=lambda: "SUB_B: " + str(sublocas["SUB_B"].available_quantity.value), x=900, y=520)
    sim.AnimateMonitor(sublocas["SUB_B"].available_quantity, labels=[0, 20, 40, 60, 80],
                       linewidth=3, x=700, y=420, width=300, height=100,
                       horizontal_scale=1, vertical_scale=1.25)
    sim.AnimateText(text=lambda: "SUB_C: " + str(sublocas["SUB_C"].available_quantity.value), x=900, y=640)
    sim.AnimateMonitor(sublocas["SUB_C"].available_quantity, labels=[0, 20, 40, 60, 80],
                       linewidth=3, x=700, y=540, width=300, height=100,
                       horizontal_scale=1, vertical_scale=1.25)

    # 顧客要求情報を表す画面表示アイテムの生成
    sim.AnimateText(text=lambda: "CLAIM_COUNT: " + str(env.claim_count), x=800, y=660)
    sim.AnimateText(text=lambda: "BUY TIME: " + str(customer_0.buyt), x=800, y=680)
    sim.AnimateText(text=lambda: "BUY PN: " + str(customer_0.buypn), x=800, y=700)
    sim.AnimateText(text=lambda: "BUY QTY: " + str(customer_0.buyqty), x=800, y=720)

    # 指示待ちラインが発生するまで初期条件でシミュレーション実行
    while len(env.order_waiting_lines) == 0:
        env.run(till=env.now() + 1)

    # 継続実行ループ
    while env.now() < SIMTIME:
        # 生産指示待ちラインを一つ取り出す
        order_line: product_line = env.order_waiting_lines.pop(0)

        # 指示待ちラインの生産品番を仕掛順リストから一つ取り出す
        if prod_plans[order_line._name] == []:
            # 仕掛順リストが空の場合は何もせずpassiveのまま
            # その間もライン内(加工中キュー)のワークはそのまま流動されることに注意
            order_line.set_mode("no_order(ope.)")
        else:
            # 生産指示待ちラインの生産品番と生産数量を更新してactivate
            next_plan = prod_plans[order_line._name].pop(0)
            prodpn = next_plan[0]
            prodqty = next_plan[1]
            order_line.set_assypn_and_qty(prodpn=prodpn, prodqty=prodqty)
            order_line.activate()

        while len(env.order_waiting_lines) == 0:
            env.run(till=env.now() + 1)
            if env.now() > SIMTIME:
                break

    # as_dataframeで結果をdfとして取得
    line_assy_1.mode.as_dataframe().to_csv(EXE_DIR / "modelog_line_assy_1.csv")
    assylocas["ASSY_AB"].available_quantity.as_dataframe().to_csv(EXE_DIR / "localog_assy_ab.csv")
    assylocas["ASSY_AC"].available_quantity.as_dataframe().to_csv(EXE_DIR / "localog_assy_ac.csv")

    # print_histogramで結果を簡易ヒストグラムで表示
    assylocas["ASSY_AB"].available_quantity.print_histogram(number_of_bins=20, bin_width=5)

    return None


if __name__ == "__main__":
    TRACE = False
    ANIMATE = True
    EXE_DIR = Path(__file__).parent
    MASTER_DIR = Path(__file__).parent / "master"
    SIMTIME = 500

    main()


この記事が気に入ったらサポートをしてみませんか?