JackOLantern - 17 days ago 6
C Question

# Accelerating FFTW pruning

Suppose that I have a sequence

`x(n)`
which is
`K * N`
long and that only the first
`N`
elements are different from zero. I want to calculate the FFT, by FFTW, of such a sequence. This is equivalent to having a sequence of length
`N`
and having a zero padding to
`K * N`
. Since
`N`
and
`K`
may be "large", I have a significant zero padding. I'm exploring if I can save some computation time avoid explicit zero padding.

The case
`K = 2`

Let us begin by considering the case
`K = 2`
. In this case, the DFT of
`x(n)`
can be written as

If
`k`
is even, namely
`k = 2 * m`
, then

which means that such values of the DFT can be calculated through an FFT of a sequence of length
`N`
, and not
`K * N`
.

If
`k`
is odd, namely
`k = 2 * m + 1`
, then

which means that such values of the DFT can be again calculated through an FFT of a sequence of length
`N`
, and not
`K * N`
.

So, in conclusion, I can exchange a single FFT of length
`2 * N`
with
`2`
FFTs of length
`N`
.

The case of arbitrary
`K`

In this case, we have

On writing
`k = m * K + t`
, we have

So, in conclusion, I can exchange a single FFT of length
`K * N`
with
`K`
FFTs of length
`N`
. Since the FFTW has
`fftw_plan_many_dft`
, I can expect to have some gaining against the case of a single FFT.

To verify that, I have set up the following code

``````#include <stdio.h>
#include <stdlib.h>     /* srand, rand */
#include <time.h>       /* time */
#include <math.h>
#include <fstream>

#include <fftw3.h>

#include "TimingCPU.h"

#define PI_d            3.141592653589793

void main() {

const int N = 100000;
const int K = 20;

fftw_plan plan_zp;

fftw_complex *h_x = (fftw_complex *)malloc(N     * sizeof(fftw_complex));
fftw_complex *h_xzp = (fftw_complex *)calloc(N * K, sizeof(fftw_complex));
fftw_complex *h_xpruning = (fftw_complex *)malloc(N * K * sizeof(fftw_complex));
fftw_complex *h_xhatpruning = (fftw_complex *)malloc(N * K * sizeof(fftw_complex));
fftw_complex *h_xhatpruning_temp = (fftw_complex *)malloc(N * K * sizeof(fftw_complex));
fftw_complex *h_xhat = (fftw_complex *)malloc(N * K * sizeof(fftw_complex));

// --- Random number generation of the data sequence
srand(time(NULL));
for (int k = 0; k < N; k++) {
h_x[k][0] = (double)rand() / (double)RAND_MAX;
h_x[k][1] = (double)rand() / (double)RAND_MAX;
}

memcpy(h_xzp, h_x, N * sizeof(fftw_complex));

plan_zp = fftw_plan_dft_1d(N * K, h_xzp, h_xhat, FFTW_FORWARD, FFTW_ESTIMATE);
fftw_plan plan_pruning = fftw_plan_many_dft(1, &N, K, h_xpruning, NULL, 1, N, h_xhatpruning_temp, NULL, 1, N, FFTW_FORWARD, FFTW_ESTIMATE);

TimingCPU timerCPU;
timerCPU.StartCounter();
fftw_execute(plan_zp);

timerCPU.StartCounter();
double factor = -2. * PI_d / (K * N);
for (int k = 0; k < K; k++) {
double arg1 = factor * k;
for (int n = 0; n < N; n++) {
double arg = arg1 * n;
double cosarg = cos(arg);
double sinarg = sin(arg);
h_xpruning[k * N + n][0] = h_x[n][0] * cosarg - h_x[n][1] * sinarg;
h_xpruning[k * N + n][1] = h_x[n][0] * sinarg + h_x[n][1] * cosarg;
}
}
printf("Optimized first step %f\n", timerCPU.GetCounter());

timerCPU.StartCounter();
fftw_execute(plan_pruning);
printf("Optimized second step %f\n", timerCPU.GetCounter());
timerCPU.StartCounter();
for (int k = 0; k < K; k++) {
for (int p = 0; p < N; p++) {
h_xhatpruning[p * K + k][0] = h_xhatpruning_temp[p + k * N][0];
h_xhatpruning[p * K + k][1] = h_xhatpruning_temp[p + k * N][1];
}
}
printf("Optimized third step %f\n", timerCPU.GetCounter());

double rmserror = 0., norm = 0.;
for (int n = 0; n < N; n++) {
rmserror = rmserror + (h_xhatpruning[n][0] - h_xhat[n][0]) * (h_xhatpruning[n][0] - h_xhat[n][0]) + (h_xhatpruning[n][1] - h_xhat[n][1]) * (h_xhatpruning[n][1] - h_xhat[n][1]);
norm = norm + h_xhat[n][0] * h_xhat[n][0] + h_xhat[n][1] * h_xhat[n][1];
}
printf("rmserror %f\n", 100. * sqrt(rmserror / norm));

fftw_destroy_plan(plan_zp);

}
``````

