546 lines
19 KiB
C++
546 lines
19 KiB
C++
/*
|
|
* Copyright (C) 2016 The Android Open Source Project
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#ifndef TNT_UTILS_JOBSYSTEM_H
|
|
#define TNT_UTILS_JOBSYSTEM_H
|
|
|
|
#include <assert.h>
|
|
|
|
#include <atomic>
|
|
#include <functional>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
#include <tsl/robin_map.h>
|
|
|
|
#include <utils/Allocator.h>
|
|
#include <utils/architecture.h>
|
|
#include <utils/compiler.h>
|
|
#include <utils/Condition.h>
|
|
#include <utils/Log.h>
|
|
#include <utils/memalign.h>
|
|
#include <utils/Mutex.h>
|
|
#include <utils/Slice.h>
|
|
#include <utils/WorkStealingDequeue.h>
|
|
|
|
namespace utils {
|
|
|
|
class JobSystem {
|
|
static constexpr size_t MAX_JOB_COUNT = 16384;
|
|
static_assert(MAX_JOB_COUNT <= 0x7FFE, "MAX_JOB_COUNT must be <= 0x7FFE");
|
|
using WorkQueue = WorkStealingDequeue<uint16_t, MAX_JOB_COUNT>;
|
|
|
|
public:
|
|
class Job;
|
|
|
|
using JobFunc = void(*)(void*, JobSystem&, Job*);
|
|
|
|
class alignas(CACHELINE_SIZE) Job {
public:
    // Deliberately user-provided instead of "= default" (works around a clang bug).
    // Note: storage, function and parent are left uninitialized by this constructor.
    Job() noexcept {} /* = default; */ /* clang bug */ // NOLINT(modernize-use-equals-default,cppcoreguidelines-pro-type-member-init)

    // a Job can be neither copied nor moved
    Job(Job const&) = delete;
    Job(Job&&) = delete;

