JackOLantern - 1 month ago 9

C Question

Suppose that I have a sequence

`x(n)`

`K * N`

`N`

`N`

`K * N`

`N`

`K`

`K = 2`

Let us begin by considering the case

`K = 2`

`x(n)`

If

`k`

`k = 2 * m`

which means that such values of the DFT can be calculated through an FFT of a sequence of length

`N`

`K * N`

If

`k`

`k = 2 * m + 1`

which means that such values of the DFT can be again calculated through an FFT of a sequence of length

`N`

`K * N`

So, in conclusion, I can exchange a single FFT of length

`2 * N`

`2`

`N`

`K`

In this case, we have

On writing

`k = m * K + t`

So, in conclusion, I can exchange a single FFT of length

`K * N`

`K`

`N`

`fftw_plan_many_dft`

To verify that, I have set up the following code

`#include <stdio.h>`

#include <stdlib.h> /* srand, rand */

#include <time.h> /* time */

#include <math.h>

#include <fstream>

#include <fftw3.h>

#include "TimingCPU.h"

#define PI_d 3.141592653589793

void main() {

const int N = 100000;

const int K = 20;

fftw_plan plan_zp;

fftw_complex *h_x = (fftw_complex *)malloc(N * sizeof(fftw_complex));

fftw_complex *h_xzp = (fftw_complex *)calloc(N * K, sizeof(fftw_complex));

fftw_complex *h_xpruning = (fftw_complex *)malloc(N * K * sizeof(fftw_complex));

fftw_complex *h_xhatpruning = (fftw_complex *)malloc(N * K * sizeof(fftw_complex));

fftw_complex *h_xhatpruning_temp = (fftw_complex *)malloc(N * K * sizeof(fftw_complex));

fftw_complex *h_xhat = (fftw_complex *)malloc(N * K * sizeof(fftw_complex));

// --- Random number generation of the data sequence

srand(time(NULL));

for (int k = 0; k < N; k++) {

h_x[k][0] = (double)rand() / (double)RAND_MAX;

h_x[k][1] = (double)rand() / (double)RAND_MAX;

}

memcpy(h_xzp, h_x, N * sizeof(fftw_complex));

plan_zp = fftw_plan_dft_1d(N * K, h_xzp, h_xhat, FFTW_FORWARD, FFTW_ESTIMATE);

fftw_plan plan_pruning = fftw_plan_many_dft(1, &N, K, h_xpruning, NULL, 1, N, h_xhatpruning_temp, NULL, 1, N, FFTW_FORWARD, FFTW_ESTIMATE);

TimingCPU timerCPU;

timerCPU.StartCounter();

fftw_execute(plan_zp);

printf("Stadard %f\n", timerCPU.GetCounter());

timerCPU.StartCounter();

double factor = -2. * PI_d / (K * N);

for (int k = 0; k < K; k++) {

double arg1 = factor * k;

for (int n = 0; n < N; n++) {

double arg = arg1 * n;

double cosarg = cos(arg);

double sinarg = sin(arg);

h_xpruning[k * N + n][0] = h_x[n][0] * cosarg - h_x[n][1] * sinarg;

h_xpruning[k * N + n][1] = h_x[n][0] * sinarg + h_x[n][1] * cosarg;

}

}

printf("Optimized first step %f\n", timerCPU.GetCounter());

timerCPU.StartCounter();

fftw_execute(plan_pruning);

printf("Optimized second step %f\n", timerCPU.GetCounter());

timerCPU.StartCounter();

for (int k = 0; k < K; k++) {

for (int p = 0; p < N; p++) {

h_xhatpruning[p * K + k][0] = h_xhatpruning_temp[p + k * N][0];

h_xhatpruning[p * K + k][1] = h_xhatpruning_temp[p + k * N][1];

}

}

printf("Optimized third step %f\n", timerCPU.GetCounter());

double rmserror = 0., norm = 0.;

for (int n = 0; n < N; n++) {

rmserror = rmserror + (h_xhatpruning[n][0] - h_xhat[n][0]) * (h_xhatpruning[n][0] - h_xhat[n][0]) + (h_xhatpruning[n][1] - h_xhat[n][1]) * (h_xhatpruning[n][1] - h_xhat[n][1]);

norm = norm + h_xhat[n][0] * h_xhat[n][0] + h_xhat[n][1] * h_xhat[n][1];

}

printf("rmserror %f\n", 100. * sqrt(rmserror / norm));

fftw_destroy_plan(plan_zp);

}