The approach I have developed consists of three steps:

1. Multiplying the input sequence by "twiddle" complex exponentials;

2. Performing the
`fftw_many`
;

3. Reorganizing the results.

I have the following timing:

``````Stadard 53.121002
Optimized first step 27.445898
Optimized second step 23.004223
Optimized third step 21.770467
``````

The
`fftw_many`
indeed is faster than the single FFTW on
`K * N`
input points. However, steps #1 and #3 completely destroy such a gain. I would expect that steps #1 and #3 be computationally much lighter than step #2.

My questions are:

1. How is it possible that steps #1 and #3 a so computationally more demanding than step #2?

2. How can I improve steps #1 and #3 to have a net gain against the "standard" approach?

Thank you very much for any hint.

EDIT #1

I'm working with Visual Studio 2013 and compiling in Release mode.

EDIT #2

I have changed steps #1 and #3 using OpenMP as

``````timerCPU.StartCounter();
double factor = -2. * PI_d / (K * N);
int n;
omp_set_nested(1);
#pragma omp parallel for private(n) num_threads(4)
for (int k = 0; k < K; k++) {
double arg1 = factor * k;
for (n = 0; n < N; n++) {
double arg = arg1 * n;
double cosarg = cos(arg);
double sinarg = sin(arg);
h_xpruning[k * N + n][0] = h_x[n][0] * cosarg - h_x[n][1] * sinarg;
h_xpruning[k * N + n][1] = h_x[n][0] * sinarg + h_x[n][1] * cosarg;
}
}
printf("Optimized first step %f\n", timerCPU.GetCounter());
``````

and

``````timerCPU.StartCounter();
fftw_execute(plan_pruning);
printf("Optimized second step %f\n", timerCPU.GetCounter());

timerCPU.StartCounter();
int p;
omp_set_nested(1);
#pragma omp parallel for private(p) num_threads(4)
for (int k = 0; k < K; k++) {
for (int p = 0; p < N; p++) {
h_xhatpruning[p * K + k][0] = h_xhatpruning_temp[p + k * N][0];
h_xhatpruning[p * K + k][1] = h_xhatpruning_temp[p + k * N][1];
}
}
printf("Optimized third step %f\n", timerCPU.GetCounter());
``````

respectively. Now, my timings are

``````Stadard 52.553050
Optimized first step 7.318480
Optimized second step 24.120965
Optimized third step 16.504329
rmserror 0.000000
``````

Accordingly, now I have a net gain, but I'm feeling like I'm make an unfair comparison: the two FFTWs are run sequentially, while the
`for`

EDIT #3
Following Paul R's answer, I have changed the last step to

``````for (int p = 0; p < N; p++) {
for (int k = 0; k < K; k++) {
h_xhatpruning[p * K + k][0] = h_xhatpruning_temp[p + k * N][0];
h_xhatpruning[p * K + k][1] = h_xhatpruning_temp[p + k * N][1];
}
}
``````

For the case

``````const int N = 100000;
const int K = 100;
``````

I have the following timings:

`````` 50.86ms for the new version
101.13ms for my old version
``````

So, Paul R's suggestion has halved the computation time of the third step. Unfortunately, unrolling the loop does not help further improving the result.

For the third step you might want to try switching the order of the loops:

``````for (int p = 0; p < N; p++) {
for (int k = 0; k < K; k++) {
h_xhatpruning[p * K + k][0] = h_xhatpruning_temp[p + k * N][0];
h_xhatpruning[p * K + k][1] = h_xhatpruning_temp[p + k * N][1];
}
}
``````

Either way you have a cache-unfriendly access pattern though. You could try working with blocks to improve this, e.g. assuming N is a multiple of 4:

``````for (int p = 0; p < N; p += 4) {
for (int k = 0; k < K; k++) {
for (int p0 = 0; p0 < 4; p0++) {
h_xhatpruning[(p + p0) * K + k][0] = h_xhatpruning_temp[(p + p0) + k * N][0];
h_xhatpruning[(p + p0) * K + k][1] = h_xhatpruning_temp[(p + p0) + k * N][1];
}
}
}
``````

This should help to reduce the churn of cache lines somewhat. If it does then maybe also experiment with block sizes other than 4 to see if there is a "sweet spot".

Source (Stackoverflow)