見出し画像

Pythonで雀魂の対局結果を自動集計する⑦(完結編)

こんにちは!カツキチです。

前回、ラズパイ環境にBOTプログラムを移行し、対局結果集計が常時実行できるようになりました。
しかし、対局結果以外の画像でも中途半端にプログラムが動いてしまいます。ちゃんとエラー処理しないとダメです。

敏感なBOT君

画像が対局結果であるかの判定

どのような判定基準にするか迷いましたが、OpenCVで実装しているテンプレートマッチングで返却される値のうち、「信頼度」を用いることにしました。
いくつかの画像を処理したところ、信頼度の閾値として0.7が妥当なようです。
0.7以下の場合は結果集計プログラムを中断する処理を追加しました。

    # 検索結果の信頼度と位置座標の取得
    min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)

    # 信頼度が低い場合は終了
    if max_val < 0.7:
        exit()

AIにコードレビューしてもらう

上手くいくことしか考えずに実装してきたので、想定外の動作の時に原因分析できるための情報を出力しなければなりません。
しかしPython初心者の私にはそのような技量はありませんので、AIに頼ることにしました。
ChatGPTにとてもシンプルなプロンプトで聞いてみます。

プロンプト:
以下のコードをレビューしてください。
(Pythonのコードをべた書き)

1番自信のなかったDiscord連携プログラムで試したところ、ChatGPTは以下のように答えました。


🚩 1. セキュリティ上の問題

❌ 問題: アクセストークンがハードコーディングされている

  • アクセストークンが直接記載されていると、流出リスクがあります。

✅ 改善: 環境変数で管理

import os
from dotenv import load_dotenv

load_dotenv()
TOKEN = os.getenv('DISCORD_BOT_TOKEN')
bot.run(TOKEN)
  • .envファイルに以下を記載:

DISCORD_BOT_TOKEN=YOUR_DISCORD_TOKEN

🚩 2. サブプロセス実行の危険性

❌ 問題: subprocess.run()に未検証データを渡している

  • 直接URLを渡すのはコマンドインジェクションのリスクがあります。

  • また、attachmentはオブジェクトであり、文字列として渡しても正しく動作しない可能性があります。

✅ 改善: 添付ファイルを一旦ダウンロード

import aiohttp
import os

async with aiohttp.ClientSession() as session: 
    async with session.get(attachment.url) as resp:
        if resp.status == 200:
            file_path = "temp_file.png"
            with open(file_path, "wb") as f:
                f.write(await resp.read())

            # 結果抽出プログラムを実行
            command = ["python", result_py, file_path]
            proc = subprocess.run(command, capture_output=True, text=True)

            await message.reply(proc.stdout)
            os.remove(file_path) # 一時ファイルを削除

🚩 3. イベント処理の問題

❌ 問題: on_message使用時にコマンドが動作しない

  • on_messageを使うと、commandsが無視されます。

✅ 改善: await bot.process_commands(message)を追加

@bot.event
async def on_message(message):
    if message.author.bot:
        return

    await bot.process_commands(message) # これを追加

🚩 4. エラーハンドリングの欠如

❌ 問題: サブプロセスやネットワークエラーの考慮がない

  • 失敗時にエラーメッセージを返せないと、デバッグが困難です。

✅ 改善: try-exceptで例外処理を追加

try:
    proc = subprocess.run(command, capture_output=True, text=True, check=True)
    await message.reply(proc.stdout)
except subprocess.CalledProcessError as e:
    await message.reply(f"実行エラー: {e.stderr}")
except Exception as ex:
    await message.reply(f"予期せぬエラー: {str(ex)}")

🚩 5. ハードコーディングの回避

❌ 問題: チャンネルIDが直書き

  • 別の環境で動かす場合やID変更時に不便です。

✅ 改善: 環境変数または設定ファイルで管理

