aiohttp

【Python】aiohttpを用いた非同期HTTP通信の基本

【Python】aiohttpを用いた非同期HTTP通信の基本
naoki-hn

Python で aiohttp パッケージを用いて非同期 HTTP 通信する方法を解説します。

aiohttp による非同期 HTTP 通信

非同期処理とは、複数のタスクが協調しあって処理を実行するプログラミングの実装方法です。非同期プログラミングは多くの支持を得ており、Python 3.5 で asyncawait といった非同期プログラミング方法が導入されています。

この記事では、非同期処理で効果が期待できる通信処理に焦点を当て、非同期 HTTP 通信が可能な aiohttp を使用する方法を紹介します。

async / await を用いたプログラミングの基本は「async/awaitを用いた非同期プログラミングの基本」を参考にしてください。

aiohttp とは

aiohttp は、クライアントとサーバーの両方のユースケースで非同期 HTTP 通信をサポートしているサードパーティ製ライブラリです。

非同期プログラミングは、I/O バウンドな処理の実装には非常に有益な方法です。I/O バウンドな処理とは、ファイルへの入出力やネットワーク通信等が該当します。

Python のネットワーク通信では requests パッケージが有名ですが、requests パッケージは、非同期 I/O に対応していません。aiohttp は、非同期な HTTP 通信パッケージとして非常に信頼できるライブラリとして使用することができます。

他にも非同期 I/O に対応した人気の HTTP クライアントとしては「HTTPX」があります。requests と互換性が高く、同期・非同期の両方に対応しているため、実務でも広く利用されています。

一方で、aiohttp はサーバー機能も含めて非同期 HTTP を扱えるライブラリであり、asyncio ベースの非同期処理でしっかり実装したい場合に適しています。

I/O バウンドなど並行・並列処理の特徴は「並行・並列処理、I/Oバウンド・CPUバウンドを理解する」を参考にしてください。

aiohttp を用いた非同期通信の基本

この記事では、OpenWeather の API を使って aiohttp を用いた非同期通信の実装方法を紹介します。OpenWeather は、グローバルな気象データを取得できるサービスです。なお、OpenWeather に限らず類似の API で、今回紹介するコードは参考にしていただけるかと思います。

以降で async / awaitasyncio を使用します。これらの基本は「async/awaitを用いた非同期プログラミングの基本」を参考にしてください。また、OpenWeather API を requests で使用している例は「OpenWeatherのAPI使用方法」で紹介しています。

aiohttp のインストール

aiohttp はサードパーティ製のライブラリのため、インストールをしてください。

pip install aiohttp

uv を使用している場合は、以下でインストールしてください。uv は高速な Python パッケージ管理ツールであり、pip の代替として利用できます。

uv add aiohttp

aiohttp の非同期通信

以降で紹介するプログラムでは、OpenWeather の「Current weather data API」を使用し、経度・緯度を用いて現在の天気情報を取得する処理を非同期に行います。Current weather data API のドキュメントはこちらを参照してください。

OpenWeather の API は、アカウント登録して取得できる API キーを使います。API キーと今回使用する API の URL は、コンフィグファイル(config.ini)に記載して使用することにします。以下の「xxxxxxxxxxxxxxxxxxxxxxxx」の部分には取得した API キーを指定してください。

【設定ファイル】config.ini

[API]
key = xxxxxxxxxxxxxxxxxxxxxxxx
url_current_weather_data = https://api.openweathermap.org/data/2.5/weather

【実行ファイル】aiohttp_current_weather.py

import asyncio
import configparser
import time
from pprint import pprint
from typing import Any

import aiohttp


async def get_current_weather_data(
    session: aiohttp.ClientSession,
    url: str,
    api_key: str,
    lat: float,
    lon: float,
    lang: str,
) -> dict[str, Any]:
    """現在の天気情報を取得する

    Args:
        session (aiohttp.ClientSession): セッション情報
        url (str): APIのURL
        api_key (str): APIキー
        lat (float): 緯度
        lon (float): 経度
        lang (str): 言語

    Returns:
        dict[str, Any]: 天気情報
    """

    # パラメータの設定
    params = {
        "lat": lat,
        "lon": lon,
        "appid": api_key,
        "lang": lang,
    }

    # 非同期通信
    async with session.get(url, params=params) as response:
        # HTTPエラーが発生した場合に例外をスロー
        response.raise_for_status()

        # レスポンスをJSON形式で取得
        return await response.json()


