Receive Communication on Workers

Since R2023b

This example shows how to set up a data queue on the workers to receive data.

You can use a data queue to transfer data or messages between the client and the workers.

This example generates instrument data on the workers and sends the data back to the client. To start and stop the signal generation, the client sends messages to the workers using data queues. Compared with canceling the future, this approach lets a parfeval computation stop gracefully: the worker exits its loop at a known point and can confirm to the client that it has stopped.

Start a parallel pool with three workers.

pool = parpool("Processes",3);
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 3 workers.

Set Up Queue to Receive Data on Client

Prepare and initialize plots to visualize the instrument data from the workers. The createPlots function is defined at the end of the example.

[fig,p] = createPlots;

Create a DataQueue and use afterEach to specify the function to execute each time the queue receives data. The receiveDataOnClient function plots the data received from the workers and is defined at the end of the example.

clientQueue = parallel.pool.DataQueue;
afterEach(clientQueue,@(data) receiveDataOnClient(p,data));

Set Up Queues to Receive Communications on Workers

Create a helper PollableDataQueue on the client.

helperClientQueue = parallel.pool.PollableDataQueue;

Use parfeval to establish data queues on the three workers in the parallel pool. The connectToWorker helper function assigns a unique ID to each worker, creates a PollableDataQueue on each worker, and sends the data queue to the client using the helperClientQueue queue. The workers then wait for instructions from the client to start data generation.

wkrF(1:3) = parallel.FevalFuture;
for ID = 1:3
    wkrF(ID) = parfeval(@connectToWorker,0,clientQueue,helperClientQueue,ID);
end

At the client, receive the labelled worker queues. You can now use these queues to send data to each worker.

allWkrQueues = struct('ID',{},'Queue',{});
for i = 1:3
    wkrQueue = poll(helperClientQueue,inf);
    allWkrQueues(wkrQueue.ID) = wkrQueue;
end
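
The receive loop can also be tried without a pool. The following is a minimal sketch, not part of the original example, that simulates the three workers on the client. The messages are sent in scrambled order, the Queue field holds a placeholder string rather than a real worker queue, and the 5-second timeout is an arbitrary assumption used in place of inf so that a missing message raises an error instead of blocking forever.

```matlab
% Client-only simulation of the handshake; no parallel pool is required.
helperClientQueue = parallel.pool.PollableDataQueue;

% Pretend the workers reply in the order 3, 1, 2.
% "placeholder" stands in for the PollableDataQueue a real worker would send.
for ID = [3 1 2]
    send(helperClientQueue,struct('ID',ID,'Queue',"placeholder"));
end

allWkrQueues = struct('ID',{},'Queue',{});
for i = 1:3
    % Hypothetical 5-second timeout instead of inf.
    [wkrQueue,gotQueue] = poll(helperClientQueue,5);
    if ~gotQueue
        error("Timed out waiting for a worker queue.");
    end
    % Index by ID so that arrival order does not matter.
    allWkrQueues(wkrQueue.ID) = wkrQueue;
end

disp([allWkrQueues.ID])
```

Because each message carries its own ID, the struct array ends up ordered by worker ID even though the messages arrived out of order.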

Start and Stop Data Generation

Next, instruct the workers to start generating data.

for ID = 1:3
    send(allWkrQueues(ID).Queue,"Start generating data");
end

Make the figure visible. It shows the instrument data that each worker generates and sends to the client.

fig.Visible="on";

Generate data for a period of 10 seconds.

pause(10)

To stop generating data on worker 2, send a message to the worker using the queue that was created on worker 2. The line for Instrument 2 stops at around 0.9 seconds.

send(allWkrQueues(2).Queue,"stop");
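
The point at which the Instrument 2 line stops follows from the loop timing in connectToWorker. As a rough check, the sketch below assumes that each loop iteration is dominated by the 0.1-second poll timeout and ignores scheduling and communication overhead, which is an idealization.

```matlab
pollTimeout = 0.1;   % seconds each iteration waits in poll (from connectToWorker)
dt = 0.01;           % signal time step between samples (from connectToWorker)
wallClock = 10;      % seconds the client pauses before sending "stop"

% Upper bound on the iterations completed before the stop message arrives.
maxSteps = round(wallClock/pollTimeout);

% Signal time of the last sample sent, since t(step) = (step-1)*dt.
tLast = (maxSteps-1)*dt;
fprintf("Last sample at most at t = %.2f s\n",tLast);
```

Real iterations take slightly longer than the timeout, so the line stops somewhat earlier than this bound. By the same estimate, a worker that is never stopped needs about 40 seconds to step through the full 4-second signal.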

Poll the helperClientQueue queue to receive confirmation from worker 2.

[status, ~] = poll(helperClientQueue,inf);
disp(status)
Data generation stopped on worker 2

Wait for the other workers to finish their computation.

wait(wkrF);

Helper Functions

The connectToWorker function creates a PollableDataQueue on the worker, sends the queue to the client, and then polls the wkrQueue queue to wait for instructions from the client.

When the worker receives a message from the client, the function generates a dummy signal on that worker that mimics continuous data from an instrument. At each time step, the worker sends one point of the signal to the client with the clientQueue queue, then polls the wkrQueue queue with a 0.1-second timeout to check whether the queue has data. If there is data to receive, the worker stops generating data and sends a message to the client to confirm that it has stopped.

function connectToWorker(clientQueue,helperClientQueue,ID)
% Assign an ID to this worker.
wkrQueue.ID = ID;
% Create a PollableDataQueue on this specific worker.
wkrQueue.Queue = parallel.pool.PollableDataQueue;
% Send the queue to the client.
send(helperClientQueue,wkrQueue);

% Wait for instructions from client.
[~, OK] = poll(wkrQueue.Queue,inf);
if OK
    t = 0:0.01:4;
    step = 1;
    while step <= numel(t)
        % Generate dummy instrument data.
        data_point = sin(ID*2*pi*t(step));
        % Send data to client using a data queue.
        send(clientQueue,{ID,t(step),data_point});
        % Check if worker queue has data to receive and use a timeout.
        [~, OK] = poll(wkrQueue.Queue,0.1);
        if OK
            send(helperClientQueue,sprintf("Data generation stopped on worker %d",ID));
            return
        else
            step = step + 1;
        end
    end
else
    return
end
end
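
The stop check in connectToWorker relies on the second output of poll, which reports whether a message arrived before the timeout expired. The following minimal sketch, not part of the original example, shows both outcomes on the client alone, so no parallel pool is needed. The variable names are chosen for illustration only.

```matlab
% Create a pollable queue on the client.
q = parallel.pool.PollableDataQueue;

% Nothing has been sent yet, so poll waits up to 0.1 s and then gives up.
% gotData is false and data is empty.
[data,gotData] = poll(q,0.1);

% Once a message is queued, poll returns it without waiting for the timeout.
% gotMsg is true and msg holds the message that was sent.
send(q,"stop");
[msg,gotMsg] = poll(q,0.1);

fprintf("First poll received data: %d\n",gotData);
fprintf("Second poll received data: %d (message: %s)\n",gotMsg,msg);
```

In the worker loop, the timed-out case is the normal path: the timeout paces the signal generation, and only a received message ends the loop early.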

Define a function to prepare and initialize plots to visualize the data from the workers. Specify different line properties for each worker.

function [fig,p] = createPlots
fig = figure(Name="Signal from Instruments",Visible="off");
t = tiledlayout(fig,3,1);
lineColor = ["k","b","g"];
p = gobjects(1,3);
for i=1:3
    nexttile(t);
    xlabel("Time (s)");
    ylabel("Amplitude");
    title(sprintf("Instrument %d",i))
    p(i) = animatedline(NaN,NaN,Color=lineColor(i));
end
end

Define a function to update the plots when the workers send data to the client.

function receiveDataOnClient(p,data)
addpoints(p(data{1,1}),data{1,2},data{1,3})
drawnow limitrate;
end
