見出し画像

FastAPIの核心:Starletteを徹底解剖 🌟🌟


FastAPIは基本的にStarletteのAPIラッパーです。FastAPIを完全に理解するには、まずStarletteを理解する必要があります。

1. ASGIプロトコル

Uvicornは共通のインターフェースを通じてASGIアプリケーションと相互作用します。アプリケーションは、以下のコードを実装することでUvicornを介して情報を送受信できます:

async def app(scope, receive, send):
    # 最も単純なASGIアプリケーション
    assert scope['type'] == 'http'
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [
            [b'content-type', b'text/plain'],
        ]
    })
    await send({
        'type': 'http.response.body',
        'body': b'Hello, world!',
    })
if __name__ == "__main__":
    # Uvicornサービス
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")

2. Starlette

UvicornでStarletteを起動するには、以下のコードを使用します:

from starlette.applications import Starlette
from starlette.middleware.gzip import GZipMiddleware

app: Starlette = Starlette()

@app.route("/")
def demo_route() -> None: pass

@app.websocket_route("/")
def demo_websocket_route() -> None: pass

@app.add_exception_handlers(404)
def not_found_route() -> None: pass

@app.on_event("startup")
def startup_event_demo() -> None: pass

@app.on_event("shutdown")
def shutdown_event_demo() -> None: pass

app.add_middleware(GZipMiddleware)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000)

このコードはStarletteを初期化し、ルート、例外ハンドラ、イベント、およびミドルウェアを登録してから、それを`uvicorn.run`に渡します。`uvicorn.run`メソッドはStarletteの`call`メソッドを呼び出すことでリクエストデータを送信します。

まず、Starletteの初期化を分析してみましょう:

class Starlette:
    def __init__(
        self,
        debug: bool = False,
        routes: typing.Sequence[BaseRoute] = None,
        middleware: typing.Sequence[Middleware] = None,
        exception_handlers: typing.Dict[
            typing.Union[int, typing.Type[Exception]], typing.Callable
        ] = None,
        on_startup: typing.Sequence[typing.Callable] = None,
        on_shutdown: typing.Sequence[typing.Callable] = None,
        lifespan: typing.Callable[["Starlette"], typing.AsyncGenerator] = None,
    ) -> None:
        """
        :param debug: デバッグ機能を有効にするかどうかを決定します。
        :param route: ルートのリストで、HTTPおよびWebSocketサービスを提供します。
        :param middleware: ミドルウェアのリストで、各リクエストに適用されます。
        :param exception_handler: 例外コールバックを格納する辞書で、HTTPステータスコードをキーとし、コールバック関数を値とします。
        :on_startup: 起動時に呼び出されるコールバック関数。
        :on_shutdown: シャットダウン時に呼び出されるコールバック関数。
        :lifespan: ASGIにおけるlifespan関数。
        """

        # lifespanが渡された場合、on_startupとon_shutdownは渡せません
        # なぜならStarletteは基本的にon_start_upとon_shutdownをUvicorn呼び出し用のlifespanに変換するからです
        assert lifespan is None or (
            on_startup is None and on_shutdown is None
        ), "Use either 'lifespan' or 'on_startup'/'on_shutdown', not both."

        # 変数を初期化する
        self._debug = debug
        self.state = State()
        self.router = Router(
            routes, on_startup=on_startup, on_shutdown=on_shutdown, lifespan=lifespan
        )
        self.exception_handlers = (
            {} if exception_handlers is None else dict(exception_handlers)
        )
        self.user_middleware = [] if middleware is None else list(middleware)
        # ミドルウェアを構築する
        self.middleware_stack = self.build_middleware_stack()

コードからわかるように、この初期化では大部分の要件がすでに満たされています。ただし、ミドルウェアを構築する関数があり、さらに分析が必要です:

