見出し画像

"究極のマルチモデルデータベース"「SurrealDB」を使ってRAGを構築してみた

こんにちは、IZAIエンジニアチームです。
今回は「The ultimate multi-model database(=究極のマルチモデルデータベース)と銘打つ「SurrealDB」を使ったRAGシステムを構築してみます。


究極の全部入りDB「SurrealDB」とは


SurrealDBは、データベース、API、セキュリティをすべて一体化した次世代の「全部入り」データベースです。
従来のバックエンド開発なしで、リアルタイム同期やWebSocketによる高速通信が可能。さらに、従来のSQLのJOIN操作を不要にするグラフクエリ機能を備え、スケーラブルな構成でクラスタや単一ノードに柔軟に対応します。JavaScriptやWebAssemblyによるカスタム拡張も可能で、クラウドやエッジ環境で高速に動作する、まさに万能型のデータベースです。

それでは早速最強のデータベースとLLM APIを接続してRAGを構築してみます

1. データセットの準備:Typst公式ドキュメントのスクレイピング


今回は、Typst(マークアップベースの新しい組版システム)の公式ドキュメントをデータセットとして使用します。TypstはLatexに代表される組版システムの一つですが、導入コストや書きやすさコンパイルの速さから近年急速に注目されています。
以下のPythonスクリプトを使用して、Typstの公式ドキュメントページ(https://typst.app/docs)から必要な情報をスクレイピングします。

このスクリプトは非同期処理を利用して効率的にウェブスクレイピングを行い、取得したデータをJSON形式で保存します。主な特徴は以下の通りです:

  1. `playwright`ライブラリを使用して、JavaScriptで動的に生成されるコンテンツも取得可能

  2. 非同期処理により、複数のページを並行してスクレイピング

  3. エラーハンドリングとリトライ機能を実装し、安定したスクレイピングを実現

  4. プログレス表示機能により、スクレイピングの進捗を可視化

以下がスクリプトの主要な部分です:

import asyncio
from playwright.async_api import async_playwright, TimeoutError
import json
from typing import List, Dict
import aiofiles
import random
from asyncio import Semaphore
from urllib.parse import urljoin

class TypstDocScraper:
    def __init__(self, base_url: str = "https://typst.app/docs"):
        self.base_url = base_url
        self.documents = []
        self.max_retries = 5
        self.delay_between_requests = 5  # seconds
        self.max_concurrent_requests = 3
        self.semaphore = Semaphore(self.max_concurrent_requests)
        self.total_links = 0
        self.processed_links = 0

    async def extract_nav_structure(self, page):
        # JavaScriptを使用してナビゲーション構造を抽出
        nav_structure = await page.evaluate('''
            () => {
                function extractLinks(ul, depth = 0) {
                    const links = [];
                    for (const li of ul.children) {
                        const a = li.querySelector('a');
                        if (a) {
                            const link = {
                                title: a.textContent.trim(),
                                href: a.getAttribute('href'),
                                depth: depth
                            };
                            links.push(link);
                            
                            const subUl = li.querySelector('ul');
                            if (subUl) {
                                links.push(...extractLinks(subUl, depth + 1));
                            }
                        }
                    }
                    return links;
                }
                
                const mainUl = document.querySelector('ul.animated');
                return mainUl ? extractLinks(mainUl) : [];
            }
        ''')
        return nav_structure

    async def scrape_page(self, browser, item, retry_count=0):
        async with self.semaphore:
            page = await browser.new_page()
            try:
                full_url = urljoin(self.base_url, item['href'].lstrip('/'))
                
                response = await page.goto(full_url, wait_until="domcontentloaded", timeout=120000)
                
                if response.status == 404:
                    print(f"{'  ' * item['depth']}404 Error: Page not found for {item['title']}")
                    return

                # コンテンツセレクタを待機
                content_selector = await page.wait_for_selector('main, .docs-content, article', timeout=120000)
                
                if content_selector:
                    content = await content_selector.inner_text()
                    if content and not content.startswith("404: Page not found"):
                        document = {
                            "title": item['title'],
                            "content": content,
                            "url": full_url,
                            "depth": item['depth']
                        }
                        self.documents.append(document)
                        await self.save_document(document)
                        self.processed_links += 1
                        self.print_progress()
                    else:
                        print(f"{'  ' * item['depth']}Warning: No valid content found for {item['title']}")
                else:
                    print(f"{'  ' * item['depth']}Warning: Content selector not found for {item['title']}")
            
            except Exception as e:
                if retry_count < self.max_retries:
                    print(f"{'  ' * item['depth']}Error occurred while processing {item['title']}: {str(e)}. Retrying ({retry_count + 1}/{self.max_retries})...")
                    await asyncio.sleep(random.uniform(5, 15))  # リトライ前のランダムな待機時間
                    await self.scrape_page(browser, item, retry_count + 1)
                else:
                    print(f"{'  ' * item['depth']}Max retries reached for {item['title']}. Skipping...")
            
            finally:
                await page.close()

    # その他のメソッド(print_progress, scrape, save_document, get_documents, save_to_json)は省略

async def main():
    scraper = TypstDocScraper()
    await scraper.scrape()

    documents = scraper.get_documents()
    print(f"\n\nScraped {len(documents)} documents")

    await scraper.save_to_json()

    if documents:
        print("\nPreview of the first document:")
        print(f"Title: {documents[0]['title']}")
        print(f"URL: {documents[0]['url']}")
        print(f"Depth: {documents[0]['depth']}")
        print(f"Content preview: {documents[0]['content'][:200]}...")
    else:
        print("No documents were scraped. Please check the console output for errors.")

if __name__ == "__main__":
    asyncio.run(main())

※ スクレイピング時は、画像や文章などの著作権を確認してください。
Webサイトによっては禁止されている場合もあるため、利用規約やrobots.txtを確認しましょう。
また、サーバーに負荷をかけないよう1秒程度間隔をあけて実行してください。

スクリプトを実行すると、Typstの公式ドキュメントから以下のようなJSONフォーマットのデータが得られます:

[
  {
    "title": "Overview",
    "content": "Overview\nOverview\n\nWelcome to Typst's documentation! Typst is a new markup-based typesetting system for the sciences. It is designed to be an alternative both to advanced tools like LaTeX and simpler tools like Word and Google Docs. Our goal with Typst is to build a typesetting tool that is highly capable and a pleasure to use.\n\n...",
    "url": "https://typst.app/docs/",
    "depth": 0
  },
  {
    "title": "Tutorial",
    "content": "Tutorial\nTutorial\n\nWelcome to Typst's tutorial! In this tutorial, you will learn how to write and format documents in Typst. We will start with everyday tasks and gradually introduce more advanced features. This tutorial does not assume prior knowledge of Typst, other markup languages, or programming. We do assume that you know how to edit a text file.\n\n...",
    "url": "https://typst.app/docs/tutorial/",
    "depth": 0
  },
  {
    "title": "Writing in Typst",
    "content": "Tutorial\nWriting in Typst\nWriting in Typst\n\nLet's get started! Suppose you got assigned to write a technical report for university. It will contain prose, maths, headings, and figures. To get started, you create a new project on the Typst app. You'll be taken to the editor where you see two panels: A source panel where you compose your document and a preview panel where you see the rendered document.\n\n...",
    "url": "https://typst.app/docs/tutorial/writing-in-typst/",
    "depth": 1
  },
  // ... 他のドキュメントデータ
]

2. データ格納:ドキュメントのインデックス化


前章で生成したTypstの公式ドキュメントデータを、SurrealDBを使用して効率的に格納し、後のRAG(Retrieval-Augmented Generation)システムで利用できるようにします。ここでは、データのチャンク分割埋め込み(embedding)、そしてデータベースへの保存が含まれます。

2.1 SurrealDBの準備

まず、Docker Composeを使用してSurrealDBのコンテナを起動します。以下の`docker-compose.yml`ファイルを使用します:

version: '3'
services:
  surrealdb:
    image: surrealdb/surrealdb:latest
    entrypoint:
      - /surreal
      - start
      - --auth
      - --log
      - trace
      - --user
      - $DB_USER
      - --pass
      - $DB_PASSWORD
      - memory # This starts SurrealDB in memory mode. Remove "memory" for persistent storage.
    ports:
      - "8000:8000"
    env_file:
      - .env # Ensure you have a .env file in the same directory as your docker-compose.yml

このファイルをプロジェクトのルートディレクトリに配置し、同じディレクトリに`.env`ファイルを作成してデータベースのユーザー名とパスワードを設定します:

DB_USER=your_username
DB_PASSWORD=your_password
OPENAI_API_KEY=your_openai_api_key

そして、以下のコマンドでSurrealDBコンテナを起動します:

docker compose up -d


2.2 データをSurrealDBに格納

TypstのドキュメントデータをSurrealDBに格納してみます。このPythonスクリプトは、データの読み込み、チャンク分割、埋め込み(embedding)の生成、そしてデータベースへの保存という内容になっています。コード全体を見てみましょう:

import json
import re
import os
import asyncio
import time
from surrealdb import Surreal
from openai import AsyncOpenAI
from dotenv import load_dotenv
from tenacity import retry, wait_random_exponential, stop_after_attempt

# Constants
COLLECTION_NAME = "typst_docs"
TEXT_FIELD_NAME = "text"
EMBEDDING_FIELD_NAME = "embedding"
TITLE_FIELD_NAME = "title"
URL_FIELD_NAME = "url"
MODEL = "text-embedding-3-small"
RATE_LIMIT = 20  # Requests per minute
BATCH_SIZE = 5  # Reduced batch size to minimize concurrent DB operations

class RateLimiter:
    def __init__(self, rate_limit):
        self.rate_limit = rate_limit
        self.tokens = rate_limit
        self.last_refill = time.time()

    async def wait(self):
        while self.tokens < 1:
            self._refill()
            await asyncio.sleep(0.1)
        self.tokens -= 1

    def _refill(self):
        now = time.time()
        time_since_refill = now - self.last_refill
        new_tokens = time_since_refill * (self.rate_limit / 60.0)
        self.tokens = min(self.rate_limit, self.tokens + new_tokens)
        self.last_refill = now

rate_limiter = RateLimiter(RATE_LIMIT)

def load_typst_data(file_path):
    with open(file_path, 'r') as file:
        content = file.read()
    data = json.loads(content)
    return data

def chunk_document(document):
    paragraphs = re.split(r'\n\n+', document['content'])
    chunks = []
    for i, paragraph in enumerate(paragraphs):
        chunk = {
            'title': document['title'],
            'content': paragraph.strip(),
            'url': document['url'],
            'chunk_id': f"{document['url']}_{i}"
        }
        chunks.append(chunk)
    return chunks

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(5))
async def create_embedding(openai_client, query_string, model=MODEL):
    await rate_limiter.wait()
    response = await openai_client.embeddings.create(
        input=query_string,
        model=model
    )
    return response.data[0].embedding

