見出し画像

【コピペで競馬AI開発】|競馬予想AIを「コピペ」で開発!【回収率120%】 -ver.1.2.1


2024.08.08 ver.1.2.1 にアップデートしました!

・00.data_scraping.ipynb の不具合を修正しました。
・00.data_scraping.ipynb
のコードを全体的に見直し、スクレイピング速度を60%ほどアップしました!

2024.07.08 ver.1.2.0 に大幅アップデートしました!

・特徴量(調教師)追加
・全コードの見直し / 最適化を行い、処理速度を向上
・有料特典 myChatGPT 「Pakapaka」を更新

詳しいアップデート内容は以下の記事をご覧ください。

※ ver.1.1.0 含め旧バージョンファイルは、有料エリアの【旧ver. 保管庫】にて保管しています。



この記事は、プログラミング初心者・非エンジニアを含め、「誰でも簡単に」競馬AIを作れるよう考えて作られた解説書です。

  • プログラミングの知識は全くないけれども、本格的な競馬AIを自分で動かしてみたい人

  • 独自の競馬予想AIを作って運用したい人

  • 競馬予想で機械学習を始めてみたい人


コードを全公開しており、コピペだけ回収率120%(2023年実績)の競馬AIを動かせるようになります。

コピペするのも面倒だという方に向けて、プログラムファイルをダウンロードできるようにしました!

2024.02.14 更新

Python環境の構築方法から解説していますので、現時点で全く知識がなくてもOKです!

本記事の通りに進めていけば、必ず競馬AIを動かせます。


「競馬AIを作ってみたいけれども、プログラミングがさっぱり…。」
「競馬AIで膨大なデータを処理しつつ、自分のロジックも加えて予想すれば最強なのでは…?」

こんな悩みを抱えていませんか?

競馬AIを一から作るには、膨大な時間と試行回数、時には大きなコストがかかってきます。

そして、厄介なことに、競馬AIは自らの利益に直結するため開発プロセスは機密とされ、個人開発者のほとんどが0からのスタートを余儀なくされます。

この記事を読むことで、私の長期にわたる足跡を辿り、試行錯誤をすっ飛ばすことができます。

少ないながらも転がっている情報の取捨選択、プログラミングの話、実際にAIを稼働させるための環境構築etc…。
競馬AI開発は、取りかかるだけでも相当の手間がかかります。

完成された競馬予想AIをベースにして、そのまま使うも良し、カスタマイズして自分流に育てていくもよし。

素敵な競馬ライフを手にしましょう!

自分なりのロジックや、独自の方法を追加しながら、長期運用し楽しむことができます。

約60,000字本一冊分くらいあるかなりの分量となりますが、ぜひぜひ、ご一読ください!

さまざまな特典もご用意しています。




【更新情報】

この note および競馬予想AIは、継続的にアップデートされ、進化していく予定です。
ここでは、更新履歴、開発の中長期目標を掲載しています。


◯ 更新履歴 -(最新:ver.1.2.0)

・2024.01.10 記事を公開しました
・2024.01.24 
パラメーターチューニングのプログラムを追加しました。
・2024.02.01 払い戻し情報をスクレイピングするプログラムを追加し、02.model.ipynb のプログラムを修正しました。
・2024.02.14 プログラムファイルをダウンロードできるよう更新しました。
・2024.03.02 特典や特徴量エンジニアリングなどの項目、その他記事を大幅に見直し・追記し、再公開を行いました。-(ver.1.1.0 へ更新)
・2024.03.17 04.prediction.ipynb のコードを最適化し、処理速度と可読性を向上させました。
・2024.04.07 馬券購入を最適化するプログラム ◆ bet_optimization.ipynb を追加しました。
・2024.07.08 特徴量として「調教師」を追加、全コードの最適化を行い、処理速度を向上しました。-(ver.1.2.0 へ更新)
※ 旧バージョンファイルは、【旧ver. 保管庫】よりダウンロード可能です。
・2024.08.08 ① 00.data_scraping.ipynb でスクレイピングができない不具合を修正しました。(原因はnetkeibaのサイト構成変更によるもの)
② 00.data_scraping.ipynb のコードを全体的に見直し、スクレイピング速度を60%ほどアップしました!-(ver.1.2.1 へ更新)


◯ 中長期目標

・特徴量(調教師)追加:近日中追加予定(2024.03.17 追記)
→追加済(24.07.08)

・特徴量(馬ごとの血統テーブル)追加

・コードのクラス化 - ひとつのファイルを実行するだけで、すべての工程が行われるようにする。コードの可読性、保守性を向上する。
・全コードの解説を追加


【はじめに】

  1. この記事は、確実に儲かることを保証するものではありません。実運用など、必ず自己責任でご利用ください。

  2. 馬券購入、実運用は、十分な余剰資金を用意して行いましょう。

  3. コードや文章の転載、ファイルの譲渡は禁止です。


【特典】

はじめに の次が特典って…。こんな記事見たことありませんが、本記事はChatGPTやGoogleのGemini(旧Bard)をはじめとした生成AIの使用を推奨していることもあり、載せる位置を迷った結果ここになりました。


◯ 生成AIのプロンプト集

