Example

This complete example shows how to use the ParallelMeanVariance calculator on chunks of data loaded from an HDF5 file.

import mpi4py.MPI
import h5py
import parallel_statistics
import numpy as np

# This data file is available at
# https://portal.nersc.gov/project/lsst/txpipe/tomo_challenge_data/ugrizy/mini_training.hdf5
f = h5py.File("mini_training.hdf5", "r")
comm = mpi4py.MPI.COMM_WORLD

# We must divide up the data between the processes
# Choose the chunk size to use here
chunk_size = 1000
total_size = f['redshift_true'].size
nchunk = total_size // chunk_size
if nchunk * chunk_size < total_size:
    nchunk += 1

# Choose the binning in which to put values
nbin = 20
dz = 0.2

# Make our calculator
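# size=nbin means it tracks a separate running mean and variance
# for each of the nbin redshift bins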
calc = parallel_statistics.ParallelMeanVariance(size=nbin)

# Loop through the data
for i in range(nchunk):
    # Each process reads only its assigned chunks
    # and skips the rest
    if i % comm.size != comm.rank:
        continue
    # work out the data range to read
    start = i * chunk_size
    end = start + chunk_size

    # read in the input data
    z = f['redshift_true'][start:end]
    r = f['r_mag'][start:end]

    # Work out which bin each value falls into
    b = (z / dz).astype(int)
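    # (this assumes every redshift satisfies 0 <= z < nbin * dz;
    # values outside that range would index outside the calculator's bins)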

    # add each one
    for j in range(z.size):
        # skip inf, nan, and sentinel values
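        # ("not r < 30" rather than "r >= 30" so that NaN, which fails
        # every comparison, is skipped as well)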
        if not r[j] < 30:
            continue
        # add each data point
        calc.add_datum(b[j], r[j])

# Finally, collect the results together
weight, mean, variance = calc.collect(comm)
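# weight is the count of values that went into each bin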

# Print out results - only the root process gets the data, unless you pass
# mode=allgather to collect.  Will print NaNs for bins with no objects in them.
if comm.rank == 0:
    for i in range(nbin):
        print(f"z = [{ dz * i :.1f} .. { dz * (i+1) :.1f}]    r = { mean[i] :.2f} ± { variance[i] :.2f}")