/*
 * Copyright (C) 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef TNT_UTILS_JOBSYSTEM_H
#define TNT_UTILS_JOBSYSTEM_H

// NOTE(review): in this copy of the header every "<...>" span appears to have been stripped:
// the #include targets below are empty, template parameter lists read as a bare "template",
// and template arguments are missing (e.g. "std::function", "static_cast(user)",
// "std::atomic runningJobCount", "js.createJob(parent, ...)" with explicit <T, &T::method>
// arguments removed). As shown, this file cannot compile; restore those spans from upstream.

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace utils {

/*
 * A job scheduler.
 *
 * Jobs are allocated from a fixed-capacity pool (at most MAX_JOB_COUNT jobs, see mJobPool) and
 * queued on per-thread work-stealing deques (WorkQueue). Idle threads steal work from other
 * threads' queues (see steal() / getStateToStealFrom()).
 */
class JobSystem {
    // Maximum number of jobs that can exist at once.
    static constexpr size_t MAX_JOB_COUNT = 16384;
    static_assert(MAX_JOB_COUNT <= 0x7FFE, "MAX_JOB_COUNT must be <= 0x7FFE");
    // Per-thread queue of jobs (work-stealing deque).
    using WorkQueue = WorkStealingDequeue;

public:
    class Job;

    // Signature of a job's entry point: (pointer to the job's storage, job system, the job).
    using JobFunc = void(*)(void*, JobSystem&, Job*);

    class alignas(CACHELINE_SIZE) Job {
    public:
        Job() noexcept {} /* = default; */ /* clang bug */ // NOLINT(modernize-use-equals-default,cppcoreguidelines-pro-type-member-init)
        // Jobs live in the pool at fixed addresses; they are neither copyable nor movable.
        Job(const Job&) = delete;
        Job(Job&&) = delete;

    private:
        friend class JobSystem;

        // The storage size is chosen so that we can store at least a std::function<>;
        // the alignas() qualifier ensures the Job's size is a multiple of a cache line.
        static constexpr size_t JOB_STORAGE_SIZE_BYTES =
                sizeof(std::function) > 48 ? sizeof(std::function) : 48;
        // Same size expressed in pointer-sized words, rounded up.
        static constexpr size_t JOB_STORAGE_SIZE_WORDS =
                (JOB_STORAGE_SIZE_BYTES + sizeof(void*) - 1) / sizeof(void*);

        // Keep 'storage' first, so it's correctly aligned on all architectures.
        // This is where we store the job's data, typically a std::function<>
        // or a user object placement-new'd by createJob().
        // Size in bytes per field:                             // v7 | v8
        void* storage[JOB_STORAGE_SIZE_WORDS];                  // 48 | 48
        // Entry point invoked with a pointer to 'storage' (may be nullptr for an empty job).
        JobFunc function;                                       //  4 |  8
        // Parent job, stored as a 16-bit value (presumably an index relative to
        // mJobStorageBase -- confirm against the implementation).
        uint16_t parent;                                        //  2 |  2
        // Starts at 1 (the job itself); presumably incremented for each outstanding child.
        std::atomic runningJobCount = { 1 };                    //  2 |  2
        // Starts at 1 (the creator's reference); adjusted by retain()/release().
        mutable std::atomic refCount = { 1 };                   //  2 |  2
                                                                //  6 |  2 (padding)
                                                                // 64 | 64
    };

    // threadCount: number of worker threads (0 presumably means "pick automatically" --
    // confirm against the implementation).
    // adoptableThreadsCount: how many external threads may later call adopt().
    explicit JobSystem(size_t threadCount = 0, size_t adoptableThreadsCount = 1) noexcept;

    ~JobSystem();

    // Make the current thread part of the thread pool.
    void adopt();

    // Remove this adopted thread from the parent. This is intended to be used for
    // shutting down a JobSystem. In particular, this doesn't allow the parent to
    // adopt more threads.
    void emancipate();

    // If a parent is not specified when creating a job, that job will automatically take the
    // root job as a parent.
    // The root job is reset when waited on.
    Job* setRootJob(Job* job) noexcept { return mRootJob = job; }

    // Use setRootJob() instead.
    UTILS_DEPRECATED
    Job* setMasterJob(Job* job) noexcept { return setRootJob(job); }

