StreamDiffusionを外部プログラムから利用する(3)TCP/IPによる画像生成サーバの実装
連載3回目はTCP/IP通信を利用してイメージデータの受け渡しをするサンプルです。TCP/IPよりも上層のプロトコルが無いので高速通信が期待できます。次回4回目はこれまでに集めた様々な条件における生成パフォーマンスをまとめて最終とします。 力尽き、今回で最後とします。
どのようにTCP/IPを使うか
PythonのSOCKET通信機能を利用してTCP/IPパケット上にデータを載せてサーバ↔クライント間で通信します。データは独自に定義できますから自由度は高いですが互換性はありません。閉じたネットワーク環境内での通信やPC内部での通信に使います。
実装したTCP/IP通信部
私が以前から利用している実績の豊富な方式です。
データ形式とフロー
送信時
送りたいデータをリスト化しpickleでバイナリーデータに変換
TCP/IPパケット上にデータがなくなるまで順次送信する
(基本的なエラー処理はLANチップまたはドライバがやります)
受信時
TCP/IPパケットから流れてくるデータを最後まで受け取る
pickle.dumpで元のデータに戻す
リスト形式のデータが出来るので、各要素毎に処理を実行する
リストの先頭に、処理を依頼したい関数名を記載します。例えば
stream_i2iやstream_t2iです。
受信時に処理名をチェックし、予めサーバ↔クライアント間で決めている仕様に基づき続く要素を処理します。通常は変数が入ってイます。
その後、変数をセットして該当する関数またはクラスを呼び出します。
返信はも同様にpickle化しますが、クライアントは自分が問い合わせたサーバからの返信であることが解っているので、単純に受け取ったデータをpickle.dumpし、呼び出し元へreturnします。
付録に全コードを置きます。
サーバ側コード
TCP/IPプロトコル処理
#--------------------TCPIP プロトコル--------------------------
import socket
import pickle
from datetime import datetime
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # ソケット定義(IPv4,TCPによるソケット)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host,port))
s.listen(10) # ソケット接続待受(キューの最大数を指定)
print(host,port,'クライアントからの接続待ち...')
while True:
try:
try: # ソケット接続受信待ち
clientsock, client_address = s.accept()
except KeyboardInterrupt: # 接続待ちの間に強制終了が入った時の例外処理
clientsock.shutdown(1)# データ送信完了後、送信路を閉じる
print("KeyboardInterrupt: shutdown")
break
else: # 接続待ちの間に強制終了なく、クライアントからの接続が来た場合
all_data=b'' # 受信データ保存用変数の初期化
while True: #ソケット接続開始後の処理
data = clientsock.recv(4096*256) # データ受信、 画像なのでバッファーはできるだけ大きく
if not data: # 全データ受信完了(受信路切断)時に、ループ離脱
break
all_data += data # 受信データを追加し繋げていく
get_data=(pickle.loads(all_data)) #受信データ解析 元の形式にpickle.loadsで復元
tx_gen_out=request_check(get_data)#画像生成
# 結果をクライアントに送信
tx_dat=pickle.dumps(tx_gen_out,5)
clientsock.send(tx_dat) #pickle.dumpsでシリアライズ
clientsock.shutdown(1)# データ送信完了後、送信路を閉じる
except:
print("ai or 通信エラー")
ソケットを準備したらwhile True:でループに入ります。キーボード割り込みで送信路を閉じるようにします。これをしないとアドレスとポートが開放されません。
get_data=(pickle.loads(all_data)) #受信データ解析 元の形式にpickle.loadsで復元
ここで、受信したデータを復元していいます。get_dataには送信側と同じデータが入っています。
tx_gen_out=request_check(get_data)#画像生成
次は、実際のデータの処理に移ります。get_dataを解析して必要な処理を行い、帰ってきたデータをtx_gen_outに入れて相手側に返します。画像生成ならばここは画像データです。
受信データの解析
get_dataはリスト形式になるよう決めています。先頭に処理の関数名や処理名があるので、チックします。
#--------------------TCPIP プロトコル 解析--------------------------
def request_check(get_data):
if get_data[0]=="i2i":
in_image=get_data[1]
prompt=get_data[2]
image_type=get_data[3]
out_type=get_data[4]
ximage=stream_i2i(in_image , prompt , image_type , out_type)
out_image= ximage
elif get_data[0]=="t2i":
prompt = get_data[1]
out_type = get_data[2]
ximage = stream_t2i(prompt , out_type)
out_image= ximage
else:
print("error")
if out_type=="cv2":
#---pillowイメージをOpenVCに変換
new_image = np.array(ximage, dtype=np.uint8)
out_image = cv2.cvtColor(new_image, cv2.COLOR_RGB2BGR)
return out_image
上記の例では
get_data[0]が"i2i"ならば
ximage=stream_i2i(in_image , prompt , image_type , out_type)
get_data[0]が"t2i"ならば
ximage = stream_t2i(prompt , out_type)
を実行しています。
画像生成を実行
stream_i2i()とstream_t2i()は以下の関数を呼んでいます。
#--------------------画像生成 ---------------------i2i
def stream_i2i(in_image , prompt , image_type , out_type):
if image_type=="cv2":
image = cv2.cvtColor(in_image, cv2.COLOR_BGR2RGB)
in_image = Image.fromarray( image)
print("+++++++++++ prompt ++++++++++++",prompt)
if prompt != None:
#動的にプロンプトを変える
stream.update_prompt(prompt)
#---画像生成 i2i
x_output = stream(in_image)
ximage=postprocess_image(x_output, output_type="pil")[0]
return ximage
#--------------------画像生成 ---------------------t2i
def stream_t2i(prompt , out_type):
if prompt !=None:
#動的にプロンプトを変える
stream.update_prompt(prompt)
print("prompt=",prompt)
#画像生成 t2i
x_output = stream.txt2img()
ximage=postprocess_image(x_output, output_type="pil")[0]
return ximage
見慣れたStreamDiffusionの画像生成コード部分です。ここで使用しているstreamはサーバコード起動時に初期設定として通常のStreamの準備を行ったオブジェクトです。単一のプログラムの場合はStreamオブジェクト生成後は上記生成コードを実行しますが、サーバ化においてはStreamオブジェクト生成後にクライアント側からの処理要求を待ちます。要求に従って画像を生成し、返送する流れです。
クライント側
i2i
前回までの(1)、(2)同様にWebCAMの入力をサーバへ送りi2i変換して返すコードです。画像の変換が無いのでとてもシンプルです。ただし、サーバ側のTCP/IPプロトコルに合わせるために、tcpip_client.pyモジュールを呼んでいます。生成コードの前後は(1)、(2)同様なので生成部分のみ示します。gen_img_i2i()に引数をもたせてtcpip_client.pyモジュールを呼び出してます。画像の形式毎にサンプルとして3種類準備しました。プロンプトを指定するとインタラクティブに画像生成が変化します。
画像生成部
# pilloイメージでリクエスト 返答はpillow
#prompt = "masterpiece, best quality, 1girl, solo, long hair, white shirt, serafuku, brown hair,looking at viewer,blush,smile,bangs,blue eyes,simple background,t-shirt,white background,closed mouth,standing,white t-shirt,shorts,short shorts,headphones,black shorts,light brown hair,blue shorts ,running"
prompt =None
# pilloイメージでリクエスト 返答はpillow
#status_code , image_data=gen_img_i2i(pil_image , prompt , image_type="pil" , out_type="pil") #リクエスト送信
# Opencvイメージでリクエスト 返答はpillow
#status_code , image_data=gen_img_i2i(pil_image , prompt , image_type="pil" , out_type="cv2") #リクエスト送信
# Opencvイメージでリクエスト 返答はOpencv
status_code , image_data=gen_img_i2i(cv2_image , prompt , image_type="cv2" , out_type="cv2") #リクエスト送信
表示部
こちらも(1)、(2)同様です。一旦OpenCVに変換してスレッド表示またはループ内表示を行います。
#pillowで受ける時
# レスポンスが成功であることを確認
#if status_code == 200:
# print("response.status_code== 200")
# imgCV_RGB = np.array(image_data, dtype=np.uint8)# Opencvイメージに変換
# image = np.array(imgCV_RGB)[:, :, ::-1]
#opencvで受ける時
if status_code == 200:
# バイナリデータからOpenCVカラー画像に変換
image = image_data
#表示をループ内で行う時
#cv2.imshow("image", image)
#cv2.waitKey(1)
#スレッドで表示
th_img = image #生成画像を準備
th_img_flag=True #画像準備フラグセット
t2i
画像の入力が無いので、さらにシンプルです。付録のコードではtcpip_client.pyモジュールを自分のコードの中に置く例を示しています。必要ならモジュールを呼ぶ形へ変更してください。
画像生成部
gen_img_t2i()を呼んでいるだけです。プロンプトを指定するとインタラクティブに生成用プロンプトを変えることかできます。
# リクエストを送信
status_code , image_data=gen_img_t2i( prompt )
表示はi2iと同じです。
パフィーマンス
気になるパフォーマンスです
H/W
GPU: RTX4090
CPU:i5-13600k
t_index_list =4step
rcfg="none"
t2i のパフォーマンス
出力形式はpillow
動的プロンプト 有り 27.4fps(36.7mS)
動的プロンプト 無し 30.7fps(32.6mS)
4stepですが、かなり早いです。
i2iのパフォーマンス
入力:pillow 返答:pillow 23fps(43mS)
入力:Opencv 返答:pillow 23.1 fps(43mS)
入力:Opencv 返答:Opencv 22.8fps(44 mS)
こちらも充分な速度が得られています。
まとめ
今回3回に分けてアプリからStreamDiffusionを使うための関数化やAPI,サーバ化について記事にしました。こうして分離が出来ると、アプリ側ではStreamDiffusionの細部を気にせずに気楽に高速画像生成が利用できます。
またアプリと分離しているので、アプリ側でLLMなどの大型AIを使う場合もVRAMを気にする必要がなくなります。
付録
サーバ全コード
import torch
from diffusers import AutoencoderTiny, StableDiffusionPipeline,StableDiffusionImg2ImgPipeline
from diffusers.utils import load_image
from streamdiffusion import StreamDiffusion
from streamdiffusion.image_utils import postprocess_image
import numpy as np
import time
import cv2
from PIL import Image
#-------------------- TCP/IP URL --------------------------
host="0.0.0.0"
port=8000
#-------------------- モデル定義--------------------------
model_path = "./models/Model/Counterfeit-V3.0_fix_fp16.safetensors"
lora_path="./models/LoRA/megu_sports_v02.safetensors"
#-------------------- パラメータ定義--------------------------
lora_preload=True
lora_preload_weights=0.5
lora_load=False
lora_scale_weights=0.1
tensorrt=True
prompt="masterpiece, best quality, 1girl,"
negative_prompt=""
guidance_scale=1.2
delta=1.0
seed=1
#--- RCFG の指定
cfg_type = "none" #t2i <<<<<<<<< t2iのとき選択(リポジトリデフォ)
#cfg_type = "full"
#cfg_type = "self"
#cfg_type = "initialize"
#--- t_index_lisの指定
#t_index_list=[40]
#t_index_list=[32, 45]
t_index_list=[38,40 ,42,45]#cam2 < アニメ寄り<<<<<<< i2iのときは有効にすること
#t_index_list=[20,30,40]
#t_index_list=[40 ,42,45]
#t_index_list=[41,42,44,45] #cam<<<<<<<<<<<<<<<<<< i2iのときは有効にすること
#t_index_list=[0, 16, 32, 45] #t2i <<<リアル寄り<<<<<<< t2iのときは有効にすること(リポジトリデフォ)
img_size=[512,512]
#TensorRT engineフォルダー名 "engines_tindex"+t_index_listの要素数+CFGタイプ
tensorrt_engine="engines_tindex" + str(len(t_index_list))+"_"+ cfg_type
#-------------------- 初期設定--------------------------
#--- モデルのロード
pipe = StableDiffusionPipeline.from_single_file(
model_path).to(
device=torch.device("cuda"),
dtype=torch.float16,
)
#--- 非公式 独自LoRAのロード
if lora_preload==True:
print("lora_path=",lora_path)
pipe.load_lora_weights("latent-consistency/lcm-lora-sdv1-5", adapter_name="lcm") #Stable Diffusion 1.5 のLCM LoRA
pipe.load_lora_weights(lora_path, adapter_name="papercut")
pipe.set_adapters(["lcm", "papercut"], adapter_weights=[1.0, lora_preload_weights])
# ---Wrap the pipeline in StreamDiffusion
stream = StreamDiffusion(
pipe,
t_index_list=t_index_list,
torch_dtype=torch.float16,
cfg_type=cfg_type,
width =img_size[0], #height/width TensorRT有効のときはサイズに注意 512x512のみ
height = img_size[1],
)
# ---IIf the loaded model is not LCM, merge LCM
stream.load_lcm_lora()
stream.fuse_lora()
#---公式 独自LoRAのロード
if lora_load & (lora_path != "none"):
stream.load_lora(lora_path)
stream.fuse_lora(lora_scale=lora_scale_weights)
# ---Use Tiny VAE for further acceleration
stream.vae = AutoencoderTiny.from_pretrained("madebyollin/taesd").to(device=pipe.device, dtype=pipe.dtype)
# --- Enable acceleration いずれかを有効にする
if tensorrt==False:
pipe.enable_xformers_memory_efficient_attention()
else: # Enable acceleration with TensroRT
from streamdiffusion.acceleration.tensorrt import accelerate_with_tensorrt
stream = accelerate_with_tensorrt(stream, tensorrt_engine, max_batch_size=4,) #Step=3
# ---事前計算
stream.prepare(
prompt,
negative_prompt=negative_prompt,
guidance_scale = guidance_scale,
delta=delta,
seed=seed,
)
# ---Warmup >= len(t_index_list) x frame_buffer_size
init_image= Image.new('RGB', img_size, (255, 255, 255))
for _ in range(len(t_index_list)):
stream(init_image)
#--------------------画像生成 ---------------------i2i
def stream_i2i(in_image , prompt , image_type , out_type):
if image_type=="cv2":
image = cv2.cvtColor(in_image, cv2.COLOR_BGR2RGB)
in_image = Image.fromarray( image)
print("+++++++++++ prompt ++++++++++++",prompt)
if prompt != None:
#動的にプロンプトを変える
stream.update_prompt(prompt)
#---画像生成 i2i
x_output = stream(in_image)
ximage=postprocess_image(x_output, output_type="pil")[0]
return ximage
#--------------------画像生成 ---------------------t2i
def stream_t2i(prompt , out_type):
if prompt !=None:
#動的にプロンプトを変える
stream.update_prompt(prompt)
print("prompt=",prompt)
#画像生成 t2i
x_output = stream.txt2img()
ximage=postprocess_image(x_output, output_type="pil")[0]
return ximage
#--------------------TCPIP プロトコル 解析--------------------------
def request_check(get_data):
if get_data[0]=="i2i":
in_image=get_data[1]
prompt=get_data[2]
image_type=get_data[3]
out_type=get_data[4]
ximage=stream_i2i(in_image , prompt , image_type , out_type)
out_image= ximage
elif get_data[0]=="t2i":
prompt = get_data[1]
out_type = get_data[2]
ximage = stream_t2i(prompt , out_type)
out_image= ximage
else:
print("error")
if out_type=="cv2":
#---pillowイメージをOpenVCに変換
new_image = np.array(ximage, dtype=np.uint8)
out_image = cv2.cvtColor(new_image, cv2.COLOR_RGB2BGR)
return out_image
#--------------------TCPIP プロトコル--------------------------
import socket
import pickle
from datetime import datetime
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # ソケット定義(IPv4,TCPによるソケット)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host,port))
s.listen(10) # ソケット接続待受(キューの最大数を指定)
print(host,port,'クライアントからの接続待ち...')
while True:
try:
try: # ソケット接続受信待ち
clientsock, client_address = s.accept()
except KeyboardInterrupt: # 接続待ちの間に強制終了が入った時の例外処理
clientsock.shutdown(1)# データ送信完了後、送信路を閉じる
print("KeyboardInterrupt: shutdown")
break
else: # 接続待ちの間に強制終了なく、クライアントからの接続が来た場合
all_data=b'' # 受信データ保存用変数の初期化
while True: #ソケット接続開始後の処理
data = clientsock.recv(4096*256) # データ受信、 画像なのでバッファーはできるだけ大きく
if not data: # 全データ受信完了(受信路切断)時に、ループ離脱
break
all_data += data # 受信データを追加し繋げていく
get_data=(pickle.loads(all_data)) #受信データ解析 元の形式にpickle.loadsで復元
tx_gen_out=request_check(get_data)#画像生成
# 結果をクライアントに送信
tx_dat=pickle.dumps(tx_gen_out,5)
clientsock.send(tx_dat) #pickle.dumpsでシリアライズ
clientsock.shutdown(1)# データ送信完了後、送信路を閉じる
except:
print("ai or 通信エラー")
i2iクライント全コード
import os
from PIL import Image
import cv2
import time
from time import sleep
import numpy as np
from streamdiffusion.image_utils import postprocess_image
import requests
import io
from io import BytesIO
from capture import init_cam,cap_img_pil,cap_img_pil_t,cap_close
import socket
import pickle
from datetime import datetime
# -----表示のスレッド化準備(必要に応じて)
global th_img , th_img_flag
#>>> 表示スレッドの定義
import threading
def disp_th():
global th_img , th_img_flag
while True:
if th_img_flag==True:
cv2.imshow("image", th_img)
cv2.waitKey(1)
th_img_flag=False
time.sleep(0.02)
# -----プログラムの開始
def main():
global th_img , th_img_flag #表示スレッド用共有データとセマフォ
from tcpip_client import gen_img_i2i , gen_img_t2i
#capture.pyを使わないとき
"""
cap = cv2.VideoCapture(0)
# キャプチャがオープンしていることを確認
if not cap.isOpened():
print("カメラを開けません")
status=False
"""
# Webカメラの初期化キャプチャを開始
status=init_cam()
# キャプチャがオープンしていることを確認
if status==False:
print("カメラを開けません")
#-----表示スレッドを開始
th_img_flag=False
thread = threading.Thread(target=disp_th, name='disp_th',daemon = True)
thread.start()
count=20
total_time=0
for n in range(count):
start_time=time.time()
#カメラ入力 cap_img_pil()はpilloとOpenCV形式が得られる
#ret, cv2_image = cap.read() #capture.pyを使わず、OpenCV形式で送る場合
pil_image , cv2_image = cap_img_pil()
prompt = "masterpiece, best quality, 1girl, solo, long hair, white shirt, serafuku, brown hair,looking at viewer,blush,smile,bangs,blue eyes,simple background,t-shirt,white background,closed mouth,standing,white t-shirt,shorts,short shorts,headphones,black shorts,light brown hair,blue shorts ,running"
# pilloイメージでリクエスト 返答はpillow
#prompt = "masterpiece, best quality, 1girl, solo, long hair, white shirt, serafuku, brown hair,looking at viewer,blush,smile,bangs,blue eyes,simple background,t-shirt,white background,closed mouth,standing,white t-shirt,shorts,short shorts,headphones,black shorts,light brown hair,blue shorts ,running"
prompt =None
# pilloイメージでリクエスト 返答はpillow
#status_code , image_data=gen_img_i2i(pil_image , prompt , image_type="pil" , out_type="pil") #リクエスト送信
# Opencvイメージでリクエスト 返答はpillow
#status_code , image_data=gen_img_i2i(pil_image , prompt , image_type="pil" , out_type="cv2") #リクエスト送信
# Opencvイメージでリクエスト 返答はOpencv
status_code , image_data=gen_img_i2i(cv2_image , prompt , image_type="cv2" , out_type="cv2") #リクエスト送信
#pillowで受ける時
# レスポンスが成功であることを確認
#if status_code == 200:
# print("response.status_code== 200")
# imgCV_RGB = np.array(image_data, dtype=np.uint8)# Opencvイメージに変換
# image = np.array(imgCV_RGB)[:, :, ::-1]
#opencvで受ける時
if status_code == 200:
# バイナリデータからOpenCVカラー画像に変換
image = image_data
#表示をループ内で行う時
#cv2.imshow("image", image)
#cv2.waitKey(1)
#スレッドで表示
th_img = image #生成画像を準備
th_img_flag=True #画像準備フラグセット
#生成時間とフレームレートの表示
end_time=time.time()
print("生成時間",end_time- start_time)
print("i-fps",1/(end_time- start_time))
total_time=total_time+(end_time- start_time)
cap_close()
print("avr-time:",total_time/count)
print("avr-fps :",1/(total_time/count))
if __name__ == '__main__':
main()
t2iクライアント全コード
import os
from PIL import Image
import cv2
import time
from time import sleep
import numpy as np
import requests
import json
import io
from io import BytesIO
import socket
import pickle
from datetime import datetime
host="0.0.0.0" # サーバーIPアドレス定義
port=8000 # サーバー待ち受けポート番号定義
# ++++++++++++++ StreamDiffusion TCP/IP server ++++++++
def get_out(tx_list):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # ソケットクライアント作成
s.connect((host, port)) # 送信先サーバーに接続
all_data=b'' # 受信データ保存用変数の初期化
print(host , port , "StreamDiffusion TX start",datetime.now())
tx_data=pickle.dumps(tx_list,5)
s.send(tx_data) #pickle.dumpsで送信データをシリアライズしサーバに送信
s.shutdown(1)# データ送信完了後、送信路を閉じる
while True: # ソケット接続開始後の処理
data = s.recv(4096*256) # データ受信。受信バッファサイズ4096*256バイト
if not data: # 全データ受信完了(受信路切断)時に、ループ離脱
break
all_data += data # 受信データを追加し繋げていく
get_out =(pickle.loads(all_data))#元の形式にpickle.loadsで復元
print("upscal function RX done",datetime.now())
return get_out
# ++++++++++++++ StreamDiffusion ++++++++++++++++
def gen_img_i2i( img , prompt=None , image_type="pil" , out_type="pil"):
tx_list=[]
tx_list=append("i2i")
tx_list.append(img)
tx_list.append(prompt)
tx_list.append(image_type)
tx_list.append(out_type)
try:
image =get_out(tx_list)
status_code=200
except RuntimeError as error:
print('Error', error)
status_code="error"
return status_code , image
def gen_img_t2i(prompt=None, out_type="pil"):
tx_list=[]
tx_list.append("t2i")
tx_list.append(prompt)
tx_list.append(out_type)
try:
image =get_out(tx_list)
status_code=200
except RuntimeError as error:
print('Error', error)
status_code="error"
return status_code , image
# -----表示のスレッド化準備(必要に応じて)
global th_img , th_img_flag
#>>> 表示スレッドの定義
import threading
def disp_th():
global th_img , th_img_flag
while True:
if th_img_flag==True:
imgCV_RGB = np.array(th_img, dtype=np.uint8)
th_img = np.array(imgCV_RGB)[:, :, ::-1]
cv2.imshow("image", th_img)
cv2.waitKey(1)
th_img_flag=False
time.sleep(0.01)
# -----プログラムの開始
def main():
global th_img , th_img_flag #表示スレッド用共有データとセマフォ
#-----表示スレッドを開始
th_img_flag=False
thread = threading.Thread(target=disp_th, name='disp_th',daemon = True)
thread.start()
#>>>動的プロンプトの初期プロンプト
prompt = "masterpiece, best quality, 1girl,"
prompt_list=[
"1girl","long hair,","white shirt,","serafuku,","brown hair,","looking at viewer,","blush,","smile,", "bangs,","blue eyes,","simple background,", "t-shirt,",\
"white background,","walk a head,","white background,","walk a head,","white background,","walk a head,","white background,","walk a head,","white background,"]
count=len(prompt_list)
total_time=0
for n in range(len(prompt_list)):
start_time=time.time()
#prompt=prompt+ prompt_list[n]
prompt=None
# リクエストを送信
status_code , image_data=gen_img_t2i( prompt )
# レスポンスが成功であることを確認
if status_code == 200:
print("response.status_code== 200")
#スレッドで表示
th_img = image_data #生成画像を準備
th_img_flag=True #画像準備フラグセット
#生成時間とフレームレートの表示
end_time=time.time()
print("生成時間",end_time- start_time)
print("i-fps",1/(end_time- start_time))
total_time=total_time+(end_time- start_time)
else:
print("t2i-error")
print("avr-time:",total_time/count)
print("avr-fps :",1/(total_time/count))
if __name__ == '__main__':
main()
tcpip_client.py全コード
import socket
import pickle
from datetime import datetime
host="0.0.0.0" # サーバーIPアドレス定義
port=8000 # サーバー待ち受けポート番号定義
# ++++++++++++++ StreamDiffusion TCP/IP server ++++++++
def get_out(tx_list):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # ソケットクライアント作成
s.connect((host, port)) # 送信先サーバーに接続
all_data=b'' # 受信データ保存用変数の初期化
print(host , port , "StreamDiffusion TX start",datetime.now())
tx_data=pickle.dumps(tx_list,5)
s.send(tx_data) #pickle.dumpsで送信データをシリアライズしサーバに送信
s.shutdown(1)# データ送信完了後、送信路を閉じる
while True: # ソケット接続開始後の処理
data = s.recv(4096*256) # データ受信。受信バッファサイズ4096*256バイト
if not data: # 全データ受信完了(受信路切断)時に、ループ離脱
break
all_data += data # 受信データを追加し繋げていく
get_out =(pickle.loads(all_data))#元の形式にpickle.loadsで復元
print("upscal function RX done",datetime.now())
return get_out
# ++++++++++++++ StreamDiffusion ++++++++++++++++
def gen_img_i2i( img , prompt=None , image_type="pil" , out_type="pil"):
tx_list=[]
tx_list.append("i2i")
tx_list.append(img)
tx_list.append(prompt)
tx_list.append(image_type)
tx_list.append(out_type)
try:
image =get_out(tx_list)
status_code=200
except RuntimeError as error:
print('Error', error)
status_code="error"
return status_code , image
def gen_img_t2i(prompt=None, out_type="pil"):
tx_list=[]
tx_list.append("t2i")
tx_list.append(prompt)
tx_list.append(out_type)
try:
image =get_out(tx_list)
status_code=200
except RuntimeError as error:
print('Error', error)
status_code="error"
return status_code , image
capture.py全コード
import os
from PIL import Image
import cv2
import time
def init_cam():
global cap
# Webカメラのキャプチャを開始
cap = cv2.VideoCapture(0)
# キャプチャがオープンしていることを確認
if not cap.isOpened():
print("カメラを開けません")
status=False
return status
ret, frame = cap.read()
print("caputure init")
#cv2.imshow('Frame',frame )
status=True
return status
def cap_img():
global cap
# カメラからフレームを読み込む
ret, frame = cap.read()
# フレームの表示と返送
#cv2.imshow('Frame', frame)
return frame
#get pil and cv2イメージ
def cap_img_pil():
img_cv2=cap_img()
# cv2-> PIL
new_image = cv2.cvtColor(img_cv2, cv2.COLOR_BGR2RGB)
pil_image= Image.fromarray(new_image)
#サイズ調整
w,h = pil_image.size
#正方形
img_new = Image.new('RGB', [h,h], (255, 255, 255))
img_new .paste(pil_image ,(-int((w-h)/2),0))
#縦長
#pil_image=pil_image.resize((768,768))
#img_new = Image.new('RGB', [512,768], (255, 255, 255))
#img_new .paste(pil_image ,(-100,0))
return img_new , img_cv2
#get pil and cv2イメージ
def cap_img_pil_t(interval , queue):#interval=mS
print("Start CAM")
while True:
start_time = time.time()
print("CAM time",start_time)
img_cv2=cap_img()
new_image = cv2.cvtColor(img_cv2, cv2.COLOR_BGR2RGB)
pil_image= Image.fromarray(new_image)
w,h = pil_image.size
img_new = Image.new('RGB', [h,h], (255, 255, 255))
img_new .paste(pil_image ,(-int((w-h)/2),0))
#img_new .paste(pil_image ,(-50,0))
#return img_new , img_cv2
queue_data=[img_new,img_cv2]
print("queue.put")
queue.put(queue_data)
# 次の実行までの待機時間を計算
elapsed_time = time.time() - start_time
time_to_wait = max(0, interval - elapsed_time) # interval - 実行にかかった時間
time.sleep(time_to_wait)
def cap_close():
cap.release()