async def save_chunk_and_embedding(db, chunk, embedding):
    data = {
        TITLE_FIELD_NAME: chunk['title'],
        TEXT_FIELD_NAME: chunk['content'],
        EMBEDDING_FIELD_NAME: embedding,
        URL_FIELD_NAME: chunk['url'],
        'chunk_id': chunk['chunk_id']
    }
    await db.create(COLLECTION_NAME, data)

async def process_chunk(db, openai_client, chunk):
    try:
        embedding = await create_embedding(openai_client, chunk['content'])
        await save_chunk_and_embedding(db, chunk, embedding)
        print(f"Processed chunk: {chunk['content'][:50]}...")
    except Exception as e:
        print(f"Failed to process chunk. Error: {str(e)}")

async def process_chunks(db, openai_client, chunks):
    for i in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[i:i+BATCH_SIZE]
        tasks = [process_chunk(db, openai_client, chunk) for chunk in batch]
        await asyncio.gather(*tasks)
        print(f"Processed batch {i//BATCH_SIZE + 1}/{(len(chunks) + BATCH_SIZE - 1)//BATCH_SIZE}")

async def main():
    load_dotenv()
    typst_data = load_typst_data('typst_docs.json')
    all_chunks = []
    for document in typst_data:
        all_chunks.extend(chunk_document(document))
    
    db = Surreal("ws://localhost:8000/rpc")
    await db.connect()
    await db.signin({
        "user": os.getenv("DB_USER", "default_username"), 
        "pass": os.getenv("DB_PASSWORD", "default_password")
    })
    await db.use("test", "test")
    
    openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    
    await process_chunks(db, openai_client, all_chunks)
    
    await db.close()