async def main() -> None:
    """メインコルーチン"""
    # APIキーの読み込み
    config = configparser.ConfigParser()
    config.read("./config.ini", encoding="utf-8")
    api_key = config["API"]["key"]
    url = config["API"]["url_current_weather_data"]

    locations = [
        {"name": "sapporo", "lat": 43.065, "lon": 141.347},
        {"name": "tokyo", "lat": 35.689, "lon": 139.692},
        {"name": "nagoya", "lat": 35.180, "lon": 136.907},
        {"name": "osaka", "lat": 34.686, "lon": 135.520},
        {"name": "hiroshima", "lat": 34.397, "lon": 132.460},
        {"name": "okinawa", "lat": 26.212, "lon": 127.681},
    ]

    # タイムアウトの設定
    timeout = aiohttp.ClientTimeout(total=10)

    # 非同期セッションを作成
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # タスクを作成
        tasks = [
            get_current_weather_data(
                session,
                url,
                api_key,
                loc["lat"],
                loc["lon"],
                "ja",
            )
            for loc in locations
        ]

        # タスクを実行して結果を取得
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for result, loc in zip(results, locations):
            print(f"対象地域: {loc['name']}")

            if isinstance(result, Exception):
                print(f"エラーが発生しました: {result}")
            else:
                pprint(result)

            print("----------------------------------")


if __name__ == "__main__":
    # 時間計測の開始
    t = time.perf_counter()

    # asyncio.runを使用してメインコルーチンを実行
    asyncio.run(main())

    # 実行時間を表示
    print(f"実行時間: {time.perf_counter() - t:.5f} sec")
対象地域: sapporo
{'base': 'stations',
 'clouds': {'all': 1},
 'cod': 200,
 'coord': {'lat': 43.065, 'lon': 141.347},
 'dt': 1774384379,
 'id': 2128295,
 'main': {'feels_like': 274.25,
          'grnd_level': 994,
          'humidity': 64,
          'pressure': 1021,
          'sea_level': 1021,
          'temp': 274.25,
          'temp_max': 274.25,
          'temp_min': 274.25},
 'name': '札幌市',
 'sys': {'country': 'JP', 'sunrise': 1774384203, 'sunset': 1774428679},
 'timezone': 32400,
 'visibility': 10000,
 'weather': [{'description': '晴天', 'icon': '01d', 'id': 800, 'main': 'Clear'}],
 'wind': {'deg': 222, 'gust': 1.08, 'speed': 1.19}}
----------------------------------

...(途中省略)...

----------------------------------
対象地域: okinawa
{'base': 'stations',
 'clouds': {'all': 11},
 'cod': 200,
 'coord': {'lat': 26.212, 'lon': 127.681},
 'dt': 1774384379,
 'id': 1856035,
 'main': {'feels_like': 295.82,
          'grnd_level': 1009,
          'humidity': 87,
          'pressure': 1011,
          'sea_level': 1011,
          'temp': 295.28,
          'temp_max': 295.28,
          'temp_min': 295.28},
 'name': '那覇市',
 'sys': {'country': 'JP', 'sunrise': 1774387704, 'sunset': 1774431737},
 'timezone': 32400,
 'visibility': 10000,
 'weather': [{'description': '薄い雲',
              'icon': '02n',
              'id': 801,
              'main': 'Clouds'}],
 'wind': {'deg': 221, 'gust': 11.35, 'speed': 6.66}}
----------------------------------
実行時間: 0.45794 sec

例では、指定した複数の緯度・経度に対して現在の天気情報を非同期で取得します。主要なライブラリとして asyncioaiohttp をインポートして使用しています。

API を使用する関数としては get_current_weather_data 関数を定義しています。取得した各種情報は JSON 形式で返却されますが、json メソッドを使用することで辞書形式で扱うことができます。