TARGET_CHANNEL_ID = int(os.getenv("TARGET_CHANNEL_ID", "938086413352714260"))
if message.channel.id == TARGET_CHANNEL_ID:
    # 処理を実行
  • .envファイル:

TARGET_CHANNEL_ID=938086413352714260

🚀 最終的な改善ポイントまとめ

改善ポイント

この改善で、セキュリティ安定性拡張性が大幅に向上します🚀


すごーー!
知らない事ばかりです。修正コードまで提案してくれます。しかもこのコード、コピペでもちゃんと動作しました!

他のプログラムもレビューを受け、一つ一つ理解しながらコードを変更していきます。環境設定ファイルやログファイル、Pythonモジュールの追加などプログラムを構成自体もかなり変わりました。

雀魂友人戦結果集計プログラム完成版

AIコードレビューより今まで作成した3つのプログラムに以下を実装しました。

  • 環境設定ファイル作成(ハードコーディング対策)

  • コマンドインジェクション回避(セキュリティ対策)

  • エラーハンドリング追加

  • ログ出力

  • 名前補正用辞書ファイルの外部ファイル化

実装後に再レビューを受けましたがいくらでも改善できそうですので、現時点ではこれを完成版とします。

新たに追加するライブラリはこちらです。

PS > pip install python-dotenv

環境設定ファイル Ver 1.0

# Pythonファイル、フォルダ、
RESULT_PY = './result.py'
SUMMARY_PY = './summary.py'
OUTPUT_DIR = './output/'
TEMPLATE_DIR= './template/'
INPUT_IMAGE = './output/image.png'
WRITE_IMAGE = './output/image_wr.png'
LOG_FILE = './log/mst.log'

# 集計用Excelファイル
RESULT_EXCEL = './result.xlsx' #ファイル名
EXCEL_SHEET = '対局結果' #シート名
BIKOU = 'XXX-BOT' #備考欄に記載する情報
SUMMARY_EXCEL = 'https://1drv.ms/XXXXX' # Excel(OneDrive)のURL

# OCR関連
CONFIDENCE_THRESHOLD = 0.7 #OCR信頼度の閾値
INPUT_IMAGE_WIDTH = 2000 #画像の横幅がこれより小さければPC(ブラウザ版)、大きければSP(スマホ版)

# Tesseractのパス(Windows)
TESSERACT_PATH = 'C:\\Program Files\\Tesseract-OCR'
TESSDATA_PATH = 'C:\\Program Files\\Tesseract-OCR\\tessdata'

# 名前辞書変換
NAME_DICT_FLAG = 1 # 1:有効, 0:無効
NAME_DICT_CSV = './name_dict.csv' # 名前辞書用CSVファイル

# Discord関連
DISCORD_BOT_TOKEN = 'XXXXX'
TARGET_CHANNEL_ID = 'XXXXX'

Discord連携プログラム Ver 1.1

from discord.ext import commands
import discord
import subprocess
import os
from dotenv import load_dotenv
import aiohttp
from my_logger import set_logger, getLogger

load_dotenv()
set_logger()
logger = getLogger(__name__)

# 結果抽出プログラム
RESULT_PY = os.getenv('RESULT_PY')
INPUT_IMAGE = os.getenv('INPUT_IMAGE')

# Discordの権限設定
intents = discord.Intents.default()
intents.members = True # メンバー管理の権限
intents.message_content = True # メッセージの内容を取得する権限

bot = commands.Bot(
    command_prefix="$", # $コマンド名 でコマンドを実行できるようになる
    case_insensitive=True, # 大文字小文字を区別しない
    intents=intents # 権限を設定
)

# Discordにログイン
@bot.event
async def on_ready():
    print(f"Logged in as {bot.user.name}")
    logger.info(f"Logged in as {bot.user.name}")