private:
    friend class JobSystem;

    // The payload area is sized to hold at least a std::function<>, with a floor of 48 bytes.
    // The alignas() qualifier on the class keeps sizeof(Job) a multiple of a cache-line.
    static constexpr size_t JOB_STORAGE_SIZE_BYTES =
            sizeof(std::function<void()>) <= 48 ? 48 : sizeof(std::function<void()>);

    // same size expressed in pointer-sized words, rounded up
    static constexpr size_t JOB_STORAGE_SIZE_WORDS =
            (JOB_STORAGE_SIZE_BYTES + sizeof(void*) - 1) / sizeof(void*);

    // The payload comes first so that it is correctly aligned on all architectures.
    // This is where the job's data lives, typically a std::function<>.
    //                                                      v7 | v8
    void* storage[JOB_STORAGE_SIZE_WORDS];               // 48 | 48
    JobFunc function;                                    //  4 |  8
    uint16_t parent;                                     //  2 |  2
    std::atomic<uint16_t> runningJobCount{ 1 };          //  2 |  2
    mutable std::atomic<uint16_t> refCount{ 1 };         //  2 |  2
                                                         //  6 |  2 (padding)
                                                         // 64 | 64
};
|
|
|
|
explicit JobSystem(size_t threadCount = 0, size_t adoptableThreadsCount = 1) noexcept;
|
|
|
|
~JobSystem();
|
|
|
|
// Make the current thread part of the thread pool.
|
|
void adopt();
|
|
|
|
// Remove this adopted thread from the parent. This is intended to be used for
|
|
// shutting down a JobSystem. In particular, this doesn't allow the parent to
|
|
// adopt more threads.
|
|
void emancipate();
|
|
|
|
|
|
// If a parent is not specified when creating a job, that job will automatically take the
|
|
// root job as a parent.
|
|
// The root job is reset when waited on.
|
|
Job* setRootJob(Job* job) noexcept {
    // install the new root, then hand the same pointer back to the caller
    mRootJob = job;
    return mRootJob;
}
|
|
|
|
// use setRootJob() instead
|
|
UTILS_DEPRECATED
Job* setMasterJob(Job* job) noexcept {
    // legacy spelling: forwards to setRootJob() and returns its result
    Job* const root = setRootJob(job);
    return root;
}
|
|
|
|
|
|
Job* create(Job* parent, JobFunc func) noexcept;
|
|
|
|
// NOTE: All methods below must be called from the same thread and that thread must be
|
|
// owned by JobSystem's thread pool.
|
|
|
|
/*
|
|
* Job creation examples:
|
|
* ----------------------
|
|
*
|
|
* struct Functor {
|
|
* uintptr_t storage[6];
|
|
* void operator()(JobSystem&, JobSystem::Job*);
|
|
* } functor;
|
|
*
|
|
* struct Foo {
|
|
* uintptr_t storage[6];
|
|
* void method(JobSystem&, JobSystem::Job*);
|
|
* } foo;
|
|
*
|
|
* Functor and Foo size must be <= uintptr_t[6]
|
|
*
|
|
* createJob()
|
|
* createJob(parent)
|
|
* createJob<Foo, &Foo::method>(parent, &foo)
|
|
* createJob<Foo, &Foo::method>(parent, foo)
|
|
* createJob<Foo, &Foo::method>(parent, std::ref(foo))
|
|
* createJob(parent, functor)
|
|
* createJob(parent, std::ref(functor))
|
|
* createJob(parent, [ up-to 6 uintptr_t ](JobSystem&, JobSystem::Job*){ })
|
|
*
|
|
* Utility functions:
|
|
* ------------------
|
|
* These are less efficient, but handle any size objects using the heap if needed.
|
|
* (internally uses std::function<>), and don't require the callee to take
|
|
* a (JobSystem&, JobSystem::Job*) as parameter.
|
|
*
|
|
* struct BigFoo {
|
|
* uintptr_t large[16];
|
|
* void operator()();
|
|
* void method(int answerToEverything);
|
|
* static void exec(BigFoo&) { }
|
|
* } bigFoo;
|
|
*
|
|
* jobs::createJob(js, parent, [ any-capture ](int answerToEverything){}, 42);
|
|
* jobs::createJob(js, parent, &BigFoo::method, &bigFoo, 42);
|
|
* jobs::createJob(js, parent, &BigFoo::exec, std::ref(bigFoo));
|
|
* jobs::createJob(js, parent, bigFoo);
|
|
* jobs::createJob(js, parent, std::ref(bigFoo));
|
|
* etc...
|
|
*
|
|
* struct SmallFunctor {
|
|
* uintptr_t storage[3];
|
|
* void operator()(T* data, size_t count);
|
|
* } smallFunctor;
|
|
*
|
|
* jobs::parallel_for(js, data, count, [ up-to 3 uintptr_t ](T* data, size_t count) { });
|
|
* jobs::parallel_for(js, data, count, smallFunctor);
|
|
* jobs::parallel_for(js, data, count, std::ref(smallFunctor));
|
|
*
|
|
*/
|
|
|
|
// creates an empty (no-op) job with an optional parent
|
|
Job* createJob(Job* parent = nullptr) noexcept {
    // no function is attached to the job: it is created with a null JobFunc
    constexpr JobFunc NO_FUNCTION = nullptr;
    return create(parent, NO_FUNCTION);
}
|
|
|
|
// creates a job from a KNOWN method pointer w/ object passed by pointer
|
|
// the caller must ensure the object will outlive the Job
|
|
template<typename T, void(T::*method)(JobSystem&, Job*)>
|
|
Job* createJob(Job* parent, T* data) noexcept {
|
|
Job* job = create(parent, [](void* user, JobSystem& js, Job* job) {
|
|
(*static_cast<T**>(user)->*method)(js, job);
|
|
});
|
|
if (job) {
|
|
job->storage[0] = data;
|
|
}
|
|
return job;
|
|
}
|
|
|
|
// creates a job from a KNOWN method pointer w/ object passed by value
|
|
template<typename T, void(T::*method)(JobSystem&, Job*)>
|
|
Job* createJob(Job* parent, T data) noexcept {
|
|
static_assert(sizeof(data) <= sizeof(Job::storage), "user data too large");
|
|
Job* job = create(parent, [](void* user, JobSystem& js, Job* job) {
|
|
T* that = static_cast<T*>(user);
|
|
(that->*method)(js, job);
|
|
that->~T();
|
|
});
|
|
if (job) {
|
|
new(job->storage) T(std::move(data));
|
|
}
|
|
return job;
|
|
}
|
|
|
|
// creates a job from a functor passed by value
|
|
template<typename T>
|
|
Job* createJob(Job* parent, T functor) noexcept {
|
|
static_assert(sizeof(functor) <= sizeof(Job::storage), "functor too large");
|
|
Job* job = create(parent, [](void* user, JobSystem& js, Job* job){
|
|
T& that = *static_cast<T*>(user);
|
|
that(js, job);
|
|
that.~T();
|
|
});
|
|
if (job) {
|
|
new(job->storage) T(std::move(functor));
|
|
}
|
|
return job;
|
|
}
|
|
|
|
|
|
/*
|
|
* Jobs are normally finished automatically, this can be used to cancel a job before it is run.
|
|
*
|
|
* Never use this once a flavor of run() has been called.
|
|
*/
|
|
void cancel(Job*& job) noexcept;
|
|
|
|
/*
|
|
* Adds a reference to a Job.
|
|
*
|
|
* This allows the caller to waitAndRelease() on this job from multiple threads.
|
|
* Use runAndWait() if waiting from multiple threads is not needed.
|
|
*
|
|
* This job MUST BE waited on with waitAndRelease(), or released with release().
|
|
*/
|
|
Job* retain(Job* job) noexcept;
|
|
|
|
/*
|
|
* Releases a reference from a Job obtained with runAndRetain() or a call to retain().
|
|
*
|
|
* The job can't be used after this call.
|
|
*/
|
|
void release(Job*& job) noexcept;
|
|
void release(Job*&& job) noexcept {
    // copy the temporary into a named pointer so the Job*& overload can be called
    Job* named{ job };
    release(named);
}
|
|
|
|
/*
|
|
* Add job to this thread's execution queue. Its reference will drop automatically.
|
|
* Current thread must be owned by JobSystem's thread pool. See adopt().
|
|
*
|
|
* The job can't be used after this call.
|
|
*/
|
|
void run(Job*& job) noexcept;
|
|
void run(Job*&& job) noexcept { // allows run(createJob(...));
    // copy the temporary into a named pointer so the Job*& overload can be called
    Job* named{ job };
    run(named);
}
|
|
|
|
void signal() noexcept;
|
|
|
|
/*
|
|
* Add job to this thread's execution queue and keep a reference to it.
|
|
* Current thread must be owned by JobSystem's thread pool. See adopt().
|
|
*
|
|
* This job MUST BE waited on with waitAndRelease(), or released with release().
|
|
*/
|
|
Job* runAndRetain(Job* job) noexcept;
|
|
|
|
/*
|
|
* Wait on a job and destroys it.
|
|
* Current thread must be owned by JobSystem's thread pool. See adopt().
|
|
*
|
|
* The job must first be obtained from runAndRetain() or retain().
|
|
* The job can't be used after this call.
|
|
*/
|
|
void waitAndRelease(Job*& job) noexcept;
|
|
|
|
/*
|
|
* Runs and wait for a job. This is equivalent to calling
|
|
* runAndRetain(job);
|
|
* waitAndRelease(job);
|
|
*
|
|
* The job can't be used after this call.
|
|
*/
|
|
void runAndWait(Job*& job) noexcept;
|
|
void runAndWait(Job*&& job) noexcept { // allows runAndWait(createJob(...));
    // copy the temporary into a named pointer so the Job*& overload can be called
    Job* named{ job };
    runAndWait(named);
}
|
|
|
|
// for debugging
|
|
friend utils::io::ostream& operator << (utils::io::ostream& out, JobSystem const& js);
|
|
|
|
|
|
// utility functions...
|
|
|
|
// set the name of the current thread (on OSes that support it)
|
|
static void setThreadName(const char* threadName) noexcept;
|
|
|
|
// Priority levels accepted by setThreadPriority().
// NOTE(review): how each level maps to an OS priority is not visible in this header.
enum class Priority {
    NORMAL,
    DISPLAY,
    URGENT_DISPLAY,
};
|
|
|
|
static void setThreadPriority(Priority priority) noexcept;
|
|
static void setThreadAffinityById(size_t id) noexcept;
|
|
|
|
size_t getParallelSplitCount() const noexcept {
|
|
return mParallelSplitCount;
|
|
}
|
|
|
|
private:
|
|
// this is just to avoid using std::default_random_engine, since we're in a public header.
|
|
// Small linear congruential generator: state = (state * 48271) % 0x7fffffff.
class default_random_engine {
public:
    // The seed is reduced modulo 0x7fffffff; a seed that reduces to 0 is replaced by 1
    // so that the state always satisfies 0 < state < 0x7fffffff.
    constexpr explicit default_random_engine(uint32_t seed = 1u) noexcept
            : mState(sanitize(seed)) {
    }

