【Python】aiohttpを用いた非同期HTTP通信の基本

Python で aiohttp パッケージを用いて非同期 HTTP 通信する方法を解説します。
aiohttp による非同期 HTTP 通信
非同期処理とは、複数のタスクが協調しあって処理を実行するプログラミングの実装方法です。非同期プログラミングは多くの支持を得ており、Python 3.5 で async や await といった非同期プログラミング方法が導入されています。
この記事では、非同期処理で効果が期待できる通信処理に焦点を当て、非同期 HTTP 通信が可能な aiohttp を使用する方法を紹介します。
aiohttp とは
aiohttp は、クライアントとサーバーの両方のユースケースで非同期 HTTP 通信をサポートしているサードパーティ製ライブラリです。
非同期プログラミングは、I/O バウンドな処理の実装には非常に有益な方法です。I/O バウンドな処理とは、ファイルへの入出力やネットワーク通信等が該当します。
Python のネットワーク通信では requests パッケージが有名ですが、requests パッケージは、非同期 I/O に対応していません。aiohttp は、非同期な HTTP 通信パッケージとして非常に信頼できるライブラリとして使用することができます。
aiohttp を用いた非同期通信の基本
この記事では、OpenWeather の API を使って aiohttp を用いた非同期通信の実装方法を紹介します。OpenWeather は、グローバルな気象データを取得できるサービスです。なお、OpenWeather に限らず類似の API で、今回紹介するコードは参考にしていただけるかと思います。
aiohttp のインストール
aiohttp はサードパーティ製のライブラリのため、インストールをしてください。
pip install aiohttp
uv を使用している場合は、以下でインストールしてください。uv は高速な Python パッケージ管理ツールであり、pip の代替として利用できます。
uv add aiohttp
aiohttp の非同期通信
以降で紹介するプログラムでは、OpenWeather の「Current weather data API」を使用し、経度・緯度を用いて現在の天気情報を取得する処理を非同期に行います。Current weather data API のドキュメントはこちらを参照してください。
OpenWeather の API は、アカウント登録して取得できる API キーを使います。API キーと今回使用する API の URL は、コンフィグファイル(config.ini)に記載して使用することにします。以下の「xxxxxxxxxxxxxxxxxxxxxxxx」の部分には取得した API キーを指定してください。
【設定ファイル】config.ini
[API] key = xxxxxxxxxxxxxxxxxxxxxxxx url_current_weather_data = https://api.openweathermap.org/data/2.5/weather
【実行ファイル】aiohttp_current_weather.py
import asyncio
import configparser
import time
from pprint import pprint
from typing import Any
import aiohttp
async def get_current_weather_data(
session: aiohttp.ClientSession,
url: str,
api_key: str,
lat: float,
lon: float,
lang: str,
) -> dict[str, Any]:
"""現在の天気情報を取得する
Args:
session (aiohttp.ClientSession): セッション情報
url (str): APIのURL
api_key (str): APIキー
lat (float): 緯度
lon (float): 経度
lang (str): 言語
Returns:
dict[str, Any]: 天気情報
"""
# パラメータの設定
params = {
"lat": lat,
"lon": lon,
"appid": api_key,
"lang": lang,
}
# 非同期通信
async with session.get(url, params=params) as response:
# HTTPエラーが発生した場合に例外をスロー
response.raise_for_status()
# レスポンスをJSON形式で取得
return await response.json()
async def main() -> None:
"""メインコルーチン"""
# APIキーの読み込み
config = configparser.ConfigParser()
config.read("./config.ini", encoding="utf-8")
api_key = config["API"]["key"]
url = config["API"]["url_current_weather_data"]
locations = [
{"name": "sapporo", "lat": 43.065, "lon": 141.347},
{"name": "tokyo", "lat": 35.689, "lon": 139.692},
{"name": "nagoya", "lat": 35.180, "lon": 136.907},
{"name": "osaka", "lat": 34.686, "lon": 135.520},
{"name": "hiroshima", "lat": 34.397, "lon": 132.460},
{"name": "okinawa", "lat": 26.212, "lon": 127.681},
]
# タイムアウトの設定
timeout = aiohttp.ClientTimeout(total=10)
# 非同期セッションを作成
async with aiohttp.ClientSession(timeout=timeout) as session:
# タスクを作成
tasks = [
get_current_weather_data(
session,
url,
api_key,
loc["lat"],
loc["lon"],
"ja",
)
for loc in locations
]
# タスクを実行して結果を取得
results = await asyncio.gather(*tasks, return_exceptions=True)
for result, loc in zip(results, locations):
print(f"対象地域: {loc['name']}")
if isinstance(result, Exception):
print(f"エラーが発生しました: {result}")
else:
pprint(result)
print("----------------------------------")
if __name__ == "__main__":
# 時間計測の開始
t = time.perf_counter()
# asyncio.runを使用してメインコルーチンを実行
asyncio.run(main())
# 実行時間を表示
print(f"実行時間: {time.perf_counter() - t:.5f} sec")対象地域: sapporo
{'base': 'stations',
'clouds': {'all': 1},
'cod': 200,
'coord': {'lat': 43.065, 'lon': 141.347},
'dt': 1774384379,
'id': 2128295,
'main': {'feels_like': 274.25,
'grnd_level': 994,
'humidity': 64,
'pressure': 1021,
'sea_level': 1021,
'temp': 274.25,
'temp_max': 274.25,
'temp_min': 274.25},
'name': '札幌市',
'sys': {'country': 'JP', 'sunrise': 1774384203, 'sunset': 1774428679},
'timezone': 32400,
'visibility': 10000,
'weather': [{'description': '晴天', 'icon': '01d', 'id': 800, 'main': 'Clear'}],
'wind': {'deg': 222, 'gust': 1.08, 'speed': 1.19}}
----------------------------------
...(途中省略)...
----------------------------------
対象地域: okinawa
{'base': 'stations',
'clouds': {'all': 11},
'cod': 200,
'coord': {'lat': 26.212, 'lon': 127.681},
'dt': 1774384379,
'id': 1856035,
'main': {'feels_like': 295.82,
'grnd_level': 1009,
'humidity': 87,
'pressure': 1011,
'sea_level': 1011,
'temp': 295.28,
'temp_max': 295.28,
'temp_min': 295.28},
'name': '那覇市',
'sys': {'country': 'JP', 'sunrise': 1774387704, 'sunset': 1774431737},
'timezone': 32400,
'visibility': 10000,
'weather': [{'description': '薄い雲',
'icon': '02n',
'id': 801,
'main': 'Clouds'}],
'wind': {'deg': 221, 'gust': 11.35, 'speed': 6.66}}
----------------------------------
実行時間: 0.45794 sec例では、指定した複数の緯度・経度に対して現在の天気情報を非同期で取得します。主要なライブラリとして asyncio と aiohttp をインポートして使用しています。
API を使用する関数としては get_current_weather_data 関数を定義しています。取得した各種情報は JSON 形式で返却されますが、json メソッドを使用することで辞書形式で扱うことができます。
【main コルーチン】
プログラムは main コルーチンによって制御されます。この中で API キーを読み込み、取得対象の場所の緯度・経度情報を locations という辞書のリストとして定義しています。今回は以下の 6 か所の天気を取得しました。
locations = [
{"name": "sapporo", "lat": 43.065, "lon": 141.347},
{"name": "tokyo", "lat": 35.689, "lon": 139.692},
{"name": "nagoya", "lat": 35.180, "lon": 136.907},
{"name": "osaka", "lat": 34.686, "lon": 135.520},
{"name": "hiroshima", "lat": 34.397, "lon": 132.460},
{"name": "okinawa", "lat": 26.212, "lon": 127.681},
]aiohttp を使用する場合には、aiohttp.ClientSession で非同期セッションを作成します。今回は、このセッション (session) と API キー (api_key)、条件となる緯度・経度の情報を各関数に渡します。また、aiohttp.ClientTimeout(total=10) でタイムアウトを作成し、セッションに設定しておきます。
# タイムアウトの設定
timeout = aiohttp.ClientTimeout(total=10)
# 非同期セッションを作成
async with aiohttp.ClientSession(timeout=timeout) as session:
# タスクを作成
tasks = [
get_current_weather_data(
session,
url,
api_key,
loc["lat"],
loc["lon"],
"ja",
)
for loc in locations
]
# タスクを実行して結果を取得
results = await asyncio.gather(*tasks, return_exceptions=True)
for result, loc in zip(results, locations):
print(f"対象地域: {loc['name']}")
if isinstance(result, Exception):
print(f"エラーが発生しました: {result}")
else:
pprint(result)タスクリストを作成し、asyncio.gatherでタスクを渡します。asyncio.gather では、全てのタスクの完了を待ち、結果リストを取得することができます。戻り値の順序はタスク順序と一致するため、zip を使って元のデータと対応付けて処理できます。
また、いずれかのタスクで例外が発生した場合の挙動は、return_exceptions の指定によって変わります。今回は各地域の天気を取得するタスクのため、各処理結果は独立しています。このような場合、asyncio.gather(*tasks, return_exceptions=True) とすることで、個々の結果を取得しつつエラーを個別に扱うことができます。
一方、すべての処理成功を前提とする場合は、例外発生時に処理全体を停止する設計が一般的です。この場合は、asyncio.gather(*tasks) を try...except... で囲むことで例外処理を行います。
【get_current_weather_data 関数】
実際に API を実行している get_current_weather_data 関数の主要なポイントを説明します。まず、API に設定するパラメータを以下のように用意します。
# パラメータの設定
params = {
"lat": lat,
"lon": lon,
"appid": api_key,
"lang": lang,
}ここでは、緯度 (lat)、経度 (lon)、APIキー (api_key)、言語 (lang) を指定しています。非同期通信を実行しているのは以下部分です。
# 非同期通信
async with session.get(url, params=params) as response:
# HTTPエラーが発生した場合に例外をスロー
response.raise_for_status()
# レスポンスをJSON形式で取得
return await response.json()非同期セッションの get を使用して、引数に API の URL と作成したパラメータを渡します。この時、async with 句を使用してレスポンスを取得します。
raise_for_status() は HTTP エラーが発生した場合に例外をスローします。例外が発生しなかった場合、await response.json() で結果を JSON 形式にして返却します。
今回の実行は、使用環境で「0.45794秒」で完了しましたが、非同期の効果が出ているのか分かりにくいため requests による同期実行と効果を比較してみましょう。
同期通信との比較(requests を使用)
上記で aiohttp を使用した非同期通信の例を紹介しました。通信処理は I/O バウンドな処理であるため、同期処理に比べて非同期処理は効率的です。では、request モジュールを使って同期的に通信した場合と具体的に比べてみましょう。
以下は、上記のプログラムを requests による同期版にしたプログラムです。
import configparser
import time
from pprint import pprint
from typing import Any
import requests
def get_current_weather_data(
url: str,
api_key: str,
lat: float,
lon: float,
lang: str,
) -> dict[str, Any]:
"""現在の天気情報を取得する
Args:
url (str): APIのURL
api_key (str): APIキー
lat (float): 緯度
lon (float): 経度
lang (str): 言語
Returns:
dict[str, Any]: 天気情報の辞書
"""
# パラメータの設定
params = {
"lat": lat,
"lon": lon,
"appid": api_key,
"lang": lang,
}
# 同期通信
response = requests.get(url, params=params, timeout=10)
# HTTPエラーが発生した場合に例外をスロー
response.raise_for_status()
# レスポンスをJSON形式で取得
return response.json()
def main() -> None:
"""メイン関数"""
# APIキーの読み込み
config = configparser.ConfigParser()
config.read("./config.ini", encoding="utf-8")
api_key = config["API"]["key"]
url = config["API"]["url_current_weather_data"]
locations = [
{"name": "sapporo", "lat": 43.065, "lon": 141.347},
{"name": "tokyo", "lat": 35.689, "lon": 139.692},
{"name": "nagoya", "lat": 35.180, "lon": 136.907},
{"name": "osaka", "lat": 34.686, "lon": 135.520},
{"name": "hiroshima", "lat": 34.397, "lon": 132.460},
{"name": "okinawa", "lat": 26.212, "lon": 127.681},
]
results = []
for loc in locations:
try:
# 同期通信で天気情報を取得
result = get_current_weather_data(
url,
api_key,
loc["lat"],
loc["lon"],
"ja",
)
results.append(result)
except requests.RequestException as ex:
results.append(ex)
for result, loc in zip(results, locations):
print(f"対象地域: {loc['name']}")
if isinstance(result, Exception):
print(f"エラーが発生しました: {result}")
else:
pprint(result)
print("----------------------------------")
if __name__ == "__main__":
# 時間計測の開始
t = time.perf_counter()
# メイン関数の呼び出し
main()
# 実行時間を表示
print(f"実行時間: {time.perf_counter() - t:.5f} sec")【実行結果】 (...実行結果は同様のため省略...) ---------------------------------- 実行時間: 3.09365 sec
今回の同期版の実行では「3.09365秒」かかりましたが、aiohttp を使用した非同期通信では「0.45794秒」であり、処理時間は短縮されています。
このように、I/O バウンドな処理では並行して通信できるため、非同期処理により処理時間が短縮される場合があります。状況にあわせて使用を検討してみてください。
まとめ
Python で aiohttp パッケージを用いて非同期 HTTP 通信する方法を解説しました。
aiohttp は、クライアントとサーバーの両方のユースケースで非同期 HTTP 通信をサポートしているサードパーティ製ライブラリです。
この記事では具体例として OpenWeather の API を使って aiohttp を用いた非同期通信の実装例を紹介しました。また、requests を使用した同期通信に比べて処理速度が速くなることも確認しています。
非同期プログラミングは、I/O バウンドな処理の実装では効果的である場合があります。ぜひ、aiohttp の使い方の基本を理解していただき、効率的な通信処理を実装できるようになってもらえたらと思います。
上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。

