Using MapReduce to Compute Covariance and Related Quantities

This example shows how to compute the mean and covariance for several variables in a large data set using mapreduce. It then uses the covariance to perform several follow-up calculations that do not require another iteration over the entire data set.

Prepare Data

Create a datastore using the airlinesmall.csv data set. This 12-megabyte data set contains 29 columns of flight information for several airline carriers, including arrival and departure times. In this example, select ActualElapsedTime (total flight time), Distance (total flight distance), DepDelay (flight departure delay), and ArrDelay (flight arrival delay) as the variables of interest.

ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedVariableNames = {'ActualElapsedTime', 'Distance', ...
'DepDelay', 'ArrDelay'};

The datastore treats 'NA' values as missing, and replaces the missing values with NaN values by default. Additionally, the SelectedVariableNames property allows you to work with only the selected variables of interest, which you can verify using preview.

preview(ds)
ans=8×4 table
    ActualElapsedTime    Distance    DepDelay    ArrDelay
    _________________    ________    ________    ________

            53             308          12           8
            63             296           1           8
            83             480          20          21
            59             296          12          13
            77             373          -1           4
            61             308          63          59
            84             447          -2           3
           155             954          -1          11

Run MapReduce

The mapreduce function requires a map function and a reduce function as inputs. The mapper receives blocks of data and outputs intermediate results. The reducer reads the intermediate results and produces a final result.

In this example, the mapper computes the count, mean, and covariance for the variables in each block of data in the datastore, ds. Then, the mapper stores the computed values for each block as an intermediate key-value pair consisting of a single key with a cell array containing the three computed values.

Display the map function file.

function covarianceMapper(t,~,intermKVStore)
  % Get data from input table and remove any rows with missing values
  x = t{:,:};
  x = x(~any(isnan(x),2),:);

  % Compute and save the count, mean, and covariance
  n = size(x,1);
  m = mean(x,1);
  c = cov(x,1);   % second argument 1 normalizes by n rather than n-1

  % Store values as a single item in the intermediate key/value store
  add(intermKVStore,'key',{n m c})
end
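To make the mapper's output concrete, the following minimal sketch applies the same filtering and statistics to one small block. The matrix x is made-up illustration data, not part of airlinesmall.csv, and the variable value is a hypothetical name for the cell array described above.

```matlab
% Hypothetical block: 4 rows, 2 variables, one row with a missing value
x = [1 2; NaN 3; 4 5; 7 11];

% Same filtering and statistics as the map function
x = x(~any(isnan(x),2),:);   % drops the second row
n = size(x,1);               % 3 complete rows remain
m = mean(x,1);               % column means: [4 6]
c = cov(x,1);                % 2-by-2 covariance, normalized by n

% Package the three values as a single cell array
value = {n m c};
```

Because the count travels with the mean and covariance, the reducer can weight each block correctly even when blocks lose different numbers of rows to missing values.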

The reducer combines the intermediate results for each block to obtain the count, mean, and covariance for each variable of interest in the entire data set. The reducer stores the final key-value pairs for the keys 'count', 'mean', and 'cov' with the corresponding values for each variable.

Display the reduce function file.

function covarianceReducer(~,intermValIter,outKVStore)
  % We will combine results computed in the mapper for different chunks of
  % the data, updating the count, mean, and covariance each time we add a new
  % chunk.

  % First, initialize everything to zero (scalar 0 is okay)
  n1 = 0; % no rows so far
  m1 = 0; % mean so far
  c1 = 0; % covariance so far

  while hasnext(intermValIter)
    % Get the next chunk, and extract the count, mean, and covariance
    t = getnext(intermValIter);
    n2 = t{1};
    m2 = t{2};
    c2 = t{3};

    % Use weighting formulas to update the values so far
    n = n1+n2;                     % new count
    m = (n1*m1 + n2*m2) / n;       % new mean

    % New covariance is a weighted combination of the two covariances, plus
    % additional terms that relate to the difference in means
    c1 = (n1*c1 + n2*c2 + n1*(m1-m)'*(m1-m) + n2*(m2-m)'*(m2-m))/ n;

    % Store the new mean and count for the next iteration
    m1 = m;
    n1 = n;
  end

  % Save results in the output key/value store
  add(outKVStore,'count',n1);
  add(outKVStore,'mean',m1);
  add(outKVStore,'cov',c1);
end
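The weighting formulas in the reducer can be checked on a small example. The sketch below uses two blocks of synthetic random data (an assumption for illustration, standing in for two blocks of the datastore), combines their per-block statistics with the same formulas, and compares the result with a single pass over the stacked data.

```matlab
% Hypothetical synthetic data standing in for two blocks of a datastore
rng(0);
x1 = randn(50,4);
x2 = randn(80,4) + 2;

% Per-block statistics, computed the same way as in the map function
n1 = size(x1,1); m1 = mean(x1,1); c1 = cov(x1,1);
n2 = size(x2,1); m2 = mean(x2,1); c2 = cov(x2,1);

% Combine with the same weighting formulas the reducer uses
n = n1 + n2;
m = (n1*m1 + n2*m2) / n;
c = (n1*c1 + n2*c2 + n1*(m1-m)'*(m1-m) + n2*(m2-m)'*(m2-m)) / n;

% Compare against a single pass over the stacked data
xAll = [x1; x2];
covAll = cov(xAll,1);
errMean = max(abs(m - mean(xAll,1)));
errCov  = max(abs(c(:) - covAll(:)));
```

Both errMean and errCov should be at the level of floating-point round-off. The terms involving the differences in means matter here because the second block is shifted by 2; without them, the combined covariance would understate the spread of the pooled data.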

Use mapreduce to apply the map and reduce functions to the datastore, ds.

outds = mapreduce(ds, @covarianceMapper, @covarianceReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%

mapreduce returns a datastore, outds, with files in the current folder.

View the results of the mapreduce call by using the readall function on the output datastore.

results = readall(outds)
results=3×2 table
Key                      Value
_________    ___________________________________

{'count'}    {[                         120664]}
{'mean' }    {[120.2452 703.3926 8.1334 7.1235]}
{'cov'  }    {4x4 double                       }

Count = results.Value{1};
MeanVal = results.Value{2};
Covariance = results.Value{3};
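Because the mapper calls cov(x,1), the covariance returned here is normalized by the number of rows rather than by one less than the number of rows. If the unbiased estimate is wanted, the count is enough to convert between the two without another pass over the data. The following sketch demonstrates the conversion on synthetic data; the names covN and covUnbiased are hypothetical. The same scaling would apply to the variables above as Covariance * Count / (Count - 1).

```matlab
% Hypothetical synthetic data for illustration
rng(3);
x = randn(200,3);
N = size(x,1);

covN = cov(x,1);                   % normalized by N, as in the map function
covUnbiased = covN * N / (N - 1);  % rescale to the N-1 normalization

% cov(x) with no second argument uses the N-1 normalization
covRef = cov(x);
err = max(abs(covUnbiased(:) - covRef(:)));
```

For a count as large as the one in this example, the two normalizations differ only negligibly, and the correlation matrix is unaffected because the scale factor cancels.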

Compute Correlation Matrix

The covariance, mean, and count values are useful for further calculations. Compute a correlation matrix by taking the standard deviations from the diagonal of the covariance matrix and dividing each covariance entry by the product of the corresponding standard deviations.

s = sqrt(diag(Covariance));
Correlation = Covariance ./ (s*s')
Correlation = 4×4

1.0000    0.9666    0.0278    0.0902
0.9666    1.0000    0.0216    0.0013
0.0278    0.0216    1.0000    0.8748
0.0902    0.0013    0.8748    1.0000

The elapsed time (first column) and distance (second column) are highly correlated, since Correlation(2,1) = 0.9666. The departure delay (third column) and arrival delay (fourth column) are also highly correlated, since Correlation(4,3) = 0.8748.
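As a sanity check on the normalization step, the sketch below applies the same two lines to synthetic data (made up for illustration) and compares the result with corrcoef, which computes the correlation matrix directly from the raw data.

```matlab
% Hypothetical synthetic data with columns 1 and 2 strongly correlated
rng(1);
x = randn(300,4);
x(:,2) = x(:,1) + 0.3*x(:,2);

% Correlation from the covariance matrix, as in the example
C = cov(x,1);
s = sqrt(diag(C));
R = C ./ (s*s');

% Reference computed directly from the data
Rref = corrcoef(x);
err = max(abs(R(:) - Rref(:)));
```

The agreement does not depend on whether the covariance is normalized by n or n-1, since any common scale factor cancels in the ratio.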

Compute Regression Coefficients

Compute some regression coefficients to predict the arrival delay, ArrDelay, using the other three variables as predictors.

slopes = Covariance(1:3,1:3)\Covariance(1:3,4);
intercept = MeanVal(4) - MeanVal(1:3)*slopes;
b = table([intercept; slopes], 'VariableNames', {'Estimate'}, ...
'RowNames', {'Intercept','ActualElapsedTime','Distance','DepDelay'})
b=4×1 table
                         Estimate 
                         _________

    Intercept              -19.912
    ActualElapsedTime      0.56278
    Distance             -0.068721
    DepDelay               0.94689
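The slopes and intercept computed from the covariance and mean are the ordinary least-squares coefficients for a model with an intercept term. The sketch below illustrates this on synthetic data (the predictors, true coefficients, and noise level are all made up for illustration) by comparing the covariance-based solution with a direct least-squares fit using the backslash operator.

```matlab
% Hypothetical synthetic data: three predictors and one response
rng(2);
X = randn(500,3);
y = 1.5 + X*[2; -1; 0.5] + 0.1*randn(500,1);

% Coefficients from the covariance and mean, as in the example
C  = cov([X y],1);
mu = mean([X y],1);
slopes = C(1:3,1:3)\C(1:3,4);
intercept = mu(4) - mu(1:3)*slopes;

% Reference: direct least-squares fit with an explicit intercept column
bRef = [ones(500,1) X] \ y;
err = max(abs([intercept; slopes] - bRef));
```

The covariance-based route needs only the summary statistics, which is why it suits the mapreduce setting: the direct fit would require the full data matrix in memory.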

Perform PCA

Use svd to perform principal component analysis (PCA). PCA is a technique for finding a lower-dimensional summary of a data set. The following calculation is a simplified version of PCA; more options are available from the pca and pcacov functions in Statistics and Machine Learning Toolbox™.

You can carry out PCA using either the covariance or correlation. In this case, use the correlation since the difference in scale of the variables is large. The first two components capture most of the variance.

[~,latent,pcacoef] = svd(Correlation);
latent = diag(latent)
latent = 4×1

2.0052
1.8376
0.1407
0.0164
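To quantify how much of the variance each component captures, divide each value in latent by their sum. The sketch below retypes the displayed values so that it runs on its own; the names explained and cumExplained are hypothetical.

```matlab
% Values copied from the displayed output above
latent = [2.0052; 1.8376; 0.1407; 0.0164];

% Percentage of total variance captured by each component
explained = 100 * latent / sum(latent);

% Running total across components
cumExplained = cumsum(explained);
```

With these values, the first two components together account for about 96% of the total variance.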

Display the coefficient matrix. Each column of the coefficient matrix describes how one component is defined as a linear combination of the standardized original variables. The first component is mostly an average of the first two variables, with some additional contribution from the other variables. Similarly, the second component is mostly an average of the last two variables.

pcacoef
pcacoef = 4×4

-0.6291    0.3222   -0.2444   -0.6638
-0.6125    0.3548    0.2591    0.6572
-0.3313   -0.6244    0.6673   -0.2348
-0.3455   -0.6168   -0.6541    0.2689

Local Functions

Listed here are the map and reduce functions that mapreduce applies to the data.

function covarianceMapper(t,~,intermKVStore)
  % Get data from input table and remove any rows with missing values
  x = t{:,:};
  x = x(~any(isnan(x),2),:);

  % Compute and save the count, mean, and covariance
  n = size(x,1);
  m = mean(x,1);
  c = cov(x,1);   % second argument 1 normalizes by n rather than n-1

  % Store values as a single item in the intermediate key/value store
  add(intermKVStore,'key',{n m c})
end
%------------------------------------------------------------------
function covarianceReducer(~,intermValIter,outKVStore)
  % We will combine results computed in the mapper for different chunks of
  % the data, updating the count, mean, and covariance each time we add a new
  % chunk.

  % First, initialize everything to zero (scalar 0 is okay)
  n1 = 0; % no rows so far
  m1 = 0; % mean so far
  c1 = 0; % covariance so far

  while hasnext(intermValIter)
    % Get the next chunk, and extract the count, mean, and covariance
    t = getnext(intermValIter);
    n2 = t{1};
    m2 = t{2};
    c2 = t{3};

    % Use weighting formulas to update the values so far
    n = n1+n2;                     % new count
    m = (n1*m1 + n2*m2) / n;       % new mean

    % New covariance is a weighted combination of the two covariances, plus
    % additional terms that relate to the difference in means
    c1 = (n1*c1 + n2*c2 + n1*(m1-m)'*(m1-m) + n2*(m2-m)'*(m2-m))/ n;

    % Store the new mean and count for the next iteration
    m1 = m;
    n1 = n;
  end

  % Save results in the output key/value store
  add(outKVStore,'count',n1);
  add(outKVStore,'mean',m1);
  add(outKVStore,'cov',c1);
end