StreamDiffusionを外部プログラムから利用する(1)関数編
はにめに
StreamDiffusionをアプリから使う時、アプリ側のプログラムとStreamDiffusionの生成を分離できれば、アプリ側のプログラムの見通しが良くなります。リポジトリのデモでも例が示されていると思いますが(筆者はデモのソースコード殆ど読んでないのでわかりませんが。。。)そのためには工夫が必要です。いくつか手法はあると思いますが、3種類の方法を実装したので、順次公開と説明をしたいと思います。
1)関数化する(クラスでも同じですね)
2)POST/GETによるAPIサーバ化
3)ソケットを使うTCP/IPパケットによるサーバ化
これら以外にソケットを使うUDPなども考えられます。
1回目は関数化の話です。
StreamDiffusionを関数化して使う
これは、公式リポジトリでもラッパーで実装されています。もう少し融通が効いて、改造も出来るよう、独自に実装しました。以下の記事にあるコードの流用で、区切りのいい処理毎に関数としました。
ソースコード全体はこの章の最後に記載しています。
何処を関数にするのか
StreamDiffusionの処理の大まかな流れは以下のとおりです。
1)Diffusersのpipeにモデルをロードする
2)pipeをStreamDiffusionでラップしてStreamオブジェクトを定義
このとき、重要なパラメータを同時にラップします。
3)出来たStreamオブジェクトを利用して画像を生成する。
関数に分解する場合は上記3分割が良さそうです。
Diffusersのpipeにモデルをロードする
model_path = "./models/Model/Counterfeit-V3.0_fix_fp16.safetensors"
style_lora_path="./models/LoRA/megu_sports_v02.safetensors"
tensorrt_engine="engines_i2i_t4_non"
def load_model(model_path=model_path):
pipe = StableDiffusionPipeline.from_single_file(
model_path).to(
device=torch.device("cuda"),
dtype=torch.float16,
)
return pipe
引数はmodel_pathのみです。デフォルトは
model_path = "./models/Model/Counterfeit-V3.0_fix_fp16.safetensors"
で定義しています。
この関数はDiffuserでpipeにモデルをロードします。返り値は生成したpipeです。
Streamオブジェクト生成する
生成されたpipeを利用してStreamオブジェクトを準備すると共に画像生成に必要な各種の前処理を行います。以下は引数部分です。pipe以外はデフォルトが設定されているのでアプリからの呼び出しは単純です。
def init_stream(
pipe,
init_image=None,
lora_path:str=style_lora_path,
lora_preload:bool =True,
lora_preload_weights:float=0.3,
cfg_type:str="self",
t_index_list:list[int]=[32, 45],
img_size=[512,512],
lora_load:bool =False,
lora_scale_weights:float=0.3,
tensorrt:bool =False ,
tensorrt_engine:str = tensorrt_engine,
prompt:str ="masterpiece, best quality",
negative_prompt:str = "",
guidance_scale:float = 1.2,
delta:float = 1.0,
seed:int = 1
):
非公式LoRAのロード
if lora_preload:
#--- 非公式 独自LoRAのロード
print("lora_path=",lora_path)
pipe.load_lora_weights("latent-consistency/lcm-lora-sdv1-5", adapter_name="lcm") #Stable Diffusion 1.5 のLCM LoRA
pipe.load_lora_weights("./models/LoRA/megu_sports_v02.safetensors", adapter_name="papercut")
pipe.set_adapters(["lcm", "papercut"], adapter_weights=[1.0, lora_preload_weights])
lora_preload:bool =Trueの場合に実行されます
(非公式です。私個人の実験の基に実装しています)
Streamオブジェクト生成する
# ---Wrap the pipeline in StreamDiffusion
stream = StreamDiffusion(
pipe,
t_index_list=t_index_list,
torch_dtype=torch.float16,
cfg_type=cfg_type,
width =img_size[0], #height/width TensorRT有効のときはサイズに注意 512x512のみ
height = img_size[1],
)
StreamDiffusionでpipeをラップしています。引数はinit_stream()から引き継いでいます。
事前計算
生成時に再計算を省くために様々な計算をしておきます。
# ---IIf the loaded model is not LCM, merge LCM
stream.load_lcm_lora()
stream.fuse_lora()
#---公式 独自LoRAのロード
if lora_load & (lora_path != "none"):
stream.load_lora(lora_path)
stream.fuse_lora(lora_scale=lora_scale_weights)
# Use Tiny VAE for further acceleration
stream.vae = AutoencoderTiny.from_pretrained("madebyollin/taesd").to(device=pipe.device, dtype=pipe.dtype)
# --- Enable acceleration いずれかを有効にする
if tensorrt==False:
pipe.enable_xformers_memory_efficient_attention()
else: # Enable acceleration with TensroRT
from streamdiffusion.acceleration.tensorrt import accelerate_with_tensorrt
stream = accelerate_with_tensorrt(stream, tensorrt_engine, max_batch_size=4,) #Step=3
# ---事前計算
stream.prepare(
prompt,
negative_prompt=negative_prompt,
guidance_scale = guidance_scale,
delta=delta,
seed=seed,
)
# ---Warmup >= len(t_index_list) x frame_buffer_size
if init_image is None:
init_image= Image.new('RGB', img_size, (255, 255, 255))
for _ in range(len(t_index_list)):
stream(init_image)
# ---Streamを返す
return stream
1)#---IIf the loaded model is not LCM, merge LCM
LCM-LoRAをロード
#---公式 独自LoRAのロード
独自 LoRAをロード
2)---Use Tiny VAE for further acceleration
tiny VAEを適用
3)--- Enable acceleration
高速化処理。以下のA)かB)のいずれかを有効にします。
A) xformers
B) TensroRT xformersに比べて2~3倍高速になります。
4)---事前計算
ここでも引数はinit_stream()から引き継いでいます。
5) # ---Warmup >= len(t_index_list) x frame_buffer_size
パイプのウォームアップ(詳しくは論文参照)
6)---Streamを返す
作成したStreamを返します。
i2i生成
def stream_i2i(in_image,stream,prompt=""):
#start_time = time.time()
if prompt != "":
#動的にプロンプトを変える
prompt=prompt+prompt_list[i]
stream.update_prompt(prompt)
#画像生成 i2i
x_output = stream(in_image)
ximage=postprocess_image(x_output, output_type="pil")[0]
#gen_time=time.time() - start_time
#print("i-time:",gen_time)
#print("i-fps: ",1/gen_time)
#生成画像はpillow
return ximage
アプリから引数streamを用いて画像を生成します。
同時にレファレンスとなるイメージと、インターラクティブプロンプト(オプション)も用いて画像を生成し、pillow形式の画像を返します。ここでプロンプトを渡さない場合はinit_stream()で渡したプロンプトで生成されます。
t2i生成
def stream_t2i(stream , prompt=""):
#start_time = time.time()
if prompt != "":
#動的にプロンプトを変える
stream.update_prompt(prompt)
#画像生成 t2i
x_output = stream.txt2img()
ximage=postprocess_image(x_output, output_type="pil")[0]
#gen_time=time.time() - start_time
#print("i-time:",gen_time)
#print("i-fps: ",1/gen_time)
#生成画像はpillow
return ximage
i2iと殆ど同じです。プロンプトからのみ生成するので、レファレンスのイメージが引数にありません。インターラクティブにプロンプトを変更で来ます。ここでプロンプトを渡さない場合はinit_stream()で渡したプロンプトで生成されます。以下全体のコードです。
sd_i2i_t2i_function_v1.py コード全体
import torch
from diffusers import AutoencoderTiny, StableDiffusionPipeline,StableDiffusionImg2ImgPipeline
from diffusers.utils import load_image
from streamdiffusion import StreamDiffusion
from streamdiffusion.image_utils import postprocess_image
import numpy as np
import time
import cv2
from PIL import Image
model_path = "./models/Model/Counterfeit-V3.0_fix_fp16.safetensors"
style_lora_path="./models/LoRA/megu_sports_v02.safetensors"
tensorrt_engine="engines_i2i_t4_non"
def load_model(model_path=model_path):
pipe = StableDiffusionPipeline.from_single_file(
model_path).to(
device=torch.device("cuda"),
dtype=torch.float16,
)
return pipe
def init_stream(
pipe,
init_image=None,
lora_path:str=style_lora_path,
lora_preload:bool =True,
lora_preload_weights:float=0.3,
cfg_type:str="self",
t_index_list:list[int]=[32, 45],
img_size=[512,512],
lora_load:bool =False,
lora_scale_weights:float=0.3,
tensorrt:bool =False ,
tensorrt_engine:str = tensorrt_engine,
prompt:str ="masterpiece, best quality",
negative_prompt:str = "",
guidance_scale:float = 1.2,
delta:float = 1.0,
seed:int = 1
):
if lora_preload:
#--- 非公式 独自LoRAのロード
print("lora_path=",lora_path)
pipe.load_lora_weights("latent-consistency/lcm-lora-sdv1-5", adapter_name="lcm") #Stable Diffusion 1.5 のLCM LoRA
pipe.load_lora_weights("./models/LoRA/megu_sports_v02.safetensors", adapter_name="papercut")
pipe.set_adapters(["lcm", "papercut"], adapter_weights=[1.0, lora_preload_weights])
#--- RCFG の指定
#cfg_type = "none"
#cfg_type = "full"
#cfg_type = "self"
#cfg_type = "initialize"
#--- t_index_lisの指定
#index_list=[40]
#index_list=[32, 45]
#index_list=[38,40 ,42,45]
#index_list=[20,30,40]
#index_list=[40 ,42,45]
#t_index_list=[41,42,44,45] #cam
# ---Wrap the pipeline in StreamDiffusion
stream = StreamDiffusion(
pipe,
t_index_list=t_index_list,
torch_dtype=torch.float16,
cfg_type=cfg_type,
width =img_size[0], #height/width TensorRT有効のときはサイズに注意 512x512のみ
height = img_size[1],
)
# ---IIf the loaded model is not LCM, merge LCM
stream.load_lcm_lora()
stream.fuse_lora()
#---公式 独自LoRAのロード
if lora_load & (lora_path != "none"):
stream.load_lora(lora_path)
stream.fuse_lora(lora_scale=lora_scale_weights)
# ---Use Tiny VAE for further acceleration
stream.vae = AutoencoderTiny.from_pretrained("madebyollin/taesd").to(device=pipe.device, dtype=pipe.dtype)
# --- Enable acceleration いずれかを有効にする
if tensorrt==False:
pipe.enable_xformers_memory_efficient_attention()
else: # Enable acceleration with TensroRT
from streamdiffusion.acceleration.tensorrt import accelerate_with_tensorrt
stream = accelerate_with_tensorrt(stream, tensorrt_engine, max_batch_size=4,) #Step=3
# ---事前計算
stream.prepare(
prompt,
negative_prompt=negative_prompt,
guidance_scale = guidance_scale,
delta=delta,
seed=seed,
)
# ---Warmup >= len(t_index_list) x frame_buffer_size
if init_image is None:
init_image= Image.new('RGB', img_size, (255, 255, 255))
for _ in range(len(t_index_list)):
stream(init_image)
# ---Streamを返す
return stream
# ---画像生成 Run the stream infinitely in_imageはOpenCV/255.0の0〜1へ変換したデータ=ndarrey
def stream_i2i(in_image,stream,prompt=""):
#start_time = time.time()
if prompt != "":
#動的にプロンプトを変える
prompt=prompt+prompt_list[i]
stream.update_prompt(prompt)
#画像生成 i2i
x_output = stream(in_image)
ximage=postprocess_image(x_output, output_type="pil")[0]
#gen_time=time.time() - start_time
#print("i-time:",gen_time)
#print("i-fps: ",1/gen_time)
#生成画像はpillow
return ximage
def stream_t2i(stream , prompt=""):
#start_time = time.time()
if prompt != "":
#動的にプロンプトを変える
stream.update_prompt(prompt)
#画像生成 t2i
x_output = stream.txt2img()
ximage=postprocess_image(x_output, output_type="pil")[0]
#gen_time=time.time() - start_time
#print("i-time:",gen_time)
#print("i-fps: ",1/gen_time)
#生成画像はpillow
return ximage
アプリ側サンプルコード
以下のアプリでは表示をスレッド化して少しでも生成時間の短縮を工夫しています。以下表示部スレッドです。生成を始める前に
#-----表示スレッドを開始 th_img_flag=False thread = threading.Thread(target=disp_th, name='disp_th',daemon = True) thread.start()
で表示スレッドを動かし、画像が生成されたら
th_img = image #生成画像を準備
th_img_flag=True #画像準備フラグセット
のようにセットして表示させています。
# -----表示のスレッド化準備(必要に応じて)
global th_img , th_img_flag
#>>> 表示スレッドの定義
import threading
def disp_th():
global th_img , th_img_flag
while True:
if th_img_flag==True:
imgCV_RGB = np.array(th_img, dtype=np.uint8)
th_img = np.array(imgCV_RGB)[:, :, ::-1]
cv2.imshow("image", th_img)
cv2.waitKey(1)
th_img_flag=False
time.sleep(0.01)
i2i WebCAMでキャプチャした画像をプロンプトに従い変換
import os
from PIL import Image
import cv2
import time
from time import sleep
import numpy as np
from sd_i2i_t2i_function_v1 import load_model , init_stream , stream_i2i , stream_t2i
model_path = "./models/Model/Counterfeit-V3.0_fix_fp16.safetensors"
lora_path="./models/LoRA/megu_sports_v02.safetensors"
# -----表示のスレッド化準備(必要に応じて)
global th_img , th_img_flag
#>>> 表示スレッドの定義
import threading
def disp_th():
global th_img , th_img_flag
while True:
if th_img_flag==True:
imgCV_RGB = np.array(th_img, dtype=np.uint8)
th_img = np.array(imgCV_RGB)[:, :, ::-1]
cv2.imshow("image", th_img)
cv2.waitKey(1)
th_img_flag=False
time.sleep(0.01)
# -----プログラムの開始
def main():
from capture import init_cam,cap_img_pil, cap_close
global th_img , th_img_flag #表示スレッド用共有データとセマフォ
#-----pipeの準備(モデルのロード)
pipe = load_model ()
#-----プロンプト
prompt = "masterpiece, best quality, 1girl, solo, long hair, white shirt, brown hair,looking at viewer,blush,bangs,blue eyes,smile,simple background, t-shirt,white background,closed mouth,white t-shirt,shorts,short shorts,headphones,black shorts,light brown hair,blue shorts "
# -----Webカメラのキャプチャを開始
status=init_cam()
# -----キャプチャがオープンしていることを確認
if status==False:
print("カメラを開けません")
#----- Prepare image
image, init_image=cap_img_pil()
#--- RCFG の指定
cfg_type = "none"
#cfg_type = "full"
#cfg_type = "self"
#cfg_type = "initialize"
#--- t_index_lisの指定
#index_list=[40]
#index_list=[32, 45]
#index_list=[38,40 ,42,45]
#index_list=[20,30,40]
#index_list=[40 ,42,45]
#index_list=[41,42,44,45] #cam
index_list=[35,35,35,45] #cam
#-----Streamの初期化:=default
stream=init_stream(
pipe=pipe ,
init_image=init_image,
lora_preload =True,
t_index_list= index_list,
#img_size=[512,768], #[w , h]
lora_load =True,
lora_scale_weights=0.5,
prompt=prompt ,
guidance_scale = 1.1,
cfg_type = cfg_type,
tensorrt=True,
)
#-----表示スレッドを開始
th_img_flag=False
thread = threading.Thread(target=disp_th, name='disp_th',daemon = True)
thread.start()
#-----画像生成ループ
count=500 #ループ回数
total_time=0 #時間計測初期化
for n in range(count):
start_time=time.time()
#カメラ入力
pil_image,cv2_image=cap_img_pil()
#画像生成
image = stream_i2i(
pil_image,
stream,
)
#スレッドで表示
th_img = image #生成画像を準備
th_img_flag=True #画像準備フラグセット
#ループ内で表示
#cv2.imshow("i2i_c",image)
#cv2.waitKey(1)
#生成時間とフレームレートの表示
end_time=time.time()
print("生成時間",end_time- start_time)
print("i-fps",1/(end_time- start_time))
total_time=total_time+(end_time- start_time)
cap_close()
print("avr-time:",total_time/count)
print("avr-fps :",1/(total_time/count))
if __name__ == '__main__':
main()
t2i 単純に何度も生成を行います
import os
from PIL import Image
import cv2
import time
from time import sleep
import numpy as np
from sd_i2i_t2i_function_v1 import load_model , init_stream , stream_i2i , stream_t2i
model_path = "./models/Model/Counterfeit-V3.0_fix_fp16.safetensors"
lora_path="./models/LoRA/megu_sports_v02.safetensors"
# -----表示のスレッド化準備(必要に応じて)
global th_img , th_img_flag
#>>> 表示スレッドの定義
import threading
def disp_th():
global th_img , th_img_flag
while True:
if th_img_flag==True:
imgCV_RGB = np.array(th_img, dtype=np.uint8)
th_img = np.array(imgCV_RGB)[:, :, ::-1]
cv2.imshow("image", th_img)
cv2.waitKey(1)
th_img_flag=False
time.sleep(0.01)
# -----プログラムの開始
def main():
from capture import init_cam,cap_img_pil, cap_close
global th_img , th_img_flag #表示スレッド用共有データとセマフォ
#-----pipeの準備(モデルのロード)
pipe = load_model ()
#-----プロンプト
prompt = "masterpiece, best quality, 1girl,"
prompt_list=[
"1girl","long hair,","white shirt,","serafuku,","brown hair,","looking at viewer,","blush,","smile,", "bangs,","blue eyes,","simple background,", "t-shirt,",\
"white background,","walk a head,","white background,","walk a head,","white background,","walk a head,","white background,","walk a head,","white background,"]
#--- RCFG の指定
cfg_type = "none"
#cfg_type = "full"
#cfg_type = "self"
#cfg_type = "initialize"
#--- t_index_lisの指定
#index_list=[40]
#index_list=[32, 45]#t2i
#index_list=[38,40 ,42,45]
#index_list=[20,30,40]
#index_list=[40 ,42,45]
#index_list=[41,42,44,45] #cam
#index_list=[35,35,35,45] #cam2
index_list=[0, 16, 32, 45] #t2i
#-----Streamの初期化:=default
stream=init_stream(
pipe=pipe ,
lora_preload =True,
t_index_list= index_list,
#img_size=[512,768], #[w , h]
lora_load =True,
lora_scale_weights=0.5,
prompt=prompt ,
guidance_scale = 1.2,
cfg_type = cfg_type,
#tensorrt=True,
)
#-----表示スレッドを開始
th_img_flag=False
thread = threading.Thread(target=disp_th, name='disp_th',daemon = True)
thread.start()
#-----画像生成ループ
count=500 #ループ回数
#count=len( prompt_list) #ループ回数
total_time=0 #時間計測初期化
for n in range(count):
start_time=time.time()
#動的にプロンプトを変える
#next_prompt=prompt + prompt_list[n]
#画像生成
image = stream_t2i(
stream=stream,
#prompt= next_prompt, #動的にプロンプトを変える
)
#スレッドで表示
th_img = image #生成画像を準備
th_img_flag=True #画像準備フラグセット
#ループ内で表示
#cv2.imshow("i2i_c",image)
#cv2.waitKey(1)
#生成時間とフレームレートの表示
end_time=time.time()
print("生成時間",end_time- start_time)
print("i-fps",1/(end_time- start_time))
total_time=total_time+(end_time- start_time)
print("avr-time:",total_time/count)
print("avr-fps :",1/(total_time/count))
if __name__ == '__main__':
main()
まとめ
関数化はリポジトリのラッパーの独自実装版です。クラス形式になっていませんが、変更は容易です。ぜひ皆さんも独自実装にチャレンジしてください。
次回
FastAPIによるサーバ化、またはTCP/IPによるサーバー化の記事です。