Threading Layer#

oneDAL uses Intel® oneAPI Threading Building Blocks (Intel® oneTBB) to perform parallel computations on CPU. oneDAL algorithms do not use oneTBB directly; instead, they rely on custom primitives that either wrap oneTBB functionality or are developed in-house. Those primitives form oneDAL’s threading layer.

This is done so that the algorithms do not depend on possible oneTBB API changes, or even on a particular threading technology such as oneTBB, C++11 standard threads, etc.
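To illustrate this decoupling, here is a minimal, hypothetical sketch of what such a wrapper primitive could look like if it were backed by C++11 standard threads instead of oneTBB. This is not the actual oneDAL implementation, and `threader_for_sketch` is an invented name:

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical threading-layer primitive: runs f(i) for every i in [0, n)
// across the available hardware threads. Algorithms would call only this
// interface, so the backend (oneTBB, C++11 threads, ...) could be swapped
// without touching the algorithm code.
void threader_for_sketch(std::size_t n, const std::function<void(std::size_t)>& f) {
    const std::size_t nThreads = std::max<std::size_t>(1, std::thread::hardware_concurrency());
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < nThreads; ++t) {
        workers.emplace_back([=, &f]() {
            // Simple round-robin mapping of iterations to threads
            for (std::size_t i = t; i < n; i += nThreads) {
                f(i);
            }
        });
    }
    for (auto& w : workers) w.join();
}
```

An algorithm written against such an interface never mentions the threading backend, which is exactly the property the threading layer provides.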

The API of the layer is defined in threading.h. Be aware that the threading API is not part of the oneDAL product API. It is an internal API intended to be used only by oneDAL developers, and it can be changed at any time without prior notification.

This chapter describes common parallel patterns and primitives of the threading layer.

threader_for#

Consider a case where you need to compute an elementwise sum of two arrays and store the result into a third array. Here is a sequential implementation:

void sum(const size_t n, const float* a, const float* b, float* c) {
   for (size_t i = 0; i < n; ++i) {
      c[i] = a[i] + b[i];
   }
}

There are several options available in the threading layer of oneDAL to let the iterations of this code run in parallel. One of the options is to use daal::threader_for as shown here:

#include "src/threading/threading.h"

void sum(const size_t n, const float* a, const float* b, float* c) {
   daal::threader_for(n, n, [&](size_t i) {
      c[i] = a[i] + b[i];
   });
}

The iteration space here goes from 0 to n-1. The last argument is a function object that performs a single iteration of the loop, given the loop index i.

Blocking#

To get more control over the parallel execution and to improve cache locality, oneDAL usually splits the data into blocks and then processes those blocks in parallel.

This code shows what a typical parallel loop in oneDAL looks like:

#include "src/threading/threading.h"

void sum(const size_t n, const float* a, const float* b, float* c) {
   constexpr size_t blockSize = 256;
   const size_t nBlocks = (n + blockSize - 1) / blockSize;

   daal::threader_for(nBlocks, nBlocks, [&](size_t iBlock) {
      const size_t iStart = iBlock * blockSize;
      const size_t iEnd = (iBlock < (nBlocks - 1)) ? iStart + blockSize : n;
      for (size_t i = iStart; i < iEnd; ++i) {
         c[i] = a[i] + b[i];
      }
   });
}

Thread-local Storage (TLS)#

Suppose you need to compute the dot product of two arrays. Here is a sequential implementation:

float dot(const size_t n, const float* a, const float* b) {
   float result = 0.0f;
   for (size_t i = 0; i < n; ++i) {
      result += a[i] * b[i];
   }
   return result;
}

Parallel computations can be performed in two steps:

  1. Compute partial dot product in each thread.

  2. Perform a reduction: Add the partial results from all threads to compute the final dot product.
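Before looking at the oneDAL primitives, the two steps above can be sketched with plain C++11 threads. This is a simplified illustration with one partial-sum slot per thread; `dot_parallel_sketch` is an invented name, not a oneDAL function:

```cpp
#include <algorithm>
#include <cstddef>
#include <thread>
#include <vector>

float dot_parallel_sketch(std::size_t n, const float* a, const float* b) {
    const std::size_t nThreads = std::max<std::size_t>(1, std::thread::hardware_concurrency());
    std::vector<float> partial(nThreads, 0.0f); // step 1: one slot per thread
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < nThreads; ++t) {
        workers.emplace_back([=, &partial]() {
            float local = 0.0f;
            for (std::size_t i = t; i < n; i += nThreads) {
                local += a[i] * b[i];
            }
            partial[t] = local; // each thread writes only its own slot
        });
    }
    for (auto& w : workers) w.join();

    // Step 2: reduce the per-thread partial results into the final value
    float result = 0.0f;
    for (float p : partial) result += p;
    return result;
}
```

Note that the summation order differs from the sequential version, so the floating-point result can differ slightly between the two.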

daal::tls provides local storage where each thread can accumulate its local results. The following code allocates memory that stores a partial dot product for each thread:

#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"

SafeStatus safeStat;
daal::tls<float *> dotProductTLS([=, &safeStat]() {
   float * dotProductPtr = new (std::nothrow) float;
   if (!dotProductPtr) {
      safeStat.add(services::ErrorMemoryAllocationFailed);
      return dotProductPtr; // nullptr is returned; it is checked on each use
   }
   dotProductPtr[0] = 0.0f;
   return dotProductPtr;
});

SafeStatus in this code is a thread-safe counterpart of the Status class. SafeStatus allows collecting errors from all threads and reporting them to the user via the detach() method. An example will be shown later in this chapter.

Checking the status right after the initialization code won’t reveal allocation errors, because oneTBB uses lazy evaluation: the lambda function passed to the constructor of the TLS is evaluated on the first use of the thread-local storage (TLS) in each thread.
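The lazy-initialization behavior can be demonstrated with a toy stand-in for the TLS. This is a hypothetical sketch mimicking the semantics described above, not the real `daal::tls` implementation:

```cpp
#include <functional>
#include <map>
#include <mutex>
#include <thread>

// Toy thread-local storage that mimics the lazy behavior described above:
// the factory is stored by the constructor but invoked only on the first
// local() call made by each thread.
template <typename T>
class toy_tls {
public:
    explicit toy_tls(std::function<T()> factory) : factory_(std::move(factory)) {}

    T& local() {
        std::lock_guard<std::mutex> guard(mutex_);
        auto it = slots_.find(std::this_thread::get_id());
        if (it == slots_.end()) {
            // First use by this thread: run the factory now, not earlier
            it = slots_.emplace(std::this_thread::get_id(), factory_()).first;
        }
        return it->second;
    }

private:
    std::function<T()> factory_;
    std::map<std::thread::id, T> slots_;
    std::mutex mutex_;
};
```

Under this model, a status flag set inside the factory cannot be observed right after construction, because the factory has not run yet; it runs inside the parallel region, on first use.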

There are several options available in the threading layer of oneDAL to compute the partial dot product results in each thread. One of the options is to use the already mentioned daal::threader_for together with the blocking approach, as shown here:

constexpr size_t blockSize = 1024;
const size_t nBlocks = (n + blockSize - 1) / blockSize;

daal::threader_for(nBlocks, nBlocks, [&](size_t iBlock) {
   const size_t iStart = iBlock * blockSize;
   const size_t iEnd = (iBlock < (nBlocks - 1)) ? iStart + blockSize : n;

   // Compute partial result for this block
   float partialDotProduct = 0.0f;
   for (size_t i = iStart; i < iEnd; ++i) {
      partialDotProduct += a[i] * b[i];
   }

   // Update thread-local result
   float * localDotProduct = dotProductTLS.local();
   if (!localDotProduct) {
      // Allocation error happened earlier
      return;
   }
   localDotProduct[0] += partialDotProduct;
});
DAAL_CHECK_SAFE_STATUS();  // if (!safeStat) return safeStat.detach();

To compute the final result, reduce the threads’ partial results as shown here:

float dotProduct = 0.0f;
dotProductTLS.reduce([&](float * localDotProduct) {
   if (localDotProduct) {
      dotProduct += localDotProduct[0];
      delete localDotProduct;
   }
});

Local memory of the threads should be released when it is no longer needed.

The complete parallel version of the dot product computation looks like this:

#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"

services::Status dot(const size_t n, const float* a, const float* b, float &dotProduct) {
   SafeStatus safeStat;
   daal::tls<float *> dotProductTLS([=, &safeStat]() {
      float * dotProductPtr = new (std::nothrow) float;
      if (!dotProductPtr) {
         safeStat.add(services::ErrorMemoryAllocationFailed);
         return dotProductPtr; // nullptr is returned; it is checked on each use
      }
      dotProductPtr[0] = 0.0f;
      return dotProductPtr;
   });

   constexpr size_t blockSize = 1024;
   const size_t nBlocks = (n + blockSize - 1) / blockSize;

   daal::threader_for(nBlocks, nBlocks, [&](size_t iBlock) {
      const size_t iStart = iBlock * blockSize;
      const size_t iEnd = (iBlock < (nBlocks - 1)) ? iStart + blockSize : n;

      // Compute partial result for this block
      float partialDotProduct = 0.0f;
      for (size_t i = iStart; i < iEnd; ++i) {
         partialDotProduct += a[i] * b[i];
      }

      // Update thread-local result
      float * localDotProduct = dotProductTLS.local();
      if (!localDotProduct) {
         // Allocation error happened earlier
         return;
      }
      localDotProduct[0] += partialDotProduct;
   });
   DAAL_CHECK_SAFE_STATUS();

   dotProductTLS.reduce([&](float * localDotProduct) {
      if (localDotProduct) {
         dotProduct += localDotProduct[0];
         delete localDotProduct;
      }
   });
   return services::Status();
}

Static Work Scheduling#

By default, oneTBB uses dynamic work scheduling and work stealing. This means that two different runs of the same parallel loop can produce different mappings of the loop’s iteration space to the available threads. This strategy is beneficial when it is difficult to estimate the amount of work performed by each iteration.

In cases when the iterations are known to perform an equal amount of work, it is more performant to use a predefined mapping of the loop’s iterations to threads. This is what static work scheduling does.
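The idea can be illustrated with a tiny sketch of a static schedule: iterations are assigned to threads by a fixed formula, so the mapping is identical on every run. Here `owner_thread` is an invented helper for illustration, not part of oneDAL:

```cpp
#include <cstddef>

// Hypothetical static schedule: the iteration space [0, n) is split into
// contiguous chunks of ceil(n / nThreads) iterations, one chunk per thread.
// Unlike dynamic scheduling with work stealing, the mapping depends only
// on the inputs, never on runtime timing.
std::size_t owner_thread(std::size_t i, std::size_t n, std::size_t nThreads) {
    const std::size_t chunk = (n + nThreads - 1) / nThreads;
    return i / chunk;
}
```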

daal::static_threader_for and daal::static_tls allow implementing static work scheduling within oneDAL.

Here is a variant of parallel dot product computation with static scheduling:

#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"

services::Status dot(const size_t n, const float* a, const float* b, float &dotProduct) {
   SafeStatus safeStat;
   daal::static_tls<float *> dotProductTLS([=, &safeStat]() {
      float * dotProductPtr = new (std::nothrow) float;
      if (!dotProductPtr) {
         safeStat.add(services::ErrorMemoryAllocationFailed);
         return dotProductPtr; // nullptr is returned; it is checked on each use
      }
      dotProductPtr[0] = 0.0f;
      return dotProductPtr;
   });

   constexpr size_t blockSize = 1024;
   const size_t nBlocks = (n + blockSize - 1) / blockSize;

   daal::static_threader_for(nBlocks, [&](size_t iBlock, size_t threadId) {
      const size_t iStart = iBlock * blockSize;
      const size_t iEnd = (iBlock < (nBlocks - 1)) ? iStart + blockSize : n;

      // Compute partial result for this block
      float partialDotProduct = 0.0f;
      for (size_t i = iStart; i < iEnd; ++i) {
         partialDotProduct += a[i] * b[i];
      }

      // Update thread-local result
      // Note that the exact thread index is used to access the thread's data
      float * localDotProduct = dotProductTLS.local(threadId);
      if (!localDotProduct) {
         // Allocation error happened earlier
         return;
      }
      localDotProduct[0] += partialDotProduct;
   });
   DAAL_CHECK_SAFE_STATUS();

   dotProductTLS.reduce([&](float * localDotProduct) {
      if (localDotProduct) {
         dotProduct += localDotProduct[0];
         delete localDotProduct;
      }
   });
   return services::Status();
}

Nested Parallelism#

oneDAL supports nested parallel loops. It is important to know that “when a parallel construct calls another parallel construct, a thread can obtain a task from the outer-level construct while waiting for completion of the inner-level one” (oneTBB documentation).

In practice, this means that a thread-local variable might unexpectedly change its value after a nested parallel construct:

#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"

SafeStatus safeStat;
daal::tls<float *> tls([=, &safeStat]() {
   float * localBuffer = new (std::nothrow) float[localSize];
   if (!localBuffer) {
      safeStat.add(services::ErrorMemoryAllocationFailed);
   }
   return localBuffer;
});
daal::threader_for(n, n, [&](size_t i) {
   float * localBuffer = tls.local();
   if (!localBuffer) {
      // Allocation error happened earlier
      return;
   }

   // Initialize localBuffer with some data here

   daal::threader_for(m, m, [&](size_t j) {
      /* Some work */
   });

   // While executing the above parallel_for, the thread might have run iterations
   // of the outer parallel_for, and so might have changed the thread specific value.
   assert(localBuffer == tls.local()); // The assertion may fail!
});
DAAL_CHECK_SAFE_STATUS();

tls.reduce([&](float * localBuffer) {
   if (localBuffer) {
      /* Do reduction */
      delete localBuffer;
   }
});

In some scenarios this can lead to deadlocks, segmentation faults and other issues.

oneTBB provides ways to isolate the execution of a parallel construct so that its tasks do not interfere with other tasks running simultaneously (for example, tbb::this_task_arena::isolate).

Those options are preferred when the parallel loops are written as nested from the start. But in oneDAL there are cases when one parallel algorithm (the outer one) calls another parallel algorithm (the inner one) within a parallel region. The inner algorithm can also be called on its own, without additional nesting, and we do not always want to make it isolated.

For cases like that, oneDAL provides daal::ls. Its local() method always returns the same value for the same thread, regardless of nested execution:

#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"

SafeStatus safeStat;
daal::ls<float *> ls([=, &safeStat]() {
   float * localBuffer = new (std::nothrow) float[localSize];
   if (!localBuffer) {
      safeStat.add(services::ErrorMemoryAllocationFailed);
   }
   return localBuffer;
});
daal::threader_for(n, n, [&](size_t i) {
   float * localBuffer = ls.local();
   if (!localBuffer) {
      // Allocation error happened earlier
      return;
   }

   // Initialize localBuffer with some data here

   daal::threader_for(m, m, [&](size_t j) {
      /* Some work */
   });

   // The thread specific value always stays unchanged after the nested execution.
   assert(localBuffer == ls.local()); // Assertion never fails!
});
DAAL_CHECK_SAFE_STATUS();

ls.reduce([&](float * localBuffer) {
   if (localBuffer) {
      /* Do reduction */
      delete localBuffer;
   }
});