if __name__ == "__main__":
    asyncio.run(main())

さて、このコードが何をしているのか、ざっくり説明しましょう。

  1. お約束: まずは必要なライブラリをインポートして、定数を定義しています。

  2. RateLimiterの配置: OpenAI APIの利用制限に気をつけたいので、これは、APIにリクエストを送りすぎないようにしています。

  3. チャンク分け: load_typst_data関数で、Typstのデータをファイルから読み込みます。そして、chunk_document関数で、そのデータを小さな塊(チャンク)に分けます。

  4. 埋め込み(embedding)ベクトル生成: create_embedding関数で、各チャンクをOpenAI APIを使って「翻訳」(いわゆるembedding)します。ここでの翻訳とは、テキストを数値のベクトルに変換することです。

  5. データベースへの格納: save_chunk_and_embedding関数で、チャンクとその埋め込みベクトルをSurrealDBに保存します。

3. RAGチャットボットを実装する


前章で構築したTypstドキュメントのデータベースを活用し、ユーザーの質問に答える対話システムを実装します。このシステムは、ベクトル検索と生成AIを組み合わせたRAG(Retrieval-Augmented Generation)アプローチを採用しています。

3.1 コードの概要

以下のPythonスクリプトは、SurrealDBに格納されたTypstドキュメントデータを検索し、OpenAI APIを使用して回答を生成する対話システムを実装しています。

