Source code for parallel_statistics.sum

import numpy as np
from .sparse import SparseArray
from .tools import AllOne, in_place_reduce

class ParallelSum:
    """``ParallelSum`` is a parallel and incremental calculator for sums.

    "Incremental" means that it does not need to read the entire data set
    at once, and requires only a single pass through the data.

    The calculator is designed to work on data in a collection of
    different bins, for example a map (where the bins are pixels).

    The usual life-cycle of this class is:

    * create an instance of the class (on each process if in parallel)

    * repeatedly call ``add_data`` or ``add_datum`` on it to add new data points

    * call ``collect`` (supplying an MPI communicator if in parallel)

    You can also call the ``run`` method with an iterator to combine
    these steps.

    If only a few indices in the data are expected to be used, the
    ``sparse`` option can be set to store the data in a sparse form,
    which uses less memory and is faster below a certain size; the
    results are then also returned as sparse arrays.

    Bins which receive no objects are given weight=0 and sum=0.
    """

    def __init__(self, size, sparse=False):
        """Create the calculator.

        Parameters
        ----------
        size: int
            The maximum number of bins or pixels

        sparse: bool, optional
            If True, use sparse arrays to minimize memory usage
        """
        self.size = size
        self.sparse = sparse
        if sparse:
            t = SparseArray
        else:
            t = np.zeros
        self._sum = t(size)
        self._weight = t(size)
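
    # A minimal serial usage sketch of the life-cycle described above
    # (the values here are illustrative, not from the documentation):
    #
    #     stats = ParallelSum(size=4)
    #     stats.add_data(0, [1.0, 2.0, 3.0])
    #     stats.add_datum(2, 5.0, weight=0.5)
    #     weight, total = stats.collect()
    #     # weight -> [3.0, 0.0, 0.5, 0.0]
    #     # total  -> [6.0, 0.0, 2.5, 0.0]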

    def add_datum(self, bin, value, weight=None):
        """Add a single data point to the sum.

        Parameters
        ----------
        bin: int
            Index of the bin or pixel this value applies to

        value: float
            Value for this bin to accumulate

        weight: float, optional
            Weight for this value; defaults to 1
        """
        if weight is None:
            weight = 1
        self._weight[bin] += weight
        self._sum[bin] += value * weight

    def add_data(self, bin, values, weights=None):
        """Add a chunk of data in the same bin to the sum.

        Parameters
        ----------
        bin: int
            Index of the bin or pixel these values apply to

        values: sequence
            Values for this bin to accumulate

        weights: sequence, optional
            Weights per value
        """
        if weights is None:
            self._weight[bin] += np.size(values)
            self._sum[bin] += np.sum(values)
        else:
            self._weight[bin] += np.sum(weights)
            self._sum[bin] += np.dot(values, weights)
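
    # Weighted accumulation sketch: for values v and weights w, the bin
    # gains sum(w) in weight and dot(v, w) in sum, so e.g. (illustrative)
    #
    #     stats.add_data(1, [2.0, 4.0], weights=[1.0, 0.5])
    #
    # adds 1.5 to the bin's weight and 4.0 to its sum.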

    def collect(self, comm=None, mode="gather"):
        """Finalize the sum and return the counts and the sums.

        The "mode" decides whether all processes receive the results
        or just the root.

        Parameters
        ----------
        comm: MPI communicator or None
            If in parallel, supply this

        mode: str, optional
            "gather" or "allgather"

        Returns
        -------
        count: array or SparseArray
            The number of values hitting each pixel

        sum: array or SparseArray
            The total of values hitting each pixel
        """
        if comm is None:
            return self._weight, self._sum

        if self.sparse:
            if mode == "allgather":
                self._weight = comm.allreduce(self._weight)
                self._sum = comm.allreduce(self._sum)
            else:
                self._weight = comm.reduce(self._weight)
                self._sum = comm.reduce(self._sum)
        else:
            in_place_reduce(self._weight, comm, allreduce=(mode == "allgather"))
            in_place_reduce(self._sum, comm, allreduce=(mode == "allgather"))

        return self._weight, self._sum
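
    # A parallel usage sketch, assuming mpi4py (an assumption here; any
    # communicator providing reduce/allreduce should work):
    #
    #     from mpi4py import MPI
    #     comm = MPI.COMM_WORLD
    #     stats = ParallelSum(size=100)
    #     ...  # each rank calls add_data on its own chunk of the data
    #     weight, total = stats.collect(comm, mode="allgather")
    #
    # With mode="gather", only the root holds the finalized arrays.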

    def run(self, iterator, comm=None, mode="gather"):
        """Run the whole life-cycle on an iterator returning data chunks.

        This is equivalent to calling ``add_data`` repeatedly and then
        ``collect``.

        Parameters
        ----------
        iterator: iterator
            Iterator yielding (pixel, values) pairs

        comm: MPI communicator or None
            The communicator, or None for serial

        mode: str, optional
            "gather" or "allgather"

        Returns
        -------
        count: array or SparseArray
            The number of values hitting each pixel

        sum: array or SparseArray
            The total of values hitting each pixel
        """
        for values in iterator:
            self.add_data(*values)
        return self.collect(comm=comm, mode=mode)
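

# A small demonstration of the run() life-cycle with a generator of
# (bin, values) chunks. This block is an illustrative sketch, not part
# of the library's public interface; in real use the iterator would
# typically yield successive chunks read from disk.
if __name__ == "__main__":
    def chunks():
        # Yield (bin, values) pairs, as ``run`` expects.
        yield 0, [1.0, 2.0]
        yield 1, [3.0]

    stats = ParallelSum(size=2)
    weight, total = stats.run(chunks())
    print("weight:", weight)  # -> [2. 1.]
    print("sum:   ", total)   # -> [3. 3.]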