深津式プロンプト・システムは、Note社主催のセミナー「あなたの仕事が劇的に変わる!?チャットAI使いこなし最前線」で深津さんから紹介された革新的なプロンプト・システムです。(おすすめ参考記事
競馬AIでエラーが出てしまった時カスタマイズに苦戦した時…。
AIから一撃で精度の高い回答を得るのに効果的なプロンプト(指示書)です。ぜひ使ってみてください。

①コード解説のプロンプト
よくわからない、読めないコードを解説してもらうときに使うプロンプトです。

②コード記述プロンプト
こんなコードを書いてくれ、の時に使うプロンプトです。

③コード効率化プロンプト
コードの高速化、効率的な記述に修正したい時に使うプロンプトです。


【Python を動かすための環境構築】

Python は、プログラミング言語の一つです。
後述しますが、簡単かつ機能的な言語として有名で、YoutubeやInstagramなんかもPythonを使って作られています。
機械学習に向いており、この世に存在するほぼ全ての競馬AIはPythonによってコード記述されているといっても過言ではないでしょう。

当然本記事の競馬AIも、使用言語はPythonです。

開発を進める前に、まずはご自身のPCでPythonが動作する環境を整えましょう。

【Windows 編】

私はMacOS環境ですので詳細までわかりませんが、こちらのサイトがわかりやすいように感じました。
Progate という有名なプログラミング学習サイトの記事ですので、信頼度も高いです。

この記事では「Visual Studio」を使用していますが、この記事では「jupiter lab」というツールをおすすめしております。

この記事にある、「2.Jupyter Labだけをインストールする方法」を参照に、コマンドプロンプトに以下のコードを入力してインストールしましょう。

pip install jupyter lab

jupyter lab はブラウザ上で動作するソフトです。
起動方法は、コマンドプロンプトに以下コードを入力しEnterを押すだけです。

jupyter lab

これで準備が整いました。ブラウザ上で jupyter lab が起動します。
jupyter lab の使い方についても、先ほどの記事に詳しく記載があります。
jupyter labは割と直感的に操作ができますが、わからなければ記事を見てみてください。

【Mac 編】

この記事を見ながら、見よう見まねでやってみましょう。
何をどうしてるのかよくわからないと思いますが、こんなのわからなくて大丈夫です。
コードをコピペしながら言われるがまま、環境構築をチャチャっと済ませてしまいましょう。

多くの環境構築を解説するサイトで Visual Studio Code というコーディングツールを入れているようですが、おすすめは jupyter lab です。
下のコードを入力してインストールしましょう。

pip install jupyterlab

jupyter lab はブラウザ上で動作するソフトです。
起動方法は、コマンドプロンプトに以下コードを入力しEnterを押すだけです。

jupyter lab

jupyter notebook というものもありますが、今入れた jupyter lab は jupyter notebook の正統進化版、上位互換です。
jupyter lab を入れましょう。

jupyter lab の使い方も確認してみてください。


◯ 競馬AIで使用するプログラミング言語はPython一択

競馬AIで使用するプログラミング言語は、現状 Python 一択です。
一応、Pythonを使うメリットを書いておきますが、興味のない方はすっ飛ばしていただいて大丈夫です!

◆ Python のメリットは ?

・習得難易度が他言語に比べて低い。
Python は、文法がとてもシンプルで、習得難易度の低い言語として有名です。また、それによってPythonを操ることのできる人口が非常に多く、学習教材やYoutube動画が多く存在しています。

無料教材も他言語と比較して極めて多いため、一円もかけずに独学なんてことも可能です。私も一切お金はかけず、かつ短期間で言語を習得しました。

・コードが読みやすい
Python は、コードがシンプルであるがゆえに、初心者でも非常に読みやすいです。

クラスや複雑な構造を持つコードになるとさすがに読むにも一苦労しますが、それでも他言語と比べれば解読しやすく、理解の容易な言語といえるでしょう。

また、今は生成AI全盛の時代。エラーが出て困ったときはChatGPTに聞いてしまえば、大体解決できます。ChatGPTを操る力の方が、プログラミング能力より使えるかも…と思っています。

(プログラミングに使える ChatGPTのプロンプト集を用意したので、上の目次から飛んでぜひ使ってみてください。)

・競馬AIに必要な「機械学習」に向いている言語

機械学習で使われるプログラミング言語には、Java、C++、Cなどの汎用性がある言語のほか、JavaScriptやPython、R、Scala、Julia、Octaveなどがあります。この中で圧倒的に利用人口が多く、人気の高いプログラミング言語が「Python」です。

なぜ人気で、かつ機械学習に向いているのかというと、「参照できるライブラリやフレームワークが充実している」「信頼性が高い」「軽量」といった理由があります。

「参照できるライブラリ」というのは、簡単に言ってしまえば、コードを書かずともインポートするだけですぐに欲しい機能が実装できるもの、です。
Pythonはコードを書かずとも、このライブラリをプログラムにインポートするだけで様々な機能を実現することができます。

そして、この便利なライブラリが、機械学習に必要な高度な機能を持つものまで幅広く揃えられているのです。


お疲れさまでした!
前置きが長くなりましたが、ここまでの作業で、PC上にPython環境の構築と、コードを実行 / 編集する jupyter lab がインストールできました!

それでは、いよいよ【コピペで】競馬AI開発をしていきましょう!!


【競馬予想AI の仕組み】

競馬予想AI がレースの結果を予想する仕組みといいますか、流れは以下のような感じです。

① 過去のレース結果のデータを取得する。
膨大な数の過去レース結果を取得します。
net.keiba.comのサイトページから、馬の名前や着順、天気に至るまで、必要となる様々なデータを取得していきます(スクレイピングといいます)。

それを表の形で保存して、次の工程に進みます。

② スクレイピングしたデータを加工して、処理する。
スクレイピングした過去のレース結果データを、予想に使えるデータに変換していきます。例えば、馬や騎手の名前、天気、馬場状態の情報などを数値に変換することで、機械が理解できるようなデータにしていきます。

また、特徴量と言われる、AIが予想をする際に重要な指標となるデータを作成していくのも、この段階です。

コピペで作成する競馬AIは、現在約130個の特徴量をもとに予測を行っています。

③ 予想モデルを用いて、レースを予想する。
LightGBMといった機械学習の分析アルゴリズムを使用し、②のデータを基にした予想モデルを作成します。

そして、レース当日のデータを読み込ませて、馬ごとのスコアを算出し予想していく、という流れになります。


【競馬予想AI の使い方】

ここで、競馬AIの使い方について記載しておきます。

【競馬AIの 使い方】
これから記載していくプログラムコードのファイル名には、頭に数字が振ってあります。基本的にはその数字の順番にプログラムを実行していくものと覚えておいてください。


◆ 事前準備 <①〜③ >
まずはじめに、①〜③を実行してください。このプラグラム達は、一度実行したらOK、その後毎回実行する必要はありません。
プログラムを更新した時や、データを新しくしたい時のみ、再度実行します。

① 00.data_scraping.ipynb を実行する。
まず、過去のレースデータを取得します。初期値は 2013~2023 の10年分を指定していますが、膨大なデータを取ってくるかなり時間のかかる処理ですので、PCのスペックによっては途中で止まってしまいます。(1年のデータ取得に30分~1時間かかります。)

その場合、2013 - 2015、2016 - 2018といったように数年ごとに分けて少しずつデータを取得していってください。

あまり古いデータを取得してもあまり意味がないので、初期値の通りの範囲内、2013からの10年分くらいをスクレイピングしてくるのが良いと思います。

② 01.encode.ipynb を実行する。
取得した過去のレースデータを処理して加工していきます。
次のプログラムで予想モデルを作る際に、このデータ処理が必要になります。

独自の特徴量を追加する場合、ここのプログラムにコードを追加していきます。(ここのコードの特徴量数と④の特徴量数が合わないとエラーになるので、同様のコードを④にも追加する)

③ 02.model.ipynb を実行する。
②で加工したデータをもとに、予測モデルを作成します。

特徴量の重要度(AIがどの特徴量を重視して予測しているか)を表示する機能もあり、今後のカスタマイズの際に力を発揮できるようにしています。


◆ 実際にレースを予想する <④〜⑥ >
実際にレースを予想する際に、このプログラムを実行します。
予想したいレースの詳細が発表され、netkeiba.comの出馬表が確定したら、実行してください。

④ 03.race_scraping.ipynb を実行する。
予想したい当日のレースデータを取得し、データ処理を行います。

⑤ 04.prediction.ipynb を実行する。
④で取得したレースデータと、③で作成した予想モデルを用いて、馬の予測スコアを算出します。数値が高い馬ほど、3着以内にくる期待値が高くなります。

⑥ 05.predict_result.ipynb を実行する。
⑤で実行したスコアをもとに、予想印を打ってくれるプログラムです。


◆ 追加プログラム
頭に数字が振られていないプログラムは、AIの最適化などに使える追加プログラムで、適宜実行してください。

・parameter_tuner.ipynb
③のモデルを作成するプログラムにあるLightGBMのパラメーターを、自動でチューニングしてくれるプログラムです。
最適なパラメーターを自動で導き出し、数値を提示してくれます。

・payback_scraping.ipynb
回収率算出のために、払い戻し情報をスクレイピングするプログラムです。


【競馬AI をコピペで開発する】

● 下準備① AI格納フォルダの作成

まず最初に、AIのプログラミングを格納するフォルダを作成します。
基本的にPC上のどの場所にフォルダを作成しても構いません。
jupyter lab上ですぐにフォルダにアクセスできるよう、マイフォルダ上やデスクトップなどわかりやすい位置がおすすめです。

ここでは、フォルダ名をKeiba_AI とでもしておきましょう。

● 下準備② データ格納フォルダの作成

プログラムを実行する中で、取得したデータや出力するファイルを保存するフォルダを作成しておきます。

ここで作成するフォルダの名前は、書いた通りのものにしてください。プログラム上でフォルダ名を指定しているので、違った名前にするとエラーが出ます。

ファイル名を変更したい場合は、必ずプログラムの該当箇所も変更するようにしてください。

  • data

  • encoded

  • model

  • race_data

  • predict_result

  • payback

  • calc_rate

  • config

上記の計8個のフォルダを作成し、先ほどのAI格納フォルダ内に置いてください。


私のPC環境では、これより提示するコードの全てでエラーが出ず、実行できることを確認しています。
しかし、PC環境によっては予期せずエラーが発生する場合があります。

エラーが発生したら、エラー部分を全てコピペして、ChatGPTGemini「以下のようなエラーが発生します。解決策を具体的なコードを提示しながら教えてください。」と聞いてください。

その通りに修正をすれば、ほとんどの場合解決できます。


① 過去レースデータのスクレイピング

それでは、いよいよプログラムを作成していきましょう!
jupyter lab上で、Keiba_AIフォルダの中にプログラムファイルを作成します。
そうしたら、以下のコードをコピペしてください。

◆ 00.data_scraping.ipynb

#過去レースをスクレイピングし、学習データを蓄積するプログラム。

import requests
from bs4 import BeautifulSoup
import concurrent.futures
import pandas as pd
from tqdm.notebook import tqdm
import time
import os

#取得開始年〜取得終了年までのデータをスクレイピング
#取得開始年
year_start = 2013
#取得終了年
year_end = 2024


place_dict = {"01": "札幌", "02": "函館", "03": "福島", "04": "新潟", "05": "東京",
              "06": "中山", "07": "中京", "08": "京都", "09": "阪神", "10": "小倉"}

# ユーザーエージェントのリストを設定
user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.67",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1"
]