import asyncio
import os
from typing import List, Dict, Any
from surrealdb import Surreal
from openai import AsyncOpenAI
from dotenv import load_dotenv

load_dotenv()

# Constants
COLLECTION_NAME = "typst_docs"
TEXT_FIELD_NAME = "text"
EMBEDDING_FIELD_NAME = "embedding"
TITLE_FIELD_NAME = "title"
URL_FIELD_NAME = "url"
GPT_MODEL = "gpt-4-turbo-preview"
EMBEDDING_MODEL = "text-embedding-3-small"

db = Surreal("ws://localhost:8000/rpc")
openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

async def connect_to_db():
    await db.connect()
    await db.signin({
        "user": os.getenv("DB_USER", "default_username"),
        "pass": os.getenv("DB_PASSWORD", "default_password")
    })
    await db.use("test", "test")

async def get_embedding(text: str) -> List[float]:
    response = await openai_client.embeddings.create(
        input=text,
        model=EMBEDDING_MODEL
    )
    return response.data[0].embedding

async def vector_search(query_embedding: List[float], top_k: int = 5) -> List[Dict[str, Any]]:
    query = f"""
    SELECT 
        id, 
        {TEXT_FIELD_NAME}, 
        {TITLE_FIELD_NAME}, 
        {URL_FIELD_NAME},
        vector::similarity::cosine({EMBEDDING_FIELD_NAME}, $query_embedding) as similarity
    FROM {COLLECTION_NAME}
    ORDER BY similarity DESC
    LIMIT $top_k;
    """
    try:
        result = await db.query(query, {
            "query_embedding": query_embedding,
            "top_k": top_k
        })
        return result[0]['result']
    except Exception as e:
        print(f"Error in vector_search: {e}")
        return []

def format_context(documents: List[Dict[str, Any]]) -> str:
    context = ""
    for doc in documents:
        context += f"Title: {doc[TITLE_FIELD_NAME]}\n"
        context += f"Content: {doc[TEXT_FIELD_NAME]}\n"
        context += f"URL: {doc[URL_FIELD_NAME]}\n\n"
    return context