# 投稿を確認
@bot.event
async def on_message(message: discord.Message):

    # BOTのメッセージは無視
    if message.author.bot:
        return
    
    # 投稿されたメッセージが特定のチャンネルIDの場合
    target_channel_id = os.getenv('TARGET_CHANNEL_ID')
    if message.channel.id == int(target_channel_id):

        logger.info(f"投稿メッセージ: {message}")

        # ファイルが添付されている場合
        if message.attachments:

            # 添付ファイルごとに処理
            for attachment in message.attachments:

                logger.info(f"添付ファイル: {attachment}")

                # 添付ファイルをダウンロード
                async with aiohttp.ClientSession() as session: 
                    async with session.get(attachment.url) as resp:
                        if resp.status == 200:
                            with open(INPUT_IMAGE, "wb") as f:
                                f.write(await resp.read())

                            try:
                                # 結果抽出プログラムを実行
                                command = ["python", RESULT_PY] 
                                proc = subprocess.run(command, capture_output=True, text=True)
                                print(f"標準出力: {proc.stdout}")
                                logger.info(f"標準出力: {proc.stdout}")
                                await message.reply(proc.stdout) # 標準出力のみDiscordに投稿
                            except subprocess.CalledProcessError as e:
                                print(f"実行エラー: {e.stderr}")
                                logger.error(f"実行エラー: {e.stderr}")
                            except Exception as ex:
                                print(f"予期せぬエラー: {str(ex)}")
                                logger.error(f"予期せぬエラー: {str(ex)}")

                            #os.remove(INPUT_IMAGE) # 一時ファイルを削除

    # on_message をオーバーライドすると、コマンドが実行されなくなる対策
    await bot.process_commands(message)

# BOTの実行
TOKEN = os.getenv('DISCORD_BOT_TOKEN')
bot.run(TOKEN)

結果抽出プログラム Ver 1.1

# ライブラリのインポート
import cv2
import pyocr
from PIL import Image
import os
import requests
import sys
from datetime import datetime,date
import subprocess
import openpyxl as px
import time
from my_logger import set_logger, getLogger
from dotenv import load_dotenv
import csv

#start = time.time()  # 現在時刻(処理開始前)を取得

load_dotenv()
set_logger()
logger = getLogger(__name__)

# 環境設定の読み込み
SUMMARY_PY = os.getenv('SUMMARY_PY')
OUTPUT_DIR = os.getenv('OUTPUT_DIR')
TEMPLATE_DIR = os.getenv('TEMPLATE_DIR')
NAME_DICT_FLAG = os.getenv('NAME_DICT_FLAG')
NAME_DICT_CSV = os.getenv('NAME_DICT_CSV')

# Windows環境ならPATHを設定
if os.environ['PATH'][0] == "C":
    TESSERACT_PATH = os.getenv('TESSERACT_PATH')
    TESSDATA_PATH = os.getenv('TESSDATA_PATH')
    os.environ["PATH"] += os.pathsep + TESSERACT_PATH
    os.environ["TESSDATA_PREFIX"] = TESSDATA_PATH

# OCRエンジン取得
tools = pyocr.get_available_tools()
tool = tools[0]

# OCRの設定 ※tesseract_layout=6が精度には重要。デフォルトは3
builder = pyocr.builders.TextBuilder(tesseract_layout=6)

# データ補正(名前)
if int(NAME_DICT_FLAG):
    with open(NAME_DICT_CSV, encoding='utf8') as f:
        csvreader  = csv.DictReader(f,delimiter=",", doublequote=True, lineterminator="\r\n", quotechar='"', skipinitialspace=True)
        NAME_CORRECTIONS = [row for row in csvreader]

# 指定した画像をOCRしてテキストを抽出
def img2text(image_file, lang, place):

    # グレイスケール&二値化
    img = cv2.imread(OUTPUT_DIR + image_file, 0); #グレイスケールで読み込む
    ret, img = cv2.threshold(img, 150, 255, cv2.THRESH_BINARY)
    cv2.imwrite(OUTPUT_DIR + str(place) + image_file, img)

    # OCR処理
    img = Image.open(OUTPUT_DIR + str(place) + image_file)
    txt = tool.image_to_string(img, lang=lang, builder=builder)
    txt = ''.join(txt.split())

    return(txt)

