【Python】zstd によりデータを圧縮/解凍する基本

Python で zstd により Zstandard アルゴリズムを使ったデータ圧縮/解凍をする方法の基本について解説します。
目次
Zstandard (zstd) の概要
Zstandard は、Meta(旧 Facebook)が開発した圧縮アルゴリズムです。Zstandard は、「高速」で「高圧縮率」のバランスの取れた圧縮形式として普及が進んでいます。
zstd は、Zstandard の略称でライブラリ名としてよく使われており、Python では、Python 3.14 から標準ライブラリとして zstd モジュールが提供されています。
この記事では、zstd により Zstandard アルゴリズムを使ったデータを圧縮する方法の基本について紹介します。
Zstandard の標準ライブラリ化 zstd
これまでは、Python における圧縮ライブラリは、gzip、zlib、bz2、lzma といったライブラリが標準ライブラリとして使用できました。これらは標準であることの安心感や長年の互換性・安定性という面で優れる一方で、性能面では Zstandard の方が高速、高圧縮率という状況でした。
Python の 3.13 以前でも Zstandard が使えなかったというわけではなく、外部ライブラリとして「pip install zstandard」とすることで使用できました。外部ライブラリとして成熟はしていたものの、Python の標準ライブラリではなかったため、依存関係や運用面の観点から標準的には採用しずらい状況がありました。
PEP 784 に基づき、Python 3.14 に zstd ライブラリとして取り込まれたことで、これらの懸念は大きく緩和され、標準的に使用しやすくなりました。
zstd の基本的な使い方
zstd による基本的なデータ圧縮/解凍方法
zstd では、bytes データの圧縮を compress() 関数、解凍を decompress() 関数を使って実行することができます。以下の簡単な例で見てみましょう。
from compression import zstd
# 圧縮/解凍対象のデータ
data = "Zstandard 圧縮テストデータ".encode("utf-8")
# zstd を使ってデータを圧縮
compressed = zstd.compress(data)
# zstd を使ってデータを解凍
restored = zstd.decompress(compressed)
print(f"解凍後のデータ: {restored.decode('utf-8')}")【実行結果】 解凍後のデータ: Zstandard 圧縮テストデータ
zstd は、compression モジュール配下にあるので「from compression import zstd」のようにインポートして使います。
例では、文字列を圧縮/解凍してます。compress() に対象データを渡すと圧縮データが生成され、圧縮したデータを decompress() に渡して解凍することで元のデータを復元できます。
なお、compress() / decompress() は、bytes 型を扱う API であるため、例のように文字列(str)を圧縮する場合には、事前に encode() で bytes に変換し、解凍後には decode() で文字列に戻します。
圧縮レベルの設定 level
zstd では、圧縮のレベルを 1 ~ 22 の値を指定することができます。値が大きいほど高圧縮になりますが、処理時間が多くかかります。圧縮レベルを指定するには level 引数を指定します。何も指定しないときのデフォルトは level=3 となります。
以下では、4つのパターンで圧縮率を比較してみます。
level=1(高速・低圧縮)level=3(デフォルト)level=10(中程度の圧縮)level=22(最大設定で高圧縮)
対象データは単純なデータの繰り返しでは差が見えないため、ログ風のデータを繰り返しで生成して確認しています。
from compression import zstd
from datetime import datetime, timedelta
# ログ風の大きなテストデータを生成
lines = []
start = datetime.now()
num = 100_000
for i in range(num):
ts = start + timedelta(seconds=i % 60)
lines.append(
f"{ts.strftime('%Y-%m-%dT%H:%M:%S')} INFO user_id={i%10000} action=click page=/items/{i%500}\n"
)
data = "".join(lines).encode("utf-8")
def report_compression_results(level, compressed, original_size):
"""圧縮結果を表示する関数"""
compressed_size = len(compressed)
compression_percent = (compressed_size / original_size) * 100
print(
f"Level {level:>2}:"
f" 圧縮後サイズ: {compressed_size:>10,} バイト,"
f" 圧縮後サイズ比: {compression_percent:>6.2f} %"
)
original_size = len(data)
print(f"元のサイズ: {original_size:,} バイト")
# level=1 (高速圧縮)
compressed_level1 = zstd.compress(data, level=1)
report_compression_results(1, compressed_level1, original_size)
# zstd を使ってデータを圧縮 (level指定による変化を確認)
# デフォルトは level=3
compressed = zstd.compress(data)
report_compression_results(3, compressed, original_size)
# level=10 (中程度の圧縮)
compressed_level10 = zstd.compress(data, level=10)
report_compression_results(10, compressed_level10, original_size)
# level=22 (最大の設定、最高圧縮、非常に遅い)
compressed_level22 = zstd.compress(data, level=22)
report_compression_results(22, compressed_level22, original_size)【実行結果】 元のサイズ: 6,666,900 バイト Level 1: 圧縮後サイズ: 192,821 バイト, 圧縮後サイズ比: 2.89 % Level 3: 圧縮後サイズ: 139,030 バイト, 圧縮後サイズ比: 2.09 % Level 10: 圧縮後サイズ: 37,436 バイト, 圧縮後サイズ比: 0.56 % Level 22: 圧縮後サイズ: 25,827 バイト, 圧縮後サイズ比: 0.39 %
結果を見るとレベルを上げるほど、高い圧縮率で圧縮できていることが分かります。
圧縮レベルは高いほど圧縮率は向上しますが、その分CPU負荷が増加します。多くのケースではデフォルト(level=3)が速度と圧縮率のバランスがよく、まずは level 指定せずに利用するのがおすすめです。
一方で、ストレージ削減や転送量削減を重視する場合は、level=6~10 程度を検討するのが良いでしょう。それ以上の level は、CPU 負荷増加に対して圧縮率の改善幅は小さい結果となる可能性が高いです。
zstd によるストリーミング圧縮/解凍方法
compress() / decompress() は、bytes を一括で圧縮/解凍するために非常に便利ですが、巨大なファイルを対象にする場合は、すべてのデータをメモリに読み込む必要があり、メモリ消費量が大きくなってしまいます。
そこで、大容量のファイルでは、一定サイズのチャンク(塊)毎にデータ読み込みながら、圧縮/解凍する「ストリーミング処理」を利用します。
以降の例では、以下スクリプトで指定したサイズの大きなログ風のファイルを生成してストリーミング圧縮/解凍します。
from datetime import datetime, timedelta
def create_sample_log_file(filepath: str, size_mb: int = 10) -> None:
"""指定サイズ(MB) 以下に収まるログ風のサンプルファイルを作成する関数
Args:
filepath (str): 作成するファイルのパス
size_mb (int, optional): 目標サイズ(MB). デフォルトは10MB.
"""
target_size = size_mb * 1024 * 1024 # MB をバイトに変換
start = datetime.now()
written = 0
i = 0
with open(filepath, "wb") as f:
while written < target_size:
ts = start + timedelta(seconds=i % 60)
line = (
f"{ts:%Y-%m-%dT%H:%M:%S} INFO user_id={i%10000} "
f"action=click page=/items/{i%500}\n"
).encode("utf-8")
# 書き込み後にサイズ超過しそうなら終了
if written + len(line) > target_size:
break
f.write(line)
written += len(line)
i += 1
print(f"Created: {filepath}")
print(f"Target : {size_mb} MB ({target_size:,} bytes)")
print(f"Actual : {written / 1024 / 1024:.2f} MB ({written:,} bytes)")
print(f"Lines : {i:,}")
if __name__ == "__main__":
path = "sample.log"
create_sample_log_file(path, size_mb=100)【実行結果例】 Created: sample.log Target : 100 MB (104,857,600 bytes) Actual : 100.00 MB (104,857,567 bytes) Lines : 1,572,821
上記スクリプトは、create_sample_log_file 関数にファイルパスとサイズ(MB)を指定するとログ風のファイルを作成します。以降例では、sample.logという 100 MB のファイルを例に紹介します。
このように事前に大きなファイルを用意しておくことで、ストリーミング圧縮によるメモリ使用量の違いを確認しやすくなります。もちろん、お手元に大きなファイルがあればそちらを使って試していただいて構いません。
ストリーミング圧縮
zstd を使用して大きなファイルをストリーミング圧縮するには、以下のようにします。
from compression import zstd
def compress_file(
src_path: str, dst_path: str, level: int | None = None
) -> None:
"""ファイルをストリーム圧縮する関数
Args:
src_path (str): 圧縮対象のファイルパス
dst_path (str): 圧縮先ファイルパス
level (int | None, optional): 圧縮レベル. デフォルトは None (zstd のデフォルトレベル 3 ).
"""
# 1 MB のチャンクで処理
chunk_size = 1024 * 1024
# 対象ファイルを開いてストリーム圧縮
with open(src_path, "rb") as f_in:
# zstd で圧縮先ファイルを開く
with zstd.open(dst_path, "wb", level=level) as f_out:
# chunk_size ごとに読み込みながら圧縮して書き込み
# ファイル末尾の b"" で停止するイテレータを使用
for chunk in iter(lambda: f_in.read(chunk_size), b""):
f_out.write(chunk)
if __name__ == "__main__":
source_path = "sample.log"
dest_path = "sample.log.zst"
compress_file(source_path, dest_path)例では、圧縮対象のパス(src_path)と圧縮先のパス(dst_path)、および圧縮レベル(level)を引数に受け取り、ストリーム圧縮する関数 compress_file() を作成しています。なお、level は、指定がない(None)の場合は、zstd のデフォルトの level=3 になります。
今回は、1 MB のチャンク(塊)でソースファイルのデータを読み込んでいきます。
zstd.open は、圧縮ファイル(.zst)をファイルオブジェクトとして扱えるようにするものです。例では、読み込みファイルを open のバイナリ読み込み("rb")で、圧縮先ファイルを zstd.open のバイナリ書き込み("wb")で開いています。なお、圧縮レベル(level)は、zstd.open の引数に指定します。
# ファイル末尾の b"" で停止するイテレータを使用"
for chunk in iter(lambda: f_in.read(chunk_size), b""):
f_out.write(chunk)具体的に圧縮書き込みをしている上記部分はイテレーターを活用している部分で少しだけ複雑なので丁寧に説明します。
iter(callable, sentinel) は、Python 組み込み関数 iter() の特殊な使い方です。第 1 引数の callable に指定した関数を繰り返し呼び出し、その戻り値が sentinel と一致したタイミングで繰り返しを停止します。
file.read() は、ファイル終端の EOF に到達すると b"" を返却するため「EOF まで chunk_size ごとに chunk に読み込む」ための for 文になっています。
なお、ここで指定している chunk_size は「読み込みの単位」であり、Zstandard の圧縮形式としてのブロックサイズとは関係ありません。write() をチャンク単位で呼び出していますが、これはデータを圧縮ストリームに順次渡すことを意味しており、write() を 1 回呼び出すたびに必ずしも圧縮先ファイルへ即座に書き込まれるわけではありません。圧縮処理では、内部でバッファリングが行われ、圧縮されたデータは適切な単位、タイミングでまとめて圧縮先ファイルに書き込まれます。
ストリーミング解凍
zstd を使って圧縮したファイルのストリーミング解凍で元のファイルに復元する場合は、ストリーミング圧縮と同様に以下のようにします。
from compression import zstd
def decompress_file(src_path: str, dst_path: str) -> None:
"""zstd圧縮ファイルをストリーム解凍する関数
Args:
src_path (str): 対象ファイル(.zst)のパス
dst_path (str): 解凍先のファイルパス
"""
# 1 MB のチャンクで処理
chunk_size = 1024 * 1024 # 1MB
# zstd で圧縮元ファイルを開いてストリーム解凍
with zstd.open(src_path, "rb") as f_in:
# 解凍先ファイルを開く
with open(dst_path, "wb") as f_out:
for chunk in iter(lambda: f_in.read(chunk_size), b""):
f_out.write(chunk)
if __name__ == "__main__":
source_path = "sample.log.zst"
dest_path = "sample_restored.log"
decompress_file(source_path, dest_path)上記は、圧縮時と比べると入力(f_in)を zstd.open() で開いており、出力(f_out)を open() で開いている点で異なりますが構造は同様です。
圧縮前のファイルと圧縮/解凍後のファイルが一致しているかどうかは、以下のようなスクリプトで前後のファイルのハッシュ値を一致を確認するのが簡単です。
import hashlib
def sha256(path: str) -> str:
"""ハッシュ値計算用
Args:
path (str): 対象ファイルのパス
Returns:
str: SHA256 ハッシュ値(16 進文字列)
"""
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
if __name__ == "__main__":
original_hash = sha256("sample.log")
restored_hash = sha256("sample_restored.log")
print(f"Original SHA256: {original_hash}")
print(f"Restored SHA256: {restored_hash}")
print("一致" if original_hash == restored_hash else "不一致")【実行結果】※ハッシュ値は対象ファイルにより変化します。 Original SHA256: 3fbff20cef2ac6b9a19f3399bb9ac58c603e597db79bb78332702bd48367b3ba Restored SHA256: 3fbff20cef2ac6b9a19f3399bb9ac58c603e597db79bb78332702bd48367b3ba 一致
結果を見ると、処理前後でハッシュ値が一致していることが分かります。なお、ハッシュ値は対象ファイルにより変化しますので注意してください。
このようにすることで、サイズの大きなファイルについてもメモリを節約しつつ圧縮/解凍することが可能になります。
まとめ
Python で zstd により Zstandard アルゴリズムを使ったデータ圧縮/解凍をする方法の基本について解説しました。
Zstandard は、Python 3.13 以前では、外部ライブラリとして pip でインストールをして使う必要がありましたが、Python 3.14 で標準ライブラリに追加され、compression.zstd を使ってデータを圧縮/解凍ができるようになりました。
この記事では、一括で bytes データを圧縮/解凍する compress() / decompress() の使い方や level 指定による圧縮率の違いについて説明しました。また、大容量ファイルを扱う場合の、ストリーミング圧縮/解凍の方法についても紹介しています。
zstd は高速かつ高圧縮な形式として普及が進んでおり、ログ圧縮やデータのアーカイブ用途でも非常に使いやすい選択肢です。ぜひ、zstd を有効活用してみてください。

