Threading Layer#
oneDAL uses Intel® oneAPI Threading Building Blocks (Intel® oneTBB) for parallel computations on CPU. The code of oneDAL algorithms does not use oneTBB directly. Instead, the algorithms use custom primitives that either wrap oneTBB functionality or are developed in-house. These primitives form oneDAL’s threading layer.
This indirection keeps the algorithms independent of possible oneTBB API changes, and even of the particular threading technology (oneTBB, C++11 standard threads, and so on).
The API of the layer is defined in threading.h. Note that the threading API is not part of the oneDAL product API. It is an internal API intended for use only by oneDAL developers, and it can change at any time without prior notice.
This chapter describes common parallel patterns and primitives of the threading layer.
threader_for#
Consider a case where you need to compute the element-wise sum of two arrays and store the results in a third array. Here is a sequential implementation:
void sum(const size_t n, const float* a, const float* b, float* c) {
    for (size_t i = 0; i < n; ++i) {
        c[i] = a[i] + b[i];
    }
}
The threading layer of oneDAL offers several ways to run the iterations of this loop in parallel. One of them is daal::threader_for, as shown here:
#include "src/threading/threading.h"
void sum(const size_t n, const float* a, const float* b, float* c) {
    daal::threader_for(n, n, [&](size_t i) {
        c[i] = a[i] + b[i];
    });
}
The iteration space here goes from 0 to n-1.
The last argument is a function object that performs a single iteration of the loop, given the loop index i.
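To make the idea of a wrapping primitive more concrete, here is a minimal sketch of a threader_for-like function built on standard C++ threads. It is not the oneDAL implementation: the names sketch_threader_for and sketch_sum are hypothetical, and the second parameter is simply a worker count chosen for this illustration, which is not necessarily what the second argument of daal::threader_for means.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for a threading-layer primitive; not the oneDAL implementation.
// Calls body(i) once for every i in [0, n), spreading the iterations over nThreads workers.
template <typename Body>
void sketch_threader_for(std::size_t n, std::size_t nThreads, const Body& body) {
    // Use at least one worker and never more workers than iterations.
    nThreads = std::max<std::size_t>(1, std::min(nThreads, n));
    std::vector<std::thread> workers;
    workers.reserve(nThreads);
    for (std::size_t t = 0; t < nThreads; ++t) {
        // Worker t handles indices t, t + nThreads, t + 2 * nThreads, ...
        workers.emplace_back([&body, n, nThreads, t]() {
            for (std::size_t i = t; i < n; i += nThreads) {
                body(i);
            }
        });
    }
    for (std::thread& w : workers) {
        w.join();
    }
}

// Element-wise sum written against the stand-in primitive (4 workers is an arbitrary choice).
void sketch_sum(std::size_t n, const float* a, const float* b, float* c) {
    assert(n == 0 || (a && b && c));
    sketch_threader_for(n, 4, [&](std::size_t i) { c[i] = a[i] + b[i]; });
}
```

Because the algorithm code only sees the primitive, the thread management inside it could presumably be swapped for another backend without touching sketch_sum.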
Blocking#
To gain more control over parallel execution and to improve cache locality, oneDAL usually splits the data into blocks and then processes those blocks in parallel.
This code shows what a typical parallel loop in oneDAL looks like:
#include "src/threading/threading.h"
void sum(const size_t n, const float* a, const float* b, float* c) {
    constexpr size_t blockSize = 256;
    const size_t nBlocks = (n + blockSize - 1) / blockSize;
    daal::threader_for(nBlocks, nBlocks, [&](size_t iBlock) {
        const size_t iStart = iBlock * blockSize;
        const size_t iEnd = (iBlock < (nBlocks - 1)) ? iStart + blockSize : n;
        for (size_t i = iStart; i < iEnd; ++i) {
            c[i] = a[i] + b[i];
        }
    });
}
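The block arithmetic used above can be checked in isolation. The following sketch factors it into two helper functions; the names sketch_num_blocks, sketch_block_range, and BlockRange are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <cstddef>

// Half-open index range [begin, end) covered by one block.
struct BlockRange {
    std::size_t begin;
    std::size_t end;
};

// Number of blocks needed to cover n elements: ceil(n / blockSize).
std::size_t sketch_num_blocks(std::size_t n, std::size_t blockSize) {
    assert(blockSize > 0);
    return (n + blockSize - 1) / blockSize;
}

// Range of block iBlock; the last block is shortened so that it ends at n.
BlockRange sketch_block_range(std::size_t iBlock, std::size_t n, std::size_t blockSize) {
    const std::size_t nBlocks = sketch_num_blocks(n, blockSize);
    assert(iBlock < nBlocks);
    const std::size_t begin = iBlock * blockSize;
    const std::size_t end = (iBlock < nBlocks - 1) ? begin + blockSize : n;
    return {begin, end};
}
```

For example, with n = 1000 and a block size of 256 there are four blocks, and the last one covers indices 768 through 999.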
Thread-local Storage (TLS)#
Suppose you need to compute the dot product of two arrays. Here is a sequential implementation:
float dot(const size_t n, const float* a, const float* b) {
    float result = 0.0f;
    for (size_t i = 0; i < n; ++i) {
        result += a[i] * b[i];
    }
    return result;
}
Parallel computations can be performed in two steps:
1. Compute a partial dot product in each thread.
2. Perform a reduction: add the partial results from all threads to compute the final dot product.
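The two steps above can be sketched with standard C++ threads before looking at the oneDAL primitives. This is only an illustration under simplifying assumptions: the function sketch_dot is hypothetical, each worker owns one slot of a plain vector instead of real thread-local storage, and the worker count is passed in by the caller.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical two-step parallel dot product built on std::thread; not oneDAL code.
float sketch_dot(std::size_t n, const float* a, const float* b, std::size_t nThreads) {
    assert(nThreads > 0);
    // Step 1: every worker accumulates into its own slot, so no locking is needed.
    std::vector<float> partial(nThreads, 0.0f);
    std::vector<std::thread> workers;
    workers.reserve(nThreads);
    for (std::size_t t = 0; t < nThreads; ++t) {
        workers.emplace_back([&partial, a, b, n, nThreads, t]() {
            float local = 0.0f;
            for (std::size_t i = t; i < n; i += nThreads) {
                local += a[i] * b[i];
            }
            partial[t] = local;
        });
    }
    for (std::thread& w : workers) {
        w.join();
    }
    // Step 2: reduction, performed sequentially after all workers have finished.
    float result = 0.0f;
    for (float p : partial) {
        result += p;
    }
    return result;
}
```

Note that the order of floating-point additions differs from the sequential loop, so results can differ slightly in the last bits for general inputs.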
daal::tls provides a local storage where each thread can accumulate its local results.
The following code allocates the memory that stores the partial dot product of each thread:
#include <new> // std::nothrow
#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"
SafeStatus safeStat;
daal::tls<float *> dotProductTLS([=, &safeStat]() {
    float * dotProductPtr = new (std::nothrow) float;
    if (!dotProductPtr) {
        safeStat.add(services::ErrorMemoryAllocationFailed);
        // Do not dereference a null pointer; callers check for nullptr
        return dotProductPtr;
    }
    dotProductPtr[0] = 0.0f;
    return dotProductPtr;
});
SafeStatus in this code denotes a thread-safe counterpart of the Status class.
SafeStatus makes it possible to collect errors from all threads and to report them to the user with the detach() method. An example appears later in this chapter.
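The idea of a thread-safe status object can be illustrated with a small mutex-protected collector. This is a sketch only: SketchSafeStatus and sketch_collect_errors are hypothetical names, integer error codes stand in for the oneDAL error types, and the real SafeStatus class may be implemented quite differently.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical thread-safe error collector; the real SafeStatus class may work differently.
class SketchSafeStatus {
public:
    // Records an error code; safe to call from several threads at once.
    void add(int errorCode) {
        std::lock_guard<std::mutex> lock(mutex_);
        errors_.push_back(errorCode);
    }
    bool ok() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return errors_.empty();
    }
    // Hands the collected errors to the caller and leaves the collector empty.
    std::vector<int> detach() {
        std::lock_guard<std::mutex> lock(mutex_);
        std::vector<int> out;
        out.swap(errors_);
        return out;
    }

private:
    mutable std::mutex mutex_;
    std::vector<int> errors_;
};

// Runs nThreads workers; workers with an odd index report a made-up error code.
std::vector<int> sketch_collect_errors(std::size_t nThreads) {
    SketchSafeStatus status;
    assert(status.ok());
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < nThreads; ++t) {
        workers.emplace_back([&status, t]() {
            if (t % 2 == 1) {
                status.add(-1);
            }
        });
    }
    for (std::thread& w : workers) {
        w.join();
    }
    return status.detach();
}
```

The errors are inspected once, after all workers have joined, which mirrors the pattern of checking the status after the parallel loop rather than inside it.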
Checking the status right after this initialization code does not reveal allocation errors: oneTBB uses lazy evaluation, so the lambda function passed to the TLS constructor runs only when a thread first uses the thread-local storage (TLS).
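The effect of lazy evaluation can be reproduced with a tiny single-slot container. The class SketchLazySlot and the function sketch_lazy_demo are hypothetical and hold one value only, whereas daal::tls keeps one value per thread; the sketch shows just the timing of the initializer call.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical single-slot storage with lazy initialization; not the daal::tls implementation.
template <typename T>
class SketchLazySlot {
public:
    explicit SketchLazySlot(std::function<T()> init) : init_(std::move(init)) {}

    // The initializer runs on the first call only; later calls reuse the stored value.
    T& local() {
        if (!initialized_) {
            value_ = init_();
            initialized_ = true;
        }
        return value_;
    }

private:
    std::function<T()> init_;
    T value_{};
    bool initialized_ = false;
};

// Returns how many times the initializer had run before and after the local() calls.
std::pair<int, int> sketch_lazy_demo() {
    int calls = 0;
    SketchLazySlot<int> slot([&calls]() {
        ++calls;
        return 42;
    });
    const int before = calls; // constructing the slot did not run the initializer
    slot.local();
    slot.local();
    return {before, calls};
}
```

In this sketch any failure inside the initializer would surface only at the first local() call, which is why a status check placed right after construction has nothing to report yet.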
The threading layer of oneDAL offers several ways to compute the partial dot product in each thread. One of them is to combine the already mentioned daal::threader_for with the blocking approach, as shown here:
constexpr size_t blockSize = 1024;
const size_t nBlocks = (n + blockSize - 1) / blockSize;
daal::threader_for(nBlocks, nBlocks, [&](size_t iBlock) {
    const size_t iStart = iBlock * blockSize;
    const size_t iEnd = (iBlock < (nBlocks - 1)) ? iStart + blockSize : n;
    // Compute partial result for this block
    float partialDotProduct = 0.0f;
    for (size_t i = iStart; i < iEnd; ++i) {
        partialDotProduct += a[i] * b[i];
    }
    // Update thread-local result
    float * localDotProduct = dotProductTLS.local();
    if (!localDotProduct) {
        // Allocation error happened earlier
        return;
    }
    localDotProduct[0] += partialDotProduct;
});
DAAL_CHECK_SAFE_STATUS(); // if (!safeStat) return safeStat.detach();
To compute the final result, reduce the partial results of all threads as shown here:
float dotProduct = 0.0f;
dotProductTLS.reduce([&](float * localDotProduct) {
    if (localDotProduct) {
        dotProduct += localDotProduct[0];
        // Release the thread-local memory once its value has been consumed
        delete localDotProduct;
    }
});
Local memory of the threads should be released when it is no longer needed; here the reduction does that.
The complete parallel version of the dot product computation looks like this:
#include <new> // std::nothrow
#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"
services::Status dot(const size_t n, const float* a, const float* b, float &dotProduct) {
    SafeStatus safeStat;
    daal::tls<float *> dotProductTLS([=, &safeStat]() {
        float * dotProductPtr = new (std::nothrow) float;
        if (!dotProductPtr) {
            safeStat.add(services::ErrorMemoryAllocationFailed);
            // Do not dereference a null pointer; callers check for nullptr
            return dotProductPtr;
        }
        dotProductPtr[0] = 0.0f;
        return dotProductPtr;
    });
    constexpr size_t blockSize = 1024;
    const size_t nBlocks = (n + blockSize - 1) / blockSize;
    daal::threader_for(nBlocks, nBlocks, [&](size_t iBlock) {
        const size_t iStart = iBlock * blockSize;
        const size_t iEnd = (iBlock < (nBlocks - 1)) ? iStart + blockSize : n;
        // Compute partial result for this block
        float partialDotProduct = 0.0f;
        for (size_t i = iStart; i < iEnd; ++i) {
            partialDotProduct += a[i] * b[i];
        }
        // Update thread-local result
        float * localDotProduct = dotProductTLS.local();
        if (!localDotProduct) {
            // Allocation error happened earlier
            return;
        }
        localDotProduct[0] += partialDotProduct;
    });
    DAAL_CHECK_SAFE_STATUS();
    // Start from zero so that the output does not depend on the caller's initial value
    dotProduct = 0.0f;
    dotProductTLS.reduce([&](float * localDotProduct) {
        if (localDotProduct) {
            dotProduct += localDotProduct[0];
            delete localDotProduct;
        }
    });
    return services::Status();
}
Static Work Scheduling#
By default, oneTBB uses dynamic work scheduling and work stealing. It means that two different runs of the same parallel loop can produce different mappings of the loop’s iteration space to the available threads. This strategy is beneficial when it is difficult to estimate the amount of work performed by each iteration.
When the iterations are known to perform an equal amount of work, it is more efficient to use a predefined mapping of the loop’s iterations to threads. This is what static work scheduling does.
daal::static_threader_for and daal::static_tls allow implementation of static work scheduling within oneDAL.
Here is a variant of the parallel dot product computation with static scheduling:
#include <new> // std::nothrow
#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"
services::Status dot(const size_t n, const float* a, const float* b, float &dotProduct) {
    SafeStatus safeStat;
    daal::static_tls<float *> dotProductTLS([=, &safeStat]() {
        float * dotProductPtr = new (std::nothrow) float;
        if (!dotProductPtr) {
            safeStat.add(services::ErrorMemoryAllocationFailed);
            // Do not dereference a null pointer; callers check for nullptr
            return dotProductPtr;
        }
        dotProductPtr[0] = 0.0f;
        return dotProductPtr;
    });
    constexpr size_t blockSize = 1024;
    const size_t nBlocks = (n + blockSize - 1) / blockSize;
    daal::static_threader_for(nBlocks, [&](size_t iBlock, size_t threadId) {
        const size_t iStart = iBlock * blockSize;
        const size_t iEnd = (iBlock < (nBlocks - 1)) ? iStart + blockSize : n;
        // Compute partial result for this block
        float partialDotProduct = 0.0f;
        for (size_t i = iStart; i < iEnd; ++i) {
            partialDotProduct += a[i] * b[i];
        }
        // Update thread-local result
        // Note that exact thread index is used to get access to the thread's data
        float * localDotProduct = dotProductTLS.local(threadId);
        if (!localDotProduct) {
            // Allocation error happened earlier
            return;
        }
        localDotProduct[0] += partialDotProduct;
    });
    DAAL_CHECK_SAFE_STATUS();
    // Start from zero so that the output does not depend on the caller's initial value
    dotProduct = 0.0f;
    dotProductTLS.reduce([&](float * localDotProduct) {
        if (localDotProduct) {
            dotProduct += localDotProduct[0];
            delete localDotProduct;
        }
    });
    return services::Status();
}
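A predefined mapping of iterations to threads can be as simple as giving every thread one contiguous range of blocks. The sketch below shows one such mapping; the function sketch_owner_thread is hypothetical, and the mapping actually used by daal::static_threader_for may differ.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical static schedule: for fixed nBlocks and nThreads, block iBlock always
// belongs to the same thread. Threads receive contiguous ranges of nearly equal size.
std::size_t sketch_owner_thread(std::size_t iBlock, std::size_t nBlocks, std::size_t nThreads) {
    assert(nThreads > 0 && iBlock < nBlocks);
    const std::size_t base = nBlocks / nThreads;     // blocks that every thread gets
    const std::size_t extra = nBlocks % nThreads;    // the first `extra` threads get one more
    const std::size_t boundary = extra * (base + 1); // first block owned by a "short" thread
    if (iBlock < boundary) {
        return iBlock / (base + 1);
    }
    // Reached only when base > 0, so the division is well defined.
    return extra + (iBlock - boundary) / base;
}
```

With 10 blocks and 4 threads this assigns blocks 0 to 2 to thread 0, blocks 3 to 5 to thread 1, blocks 6 and 7 to thread 2, and blocks 8 and 9 to thread 3, and the assignment is identical on every run.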
Nested Parallelism#
oneDAL supports nested parallel loops. It is important to know that:
- “when a parallel construct calls another parallel construct, a thread can obtain a task from the outer-level construct while waiting for completion of the inner-level one.”
In practice, this means that a thread-local variable might unexpectedly change its value after a nested parallel construct:
#include <cassert>
#include <new> // std::nothrow
#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"
SafeStatus safeStat;
daal::tls<float *> tls([=, &safeStat]() {
    float * localBuffer = new (std::nothrow) float[localSize];
    if (!localBuffer) {
        safeStat.add(services::ErrorMemoryAllocationFailed);
    }
    return localBuffer;
});
daal::threader_for(n, n, [&](size_t i) {
    float * localBuffer = tls.local();
    if (!localBuffer) {
        // Allocation error happened earlier
        return;
    }
    // Initialize localBuffer with some data here
    daal::threader_for(m, m, [&](size_t j) {
        /* Some work */
    });
    // While executing the nested threader_for above, the thread might have run iterations
    // of the outer threader_for, and so might have changed the thread-specific value.
    assert(localBuffer == tls.local()); // The assertion may fail!
});
DAAL_CHECK_SAFE_STATUS();
tls.reduce([&](float * localBuffer) {
    if (localBuffer) {
        /* Do reduction */
        delete[] localBuffer; // allocated with new[]
    }
});
In some scenarios this can lead to deadlocks, segmentation faults and other issues.
oneTBB provides ways to isolate the execution of a parallel construct so that its tasks do not interfere with other simultaneously running tasks.
Those options are preferred when the parallel loops are written as nested from the start. In oneDAL, however, there are cases where one parallel algorithm (the outer one) calls another parallel algorithm (the inner one) within a parallel region.
The inner algorithm in this case can also be called on its own, without additional nesting, and isolating it is not always desirable.
For such cases, oneDAL provides daal::ls. Its local() method always returns the same value for the same thread, regardless of nested execution:
#include <cassert>
#include <new> // std::nothrow
#include "src/algorithms/service_error_handling.h"
#include "src/threading/threading.h"
SafeStatus safeStat;
daal::ls<float *> ls([=, &safeStat]() {
    float * localBuffer = new (std::nothrow) float[localSize];
    if (!localBuffer) {
        safeStat.add(services::ErrorMemoryAllocationFailed);
    }
    return localBuffer;
});
daal::threader_for(n, n, [&](size_t i) {
    float * localBuffer = ls.local();
    if (!localBuffer) {
        // Allocation error happened earlier
        return;
    }
    // Initialize localBuffer with some data here
    daal::threader_for(m, m, [&](size_t j) {
        /* Some work */
    });
    // The thread-specific value always stays unchanged after the nested execution.
    assert(localBuffer == ls.local()); // Assertion never fails!
});
DAAL_CHECK_SAFE_STATUS();
ls.reduce([&](float * localBuffer) {
    if (localBuffer) {
        /* Do reduction */
        delete[] localBuffer; // allocated with new[]
    }
});