# 順位に応じたプレイヤーの名前、素点、順位点を出力
def player_info_print(template_file, place, flag, img):

    # テンプレート画像の読み込み
    template = cv2.imread(template_file)
    h, w, _ = template.shape # 幅と高さを取得

    # 画像の検索(Template Matching)
    result = cv2.matchTemplate(img, template, cv2.TM_CCORR_NORMED)

    # 検索結果の信頼度と位置座標の取得
    min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)

    # 信頼度が低い場合は終了
    CONFIDENCE_THRESHOLD = os.getenv('CONFIDENCE_THRESHOLD')
    if max_val < float(CONFIDENCE_THRESHOLD):
        logger.error(f"画像検索結果の信頼度({max_val})が閾値({CONFIDENCE_THRESHOLD})よりも低いため処理を終了します。")
        exit()

    # 検索結果の座標を取得
    top_left = max_loc # 左上
    bottom_right = (top_left[0] + w, top_left[1] + h) #右下

    # 検索対象画像内に、検索結果を水色の長方形で描画
    cv2.rectangle(img, top_left, bottom_right, (255, 255, 0), 2)

    # 名前と素点の座標を指定
    if flag == 'SP': #アプリ版
        if place == 1:
            player_name_tl  = (top_left[0] + 230, top_left[1] - 20)
            player_name_br  = (top_left[0] + 530, top_left[1] + 40)
            player_point_tl = (top_left[0] + 230, top_left[1] + 45)
            player_point_br = (top_left[0] + 530, top_left[1] + 135)
        else:
            player_name_tl  = (top_left[0] + 195, top_left[1] + 0)
            player_name_br  = (top_left[0] + 420, top_left[1] + 40)
            player_point_tl = (top_left[0] + 195, top_left[1] + 45)
            player_point_br = (top_left[0] + 420, top_left[1] + 115)
    else: #ブラウザ版
        if place == 1:
            player_name_tl  = (top_left[0] + 190, top_left[1] + 7)
            player_name_br  = (top_left[0] + 450, top_left[1] + 60)
            player_point_tl = (top_left[0] + 220, top_left[1] + 65)
            player_point_br = (top_left[0] + 450, top_left[1] + 120)
        else:
            player_name_tl  = (top_left[0] + 160, top_left[1] + 5)
            player_name_br  = (top_left[0] + 360, top_left[1] + 40)
            player_point_tl = (top_left[0] + 160, top_left[1] + 45)
            player_point_br = (top_left[0] + 360, top_left[1] + 95)

    # 指定した範囲を切り取り、画像ファイルとして保存
    cv2.imwrite(OUTPUT_DIR + 'player_name.png',  img[player_name_tl[1]:player_name_br[1],player_name_tl[0]:player_name_br[0]])
    cv2.imwrite(OUTPUT_DIR + 'player_point.png', img[player_point_tl[1]:player_point_br[1],player_point_tl[0]:player_point_br[0]])

    # 切り取った画像ファイルから文字認識
    name  = str(img2text('player_name.png', 'jpn', place))
    point = int(img2text('player_point.png','eng', place))

    # 順位点の計算
    uma = {1:15, 2:5, 3:-5, 4:-15} #順位ウマ
    rank = round(( point - 25000 ) / 1000 + uma[place], 2)

    # データ補正(名前)
    if int(NAME_DICT_FLAG):
        for wrong, correct in NAME_CORRECTIONS[0].items():
            name = name.replace(str(wrong), str(correct))

    # 1位の場合のみヘッダを標準出力
    if place == 1:
        print("==========対局結果==========")

    # 名前、素点、順位点を整形して標準出力(名前が短い人はタブを1つ追加)
    name_padding = '\t' if len(name.encode('utf-8')) < 17 else ''
    print(f"{place: >2}  {name}{name_padding}\t{point: >6}\t{round(float(rank),2): >5}")

    # 集計用Excelファイルに追記
    BIKOU = os.getenv('BIKOU')
    updatelist(today, now, place, name, point, rank, str(BIKOU))

    # 検索対象画像内に、検索結果を黄色い長方形で描画
    cv2.rectangle(img, player_name_tl, player_name_br, (0, 255, 255), 2)
    cv2.rectangle(img, player_point_tl, player_point_br, (0, 255, 255), 2)

