/**
 * Computes row sums of a matrix containing (0-indexed) consecutive numbers
 * in column major order.
 *
 * Example (entries are array indices):
 *   matrix (11 x 7):         row_sums (11):
 *    0 11 22 33 44 55 66  ->  231
 *    1 12 23 34 45 56 67  ->  238
 *    2 13 24 35 46 57 68  ->  245
 *    3 14 25 36 47 58 69  ->  252
 *    4 15 26 37 48 59 70  ->  259
 *    5 16 27 38 49 60 71  ->  266
 *    6 17 28 39 50 61 72  ->  273
 *    7 18 29 40 51 62 73  ->  280
 *    8 19 30 41 52 63 74  ->  287
 *    9 20 31 42 53 64 75  ->  294
 *   10 21 32 43 54 65 76  ->  301
 *
 * Each worker process computes one row sum at a time. The scheduler process
 * (rank 0) therefore sends one row at a time to a worker (rank > 0) using an
 * MPI_Type_vector describing the sparse (strided) row layout, which is
 * received by the worker in a dense layout (plain MPI_DOUBLEs).
 *
 * Send/Recv Example:
 *   Data sent (entries are array indices):
 *     -  -  -  -  -  -  -
 *     -  -  -  -  -  -  -
 *     -  -  -  -  -  -  -
 *     -  -  -  -  -  -  -
 *     -  -  -  -  -  -  -
 *     5 16 27 38 49 60 71
 *     -  -  -  -  -  -  -
 *     -  -  -  -  -  -  -
 *     -  -  -  -  -  -  -
 *     -  -  -  -  -  -  -
 *     -  -  -  -  -  -  -
 *
 *   Received (indices 0, 1, 2, 3, 4, 5, 6):
 *     5 16 27 38 49 60 71
 *
 * Usage:
 *   mpirun -n <nr_processes> MPI_RowSums [<nrow> [<ncol>]]
 *
 *   <nr_processes>  must be at least 2
 *   <nrow>          number of rows, between 1 and 1024, defaults to 11
 *   <ncol>          number of cols, between 1 and 1024, defaults to 7
 *
 *   On parse error, <nrow> and <ncol> fall back to their defaults.
 *
 * Compile:
 *   mpic++ -Wall -Wpedantic -pedantic MPI_RowSums.cpp -o MPI_RowSums
 *
 * Interesting Parameters:
 *   # Single worker process
 *   mpirun -n 2 MPI_RowSums 1 10
 *   mpirun -n 2 MPI_RowSums 20 10
 *   # Fewer rows than workers (some processes don't get any work at all)
 *   mpirun -n 4 MPI_RowSums 2 10
 *   # Classic example (a bunch of work and some workers)
 *   mpirun -n 4 MPI_RowSums 100 42
 */
#include <mpi.h>

#include <cstdlib>  // atoi
#include <iostream>
#include <vector>

int min(int a, int b) { return a < b ? a : b; }

int main(int argn, char* argv[]) {
    // Build a (simple and barebones, column major) matrix model; it's just a
    // vector with external row/col counts.
    int nrow = 11; // default
    int ncol = 7;  // default

    // Parse arguments to set nrow, ncol (sloppy, but not the point of this example)
    if (argn > 1) {
        nrow = atoi(argv[1]);
        if (nrow < 1 || nrow > 1024) {
            nrow = 11;
        }
    }
    if (argn > 2) {
        ncol = atoi(argv[2]);
        if (ncol < 1 || ncol > 1024) {
            ncol = 7;
        }
    }

    // Initialize MPI (always required)
    MPI_Init(nullptr, nullptr);

    // Allocate MPI settings
    int mpi_size; /*< Number of processes */
    int mpi_rank; /*< This process' rank (a.k.a. the MPI process ID) */

    // Set/Get MPI settings
    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

    // Check that there is at least a single worker, otherwise abort right away
    if (mpi_size < 2) {
        std::cerr << "Nr. processes must be at least 2! "
                     "(No workers -> No work done)" << std::endl;
        MPI_Finalize();
        return -1;
    }
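    // Added note (illustrative, not part of the original comments): the matrix
    // below is stored column major, so element (i, j) of the nrow x ncol
    // matrix sits at flat index j * nrow + i. Row i is therefore the strided
    // selection
    //   i, i + nrow, i + 2 * nrow, ..., i + (ncol - 1) * nrow,
    // which is exactly the layout described by the MPI_Type_vector created
    // below (ncol blocks of 1 element each, with stride nrow).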
(No workers -> No work done)" << std::endl; MPI_Finalize(); return -1; } // Setup shutdown tag (tags must be non-negative) int shutdown = nrow + 1; // unreachable row index // Sparce row data type, elements are strided in the matrix MPI_Datatype mpi_type_row; MPI_Type_vector(ncol, 1, nrow, MPI_DOUBLE, &mpi_type_row); MPI_Type_commit(&mpi_type_row); // Distinguish between workers (rank > 0) and scheduler (rank = 0) if (mpi_rank == 0) { // Create row sums result array std::vector row_sums(nrow); // Construct a nrow x ncol matrix (and enumerate elems) std::vector matrix(nrow * ncol); for (size_t i = 0; i < matrix.size(); ++i) { matrix[i] = static_cast(i); } // tracks processed rows int row_counter = 0; // Start by sending to all workers some data for (int rank = 1; rank < min(mpi_size, nrow + 1); ++rank) { // Send rows MPI_Send( matrix.data() + row_counter, // Pos of first row elem 1, // Send one row mpi_type_row, // row datatype, (sparce layout) rank, // target worker process row_counter, // tag = row index MPI_COMM_WORLD ); // Increment processed row count row_counter++; } // In case of less work than workers (nrow < mpi_size) send remaining // workers home (all ranks with mpi_rank >= nrow get shutdown signal) for (int rank = min(mpi_size, nrow + 1); rank < mpi_size; ++rank) { // Empty workload with shutdown tag MPI_Send( nullptr, // no data 0, // no data MPI_CHAR, // something rank, // ranks without work shutdown, // tag MPI_COMM_WORLD ); } // Repeat till all rows are processed while (row_counter < nrow) { double row_sum; // First listen for any worker process to respond (rank finished) MPI_Status mpi_status; MPI_Recv( &row_sum, // responding rank result 1, // row sum is a scalar MPI_DOUBLE, // and has type double MPI_ANY_SOURCE, // listen for everything MPI_ANY_TAG, // unknown who finishes first MPI_COMM_WORLD, &mpi_status ); // Write result to row sums row_sums[mpi_status.MPI_TAG] = row_sum; // Send the next row to process MPI_Send( matrix.data() + row_counter, 1, mpi_type_row, mpi_status.MPI_SOURCE, // responding rank gets new work row_counter, MPI_COMM_WORLD ); // Increment processed row count row_counter++; } // Now collect remaining results and send a "shutdown" message for (int rank = 1; rank < min(mpi_size, nrow + 1); ++rank) { double row_sum; // First listen for any rank to respond (a rank finished working) MPI_Status mpi_status; MPI_Recv( &row_sum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &mpi_status ); // Write result to row sums row_sums[mpi_status.MPI_TAG] = row_sum; // Send rank shutdown message (work done) MPI_Send( nullptr, // no data 0, // no data MPI_CHAR, // something mpi_status.MPI_SOURCE, // responding rank gets shutdown shutdown, // tag MPI_COMM_WORLD ); } // Report final result (row sums) std::cout << "Rank 0: Done.\n\033[1m"; for (double& val : row_sums) { std::cout << val << ' '; } std::cout << "\033[0m\nCheck result with the following R code:\n" << " rowSums(matrix(seq(0, len = " << (nrow * ncol) << "), " << nrow << ", " << ncol << "))" << std::endl; } else { // Dense row representation, NO stride std::vector row(ncol); // Counts the number of processed rows (for analytic purposes) int work_count = 0; // Repeate till a shutdown signal is send (shutdown tag) while (true) { // Receive new work MPI_Status mpi_status; MPI_Recv( row.data(), // Raw row vector data ncol, // nr of row elments MPI_DOUBLE, // simple double data type (dense layout) 0, // Listen to main rank (rank 0) MPI_ANY_TAG, // tag (at this point) unknown MPI_COMM_WORLD, &mpi_status ); 
            // Check for shutdown -> go home, all work is done
            if (mpi_status.MPI_TAG == shutdown) {
                break;
            }

            // Process the new work (compute the row sum)
            double sum = 0;
            for (double& val : row) {
                sum += val;
            }

            // Increment work count (track the number of completed jobs)
            work_count++;

            // Send the result back to the scheduler
            MPI_Send(
                &sum,               // processing result
                1,                  // single scalar
                MPI_DOUBLE,
                0,                  // target scheduler (rank 0)
                mpi_status.MPI_TAG, // row index
                MPI_COMM_WORLD
            );
        }

        // Report shutdown (worker goes home)
        std::cout << "Rank " << mpi_rank << ": processed " << work_count
                  << " rows, done -> shutdown" << std::endl;
    }

    // Release the custom row datatype and shut down MPI (always required)
    MPI_Type_free(&mpi_type_row);
    MPI_Finalize();

    return 0;
}
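// Added side note (illustrative, not part of the original program): with the
// 0-indexed, column-major enumeration, row i of the nrow x ncol matrix sums to
//
//   ncol * i + nrow * ncol * (ncol - 1) / 2
//
// For the defaults nrow = 11, ncol = 7 this is 231 + 7 * i, matching the
// row_sums column (231, 238, ..., 301) in the example at the top of this file.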