89.13% Lines (41/46) 100.00% Functions (7/7)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/corosio 7   // Official repository: https://github.com/cppalliance/corosio
8   // 8   //
9   9  
10   #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 10   #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11   #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 11   #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12   12  
13   #include <boost/corosio/detail/config.hpp> 13   #include <boost/corosio/detail/config.hpp>
14   #include <boost/corosio/detail/intrusive.hpp> 14   #include <boost/corosio/detail/intrusive.hpp>
15 - #include <boost/capy/test/thread_name.hpp>  
16   #include <boost/capy/ex/execution_context.hpp> 15   #include <boost/capy/ex/execution_context.hpp>
17   16  
18 - #include <cstdio>  
19   #include <condition_variable> 17   #include <condition_variable>
20   #include <mutex> 18   #include <mutex>
21   #include <stdexcept> 19   #include <stdexcept>
22   #include <thread> 20   #include <thread>
23   #include <vector> 21   #include <vector>
24   22  
25   namespace boost::corosio::detail { 23   namespace boost::corosio::detail {
26   24  
27   /** Base class for thread pool work items. 25   /** Base class for thread pool work items.
28   26  
29   Derive from this to create work that can be posted to a 27   Derive from this to create work that can be posted to a
30   @ref thread_pool. Uses static function pointer dispatch, 28   @ref thread_pool. Uses static function pointer dispatch,
31   consistent with the IOCP `op` pattern. 29   consistent with the IOCP `op` pattern.
32   30  
33   @par Example 31   @par Example
34   @code 32   @code
35   struct my_work : pool_work_item 33   struct my_work : pool_work_item
36   { 34   {
37   int* result; 35   int* result;
38   static void execute( pool_work_item* w ) noexcept 36   static void execute( pool_work_item* w ) noexcept
39   { 37   {
40   auto* self = static_cast<my_work*>( w ); 38   auto* self = static_cast<my_work*>( w );
41   *self->result = 42; 39   *self->result = 42;
42   } 40   }
43   }; 41   };
44   42  
45   my_work w; 43   my_work w;
46   w.func_ = &my_work::execute; 44   w.func_ = &my_work::execute;
47   w.result = &r; 45   w.result = &r;
48   pool.post( &w ); 46   pool.post( &w );
49   @endcode 47   @endcode
50   */ 48   */
51   struct pool_work_item : intrusive_queue<pool_work_item>::node 49   struct pool_work_item : intrusive_queue<pool_work_item>::node
52   { 50   {
53   /// Static dispatch function signature. 51   /// Static dispatch function signature.
54   using func_type = void (*)(pool_work_item*) noexcept; 52   using func_type = void (*)(pool_work_item*) noexcept;
55   53  
56   /// Completion handler invoked by the worker thread. 54   /// Completion handler invoked by the worker thread.
57   func_type func_ = nullptr; 55   func_type func_ = nullptr;
58   }; 56   };
59   57  
60   /** Shared thread pool for dispatching blocking operations. 58   /** Shared thread pool for dispatching blocking operations.
61   59  
62   Provides a fixed pool of reusable worker threads for operations 60   Provides a fixed pool of reusable worker threads for operations
63   that cannot be integrated with async I/O (e.g. blocking DNS 61   that cannot be integrated with async I/O (e.g. blocking DNS
64   calls). Registered as an `execution_context::service` so it 62   calls). Registered as an `execution_context::service` so it
65   is a singleton per io_context. 63   is a singleton per io_context.
66   64  
67   Threads are created eagerly in the constructor. The default 65   Threads are created eagerly in the constructor. The default
68   thread count is 1. 66   thread count is 1.
69   67  
70   @par Thread Safety 68   @par Thread Safety
71   All public member functions are thread-safe. 69   All public member functions are thread-safe.
72   70  
73   @par Shutdown 71   @par Shutdown
74   Sets a shutdown flag, notifies all threads, and joins them. 72   Sets a shutdown flag, notifies all threads, and joins them.
75   In-flight blocking calls complete naturally before the thread 73   In-flight blocking calls complete naturally before the thread
76   exits. 74   exits.
77   */ 75   */
78   class thread_pool final : public capy::execution_context::service 76   class thread_pool final : public capy::execution_context::service
79   { 77   {
80   std::mutex mutex_; 78   std::mutex mutex_;
81   std::condition_variable cv_; 79   std::condition_variable cv_;
82   intrusive_queue<pool_work_item> work_queue_; 80   intrusive_queue<pool_work_item> work_queue_;
83   std::vector<std::thread> threads_; 81   std::vector<std::thread> threads_;
84   bool shutdown_ = false; 82   bool shutdown_ = false;
85   83  
86 - void worker_loop(unsigned index); 84 + void worker_loop();
87   85  
88   public: 86   public:
89   using key_type = thread_pool; 87   using key_type = thread_pool;
90   88  
91   /** Construct the thread pool service. 89   /** Construct the thread pool service.
92   90  
93   Eagerly creates all worker threads. 91   Eagerly creates all worker threads.
94   92  
95   @par Exception Safety 93   @par Exception Safety
96   Strong guarantee. If thread creation fails, all 94   Strong guarantee. If thread creation fails, all
97   already-created threads are shut down and joined 95   already-created threads are shut down and joined
98   before the exception propagates. 96   before the exception propagates.
99   97  
100   @param ctx Reference to the owning execution_context. 98   @param ctx Reference to the owning execution_context.
101   @param num_threads Number of worker threads. Must be 99   @param num_threads Number of worker threads. Must be
102   at least 1. 100   at least 1.
103   101  
104   @throws std::logic_error If `num_threads` is 0. 102   @throws std::logic_error If `num_threads` is 0.
105   */ 103   */
HITCBC 106   607 explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1) 104   607 explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
HITCBC 107   607 { 105   607 {
108   (void)ctx; 106   (void)ctx;
HITCBC 109   607 if (!num_threads) 107   607 if (!num_threads)
HITCBC 110   1 throw std::logic_error("thread_pool requires at least 1 thread"); 108   1 throw std::logic_error("thread_pool requires at least 1 thread");
HITCBC 111   606 threads_.reserve(num_threads); 109   606 threads_.reserve(num_threads);
112   try 110   try
113   { 111   {
HITCBC 114   1215 for (unsigned i = 0; i < num_threads; ++i) 112   1215 for (unsigned i = 0; i < num_threads; ++i)
HITCBC 115 - 1218 threads_.emplace_back([this, i] { worker_loop(i + 1); }); 113 + 1218 threads_.emplace_back([this] { worker_loop(); });
116   } 114   }
MISUBC 117   catch (...) 115   catch (...)
118   { 116   {
MISUBC 119   shutdown(); 117   shutdown();
MISUBC 120   throw; 118   throw;
MISUBC 121   } 119   }
HITCBC 122   609 } 120   609 }
123   121  
HITCBC 124   1211 ~thread_pool() override = default; 122   1211 ~thread_pool() override = default;
125   123  
126   thread_pool(thread_pool const&) = delete; 124   thread_pool(thread_pool const&) = delete;
127   thread_pool& operator=(thread_pool const&) = delete; 125   thread_pool& operator=(thread_pool const&) = delete;
128   126  
129   /** Enqueue a work item for execution on the thread pool. 127   /** Enqueue a work item for execution on the thread pool.
130   128  
131   Zero-allocation: the caller owns the work item's storage. 129   Zero-allocation: the caller owns the work item's storage.
132   130  
133   @param w The work item to execute. Must remain valid until 131   @param w The work item to execute. Must remain valid until
134   its `func_` has been called. 132   its `func_` has been called.
135   133  
136   @return `true` if the item was enqueued, `false` if the 134   @return `true` if the item was enqueued, `false` if the
137   pool has already shut down. 135   pool has already shut down.
138   */ 136   */
139   bool post(pool_work_item* w) noexcept; 137   bool post(pool_work_item* w) noexcept;
140   138  
141   /** Shut down the thread pool. 139   /** Shut down the thread pool.
142   140  
143   Signals all threads to exit after draining any 141   Signals all threads to exit after draining any
144   remaining queued work, then joins them. 142   remaining queued work, then joins them.
145   */ 143   */
146   void shutdown() override; 144   void shutdown() override;
147   }; 145   };
148   146  
149   inline void 147   inline void
HITCBC 150 - 609 thread_pool::worker_loop(unsigned index) 148 + 609 thread_pool::worker_loop()
151 - // Name format chosen to fit Linux's 15-char pthread limit:  
152 - // "tpool-svc-" (10) + up to 4 digit index leaves "tpool-svc-9999".  
153 - char name[16];  
154 - std::snprintf(name, sizeof(name), "tpool-svc-%u", index);  
DCB 155 - 609 capy::set_current_thread_name(name);  
DCB 156 - 609  
157   { 149   {
158   for (;;) 150   for (;;)
159   { 151   {
160   pool_work_item* w; 152   pool_work_item* w;
161   { 153   {
HITCBC 162   788 std::unique_lock<std::mutex> lock(mutex_); 154   788 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 163   788 cv_.wait( 155   788 cv_.wait(
HITCBC 164   1242 lock, [this] { return shutdown_ || !work_queue_.empty(); }); 156   1438 lock, [this] { return shutdown_ || !work_queue_.empty(); });
165   157  
HITCBC 166   788 w = work_queue_.pop(); 158   788 w = work_queue_.pop();
HITCBC 167   788 if (!w) 159   788 if (!w)
168   { 160   {
HITCBC 169   609 if (shutdown_) 161   609 if (shutdown_)
HITCBC 170   1218 return; 162   1218 return;
MISUBC 171   continue; 163   continue;
172   } 164   }
HITCBC 173   788 } 165   788 }
HITCBC 174   179 w->func_(w); 166   179 w->func_(w);
HITCBC 175   179 } 167   179 }
176   } 168   }
177   169  
178   inline bool 170   inline bool
HITCBC 179   180 thread_pool::post(pool_work_item* w) noexcept 171   180 thread_pool::post(pool_work_item* w) noexcept
180   { 172   {
181   { 173   {
HITCBC 182   180 std::lock_guard<std::mutex> lock(mutex_); 174   180 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 183   180 if (shutdown_) 175   180 if (shutdown_)
HITCBC 184   1 return false; 176   1 return false;
HITCBC 185   179 work_queue_.push(w); 177   179 work_queue_.push(w);
HITCBC 186   180 } 178   180 }
HITCBC 187   179 cv_.notify_one(); 179   179 cv_.notify_one();
HITCBC 188   179 return true; 180   179 return true;
189   } 181   }
190   182  
191   inline void 183   inline void
HITCBC 192   610 thread_pool::shutdown() 184   610 thread_pool::shutdown()
193   { 185   {
194   { 186   {
HITCBC 195   610 std::lock_guard<std::mutex> lock(mutex_); 187   610 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 196   610 shutdown_ = true; 188   610 shutdown_ = true;
HITCBC 197   610 } 189   610 }
HITCBC 198   610 cv_.notify_all(); 190   610 cv_.notify_all();
199   191  
HITCBC 200   1219 for (auto& t : threads_) 192   1219 for (auto& t : threads_)
201   { 193   {
HITCBC 202   609 if (t.joinable()) 194   609 if (t.joinable())
HITCBC 203   609 t.join(); 195   609 t.join();
204   } 196   }
HITCBC 205   610 threads_.clear(); 197   610 threads_.clear();
206   198  
207   { 199   {
HITCBC 208   610 std::lock_guard<std::mutex> lock(mutex_); 200   610 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 209   610 while (work_queue_.pop()) 201   610 while (work_queue_.pop())
210   ; 202   ;
HITCBC 211   610 } 203   610 }
HITCBC 212   610 } 204   610 }
213   205  
214   } // namespace boost::corosio::detail 206   } // namespace boost::corosio::detail
215   207  
216   #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 208   #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP