5.2.3.1. Create an Application That Monitors Table Queues and Imports Unprocessed Data¶
The application that monitors table queues and imports unprocessed data will be explained using an example application.
- Overview of the function to be created
Regularly monitor the table queues and import the data of the unprocessed data (status:
0
). Information of the table queues and table to be imported are as given below.Monitored table queue: Receive the project registration message (INS_PROJECT_RECEIVE_MESSAGE) Table to be imported: Project (PROJECT) - Execution procedure of the example application
SQL should be executed within the procedure after connecting any arbitrary database client to H2. For information on the connection destination, see
config/database.properties
.- Delete data in the project table.
truncate table project
- Execute the application.
$cd nablarch-example-db-queue $mvn clean package $mvn exec:java -Dexec.mainClass=nablarch.fw.launcher.Main ^ -Dexec.args="'-diConfig' 'com/nablarch/example/app/batch/project-creation-service.xml' '-requestPath' 'ProjectCreationService' '-userId' 'samp'"
- Add the unprocessed data to the table queue.
Execute the following SQL and add the unprocessed data to the monitored table queue.
insert into ins_project_receive_message ( received_message_sequence, project_name, project_type, project_class, project_start_date, project_end_date, client_id, project_manager, project_leader, user_id, note, sales, cost_of_goods_sold, sga, allocation_of_corp_expenses, status ) values ( 1, 'Project name', 'development', 'Classification', '2011-01-01', '2020-12-31', 1, 'admin', 'user1', 1, ' ', 100, 200, 300, 400, '0' )
- Confirm that the data has been registered in the project table.
Execute the following SQL and check whether unprocessed data is imported into the project table.
select * from project
- Stop the application.
Force terminate the process with (Ctrl + C etc.) from the command prompt, etc. from where the application was executed.
5.2.3.1.1. Create a action class¶
Creates an action class by inheriting BatchAction.
- Implementation examples
public class ProjectCreationServiceAction extends BatchAction<SqlRow> { // How to create the contents will be described later }
- Point
- Since the table is handled as a queue, the input data becomes the search result of the table. For this reason, SqlRow is specified in the type parameter of BatchAction.
5.2.3.1.2. Create a reader to monitor the table¶
Create a method to generate a reader that monitors the table in the action class created with Create a action class.
As described in Reader used for database queue, create DatabaseTableQueueReader as reader.
- Implementation examples
- Action class
@Override public DataReader<SqlRow> createReader(final ExecutionContext context) { final DatabaseRecordReader databaseRecordReader = new DatabaseRecordReader(); databaseRecordReader.setStatement( getParameterizedSqlStatement("FIND_RECEIVED_PROJECTS"), PROCESS_MAP); databaseRecordReader.setListener(() -> { final SimpleDbTransactionManager transactionManager = SystemRepository.get("redundancyTransaction"); new SimpleDbTransactionExecutor<Void>(transactionManager) { @Override public Void execute(final AppDbConnection appDbConnection) { appDbConnection .prepareParameterizedSqlStatementBySqlId( SQL_ID_PREFIX + "UPDATE_PROCESS_ID") .executeUpdateByMap(PROCESS_MAP); return null; } }.doTransaction(); }); return new DatabaseTableQueueReader( databaseRecordReader, 1000, "RECEIVED_MESSAGE_SEQUENCE"); }
- SQL file (ProjectCreationServiceAction.sql)
-- SQL for pessimistically locking the unprocessed data that is received UPDATE_PROCESS_ID= update ins_project_receive_message set process_id = :processId where status = '0' and process_id is null -- SQL to acquire the unprocessed data that is received FIND_RECEIVED_PROJECTS= select received_message_sequence from ins_project_receive_message where status = '0' and process_id = :processId
- Point
- Implement createReader and create DatabaseTableQueueReader.
- Specify the following in DatabaseTableQueueReader.
- Reader for searching from the database (DatabaseRecordReader)
- Wait time when there is no unprocessed data (1 second in this example)
- List of primary key column names
- Specify the following in DatabaseRecordReader.
- SqlPStatement to search the unprocessed data
- Implementation class DatabaseRecordListener for pessimistic lock of unprocessed data. For details, see Multi-process.
- Define the following SQL in the SQL file.
- SQL for the pessimistic locking of unprocessed data to avoid the data from being used as processing object of other processes
- SQL that acquires records with the
STATUS
column value0
andPROCESS_ID
column value same as the process ID for acquiring unprocessed data to be processed by the process
- For SQL description rules to prepare the SQL file, see Manage SQL in files.
5.2.3.1.3. Execute business process based on unprocessed data¶
Create a method to implement the business process in the action class created with Create a action class.
- Implementation examples
@Override public Result handle(final SqlRow inputData, final ExecutionContext context) { // Retrieve attribute data based on the primary key of unprocessed data final Project project = UniversalDao.findBySqlFile( Project.class, SQL_ID + "GET_RECEIVED_PROJECT", inputData); if (!isValidProjectPeriod(project)) { throw new ApplicationException( MessageUtil.createMessage(MessageLevel.ERROR, "abnormal.project.period")); } // Register to project table UniversalDao.insert(project); return new Result.Success(); }
- Point
- Implement the business process in the method handle. (Detailed explanation of the process details is omitted as depends on the example dependent.)
- Returns Result.Success, which indicates that the processing was normal. Since an exception is thrown even if the processing fails, Result.Success can be always returned.
5.2.3.1.4. Update the status of processed data¶
Create a method to update the status in the action class created with Create a action class.
- Implementation examples
- Action class
@Override protected void transactionSuccess(final SqlRow inputData, final ExecutionContext context) { // Update status to success updateStatus(inputData, StatusUpdateDto::createNormalEnd); } @Override protected void transactionFailure(final SqlRow inputData, final ExecutionContext context) { // Update status to abnormal (failed) updateStatus(inputData, StatusUpdateDto::createAbnormalEnd); } private void updateStatus( final SqlRow inputData, final Function<String, StatusUpdateDto> function) { getParameterizedSqlStatement("UPDATE_STATUS") .executeUpdateByObject( function.apply(inputData.getString("RECEIVED_MESSAGE_SEQUENCE"))); } public static final class StatusUpdateDto { // Property and accessors, Javadoc omitted private static StatusUpdateDto createNormalEnd(String id) { return new StatusUpdateDto(id, "1"); } private static StatusUpdateDto createAbnormalEnd(String id) { return new StatusUpdateDto(id, "2"); } }
- SQL file (ProjectCreationServiceAction.sql)
-- SQL to update status UPDATE_STATUS = update ins_project_receive_message set status = :newStatus where received_message_sequence = :id
- Point
- Implement the update process of the normally processed record in transactionSuccess. (if the processing is normal (exception is not thrown), the method is called back by Nablarch.)
- The update process of records that were not processed normally is implemented in transactionFailure. (The method where an exception or error is thrown during processing is called back by Nablarch.)
- The status of the specific record is updated in SQL.
- For SQL description rules to prepare the SQL file, see Manage SQL in files.