# 集計用Excelに対局結果を追記
def updatelist(today, now, place, name, point, rank, comment):
    
    # 集計用Excelファイル
    RESULT_EXCEL = os.getenv('RESULT_EXCEL')
    EXCEL_SHEET = os.getenv('EXCEL_SHEET')

    # Excelファイルを開く
    wb = px.load_workbook(RESULT_EXCEL)
    ws = wb[EXCEL_SHEET]

    # 最大行番号を取得
    max = ws.max_row

    # 最終行に結果を追記
    ws.cell(row=max+1, column=1).value = today
    ws.cell(row=max+1, column=2).value = now
    ws.cell(row=max+1, column=3).value = place
    ws.cell(row=max+1, column=4).value = name
    ws.cell(row=max+1, column=5).value = point
    ws.cell(row=max+1, column=6).value = rank
    ws.cell(row=max+1, column=7).value = comment

    # Excelファイルを上書き保存
    wb.save(RESULT_EXCEL)

##############
#    MAIN    #
##############

# 検索対象画像をダウンロードして保存
#url = input("画像のURLを入力してください: ")
INPUT_IMAGE = os.getenv('INPUT_IMAGE')

# 引数(URL)があればダウンロードする
if len(sys.argv) > 1:
    urlData = requests.get(sys.argv[1]).content
    with open(INPUT_IMAGE, "wb") as aaa:
        aaa.write(urlData)

# 画像をOpenCVで読み込む
try:
    img = cv2.imread(INPUT_IMAGE)
    if img is None:
        raise ValueError(f"画像 '{INPUT_IMAGE}' を読み込めませんでした。")
except Exception as ex:
    print(f"エラー発生: {ex}")
    logger.error(f"エラー発生: {ex}")
    sys.exit()

# 画像のサイズからPCかSPかを決める
INPUT_IMAGE_WIDTH = os.getenv('INPUT_IMAGE_WIDTH')
if img.shape[1] < int(INPUT_IMAGE_WIDTH):
    flag = 'PC' #ブラウザ版
else:
    flag = 'SP' #アプリ版

# 現在日時を取得
today = date.today()
now = datetime.now().time()

# 順位ごとに処理
player_info_print(TEMPLATE_DIR + 'no1.png', 1, flag, img)
player_info_print(TEMPLATE_DIR + 'no2.png', 2, flag, img)
player_info_print(TEMPLATE_DIR + 'no3.png', 3, flag, img)
player_info_print(TEMPLATE_DIR + 'no4.png', 4, flag, img)

# 検索結果を描画した画像を出力
WRITE_IMAGE = os.getenv('WRITE_IMAGE')
cv2.imwrite(WRITE_IMAGE, img)

# 集計用コードを実行
command = ['python', SUMMARY_PY] 
proc = subprocess.run(command, capture_output=True, text=True).stdout
print(proc)

# 集計用Excelのリンクを出力
print("集計用Excelファイルはこちら")
print(os.getenv('SUMMARY_EXCEL'))

#end = time.time()  # 現在時刻(処理完了後)を取得
#print(f"処理時間: {round(end - start, 2)}秒")  # 処理にかかった時間データを使用

結果集計プログラム Ver 1.1

import openpyxl as px
from my_logger import set_logger, getLogger
import time
import os
from dotenv import load_dotenv

#start = time.time()  # 現在時刻(処理開始前)を取得

load_dotenv()
set_logger()
logger = getLogger(__name__)

