Producer-consumer scheme with MPI
Below is a simple implementation of the producer-consumer parallel strategy with MPI. It’s just a dummy example, and could probably be improved greatly, but it is a nice illustration of the producer-consumer model, as well as uses for MPI_ANY_SOURCE
, MPI_ANY_TAG
, and MPI_Status
.
#include <stdio.h>
#include <mpi.h>
#include <time.h>
#include <stdlib.h>
#include <math.h>
// Producer-consumer scheme with MPI
int main(int argc, char** argv) {
MPI_Init(&argc, &argv);
int world_rank, nranks;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
const int MAX_NUMBERS = 100;
int numbers[MAX_NUMBERS];
for (int i = 0; i < MAX_NUMBERS; i++) {
numbers[i] = i;
}
double roots[MAX_NUMBERS];
int number_amount;
if (world_rank == 0) { // Producer
srand(time(NULL));
number_amount = (rand() / (float)RAND_MAX) * MAX_NUMBERS;
printf("[0] Distributing %i numbers in total.\n", number_amount);
for (int nextnum = 1; nextnum <= number_amount; ++nextnum) {
// Wait for a worker to become available
MPI_Status status;
double root = 0;
MPI_Recv(&root, 1, MPI_DOUBLE, MPI_ANY_SOURCE,
MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// If a root was computed
if (status.MPI_TAG > 0) {
roots[status.MPI_TAG] = root;
}
printf("[0] Distributing %i to rank %i.\n", numbers[nextnum], status.MPI_SOURCE);
MPI_Send(&(numbers[nextnum]), 1, MPI_INT, status.MPI_SOURCE,
nextnum, MPI_COMM_WORLD);
}
// Send termination signal to each rank when they submit their last job
int num_terminated = 0;
for (int num_terminated = 0; num_terminated < nranks-1; num_terminated++) {
// Wait for a worker to become available
MPI_Status status;
double root = 0;
MPI_Recv(&root, 1, MPI_DOUBLE, MPI_ANY_SOURCE,
MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// If a root was computed
if (status.MPI_TAG > 0) {
roots[status.MPI_TAG] = root;
}
printf("[0] Terminating rank %i.\n", status.MPI_SOURCE);
// Send termination signal (tag = 0)
MPI_Send(&num_terminated, 1, MPI_INT, status.MPI_SOURCE,
0, MPI_COMM_WORLD);
}
} else { // Consumer
// Announce myself to producer
double root = 0;
MPI_Send(&root, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
bool terminated = false;
do {
// Wait for a job
int num = 0;
MPI_Status status;
MPI_Recv(&num, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
//printf("[%i] Received job: %i [tag = %i].\n", world_rank, num, status.MPI_TAG);
if (status.MPI_TAG == 0) {
terminated = true;
printf("[%i] Terminated.\n", world_rank);
} else {
root = sqrt(num);
printf("[%i] Submitting result: sqrt(%i) = %f.\n", world_rank, num, root);
MPI_Send(&root, 1, MPI_DOUBLE, 0, status.MPI_TAG, MPI_COMM_WORLD);
}
} while (!terminated);
}
MPI_Barrier(MPI_COMM_WORLD);
if (world_rank == 0) {
printf("\n\nResults:\n");
for (int i = 1; i <= number_amount; i++) {
printf("sqrt(%i) = %f\n", numbers[i], roots[i]);
}
}
MPI_Finalize();
}