見出し画像

【実装付きAI技術解説】RAGシステム最適化のためのチャンキング手法比較|TECH BLOG #06


1. はじめに

はじめまして、AIVALIX株式会社でAIエンジニアを務めております、小峰天馬と申します。現在は慶應義塾大学理工学部1年生で来年からは物理情報工学科に進学いたします。AIVALIXでは、主にLLM・RAG関連のR&Dに取り組んでおり、現在はAIチャットサービスの精度向上プロジェクトに従事しています。

さて、弊社ではテックブログを通じたAI・LLM関連の情報発信に力を入れております。2週間に1回有益な情報を投稿しておりますので、研究や開発の一助となれば幸いです。

今までのテックブログでは論文紹介をしていたのですが、第6回目である今回のテックブログでは、「チャンキング(Chunking)」と呼ばれる、文章を複数の短いブロックに分割する手法について紹介します。一口にチャンキングと言ってもたくさんの手法があり、状況に応じて使い分けることが重要です。そこで、この記事では主に4つのチャンキング手法を説明、比較していきます。LLM初学者でもわかりやすい内容になっておりますので、ぜひご覧ください。


2. Chunkingとは

Chunking とは、長い文章(テキスト)を複数の短いブロック(チャンク)に分割する手法を指します。

なぜそのようなプロセスが有効なのか?疑問に思う方も多いのではないでしょうか。

ニュース記事・論文・書籍などの非常に長い文章を扱う際、そのまま自然言語処理モデルに入力すると、以下のような問題が生じることがあります。

問題1. 計算コストが膨大になる

モデルが一度に扱えるトークン数にはハードウェア的な上限や制約が存在し、長い文章を入力するとメモリ不足や極端な処理時間の増大が起こることがあります。

問題2. 入力長制限(Context Window Limit)

多くのモデルでは一度に扱える入力長(トークン数)に上限があります。これを超える部分は切り捨てられる、またはエラーになるなどの問題が起きるため、長すぎる文章はモデルに入力しづらいのです。

問題3. モデルが要点を見落とす

長大な文章には重要でない情報も大量に含まれます。単純に全部詰め込むとノイズが増え、正解にたどり着きにくくなる可能性があります。そこで、文章を小さなまとまりに分割する(Chunking) ことで、モデルが扱う情報量を適度に抑え、かつ必要な文脈を切り出しやすくするのが狙いです。

Chunking は「テキストをどう区切るか」によって、モデルの応答品質や計算効率が左右されるため、さまざまな手法が研究・実装されている領域です。
Chunkingの変数のうち最も重要になってくるものの一つがチャンクサイズです。

チャンクが大きい場合:チャンク内に大きな文脈情報が入るためモデルが情報を逃しにくくなります、その一方で不要な情報や文脈も一緒に含まれてしまい、正答率が下がる可能性があるほか、メモリや計算負荷が増大してしまいます。
チャンクが小さい場合:ノイズや計算負荷は低減されるのですが、工夫をしないと文脈が分断されてしまい回答精度が下がってしまう可能性があります。

このようにChunking は「テキストをどう区切るか」によって、モデルの応答品質や計算効率が左右されるため、さまざまな手法が研究・実装されている領域です。


3. Chunking手法の紹介

Chunkingがどういったものなのかわかったところで、いくつかの実際のChunking手法を見ていきましょう。

チャンク分けの仕方は大きく2つに分かれており、1つが機械的に単語数や文単位で分けていく方法、そしてもう1つが意味的な切れ目で文章を区切っていく方法です。

今回紹介する手法は固定長(Fixed)LangchianspaCy-SentMeta-PPLの4つで、固定長とspaCyに関しては機械的に、LangchainとMeta-PPLに関しては意味的にチャンクに分割しています。

手法1. 固定長(Fixed)

文字数や単語数で機械的に分割する単純な手法です。実装が非常に簡単で計算コストが少ないですが、文脈を無視して分割してしまいます。

手法2. Langchain

LangChain というライブラリの仕組みを使った手法です。これは RecursiveCharacterTextSplitter というクラスを用いて、設定された文字数を基準に分割を行いつつ、適度な重複(overlap)を持たせるのが特徴です。重複を加えることで、文脈がチャンクの境界で中途半端に切れず、前後の情報を多少なりとも保持できます。単純な固定長手法よりは幾分意味を尊重した分割ができるものの、実際にはまだ「文法的な単位」を確実に守っているわけではなく、設定値の微調整が必要な場合があります。

手法3. SpaCy-Sent