class Starlette:
    def build_middleware_stack(self) -> ASGIApp:
        debug = self.debug
        error_handler = None
        exception_handlers = {}

        # 例外処理コールバックを解析し、error_handlerとexception_handlersに格納する
        # HTTPステータスコード500のみがerror_handlerに格納される
        for key, value in self.exception_handlers.items():
            if key in (500, Exception):
                error_handler = value
            else:
                exception_handlers[key] = value

        # 異なる種類のミドルウェアを順序付ける
        # 最初の層はServerErrorMiddlewareで、例外が発生したときにエラースタックを表示したり、デバッグモードではエラーページを表示してデバッグを容易にします。
        # 2番目の層はユーザーミドルウェア層で、すべてのユーザー登録ミドルウェアが格納されます。
        # 3番目の層はExceptionMiddlewareで、これは例外処理層で、ルート実行中に発生するすべての例外を処理します。
        middleware = (
            [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug)]
            + self.user_middleware
            + [
                Middleware(
                    ExceptionMiddleware, handlers=exception_handlers, debug=debug
                )
            ]
        )

        # 最後に、ミドルウェアをアプリにロードする
        app = self.router
        for cls, options in reversed(middleware):
            # clsはミドルウェアクラスそのもので、optionsは渡すパラメータです
            # ミドルウェア自体もASGI APPであり、ミドルウェアをロードすることは、1つのASGI APPを別のASGI APPに入れ子にするようなもので、マトリョーシカ人形のようなものです。
            app = cls(app=app, **options)

        # ミドルウェアは入れ子にロードされ、`call_next`を通じて上位のASGI APPを呼び出すため、逆順の方法が使用されます。
        return app

ミドルウェアを構築した後、初期化は完了し、`uvicorn.run`メソッドが`call`メソッドを呼び出します:

class Starlette:
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        scope["app"] = self
        await self.middleware_stack(scope, receive, send)

このメソッドはシンプルです。`scope`を通じてリクエストフロー内のアプリを設定し、後続の呼び出しのために`middleware_stack`を呼び出すことでリクエスト処理を開始します。このメソッドとミドルウェアの初期化からわかるように、StarletteのミドルウェアもASGI APPです(また、呼び出しスタックの最下部にあるルートもASGI APPであることがわかります)。同時に、Starletteは例外処理もミドルウェアに任せており、他のWebアプリケーションフレームワークではめったに見られないことです。Starletteは、各コンポーネントができるだけASGI APPになるように設計されていることがわかります。

2. ミドルウェア

上述したように、StarletteではミドルウェアはASGI APPです。そのため、Starletteのすべてのミドルウェアは、以下の形式に合致するクラスでなければなりません:

class BaseMiddleware:
    def __init__(self, app: ASGIApp) -> None:
        pass

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        pass

`starlette.middleware`には、この要件を満たす多くのミドルウェア実装があります。ただし、この章ではすべてのミドルウェアをカバーするわけではなく、ルートに最も近いものから最も遠いものまで、いくつかの代表的なものを選んで分析します。

2.1. 例外処理ミドルウェア - ExceptionMiddleware

まずはExceptionMiddlewareです。ユーザーはこのミドルウェアと直接やり取りしません(そのため`starlette.middleware`には置かれていません)が、以下の方法を通じて間接的にやり取りします:

@app.app_exception_handlers(404)
def not_found_route() -> None: pass

ユーザーがこの方法を使用すると、StarletteはHTTPステータスコードをキーとし、コールバック関数を値とする対応する辞書にコールバック関数を掛けます。
ExceptionMiddlewareがルートリクエスト処理に例外があることを検出すると、例外応答のHTTPステータスコードを通じて対応するコールバック関数を見つけ、リクエストと例外をユーザーが設定したコールバック関数に渡し、最後にユーザーのコールバック関数の結果を前のASGI APPに投げ戻します。
さらに、ExceptionMiddlewareは例外登録もサポートしています。ルートが投げる例外が登録された例外と一致する場合、その例外登録に対応するコールバック関数が呼び出されます。
このクラスのソースコードとコメントは以下の通りです:

class ExceptionMiddleware:
    def __init__(
        self, app: ASGIApp, handlers: dict = None, debug: bool = false
    ) -> None:
        self.app = app
        self.debug = debug  # TODO: debugが設定されている場合、404のケースを処理する必要があります。
        # StarletteはHTTPステータスコードとExceptionタイプの両方をサポートする
        self._status_handlers = {}  # type: typing.Dict[int, typing.Callable]
        self._exception_handlers = {
            HTTPException: self.http_exception
        }  # type: typing.Dict[typing.Type[Exception], typing.Callable]
        if handlers is not None:
            for key, value in handlers.items():
                self.add_exception_handler(key, value)

    def add_exception_handler(
        self,
        exc_class_or_status_code: typing.Union[int, typing.Type[Exception]],
        handler: typing.Callable,
    ) -> None:
        # ユーザーがStarletteアプリのメソッドを通じてマウントする例外コールバックは、このメソッドを通じて最終的にクラス内の_status_handlersまたは_exception_handlerにマウントされます。
        if isinstance(exc_class_or_status_code, int):
            self._status_handlers[exc_class_or_status_code] = handler
        else:
            assert issubclass(exc_class_or_status_code, Exception)
            self._exception_handlers[exc_class_or_status_code] = handler

    def _lookup_exception_handler(
        self, exc: Exception
    ) -> typing.Optional[typing.Callable]:
        # 登録された例外に関連するコールバック関数を検索し、mroを通じて例外に対応するコールバック関数を見つける
        # 
        # ユーザーは基底クラスをマウントする可能性があり、マウントされた例外の後続のサブクラスも、基底クラスに登録されたコールバックを呼び出します。
        # たとえば、ユーザーが基底クラスを登録し、その後、この基底クラスを継承する2つの例外、ユーザー例外とシステム例外があります。
        # 後で関数がユーザー例外またはシステム例外を投げると、基底クラスに登録された対応するコールバックが実行されます。
        for cls in type(exc).__mro__:
            if cls in self._exception_handlers:
                return self._exception_handlers[cls]
        return None

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # おなじみのASGI呼び出しメソッド
        if scope["type"]!= "http":
            # WebSocketリクエストはサポートしていません
            await self.app(scope, receive, send)
            return

        # 同じ応答内で複数の例外が発生するのを防ぐ
        response_started = False

        async def sender(message: Message) -> None:
            nonlocal response_started

            if message["type"] == "http.response.start":
                response_started = True
            await send(message)

        try:
            # 次のASGI APPを呼び出す
            await self.app(scope, receive, sender)
        except Exception as exc:
            handler = None

            if isinstance(exc, HTTPException):
                # HTTPExceptionの場合、登録されたHTTPコールバック辞書で探す
                handler = self._status_handlers.get(exc.status_code)

            if handler is None:
                # 通常の例外の場合、例外コールバック辞書で探す
                handler = self._lookup_exception_handler(exc)

            if handler is None:
                # 対応する例外が見つからない場合は、上位に投げる
                raise exc from None

            # 1つの応答に対して1つの例外のみを処理する
            if response_started:
                msg = "Caught handled exception, but response already started."
                raise RuntimeError(msg) from exc

            request = Request(scope, receive=receive)
            if asyncio.iscoroutinefunction(handler):
                response = await handler(request, exc)
            else:
                response = await run_in_threadpool(handler, request, exc)
            # コールバック関数で生成された応答でリクエストを処理する
            await response(scope, receive, sender)

2.2. ユーザーミドルウェア

次にユーザーミドルウェアです。これは最も頻繁に触れるミドルウェアです。`starlette.middleware`を使用するとき、通常は`BaseHTTPMiddleware`というミドルウェアを継承し、以下のコードに基づいて拡張します:

class DemoMiddleware(BaseHTTPMiddleware):
    def __init__(
        self,
        app: ASGIApp,
    ) -> None:
        super(DemoMiddleware, self).__init__(app)

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        # 前処理
        response: Response = await call_next(request)
        # 後処理
        return response

リクエスト前に前処理を行いたい場合は、`before`ブロックに関連コードを書きます。リクエスト後に処理を行いたい場合は、`after`ブロックにコードを書きます。使い方は非常に簡単で、同じスコープ内にあるため、このメソッド内の変数はコンテキストや動的変数を通じて伝播させる必要がありません(DjangoやFlaskのミドルウェア実装に触れたことがあれば、Starletteの実装のエレガンスがわかるでしょう)。

では、実装を見てみましょう。コードは約60行ですが、コメントが多いです:

class BaseHTTPMiddleware:
    def __init__(self, app: ASGIApp, dispatch: DispatchFunction = None) -> None:
        # 次のレベルのASGIアプリを割り当てる
        self.app = app
        # ユーザーがdispatchを渡す場合は、その関数を使用し、そうでない場合は独自のdispatchを使用する
        # 一般的に、ユーザーはBaseHTTPMiddlewareを継承してdispatchメソッドを書き換える
        self.dispatch_func = self.dispatch if dispatch is None else dispatch

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        ASGI標準関数シグネチャを持つ関数で、ASGIリクエストがここから入ることを表しています。
        """
        if scope["type"]!= "http":
            # タイプがhttpでない場合、ミドルウェアは通過しません(つまり、WebSocketはサポートされていません)
            # WebSocketをサポートするには、この方法でミドルウェアを実装するのは非常に難しいです。私がrapフレームワークを実装したとき、WebSocketのようなトラフィックのミドルウェア処理を実現するためにいくつかの機能を犠牲にしました。
            await self.app(scope, receive, send)
            return

        # scopeからリクエストオブジェクトを生成する
        request = Request(scope, receive=receive)
        # dispatchロジックに入る、つまりユーザーの処理ロジック
        # このロジックから得られる応答は実際にはcall_next関数によって生成され、dispatch関数は単に中継の役割を果たします。
        response = await self.dispatch_func(request, self.call_next)
        # 生成された応答に応じて上位層にデータを返す
        await response(scope, receive, send)

    async def call_next(self, request: Request) -> Response:
        loop = asyncio.get_event_loop()
        # キューの生成と消費モデルを通じて次のレベルのメッセージを取得する
        queue: "asyncio.Queue[typing.Optional[Message]]" = asyncio.Queue()

        scope = request.scope
        # uvicornのreceiveオブジェクトをrequest.receiveオブジェクトを通じて渡す
        # ここで使用するreceiveオブジェクトは依然としてuvicornによって初期化されたreceiveオブジェクトです。
        receive = request.receive
        send = queue.put

        async def coro() -> None:
            try:
                await self.app(scope, receive, send)
            finally:
                # このput操作は、get側がブロックされないようにする
                await queue.put(None)

        # loop.create_taskを通じて別のコルーチンで次のASGI APPを実行する
        task = loop.create_task(coro())
        # 次のASGI APPの戻りを待つ
        message = await queue.get()
        if message is None:
            # 取得した値が空の場合、次のASGI APPが応答を返していないことを意味し、エラーが発生している可能性があります。
            # task.result()を呼び出すと、コルーチンに例外がある場合、そのコルーチンのエラーが投げられます。
            task.result()
            # 例外が投げられない場合は、ユーザーエラーの可能性があり、たとえば空の応答を返している場合です。
            # このとき、クライアントに応答を返すことはできないため、エラーを作成して、後続の500応答の生成を容易にする必要があります。
            raise RuntimeError("No response returned.")

        # ASGIが応答を処理するとき、複数のステップで行われます。通常、上記のqueue.getは応答を取得する最初のステップです。
        assert message["type"] == "http.response.start"

        async def body_stream() -> typing.AsyncGenerator[bytes, None]:
            # 他の処理はbody_stream関数に任せる
            # このメソッドは単にデータストリームを返し続ける
            while True:
                message = await queue.get()
                if message is None:
                    break
                assert message["type"] == "http.response.body"
                yield message.get("body", b"")
            task.result()

        # body_stream関数をResponseメソッドに入れる
        # 応答自体もASGI APPに似たクラスです。

2.3. ServerErrorMiddleware

ServerErrorMiddlewareはExceptionMiddlewareと非常に似ています(そのため、この部分は詳細を述べません)。全体的なロジックはほとんど同じです。ただし、ExceptionMiddlewareはルート例外の捕捉と処理を担当するのに対し、ServerErrorMiddlewareは主にフォールバック手段として機能し、常に正しいHTTP応答が返されるようにします。

ServerErrorMiddlewareの間接呼び出し関数はExceptionMiddlewareと同じです。ただし、登録されたHTTPステータスコードが500の場合のみ、ServerErrorMiddlewareにコールバックが登録されます:

@app.exception_handlers(500)
def not_found_route() -> None: pass

ServerErrorMiddlewareはASGI APPの最上位レベルにあります。フォールバック例外を処理するタスクを担っています。その処理は簡単です:次のレベルのASGI APPの処理中に例外が発生した場合、フォールバックロジックに入ります:

    1. デバッグが有効になっている場合、デバッグページを返す。

    1. 登録されたコールバックがある場合、その登録されたコールバックを実行する。

    1. 上記のどちらでもない場合、500応答を返す。

3. ルート

Starletteでは、ルートは2つの部分に分かれています。1つは、私が「本当のアプリのルーター」と呼ぶもので、ミドルウェアの下のレベルにあります。これはStarletteの中でミドルウェア以外のほとんどすべての処理を担当し、主にルート検索とマッチング、アプリの起動とシャットダウン処理などを行います。もう1つの部分は、ルーターに登録されるルートです。

3.1. ルーター

ルーターはシンプルです。主な責任はルートをロードしてマッチングすることです。以下はソースコードとコメントですが、ルートをロードする部分は除外しています:

class Router:
    def __init__(
        self,
        routes: typing.Sequence[BaseRoute] = None,
        redirect_slashes: bool = true,
        default: ASGIApp = None,
        on_startup: typing.Sequence[typing.Callable] = None,
        on_shutdown: typing.Sequence[typing.Callable] = None,
        lifespan: typing.Callable[[typing.Any], typing.AsyncGenerator] = None,
    ) -> None:
        # Starlette初期化から情報をロードする
        self.routes = [] if routes is None else list(routes)
        self.redirect_slashes = redirect_slashes
        self.default = self.not_found if default is None else default
        self.on_startup = [] if on_startup is None else list(on_startup)
        self.on_shutdown = [] if on_shutdown is None else list(on_shutdown)

        async def default_lifespan(app: typing.Any) -> typing.AsyncGenerator:
            await self.startup()
            yield
            await self.shutdown()

        # 初期化されたlifespanが空の場合、on_startupとon_shutdownをlifespanに変換する
        self.lifespan_context = default_lifespan if lifespan is None else lifespan

    async def not_found(self, scope: Scope, receive: Receive, send: Send) -> None:
        """マッチするルートがない場合に実行されるロジック"""
        if scope["type"] == "websocket":
            # WebSocketマッチに失敗
            websocket_close = WebSocketClose()
            await websocket_close(scope, receive, send)
            return

        # Starletteアプリケーション内で実行されている場合、例外を投げる
        # そうすることで、構成可能な例外ハンドラが応答を処理できるようになる。
        # 通常のASGIアプリの場合は、直接応答を返す。
        if "app" in scope:
            # starlette.applicationsの__call__メソッドでは、starletteは自身をscopeに格納していることがわかる。
            # ここで例外を投げると、ServerErrorMiddlewareによって捕捉される。
            raise HTTPException(status_code=404)
        else:
            # Starletteからの呼び出しでない場合は、直接エラーを返す
            response = PlainTextResponse("Not Found", status_code=404)
        await response(scope, receive, send)

    async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        ASGI lifespanメッセージを処理し、アプリケーションの起動とシャットダウンイベントを管理できるようにします。
        """
        # Lifespan実行ロジック。実行するとき、StarletteはASGIサーバーと通信します。ただし、現在このコードにはまだ開発されていない機能がいくつかあるかもしれません。
        first = True
        app = scope.get("app")
        await receive()
        try:
            if inspect.isasyncgenfunction(self.lifespan_context):
                async for item in self.lifespan_context(app):
                    assert first, "Lifespan context yielded multiple times."
                    first = False
                    await send({"type": "lifespan.startup.complete"})
                    await receive()
            else:
                for item in self.lifespan_context(app):  # type: ignore
                    assert first, "Lifespan context yielded multiple times."
                    first = False
                    await send({"type": "lifespan.startup.complete"})
                    await receive()
        except BaseException:
            if first:
                exc_text = traceback.format_exc()
                await send({"type": "lifespan.startup.failed", "message": exc_text})
            raise
        else:
            await send({"type": "lifespan.shutdown.complete"})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Routerクラスのメインエントリポイントです。
        """
        # ルートをマッチングして実行するメイン関数
        # 現在は、http、websocket、lifespanタイプのみがサポートされています。
        assert scope["type"] in ("http", "websocket", "lifespan")

        # scope内でルーターを初期化する
        if "router" not in scope:
            scope["router"] = self

        if scope["type"] == "lifespan":
            # lifespanロジックを実行する
            await self.lifespan(scope, receive, send)
            return

        partial = None

        # ルートマッチングを実行する
        for route in self.routes:
            match, child_scope = route.matches(scope)
            if match == Match.FULL:
                # 完全マッチ(URLとメソッドの両方がマッチする)の場合
                # 通常のルート処理を実行する
                scope.update(child_scope)
                await route.handle(scope, receive, send)
                return
            elif match == Match.PARTIAL and partial is None:
                # 部分マッチ(URLはマッチするが、メソッドはマッチしない)の場合
                # 値を保持してマッチングを続ける
                partial = route
                partial_scope = child_scope

        if partial is not None:
            # 部分マッチするルートがある場合、実行も続けるが、ルートはHTTPメソッドエラーで応答する
            scope.update(partial_scope)
            await partial.handle(scope, receive, send)
            return

        if scope["type"] == "http" and self.redirect_slashes and scope["path"]!= "/":
            # マッチしない場合、リダイレクトを判定する
            redirect_scope = dict(scope)
            if scope["path"].endswith("/"):
                redirect_scope["path"] = redirect_scope["path"].rstrip("/")
            else:
                redirect_scope["path"] = redirect_scope["path"] + "/"

            for route in self.routes:
                match, child_scope = route.matches(redirect_scope)
                if match!= Match.NONE:
                    # 再度マッチング。結果が空でない場合は、リダイレクト応答を送信する
                    redirect_url = URL(scope=redirect_scope)
                    response = RedirectResponse(url=str(redirect_url))
                    await response(scope, receive, send)
                    return

        # 上記のいずれのプロセスもヒットしない場合、ルートが見つからないことを意味する。このとき、デフォルトルートが実行され、デフォルトのデフォルトルートは404 Not Foundである
        await self.default(scope, receive, send)

このように、Routerのコードはかなりシンプルです。コードの多くは`call`メソッドに集中しています。ただし、ルートを照会するために複数回のトラバースがあり、各ルートは正規表現を実行してマッチするかどうかを判定します。一部の人は、この実行速度が遅いと感じるかもしれません。私も以前はそう思っており、その後、ルートツリーを実装して置き換えました(詳細は`route_trie.py`を参照)。しかし、パフォーマンステストを行ったところ、ルートの数が50を超えない場合、ループマッチングの性能はルートツリーよりも優れており、100を超えない場合は、両者は同等でした。通常の状況では、指定するルートの数は100を超えないため、この部分のルートのマッチング性能について心配する必要はありません。もしまだ心配する場合は、`Mount`を使用してルートをグループ化し、マッチング回数を減らすことができます。

3.2. その他のルート

`Mount`は`BaseRoute`を継承しており、`HostRoute`、`WebSocketRoute`などの他のルートも同様です。これらは似たようなメソッドを提供しており、実装における違いはわずかです(主に初期化、ルートマッチング、逆引き検索です)。まずは`BaseRoute`を見てみましょう:

class BaseRoute:
    def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
        # 標準的なマッチング関数シグネチャ。各Routeは(Match, Scope)タプルを返す必要があります。
        # Matchには3種類あります:
        #   NONE: マッチしない
        #   PARTIAL: 部分マッチ(URLはマッチするが、メソッドはマッチしない)
        #   FULL: 完全マッチ(URLとメソッドの両方がマッチする)
        # Scopeは基本的に以下の形式を返しますが、Mountはより多くの内容を返します:
        #   {"endpoint": self.endpoint, "path_params": path_params}
        raise NotImplementedError()  # pragma: no cover

    def url_path_for(self, name: str, **path_params: str) -> URLPath:
        # 名前に基づいて逆引き検索を生成する
        raise NotImplementedError()  # pragma: no cover

    async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
        # Routerによってマッチされた後に呼び出すことができる関数
        raise NotImplementedError()  # pragma: no cover

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        ルートは独立したASGIアプリとして個別に使用することができます。
        これはやや不自然なケースですが、ルートはほとんどの場合Router内で使用されますが、いくつかのツーリングや最小限のアプリでは有用かもしれません。
        """
        # ルートが独立したASGI APPとして呼び出される場合、それ自身でマッチングを行い、応答します
        match, child_scope = self.matches(scope)
        if match == Match.NONE:
            if scope["type"] == "http":
                response = PlainTextResponse("Not Found", status_code=404)
                await response(scope, receive, send)
            elif scope["type"] == "websocket":
                websocket_close = WebSocketClose()
                await websocket_close(scope, receive, send)
            return

        scope.update(child_scope)
        await self.handle(scope, receive, send)

