見出し画像

【Python】ChatGPT効率化の為に非同期処理を実装

非同期処理は、データ解析、APIリクエスト、ベクター化など多様なタスクにおいてパフォーマンスを向上させる鍵となる技術です。特に、待ち時間が発生しやすい多数のリクエストや処理を一度に効率よく処理したい場合、この技術は不可欠です。

非同期処理と並列処理の違い

非同期処理と並列処理は、よく一緒に取り上げられることが多いですが、実はそれぞれ異なる目的と特性を持っています。

非同期処理

非同期処理は、I/O待ち(ディスクへの読み書きやネットワーク通信など)といった待機時間を有効に使いながら、他のタスクを進める技術です。この方法で、全体のプログラムがスムーズに動作します。

並列処理

一方で、並列処理は複数の処理を物理的に同時に行う技術です。簡単な例でいえば動画です。動画の再生と音声の再生を遅延が無いよう同時に行っています。
並列処理により、大量のデータ処理や高度な計算を高速に行えます。

まとめ

簡単に言えば、非同期は「待っている間に何か他のことをする」技術、並列処理は「多くのことを同時に行う」技術といえます。それぞれの技術には独自の利点と制限がありますので、詳細は別途Googleなどで調べてみてください。

目的

この記事では、非同期処理を用いてどのように業務効率を向上させるかに焦点を当てます。具体的には、Pythonで非同期処理を実装する手法と、それをローカル環境とGoogle Colabでどう活用するかを解説します。

コードを使う際の注意点

1,サーバーに高負荷なアクセスがかかりますのであまりに大量なアクセスは注意してください。今回のコードでは実装していませんが、ループにウェイト1秒等を入れると親切です。
2,OpenAIのレート制限に注意してください。
3,当コードはOpenAIAPIを考慮して作成してあります。大量アクセスを考慮されていないサイトへの等コードの使用は、DoS攻撃と間違えられ訴えられる可能性もあるため、注意して活用してください。(スクレイピング等)

非同期処理ローカル用コード

実際に非同期処理のコードです。
この記事ではcolab用とローカル用のコードを記載します。
まずはローカル用のコードです。

※create_message関数内でmessageListとuserMessageがありますが、今回は使っていません。

# 必要なライブラリとモジュールをインポート
import openai
import asyncio
import logging
import tiktoken
from concurrent.futures import ThreadPoolExecutor


# OpenAI APIの設定
API_KEY = "your_api_key_here"
openai.api_key = API_KEY
GptModel = "gpt-3.5-turbo"
MaxTokens = 4000  # 1回のAPI呼び出しで処理できる最大トークン数
encoding: Encoding = tiktoken.encoding_for_model(GptModel)

# メッセージを作成する関数
def create_message(systemPrompt, messageList=[], userMessage=None):
    # システムメッセージを作成
    systemMessage = {'role': 'system', 'content': systemPrompt}
    messages = []
    messages.append(systemMessage)
    
    # ユーザーメッセージが指定された場合、それも追加
    if userMessage is not None and userMessage != "":
        messageList.append({'role': 'user', 'content': userMessage})
    
    # OpenAI GPT-3のトークン制限に合わせてメッセージを調整
    total_chars = len(encoding.encode(systemPrompt)) + sum([len(encoding.encode(msg['content'])) for msg in messageList])
    
    while total_chars > MaxTokens and len(messageList) > 0:
        messageList.pop(0)
        total_chars = len(encoding.encode(systemPrompt)) + sum([len(encoding.encode(msg['content'])) for msg in messageList])
    
    # 最終的なメッセージリストを作成
    for message in messageList:
        messages.append(message)
    
    return messages

# GPT-3からの応答を取得する関数
def get_gpt_response(systemprompt: str) -> str:
    try:
        # メッセージを作成
        messages = create_message(systemprompt)
        
        # OpenAI APIを呼び出し
        response_json = openai.ChatCompletion.create(
                            model=GptModel,
                            messages=messages
                        )
        
        # 応答をパース
        if response_json is not None:
            responseText = response_json['choices'][0]['message']['content'].strip() 
            return responseText
        
        raise ValueError("ChatGPTからの返答がありません")
    except Exception as e:
        # エラーロギング
        logging.error(f"エラー: {str(e)}")
        raise ValueError(f"エラーが発生しました。{str(e)}") 

# 複数のプロンプトに対して非同期に応答を取得する関数
def async_gpt_responses(prompts):
    try:
        # 非同期ループを取得
        loop = asyncio.get_event_loop()
        responses = []
        
        # ThreadPoolExecutorで非同期にAPI呼び出し
        with ThreadPoolExecutor() as executor:
            tasks = [loop.run_in_executor(executor, get_gpt_response, prompt) for prompt in prompts]
            responses = loop.run_until_complete(asyncio.gather(*tasks))
        
        return responses
    except Exception as e:
        # エラーロギング
        logging.error(f"エラー: {str(e)}")
        raise ValueError(f"エラーが発生しました。{str(e)}")