    // Allocates a job bound to 'func' with the given parent.
    // May return nullptr (callers in this header check for it, e.g. when the pool is exhausted).
    Job* create(Job* parent, JobFunc func) noexcept;

    // NOTE: All methods below must be called from the same thread and that thread must be
    // owned by JobSystem's thread pool.

    /*
     * Job creation examples:
     * ----------------------
     *
     *  struct Functor {
     *   uintptr_t storage[6];
     *   void operator()(JobSystem&, JobSystem::Job*);
     *  } functor;
     *
     *  struct Foo {
     *   uintptr_t storage[6];
     *   void method(JobSystem&, JobSystem::Job*);
     *  } foo;
     *
     *  Functor and Foo must fit in Job::storage (at least 48 bytes, i.e. uintptr_t[6] on
     *  64-bit); this is enforced with a static_assert.
     *
     *  createJob()
     *  createJob(parent)
     *  createJob<Foo, &Foo::method>(parent, &foo)
     *  createJob<Foo, &Foo::method>(parent, foo)
     *  createJob<Foo, &Foo::method>(parent, std::ref(foo))
     *  createJob(parent, functor)
     *  createJob(parent, std::ref(functor))
     *  createJob(parent, [ up-to 6 uintptr_t ](JobSystem&, JobSystem::Job*){ })
     *
     *  Utility functions:
     *  ------------------
     *    These are less efficient, but handle objects of any size, using the heap if needed
     *    (internally they use std::function<>), and don't require the callee to take
     *    a (JobSystem&, JobSystem::Job*) as parameter.
     *
     *  struct BigFoo {
     *   uintptr_t large[16];
     *   void operator()();
     *   void method(int answerToEverything);
     *   static void exec(BigFoo&) { }
     *  } bigFoo;
     *
     *  jobs::createJob(js, parent, [ any-capture ](int answerToEverything){}, 42);
     *  jobs::createJob(js, parent, &BigFoo::method, &bigFoo, 42);
     *  jobs::createJob(js, parent, &BigFoo::exec, std::ref(bigFoo));
     *  jobs::createJob(js, parent, bigFoo);
     *  jobs::createJob(js, parent, std::ref(bigFoo));
     *  etc...
     *
     *  struct SmallFunctor {
     *   uintptr_t storage[3];
     *   void operator()(T* data, size_t count);
     *  } smallFunctor;
     *
     *  // 'splitter' decides how work is subdivided, e.g. a jobs::CountSplitter.
     *  jobs::parallel_for(js, parent, data, count, [ up-to 3 uintptr_t ](T* data, size_t count) { }, splitter);
     *  jobs::parallel_for(js, parent, data, count, smallFunctor, splitter);
     *  jobs::parallel_for(js, parent, data, count, std::ref(smallFunctor), splitter);
     *
     */

    // Creates an empty (no-op) job with an optional parent.
    Job* createJob(Job* parent = nullptr) noexcept {
        return create(parent, nullptr);
    }

    // Creates a job from a KNOWN method pointer with the object passed by pointer.
    // Only the pointer is stored (in storage[0]); the caller must ensure the object
    // outlives the Job.
    template
    Job* createJob(Job* parent, T* data) noexcept {
        Job* job = create(parent, [](void* user, JobSystem& js, Job* job) {
            // 'user' points at storage[0], which holds the object pointer.
            (*static_cast(user)->*method)(js, job);
        });
        if (job) {
            job->storage[0] = data;
        }
        return job;
    }

    // Creates a job from a KNOWN method pointer with the object passed by value.
    // The object is move-constructed into the job's storage and destroyed right after
    // the method returns.
    template
    Job* createJob(Job* parent, T data) noexcept {
        static_assert(sizeof(data) <= sizeof(Job::storage), "user data too large");
        Job* job = create(parent, [](void* user, JobSystem& js, Job* job) {
            T* that = static_cast(user);
            (that->*method)(js, job);
            // Storage was placement-new'd below, so the destructor must be run explicitly.
            that->~T();
        });
        if (job) {
            new(job->storage) T(std::move(data));
        }
        return job;
    }