このように、`BaseRoute`は多くの機能を提供していません。その他のルートはこれをベースに拡張されています:

  • Route:標準的なHTTPルートです。HTTP URLとHTTPメソッドを通じてルートマッチングを担当し、その後、HTTPルートを呼び出すメソッドを提供します。

  • WebSocketRoute:標準的なWebSocketルートです。HTTP URLに従ってルートをマッチングし、その後、`starlette.websocket`の`WebSocket`を通じてセッションを生成し、対応する関数に渡します。

  • Mount:ルートの入れ子式のカプセル化です。そのマッチング方法はURLのプレフィックスマッチングであり、条件に合致する次のレベルのASGI APPにリクエストを転送します。その次のレベルのASGI APPがRouterの場合、呼び出しチェインはこのようになります:Router->Mount->Router->Mount->Router->Route。`Mount`を使用することで、ルートをグループ化し、マッチング速度を上げることができます。おすすめです。さらに、これは他のASGI APPにリクエストを配信することもできます。例えば、Starlette->ASGI Middleware->Mount->Other Starlette->...

  • Host:ユーザーリクエストの`Host`に応じてリクエストを対応するASGI APPに配信します。このASGI APPは`Route`、`Mount`、ミドルウェアなどです。

4. その他のコンポーネント

上記からわかるように、Starletteの多くのコンポーネントはASGI APPとして設計されており、高い互換性があります。これにより少し性能は犠牲になりますが、互換性は非常に高いです。他のコンポーネントも多かれ少なかれASGI APPのように設計されています。他のコンポーネントを紹介する前に、まずStarletteの全体的なプロジェクト構造を見てみましょう:

├── middleware                       # ミドルウェア
├── applications.py                  # 起動アプリケーション
├── authentication.py                # 認証関連
�── background.py                    # バックグラウンドタスクをカプセル化し、応答が返された後に実行される
├── concurrency.py                   # いくつかの小さなasyncio関連のカプセル化。新しいバージョンでは、直接anyioライブラリを使用する。
├── config.py                        # 設定
├── convertors.py                    # いくつかの型変換メソッド
�── datastructures.py                # いくつかのデータ構造。例えば、Url、Header、Form、QueryParam、Stateなど
├── endpoints.py                     # cbvをサポートするルートと、少し高度なWebSocketカプセル化
├── exceptions.py                    # 例外処理
�── formparsers.py                   # Form、Fileなどの解析
�── graphql.py                       # GraphQL関連の処理を担当する
├── __init__.py
├── py.typed                         # Starletteに必要な型ヒント
├── requests.py                      # リクエスト。ユーザーがデータを取得するためのもの
├── responses.py                     # 応答。ヘッダーとクッキーを初期化し、異なるResponseクラスに応じて応答データを生成し、その後、ASGI呼び出しインターフェースを持つクラス。このインターフェースはASGIプロトコルをUvicornサービスに送信します。送信後、バックグラウンドタスクがある場合は、完了するまで実行されます。
├── routing.py                       # ルーティング
├── schemas.py                       # OpenApi関連のスキーマ
�── staticfiles.py                   # 静的ファイル
�── status.py                        # HTTPステータスコード
├── templating.py                    # Jinjaに基づくテンプレート応答
├── testclient.py                    # テストクライアント
├── types.py                         # 型
└── websockets.py                    # WebSocket