def fetch_race_data(url, max_retries=3):
    retries = 0
    while retries < max_retries:
        try:
            headers = {'User-Agent': random.choice(user_agents)}  # ランダムなユーザーエージェントを選択
            r = requests.get(url, headers=headers)
            r.raise_for_status()
            return r.content
        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")
            retries += 1
            wait_time = random.uniform(2, 5)  # リトライ間隔を増加
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
    return None

def parse_race_data(race_id, content):
    if content is None:
        return []
    soup = BeautifulSoup(content, "html.parser", from_encoding="euc-jp")
    soup_span = soup.find_all("span")
    main_table = soup.find("table", {"class": "race_table_01 nk_tb_common"})
    if not main_table:
        print('continue: ' + race_id)
        return []

    race_data = []
    for row in main_table.find_all("tr")[1:]:  # ヘッダ行をスキップ
        cols = row.find_all("td")

        # 走破時間
        runtime = cols[7].text.strip() if len(cols) > 7 else ''
        # 通過順
        pas = cols[10].text.strip() if len(cols) > 10 else ''
        # 体重
        var = cols[14].text.strip()
        try:
            weight = int(var.split("(")[0])
            weight_dif = int(var.split("(")[1].replace(")", ""))  # `[:-1]` の代わりに `replace(")", "")` を使用
        except (ValueError, IndexError):  # ValueErrorとIndexErrorの両方を捕捉
            weight = 0
            weight_dif = 0

        # 調教師名の抽出
        trainer_name = cols[18].find('a').text.strip() if cols[18].find('a') else ''

        # 上がり
        last = cols[11].text.strip() if len(cols) > 11 else ''
        # 人気
        pop = cols[13].text.strip() if len(cols) > 13 else ''
        
        # レースの詳細情報を取得
        try:
            var = soup_span[8]
            sur = str(var).split("/")[0].split(">")[1][0]
            rou = str(var).split("/")[0].split(">")[1][1]
            dis = str(var).split("/")[0].split(">")[1].split("m")[0][-4:]
            con = str(var).split("/")[2].split(":")[1][1]
            wed = str(var).split("/")[1].split(":")[1][1]
        except IndexError:
            try:
                var = soup_span[7]
                sur = str(var).split("/")[0].split(">")[1][0]
                rou = str(var).split("/")[0].split(">")[1][1]
                dis = str(var).split("/")[0].split(">")[1].split("m")[0][-4:]
                con = str(var).split("/")[2].split(":")[1][1]
                wed = str(var).split("/")[1].split(":")[1][1]
            except IndexError:
                var = soup_span[6]
                sur = str(var).split("/")[0].split(">")[1][0]
                rou = str(var).split("/")[0].split(">")[1][1]
                dis = str(var).split("/")[0].split(">")[1].split("m")[0][-4:]
                con = str(var).split("/")[2].split(":")[1][1]
                wed = str(var).split("/")[1].split(":")[1][1]
        soup_smalltxt = soup.find_all("p", class_="smalltxt")
        detail = str(soup_smalltxt).split(">")[1].split(" ")[1]
        date = str(soup_smalltxt).split(">")[1].split(" ")[0]
        clas = str(soup_smalltxt).split(">")[1].split(" ")[2].replace(u'\xa0', u' ').split(" ")[0]
        title = str(soup.find_all("h1")[1]).split(">")[1].split("<")[0]
        
        race_data.append([
            race_id,
            cols[3].text.strip(),  # 馬の名前
            cols[6].text.strip(),  # 騎手の名前
            cols[2].text.strip(),  # 馬番
            trainer_name,  # 調教師
            runtime,  # 走破時間
            cols[12].text.strip(),  # オッズ
            pas,  # 通過順
            cols[0].text.strip(),  # 着順
            weight,  # 体重
            weight_dif,  # 体重変化
            cols[4].text.strip()[0],  # 性
            cols[4].text.strip()[1],  # 齢
            cols[5].text.strip(),  # 斤量
            cols[20].text.strip(),  # 賞金
            last,  # 上がり
            pop,  # 人気
            title,  # レース名
            date,  # 日付
            detail,
            clas,  # クラス
            sur,  # 芝かダートか
            dis,  # 距離
            rou,  # 回り
            con,  # 馬場状態
            wed,  # 天気
            place_code,  # 場id
            place,  # 場名
        ])
    return race_data

