Main Content

This example shows how to compute the mean and covariance for several variables in a large data set using `mapreduce`

. It then uses the covariance to perform several follow-up calculations that do not require another iteration over the entire data set.

Create a datastore using the `airlinesmall.csv`

data set. This 12-megabyte data set contains 29 columns of flight information for several airline carriers, including arrival and departure times. In this example, select `ActualElapsedTime`

(total flight time), `Distance`

(total flight distance), `DepDelay`

(flight departure delay), and `ArrDelay`

(flight arrival delay) as the variables of interest.

ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA'); ds.SelectedVariableNames = {'ActualElapsedTime', 'Distance', ... 'DepDelay', 'ArrDelay'};

The datastore treats `'NA'`

values as missing, and replaces the missing values with `NaN`

values by default. Additionally, the `SelectedVariableNames`

property allows you to work with only the selected variables of interest, which you can verify using `preview`

.

preview(ds)

`ans=`*8×4 table*
ActualElapsedTime Distance DepDelay ArrDelay
_________________ ________ ________ ________
53 308 12 8
63 296 1 8
83 480 20 21
59 296 12 13
77 373 -1 4
61 308 63 59
84 447 -2 3
155 954 -1 11

The `mapreduce`

function requires a map function and a reduce function as inputs. The mapper receives blocks of data and outputs intermediate results. The reducer reads the intermediate results and produces a final result.

In this example, the mapper computes the count, mean, and covariance for the variables in each block of data in the datastore, `ds`

. Then, the mapper stores the computed values for each block as an intermediate key-value pair consisting of a single key with a cell array containing the three computed values.

Display the map function file.

function covarianceMapper(t,~,intermKVStore) % Get data from input table and remove any rows with missing values x = t{:,:}; x = x(~any(isnan(x),2),:); % Compute and save the count, mean, and covariance n = size(x,1); m = mean(x,1); c = cov(x,1); % Store values as a single item in the intermediate key/value store add(intermKVStore,'key',{n m c}) end

The reducer combines the intermediate results for each block to obtain the count, mean, and covariance for each variable of interest in the entire data set. The reducer stores the final key-value pairs for the keys `'count'`

, `'mean'`

, and `'cov'`

with the corresponding values for each variable.

Display the reduce function file.

function covarianceReducer(~,intermValIter,outKVStore) % We will combine results computed in the mapper for different chunks of % the data, updating the count, mean, and covariance each time we add a new % chunk. % First, initialize everything to zero (scalar 0 is okay) n1 = 0; % no rows so far m1 = 0; % mean so far c1 = 0; % covariance so far while hasnext(intermValIter) % Get the next chunk, and extract the count, mean, and covariance t = getnext(intermValIter); n2 = t{1}; m2 = t{2}; c2 = t{3}; % Use weighting formulas to update the values so far n = n1+n2; % new count m = (n1*m1 + n2*m2) / n; % new mean % New covariance is a weighted combination of the two covariance, plus % additional terms that relate to the difference in means c1 = (n1*c1 + n2*c2 + n1*(m1-m)'*(m1-m) + n2*(m2-m)'*(m2-m))/ n; % Store the new mean and count for the next iteration m1 = m; n1 = n; end % Save results in the output key/value store add(outKVStore,'count',n1); add(outKVStore,'mean',m1); add(outKVStore,'cov',c1); end

Use `mapreduce`

to apply the map and reduce functions to the datastore, `ds`

.

outds = mapreduce(ds, @covarianceMapper, @covarianceReducer);

******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 100%

`mapreduce`

returns a datastore, `outds`

, with files in the current folder.

View the results of the `mapreduce`

call by using the `readall`

function on the output datastore.

results = readall(outds)

`results=`*3×2 table*
Key Value
_________ ___________________________________
{'count'} {[ 120664]}
{'mean' } {[120.2452 703.3926 8.1334 7.1235]}
{'cov' } {4x4 double }

Count = results.Value{1}; MeanVal = results.Value{2}; Covariance = results.Value{3};

