5

I'd like to write the vectored version of code that calculates Arnaud Legoux Moving Average using NumPy (or Pandas). Could you help me with this, please? Thanks.

Non-vectored version looks like following (see below).

def NPALMA(pnp_array, **kwargs) :
    '''
    ALMA - Arnaud Legoux Moving Average,
    http://www.financial-hacker.com/trend-delusion-or-reality/
    https://github.com/darwinsys/Trading_Strategies/blob/master/ML/Features.py
    '''
    length = kwargs['length']
    # just some number (6.0 is useful)
    sigma = kwargs['sigma']
    # sensisitivity (close to 1) or smoothness (close to 0)
    offset = kwargs['offset']

    asize = length - 1
    m = offset * asize
    s = length  / sigma
    dss = 2 * s * s

    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp( -(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]

    return alma
Prokhozhii
  • 530
  • 1
  • 6
  • 11

1 Answers1

4

Starting Approach

We can create sliding windows along the first axis and then use tensor multiplication with the range of wtd values for the sum-reductions.

The implementation would look something like this -

# Get all wtd values in an array
wtds = np.exp(-(np.arange(length) - m)**2/dss)

# Get the sliding windows for input array along first axis
pnp_array3D = strided_axis0(pnp_array,len(wtds))

# Initialize o/p array
out = np.zeros(pnp_array.shape)

# Get sum-reductions for the windows which don't need wrapping over
out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]

# Last element of the output needed wrapping. So, do it separately.
out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])

# Finally perform the divisions
out /= wtds.sum()

Function to get the sliding windows : strided_axis0 is from here.

Boost with 1D convolution

Those multiplications with wtds values and then their sum-reductions are basically convolution along the first axis. As such, we can use scipy.ndimage.convolve1d along axis=0. This would be much faster given the memory efficiency, as we won't be creating huge sliding windows.

The implementation would be -

from scipy.ndimage import convolve1d as conv

avgs = conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')

Thus, out[length-1:], which are the non-zero rows would be same as avgs[:-length+1].

There could be some precision difference if we are working with really small kernel numbers from wtds. So, keep that in mind if using this convolution method.

Runtime test

Approaches -

def original_app(pnp_array, length, m, dss):
    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp( -(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]
    return alma

def vectorized_app1(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    pnp_array3D = strided_axis0(pnp_array,len(wtds))
    out = np.zeros(pnp_array.shape)
    out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
    out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
    out /= wtds.sum()
    return out

def vectorized_app2(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    return conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')

Timings -

In [470]: np.random.seed(0)
     ...: m,n = 1000,100
     ...: pnp_array = np.random.rand(m,n)
     ...: 
     ...: length = 6
     ...: sigma = 0.3
     ...: offset = 0.5
     ...: 
     ...: asize = length - 1
     ...: m = np.floor(offset * asize)
     ...: s = length  / sigma
     ...: dss = 2 * s * s
     ...: 

In [471]: %timeit original_app(pnp_array, length, m, dss)
     ...: %timeit vectorized_app1(pnp_array, length, m, dss)
     ...: %timeit vectorized_app2(pnp_array, length, m, dss)
     ...: 
10 loops, best of 3: 36.1 ms per loop
1000 loops, best of 3: 1.84 ms per loop
1000 loops, best of 3: 684 µs per loop

In [472]: np.random.seed(0)
     ...: m,n = 10000,1000 # rest same as previous one

In [473]: %timeit original_app(pnp_array, length, m, dss)
     ...: %timeit vectorized_app1(pnp_array, length, m, dss)
     ...: %timeit vectorized_app2(pnp_array, length, m, dss)
     ...: 
1 loop, best of 3: 503 ms per loop
1 loop, best of 3: 222 ms per loop
10 loops, best of 3: 106 ms per loop
Divakar
  • 212,295
  • 18
  • 231
  • 332
  • I've checked your second approach with 1d convolution. And it looks like there is something wrong with it. But I can't get what exactly. My example: pnp_array=np.array([ 3924.00752506 , 5774.30335369 , 5519.40734814 , 4931.71344059]) offset=0.85 sigma=6 length=3 m=1.7 dss=0.5 Expected result should be [0 , 0, 5594.17030842 , 5115.59420056]. But second application returns [ 0 , 0 , 5693.3358598 , 5333.61073335]. So cummulative error is -317.182084168. Is this because of small kernel numbers as you mentioned? – Prokhozhii Oct 28 '17 at 21:45
  • @Prokhozhii What are the values of `wtds` from `wtds = np.exp(-(np.arange(length) - m)**2/dss)` for that set? – Divakar Oct 28 '17 at 21:47
  • They are correct in second application: array([ 0.00308872, 0.3753111 , 0.83527021]) – Prokhozhii Oct 28 '17 at 21:56
  • @Prokhozhii I know those are correct. I am asking for their values to get a sense of how they compare against values in `pnp_array`. If those are much smaller, then yes please use app#1. Okay, so they look very small, so yes please app#1 for such cases. It's the same precision issue I talked about. – Divakar Oct 28 '17 at 21:57
  • As far as I understand app#1 has error with indexes. It returns [ 0, 0, 5199.97996566, 5594.17030842]. But 4th element should be third and 5th element, which is absent, should be 4th. Can you help with this, please? – Prokhozhii Oct 28 '17 at 22:12
  • Maybe I'm wrong (I'm newbie in NumPy) but following code works correctly in my case: `def vectorized_app1(pnp_array, length, m, dss): wtds = np.exp( -(np.arange(length) - m)**2 / dss ) pnp_array3D = strided_axis0(pnp_array, len(wtds)) out = np.zeros(pnp_array.shape) out[length-1:] = np.tensordot( pnp_array3D , wtds , axes=( (1), (0) ) ) [:] out /= wtds.sum() return out` – Prokhozhii Oct 28 '17 at 22:51