main コルーチン】

プログラムは main コルーチンによって制御されます。この中で API キーを読み込み、取得対象の場所の緯度・経度情報を locations という辞書のリストとして定義しています。今回は以下の 6 か所の天気を取得しました。

    locations = [
        {"name": "sapporo", "lat": 43.065, "lon": 141.347},
        {"name": "tokyo", "lat": 35.689, "lon": 139.692},
        {"name": "nagoya", "lat": 35.180, "lon": 136.907},
        {"name": "osaka", "lat": 34.686, "lon": 135.520},
        {"name": "hiroshima", "lat": 34.397, "lon": 132.460},
        {"name": "okinawa", "lat": 26.212, "lon": 127.681},
    ]

aiohttp を使用する場合には、aiohttp.ClientSession で非同期セッションを作成します。今回は、このセッション (session) と API キー (api_key)、条件となる緯度・経度の情報を各関数に渡します。また、aiohttp.ClientTimeout(total=10) でタイムアウトを作成し、セッションに設定しておきます。

    # タイムアウトの設定
    timeout = aiohttp.ClientTimeout(total=10)

    # 非同期セッションを作成
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # タスクを作成
        tasks = [
            get_current_weather_data(
                session,
                url,
                api_key,
                loc["lat"],
                loc["lon"],
                "ja",
            )
            for loc in locations
        ]

        # タスクを実行して結果を取得
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for result, loc in zip(results, locations):
            print(f"対象地域: {loc['name']}")

            if isinstance(result, Exception):
                print(f"エラーが発生しました: {result}")
            else:
                pprint(result)

タスクリストを作成し、asyncio.gatherでタスクを渡します。asyncio.gather では、全てのタスクの完了を待ち、結果リストを取得することができます。戻り値の順序はタスク順序と一致するため、zip を使って元のデータと対応付けて処理できます。

また、いずれかのタスクで例外が発生した場合の挙動は、return_exceptions の指定によって変わります。今回は各地域の天気を取得するタスクのため、各処理結果は独立しています。このような場合、asyncio.gather(*tasks, return_exceptions=True) とすることで、個々の結果を取得しつつエラーを個別に扱うことができます。

一方、すべての処理成功を前提とする場合は、例外発生時に処理全体を停止する設計が一般的です。この場合は、asyncio.gather(*tasks)try...except... で囲むことで例外処理を行います。

get_current_weather_data 関数】

実際に API を実行している get_current_weather_data 関数の主要なポイントを説明します。まず、API に設定するパラメータを以下のように用意します。

    # パラメータの設定
    params = {
        "lat": lat,
        "lon": lon,
        "appid": api_key,
        "lang": lang,
    }

ここでは、緯度 (lat)、経度 (lon)、APIキー (api_key)、言語 (lang) を指定しています。非同期通信を実行しているのは以下部分です。

    # 非同期通信
    async with session.get(url, params=params) as response:
        # HTTPエラーが発生した場合に例外をスロー
        response.raise_for_status()

        # レスポンスをJSON形式で取得
        return await response.json()

非同期セッションの get を使用して、引数に API の URL と作成したパラメータを渡します。この時、async with 句を使用してレスポンスを取得します。

raise_for_status() は HTTP エラーが発生した場合に例外をスローします。例外が発生しなかった場合、await response.json() で結果を JSON 形式にして返却します。

今回の実行は、使用環境で「0.45794秒」で完了しましたが、非同期の効果が出ているのか分かりにくいため requests による同期実行と効果を比較してみましょう。

同期通信との比較(requests を使用)

上記で aiohttp を使用した非同期通信の例を紹介しました。通信処理は I/O バウンドな処理であるため、同期処理に比べて非同期処理は効率的です。では、request モジュールを使って同期的に通信した場合と具体的に比べてみましょう。

以下は、上記のプログラムを requests による同期版にしたプログラムです。

import configparser
import time
from pprint import pprint
from typing import Any

import requests


