JL Computer Consultancy
A mechanism for controlling concurrent (similar) processes
May 1999
A recent article described a way of emulating some aspects of the Oracle Parallel Query engine for processing very large tables in calculated chunks bounded by rowid ranges. The main purpose of that article was simply to split the table up so that each chunk could be updated independently with minimum contention and wasted I/O.
This article describes a general-purpose mechanism that can be used to allow multiple concurrent processes to run through a list of (similar) tasks grabbing one task at a time, processing, and then going back for the next available task until all the tasks are complete. In a follow-up article I will show how to combine the table-splitting tools with the concurrency package to produce a simple program that will:
break up a table into a collection of chunks, then kick off an arbitrary number of concurrent processes to update randomly selected chunks
By combining the two mechanisms in this way you can set up a very simple framework for updating very large tables (or doing other large jobs) very rapidly with minimum contention whilst taking advantage of all the resources available to you.
The Strategy:
The basic idea is very simple: we define a table which identifies a list of tasks, and holds a flag identifying the state of that task. The states I cater for are: 'Awaiting processing (or New)', 'Being processed (or Active)', 'Completed', 'Error'. The procedures we need to create are essentially processes which grab a row in one state, change its state, and commit.
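A minimal driving table supporting these states might look like the following sketch. The table and column names are inferred from the code extract later in this article; the column sizes and the single-character status codes are my assumptions, not the definitive definition (which is in the downloadable script):

```sql
create table parallel_allocation_list (
    driving_task    varchar2(30)    not null,   -- class of task (one logical driver per class)
    payload         varchar2(40)    not null,   -- the task definition handed to a worker
    status          varchar2(1)     not null    -- e.g. N)ew, A)ctive, C)omplete, E)rrored
);
```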
The only important thing about implementing these procedures is to ensure that they cater for concurrent usage. This is quite easy to manage by ensuring that all attempts to acquire a row from the table are made using 'select for update nowait', with a trap for 'resource busy' (Oracle error ORA-00054), and by issuing a commit as soon as the state change is done. The following extract is the most significant section of code and demonstrates the idea:
The presence of the commit is probably the most important feature to consider when using this package. It is deliberate and, in my opinion, necessary to ensure that the mechanism cannot be incorrectly used to produce contention, long queues, deadlocks and accidental re-processing. It is important to remember, however, that if the code you write to do a particular task fails, then you have to roll back your changes before you call the package to mark the task as Errored.
Code Sample
procedure allocate_target_item(
    i_driving_task      in      varchar2,
    o_payload           out     varchar2,
    io_return_code      in out  number,
    io_error_message    in out  varchar2
) is
    m_rowid     rowid;
    m_payload   varchar2(40);
begin
    io_return_code   := c_success;
    io_error_message := null;
    o_payload        := null;
    commit;
    for m_loop in 1..5 loop
        begin
            select  rowid, payload
            into    m_rowid, m_payload
            from    parallel_allocation_list
            where   driving_task = i_driving_task
            and     status = c_awaiting_processing
            and     rownum = 1
            for update of status nowait;
            exit;           -- row acquired and locked, stop retrying
        exception
            when no_data_found then
                io_return_code   := c_no_rows_left;
                io_error_message := sqlerrm;
                return;
            when nowait_failed then     -- ORA-00054, declared with pragma exception_init
                if m_loop = 5 then
                    io_return_code   := c_row_locked;
                    io_error_message := sqlerrm;
                    return;
                else
                    sys.dbms_lock.sleep(1);     -- back off before retrying
                end if;
            when others then
                io_return_code   := c_general_error;
                io_error_message := sqlerrm;
                return;
        end;
    end loop;
    update  parallel_allocation_list
    set     status = c_being_processed
    where   rowid = m_rowid;
    o_payload := m_payload;
    commit;
end allocate_target_item;
The code was written on Oracle 7.2, so there are a few changes that could be made to improve the efficiency and reduce the volume of code. The most significant change I plan to make, though, is to the mechanism of the driving table. In theory each class of task could have its own table with its own payload type, but in the short term I created a single static table and added an extra column to identify the class of task (and hence the class of concurrent processes) that a row is relevant to.
One little warning: as you can see, the package backs off if the row it tried to grab was locked. To do this it calls the dbms_lock.sleep procedure, which is usually owned by SYS, so the id that owns my code has to be granted the privilege to execute that package directly.
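For example, connected as SYS (the grantee name here is a placeholder for whatever schema owns the package):

```sql
-- The grant must be made directly to the owning schema, not via a role,
-- for the reference in the package body to compile.
grant execute on dbms_lock to batch_owner;
```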
As well as the obvious 'grab a row' routine, the package contains a total of 10 procedures:
create_driver                   Create (logically, but not actually) a driving table for a new class of tasks
drop_driver                     Drop (logically) the driving table for a class of tasks
populate_driver                 Insert one task definition (a.k.a. payload) into the driving table
allocate_target_item            Find a 'New' task and change it to 'Active', returning the payload to the caller
complete_target_item            Change an 'Active' task to 'Complete'
error_target_item               Change an 'Active' task to 'Errored'
free_target_item                Set an 'Active' task back to 'New' - to cater for doing one bit again
reset_all_targets               Change all 'Complete' tasks back to 'New' - to cater for a total batch restart
clear_error_for_target_item     Change an 'Errored' task back to 'New' - after fixing a problem
clear_all_errors                Change all 'Errored' tasks back to 'New' - after fixing a big problem
Between them, these 10 procedures should allow you to create simple, but robust, batch processes to handle arbitrarily large groups of similar tasks with an arbitrary degree of concurrency. (My first use was for a batch run to turn 104 tables of weekly detail into 104 tables of summaries each Saturday).
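A typical worker process might then be structured along the following lines. This is only a sketch: the package name parallel_driver, the parameter lists of complete_target_item and error_target_item (guessed by analogy with allocate_target_item), the assumption that c_success is zero, and the routine process_one_chunk (standing in for your own task-specific code) are all mine, not part of the package as published:

```sql
declare
    m_payload   varchar2(40);
    m_code      number;
    m_errm      varchar2(255);
begin
    loop
        -- grab the next 'New' task for this class, marking it 'Active'
        parallel_driver.allocate_target_item(
            'WEEKLY_SUMMARY', m_payload, m_code, m_errm
        );
        exit when m_code != 0;          -- no rows left, row locked, or error
        begin
            process_one_chunk(m_payload);   -- hypothetical: do the real work
            parallel_driver.complete_target_item(
                'WEEKLY_SUMMARY', m_payload, m_code, m_errm
            );
        exception
            when others then
                rollback;               -- undo the failed task's changes first
                parallel_driver.error_target_item(
                    'WEEKLY_SUMMARY', m_payload, m_code, m_errm
                );
        end;
    end loop;
end;
/
```

Note the rollback before error_target_item: as discussed above, the task's own changes must be undone before the package commits the state change to 'Errored'.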
There are three pieces of code to collect:
create the driving table
create the package header
create the package body