Source code for parallel_statistics.mean_variance

# coding: utf-8

import numpy as np
from .sparse import SparseArray
from .tools import AllOne
import os

# We use numba's just-in-time compilation to speed up this class;
# the difference can be a factor of several hundred.  We let the user
# disable this with an environment variable, because in case places like
# NERSC or other strange systems will have problems with JIT.
# User can disable by setting the env var PAR_STATS_NO_JIT to anything
# other than zero.  JIT is also disabled if the numba package is not installed.
# In either of these cases we replace the numba jit decorator with an identity
# decorator
if os.environ.get("PAR_STATS_NO_JIT", "0") != "0":
    njit = lambda p: p
else:
    try:
        from numba import njit
    except:
        njit = lambda p: p


[docs]class ParallelMeanVariance: """``ParallelMeanVariance`` is a parallel and incremental calculator for mean and variance statistics. "Incremental" means that it does not need to read the entire data set at once, and requires only a single pass through the data. The calculator is designed to work on data in a collection of different bins, for example a map (where the bins are pixels). The usual life-cycle of this class is: * create an instance of the class (on each process if in parallel) * repeatedly call ``add_data`` or ``add_datum`` on it to add new data points * call ``collect``, (supplying in MPI communicator if in parallel) You can also call the ``run`` method with an iterator to combine these. If only a few indices in the data are expected to be used, the sparse option can be set to change how data is represented and returned to a sparse form which will use less memory and be faster below a certain size. Bins which have no objects in will be given weight=0, mean=nan, and var=nan. The algorithm here is basd on Schubert & Gertz 2018, Numerically Stable Parallel Computation of (Co-)Variance By default the module looks for the package "Numba" and uses its just-in-time compilation to speed up this class. To disable this, export the environment variable PAR_STATS_NO_JIT=1 Attributes ---------- size: int number of pixels or bins sparse: bool whether are using sparse representations of arrays """ def __init__(self, size, sparse=False): """Create a parallel, on-line mean and variance calcuator. Parameters ---------- size: int The number of bins (or pixels) in which statistics will be calculated sparse: bool, optional Whether to use a sparse representation of the arrays, internally and returned. """ self.size = size self.sparse = sparse if sparse: t = SparseArray else: t = np.zeros self._mean = t(size) self._weight = t(size) self._M2 = t(size)
[docs] def add_datum(self, bin, value, weight=1): """Add a single data point to the sum. Parameters ---------- bin: int Index of bin or pixel these value apply to value: float Value for this bin to accumulate weight: float Optional, default=1, a weight for this data point """ if weight == 0: return self._weight[bin] += weight delta = value - self._mean[bin] self._mean[bin] += (weight / self._weight[bin]) * delta delta2 = value - self._mean[bin] self._M2[bin] += weight * delta * delta2
[docs] def add_data(self, bin, values, weights=None): """Add a chunk of data in the same bin. Add a set of values assinged to a given bin or pixel. Weights may be supplied, and if they are not will be set to 1. Parameters ---------- bin: int The bin or pixel for these values values: sequence A sequence (e.g. array or list) of values assigned to this bin weights: sequence, optional A sequence (e.g. array or list) of weights per value """ if weights is None: if self.sparse: self._add_data_noweight_sparse(bin, values, self._weight, self._mean, self._M2) else: self._add_data_noweight_dense(bin, values, self._weight, self._mean, self._M2) else: if self.sparse: self._add_data_weight_sparse(bin, values, weights, self._weight, self._mean, self._M2) else: self._add_data_weight_dense(bin, values, weights, self._weight, self._mean, self._M2)
# To be able to use Numba's jit tool these seem to have to be static methods # so that the type of everything can be inferred. # The Numba version seems to be several hundred times faster after first compilation def _add_data_noweight_core(bin, values, _weight, _mean, _M2): n = len(values) for i in range(n): value = values[i] _weight[bin] += 1 delta = value - _mean[bin] _mean[bin] += delta / _weight[bin] delta2 = value - _mean[bin] _M2[bin] += delta * delta2 # We also can't use the Numba version on the sparse arrays, so we need to # make a JIT and non-JIT version here. _add_data_noweight_sparse = staticmethod(_add_data_noweight_core) _add_data_noweight_dense = staticmethod(njit(_add_data_noweight_core)) def _add_data_weight_core(bin, values, weights, _weight, _mean, _M2): n = len(values) for i in range(n): w = weights[i] if w == 0: continue value = values[i] _weight[bin] += w delta = value - _mean[bin] _mean[bin] += (w / _weight[bin]) * delta delta2 = value - _mean[bin] _M2[bin] += w * delta * delta2 # Same as above for the weighted version _add_data_weight_sparse = staticmethod(_add_data_weight_core) _add_data_weight_dense = staticmethod(njit(_add_data_weight_core))
[docs] @np.errstate(divide="ignore", invalid="ignore") def collect(self, comm=None, mode="gather"): """Finalize the statistics calculation, collecting togther results from multiple processes. If mode is set to "allgather" then every calling process will return the same data. Otherwise the non-root processes will return None for all the values. You can only call this once, when you've finished calling add_data. After that internal data is deleted. Parameters ---------- comm: MPI Communicator, optional mode: string, optional 'gather' (default), or 'allgather' Returns ------- weight: array or SparseArray The total weight or count in each bin mean: array or SparseArray An array of the computed mean for each bin variance: array or SparseArray An array of the computed variance for each bin """ # Serial version - just take the values from this processor, # set the values where the weight is zero, and return if comm is None or comm.Get_size() == 1: variance = self._M2 / self._weight self._mean[self._weight == 0] = np.nan results = self._weight, self._mean, variance del self._M2 del self._weight del self._mean return results # Otherwise we do this in parallel. The general approach is # a little crude because the reduction operation here is not # that simple (we can't just sum things, because we also need # the variance and combining those is slightly more complicated). rank = comm.Get_rank() size = comm.Get_size() if mode not in ["gather", "allgather"]: raise ValueError( "mode for ParallelStatsCalculator.collect must be" "'gather' or 'allgather'" ) # The send command differs depending whether we are sending # a sparse object (which is pickled) or an array. if self.sparse: send = lambda x: comm.send(x, dest=0) else: send = lambda x: comm.Send(x, dest=0) # If we are not the root process we send our results # to the root one by one. Then delete them to save space, # since for the mapping case this can get quite large. if rank > 0: send(self._weight) del self._weight send(self._mean) del self._mean send(self._M2) del self._M2 # If we are running allgather and need dense arrays # then we make a buffer for them now and will send # them below if mode == "allgather" and not self.sparse: weight = np.empty(self.size) mean = np.empty(self.size) variance = np.empty(self.size) else: weight = None mean = None variance = None # Otherwise this is the root node, which accumulates the # results else: # start with our own results weight = self._weight mean = self._mean sq = self._M2 if not self.sparse: # In the sparse case MPI4PY unpickles and creates a new variable. # In the dense case we have to pre-allocate it. w = np.empty(self.size) m = np.empty(self.size) s = np.empty(self.size) # Now received each processes's data chunk in turn # at root. for i in range(1, size): if self.sparse: w = comm.recv(source=i) m = comm.recv(source=i) s = comm.recv(source=i) else: comm.Recv(w, source=i) comm.Recv(m, source=i) comm.Recv(s, source=i) # Add this to the overall sample. This is very similar # to what's done in add_data except it combines all the # pixels/bins at once. weight, mean, sq = self._accumulate(weight, mean, sq, w, m, s) # get the population variance from the squared deviations # and set the it to nan where we can't estimate it. variance = sq / weight mean[weight == 0] = np.nan if mode == "allgather": if self.sparse: weight, mean, variance = comm.bcast([weight, mean, variance]) else: comm.Bcast(weight) comm.Bcast(mean) comm.Bcast(variance) return weight, mean, variance
[docs] def run(self, iterator, comm=None, mode="gather"): """Run the whole life cycle on an iterator returning data chunks. This is equivalent to calling add_data repeatedly and then collect. Parameters ---------- iterator: iterator Iterator yieding (bin, values) or (bin, values, weights) comm: MPI comm, optional The comm, or None for serial mode: str, optional "gather" or "allgather" Returns ------- weight: array or SparseArray The total weight or count in each bin mean: array or SparseArray An array of the computed mean for each bin variance: array or SparseArray An array of the computed variance for each bin """ for values in iterator: self.add_data(*values) return self.collect(comm=comm, mode=mode)
def _accumulate(self, weight, mean, sq, w, m, s): # Algorithm from Shubert and Gertz. if self.sparse: weight = weight + w delta = m - mean mean = mean + (w / weight) * delta delta2 = m - mean sq = sq + s + w * delta * delta2 else: good = w != 0 weight[good] = weight[good] + w[good] delta = m[good] - mean[good] mean[good] = mean[good] + (w[good] / weight[good]) * delta delta2 = m[good] - mean[good] sq[good] = sq[good] + s[good] + w[good] * delta * delta2 return weight, mean, sq