def process_race(url_race_id_tuple):
    url, race_id = url_race_id_tuple
    content = fetch_race_data(url)
    return parse_race_data(race_id, content)

total_years = year_end - year_start + 1

with tqdm(total=total_years, desc="Total Progress", position=0, leave=True) as pbar_total:
    for year in range(year_start, year_end + 1):
        race_data_all = []
        urls = []
        race_ids = []

        for place_code, place in place_dict.items():
            for z in range(1, 8):  # 開催回数分ループ(1回〜6回)
                for y in range(1, 14):  # 開催日数分ループ(1日〜12日)
                    race_id_base = f"{year}{place_code}{z:02d}{y:02d}"
                    for x in range(1, 13):  # レース数分ループ(1R〜12R)
                        race_id = f"{race_id_base}{x:02d}"
                        url = f"https://db.netkeiba.com/race/{race_id}"
                        urls.append(url)
                        race_ids.append(race_id)

        with tqdm(total=len(urls), desc=f"Year {year}", position=1, leave=True) as pbar_year:
            with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
                future_to_url = {executor.submit(process_race, url_race_id): url_race_id for url_race_id in zip(urls, race_ids)}
                for future in concurrent.futures.as_completed(future_to_url):
                    result = future.result()
                    race_data_all.extend(result)
                    pbar_year.update(1)

        # スクレイピングしたデータをPandas DataFrameに変換
        df = pd.DataFrame(race_data_all, columns=[
            'race_id', '馬', '騎手', '馬番', '調教師', '走破時間', 'オッズ', '通過順', '着順', '体重', '体重変化',
            '性', '齢', '斤量', '賞金', '上がり', '人気', 'レース名', '日付', '開催', 'クラス',
            '芝・ダート', '距離', '回り', '馬場', '天気', '場id', '場名'
        ])

        # 各race_idごとに出走頭数を計算
        headcount_series = df.groupby('race_id')['race_id'].transform('count')

        # 'race_id'列の次に出走頭数列を挿入
        race_id_index = df.columns.get_loc('race_id') + 1  # 'race_id'列の位置を取得し、その次の位置を計算
        df.insert(race_id_index, '出走頭数', headcount_series)
        
        # SHIFT-JISでエンコーディングする前にデータをクレンジング
        df = df.apply(lambda col: col.map(lambda x: x if isinstance(x, str) else str(x)).fillna(''))

        # 変更を加えたDataFrameをCSVファイルとして保存
        output_dir = 'data'
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f'{year}.csv')
        df.to_csv(output_path, index=False, encoding="SHIFT-JIS", errors="replace")
        
        print(f"{year}年のデータを保存しました: {output_path}")
        
        pbar_total.update(1)

print("終了")

◆ コピペが面倒な人は、こちらからファイルをダウンロードしてください。



② データ処理 (特徴量エンジニアリング)

取得した過去のレースデータを処理して加工していきます。
以下のコードをコピペしてください。

01.encode.ipynb

## 00.data_scraping.ipynbのスクレイピングデータを抽出、加工する。馬ごとの近5走における各特徴量データを追加。

import pandas as pd
from sklearn.preprocessing import LabelEncoder
import numpy as np

def class_mapping(row):
    # クラス名を数値にマッピング
    mappings = {'障害': 0, 'G1': 10, 'G2': 9, 'G3': 8, '(L)': 7, 'オープン': 7, 'OP': 7, '3勝': 6, '1600': 6, '2勝': 5, '1000': 5, '1勝': 4, '500': 4, '新馬': 3, '未勝利': 1}
    for key, value in mappings.items():
        if key in row:
            return value
    return 0

def standardize_times(df, col_name):
    # 走破時間を秒に変換
    time_parts = df[col_name].str.split(':', expand=True)
    seconds = time_parts[0].astype(float) * 60 + time_parts[1].str.split('.', expand=True)[0].astype(float) + time_parts[1].str.split('.', expand=True)[1].astype(float) / 10
    seconds = seconds.ffill()
    
    # 平均と標準偏差を計算し標準化
    mean_seconds = seconds.mean()
    std_seconds = seconds.std()
    df[col_name] = -((seconds - mean_seconds) / std_seconds)
    
    # 外れ値処理
    df[col_name] = df[col_name].apply(lambda x: -3 if x < -3 else (2 if x > 2.5 else x))
    
    # 再度標準化
    mean_seconds_2 = df[col_name].mean()
    std_seconds_2 = df[col_name].std()
    df[col_name] = (df[col_name] - mean_seconds_2) / std_seconds_2
    
    return mean_seconds, std_seconds, mean_seconds_2, std_seconds_2