# 集計用Excelファイル
RESULT_EXCEL = os.getenv('RESULT_EXCEL')
EXCEL_SHEET = os.getenv('EXCEL_SHEET')

# 順位毎の桁表示
place_dict = {
    1 : 1000,
    2 :  100,
    3 :   10,
    4 :    1
}

# Excelファイルを読み込む
try:
    wb = px.load_workbook(RESULT_EXCEL)
    ws = wb[EXCEL_SHEET]
except FileNotFoundError:
    print(f"ファイル '{RESULT_EXCEL}' が見つかりません。")
    logger.error(f"ファイル '{RESULT_EXCEL}' が見つかりません。")
    exit()
except KeyError:
    print(f"シート '{EXCEL_SHEET}' が見つかりません。")
    logger.error(f"シート '{EXCEL_SHEET}' が見つかりません。")
    exit()

# 全データを取得
all_data = list(ws.iter_rows(min_row=2, min_col=1, max_col=7, values_only=True))
wb.close()

# 重複排除したプレイヤーリストを取得
name_list = sorted(set(row[3] for row in all_data))

# 結果格納リストの初期化
result_list = []

print()
print("==========通算成績==========")

# 順位点と順位を出力
def calculate_rank_points(all_data, name, place_dict):
    rank_points = 0
    place = 0
    for row in all_data:
        if str(name) == row[3]:
            rank_points = round(rank_points + row[5], 2)
            place += place_dict.get(row[2], 0)
    return rank_points, '【' + f'{place:04}' + '】'

# 一人ずつ処理
for name in name_list:
    rank_points, place = calculate_rank_points(all_data, name, place_dict)
    result_list.append([name, rank_points, place])

# 結果格納リストを順位点でソート
result_list = sorted(result_list, reverse=True, key=lambda x: x[1])  

# 1位から名前、順位点、順位を出力
count = 1
for x in result_list:
    name_padding = '\t' if len(x[0].encode('utf-8')) < 17 else '' # 名前が短い人はタブを1つ追加
    print(f"{count: >2}  {x[0]}{name_padding}\t{round(float(x[1]),2): >6}\t{x[2]}")
    count += 1

#end = time.time()  # 現在時刻(処理完了後)を取得
#time_diff = end - start  # 処理完了後の時刻から処理開始前の時刻を減算する
#print(time_diff)  # 処理にかかった時間データを使用

GitHubに公開

作成したファイルはほかにもいくつかあり、またテンプレートフォルダやログフォルダなどもありますので、思い切ってGitHubに公開してみました。
GitHubという名前は知っていて、利用(主にダウンロード)はしたことがあるものの公開するのは初めてでした。主な流れはこんな感じです。

  1. GitHubアカウントの作成

  2. Gitのインストール

  3. リポジトリ作成(よくわかっていません)

  4. 個人情報が入らないようにプログラムを汎用化

  5. README、LICENSEファイルを作成(MITライセンスを選択)

  6. VSCodeからプッシュ

こちらに公開しています。ご指摘やご相談など遠慮なくお知らせください。

katsukitti/MahjongSoulTotalling

完走した感想

何気ない思い付きで2月1日から試作を始め、約1か月で完成にこぎつけました。もともとCやPerlなどのプログラミング経験があったため、言語は違えどやりたいことのイメージはありましたが、まさかこんなにいろいろなことが思い通り進むとは思っていませんでした。Python恐るべし!

一つの言語でDiscord、Excel、OCRの操作ができてしまう。こんなに汎用性が高く、手軽に書ける言語ってすごいですよね。

またDiscord、Tesseract-OCR、GitHubの世界もある程度知ることができて、自分自身、本当に良い経験になりました。
noteも初めて本格的に執筆し、全7回の投稿となりました。チキンタツタの予想シリーズも再開しようかなって思いました。

最後までお読みいただき、ありがとうございました!

いいなと思ったら応援しよう!