In a previous post, we discussed how to create a simple thread pool that processes a queue of tasks. Programs that use multi-threading quickly run into a problem with output. If each task writes its own output, the lines appear in the order in which the tasks happen to finish, which is not necessarily the order in which the tasks were queued, nor the order we want. We need a class that gathers the output from the threads as they run and emits it in the order we choose. Here we will explore a class, OutputCollator, for that purpose.
Below is a simple multi-threaded program that computes and outputs the digits of π in hexadecimal notation. Each task queued into our ThreadPool computes 16 digits, then outputs a message that includes the position within π and the 16 hexadecimal digits.
```cpp
int main(int argc, char** argv) {
    int max_hex_digits = argc > 1 ? atoi(argv[1]) : 10000;
    auto pool = ThreadPool::getThreadPool();
    for (int i = 0; i < max_hex_digits; i += 16) {
        pool->enqueue([i] {
            mpreal::set_default_prec(256);  // Set default precision in bits
            std::string hex_digits = compute_pi_hex_digit(i);
            cout << "Hex digits of pi at position " << to_string(i) << ": " << hex_digits << endl;
        });
    }
    pool->shutdown();
    return 0;
}
```
Because there is no coordination between the threads, the program output will vary from run to run and can exhibit two classes of problems as illustrated below.
- Out-of-Order Lines
- Interrupted Output
```
Hex digits of pi at position 39632: 6B855314E162481D
Hex digits of pi at position 39648: 6517FCE5C1B9007A
Hex digits of pi at position 39664: 5C423922552E5BAE
Hex digits of pi at position 39680: 1EE1576D60E9A2AC
Hex digits of pi at position 39696: EAFC62B01C318D81
Hex digits of pi at position Hex digits of pi at position 39712: 132513D444620A49
39744: 9C2D45E357F0E5AD
Hex digits of pi at position 39760: 0D427D9DE1614D64
Hex digits of pi at position 39728: 7B3CEC079DD62052
Hex digits of pi at position 39776: 4E5438F456A0E9C9
Hex digits of pi at position 39792: 979C644796287CDA
```
Out-of-Order Lines
For this program we would like to see lines output in order of their position within π, with each line's position 16 digits greater than the previous one. That happens some of the time, but lines often appear out of order because a task queued later can finish before a task queued earlier. In the example output, notice that position 39760 prints before position 39728.
Interrupted Output
An output statement such as the one in the task above is not a single operation: each << is a separate call, and nothing prevents another thread from writing to cout between those calls, either because the writing thread was suspended or because another thread is running at the same time on a different core. In the example output, notice how the beginning of the line for position 39744 is printed and then interrupted by the complete line for position 39712. When the thread for position 39744 continues, it finishes its own line, which is why "39744: 9C2D45E357F0E5AD" appears on a line by itself.
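Interrupted output on its own can be avoided by composing each line into one string first and writing it while holding a mutex. The sketch below does this with a small helper class, LineWriter, and a demo function, writeLinesConcurrently; both names are hypothetical and are not part of the post's code. It only works if every thread writes through the same LineWriter, and it keeps lines whole without doing anything about their order.

```cpp
#include <cassert>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: serializes whole-line writes to a shared stream.
class LineWriter {
public:
    explicit LineWriter(std::ostream& os) : os_(os) {}

    // The caller builds the complete line first; only this write is locked,
    // so no other writeLine() call can land in the middle of the line.
    void writeLine(const std::string& line) {
        std::lock_guard<std::mutex> lock(mutex_);
        os_ << line << '\n';
    }

private:
    std::ostream& os_;
    std::mutex mutex_;
};

// Hypothetical demo: one thread per line. Lines stay whole, but they may
// still appear in any order.
void writeLinesConcurrently(LineWriter& writer, int count) {
    std::vector<std::thread> threads;
    for (int i = 0; i < count; ++i) {
        threads.emplace_back([i, &writer] {
            std::string line = "Hex digits of pi at position " + std::to_string(i * 16);
            writer.writeLine(line);
        });
    }
    for (auto& t : threads) t.join();
}
```

Routing every task through one LineWriter would prevent the garbled 39712/39744 lines in the sample, but not the 39760-before-39728 ordering.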
To solve both of these problems we introduce the OutputCollator class. We create an OutputCollator object (in practice a StreamOutputCollator) from the output stream to which it will eventually write. Each enqueued lambda expression captures a reference to the OutputCollator so that its task can send output to it. The task composes its complete output as a string and passes it to the OutputCollator's enqueue() method together with a position within the output. Output positions are non-negative integers starting at 0.

When the OutputCollator receives the string for the next position it expects, it writes that string, plus any cached strings that now follow in sequence, to the output stream. A string that arrives out of order is cached until all earlier positions have been written. When a StreamOutputCollator is destroyed, it flushes all remaining cached output; the flush() method does the same on demand. The OutputCollator throws a std::invalid_argument exception if an output position is used more than once.
```cpp
int main(int argc, char** argv) {
    int max_hex_digits = argc > 1 ? atoi(argv[1]) : 10000;
    StreamOutputCollator oc(std::cout);
    auto pool = ThreadPool::getThreadPool();
    for (int i = 0; i < max_hex_digits; i += 16) {
        pool->enqueue([i, &oc] {
            mpreal::set_default_prec(256);  // Set default precision in bits
            std::string hex_digits = compute_pi_hex_digit(i);
            oc.enqueue(i / 16, "Hex digits of pi at position " + to_string(i) + ": " + hex_digits + "\n");
        });
    }
    pool->shutdown();
    oc.flush();
    return 0;
}
```
Because the OutputCollator is now the only code that writes to the output stream, and it writes each complete string while holding its lock, no line is interrupted. Because each task supplies its output position, the OutputCollator can write the lines to the stream in the correct order. Below is an example of the collated output.
```
Hex digits of pi at position 39632: 6B855314E162481D
Hex digits of pi at position 39648: 6517FCE5C1B9007A
Hex digits of pi at position 39664: 5C423922552E5BAE
Hex digits of pi at position 39680: 1EE1576D60E9A2AC
Hex digits of pi at position 39696: EAFC62B01C318D81
Hex digits of pi at position 39712: 132513D444620A49
Hex digits of pi at position 39728: 7B3CEC079DD62052
Hex digits of pi at position 39744: 9C2D45E357F0E5AD
Hex digits of pi at position 39760: 0D427D9DE1614D64
Hex digits of pi at position 39776: 4E5438F456A0E9C9
Hex digits of pi at position 39792: 979C644796287CDA
```
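The ordering shown above comes from a reorder buffer: strings that arrive early wait in a std::map keyed by position until every earlier position has been written. The following is a stripped-down sketch of that idea, not the post's OutputCollator. The class name MiniCollator is hypothetical, and flush(), the C-string overload and the class hierarchy are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <ostream>
#include <sstream>
#include <stdexcept>
#include <string>

// Hypothetical, simplified collator: writes strings to `os` in position order.
class MiniCollator {
public:
    explicit MiniCollator(std::ostream& os) : os_(os) {}

    void enqueue(std::size_t position, const std::string& data) {
        std::lock_guard<std::mutex> lock(mutex_);
        // Reject positions that were already written or are already cached.
        if (position < next_ || pending_.count(position) != 0)
            throw std::invalid_argument("position " + std::to_string(position) + " reused");
        pending_.emplace(position, data);
        // Write the longest run of consecutive positions starting at next_.
        for (auto it = pending_.find(next_); it != pending_.end(); it = pending_.find(next_)) {
            os_ << it->second;
            pending_.erase(it);
            ++next_;
        }
    }

    // Number of out-of-order strings still waiting to be written.
    std::size_t cached() {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_.size();
    }

private:
    std::ostream& os_;
    std::size_t next_ = 0;                        // next position that may be written
    std::map<std::size_t, std::string> pending_;  // early arrivals, keyed by position
    std::mutex mutex_;
};
```

Enqueuing positions 2, 1 and then 0 writes nothing until position 0 arrives; at that point all three strings are written at once, in order.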
If we prefer to gather all the output into a string instead of sending it to an output stream, we can use the StringOutputCollator class. This class gathers the output into a stringstream that we can eventually convert to a string. For example, say we want to display a progress meter on stdout. Sending the digits of π to stdout through a StreamOutputCollator at the same time would mix the digits into the progress meter. Instead, we can use a StringOutputCollator to gather the digits of π and then print them once all the computations are done.
```cpp
int main(int argc, char** argv) {
    int max_hex_digits = argc > 1 ? atoi(argv[1]) : 10000;
    StringOutputCollator oc;
    auto pool = ThreadPool::getThreadPool();
    double progress_reported = 0;
    double num_progress_intervals = 20.;
    double progress_interval = 100 / num_progress_intervals;
    std::mutex progress_mutex;  // Guards progress_reported, which every task reads and updates
    for (int i = 0; i < max_hex_digits; i += 16) {
        pool->enqueue([i, max_hex_digits, progress_interval, &progress_reported, &progress_mutex, &oc] {
            mpreal::set_default_prec(256);  // Set default precision in bits
            std::string hex_digits = compute_pi_hex_digit(i);
            int work_unit = i / 16;
            oc.enqueue(work_unit, hex_digits);
            std::lock_guard<std::mutex> lock(progress_mutex);
            if (i * 100. / max_hex_digits >= progress_reported) {
                cout << setprecision(0) << fixed << progress_reported << "%..." << flush;
                progress_reported += progress_interval;
            }
        });
    }
    pool->shutdown();
    cout << "100%" << endl;
    oc.flush();
    cout << endl << oc.str() << endl;
    return 0;
}
```
```
% ./example3 1000
0%...5%...10%...15%...20%...25%...30%...35%...40%...45%...50%...55%...60%...65%...70%...75%...80%...85%...90%...95%...100%
243F6A8885A308D313198A2E03707344A4093822299F31D0082EFA98EC4E6C89452821E638D01377BE5466CF34E90C6CC0AC29B7C97C50DD3F84D5B5B5
```
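The progress check in the example decides what to print from the position i of the task that just finished, and tasks can finish out of order. One hedged alternative is to count completed tasks and keep that counter behind a mutex, so each threshold is printed exactly once and in increasing order. The ProgressMeter class below is hypothetical (it is not part of the post's code) and writes to any std::ostream.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <ostream>
#include <sstream>

// Hypothetical progress meter driven by the number of completed tasks.
class ProgressMeter {
public:
    ProgressMeter(std::size_t total_tasks, std::size_t intervals, std::ostream& os)
        : total_(total_tasks), intervals_(intervals), os_(os) {
        assert(total_ > 0 && intervals_ > 0);
    }

    // Called by each task when it finishes. Prints every "N%..." threshold
    // reached so far that has not been printed yet; "100%" is left to the caller.
    void taskDone() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++done_;
        while (next_ < intervals_ && done_ * intervals_ >= next_ * total_) {
            os_ << (next_ * 100 / intervals_) << "%..." << std::flush;
            ++next_;
        }
    }

private:
    std::size_t total_, intervals_;
    std::ostream& os_;
    std::mutex mutex_;
    std::size_t done_ = 0;  // tasks finished so far
    std::size_t next_ = 0;  // index of the next threshold to print
};
```

In main(), each task would call meter.taskDone() after oc.enqueue(...), with total_tasks equal to (max_hex_digits + 15) / 16, the number of 16-digit tasks queued.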
OutputCollator.hpp
```cpp
/**
 * @headerfile OutputCollator.hpp
 * @brief Classes to manage the output of worker threads.
 *
 * These classes are used to buffer the output of a pool of worker threads and
 * output it in the correct order, not the order computed by the threads.
 *
 * @author Richard Lesh
 * @date 2025-07-02
 * @version 1.0
 */
#ifndef OUTPUT_COLLATOR_HPP
#define OUTPUT_COLLATOR_HPP
#pragma once

#include <iostream>
#include <sstream>
#include <string>
#include <map>
#include <mutex>

/**
 * @class OutputCollator
 * @brief Collator to organize output from multiple threads. This is an
 * abstract base class.
 *
 * @author Richard Lesh
 * @date 2025-07-02
 * @version 1.0
 */
class OutputCollator {
public:
    OutputCollator() noexcept = default;
    OutputCollator(const OutputCollator& o) = delete;
    OutputCollator(OutputCollator&& o) = delete;
    OutputCollator& operator=(const OutputCollator& o) = delete;
    OutputCollator& operator=(OutputCollator&& o) = delete;
    virtual ~OutputCollator() noexcept = default;

    /**
     * @brief Method used to send strings to the OutputCollator.
     *
     * Enqueues the passed string at the specified position in the output.
     *
     * @param position The output position [0, int_max].
     * @param data The string to output.
     * @throw invalid_argument Throws if the position is invalid or reused.
     */
    virtual void enqueue(size_t position, const std::string& data) = 0;

    /**
     * @brief Method used to send string literals to the OutputCollator.
     *
     * Enqueues the passed C-string at the specified position in the output.
     *
     * @param position The output position [0, int_max].
     * @param data The C-string to output.
     * @throw invalid_argument Throws if the position is invalid or reused.
     */
    void enqueue(size_t position, const char* data) {
        enqueue(position, std::string(data));
    }

    /**
     * @brief Method used to flush the remaining output stored in the collator.
     *
     * Flushes the remaining cached output stored in the collator. Missing
     * output positions are skipped.
     */
    virtual void flush() = 0;

    /**
     * @brief Method to return the number of strings currently in the collator.
     *
     * Returns the number of strings still stored in the collator that have not
     * been flushed.
     *
     * @return Number of strings currently buffered.
     */
    size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);  // worker threads may be modifying queue_
        return queue_.size();
    }

    /**
     * @brief Method to return the number of strings flushed.
     *
     * Returns the number of strings that have been flushed to the underlying
     * output.
     *
     * @return Number of strings written.
     */
    size_t getUnitsWritten() {
        std::lock_guard<std::mutex> lock(mutex_);  // worker threads may be updating the count
        return units_written_;
    }

protected:
    size_t units_written_ = 0;
    std::map<size_t, std::string> queue_;
    std::mutex mutex_;
};

/**
 * @class NullOutputCollator
 * @brief Collator to send output to the null device.
 *
 * @author Richard Lesh
 * @date 2025-07-02
 * @version 1.0
 */
class NullOutputCollator : public OutputCollator {
public:
    NullOutputCollator() noexcept = default;
    NullOutputCollator(const NullOutputCollator& o) = delete;
    NullOutputCollator(NullOutputCollator&& o) = delete;
    NullOutputCollator& operator=(const NullOutputCollator& o) = delete;
    NullOutputCollator& operator=(NullOutputCollator&& o) = delete;
    ~NullOutputCollator() noexcept = default;

    using OutputCollator::enqueue;  // keep the const char* overload visible
    void enqueue(size_t position, const std::string& data) override {}
    void flush() override {}
};

/**
 * @class StreamOutputCollator
 * @brief Collator to send output to an output stream.
 *
 * @author Richard Lesh
 * @date 2025-07-02
 * @version 1.0
 */
class StreamOutputCollator : public OutputCollator {
public:
    explicit StreamOutputCollator(std::ostream& os) noexcept : OutputCollator(), os_{&os} {}
    StreamOutputCollator(const StreamOutputCollator& o) = delete;
    StreamOutputCollator(StreamOutputCollator&& o) = delete;
    StreamOutputCollator& operator=(const StreamOutputCollator& o) = delete;
    StreamOutputCollator& operator=(StreamOutputCollator&& o) = delete;
    ~StreamOutputCollator() noexcept {
        // A derived class sets os_ to nullptr once its stream has been destroyed.
        if (os_ != nullptr) flush();
    }

    using OutputCollator::enqueue;
    void enqueue(size_t position, const std::string& data) override;
    void flush() override;

protected:
    StreamOutputCollator() noexcept {}
    std::ostream* os_ = nullptr;
};

/**
 * @class StringOutputCollator
 * @brief Collator to send output to a string stream.
 *
 * @author Richard Lesh
 * @date 2025-07-02
 * @version 1.0
 */
class StringOutputCollator : public StreamOutputCollator {
public:
    StringOutputCollator() noexcept : StreamOutputCollator() { os_ = &ss; }
    StringOutputCollator(const StringOutputCollator& o) = delete;
    StringOutputCollator(StringOutputCollator&& o) = delete;
    StringOutputCollator& operator=(const StringOutputCollator& o) = delete;
    StringOutputCollator& operator=(StringOutputCollator&& o) = delete;
    ~StringOutputCollator() noexcept {
        // ss is destroyed before ~StreamOutputCollator() runs, so stop the
        // base destructor from flushing into a destroyed stream.
        os_ = nullptr;
    }

    /**
     * @brief Method to return all output as a string.
     *
     * Flushes the collator and returns all the output as a string.
     *
     * @return All the output sent to this collator.
     */
    std::string str() {
        flush();
        return ss.str();
    }

private:
    std::stringstream ss;
};

#endif
```
OutputCollator.cpp
```cpp
#include "OutputCollator.hpp"
#include <stdexcept>

using namespace std;

void StreamOutputCollator::enqueue(size_t position, const string& data) {
    lock_guard<mutex> lock(mutex_);
    if (position < units_written_)
        throw std::invalid_argument("Work unit " + to_string(position) + " already written!");
    if (queue_.find(position) != queue_.end())
        throw std::invalid_argument("Work unit " + to_string(position) + " already queued!");
    queue_.emplace(position, data);
    // Write every consecutive unit starting at the next expected position.
    auto it = queue_.find(units_written_);
    while (it != queue_.end()) {
        *os_ << it->second;
        os_->flush();
        queue_.erase(it);
        ++units_written_;
        it = queue_.find(units_written_);
    }
}

void StreamOutputCollator::flush() {
    lock_guard<mutex> lock(mutex_);
    // std::map iterates its keys in ascending order, so the cached units are
    // written in position order; missing positions are simply skipped.
    for (const auto& entry : queue_) {
        *os_ << entry.second;
        ++units_written_;
    }
    queue_.clear();
    os_->flush();
}
```
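Writing the cached entries in position order does not require a separate sort step, because std::map keeps its keys in ascending order and iterating from begin() to end() visits them in that order. A minimal illustration, using a hypothetical helper named drainInOrder:

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical helper: concatenates cached strings in ascending position order,
// then empties the cache. No sort is needed because std::map is ordered by key.
std::string drainInOrder(std::map<std::size_t, std::string>& cache) {
    std::string out;
    for (const auto& entry : cache) out += entry.second;
    cache.clear();
    return out;
}
```

If positions 7, 3 and 5 are cached (in that insertion order), the result is the strings for 3, 5 and 7, and the missing positions 4 and 6 are simply skipped, matching what flush() documents.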
Complete example.cpp to compute hexadecimal digits of π
```cpp
#include <gmpxx.h>  // GMP C++ Wrapper
#include <iostream>
#include <iomanip>
#include <mutex>
#include <sstream>
#include <string>
#include <mpreal.h>  // https://github.com/advanpix/mpreal/blob/master/mpreal.h
#include <OutputCollator.hpp>
#include <ThreadPool.hpp>

using namespace mpfr;
using namespace std;

// Function to compute (base^exp) using fast exponentiation
mpreal fastPow(mpreal base, long long exp) {
    mpreal result = 1;
    while (exp > 0) {
        if (exp & 1) {  // If exp is odd, multiply the base with result
            result = result * base;
        }
        base = (base * base);  // Square the base
        exp >>= 1;             // Divide exponent by 2
    }
    return result;
}

// Function to compute (base^exp) % mod using fast exponentiation
mpz_class modFastPow(mpz_class base, long long exp, mpz_class mod) {
    mpz_class result = 1;
    base = base % mod;  // Ensure base is within mod
    while (exp > 0) {
        if (exp & 1) {  // If exp is odd, multiply the base with result
            result = (result * base) % mod;
        }
        base = (base * base) % mod;  // Square the base
        exp >>= 1;                   // Divide exponent by 2
    }
    return result;
}

// Compute the sum of the BBP series modulo 1
mpreal bbp_sum(long n, int j) {
    mpreal sum = 0.0;
    // First sum: terms with k <= n, reduced modulo 8k + j
    for (long k = 0; k <= n; ++k) {
        mpz_class denominator(8 * k + j);
        sum += mpreal(modFastPow(16, n - k, denominator).get_mpz_t()) / mpreal(denominator.get_mpz_t());
        sum -= floor(sum);  // Keep only fractional part
    }
    // Second sum: the first 16 terms of the tail with k > n
    for (long k = n + 1; k <= n + 16; ++k) {
        sum += 1.0 / fastPow(16.0, k - n) / (8 * k + j);
        sum -= floor(sum);
    }
    return sum;
}

// Compute the 16 hexadecimal digits of π starting at position n
// (position 0 is the first digit after the hexadecimal point)
string compute_pi_hex_digit(long n) {
    mpreal x = 4.0 * bbp_sum(n, 1) - 2.0 * bbp_sum(n, 4) - 1.0 * bbp_sum(n, 5) - 1.0 * bbp_sum(n, 6);
    x -= floor(x);  // Keep only fractional part
    char digits[17];
    digits[16] = 0;
    for (int i = 0; i < 16; ++i) {
        x *= 16;
        int digit = static_cast<int>(x);  // Extract hex digit
        // Convert digit to hexadecimal character
        if (digit < 10) digits[i] = '0' + digit;
        else digits[i] = 'A' + (digit - 10);
        x -= digit;
    }
    return string(digits);
}

int main(int argc, char** argv) {
    int max_hex_digits = argc > 1 ? atoi(argv[1]) : 10000;
    StringOutputCollator oc;
    auto pool = ThreadPool::getThreadPool();
    double progress_reported = 0;
    double num_progress_intervals = 20.;
    double progress_interval = 100 / num_progress_intervals;
    std::mutex progress_mutex;  // Guards progress_reported, which every task reads and updates
    for (int i = 0; i < max_hex_digits; i += 16) {
        pool->enqueue([i, max_hex_digits, progress_interval, &progress_reported, &progress_mutex, &oc] {
            mpreal::set_default_prec(256);  // Set default precision in bits
            std::string hex_digits = compute_pi_hex_digit(i);
            int work_unit = i / 16;
            oc.enqueue(work_unit, hex_digits);
            std::lock_guard<std::mutex> lock(progress_mutex);
            if (i * 100. / max_hex_digits >= progress_reported) {
                cout << setprecision(0) << fixed << progress_reported << "%..." << flush;
                progress_reported += progress_interval;
            }
        });
    }
    pool->shutdown();
    cout << "100%" << endl;
    oc.flush();
    cout << endl << oc.str() << endl;
    return 0;
}
```
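compute_pi_hex_digit() relies on the Bailey–Borwein–Plouffe (BBP) formula, π = Σ_{k≥0} 16^(−k) · (4/(8k+1) − 2/(8k+4) − 1/(8k+5) − 1/(8k+6)). Multiplying by 16^n and keeping only the fractional part lets each term with k ≤ n be reduced modulo 8k+j, which is the job of modFastPow(), so the digits at position n can be found without computing the digits before it. That independence is what lets every 16-digit chunk run as a separate task. The sketch below restates the same steps using double and 64-bit integers in place of mpreal and GMP. This substitution is an assumption made for illustration only: double carries roughly 13 hexadecimal digits and its rounding error grows with n, so the sketch (with the hypothetical names modpow16, series and pi_hex_digits) limits itself to 8 digits at modest positions.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <string>

// 16^e mod m by square-and-multiply; safe while m*m fits in 64 bits.
std::uint64_t modpow16(std::uint64_t e, std::uint64_t m) {
    std::uint64_t result = 1 % m, base = 16 % m;
    while (e > 0) {
        if (e & 1) result = result * base % m;
        base = base * base % m;
        e >>= 1;
    }
    return result;
}

// Fractional part of sum_{k>=0} 16^(n-k) / (8k + j), as in bbp_sum().
double series(std::uint64_t n, std::uint64_t j) {
    double s = 0.0;
    for (std::uint64_t k = 0; k <= n; ++k) {  // terms with k <= n, reduced mod 8k + j
        std::uint64_t m = 8 * k + j;
        s += static_cast<double>(modpow16(n - k, m)) / static_cast<double>(m);
        s -= std::floor(s);
    }
    for (std::uint64_t k = n + 1; k <= n + 16; ++k) {  // first 16 tail terms
        s += std::pow(16.0, static_cast<double>(n) - static_cast<double>(k)) /
             static_cast<double>(8 * k + j);
    }
    return s - std::floor(s);
}

// `count` hex digits of π starting at position n (0 = first digit after the point).
std::string pi_hex_digits(std::uint64_t n, int count) {
    assert(count >= 0 && count <= 8);  // double precision only supports a few digits
    double x = 4 * series(n, 1) - 2 * series(n, 4) - series(n, 5) - series(n, 6);
    x -= std::floor(x);
    std::string out;
    for (int i = 0; i < count; ++i) {
        x *= 16;
        int d = static_cast<int>(x);
        out += "0123456789ABCDEF"[d];
        x -= d;
    }
    return out;
}
```

For example, pi_hex_digits(0, 8) should give 243F6A88, matching the start of the example3 output above.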