def add_seasonal_features(df, date_columns):
    # 日付カラムから季節特徴量を追加
    for date_col in date_columns:
        if not np.issubdtype(df[date_col].dtype, np.datetime64):
            df[date_col] = pd.to_datetime(df[date_col])
        df[f'{date_col}_sin'] = np.sin((df[date_col].dt.month - 1) * (2 * np.pi / 12))
        df[f'{date_col}_cos'] = np.cos((df[date_col].dt.month - 1) * (2 * np.pi / 12))

yearStart = 2011
yearEnd = 2011
yearList = np.arange(yearStart, yearEnd + 1)
dfs = []

print("ファイル取得:開始")

for year in yearList:
    file_path = f"data/{year}.csv"
    df = pd.read_csv(file_path, encoding="SHIFT-JIS", header=0)
    df['日付'] = pd.to_datetime(df['日付'], format='%Y年%m月%d日', errors='coerce')
    df['着順'] = pd.to_numeric(df['着順'], errors='coerce')
    df = df.dropna(subset=['着順'])
    df['着順'] = df['着順'].astype(int)
    df['賞金'] = pd.to_numeric(df['賞金'], errors='coerce').fillna(0)
    dfs.append(df)

df_combined = pd.concat(dfs, ignore_index=True)

print("ファイル取得:完了")
print("データ変換:開始")

# NaNが含まれる行を削除
df_combined = df_combined.dropna(subset=['走破時間'])

# 走破時間の標準化
mean_seconds, std_seconds, mean_seconds_2, std_seconds_2 = standardize_times(df_combined, '走破時間')

print('1回目平均' + str(mean_seconds))
print('2回目平均' + str(mean_seconds_2))
print('1回目標準偏差' + str(std_seconds))
print('2回目標準偏差' + str(std_seconds_2))

# 標準化情報をCSVに保存
time_df = pd.DataFrame({
    'Mean': [mean_seconds, mean_seconds_2],
    'Standard Deviation': [std_seconds, std_seconds_2]
}, index=['First Time', 'Second Time'])
time_df.to_csv('config/standard_deviation.csv')

# 通過順の平均を計算
pas = df_combined['通過順'].str.split('-', expand=True)
df_combined['通過順'] = pas.astype(float).mean(axis=1)

# マッピング情報を適用
mappings = {
    '性': {'牡': 0, '牝': 1, 'セ': 2},
    '芝・ダート': {'芝': 0, 'ダ': 1, '障': 2},
    '回り': {'右': 0, '左': 1, '芝': 2, '直': 2},
    '馬場': {'良': 0, '稍': 1, '重': 2, '不': 3},
    '天気': {'晴': 0, '曇': 1, '小': 2, '雨': 3, '雪': 4}
}
for column, mapping in mappings.items():
    df_combined[column] = df_combined[column].map(mapping)

# クラス変換を適用
df_combined['クラス'] = df_combined['クラス'].apply(class_mapping)

print("データ変換:完了")
print("近5走取得:開始")

# データをソート
df_combined.sort_values(by=['馬', '日付'], ascending=[True, False], inplace=True)

features = ['馬番', '騎手', '斤量', 'オッズ', '体重', '体重変化', '上がり', '通過順', '着順', '距離', 'クラス', '走破時間', '芝・ダート', '天気', '馬場']

# 各馬の過去5走の情報をシフトしながら取得し、ffillで欠損値を補完
shifts = {}
for i in range(1, 6):
    shifts[f'日付{i}'] = df_combined.groupby('馬')['日付'].shift(-i)
    for feature in features:
        shifts[f'{feature}{i}'] = df_combined.groupby('馬')[feature].shift(-i).ffill()

# 新しい列を一度にDataFrameに追加
df_combined = pd.concat([df_combined, pd.DataFrame(shifts)], axis=1)

# race_idと馬でグルーピングし、最新の特徴量を取得
df_combined = df_combined.groupby(['race_id', '馬'], as_index=False).last()
df_combined.sort_values(by='race_id', ascending=False, inplace=True)

print("近5走取得:終了")
print("日付変換:開始")

df_combined.replace('---', np.nan, inplace=True)

# 距離差と日付差を計算
df_combined['距離差'] = df_combined['距離'] - df_combined['距離1']
df_combined['日付差'] = (df_combined['日付'] - df_combined['日付1']).dt.days
for i in range(1, 5):
    df_combined[f'距離差{i}'] = df_combined[f'距離{i}'] - df_combined[f'距離{i+1}']
    df_combined[f'日付差{i}'] = (df_combined[f'日付{i}'] - df_combined[f'日付{i+1}']).dt.days

# 斤量関連の列を数値に変換し、変換できないデータはNaNに
kinryo_columns = ['斤量', '斤量1', '斤量2', '斤量3', '斤量4', '斤量5']
df_combined[kinryo_columns] = df_combined[kinryo_columns].apply(pd.to_numeric, errors='coerce')
df_combined['平均斤量'] = df_combined[kinryo_columns].mean(axis=1)

