Pythonプログラムを動かしているとき、ネットワークエラー等で途中で落ちてしまった経験はありませんか?
リトライを入れていれば処理を再実行することはできますが、データベースへのデータ挿入を連続で行ったりしていた場合、処理を最初から行うことになり悲しい思いをします。
今回は、そのような場合に使えるPythonプログラムの実装についてご紹介します。
アプローチ
なんらかのエラーが発生しリトライさせる場合、その時点までの処理状況を記録しておく必要があります。変数に記録しておく手もありますが、安定性を考えてファイルで進捗を記録することにします。ファイルは進捗を記録するだけなので、読み書きでそれほど大きなオーバーヘッドは発生しません。
上記を踏まえて、以下のような形で実装します。
- 繰り返し処理の最初に進捗を確認する
- 繰り返し処理が1回完了するたびにファイルに進捗を記録する
実装
データをストリーミングする処理を例に実装しました。
loader.py
import os
import json
class Loader:
def __init__(self):
self.data = [] # データ格納用リスト
self.stream_progress_file = "stream_progress.json" # 進捗記録用ファイル
def load(self):
# データロード
# 10個の値をリストに追加するだけ
for i in range(10):
self.data.append(i)
def read_progress(self) -> int:
# 進捗をファイルから読み出す
# ファイルが存在しない場合、0を返す
try:
with open(self.stream_progress_file, "r") as f:
data = json.load(f)
return data["index"]
except FileNotFoundError:
return 0
def save_progress(self, data: dict):
# 現在の進捗をファイルに記録する
with open(self.stream_progress_file, "w") as f:
json.dump(data, f)
def stream(self):
# 進捗状況を確認する
start_index = self.read_progress()
# 取得した進捗時点から処理を再開
for i in range(start_index, len(self.data)):
self.save_progress({"index": i})
yield self.data[i]
def __del__(self):
# 進捗記録用ファイルの削除
try:
os.remove(self.stream_progress_file)
print("Cleaned up stream progress file.")
except FileNotFoundError:
pass
上記の loader.py は、ロードしたデータをストリーミング処理するものをLoader
クラスとして実装したものです。いくつか処理を書いていますが、大事なポイントは以下の3点です。
self.stream_progress_file
に記載のファイルで進捗を管理stream()
の最初にread_progress()
で進捗を確認stream()
の繰り返し処理が1回完了するごとにsave_progress()
で進捗を記録
resume.py
import random
from tenacity import retry, stop_after_attempt
from loader import Loader
@retry(stop=stop_after_attempt(3))
def handle_data(loader: Loader):
for data in loader.stream():
# ランダムにエラーを発生させる
if random.random() > 0.1:
print(data)
else:
print("Error!")
raise Exception
def main():
loader = Loader()
loader.load()
handle_data(loader)
if __name__ == "__main__":
try:
main()
print("Successfully completed.")
except Exception as e:
print(f"Maximum retries reached. The program failed: {e}")
上記の resume.py は処理を実行する本体のファイルです。
main()
でloader
の作成とストリーミング処理を行います。ストリーミング処理を行うhandle_data()
の部分は、tenacity ライブラリで最大3回リトライさせる形にしています。
実行すると以下のようになります。
$ python resume.py
0
1
Error!
2
3
4
5
6
7
8
9
Cleaned up stream progress file.
Successfully completed.
2のところで1回エラーが発生したものの、途中から問題なく再開できていることがわかります。
まとめ
Pythonプログラムでエラーになった処理を途中から再開する方法についてご紹介しました。
データベースにデータを連続で挿入する処理があり、認証の有効期限切れ等で不定期にエラーが発生していたのですが、今回の実装にしたことで安定した処理を実現できています。一度に1200件ほど処理していますが、ファイル読み書きの負荷もわずかで問題なく動作しています。
同じような課題に直面されている方は、ぜひ本記事の方法を試してみてください。