The approach I have developed consists of three steps:

- Multiplying the input sequence by "twiddle" complex exponentials;
- Performing the ;
`fftw_many`

- Reorganizing the results.

I have the following timing:

`Stadard 53.121002`

Optimized first step 27.445898

Optimized second step 23.004223

Optimized third step 21.770467

The

`fftw_many`

`K * N`

My questions are:

- How is it possible that steps #1 and #3 a so computationally more demanding than step #2?
- How can I improve steps #1 and #3 to have a net gain against the "standard" approach?

Thank you very much for any hint.

I'm working with Visual Studio 2013 and compiling in Release mode.

I have changed steps #1 and #3 using OpenMP as

`timerCPU.StartCounter();`

double factor = -2. * PI_d / (K * N);

int n;

omp_set_nested(1);

#pragma omp parallel for private(n) num_threads(4)

for (int k = 0; k < K; k++) {

double arg1 = factor * k;

#pragma omp parallel for num_threads(4)

for (n = 0; n < N; n++) {

double arg = arg1 * n;

double cosarg = cos(arg);

double sinarg = sin(arg);

h_xpruning[k * N + n][0] = h_x[n][0] * cosarg - h_x[n][1] * sinarg;

h_xpruning[k * N + n][1] = h_x[n][0] * sinarg + h_x[n][1] * cosarg;

}

}

printf("Optimized first step %f\n", timerCPU.GetCounter());

and

`timerCPU.StartCounter();`

fftw_execute(plan_pruning);

printf("Optimized second step %f\n", timerCPU.GetCounter());

timerCPU.StartCounter();

int p;

omp_set_nested(1);

#pragma omp parallel for private(p) num_threads(4)

for (int k = 0; k < K; k++) {

#pragma omp parallel for num_threads(4)

for (int p = 0; p < N; p++) {

h_xhatpruning[p * K + k][0] = h_xhatpruning_temp[p + k * N][0];

h_xhatpruning[p * K + k][1] = h_xhatpruning_temp[p + k * N][1];

}

}

printf("Optimized third step %f\n", timerCPU.GetCounter());

respectively. Now, my timings are

`Stadard 52.553050`

Optimized first step 7.318480

Optimized second step 24.120965

Optimized third step 16.504329

rmserror 0.000000

Accordingly, now I have a net gain, but I'm feeling like I'm make an unfair comparison: the two FFTWs are run sequentially, while the

`for`

Following Paul R's answer, I have changed the last step to

`for (int p = 0; p < N; p++) {`

for (int k = 0; k < K; k++) {

h_xhatpruning[p * K + k][0] = h_xhatpruning_temp[p + k * N][0];

h_xhatpruning[p * K + k][1] = h_xhatpruning_temp[p + k * N][1];

}

}

For the case

`const int N = 100000;`

const int K = 100;

I have the following timings:

`50.86ms for the new version`

101.13ms for my old version

So, Paul R's suggestion has halved the computation time of the third step. Unfortunately, unrolling the loop does not help further improving the result.

Answer

For the third step you might want to try switching the order of the loops:

```
for (int p = 0; p < N; p++) {
for (int k = 0; k < K; k++) {
h_xhatpruning[p * K + k][0] = h_xhatpruning_temp[p + k * N][0];
h_xhatpruning[p * K + k][1] = h_xhatpruning_temp[p + k * N][1];
}
}
```

since it's generally more beneficial to have the store addresses be contiguous than the load addresses.

Either way you have a cache-unfriendly access pattern though. You could try working with blocks to improve this, e.g. assuming N is a multiple of 4:

```
for (int p = 0; p < N; p += 4) {
for (int k = 0; k < K; k++) {
for (int p0 = 0; p0 < 4; p0++) {
h_xhatpruning[(p + p0) * K + k][0] = h_xhatpruning_temp[(p + p0) + k * N][0];
h_xhatpruning[(p + p0) * K + k][1] = h_xhatpruning_temp[(p + p0) + k * N][1];
}
}
}
```

This should help to reduce the churn of cache lines somewhat. If it does then maybe also experiment with block sizes other than 4 to see if there is a "sweet spot".