# 騎手の勝率を計算し、保存およびマージ
jockey_win_rate = df_combined.groupby('騎手')['着順'].apply(lambda x: (x == 1).sum() / x.count()).reset_index()
jockey_win_rate.columns = ['騎手', '騎手の勝率']
jockey_win_rate.to_csv('calc_rate/jockey_win_rate.csv', index=False)
df_combined = pd.merge(df_combined, jockey_win_rate, on='騎手', how='left')

# 各レースの出走頭数を計算
df_combined['出走頭数'] = df_combined.groupby('race_id')['race_id'].transform('count')

# 各馬に対して過去5レースの出走頭数を特徴量として追加
for i in range(1, 6):
    df_combined[f'出走頭数{i}'] = df_combined.groupby('馬')['出走頭数'].shift(i).fillna(0)

# 距離と走破時間からスピードを計算し、平均スピードを新しい列として追加
for i in range(1, 6):
    df_combined[f'スピード{i}'] = df_combined[f'距離{i}'] / df_combined[f'走破時間{i}']
df_combined['平均スピード'] = df_combined[[f'スピード{i}' for i in range(1, 6)]].mean(axis=1, skipna=True)
df_combined.drop(columns=[f'スピード{i}' for i in range(1, 6)], inplace=True)

# 過去5走の賞金を取得し、賞金合計を計算
for i in range(1, 6):
    df_combined[f'賞金{i}'] = df_combined.groupby('馬')['賞金'].shift(i)
df_combined['過去5走の合計賞金'] = df_combined[[f'賞金{i}' for i in range(1, 6)]].sum(axis=1)
df_combined.drop(columns=[f'賞金{i}' for i in range(1, 6)] + ['賞金'], inplace=True)

df_combined.sort_values(by='race_id', ascending=False, inplace=True)

# 日付カラムから年、月、日を抽出
df_combined['year'] = df_combined['日付'].dt.year
df_combined['month'] = df_combined['日付'].dt.month
df_combined['day'] = df_combined['日付'].dt.day

# 季節特徴量を追加
date_columns = ['日付1', '日付2', '日付3', '日付4', '日付5']
add_seasonal_features(df_combined, date_columns)

date_columns = ['日付', '日付1', '日付2', '日付3', '日付4', '日付5']
for col in date_columns:
    df_combined['year'] = df_combined[col].dt.year
    df_combined['month'] = df_combined[col].dt.month
    df_combined['day'] = df_combined[col].dt.day
    df_combined[col] = (df_combined['year'] - yearStart) * 365 + df_combined['month'] * 30 + df_combined['day']
df_combined.drop(['year', 'month', 'day'], axis=1, inplace=True)

print("日付変換:終了")

# 騎手の乗り替わり特徴量を追加
df_combined['騎手の乗り替わり'] = df_combined.groupby('馬')['騎手'].transform(lambda x: (x != x.shift()).astype(int))

# カテゴリカル変数のラベルエンコーディング
categorical_features = ['馬', '騎手', '調教師', 'レース名', '開催', '場名', '騎手1', '騎手2', '騎手3', '騎手4', '騎手5']
for i, feature in enumerate(categorical_features):
    print(f"\rProcessing feature {i+1}/{len(categorical_features)}", end="")
    le = LabelEncoder()
    df_combined[feature] = le.fit_transform(df_combined[feature].astype(str))

# エンコーディングとスケーリング後のデータを確認
print("ファイル出力:開始")
df_combined.to_csv('encoded/encoded_data.csv', index=False)
print("ファイル出力:終了")

◆ コピペが面倒な人は、こちらからファイルをダウンロードしてください。


③ 予想モデルの作成 (LightGBM)

予想モデルを作成するプログラムです。
以下をコピペしてください。

予想モデルの作成と、回収率計算の算出プログラムの2つが実行されるようになっています。

回収率計算の算出プログラムでは、下の項目にあるpayback_scraping.ipynb で取得したデータを使用します。