    // Creates a job from a functor passed by value. The functor is called as
    // functor(JobSystem&, Job*), then destroyed in place.
    template
    Job* createJob(Job* parent, T functor) noexcept {
        static_assert(sizeof(functor) <= sizeof(Job::storage), "functor too large");
        Job* job = create(parent, [](void* user, JobSystem& js, Job* job){
            T& that = *static_cast(user);
            that(js, job);
            // Storage was placement-new'd below, so the destructor must be run explicitly.
            that.~T();
        });
        if (job) {
            new(job->storage) T(std::move(functor));
        }
        return job;
    }

    /*
     * Jobs are normally finished automatically; this can be used to cancel a job before it
     * is run.
     *
     * Never use this once a flavor of run() has been called.
     */
    void cancel(Job*& job) noexcept;

    /*
     * Adds a reference to a Job.
     *
     * This allows the caller to waitAndRelease() on this job from multiple threads.
     * Use runAndWait() if waiting from multiple threads is not needed.
     *
     * This job MUST BE waited on with waitAndRelease(), or released with release().
     */
    Job* retain(Job* job) noexcept;

    /*
     * Releases a reference from a Job obtained with runAndRetain() or a call to retain().
     *
     * The job can't be used after this call.
     */
    void release(Job*& job) noexcept;
    void release(Job*&& job) noexcept {
        // Rvalue overload: forwards to the lvalue version via a local copy of the pointer.
        Job* p = job;
        release(p);
    }

    /*
     * Adds the job to this thread's execution queue. Its reference will drop automatically.
     * Current thread must be owned by JobSystem's thread pool. See adopt().
     *
     * The job can't be used after this call.
     */
    void run(Job*& job) noexcept;
    void run(Job*&& job) noexcept { // allows run(createJob(...));
        Job* p = job;
        run(p);
    }

    // NOTE(review): implementation not visible here; presumably wakes idle worker threads
    // so they notice queued work -- confirm.
    void signal() noexcept;

    /*
     * Adds the job to this thread's execution queue and keeps a reference to it.
     * Current thread must be owned by JobSystem's thread pool. See adopt().
     *
     * This job MUST BE waited on with waitAndRelease(), or released with release().
     */
    Job* runAndRetain(Job* job) noexcept;

    /*
     * Waits for a job to complete, then releases the caller's reference to it.
     * Current thread must be owned by JobSystem's thread pool. See adopt().
     *
     * The job must first be obtained from runAndRetain() or retain().
     * The job can't be used after this call.
     */
    void waitAndRelease(Job*& job) noexcept;

    /*
     * Runs and waits for a job. This is equivalent to calling
     *  runAndRetain(job);
     *  waitAndRelease(job);
     *
     * The job can't be used after this call.
     */
    void runAndWait(Job*& job) noexcept;
    void runAndWait(Job*&& job) noexcept { // allows runAndWait(createJob(...));
        Job* p = job;
        runAndWait(p);
    }

    // for debugging
    friend utils::io::ostream& operator << (utils::io::ostream& out, JobSystem const& js);


    // utility functions...

    // Sets the name of the current thread (on OSes that support it).
    static void setThreadName(const char* threadName) noexcept;

    enum class Priority {
        NORMAL,
        DISPLAY,
        URGENT_DISPLAY
    };

    // Sets the scheduling priority of the current thread (platform-dependent).
    static void setThreadPriority(Priority priority) noexcept;

    // Pins the current thread according to 'id' (platform-dependent).
    static void setThreadAffinityById(size_t id) noexcept;

    // Number of splits allowed in parallel_for.
    size_t getParallelSplitCount() const noexcept {
        return mParallelSplitCount;
    }

    // Total number of threads in the pool.
    size_t getThreadCount() const { return mThreadCount; }

private:
    // This is just to avoid using std::default_random_engine, since we're in a public header.
    // It is a multiplicative congruential generator with the same parameters as
    // std::minstd_rand (multiplier 48271, modulus 2^31 - 1).
    class default_random_engine {
        static constexpr uint32_t m = 0x7fffffffu;
        uint32_t mState; // must be 0 < seed < 0x7fffffff
    public:
        // A seed that reduces to 0 modulo m would get the generator stuck at 0, so it is
        // replaced by 1.
        inline constexpr explicit default_random_engine(uint32_t seed = 1u) noexcept
                : mState(((seed % m) == 0u) ? 1u : seed % m) {
        }
        // Advances the state and returns it. The 64-bit product avoids overflow.
        inline uint32_t operator()() noexcept {
            return mState = uint32_t((uint64_t(mState) * 48271u) % m);
        }
    };

