Scale Up with Parallel Jobs and Tasks
This example shows how to use parallel jobs and tasks to scale up your computations to hundreds of workers on a large cluster.
You can scale up an existing parfor workflow beyond parallel pool limits by converting the parfor-loop into multiple tasks for an independent job. This example converts the parfor workflow in the Run parfor-Loops Without a Parallel Pool example to a jobs and tasks workflow.
This example recreates an update of the ARGESIM benchmark CP2 Monte Carlo study [1] by Jammer et al. [2]. In the Monte Carlo study, you simulate a spring-mass system with different, randomly sampled damping factors using jobs and tasks.
Create Cluster Object and Job
Create the cluster object and display the number of workers available in the cluster. HPCProfile is a profile for a MATLAB® Job Scheduler cluster. Replace the HPCProfile profile with your own cluster profile.
cluster = parcluster("HPCProfile"); maxNumWorkers = cluster.NumWorkers; fprintf("Number of workers available: %d",maxNumWorkers)
Number of workers available: 496
Create an independent job using the cluster object.
job = createJob(cluster);
Define Simulation Parameters
period = [0 2];
h = 0.001; % time step
t_interval = period(1):h:period(2);
Set the number of iterations.
nReps = 10000000;
Initialize the random number generator and create an array of damping coefficients sampled from a uniform distribution between 800 and 1200.
rng(0);
a = 800;
b = 1200;
d = (b-a).*rand(nReps,1) + a;
Modify to Job and Task Workflow
To change the parfor workflow into a jobs and tasks workflow, convert the main body of the parfor-loop into a function that takes in a vector of damping coefficients and returns the sum of the mass-spring motion.
function y_sum = taskFcn(d)
Define the simulation parameters on the worker. To reduce data transfer overheads, specify the time interval and any other constant parameters directly on the workers instead of transferring them to the workers as input arguments.
period = [0 2];
h = 0.001;
y0 = [0 0.1];
t_interval = period(1):h:period(2);
Initialize the results variable for the reduction operation.
y_sum = zeros(numel(t_interval),1);
To reduce scheduling overheads, partition the iterations into groups for each task instead of scheduling a task for each iteration. Use a for-loop to iterate through this task's set of damping coefficients. Use a reduction variable to compute the sum of the motion at each time point.
for n = 1:length(d)
    f = @(t,y) massSpringODE(t,y,d(n));
    [~,yOut] = ode45(f,t_interval,y0);
    y_sum = y_sum + yOut(:,1);
end
You can use a job's ValueStore when the combined size of all the results is large, or if the client must process interim results while the job is running. Otherwise, if your results data is small, you can send the results back to the client using the task's OutputArguments property.
end
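For example, if the combined partial sums were too large to return efficiently as output arguments, each task could write its result to the job's ValueStore instead. The following is a minimal sketch, not part of the original workflow; the key scheme based on the task ID is an assumption, as is the assumption that task IDs run from 1 to the number of tasks.

% Inside the task function: store the partial sum in the job's ValueStore
% instead of returning it as an output argument (hypothetical key scheme).
store = getCurrentValueStore;
task = getCurrentTask;
store(sprintf("partialSum_%d",task.ID)) = y_sum;

On the client, after the job finishes, you can read the stored entries back and accumulate them.

% On the client: accumulate the partial sums stored by each task.
store = job.ValueStore;
y_sum = zeros(numel(t_interval),1);
for k = 1:numel(job.Tasks)
    y_sum = y_sum + store(sprintf("partialSum_%d",k));
end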
Prepare Input Data for Tasks
To help reduce overheads when you schedule multiple tasks for a job, partition the iterations into groups for each task. Try to partition the iterations into groups that are:
Large enough that the computation time is large compared to the overhead of scheduling the partition.
Small enough that there are enough tasks to keep all workers busy.
Decreasing in size in the last sets of tasks to keep as many workers busy as possible.
The partitionIterations helper function, attached to this example, uses the number of iterations and the desired maximum number of workers to divide the iterations into appropriately sized groups, and returns a cell array where each cell corresponds to one group of iteration indices. The partitionIterations function allocates larger groups to the initial tasks and progressively smaller groups to later tasks for a balanced workload distribution.
taskGroups = partitionIterations(nReps,maxNumWorkers);
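The following is a minimal sketch of one possible partitioning strategy, not the attached partitionIterations helper itself; the function name partitionIterationsSketch and the minimum group size are assumptions. The sketch uses guided-style scheduling: each round assigns about half of the remaining iterations, split across the workers, so early groups are large and later groups become progressively smaller.

function groups = partitionIterationsSketch(numIterations,numWorkers)
% Hypothetical sketch of a partitioning helper; the partitionIterations
% function attached to this example may use a different strategy.
minGroupSize = 1000;            % smallest group worth scheduling as a task
groups = {};
nextIdx = 1;
remaining = numIterations;
while remaining > 0
    % Each round, split about half of the remaining iterations across the workers.
    groupSize = max(ceil(remaining/(2*numWorkers)),minGroupSize);
    for w = 1:numWorkers
        if remaining <= 0
            break
        end
        thisSize = min(groupSize,remaining);
        groups{end+1} = nextIdx:(nextIdx+thisSize-1); %#ok<AGROW>
        nextIdx = nextIdx + thisSize;
        remaining = remaining - thisSize;
    end
end
end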
After you specify the iteration indices for each task, use the cellfun function to extract the damping coefficients corresponding to each task group into a cell array.
dampingCoeffs = cellfun(@(ind) {d(ind)},taskGroups,UniformOutput=false);
Create Tasks and Submit Job
Use a single call to create multiple tasks for the job. Each task executes the taskFcn function with the corresponding set of input arguments sourced from the dampingCoeffs cell array. Instruct the workers to return one output argument for each task.
tasks = createTask(job,@taskFcn,1,dampingCoeffs);
Submit the job to run on the cluster.
submit(job);
If you want to block the MATLAB client until the job completes, use the wait function on the job object. The wait function is useful when subsequent code depends on the completion of the job.
wait(job);
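Alternatively, instead of blocking with wait, you can poll the job and task states to monitor progress from the client. This is a minimal sketch, assuming a 30-second polling interval.

% Poll the job until it reaches a terminal state, reporting task progress.
while ~ismember(job.State,["finished" "failed"])
    numDone = nnz(strcmp({job.Tasks.State},"finished"));
    fprintf("Finished %d of %d tasks\n",numDone,numel(job.Tasks));
    pause(30)
end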
Access Results
After the job completes, you can retrieve the results from all the tasks using the fetchOutputs function.
results = fetchOutputs(job);
The fetchOutputs function returns a cell array, where each element is the output of a task. Convert the cell array into a numeric array and compute the sum and the mean of each row.
y_sum = sum(cell2mat(results'),2);
meanY = y_sum./nReps;
Plot the mean response of the system against time.
plot(t_interval,meanY)
title("ODE Solution of Mass-Spring System")
xlabel("Time")
ylabel("Motion")
grid on
Display the job duration.
jobDuration = job.RunningDuration
jobDuration = duration
00:07:32
Compare Computational Speedup
Compare the computational speedup of converting the parfor workflow into a jobs and tasks workflow to that of running the parfor-loop on a parallel pool and directly on a cluster.
Use the timeExecution helper function attached to this example to measure the execution time of the parfor workflow on the client, on a parallel pool with 496 workers, and directly on a cluster with 496 workers available. Convert the job duration into seconds.
[serialTime,hpcPoolTime,hpcClusterTime] = timeExecution("HPCProfile",maxNumWorkers);
jobsAndTaskTime = double(seconds(jobDuration));
elapsedTimes = [serialTime hpcPoolTime hpcClusterTime jobsAndTaskTime];
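For reference, this is a minimal sketch of what such a timing helper might look like; the timeExecution function attached to this example may differ, and the names timeExecutionSketch and runWorkload are hypothetical. The sketch times the same parfor-loop on the client (by passing 0 as the maximum number of workers), on a parallel pool, and directly on the cluster using parforOptions.

function [serialTime,poolTime,clusterTime] = timeExecutionSketch(profileName,numWorkers)
% Hypothetical sketch of a timing helper; the timeExecution function
% attached to this example may differ.
cluster = parcluster(profileName);
rng(0);
nReps = 10000000;
d = (1200-800).*rand(nReps,1) + 800;   % same damping coefficients as above

tic
runWorkload(d,0);                      % 0 runs the parfor-loop on the client
serialTime = toc;

pool = parpool(cluster,numWorkers);    % parfor-loop on a parallel pool
tic
runWorkload(d,numWorkers);
poolTime = toc;
delete(pool)

opts = parforOptions(cluster);         % parfor-loop directly on the cluster, no pool
tic
runWorkload(d,opts);
clusterTime = toc;
end

function runWorkload(d,parforArg)
% Run the same reduction parfor-loop used throughout this example.
period = [0 2];
h = 0.001;
y0 = [0 0.1];
t_interval = period(1):h:period(2);
y_sum = zeros(numel(t_interval),1);
parfor (n = 1:length(d), parforArg)
    f = @(t,y) massSpringODE(t,y,d(n));
    [~,yOut] = ode45(f,t_interval,y0);
    y_sum = y_sum + yOut(:,1);
end
end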
Calculate the computational speedup and create a bar chart comparing the speedup of each workflow. The chart shows that the jobs and tasks workflow achieves a speedup similar to running the parfor-loop on a parallel pool, and a larger speedup than running the parfor-loop directly on the cluster.
speedUp = elapsedTimes(1)./elapsedTimes;
x = ["parfor Client","parfor Pool","parfor Cluster","Jobs and Tasks"];
bar(x,speedUp);
xlabel("Workflow")
ylabel("Computational Speedup")
Helper Functions
This helper function defines the ODE system of the mass-spring system that the solver uses. You can rewrite the second-order differential equation that describes the spring-mass system,

$$m\ddot{y}(t) + d\,\dot{y}(t) + k\,y(t) = 0,$$

as a system of first-order ODEs,

$$\dot{y}_1 = y_2, \qquad \dot{y}_2 = -\frac{d\,y_2 + k\,y_1}{m},$$

that you can solve using the ode45 solver. Here, $m$ is the mass, $d$ is the damping coefficient, and $k$ is the spring stiffness.
function dy = massSpringODE(t,y0,d)
k = 9000;   % spring stiffness (N/m)
m = 450;    % mass (kg)
dy = zeros(2,1);
dy(1) = y0(2);
dy(2) = -(d*y0(2)+k*y0(1))/m;
end
References
[1] Breitenecker, Felix, Gerhard Höfinger, Thorsten Pawletta, Sven Pawletta, and Rene Fink. "ARGESIM Benchmark on Parallel and Distributed Simulation." Simulation News Europe SNE 17, no. 1 (2007): 53-56.
[2] Jammer, David, Peter Junglas, and Sven Pawletta. “Solving ARGESIM Benchmark CP2 ’Parallel and Distributed Simulation’ with Open MPI/GSL and Matlab PCT - Monte Carlo and PDE Case Studies.” SNE Simulation Notes Europe 32, no. 4 (December 2022): 211–20. https://doi.org/10.11128/sne.32.bncp2.10625.
See Also
Scale Up Parallel Code to Large Clusters