Compute Mean by Group Using MapReduce

This example shows how to use mapreduce to compute the mean of a variable for each group in a data set. It demonstrates how to perform computations on subgroups of data.

Prepare Data

Create a datastore using the airlinesmall.csv data set. This 12-megabyte data set contains 29 columns of flight information for several airline carriers, including arrival and departure times. In this example, select DayOfWeek and ArrDelay (flight arrival delay) as the variables of interest.

ds = datastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedVariableNames = {'ArrDelay', 'DayOfWeek'};

The datastore treats 'NA' values as missing and, by default, replaces missing values with NaN. The SelectedVariableNames property limits the datastore to the variables of interest, in the order listed (ArrDelay, then DayOfWeek). You can verify this using preview.

preview(ds)
ans=8×2 table
    ArrDelay    DayOfWeek
    ________    _________

        8           3    
        8           1    
       21           5    
       13           5    
        4           4    
       59           3    
        3           4    
       11           6    
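Missing values need attention because a single NaN propagates through arithmetic. The following block is hypothetical: it uses the first preview rows, but one ArrDelay value has been replaced with NaN, as a 'NA' entry would be read. The sketch shows that the raw sum becomes NaN. It then applies the same ~isnan filter that the mapper uses.

```matlab
% Hypothetical block: the second delay stands in for an 'NA' entry read as NaN.
delays = [8; NaN; 21; 13];
day    = [3; 1; 5; 5];

rawTotal = sum(delays);       % NaN: one missing value makes the whole sum NaN

% Keep only rows with a valid delay (the same filter the mapper applies).
notNaN = ~isnan(delays);
delays = delays(notNaN);      % [8; 21; 13]
day    = day(notNaN);         % [3; 5; 5]
cleanTotal = sum(delays);     % 42
```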

Run MapReduce

The mapreduce function requires a map function and a reduce function as inputs. The mapper receives blocks of data and outputs intermediate results. The reducer reads the intermediate results and produces a final result.
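The following sketch simulates the map, group-by-key, and reduce phases in memory, without mapreduce or a datastore, to make the data flow concrete. It rests on these assumptions:

- Two hypothetical blocks are built from the first six preview rows and stored as [DayOfWeek, ArrDelay] pairs.
- Each block emits the per-day count and sum used in this example.
- A containers.Map stands in for the intermediate key-value store.

It illustrates the data flow only and is not how mapreduce works internally.

```matlab
% Two hypothetical blocks of [DayOfWeek, ArrDelay] rows (first six preview rows).
blocks = {[3 8; 1 8; 5 21], [5 13; 4 4; 3 59]};

% Map phase: each block emits (day, [count, sum]) pairs. Pairs with the same
% key are stacked under that key, which mimics grouping by key.
interm = containers.Map('KeyType', 'double', 'ValueType', 'any');
for b = 1:numel(blocks)
  blk = blocks{b};
  for d = unique(blk(:, 1))'
    v = blk(blk(:, 1) == d, 2);          % delays for day d in this block
    pair = [numel(v), sum(v)];
    if isKey(interm, d)
      interm(d) = [interm(d); pair];     % append to the existing group
    else
      interm(d) = pair;                  % first value for this key
    end
  end
end

% Reduce phase: one pass per key, combining every [count, sum] row for it.
dayKeys = sort(cell2mat(keys(interm)));
meanDelay = zeros(size(dayKeys));
for k = 1:numel(dayKeys)
  totals = sum(interm(dayKeys(k)), 1);   % [total count, total sum]
  meanDelay(k) = totals(2) / totals(1);
end
```

Day 3 appears in both blocks, so the reducer sees two pairs for that key, [1 8] and [1 59], and combines them into a mean of 33.5.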

In this example, the mapper computes the count and the sum of the delays for each day of the week in each block of data. It then stores the results as intermediate key-value pairs. The keys are integers (1 to 7) representing the days of the week. The values are two-element vectors containing the count and the sum of the delays for that day. Emitting a count and a sum, rather than a per-block mean, lets the reducer combine results from different blocks exactly.
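A minimal sketch of the mapper's grouping step on a single block, assuming the first six preview rows form that block. It uses unique and accumarray as the mapper does, with an anonymous function in place of the nested countsum. Plain unique returns sorted keys, whereas the mapper uses the 'stable' option. This only changes the order of the emitted pairs, not their contents.

```matlab
% Hypothetical single block: the first six preview rows.
day    = [3; 1; 5; 5; 4; 3];
delays = [8; 8; 21; 13; 4; 59];

% intermKeys holds the distinct days; idx maps each row to its key.
[intermKeys, ~, idx] = unique(day);

% One cell per key, each holding [count, sum] of the delays for that day.
intermVals = accumarray(idx, delays, size(intermKeys), @(x) {[numel(x), sum(x)]});
```

Here intermKeys is [1; 3; 4; 5] and intermVals{2} is [2 67], because day 3 has delays 8 and 59.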

Display the map function file.

function meanArrivalDelayByDayMapper(data, ~, intermKVStore)
  % data is an n-by-2 table: the first variable is ArrDelay and the second
  % is DayOfWeek. Remove rows with missing delays first.
  delays = data.ArrDelay;
  day = data.DayOfWeek;
  notNaN = ~isnan(delays);
  day = day(notNaN);
  delays = delays(notNaN);

  % find the unique days in this block
  [intermKeys,~,idx] = unique(day, 'stable');

  % group delays by idx and apply the nested countsum function to each group
  intermVals = accumarray(idx,delays,size(intermKeys),@countsum);
  addmulti(intermKVStore,intermKeys,intermVals);

  function out = countsum(x)
    n = numel(x); % count
    s = sum(x);   % sum
    out = {[n, s]};
  end
end

After the map phase, mapreduce groups the intermediate key-value pairs by unique key (in this case, the day of the week). Each call to the reducer therefore works on the values associated with one day of the week. The reducer receives the intermediate counts and sums for the day specified by the input key (intermKey) and adds them into a total count, n, and a total sum, s. It then calculates the overall mean, s/n, and adds one final key-value pair to the output. This pair represents the mean flight arrival delay for one day of the week.
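The reducer's arithmetic can be checked on two hypothetical [count, sum] pairs for a single key. In this sketch, a cell array stands in for the value iterator. Summing the counts and the sums before dividing gives the exact mean over all flights. Averaging the per-block means instead weights every block equally, regardless of how many flights it contains, which is a reason to emit counts and sums rather than means.

```matlab
% Hypothetical intermediate values for one key: [count, sum] from two blocks.
partials = {[1, 8], [3, 90]};

% Same accumulation as the reducer's while loop.
n = 0;
s = 0;
for k = 1:numel(partials)
  v = partials{k};
  n = n + v(1);
  s = s + v(2);
end
meanDelay = s / n;                                   % 98 / 4 = 24.5

% For comparison: the mean of the per-block means, which is not the same.
blockMeans = cellfun(@(v) v(2) / v(1), partials);    % [8, 30]
naiveMean = mean(blockMeans);                        % 19
```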

Display the reduce function file.

function meanArrivalDelayByDayReducer(intermKey, intermValIter, outKVStore)
  n = 0;
  s = 0;

  % accumulate the count and sum from all sets of intermediate results
  while hasnext(intermValIter)
    intermValue = getnext(intermValIter);
    n = n + intermValue(1);
    s = s + intermValue(2);
  end

  % compute the mean (meanDelay avoids shadowing the built-in mean function)
  meanDelay = s/n;
  % add results to the output datastore
  add(outKVStore,intermKey,meanDelay);
end

Use mapreduce to apply the map and reduce functions to the datastore, ds.

meanDelayByDay = mapreduce(ds, @meanArrivalDelayByDayMapper, ...
                               @meanArrivalDelayByDayReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce  14%
Map 100% Reduce  29%
Map 100% Reduce  43%
Map 100% Reduce  57%
Map 100% Reduce  71%
Map 100% Reduce  86%
Map 100% Reduce 100%

mapreduce returns a datastore, meanDelayByDay, with files in the current folder.

Read the final result from the output datastore, meanDelayByDay.

result = readall(meanDelayByDay)
result=7×2 table
    Key      Value   
    ___    __________

     3     {[7.0038]}
     1     {[7.0833]}
     5     {[9.4193]}
     4     {[9.3185]}
     6     {[4.2095]}
     2     {[5.8569]}
     7     {[6.5241]}

Organize Results

The integer keys (1 to 7) represent the days of the week, from Monday (1) to Sunday (7). To make the results easier to read, convert the keys to a categorical array, extract the numeric values from the single-element cells, and rename the table variables.

result.Key = categorical(result.Key, 1:7, ...
               {'Mon','Tue','Wed','Thu','Fri','Sat','Sun'});
result.Value = cell2mat(result.Value);
result.Properties.VariableNames = {'DayOfWeek', 'MeanArrDelay'}
result=7×2 table
    DayOfWeek    MeanArrDelay
    _________    ____________

       Wed          7.0038   
       Mon          7.0833   
       Fri          9.4193   
       Thu          9.3185   
       Sat          4.2095   
       Tue          5.8569   
       Sun          6.5241   
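The call categorical(result.Key, 1:7, names) pairs each value in 1:7 with the category name in the same position of names. The sketch below reproduces that pairing with plain cell indexing, together with the cell2mat conversion, using the first three rows of the readall output. It illustrates the mapping only and does not create a categorical array.

```matlab
% First three rows of the readall output: integer keys and single-element cells.
dayKey = [3; 1; 5];
value  = {7.0038; 7.0833; 9.4193};
dayNames = {'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'};

% Key k maps to dayNames{k}, the same pairing as categorical(dayKey, 1:7, dayNames).
labels = dayNames(dayKey);        % 'Wed', 'Mon', 'Fri'

% Unwrap the single-element cells into a numeric column vector.
meanArrDelay = cell2mat(value);   % [7.0038; 7.0833; 9.4193]
```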

Sort the rows of the table by mean flight arrival delay. In this data set, Saturday has the lowest mean arrival delay and Friday has the highest.

result = sortrows(result,'MeanArrDelay')
result=7×2 table
    DayOfWeek    MeanArrDelay
    _________    ____________

       Sat          4.2095   
       Tue          5.8569   
       Sun          6.5241   
       Wed          7.0038   
       Mon          7.0833   
       Thu          9.3185   
       Fri          9.4193   
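sortrows reorders whole rows by the chosen variable. The same reordering can be sketched on plain arrays by using the second output of sort as a permutation. The values below are copied from the organized result table.

```matlab
% Day labels and mean delays from the organized result table (unsorted order).
dayOfWeek = {'Wed', 'Mon', 'Fri', 'Thu', 'Sat', 'Tue', 'Sun'};
meanArrDelay = [7.0038, 7.0833, 9.4193, 9.3185, 4.2095, 5.8569, 6.5241];

% Sort the delays in ascending order and apply the same permutation to the labels,
% keeping each label paired with its delay, as sortrows does for table rows.
[sortedDelay, order] = sort(meanArrDelay);
sortedDays = dayOfWeek(order);
```

The first and last entries of sortedDays are 'Sat' and 'Fri', matching the sorted table.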

Local Functions

Listed here are the map and reduce functions that mapreduce applies to the data.

function meanArrivalDelayByDayMapper(data, ~, intermKVStore)
  % data is an n-by-2 table: the first variable is ArrDelay and the second
  % is DayOfWeek. Remove rows with missing delays first.
  delays = data.ArrDelay;
  day = data.DayOfWeek;
  notNaN = ~isnan(delays);
  day = day(notNaN);
  delays = delays(notNaN);

  % find the unique days in this block
  [intermKeys,~,idx] = unique(day, 'stable');

  % group delays by idx and apply the nested countsum function to each group
  intermVals = accumarray(idx,delays,size(intermKeys),@countsum);
  addmulti(intermKVStore,intermKeys,intermVals);

  function out = countsum(x)
    n = numel(x); % count
    s = sum(x);   % sum
    out = {[n, s]};
  end
end
%---------------------------------------------------------------------------
function meanArrivalDelayByDayReducer(intermKey, intermValIter, outKVStore)
  n = 0;
  s = 0;

  % accumulate the count and sum from all sets of intermediate results
  while hasnext(intermValIter)
    intermValue = getnext(intermValIter);
    n = n + intermValue(1);
    s = s + intermValue(2);
  end

  % compute the mean (meanDelay avoids shadowing the built-in mean function)
  meanDelay = s/n;
  % add results to the output datastore
  add(outKVStore,intermKey,meanDelay);
end
%---------------------------------------------------------------------------