    // Per-thread bookkeeping.
    struct alignas(CACHELINE_SIZE) ThreadState {    // this causes 40-bytes padding
        // make sure storage is cache-line aligned
        WorkQueue workQueue;

        // these are not accessed by the worker threads
        alignas(CACHELINE_SIZE)     // this causes 56-bytes padding
        JobSystem* js;
        std::thread thread;
        default_random_engine rndGen;
        uint32_t id;
    };

    static_assert(sizeof(ThreadState) % CACHELINE_SIZE == 0,
            "ThreadState doesn't align to a cache line");

    ThreadState& getState() noexcept;

    void incRef(Job const* job) noexcept;
    void decRef(Job const* job) noexcept;

    Job* allocateJob() noexcept;
    JobSystem::ThreadState* getStateToStealFrom(JobSystem::ThreadState& state) noexcept;
    bool hasJobCompleted(Job const* job) noexcept;

    void requestExit() noexcept;
    bool exitRequested() const noexcept;
    bool hasActiveJobs() const noexcept;

    void loop(ThreadState* state) noexcept;
    bool execute(JobSystem::ThreadState& state) noexcept;
    Job* steal(JobSystem::ThreadState& state) noexcept;
    void finish(Job* job) noexcept;

    void put(WorkQueue& workQueue, Job* job) noexcept;
    Job* pop(WorkQueue& workQueue) noexcept;
    Job* steal(WorkQueue& workQueue) noexcept;

    void wait(std::unique_lock& lock, Job* job = nullptr) noexcept;
    void wakeAll() noexcept;
    void wakeOne() noexcept;

    // these have thread contention, keep them together
    utils::Mutex mWaiterLock;
    utils::Condition mWaiterCondition;

    std::atomic mActiveJobs = { 0 };
    // Fixed-capacity pool from which Jobs are allocated.
    utils::Arena, LockingPolicy::NoLock> mJobPool;

    template
    using aligned_vector = std::vector>;

    // These are essentially const; make sure they're on different cache lines than the
    // read-write atomics above.
    // We can't use "alignas(CACHELINE_SIZE)" because the standard allocator can't make this
    // guarantee.
    char padding[CACHELINE_SIZE];

    alignas(16) // at least we align to half (or quarter) cache-line
    aligned_vector mThreadStates;          // actual data is stored offline
    std::atomic mExitRequested = { false };      // this one is almost never written
    std::atomic mAdoptedThreads = { 0 };     // this one is almost never written
    Job* const mJobStorageBase;                       // Base for conversion to indices
    uint16_t mThreadCount = 0;                        // total # of threads in the pool
    uint8_t mParallelSplitCount = 0;                  // # of splits allowed in parallel_for
    Job* mRootJob = nullptr;

    utils::SpinLock mThreadMapLock; // this should have very little contention
    tsl::robin_map mThreadMap;
};

// -------------------------------------------------------------------------------------------------
// Utility functions built on top of JobSystem

namespace jobs {

// These are convenience C++11 style job creation methods that support lambdas.
//
// IMPORTANT: these are less efficient to call and may perform heap allocation
//            depending on the capture and parameters.
//
// The callable and its arguments are bound with std::bind into a std::function stored in a
// small wrapper object, which is then scheduled through JobSystem::createJob.
template
JobSystem::Job* createJob(JobSystem& js, JobSystem::Job* parent,
        CALLABLE&& func, ARGS&&... args) noexcept {
    struct Data {
        std::function f;
        // Renaming the method below could cause an Arrested Development.
        void gob(JobSystem&, JobSystem::Job*) noexcept { f(); }
    } user{ std::bind(std::forward(func), std::forward(args)...) };
    return js.createJob(parent, std::move(user));
}

// Overload taking an explicit object argument 'o' bound as the first argument to 'func'.
// NOTE(review): the stripped enable_if fragment ("...::type>::value >::type") presumably
// restricts this overload to member-function-pointer CALLABLEs -- confirm against upstream.
template::type>::value >::type >
JobSystem::Job* createJob(JobSystem& js, JobSystem::Job* parent,
        CALLABLE&& func, T&& o, ARGS&&... args) noexcept {
    struct Data {
        std::function f;
        // Renaming the method below could cause an Arrested Development.
        void gob(JobSystem&, JobSystem::Job*) noexcept { f(); }
    } user{ std::bind(std::forward(func), std::forward(o), std::forward(args)...) };
    return js.createJob(parent, std::move(user));
}


namespace details {

// State of one parallel_for range [start, start + count). When run as a job, it
// recursively halves the range. The left half is spawned as a new job; the right half is
// handled in place until the splitter says to stop, and then functor(start, count) is called.
template
struct ParallelForJobData {
    using SplitterType = S;
    using Functor = F;
    using JobData = ParallelForJobData;
    using size_type = uint32_t;