    // Advances the generator and returns the new state.
    uint32_t operator()() noexcept {
        // the product is computed in 64 bits so that it cannot overflow
        const uint64_t product = uint64_t(mState) * MULTIPLIER;
        mState = uint32_t(product % MODULUS);
        return mState;
    }

private:
    static constexpr uint32_t MODULUS = 0x7fffffffu;
    static constexpr uint32_t MULTIPLIER = 48271u;

    // maps any seed into the valid state range
    static constexpr uint32_t sanitize(uint32_t seed) noexcept {
        return (seed % MODULUS) == 0u ? 1u : seed % MODULUS;
    }

    uint32_t mState; // must be 0 < seed < 0x7fffffff
};
|
|
|
|
// Per-thread state. Member order and alignment are significant; do not reorder.
struct alignas(CACHELINE_SIZE) ThreadState {    // this causes 40-bytes padding
    // first member, so the queue's storage starts on a cache-line boundary
    WorkQueue workQueue;

    // The fields below are not accessed by the worker threads; `js` is pushed onto its own
    // cache-line, away from the queue.
    alignas(CACHELINE_SIZE) JobSystem* js;      // this causes 56-bytes padding
    std::thread thread;
    default_random_engine rndGen;
    uint32_t id;
};
|
|
|
|
static_assert(sizeof(ThreadState) % CACHELINE_SIZE == 0,
|
|
"ThreadState doesn't align to a cache line");
|
|
|
|
ThreadState& getState() noexcept;
|
|
|
|
void incRef(Job const* job) noexcept;
|
|
void decRef(Job const* job) noexcept;
|
|
|
|
Job* allocateJob() noexcept;
|
|
JobSystem::ThreadState* getStateToStealFrom(JobSystem::ThreadState& state) noexcept;
|
|
bool hasJobCompleted(Job const* job) noexcept;
|
|
|
|
void requestExit() noexcept;
|
|
bool exitRequested() const noexcept;
|
|
bool hasActiveJobs() const noexcept;
|
|
|
|
void loop(ThreadState* state) noexcept;
|
|
bool execute(JobSystem::ThreadState& state) noexcept;
|
|
Job* steal(JobSystem::ThreadState& state) noexcept;
|
|
void finish(Job* job) noexcept;
|
|
|
|
// Pushes `job` onto `workQueue`, encoded as its 1-based index from mJobStorageBase.
// `job` must be non-null and must point into the job storage.
void put(WorkQueue& workQueue, Job* job) noexcept {
    assert(job);
    // Keep the pointer difference signed for the range check: stored in a size_t, the
    // `index >= 0` test would be a tautology and a pointer below the base would go
    // unnoticed by that half of the assertion.
    const auto index = job - mJobStorageBase;
    assert(index >= 0 && size_t(index) < MAX_JOB_COUNT);
    // indices are biased by one because pop() and steal() treat 0 as "no job"
    workQueue.push(uint16_t(index + 1));
}
|
|
|
|
// Pops an entry from `workQueue` and converts it back to a Job*; returns nullptr when the
// queue yields 0.
Job* pop(WorkQueue& workQueue) noexcept {
    const size_t biased = workQueue.pop();
    assert(biased <= MAX_JOB_COUNT);
    if (biased == 0) {
        return nullptr;
    }
    // entries are stored 1-based, see put()
    return mJobStorageBase + (biased - 1);
}
|
|
|
|
// Steals an entry from `workQueue` and converts it back to a Job*; returns nullptr when the
// queue yields 0.
Job* steal(WorkQueue& workQueue) noexcept {
    const size_t biased = workQueue.steal();
    assert(biased <= MAX_JOB_COUNT);
    if (biased == 0) {
        return nullptr;
    }
    // entries are stored 1-based, see put()
    return mJobStorageBase + (biased - 1);
}
|
|
|
|
void wait(std::unique_lock<Mutex>& lock, Job* job = nullptr) noexcept;
|
|
void wakeAll() noexcept;
|
|
void wakeOne() noexcept;
|
|
|
|
// these have thread contention, keep them together
|
|
utils::Mutex mWaiterLock;
|
|
utils::Condition mWaiterCondition;
|
|
|
|
std::atomic<uint32_t> mActiveJobs = { 0 };
|
|
utils::Arena<utils::ThreadSafeObjectPoolAllocator<Job>, LockingPolicy::NoLock> mJobPool;
|
|
|
|
template <typename T>
|
|
using aligned_vector = std::vector<T, utils::STLAlignedAllocator<T>>;
|
|
|
|
// these are essentially const, make sure they're on a different cache-lines than the
|
|
// read-write atomics.
|
|
// We can't use "alignas(CACHELINE_SIZE)" because the standard allocator can't make this
|
|
// guarantee.
|
|
char padding[CACHELINE_SIZE];
|
|
|
|
alignas(16) // at least we align to half (or quarter) cache-line
|
|
aligned_vector<ThreadState> mThreadStates; // actual data is stored offline
|
|
std::atomic<bool> mExitRequested = { false }; // this one is almost never written
|
|
std::atomic<uint16_t> mAdoptedThreads = { 0 }; // this one is almost never written
|
|
Job* const mJobStorageBase; // Base for conversion to indices
|
|
uint16_t mThreadCount = 0; // total # of threads in the pool
|
|
uint8_t mParallelSplitCount = 0; // # of split allowable in parallel_for
|
|
Job* mRootJob = nullptr;
|
|
|
|
utils::SpinLock mThreadMapLock; // this should have very little contention
|
|
tsl::robin_map<std::thread::id, ThreadState *> mThreadMap;
|
|
};
|
|
|
|
// -------------------------------------------------------------------------------------------------
|
|
// Utility functions built on top of JobSystem
|
|
|
|
namespace jobs {
|
|
|
|
// These are convenience C++11 style job creation methods that support lambdas
|
|
//
|
|
// IMPORTANT: these are less efficient to call and may perform heap allocation
|
|
// depending on the capture and parameters
|
|
//
|
|
template<typename CALLABLE, typename ... ARGS>
|
|
JobSystem::Job* createJob(JobSystem& js, JobSystem::Job* parent,
|
|
CALLABLE&& func, ARGS&&... args) noexcept {
|
|
struct Data {
|
|
std::function<void()> f;
|
|
// Renaming the method below could cause an Arrested Development.
|
|
void gob(JobSystem&, JobSystem::Job*) noexcept { f(); }
|
|
} user{ std::bind(std::forward<CALLABLE>(func),
|
|
std::forward<ARGS>(args)...) };
|
|
return js.createJob<Data, &Data::gob>(parent, std::move(user));
|
|
}
|
|
|
|
template<typename CALLABLE, typename T, typename ... ARGS,
|
|
typename = typename std::enable_if<
|
|
std::is_member_function_pointer<typename std::remove_reference<CALLABLE>::type>::value
|
|
>::type
|
|
>
|
|
JobSystem::Job* createJob(JobSystem& js, JobSystem::Job* parent,
|
|
CALLABLE&& func, T&& o, ARGS&&... args) noexcept {
|
|
struct Data {
|
|
std::function<void()> f;
|
|
// Renaming the method below could cause an Arrested Development.
|
|
void gob(JobSystem&, JobSystem::Job*) noexcept { f(); }
|
|
} user{ std::bind(std::forward<CALLABLE>(func), std::forward<T>(o),
|
|
std::forward<ARGS>(args)...) };
|
|
return js.createJob<Data, &Data::gob>(parent, std::move(user));
|
|
}
|
|
|
|
|
|
namespace details {
|
|
|
|
template<typename S, typename F>
|
|
struct ParallelForJobData {
|
|
using SplitterType = S;
|
|
using Functor = F;
|
|
using JobData = ParallelForJobData;
|
|
using size_type = uint32_t;
|
|
|
|
ParallelForJobData(size_type start, size_type count, uint8_t splits,
|
|
Functor functor,
|
|
const SplitterType& splitter) noexcept
|
|
: start(start), count(count),
|
|
functor(std::move(functor)),
|
|
splits(splits),
|
|
splitter(splitter) {
|
|
}
|
|
|
|
void parallelWithJobs(JobSystem& js, JobSystem::Job* parent) noexcept {
|
|
assert(parent);
|
|
|
|
// this branch is often miss-predicted (it both sides happen 50% of the calls)
|
|
right_side:
|
|
if (splitter.split(splits, count)) {
|
|
const size_type lc = count / 2;
|
|
JobData ld(start, lc, splits + uint8_t(1), functor, splitter);
|
|
JobSystem::Job* l = js.createJob<JobData, &JobData::parallelWithJobs>(parent, std::move(ld));
|
|
if (UTILS_UNLIKELY(l == nullptr)) {
|
|
// couldn't create a job, just pretend we're done splitting
|
|
goto execute;
|
|
}
|
|
|
|
// start the left side before attempting the right side, so we parallelize in case
|
|
// of job creation failure -- rare, but still.
|
|
js.run(l);
|
|
|
|
// don't spawn a job for the right side, just reuse us -- spawning jobs is more
|
|
// costly than we'd like.
|
|
start += lc;
|
|
count -= lc;
|
|
++splits;
|
|
goto right_side;
|
|
|
|
} else {
|
|
execute:
|
|
// we're done splitting, do the real work here!
|
|
functor(start, count);
|
|
}
|
|
}
|
|
|
|
private:
|
|
size_type start; // 4
|
|
size_type count; // 4
|
|
Functor functor; // ?
|
|
uint8_t splits; // 1
|
|
SplitterType splitter; // 1
|
|
};
|
|
|
|
} // namespace details
|
|
|
|
|
|
// parallel jobs with start/count indices
|
|
template<typename S, typename F>
|
|
JobSystem::Job* parallel_for(JobSystem& js, JobSystem::Job* parent,
|
|
uint32_t start, uint32_t count, F functor, const S& splitter) noexcept {
|
|
using JobData = details::ParallelForJobData<S, F>;
|
|
JobData jobData(start, count, 0, std::move(functor), splitter);
|
|
return js.createJob<JobData, &JobData::parallelWithJobs>(parent, std::move(jobData));
|
|
}
|
|
|
|
// parallel jobs with pointer/count
|
|
template<typename T, typename S, typename F>
|
|
JobSystem::Job* parallel_for(JobSystem& js, JobSystem::Job* parent,
|
|
T* data, uint32_t count, F functor, const S& splitter) noexcept {
|
|
auto user = [data, f = std::move(functor)](uint32_t s, uint32_t c) {
|
|
f(data + s, c);
|
|
};
|
|
using JobData = details::ParallelForJobData<S, decltype(user)>;
|
|
JobData jobData(0, count, 0, std::move(user), splitter);
|
|
return js.createJob<JobData, &JobData::parallelWithJobs>(parent, std::move(jobData));
|
|
}
|
|
|
|
// parallel jobs on a Slice<>
|
|
template<typename T, typename S, typename F>
|
|
JobSystem::Job* parallel_for(JobSystem& js, JobSystem::Job* parent,
|
|
utils::Slice<T> slice, F functor, const S& splitter) noexcept {
|
|
return parallel_for(js, parent, slice.data(), slice.size(), functor, splitter);
|
|
}
|
|
|
|
|
|
// Splitter policy for parallel_for: keeps splitting while fewer than MAX_SPLITS splits have
// been made and the range holds at least 2 * COUNT elements.
template <size_t COUNT, size_t MAX_SPLITS = 12>
class CountSplitter {
public:
    // splits: number of splits already performed; count: number of elements in the range.
    // Returns true if the range should be split again.
    bool split(size_t splits, size_t count) const noexcept {
        if (splits >= MAX_SPLITS) {
            return false;
        }
        return count >= COUNT * 2;
    }
};
|
|
|
|
} // namespace jobs
|
|
} // namespace utils
|
|
|
|
#endif // TNT_UTILS_JOBSYSTEM_H
|