0

I have been adapting answers to this question to run an interpolation of a 3D masked array along the axis 0. The mask is also interpolated.

The non-parallel version of my code runs fine and outputs what I want, so I'm pretty confident in the masked_interpolation function (see this question).

However it's pretty slow so I want to parallelize the code for multiple CPUs using this answer and I tried to adapt it to the use of masked arrays. But it does not work (it returns a warning and not the expected interpolated array) and I am not sure why?

#!/usr/bin/python

def unpacking_apply_along_axis(all_args):
    """
    Like numpy.apply_along_axis(), but with arguments in a tuple
    instead.

    This function is useful with multiprocessing.Pool().map(): (1)
    map() only handles functions that take a single argument, and (2)
    this function can generally be imported from a module, as required
    by map().
    """
    (func1d, axis, arr, args, kwargs) = all_args
    return func1d, axis, arr, args, kwargs


def parallel_apply_along_axis(func1d, axis, arr, *args, **kwargs):
    """
    Like numpy.apply_along_axis(), but takes advantage of multiple
    cores.
    """
    import multiprocessing
    import numpy as np
    import numpy.ma as ma
    # Effective axis where apply_along_axis() will be applied by each
    # worker (any non-zero axis number would work, so as to allow the use
    # of `np.array_split()`, which is only done on axis 0):
    effective_axis = 1 if axis == 0 else axis
    if effective_axis != axis:
        arr = arr.swapaxes(axis, effective_axis)

    # Chunks for the mapping (only a few chunks):
    chunks = [(func1d, effective_axis, sub_arr, args, kwargs)
              for sub_arr in np.array_split(arr, multiprocessing.cpu_count()-1)]

    pool = multiprocessing.Pool()
    individual_results = pool.map(unpacking_apply_along_axis, chunks)
    # Freeing the workers:
    pool.close()
    pool.join()

    return ma.concatenate(individual_results)

def masked_interpolation(data, x, xp, propagate_mask=True):
    import math
    import numpy as np
    import numpy.ma as ma

    # The x-coordinates (missing times) at which to evaluate the interpolated values.
    assert len(x) >= 1
    # The x-coordinates (existing times) of the data points (where returns a tuple because each element of the tuple refers to a dimension.)
    assert len(xp) >= 2
    assert np.all(np.diff(xp) > 0) , 'Sequence of yearsBP must be increasning'
    # The y-coordinates (value at existing times) of the data points, that is the valid entries
    fp = np.take(data, xp)
    assert len(fp) >= 2

    # Returns the one-dimensional piecewise linear interpolant to a function with given discrete data points (xp, fp), evaluated at x.
    new_y = np.interp(x, xp, fp.filled(np.nan))
    np.nan_to_num(new_y, copy=False)

    # interpolate mask & apply to interpolated data
    if propagate_mask:
        new_mask = data.mask[:]
        new_mask[new_mask]  = 1
        new_mask[~new_mask] = 0
        # the mask y values at existing times
        new_fp = np.take(new_mask, xp)
        new_mask = np.interp(x, xp, new_fp)
        new_y = np.ma.masked_array(new_y, new_mask > 0.5)

    data[x] = new_y
    return data

import numpy as np
import numpy.ma as ma

shape=(5,1,3)
A = np.ma.zeros(shape)
arr = np.ma.array(data=A, mask = [ [[1, 0, 0]], [[0, 0, 0]], [[0, 1, 0]], [[0, 0, 0]], [[0, 0, 1]] ])
print(arr)
arr[0] = [1, 1, 1]
arr[2] = [3, 3, 3]
arr[4] = [5, 5, 5]
print(arr)
interpolated = parallel_apply_along_axis(func1d=masked_interpolation, axis=0, arr=arr, x=[1,3], xp=[0,2,4])
print(interpolated)

It returns this:

/home/becheler/dev/virtual_environments/crumbs-env/lib/python3.8/site-packages/numpy/ma/core.py:711: VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.
  data = np.array(a, copy=False, subok=subok)
[<function masked_interpolation at 0x7fdaae445e50> 1
 masked_array(
   data=[[[1.0, 1.0, 1.0],
          [0.0, 0.0, 0.0],
          [3.0, 3.0, 3.0],
          [0.0, 0.0, 0.0],
          [5.0, 5.0, 5.0]]],
   mask=[[[False, False, False],
          [False, False, False],
          [False, False, False],
          [False, False, False],
          [False, False, False]]],
   fill_value=1e+20)               () {'x': [1, 3], 'xp': [0, 2, 4]}
 <function masked_interpolation at 0x7fdaae445e50> 1 masked_array(
                                                       data=[],
                                                       mask=[],
                                                       fill_value=1e+20,
                                                       dtype=float64)
 () {'x': [1, 3], 'xp': [0, 2, 4]}
 <function masked_interpolation at 0x7fdaae445e50> 1 masked_array(
                                                       data=[],
                                                       mask=[],
                                                       fill_value=1e+20,
                                                       dtype=float64)
 () {'x': [1, 3], 'xp': [0, 2, 4]}]

Arnaud Becheler
  • 367
  • 1
  • 13

0 Answers0