上記のファイルは多数あり、いくつかの簡単なものはスキップします。

4.1. Request

`Request`クラスは非常にシンプルです。`HttpConnection`を継承しています。このクラスは主にASGIプロトコルから渡される`Scope`を解析し、URLやメソッドなどの情報を抽出します。そして`Request`クラスは、リクエストデータを読み取り、データを返す機能を追加しています(HTTP 1.1はサーバーがクライアントにデータをプッシュすることをサポートしています)。その中で、データの読み取りは`stream`というコア関数に依存しています。そのソースコードは以下の通りです:

async def stream(self) -> typing.AsyncGenerator[bytes, None]:
    # 既に読み取られている場合、キャッシュからデータを取得する
    if hasattr(self, "_body"):
        yield self._body
        yield b""
        return

    if self._stream_consumed:
        raise RuntimeError("Stream consumed")

    self._stream_consumed = true
    while True:
        # ASGIコンテナのreceiveループからデータを継続的に取得する
        message = await self._receive()
        if message["type"] == "http.request":
            body = message.get("body", b"")
            if body:
                # データが空でない場合は、それを返す
                yield body
            if not message.get("more_body", false):
                # すべてのボディデータが取得されたことを意味する
                break
        elif message["type"] == "http.disconnect":
            # クライアントとの接続が切断されたことを意味する
            self._is_disconnected = true
            # 例外を投げる。`await request.body()`や`await request.json()`を呼び出すユーザーは例外を受け取る。
            raise ClientDisconnect()
    # 空のバイトを返して終了をマークする
    yield b""