# メイン実行部
if __name__ == "__main__":
    # プロンプトのリスト
    prompts = ["天気はどうですか?", "おすすめの映画は何ですか?", "AIは将来どのように進化しますか?"]
    
    # 非同期で応答を取得
    responses = async_gpt_responses(prompts)
    
    # 結果を出力
    for i, response in enumerate(responses):
        print(f"プロンプト {i + 1}: {prompts[i]}")
        print(f"返答: {response}\n")

インポート部分

  • openai: OpenAIのGPT-3 APIを呼び出すためのライブラリ。

  • asyncio: 非同期処理を扱うための標準ライブラリ。

  • logging: ログ出力のための標準ライブラリ。

  • ThreadPoolExecutor: スレッドベースの並列処理を行うためのクラス。

  • Encoding: GPT-3 APIのトークン数を計算するためのライブラリ。

関数1: create_message

この関数は、GPT-3.5に送るメッセージを作成します。

  • systemPrompt: システムプロンプト。

  • messageList: 既存のメッセージのリスト。

  • userMessage: 新たに追加するユーザーからのメッセージ。

この関数は、GPT-3に送るためのメッセージのリストを返します。また、メッセージのトークン数が指定の上限を超えないように調整します。

関数2: get_gpt_response

この関数は、特定のプロンプト(systemprompt)に対するGPT-3.5からの応答を取得します。

  • まず、create_message関数を使ってメッセージを作成します。

  • 次に、openai.ChatCompletion.createを使ってAPIリクエストを行い、GPT-3からの応答を取得します。

関数3: async_gpt_responses

この関数は、複数のプロンプトに対して非同期にGPT-3からの応答を取得します。

  • ThreadPoolExecutorasyncioを使って、複数のプロンプトに対して並行してAPIリクエストを行います。

メイン実行部

この部分は、スクリプトが直接実行された場合に動作します。具体的には、

  1. プロンプトのリストを定義します。

  2. async_gpt_responses関数を使って非同期に応答を取得します。

  3. 取得した応答を出力します。

まとめ

この解説で特に注目すべきは、ThreadPoolExecutorを使用した非同期処理の部分です。ThreadPoolExecutorasyncioを組み合わせることで、GPT-3 APIに対する複数のリクエストを効率的に処理する非同期コードが実装されています。

具体的には、async_gpt_responses関数内でThreadPoolExecutorを用いて複数のAPIリクエストを同時に発行しています。これにより、各リクエストの待ち時間を有効に活用して、全体のレスポンス時間を短縮することができます。これは特に、APIのレスポンスに待ち時間がある場合や、大量のリクエストを高速に処理したい場面で非常に有用です。

このアプローチの美点は、既存の同期的なコードに非常に少ない修正で非同期処理を導入できる点です。ThreadPoolExecutorが提供する並列実行環境は、元々同期的に設計された関数(この場合、get_gpt_response関数)をそのまま使用できるため、コードの大幅なリファクタリングを避けられます。

非同期処理Google Colab用コード

colab用のコードですが、基本的にはローカル用のものと同じです。
変更点はnest_asyncioを使用している点です。
これは何かというと、非同期処理のネストを許可するものです。
colab自体が非同期処理のコードのようなものなので、その中で更に非同期実行??というイメージで大丈夫です。私もよくわかりません()

#最初に実行
!pip install openai
!pip install nest_asyncio
!pip install tiktoken
#非同期テストコード
import openai  # OpenAIのAPIを利用するためのライブラリ
import asyncio  # 非同期プログラミングを容易にするライブラリ
import nest_asyncio  # asyncioのネスト(入れ子)を許可するライブラリ
import logging  # ログ情報を出力するためのライブラリ
from concurrent.futures import ThreadPoolExecutor  # スレッドプールを使って非同期処理を行うためのクラス
from tiktoken.core import Encoding


# OpenAI APIの設定
API_KEY = "your_api_key_here"
openai.api_key = API_KEY
GptModel = "gpt-3.5-turbo"
MaxTokens = 4000  # 1回のAPI呼び出しで処理できる最大トークン数
encoding: Encoding = tiktoken.encoding_for_model(GptModel)

# メッセージを作成する関数
def create_message(systemPrompt, messageList=[], userMessage=None):
    # システムメッセージを作成
    systemMessage = {'role': 'system', 'content': systemPrompt}
    messages = []
    messages.append(systemMessage)
    
    # ユーザーメッセージが指定された場合、それも追加
    if userMessage is not None and userMessage != "":
        messageList.append({'role': 'user', 'content': userMessage})
    
    # OpenAI GPT-3のトークン制限に合わせてメッセージを調整
    total_chars = len(encoding.encode(systemPrompt)) + sum([len(encoding.encode(msg['content'])) for msg in messageList])
    
    while total_chars > MaxTokens and len(messageList) > 0:
        messageList.pop(0)
        total_chars = len(encoding.encode(systemPrompt)) + sum([len(encoding.encode(msg['content'])) for msg in messageList])
    
    # 最終的なメッセージリストを作成
    for message in messageList:
        messages.append(message)
    
    return messages