def get_current_weather_data(
    url: str,
    api_key: str,
    lat: float,
    lon: float,
    lang: str,
) -> dict[str, Any]:
    """現在の天気情報を取得する

    Args:
        url (str): APIのURL
        api_key (str): APIキー
        lat (float): 緯度
        lon (float): 経度
        lang (str): 言語

    Returns:
        dict[str, Any]: 天気情報の辞書
    """

    # パラメータの設定
    params = {
        "lat": lat,
        "lon": lon,
        "appid": api_key,
        "lang": lang,
    }

    # 同期通信
    response = requests.get(url, params=params, timeout=10)

    # HTTPエラーが発生した場合に例外をスロー
    response.raise_for_status()

    # レスポンスをJSON形式で取得
    return response.json()


def main() -> None:
    """メイン関数"""
    # APIキーの読み込み
    config = configparser.ConfigParser()
    config.read("./config.ini", encoding="utf-8")
    api_key = config["API"]["key"]
    url = config["API"]["url_current_weather_data"]

    locations = [
        {"name": "sapporo", "lat": 43.065, "lon": 141.347},
        {"name": "tokyo", "lat": 35.689, "lon": 139.692},
        {"name": "nagoya", "lat": 35.180, "lon": 136.907},
        {"name": "osaka", "lat": 34.686, "lon": 135.520},
        {"name": "hiroshima", "lat": 34.397, "lon": 132.460},
        {"name": "okinawa", "lat": 26.212, "lon": 127.681},
    ]

    results = []
    for loc in locations:
        try:
            # 同期通信で天気情報を取得
            result = get_current_weather_data(
                url,
                api_key,
                loc["lat"],
                loc["lon"],
                "ja",
            )
            results.append(result)

        except requests.RequestException as ex:
            results.append(ex)

    for result, loc in zip(results, locations):
        print(f"対象地域: {loc['name']}")

        if isinstance(result, Exception):
            print(f"エラーが発生しました: {result}")
        else:
            pprint(result)

        print("----------------------------------")


if __name__ == "__main__":
    # 時間計測の開始
    t = time.perf_counter()

    # メイン関数の呼び出し
    main()

    # 実行時間を表示
    print(f"実行時間: {time.perf_counter() - t:.5f} sec")
【実行結果】
(...実行結果は同様のため省略...)
----------------------------------
実行時間: 3.09365 sec

今回の同期版の実行では「3.09365秒」かかりましたが、aiohttp を使用した非同期通信では「0.45794秒」であり、処理時間は短縮されています。

このように、I/O バウンドな処理では並行して通信できるため、非同期処理により処理時間が短縮される場合があります。状況にあわせて使用を検討してみてください。

まとめ

Python で aiohttp パッケージを用いて非同期 HTTP 通信する方法を解説しました。

aiohttp は、クライアントとサーバーの両方のユースケースで非同期 HTTP 通信をサポートしているサードパーティ製ライブラリです。

この記事では具体例として OpenWeather の API を使って aiohttp を用いた非同期通信の実装例を紹介しました。また、requests を使用した同期通信に比べて処理速度が速くなることも確認しています。

非同期プログラミングは、I/O バウンドな処理の実装では効果的である場合があります。ぜひ、aiohttp の使い方の基本を理解していただき、効率的な通信処理を実装できるようになってもらえたらと思います。

ソースコード

上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。

あわせて読みたい
【Python Tech】プログラミングガイド
【Python Tech】プログラミングガイド

ABOUT ME
ホッシー
ホッシー
システムエンジニア
はじめまして。当サイトをご覧いただきありがとうございます。 私は製造業のメーカーで、DX推進や業務システムの設計・開発・導入を担当しているシステムエンジニアです。これまでに転職も経験しており、以前は大手電機メーカーでシステム開発に携わっていました。

プログラミング言語はこれまでC、C++、JAVA等を扱ってきましたが、最近では特に機械学習等の分析でも注目されているPythonについてとても興味をもって取り組んでいます。これまでの経験をもとに、Pythonに興味を持つ方のお役に立てるような情報を発信していきたいと思います。どうぞよろしくお願いいたします。

※キャラクターデザイン:ゼイルン様
記事URLをコピーしました