5.2.4.7.1. マルチプロセス化

データベースをキューとしたメッセージングを複数プロセス化 [1] したい場合、アプリケーション側で複数起動できるように対応する必要がある。 もし、何も対応せずに同一のデータベースをキューとしたメッセージングを複数起動した場合、同一のデータを処理し2重取り込みの障害などが発生するため注意すること。

以下にマルチプロセスでデータベースをキューとしたメッセージングを起動する為の実装例を示す。

1.処理対象を保持するテーブルに悲観ロック用のカラムを追加する。

データベースをキューとしたメッセージングで処理対象データを抽出する際は、通常ステータスカラムなどを用いて未処理データのみを抽出する。 マルチプロセス化する際には、この情報だけでは複数のプロセスが同一データを取得してしまうため、 各プロセスが処理する対象レコードを悲観ロックする為のカラムを追加する。

例(必要なカラムのみ記載)
カラム名 説明
ID 主キー
STATUS 未処理データかどうかを判断するためのステータスカラム
PROCESS_ID 各プロセスがレコードを悲観ロックするために使用するカラム
2.処理対象レコードを悲観ロックするSQLを作成する。

他のプロセスからロックされていない未処理データを悲観ロックするSQLを作成する。

上記のテーブル定義とした場合、 PROCESS_ID に値が設定されていない(nullの場合)レコードで、 STATUS が未処理のレコードがロックされていないレコードとなる。

UPDATE SAMPLE_TABLE
SET PROCESS_ID = :processId
WHERE STATUS = '0'
 AND PROCESS_ID IS NULL
3.悲観ロックした未処理レコードを抽出するSQLを作成する。

悲観ロックしたレコードを抽出するため、条件は未処理かつ PROCESS_ID が自身のプロセスIDであることとする。

SELECT
  *
FROM
  SAMPLE_TABLE
WHERE
  STATUS = '0'
  AND PROCESS_ID = :processId

4.悲観ロック処理と悲観ロックしたレコードを抽出するようActionを実装する。

/**
 * プロセスID。
 *
 * この例では、UUIDを元にプロセスIDを生成している。
 */
private static final String PROCESS_ID = UUID.randomUUID().toString();

@Override
public DataReader<SqlRow> createReader(ExecutionContext context) {
    final Map<String, String> param = new HashMap<>();
    param.put("processId", PROCESS_ID);

    // 自身が悲観ロックした未処理データを抽出するDatabaseRecordReaderを作成する
    final DatabaseRecordReader reader = new DatabaseRecordReader();
    reader.setStatement(getParameterizedSqlStatement("FIND_RECEIVED_PROJECTS"), param);

    // DatabaseRecordReaderがデータ抽出前に行うコールバック処理に、
    // 悲観ロックSQLを実行する処理を登録する。
    // なお、この処理は別トランザクションで実行する必要がある。
    databaseRecordReader.setListener(new DatabaseRecordListener() {
        @Override
        public void beforeReadRecords() {
            final SimpleDbTransactionManager transactionManager = SystemRepository.get("redundancyTransaction");
            new SimpleDbTransactionExecutor<Void>(transactionManager) {
                @Override
                public Void execute(final AppDbConnection appDbConnection) {
                    appDbConnection
                            .prepareParameterizedSqlStatementBySqlId(SQL_ID_PREFIX + "UPDATE_PROCESS_ID")
                            .executeUpdateByMap(PROCESS_MAP);
                    return null;
                }
            }.doTransaction();

        }
    });

    return new DatabaseTableQueueReader(reader, 1000, "RECEIVED_MESSAGE_SEQUENCE");;
}
[1]冗長化構成の複数のサーバ上で同一のデータベースをキューとしたメッセージングを起動するケースなどが該当する。