# GPTからレスポンスを取得する関数
def get_gpt_response(systemprompt: str) -> str:
    try:
        # メッセージを作成
        messages = create_message(systemprompt)

        # GPT-3 APIを呼び出してレスポンスを取得
        response_json = openai.ChatCompletion.create(
                            model=GptModel,
                            messages=messages
                        )

        # レスポンスを解析して返す
        if response_json is not None:
            responseText = response_json['choices'][0]['message']['content'].strip()
            return responseText
        raise ValueError("ChatGPTからの返答がありません")
    except Exception as e:
        logging.error(f"エラー: {str(e)}")
        raise ValueError(f"エラーが発生しました。{str(e)}")

# 複数のプロンプトに対して非同期でレスポンスを取得する関数
def async_gpt_responses(prompts):
    try:
        # 非同期処理の設定
        nest_asyncio.apply()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        responses = []

        # スレッドプールを使って非同期でAPIを呼び出す
        with ThreadPoolExecutor() as executor:
            tasks = [loop.run_in_executor(executor, get_gpt_response, prompt) for prompt in prompts]
            responses = loop.run_until_complete(asyncio.gather(*tasks))

        return responses
    except Exception as e:
        logging.error(f"エラー: {str(e)}")
        raise ValueError(f"エラーが発生しました。{str(e)}")

# メイン処理
if __name__ == "__main__":
    # プロンプトのリスト
    prompts = ["天気はどうですか?", "おすすめの映画は何ですか?", "AIは将来どのように進化しますか?"]
    
    # 非同期でレスポンスを取得
    responses = async_gpt_responses(prompts)

    # レスポンスを出力
    for i, response in enumerate(responses):
        print(f"プロンプト {i + 1}: {prompts[i]}")
        print(f"返答: {response}\n")

実例:ChatGPTでの応用

では実際非同期の処理を使ったらどのようなことが便利にできるかを実際に検証します。
非同期処理のメリットは複数なので、10件のデータを同時に送り返答にどれくらいかかったかを計測したいと思います。
10件のデータは以下のように作成します。

main_text = "今回の記事"

text_list = [
    """
    上記の記事を要約してください。
    """,
    """
    上記の記事を箇条書きにまとめてください。
    """,
    """
    非同期処理についてのメリットを述べてください。
    """,
    """
    上記の記事をギャルとしてまとめてください。
    """,
    """
    上記の記事を弁護士としてまとめてください。
    """,
    """
    上記の記事を読んで、ギャルとしての感想をください。
    """,
    """
    上記の記事を読んで、プロのエンジニアとして感想をください。
    """,
    """
    上記の記事を読んで、改善点を上げてください。
    """,
    """
    上記の記事を読んで、ずんだもんとして返答してください。
    ずんだもんは「なのだ」を語尾につける
    ずんだもんとは東北地方応援キャラクター「東北ずん子」の関連キャラクターで
    東北ずん子が所持するずんだアローに変身するずんだ餅の妖精です。
    """,
    """
    上記の記事を読んで批判的に感想を書いてください。
    ================
    ツンデレ口調で慰めてください。
    """
]

時間差の計測のために同期実行の関数も用意します

def sync_gpt_responses(prompts):
    try:
        responses = []

        # 同期的にAPIを呼び出す
        for prompt in prompts:
            response = get_gpt_response(prompt)
            responses.append(response)

        return responses
    except Exception as e:
        logging.error(f"エラー: {str(e)}")
        raise ValueError(f"エラーが発生しました。{str(e)}")

そしてmainの箇所をpuronptを下記のように変更し実行時間を取得
2回めはasync_gpt_responsesをsync_gpt_responsesに変えれば完璧

# メイン処理
if __name__ == "__main__":
    # プロンプトのリスト
    #prompts = ["天気はどうですか?", "おすすめの映画は何ですか?", "AIは将来どのように進化しますか?"]
    prompts = [main_text + " " + prompt for prompt in text_list]
    start_time = time.time()
    # 非同期でレスポンスを取得
    responses = async_gpt_responses(prompts)
    end_time = time.time()
    print(f"実行時間: {end_time - start_time} 秒")
    # レスポンスを出力
    for i, response in enumerate(responses):
        #print(f"プロンプト {i + 1}: {prompts[i]}")
        print(f"返答: {response}\n")

結果

同期実行での返答時間は239秒に対し、
非同期実行では返答は49秒でした。
やはり圧倒的ですね。ソリャソウダ

まとめ

以上。
Pythonでの非同期処理の実装と、それをローカル環境とGoogle Colabで活用するコードについての解説及び検証でした。
個人的には大量のテキストデータをada002に投げるのに大活用しています。
非同期処理を使った場合は同期処理よりも処理時間が圧倒的に短縮されました。これは大量のAPIリクエストやデータ解析が必要な業務の短縮にとても役立つのでぜひ活用してみてください。


いいなと思ったら応援しよう!