Branch data Line data Source code
1 : : // Copyright (c) 2015-2022 The Bitcoin Core developers 2 : : // Distributed under the MIT software license, see the accompanying 3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : : 5 : : #ifndef BITCOIN_SCHEDULER_H 6 : : #define BITCOIN_SCHEDULER_H 7 : : 8 : : #include <attributes.h> 9 : : #include <sync.h> 10 : : #include <threadsafety.h> 11 : : #include <util/task_runner.h> 12 : : 13 : : #include <chrono> 14 : : #include <condition_variable> 15 : : #include <cstddef> 16 : : #include <functional> 17 : : #include <list> 18 : : #include <map> 19 : : #include <thread> 20 : : #include <utility> 21 : : 22 : : /** 23 : : * Simple class for background tasks that should be run 24 : : * periodically or once "after a while" 25 : : * 26 : : * Usage: 27 : : * 28 : : * CScheduler* s = new CScheduler(); 29 : : * s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { } 30 : : * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3}); 31 : : * std::thread* t = new std::thread([&] { s->serviceQueue(); }); 32 : : * 33 : : * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: 34 : : * s->stop(); 35 : : * t->join(); 36 : : * delete t; 37 : : * delete s; // Must be done after thread is interrupted/joined. 38 : : */ 39 : : class CScheduler 40 : : { 41 : : public: 42 : : CScheduler(); 43 : : ~CScheduler(); 44 : : 45 : : std::thread m_service_thread; 46 : : 47 : : typedef std::function<void()> Function; 48 : : 49 : : /** Call func at/after time t */ 50 : : void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 51 : : 52 : : /** Call f once after the delta has passed */ 53 : 0 : void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) 54 : : { 55 [ # # # # ]: 0 : schedule(std::move(f), std::chrono::steady_clock::now() + delta); 56 : 0 : } 57 : : 58 : : /** 59 : : * Repeat f until the scheduler is stopped. First run is after delta has passed once. 60 : : * 61 : : * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more 62 : : * accurate scheduling, don't use this method. 63 : : */ 64 : : void scheduleEvery(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 65 : : 66 : : /** 67 : : * Mock the scheduler to fast forward in time. 68 : : * Iterates through items on taskQueue and reschedules them 69 : : * to be delta_seconds sooner. 70 : : */ 71 : : void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 72 : : 73 : : /** 74 : : * Services the queue 'forever'. Should be run in a thread. 75 : : */ 76 : : void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 77 : : 78 : : /** Tell any threads running serviceQueue to stop as soon as the current task is done */ 79 : 2073 : void stop() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) 80 : : { 81 : 4146 : WITH_LOCK(newTaskMutex, stopRequested = true); 82 : 2073 : newTaskScheduled.notify_all(); 83 [ - + ]: 2073 : if (m_service_thread.joinable()) m_service_thread.join(); 84 : 2073 : } 85 : : /** Tell any threads running serviceQueue to stop when there is no work left to be done */ 86 : : void StopWhenDrained() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) 87 : : { 88 : : WITH_LOCK(newTaskMutex, stopWhenEmpty = true); 89 : : newTaskScheduled.notify_all(); 90 : : if (m_service_thread.joinable()) m_service_thread.join(); 91 : : } 92 : : 93 : : /** 94 : : * Returns number of tasks waiting to be serviced, 95 : : * and first and last task times 96 : : */ 97 : : size_t getQueueInfo(std::chrono::steady_clock::time_point& first, 98 : : std::chrono::steady_clock::time_point& last) const 99 : : EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 100 : : 101 : : /** Returns true if there are threads actively running in serviceQueue() */ 102 : : bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 103 : : 104 : : private: 105 : : mutable Mutex newTaskMutex; 106 : : std::condition_variable newTaskScheduled; 107 : : std::multimap<std::chrono::steady_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex); 108 : : int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0}; 109 : : bool stopRequested GUARDED_BY(newTaskMutex){false}; 110 : : bool stopWhenEmpty GUARDED_BY(newTaskMutex){false}; 111 [ + + + - ]: 4170456 : bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } 112 : : }; 113 : : 114 : : /** 115 : : * Class used by CScheduler clients which may schedule multiple jobs 116 : : * which are required to be run serially. Jobs may not be run on the 117 : : * same thread, but no two jobs will be executed 118 : : * at the same time and memory will be release-acquire consistent 119 : : * (the scheduler will internally do an acquire before invoking a callback 120 : : * as well as a release at the end). In practice this means that a callback 121 : : * B() will be able to observe all of the effects of callback A() which executed 122 : : * before it. 123 : : */ 124 : : class SerialTaskRunner : public util::TaskRunnerInterface 125 : : { 126 : : private: 127 : : CScheduler& m_scheduler; 128 : : 129 : : Mutex m_callbacks_mutex; 130 : : 131 : : // We are not allowed to assume the scheduler only runs in one thread, 132 : : // but must ensure all callbacks happen in-order, so we end up creating 133 : : // our own queue here :( 134 : : std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex); 135 : 2055 : bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false; 136 : : 137 : : void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 138 : : void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 139 : : 140 : : public: 141 : 4110 : explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {} 142 : : 143 : : /** 144 : : * Add a callback to be executed. Callbacks are executed serially 145 : : * and memory is release-acquire consistent between callback executions. 146 : : * Practically, this means that callbacks can behave as if they are executed 147 : : * in order by a single thread. 148 : : */ 149 : : void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 150 : : 151 : : /** 152 : : * Processes all remaining queue members on the calling thread, blocking until queue is empty 153 : : * Must be called after the CScheduler has no remaining processing threads! 154 : : */ 155 : : void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 156 : : 157 : : size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 158 : : }; 159 : : 160 : : #endif // BITCOIN_SCHEDULER_H