The covariance, mean, and count values are useful to perform further calculations. Compute a correlation matrix by finding the standard deviations and normalizing them to correlation form.

s = sqrt(diag(Covariance)); Correlation = Covariance ./ (s*s')

`Correlation = `*4×4*
1.0000 0.9666 0.0278 0.0902
0.9666 1.0000 0.0216 0.0013
0.0278 0.0216 1.0000 0.8748
0.0902 0.0013 0.8748 1.0000

The elapsed time (first column) and distance (second column) are highly correlated, since `Correlation(2,1) = 0.9666`

. The departure delay (third column) and arrival delay (fourth column) are also highly correlated, since `Correlation(4,3) = 0.8748`

.

Compute some regression coefficients to predict the arrival delay, `ArrDelay`

, using the other three variables as predictors.

slopes = Covariance(1:3,1:3)\Covariance(1:3,4); intercept = MeanVal(4) - MeanVal(1:3)*slopes; b = table([intercept; slopes], 'VariableNames', {'Estimate'}, ... 'RowNames', {'Intercept','ActualElapsedTime','Distance','DepDelay'})

`b=`*4×1 table*
Estimate
_________
Intercept -19.912
ActualElapsedTime 0.56278
Distance -0.068721
DepDelay 0.94689

Use `svd`

to perform PCA (principal components analysis). PCA is a technique for finding a lower dimensional summary of a data set. The following calculation is a simplified version of PCA, but more options are available from the `pca`

and `pcacov`

functions in Statistics and Machine Learning Toolbox™.

You can carry out PCA using either the covariance or correlation. In this case, use the correlation since the difference in scale of the variables is large. The first two components capture most of the variance.

[~,latent,pcacoef] = svd(Correlation); latent = diag(latent)

`latent = `*4×1*
2.0052
1.8376
0.1407
0.0164

Display the coefficient matrix. Each column of the coefficients matrix describes how one component is defined as a linear combination of the standardized original variables. The first component is mostly an average of the first two variables, with some additional contribution from the other variables. Similarly, the second component is mostly an average of the last two variables.

pcacoef

`pcacoef = `*4×4*
-0.6291 0.3222 -0.2444 -0.6638
-0.6125 0.3548 0.2591 0.6572
-0.3313 -0.6244 0.6673 -0.2348
-0.3455 -0.6168 -0.6541 0.2689

Listed here are the map and reduce functions that `mapreduce`

applies to the data.

function covarianceMapper(t,~,intermKVStore) % Get data from input table and remove any rows with missing values x = t{:,:}; x = x(~any(isnan(x),2),:); % Compute and save the count, mean, and covariance n = size(x,1); m = mean(x,1); c = cov(x,1); % Store values as a single item in the intermediate key/value store add(intermKVStore,'key',{n m c}) end %------------------------------------------------------------------ function covarianceReducer(~,intermValIter,outKVStore) % We will combine results computed in the mapper for different chunks of % the data, updating the count, mean, and covariance each time we add a new % chunk. % First, initialize everything to zero (scalar 0 is okay) n1 = 0; % no rows so far m1 = 0; % mean so far c1 = 0; % covariance so far while hasnext(intermValIter) % Get the next chunk, and extract the count, mean, and covariance t = getnext(intermValIter); n2 = t{1}; m2 = t{2}; c2 = t{3}; % Use weighting formulas to update the values so far n = n1+n2; % new count m = (n1*m1 + n2*m2) / n; % new mean % New covariance is a weighted combination of the two covariance, plus % additional terms that relate to the difference in means c1 = (n1*c1 + n2*c2 + n1*(m1-m)'*(m1-m) + n2*(m2-m)'*(m2-m))/ n; % Store the new mean and count for the next iteration m1 = m; n1 = n; end % Save results in the output key/value store add(outKVStore,'count',n1); add(outKVStore,'mean',m1); add(outKVStore,'cov',c1); end %------------------------------------------------------------------

`mapreduce`

| `tabularTextDatastore`