Using MapReduce to Compute Covariance and Related Quantities
This example shows how to compute the mean and covariance for several variables in a large data set using mapreduce
. It then uses the covariance to perform several follow-up calculations that do not require another iteration over the entire data set.
Prepare Data
Create a datastore using the airlinesmall.csv
data set. This 12-megabyte data set contains 29 columns of flight information for several airline carriers, including arrival and departure times. In this example, select ActualElapsedTime
(total flight time), Distance
(total flight distance), DepDelay
(flight departure delay), and ArrDelay
(flight arrival delay) as the variables of interest.
ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA'); ds.SelectedVariableNames = {'ActualElapsedTime', 'Distance', ... 'DepDelay', 'ArrDelay'};
The datastore treats 'NA'
values as missing, and replaces the missing values with NaN
values by default. Additionally, the SelectedVariableNames
property allows you to work with only the selected variables of interest, which you can verify using preview
.
preview(ds)
ans=8×4 table
ActualElapsedTime Distance DepDelay ArrDelay
_________________ ________ ________ ________
53 308 12 8
63 296 1 8
83 480 20 21
59 296 12 13
77 373 -1 4
61 308 63 59
84 447 -2 3
155 954 -1 11
Run MapReduce
The mapreduce
function requires a map function and a reduce function as inputs. The mapper receives blocks of data and outputs intermediate results. The reducer reads the intermediate results and produces a final result.
In this example, the mapper computes the count, mean, and covariance for the variables in each block of data in the datastore, ds
. Then, the mapper stores the computed values for each block as an intermediate key-value pair consisting of a single key with a cell array containing the three computed values.
Display the map function file.
function covarianceMapper(t,~,intermKVStore) % Get data from input table and remove any rows with missing values x = t{:,:}; x = x(~any(isnan(x),2),:); % Compute and save the count, mean, and covariance n = size(x,1); m = mean(x,1); c = cov(x,1); % Store values as a single item in the intermediate key/value store add(intermKVStore,'key',{n m c}) end
The reducer combines the intermediate results for each block to obtain the count, mean, and covariance for each variable of interest in the entire data set. The reducer stores the final key-value pairs for the keys 'count'
, 'mean'
, and 'cov'
with the corresponding values for each variable.
Display the reduce function file.
function covarianceReducer(~,intermValIter,outKVStore) % We will combine results computed in the mapper for different chunks of % the data, updating the count, mean, and covariance each time we add a new % chunk. % First, initialize everything to zero (scalar 0 is okay) n1 = 0; % no rows so far m1 = 0; % mean so far c1 = 0; % covariance so far while hasnext(intermValIter) % Get the next chunk, and extract the count, mean, and covariance t = getnext(intermValIter); n2 = t{1}; m2 = t{2}; c2 = t{3}; % Use weighting formulas to update the values so far n = n1+n2; % new count m = (n1*m1 + n2*m2) / n; % new mean % New covariance is a weighted combination of the two covariance, plus % additional terms that relate to the difference in means c1 = (n1*c1 + n2*c2 + n1*(m1-m)'*(m1-m) + n2*(m2-m)'*(m2-m))/ n; % Store the new mean and count for the next iteration m1 = m; n1 = n; end % Save results in the output key/value store add(outKVStore,'count',n1); add(outKVStore,'mean',m1); add(outKVStore,'cov',c1); end
Use mapreduce
to apply the map and reduce functions to the datastore, ds
.
outds = mapreduce(ds, @covarianceMapper, @covarianceReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 100%
mapreduce
returns a datastore, outds
, with files in the current folder.
View the results of the mapreduce
call by using the readall
function on the output datastore.
results = readall(outds)
results=3×2 table
Key Value
_________ ___________________________________
{'count'} {[ 120664]}
{'mean' } {[120.2452 703.3926 8.1334 7.1235]}
{'cov' } {4x4 double }
Count = results.Value{1}; MeanVal = results.Value{2}; Covariance = results.Value{3};
Compute Correlation Matrix
The covariance, mean, and count values are useful to perform further calculations. Compute a correlation matrix by finding the standard deviations and normalizing them to correlation form.
s = sqrt(diag(Covariance)); Correlation = Covariance ./ (s*s')
Correlation = 4×4
1.0000 0.9666 0.0278 0.0902
0.9666 1.0000 0.0216 0.0013
0.0278 0.0216 1.0000 0.8748
0.0902 0.0013 0.8748 1.0000
The elapsed time (first column) and distance (second column) are highly correlated, since Correlation(2,1) = 0.9666
. The departure delay (third column) and arrival delay (fourth column) are also highly correlated, since Correlation(4,3) = 0.8748
.
Compute Regression Coefficients
Compute some regression coefficients to predict the arrival delay, ArrDelay
, using the other three variables as predictors.
slopes = Covariance(1:3,1:3)\Covariance(1:3,4); intercept = MeanVal(4) - MeanVal(1:3)*slopes; b = table([intercept; slopes], 'VariableNames', {'Estimate'}, ... 'RowNames', {'Intercept','ActualElapsedTime','Distance','DepDelay'})
b=4×1 table
Estimate
_________
Intercept -19.912
ActualElapsedTime 0.56278
Distance -0.068721
DepDelay 0.94689
Perform PCA
Use svd
to perform PCA (principal components analysis). PCA is a technique for finding a lower dimensional summary of a data set. The following calculation is a simplified version of PCA, but more options are available from the pca
and pcacov
functions in Statistics and Machine Learning Toolbox™.
You can carry out PCA using either the covariance or correlation. In this case, use the correlation since the difference in scale of the variables is large. The first two components capture most of the variance.
[~,latent,pcacoef] = svd(Correlation); latent = diag(latent)
latent = 4×1
2.0052
1.8376
0.1407
0.0164
Display the coefficient matrix. Each column of the coefficients matrix describes how one component is defined as a linear combination of the standardized original variables. The first component is mostly an average of the first two variables, with some additional contribution from the other variables. Similarly, the second component is mostly an average of the last two variables.
pcacoef
pcacoef = 4×4
-0.6291 0.3222 -0.2444 -0.6638
-0.6125 0.3548 0.2591 0.6572
-0.3313 -0.6244 0.6673 -0.2348
-0.3455 -0.6168 -0.6541 0.2689
See Also
mapreduce
| tabularTextDatastore