SpaCy ライブラリの文分割機能(nlp(t).sents) を利用し、文単位にチャンクを作ります。ピリオドなどの文字を目印にしてチャンク分けするため途中で文が切れてしまうことは少ないですが、1文が長文の場合や、1文の中に答えが完全に含まれていない場合などは、文脈が分かれてしまうこともあります。

手法4. Meta-PPL

Meta-PPL という方法は、GPT-2 のような言語モデルで各文のパープレキシティ(Perplexity)を計算し、値が大きく変化する箇所で文章を区切るという手法です。文脈や意味の急な変化があるとパープレキシティが跳ね上がる(あるいは大きく下がる)傾向があり、そうした箇所を区切れ目とすることで、意味のまとまりを保ったチャンクを形成しやすくなります。しかしながら、このアプローチは文ごとにモデルでの推論を行うため、長大なテキストや多数のサンプルを扱う際には計算量が膨大になるデメリットがあります。

上記の手法は以下の論文を参考にしました。


4. 実装

以上の4つの手法を比較するために実際に実装してみました。

今回のコードでは、まず SQuAD v2 という質問応答用のデータセットを読み込み、そこから必要な数のサンプルを選び出しています。各サンプルは「質問文(question)」「正解となる回答群(answers)」「回答を探すための文脈(context)」の3つを含んでおり、これらを一括で qa_pairs というリストに格納します。こうすることで、後の処理でスムーズにチャンク分割やモデルへの入力を行えるようになります。

次に、pipeline("question-answering", model="deepset/roberta-large-squad2") を呼び出して、抽出型質問応答に特化した RoBERTa のモデルを用意します。ここで用いられるモデルは、あらかじめ SQuAD タスクで学習されたもので、文章と質問を与えると最もスコアが高い回答箇所を抽出してくれる仕組みです。サンプルコードでは、複数のチャンクに対して順番に推論を実行し、スコアの最も高い結果を「最終的な回答」とみなす方針を採っています。もしスコアが一定の閾値(threshold)を下回った場合は「回答なし」とし、無理に不正確な回答を出さないように設計しました。

回答の評価には F1 スコアが用いられています。SQuAD 形式に即した F1 スコアを計算する関数を実装しており、正解が複数ある場合でも最も高い一致度を取る正解と比較することで最大値を算出します。質問応答の実験では、どの程度文字列が正確にマッチしているかを確認するうえで F1 は代表的な指標ですが、今回のコードでは回答がない場合(回答なし)にも対処できるようになっています。

最後に、これらの手法をまとめて評価するために evaluate_method という関数を用意し、Chunking にかかる時間や生成されるチャンク数、平均チャンク長、そして最終的な F1 スコアを計測・集計しています。結果を表形式でまとめることで、Chunking の違いが QA タスクにどう影響するかを客観的に把握できるようになります。

F1スコアとは:適合率(precision)と再現率(recall)の調和平均で、QAタスクでは、「予測した文字列」と「正解の文字列」に含まれる共通単語の数からF1スコアを求めています。

以下に各手法の説明とコード全文を載せておきます。

手法1. 固定長(Fixed)

lambda t: [t[i:i+100] for i in range(0, len(t), 100)]

固定長で文字数で区切ります。50、100、200など変更して比較しました。

手法2. Langchain

RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50).split_text

LangChain ライブラリの RecursiveCharacterTextSplitter を使い、1チャンクあたり 512 or 256文字+最大50文字のオーバーラップを持たせました。

手法3. SpaCy-Sent

lambda t: [sent.text.strip() for sent in nlp(t).sents if sent.text.strip()]

SpaCy の言語モデルを用いた文分割機能を活用しています。内部的には nlp(t).sents というプロパティを呼び出し、文章内の文ごとの区切りを自動解析しながら、文を1つずつ取り出してリスト化しています。

手法4. Meta-PPL

def compute_perplexity(text, max_length=512):
    with torch.no_grad():
        inputs = tokenizer_ppl(text, return_tensors="pt", truncation=True, max_length=max_length)
        loss = model_ppl(**inputs, labels=inputs["input_ids"]).loss
    return float(torch.exp(loss))

def meta_perplexity_chunking(text, chunk_size=512, threshold=0.4):
    sentences = text.split(". ")
    if not sentences:
        return [""]

    ppl_scores = [compute_peplexity(sent) for sent in sentences]
    
    chunks = []
    current_chunk = []
    
    for i, sentence in enumerate(sentences):
        if i == 0:
            current_chunk.append(sentence)
        else:
            if ppl_scores[i-1] - ppl_scores[i] > threshold:
                if len(" ".join(current_chunk)) > chunk_size:
                    chunks.append(" ".join(current_chunk))
                    current_chunk = [sentence]
                else:
                    current_chunk.append(sentence)
            else:
                current_chunk.append(sentence)

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks

Meta-PPL のアルゴリズムでは、まず文章を「.(ピリオド)」で文単位に分割し、得られた各文について GPT-2 の損失値から導出されるパープレキシティ(PPL)を計算します。前の文と比較して PPL が一定の閾値(たとえば 0.4)以上に変化したところを文脈の切れ目とみなし、そこで新しいチャンクを作ります。さらに、1 つのチャンク内があまりにも大きくなりすぎる場合には強制的に区切ることで、1 チャンクあたりの文字数が極端に増えないようにしています。 今回はGPT-2の事前学習モデルであるmodel_pplを用いて損失関数を計算しています。 その内部機構は次のとおりです。

  1. テキストをトークナイズ し、トークンIDに変換する。

  2. モデルに入力し、各トークンの予測確率を出力 する。

  3. 出力した確率分布と、実際の次の単語のIDを比較し、交差エントロピー損失を求める。

  4. この損失を torch.exp(loss) で指数変換すると、Perplexity になる。
    以下がコード全文です。

!pip install transformers torch datasets langchain sentence-transformers spacy pandas tqdm
!python -m spacy download en_core_web_sm

import time
import torch
import numpy as np
import spacy
import pandas as pd
from tqdm import tqdm
from transformers import AutoTokenizer, pipeline, AutoModelForCausalLM
from datasets import load_dataset
from langchain.text_splitter import RecursiveCharacterTextSplitter
from IPython.display import display

**#データセットの読み込み**

dataset = load_dataset("squad_v2", split="validation[:100]")

qa_pairs = []
for item in tqdm(dataset, desc="Processing Dataset"):
    qa_pairs.append({
        "question": item["question"],
        "answers": item["answers"]["text"],
        "context": item["context"]
    })

print("\\nQAペアの統計:")
print(f" - 総サンプル数: {len(qa_pairs)}")
print(f" - 回答あり: {sum(1 for p in qa_pairs if len(p['answers'])>0 and p['answers'][0].strip()!='')}件")
print(f" - 回答なし: {sum(1 for p in qa_pairs if len(p['answers'])==0 or p['answers'][0].strip()=='')}件")

**#QAモデルの準備**

qa_pipeline = pipeline("question-answering", model="deepset/roberta-large-squad2")

def get_answer_from_chunks(question, chunks, threshold=0.15):
    best_answer, best_score = "[]", -1
    for chunk in chunks:
        try:
            result = qa_pipeline(question=question, context=chunk, top_k=1)
            if result["score"] > best_score:
                best_score = result["score"]
                best_answer = result["answer"]
        except:
            continue
    return best_answer if best_score > threshold else "[]"

#F1スコアの計算

def compute_f1(predicted, ground_truth):
    pred_empty = predicted in ("[]", "")
    gold_empty = len(ground_truth) == 0 or all(ans.strip() == "" for ans in ground_truth)
    
    if gold_empty and pred_empty:
        return 1.0
    if gold_empty or pred_empty:
        return 0.0
    
    predicted_tokens = set(predicted.lower().split())
    max_f1 = 0.0
    for ans in ground_truth:
        ans_tokens = set(ans.lower().split())
        common = predicted_tokens & ans_tokens
        if not common:
            continue
        precision = len(common) / len(predicted_tokens)
        recall = len(common) / len(ans_tokens)
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        max_f1 = max(max_f1, f1)
    return max_f1
    
#チャンキング手法

nlp = spacy.load("en_core_web_sm")

tokenizer_ppl = AutoTokenizer.from_pretrained("gpt2")
model_ppl = AutoModelForCausalLM.from_pretrained("gpt2")

def compute_perplexity(text, max_length=512):
    with torch.no_grad():
        inputs = tokenizer_ppl(text, return_tensors="pt", truncation=True, max_length=max_length)
        loss = model_ppl(**inputs, labels=inputs["input_ids"]).loss
    return float(torch.exp(loss))

def meta_perplexity_chunking(text, chunk_size=512, threshold=0.4):
    sentences = text.split(". ")
    if not sentences:
        return [""]

    ppl_scores = [compute_perplexity(sent) for sent in sentences]
    
    chunks = []
    current_chunk = []
    
    for i, sentence in enumerate(sentences):
        if i == 0:
            current_chunk.append(sentence)
        else:
            if ppl_scores[i-1] - ppl_scores[i] > threshold:
                if len(" ".join(current_chunk)) > chunk_size:
                    chunks.append(" ".join(current_chunk))
                    current_chunk = [sentence]
                else:
                    current_chunk.append(sentence)
            else:
                current_chunk.append(sentence)

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks

chunking_methods = {
    "Fixed-100": lambda t: [t[i:i+100] for i in range(0, len(t), 100)],
    "Fixed-200": lambda t: [t[i:i+200] for i in range(0, len(t), 200)],
    "LangChain-512": RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50).split_text,
    "SpaCy-Sent": lambda t: [sent.text.strip() for sent in nlp(t).sents if sent.text.strip()],
    "Meta-PPL": meta_perplexity_chunking
}

#評価関数

def evaluate_method(method_name, chunking_func):
    total_f1, total_time = 0.0, 0.0
    chunk_stats = []
    
    contexts = [qa["context"] for qa in qa_pairs]
    
    start_time = time.time()
    all_chunks = [chunking_func(ctx) for ctx in tqdm(contexts, desc=f"Chunking ({method_name})")]
    total_time += time.time() - start_time
    
    for chunks in all_chunks:
        if len(chunks) == 0:
            chunk_stats.append({"count": 0, "avg_length": 0})
        else:
            chunk_stats.append({
                "count": len(chunks),
                "avg_length": sum(len(c) for c in chunks) / len(chunks)
            })
    
    for i, qa in enumerate(tqdm(qa_pairs, desc=f"Evaluating ({method_name})")):
        predicted = get_answer_from_chunks(qa["question"], all_chunks[i])
        total_f1 += compute_f1(predicted, qa["answers"])
    
    avg_f1 = total_f1 / len(qa_pairs)
    time_per_ctx = total_time / len(qa_pairs)
    avg_chunks = sum(s["count"] for s in chunk_stats) / len(chunk_stats)
    avg_chars = sum(s["avg_length"] for s in chunk_stats) / len(chunk_stats)
    
    return {
        "method": method_name,
        "avg_f1": avg_f1,
        "time_per_ctx": time_per_ctx,
        "avg_chunks": avg_chunks,
        "avg_chars": avg_chars
    }

results = []
for method_name, func in chunking_methods.items():
    res = evaluate_method(method_name, func)
    results.append(res)

df = pd.DataFrame(results)
df = df[["method", "avg_f1", "time_per_ctx", "avg_chunks", "avg_chars"]]
df.columns = ["Method", "F1 Score", "Time/Ctx (s)", "Avg Chunks", "Avg Chars"]

print("\\nFinal Results:")
display(df.style.format({
    "F1 Score": "{:.3f}",
    "Time/Ctx (s)": "{:.3f}",
    "Avg Chars": "{:.1f}"
}))


5. 結果

実際に出力を確認して結果を見ていきましょう。

手法比較の結果

結果は以上のようにMeta-PPLが最も高い結果となりました。しかし、処理時間が圧倒的にかかっていることから計算に見合うような精度かという点には疑問が残ります。さらに、表に注目すると平均文字数とF1スコアには強い相関が見られ、精度向上がチャンキング手法によるものというよりは、単にチャンクサイズが増えたことの影響が強いと考えられます。(以下の図が示す通り、相関係数は0.8近くもあります)

また、同じようなチャンクサイズで比較した際に手法ごとにF1スコアに大差がないことからもこの影響は見て取れます。

F1スコアとchunkサイズの相関図

この影響を減らすために、データセットやQAモデルを変更することも考えられますが、チャンクサイズと回答精度に強い依存関係があることは変わらないでしょう。

また、データセットやQAモデルなどに応じてチャンキング手法を変えていくことが大切であり、実際の社内のシステムや環境に合わせて最適なものを選択していくことが重要です。


6. 最後に

近年のRAGの普及に伴い、その精度の向上は大きなトピックになっています。RAGには様々な要所技術があり、今回はその中でもChunkingに注目しました。日々新たな手法や論文が増えている中で、良い手法はどんどん取り入れ、これからも精度を高めていきたいと思っています。

今回のテックブログは以上となります。この記事が少しでも読者の皆様のためになったのであれば幸いです。最後まで読んでいただき、ありがとうございました。今後も引き続き情報発信していきますのでどうぞよろしくお願いいたします。


AIVALIX株式会社では、AIに関する開発やR&Dの案件を常時承っております。また、共同開発・共同研究にご協力いただける企業様も広く募集しております。

AI技術を通じた新たな価値創造に向け、一緒に挑戦していただけるパートナーを心よりお待ちしております。ご興味をお持ちの際は、ぜひお気軽にお問い合わせください。

▼ コーポレートサイトはこちら ▼

いいなと思ったら応援しよう!