レック・テクノロジー・コンサルティング株式会社TECH BLOG

【Python】エラーになった処理を途中から再開する方法

Pythonプログラムを動かしているとき、ネットワークエラー等で途中で落ちてしまった経験はありませんか?

リトライを入れていれば処理を再実行することはできますが、データベースへのデータ挿入を連続で行ったりしていた場合、処理を最初から行うことになり悲しい思いをします。

今回は、そのような場合に使えるPythonプログラムの実装についてご紹介します。

アプローチ

なんらかのエラーが発生しリトライさせる場合、その時点までの処理状況を記録しておく必要があります。変数に記録しておく手もありますが、安定性を考えてファイルで進捗を記録することにします。ファイルは進捗を記録するだけなので、読み書きでそれほど大きなオーバーヘッドは発生しません。

上記を踏まえて、以下のような形で実装します。

  • 繰り返し処理の最初に進捗を確認する
  • 繰り返し処理が1回完了するたびにファイルに進捗を記録する

実装

データをストリーミングする処理を例に実装しました。

loader.py

import os
import json

class Loader:
    def __init__(self):
        self.data = [] # データ格納用リスト
        self.stream_progress_file = "stream_progress.json" # 進捗記録用ファイル

    def load(self):
        # データロード
        # 10個の値をリストに追加するだけ
        for i in range(10):
            self.data.append(i)

    def read_progress(self) -> int:
        # 進捗をファイルから読み出す
        # ファイルが存在しない場合、0を返す
        try:
            with open(self.stream_progress_file, "r") as f:
                data = json.load(f)
                return data["index"]
        except FileNotFoundError:
            return 0

    def save_progress(self, data: dict):
        # 現在の進捗をファイルに記録する
        with open(self.stream_progress_file, "w") as f:
            json.dump(data, f)

    def stream(self):
        # 進捗状況を確認する
        start_index = self.read_progress()

        # 取得した進捗時点から処理を再開
        for i in range(start_index, len(self.data)):
            self.save_progress({"index": i})         
            yield self.data[i]

    def __del__(self):
        # 進捗記録用ファイルの削除
        try:
            os.remove(self.stream_progress_file)
            print("Cleaned up stream progress file.")
        except FileNotFoundError:
            pass

上記の loader.py は、ロードしたデータをストリーミング処理するものをLoaderクラスとして実装したものです。いくつか処理を書いていますが、大事なポイントは以下の3点です。

  • self.stream_progress_fileに記載のファイルで進捗を管理
  • stream()の最初にread_progress()で進捗を確認
  • stream()の繰り返し処理が1回完了するごとにsave_progress()で進捗を記録

resume.py

import random

from tenacity import retry, stop_after_attempt

from loader import Loader

@retry(stop=stop_after_attempt(3))
def handle_data(loader: Loader):
    for data in loader.stream():
        # ランダムにエラーを発生させる
        if random.random() > 0.1:
            print(data)
        else:
            print("Error!")
            raise Exception

def main():
    loader = Loader()
    loader.load()
    handle_data(loader)

if __name__ == "__main__":
    try:
        main()
        print("Successfully completed.")
    except Exception as e:
        print(f"Maximum retries reached. The program failed: {e}")

上記の resume.py は処理を実行する本体のファイルです。

main()loaderの作成とストリーミング処理を行います。ストリーミング処理を行うhandle_data()の部分は、tenacity ライブラリで最大3回リトライさせる形にしています。

実行すると以下のようになります。

$ python resume.py 
0
1
Error!
2
3
4
5
6
7
8
9
Cleaned up stream progress file.
Successfully completed.

2のところで1回エラーが発生したものの、途中から問題なく再開できていることがわかります。

まとめ

Pythonプログラムでエラーになった処理を途中から再開する方法についてご紹介しました。

データベースにデータを連続で挿入する処理があり、認証の有効期限切れ等で不定期にエラーが発生していたのですが、今回の実装にしたことで安定した処理を実現できています。一度に1200件ほど処理していますが、ファイル読み書きの負荷もわずかで問題なく動作しています。

同じような課題に直面されている方は、ぜひ本記事の方法を試してみてください。

この記事をシェアする

  • Facebook
  • X
  • Pocket
  • Line
  • Hatena
  • Linkedin

資料請求・お問い合わせはこちら

ページトップへ戻る