payback_scraping.ipynb で払い戻しデータを取得していない場合、途中でエラーが発生しますが、予想モデル作成のプログラムは動作しデータ出力されるので、payback_scraping.ipynb の実行が面倒な場合、エラーは無視しても大丈夫です。(payback_scraping.ipynb でのデータ取得は結構時間がかかります。。

◆ 02.model.ipynb

## LigftGBMを用いて、着順予測を行うモデルを作成するプログラム。単勝_複勝回収率を計算する。

import lightgbm as lgb
import pandas as pd
from sklearn.metrics import roc_auc_score
import numpy as np
import ast


def split_date(df, test_size):
    sorted_df = df.sort_values('日付')
    train_size = int(len(sorted_df) * (1 - test_size))
    train = sorted_df.iloc[:train_size]
    test = sorted_df.iloc[train_size:]
    return train, test

# データの読み込み
data = pd.read_csv('encoded/encoded_data.csv')

# 着順を変換
data['着順'] = data['着順'].map(lambda x: 1 if x < 4 else 0)

# 特徴量とターゲットの分割
train, test = split_date(data, 0.3)
X_train = train.drop(['着順', 'オッズ', '人気', '上がり', '走破時間', '通過順'], axis=1)
y_train = train['着順']
X_test = test.drop(['着順', 'オッズ', '人気', '上がり', '走破時間', '通過順'], axis=1)
y_test = test['着順']

# LightGBMデータセットの作成
train_data = lgb.Dataset(X_train, label=y_train)
valid_data = lgb.Dataset(X_test, label=y_test)

# ハイパーパラメータ
params = {
    'objective': 'binary',
    'metric': 'binary_logloss',
    'verbosity': -1,
    'boosting_type': 'gbdt',
    'class_weight': 'balanced',
    'random_state': 100,
    'feature_pre_filter': False,
    'lambda_l1': 4.2560625081811865e-05,
    'lambda_l2': 4.74860278547497,
    'num_leaves': 5,
    'feature_fraction': 0.9520000000000001,
    'bagging_fraction': 1.0,
    'bagging_freq': 0,
    'min_child_samples': 20,
    'n_estimators': 1000,
}

# モデルの訓練
lgb_clf = lgb.LGBMClassifier(**params)
lgb_clf.fit(X_train, y_train)
y_pred_train = lgb_clf.predict_proba(X_train)[:, 1]
y_pred = lgb_clf.predict_proba(X_test)[:, 1]

# モデルの評価
print(roc_auc_score(y_test, y_pred))

# 混同行列の計算
TP = ((y_test == 1) & (y_pred >= 0.5)).sum()
FP = ((y_test == 0) & (y_pred >= 0.5)).sum()
TN = ((y_test == 0) & (y_pred < 0.5)).sum()
FN = ((y_test == 1) & (y_pred < 0.5)).sum()
total_cases = len(y_test)

accuracy_TP = TP / total_cases * 100
misclassification_rate_FP = FP / total_cases * 100
accuracy_TN = TN / total_cases * 100
misclassification_rate_FN = FN / total_cases * 100

print("Total cases:", total_cases)
print(f"True positives(実際に3着内で、予測も3着内だったもの): {TP} ({accuracy_TP:.2f}%)")
print(f"False positives(実際は3着外だが、予測では3着内だったもの): {FP} ({misclassification_rate_FP:.2f}%)")
print(f"True negatives(実際に3着外で、予測も3着外だったもの): {TN} ({accuracy_TN:.2f}%)")
print(f"False negatives(実際は3着内だが、予測では3着外だったもの): {FN} ({misclassification_rate_FN:.2f}%)")

# モデルの保存
lgb_clf.booster_.save_model('model/model.txt')

# 特徴量の重要度を取得し表示
importance = lgb_clf.feature_importances_
feature_names = X_train.columns
indices = np.argsort(importance)[::-1]
for f in range(X_train.shape[1]):
    print(f"{f + 1:2d}) {feature_names[indices[f]]:<30} {importance[indices[f]]}")

# 単勝回収率、複勝回収率の計算
years = range(2013, 2023)
df = [pd.read_csv(f"../payback/{year}.csv", encoding="SHIFT-JIS", header=None) for year in years]
betting_data = pd.concat(df, ignore_index=True)

# 予測結果を元に賭ける馬を決定
betting_horses = {(test.iloc[i]['race_id'], test.iloc[i]['馬番']): y_pred[i] for i in range(len(y_pred)) if y_pred[i] >= 0.5}
betting_data.set_index(betting_data.iloc[:, 0].astype(str).str.strip(), inplace=True)

win_return_amount = 0  # 単勝の回収金額
place_return_amount = 0  # 複勝の回収金額
for (race_id, horse_number) in betting_horses:
    race_id = str(int(float(race_id)))
    horse_number = str(int(float(horse_number)))  # 馬番を文字列に変換
    race = int(race_id[-2:])
    if race_id in betting_data.index:
        race_data = betting_data.loc[race_id]
        race_data_list = ast.literal_eval(race_data[1])

        win_data = race_data_list[0]  # 単勝のデータを取得
        place_data = race_data_list[1]  # 複勝のデータを取得

        for j in range(0, len(win_data), 2):
            if win_data[j] == horse_number:
                win_return_amount += int(win_data[j + 1].replace(',', ''))

        for j in range(0, len(place_data), 2):
            if place_data[j] == horse_number:
                place_return_amount += int(place_data[j + 1].replace(',', ''))
    else:
        print(f"Race ID {race_id} not found in betting data.")

# 単勝と複勝の回収率を計算
betting_amount = len(betting_horses)
win_return_rate = win_return_amount / betting_amount if betting_amount else 0
place_return_rate = place_return_amount / betting_amount if betting_amount else 0

print("単勝回収率:", win_return_rate)
print("複勝回収率:", place_return_rate)

◆ コピペが面倒な人は、こちらからファイルをダウンロードしてください。


ここから先は有料とさせてください。

④〜⑥ 当日のレースデータをスクレイピングで取得し、データを加工、その後実際にレース結果を予想していくプログラムです。

加えて、独自開発を支援してくれるようチューニングしたmyChatGPTなどの特典、盛りだくさんの内容を用意するとともに、今までと同様に継続的な更新を予定しております。
(どんな内容が書かれているかは、目次をご覧ください。

継続開発をしていくためにも、ご理解のほどよろしくお願いします🙇‍♂️

ここから先は

31,786字 / 1画像 / 7ファイル

¥ 6,980

この記事が参加している募集

この記事が気に入ったらチップで応援してみませんか?