この実装はシンプルですが、小さなバグがあります。Nginxや他のWebサービスに詳しい人は、一般的な中間サーバーはボディデータを処理せず、単に転送することを知っているでしょう。ASGIにおいても同じです。URLとヘッドダーを処理した後、UvicornはASGI APPを呼び出し、`send`と`receive`オブジェクトを下に渡します。これら2つのオブジェクトは複数のASGI APPを通過し、ユーザーが関数で使用するルートASGI APPに到達します。そのため、`Request`が受け取る`receive`オブジェクトはUvicornによって生成されます。そして`receive`のデータソースは`asyncio.Queue`キューから来ます。

4.1. Request(続き)

の分析からわかるように、各層のASGI APPは`scope`と`receive`に基づいて`Request`オブジェクトを生成しており、これは各層のASGI APPの`Request`オブジェクトが一致しないことを意味します。もしミドルウェアが`Request`オブジェクトを呼び出してボディを読み取ると、`receive`を通じてキュー内のデータを事前に消費してしまい、後続のASGI APPが`Request`オブジェクトを通じてボディデータを読み取れなくなります。この問題のサンプルコードは以下の通りです:

import asyncio
from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

app: Starlette = Starlette()

class DemoMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        print(request, await request.body())
        return await call_next(request)

app.add_middleware(DemoMiddleware)

@app.route("/")
async def demo(request: Request) -> JSONResponse:
    try:
        await asyncio.wait_for(request.body(), 1)
        return JSONResponse({"result": True})
    except asyncio.TimeoutError:
        return JSONResponse({"result": False})

if __name__ == "__main__":
    import uvicorn  # type: ignore
    uvicorn.run(app)

これを実行してリクエストの結果を確認すると:

-> curl http://127.0.0.1:8000
{"result":false} 

上記のように、結果は`false`になります。これは`request.body`の実行がタイムアウトしたことを意味しており、`receive`キューがすでに空で、データを取得できないためです。タイムアウトがなければ、このリクエストはハングアップしてしまいます。

この問題を解決するために、まず`Request`がボディを取得する方法を見てみましょう。ユーザーはボディを複数回取得でき、データは同じままです。そのため、取得後にデータをキャッシュするというアイデアで実装されています。このアイデアに従えば、データは`receive`を通じて取得されるので、データを読み取った後に`receive`関数を構築することができます。この関数はASGI通信プロトコルに似たデータを返し、完全なボディデータを持っています(`Request.stream`がボディを取得するための構築に合致しています)。コードは以下の通りです:

async def proxy_get_body(request: Request) -> bytes:
    async def receive() -> Message:
        return {"type": "http.request", "body": body}

    body = await request.body()
    request._receive = receive
    return body

その後、どのレベルのASGI APPでもボディデータが必要な場合、この関数を呼び出すことでボディデータを取得でき、後続のASGI APPがボディデータを取得する能力に影響を与えません。

5. まとめ

これまでに、Starletteのいくつかの重要な機能コードを分析してきました。Starletteは優れたライブラリで、素晴らしい設計概念を持っています。ぜひ自分でStarletteのソースコードを読んでみることをお勧めします。これは将来、独自のフレームワークを書くときに役立つでしょう。

Leapcell: The Best Serverless Platform for Python Hosting

最後に、FastAPIをデプロイするのに最適なプラットフォームを紹介します:Leapcell

1. 多言語サポート

  • JavaScript、Python、Go、またはRustで開発できます。

2. 無制限のプロジェクトを無料でデプロイ

  • 使用量に応じて支払います — リクエストがなければ、料金はかかりません。

3. 比類なきコスト効率

  • 使った分だけ支払い、アイドル時の料金はかかりません。

  • 例:25ドルで平均応答時間60msの694万件のリクエストをサポートします。

4. 簡素化された開発者体験

  • 直感的なUIで簡単にセットアップできます。

  • 完全自動化されたCI/CDパイプラインとGitOps統合。

  • リアルタイムのメトリクスとロギングで実行可能な洞察を得られます。

5. 簡単なスケーラビリティと高性能

  • 自動スケーリングで高い並列性を簡単に処理できます。

  • オペレーションオーバーヘッドはゼロ — 開発に集中できます。


探索して、ドキュメントで詳細を見つけよう!

Leapcell Twitter: https://x.com/LeapcellHQ

いいなと思ったら応援しよう!