    ParallelForJobData(size_type start, size_type count, uint8_t splits,
            Functor functor,
            const SplitterType& splitter) noexcept
            : start(start), count(count),
              functor(std::move(functor)),
              splits(splits),
              splitter(splitter) {
    }

    void parallelWithJobs(JobSystem& js, JobSystem::Job* parent) noexcept {
        assert(parent);

        // This branch is often mispredicted (both sides are taken about 50% of the time).
right_side:
        if (splitter.split(splits, count)) {
            const size_type lc = count / 2;
            // The left half gets its own copy of the functor and splitter.
            JobData ld(start, lc, splits + uint8_t(1), functor, splitter);
            JobSystem::Job* l = js.createJob(parent, std::move(ld));
            if (UTILS_UNLIKELY(l == nullptr)) {
                // couldn't create a job, just pretend we're done splitting
                goto execute;
            }

            // Start the left side before attempting the right side, so we parallelize in
            // case of job creation failure -- rare, but still.
            js.run(l);

            // Don't spawn a job for the right side, just reuse us -- spawning jobs is more
            // costly than we'd like.
            start += lc;
            count -= lc;
            ++splits;
            goto right_side;

        } else {
execute:
            // we're done splitting, do the real work here!
            functor(start, count);
        }
    }

private:
    size_type start;            // 4
    size_type count;            // 4
    Functor functor;            // ?
    uint8_t splits;             // 1
    SplitterType splitter;      // 1
};

} // namespace details


// Parallel jobs with start/count indices: functor(start, count) is called on sub-ranges of
// [start, start + count), subdivided according to 'splitter'.
// Returns the root job (or nullptr if it couldn't be created); it still has to be run.
template
JobSystem::Job* parallel_for(JobSystem& js, JobSystem::Job* parent,
        uint32_t start, uint32_t count, F functor, const S& splitter) noexcept {
    using JobData = details::ParallelForJobData;
    JobData jobData(start, count, 0, std::move(functor), splitter);
    return js.createJob(parent, std::move(jobData));
}

// Parallel jobs with pointer/count: functor(data + s, c) is called on sub-ranges of
// [data, data + count).
template
JobSystem::Job* parallel_for(JobSystem& js, JobSystem::Job* parent,
        T* data, uint32_t count, F functor, const S& splitter) noexcept {
    // Adapts the index-based callback to the pointer-based functor.
    auto user = [data, f = std::move(functor)](uint32_t s, uint32_t c) {
        f(data + s, c);
    };
    using JobData = details::ParallelForJobData;
    JobData jobData(0, count, 0, std::move(user), splitter);
    return js.createJob(parent, std::move(jobData));
}

// Parallel jobs on a Slice<>; forwards to the pointer/count overload.
// NOTE(review): 'functor' is copied here, whereas the sibling overloads std::move it;
// std::move(functor) would avoid an extra copy. slice.size() is also implicitly converted
// to the uint32_t 'count' parameter.
template
JobSystem::Job* parallel_for(JobSystem& js, JobSystem::Job* parent,
        utils::Slice slice, F functor, const S& splitter) noexcept {
    return parallel_for(js, parent, slice.data(), slice.size(), functor, splitter);
}


// Splitter for parallel_for: keeps splitting while fewer than MAX_SPLITS splits have been
// made and the range holds at least 2 * COUNT items, so each leaf gets at least COUNT items.
template
class CountSplitter {
public:
    bool split(size_t splits, size_t count) const noexcept {
        return (splits < MAX_SPLITS && count >= COUNT * 2);
    }
};

} // namespace jobs
} // namespace utils

#endif // TNT_UTILS_JOBSYSTEM_H