async def generate_response(query: str, context: str) -> str:
    prompt = f"""
    You are an AI assistant specializing in Typst documentation. Answer the following question based on the provided context.
    If the answer is not clear from the context, say so and provide your best guess based on general knowledge about Typst.
    Always cite the source URL when referring to specific information.

    Context:
    {context}

    Question: {query}

    Answer:
    """
    
    response = await openai_client.chat.completions.create(
        model=GPT_MODEL,
        messages=[
            {"role": "system", "content": "You are a helpful assistant specialized in Typst documentation."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.7
    )
    
    return response.choices[0].message.content

async def chat():
    await connect_to_db()
    print("Welcome to the Typst Documentation Chat Assistant!")
    print("Type 'exit' to end the conversation.")
    
    while True:
        query = input("\nYour question: ")
        if query.lower() == 'exit':
            break
        
        print("Searching for relevant information...")
        query_embedding = await get_embedding(query)
        relevant_docs = await vector_search(query_embedding)
        context = format_context(relevant_docs)
        
        print("Generating response...")
        response = await generate_response(query, context)
        print("\nAssistant:", response)

    print("Thank you for using the Typst Documentation Chat Assistant. Goodbye!")

if __name__ == "__main__":
    asyncio.run(chat())

このコードは以下の主要な構成要素から成り立っています:

  1. データベース接続: SurrealDBとの接続を管理します。

  2. 埋め込みベクトル生成: ユーザーの質問をベクトル化します。

  3. ベクトル検索: 類似度の高いドキュメントを検索します。

  4. コンテキスト生成: 検索結果を適切な形式に整形します。

  5. 回答生成: GPT-4を使用して回答を生成します。

  6. 対話インターフェース: ユーザーとの対話を管理します。

3.2 システムの動作

これでユーザーはTypstに関する質問を入力できます。システムは以下の手順で回答を生成します:

  1. ユーザーの質問をベクトル化

  2. SurrealDBで類似度の高いドキュメントを検索

  3. 検索結果を元に、GPT-4oを使用して回答を生成

それでは次のような質問を聞いてみましょう。
「typstで積分の'dx'の'd'の記号ををどうやって書くか?」

 How d in dx of the integral is written in typst?

3.3 通常のChatGPTの回答

まず、この質問に対してRAGを用いないGPT-4oに聞いた解答を見てみましょう

In Typst, writing mathematical expressions can be done using the `$...$` delimiters for inline math and `$$...$$` for display math (block mode). To represent the differential operator in the context of an integral, you would typically use the `\mathrm` command to ensure that the 'd' in "dx" is typeset in a roman (upright) font, distinguishing it from the variable.

Here's how to write the differential `dx` correctly in Typst:

```typst
$\int f(x) \, \mathrm{d}x$
```

For display math:

```typst
$$
\int f(x) \, \mathrm{d}x
$$
```

typstでは'\'の記号は数式では用いないのですが、\mathrmを使えといった回答がされていますね。明確に間違った回答がされています。typstが振興組版システムなのでしょうが無いと言えばしょうが無いですが。。。

3.4 surrealDBを利用したRAGでの回答

Searching for relevant information...
Generating response...

Assistant: The "d" in an integral's dx can be written as `$dif x$` in Typst. Outside math formulas, "dif" can be accessed as `math.dif`.

Source: https://typst.app/docs/reference/symbols/sym/

$dif x$と書けば積分で使う'dx'を書けるという回答が得られました。これは正解ですね。しかも、参考となるドキュメントのURLまで提示できています。

4. まとめ&今後の展望


4.1 まとめ

本RAGシステムでは、例えば、「How d in dx of the integral is written in typst?」という質問に対して、正確かつ具体的な回答を提供し、情報源も明示できました。非常に扱いやすいSurrealDBとRAGを組み合わせることで、情報が頻繁に更新されるデータベースや開発中に構造が変化しうるデータベースなど、様々な分野で活用できそうです。

4.2 SurrealDBの特徴と利点

本稿では、SurrealDBを使用しましたが、その特徴と利点は以下の通りです:

  1. マルチモデルデータベース: SurrealDBは関係型、ドキュメント型、グラフ型の各データモデルをサポートしており、複雑なデータ構造を柔軟に表現でる。

  2. スケーラビリティ: 分散アーキテクチャにより、大規模なデータセットでも高いパフォーマンスを維できる。

  3. リアルタイム処理: WebSocketを通じたリアルタイムのデータ更新と通知機能を提供。

  4. 強力なクエリ言語: SurrealQLという独自のクエリ言語により、複雑なデータ操作や分析が可能。

  5. 組み込みのセキュリティ機能: ロールベースのアクセス制御や暗号化機能により、データのセキュリティを確保。

  6. グラフ機能: ノードとエッジを使用した複雑な関係性のモデリング(グラフ機能)が可能。

  7. ベクトルデータベース機能: 埋め込みベクトルの格納と類似度検索をネイティブにサポートしている。

  8. 時系列データ処理: 時間軸に沿ったデータの格納と分析が容易。

  9. GeoJSONサポート: 位置情報を含むデータの処理が可能。

  10. イベント駆動型アーキテクチャ: データの変更に応じて自動的にアクションを実行できる。

4.3 今後の展望: GraphRAGの実装

上に述べた中でも特にSurrealDBの強力なグラフ機能を活用することで、より高度なGraphRAGシステムの構築が可能となります。

  1. 知識グラフの構築: Typstドキュメント内の概念や機能間の関係性を明示的にモデル化。

  2. コンテキストの豊富化: 関連する概念や依存関係を考慮に入れた、より深い文脈理解。

  3. 推論能力の向上: グラフ構造を利用することで、直接的な関係性だけでなく、間接的な関連性も考慮できる。

  4. 説明可能性の向上: 回答の根拠となる知識の経路を視覚化できる。

4.4 最後に

以上、次回はGraphRAGの実装をしていきたいと思います!
参考になった方はいいねコメントよろしくお願いします!
日々の開発の励みになります。それではまた!


参考資料


[1] A Step-by-Step Guide on RAG using SurrealDB
[2] SurrealDB Documentation

いいなと思ったら応援しよう!