
Parallel Programming: MPI Matrix Multiplication (Row-wise and Block-wise Parallelism)


The simplest implementation: row-wise parallelism

The main process does not take part in the computation; it only distributes and collects data. In the main process, A is split by rows into roughly equal parts, and each worker process receives its part of A together with the whole of B. The workers compute their partial products and send the results back, and the main process collects and reports them.
Communication uses the simplest primitives, Send and Recv.
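
Before the full listing, here is a minimal sketch (not part of the program below) of how the per-worker row range could be computed. The listing itself simply gives each worker each_row = n / (comm_sz - 1) rows and assumes n divides evenly; this sketch additionally hands any leftover rows to the last worker.

// Sketch only: half-open row range [beginRow, endRow) handled by worker rank r,
// where 1 <= r <= comm_sz - 1 and rank 0 is the main process.
// Unlike the listing below, the last worker also takes the leftover rows.
void rowRange(int n, int comm_sz, int r, int &beginRow, int &endRow){
    int each_row = n / (comm_sz - 1);
    beginRow = each_row * (r - 1);
    endRow = (r == comm_sz - 1) ? n : each_row * r;
}
// The main process sends A[beginRow:endRow, :] (i.e. &A[beginRow * n], (endRow - beginRow) * n ints)
// and the whole of B to worker r; worker r sends back C[beginRow:endRow, :].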

#include <mpi.h>
#include <iostream>
#include <cstdlib>
#include <ctime>
using namespace std;
const int N = 317; // arbitrary choice: random entries lie in [0, N)

void matGene(int *A, int row, int column){
    for (int i = 0; i < row; i++)
        for (int j = 0; j < column; j++)
            A[i * column + j] = rand() % N; //A[i][j]
}

void matMulti(int *A, int *B, int*C, int row, int n){
    for (int i = 0; i < row; i++){
        for (int j = 0; j < n; j++){
            C[i*n + j] = 0;
            for (int k = 0; k < n; k++) 
                C[i*n + j] += A[i*n + k] * B[k*n + j];
        }
    }
}

int main(int argc, char *argv[]){
    // Sorry but Only Deal With Square Matrices

    // To Run: 
    // mpicxx matrix_multi.cpp 
    // mpiexec -n comm_sz ./a.out mat_dim

    // Calculate Parameters Definition
    int n = atoi(argv[1]); // matrix dimension
    int beginRow, endRow; // the row range handled by a given worker process
    double beginTime, endTime; // time record
    srand(time(NULL)); // seed once here (instead of inside matGene) so A and B get different entries

    // MPI Common Head
    int my_rank = 0, comm_sz = 0;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);
    MPI_Status status;

    if (comm_sz == 1){ // no parallel
        // Prepare data
        int* A = new int[n * n];
        int* B = new int[n * n];
        int* C = new int[n * n];
        matGene(A, n, n);
        matGene(B, n, n);

        // Calculate C[i][j] & Time
        beginTime = MPI_Wtime();
        matMulti(A, B, C, n, n);
        endTime = MPI_Wtime();
        cout << "Time: " << endTime - beginTime << endl;

        // End
        delete[] A;
        delete[] B;
        delete[] C;
    }

    else{ // parallel: the main process distributes data and collects results; each worker computes "each_row" rows

        int each_row = n / (comm_sz - 1); // note: assumes n is divisible by comm_sz - 1; leftover rows are not handled here
        
        if (my_rank == 0){ // process 0: main process, data distribution & collect calculate results
            // Prepare data
            int* A = new int[n * n];
            int* B = new int[n * n];
            int* C = new int[n * n];
            matGene(A, n, n);
            matGene(B, n, n);  

            beginTime = MPI_Wtime();

            // Send: A[beginRow:endRow, :], whole B
            // beginRow = each_row * (my_rank-1), endRow = each_row * my_rank;
            for (int i = 0; i < comm_sz-1; i++){
                beginRow = each_row * i, endRow = each_row * (i + 1);
                MPI_Send(&A[beginRow * n + 0], each_row * n, MPI_INT, i + 1, 1, MPI_COMM_WORLD);
                MPI_Send(&B[0 * n + 0], n * n, MPI_INT, i + 1, 2, MPI_COMM_WORLD);
                // cout << "I am alive in sending data to process " << i << endl;
            }

            // Recv: C[beginRow:endRow, :]
            for (int i = 0; i < comm_sz-1; i++){
                beginRow = each_row * i; // recompute the offset for the rows coming from process i + 1
                MPI_Recv(&C[beginRow * n + 0], each_row*n, MPI_INT, i + 1, 3, MPI_COMM_WORLD, &status);
                // cout << "I am alive in receiving data from process " << i << endl;
            }

            endTime = MPI_Wtime();
            cout << "Time: " << endTime - beginTime << endl;
     
            delete[] A;
            delete[] B;
            delete[] C;
        }
     
        if (my_rank != 0){ // other processes: calculate A * B

            // beginRow = each_row * (my_rank-1), endRow = each_row * my_rank;

            int* A = new int[each_row * n]; // A[beginRow:endRow, :]
            int* B = new int[n * n];
            int* C = new int[each_row * n]; // C[beginRow:endRow, :]
            
            MPI_Recv(&A[0 * n + 0], each_row * n, MPI_INT, 0, 1, MPI_COMM_WORLD, &status); // receive into the start of the local A and B buffers
            MPI_Recv(&B[0 * n + 0], n * n, MPI_INT, 0, 2, MPI_COMM_WORLD, &status);

            matMulti(A, B, C, each_row, n);


            // cout << "Hello and I am process " << my_rank << endl;

            MPI_Send(&C[0 * n + 0], each_row*n, MPI_INT, 0, 3, MPI_COMM_WORLD); // send back the local each_row * n block; the master stores it at row each_row * (my_rank - 1) of the global C

            delete[] A;
            delete[] B;
            delete[] C;
        }
    }

    MPI_Finalize();
    return 0;
}

First improvement: distributing data with Scatter, Gather, and Bcast

A is divided evenly among the processes, so Scatter is the better fit for distributing it; B has to go to every process, so Broadcast (Bcast) is the natural choice; the results are collected with Gather.
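
As a condensed view of this communication pattern (the buffer and size names follow the full listing below), the three collective calls look roughly like this:

// Condensed sketch; A, B, C, partA, partC and each_row = n / comm_sz are as in the listing below.
MPI_Scatter(A, each_row * n, MPI_INT, partA, each_row * n, MPI_INT, 0, MPI_COMM_WORLD); // rows of A
MPI_Bcast(B, n * n, MPI_INT, 0, MPI_COMM_WORLD);                                        // whole B to everyone
// ... every process, including rank 0, multiplies partA by B into partC here ...
MPI_Gather(partC, each_row * n, MPI_INT, C, each_row * n, MPI_INT, 0, MPI_COMM_WORLD);  // collect rows of C
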
Notes:
1. With Scatter, every process receives a sub-block of exactly the same size. Therefore, writing comm_sz for the number of processes, the matrix dimension has to be a multiple of comm_sz. We pad the dimension up to the next multiple of comm_sz and fill the extra entries with zeros, which keeps the result correct (an equivalent rounding helper is sketched after these notes):

if(n % comm_sz != 0){
    n -= n % comm_sz;
    n += comm_sz;
}

2. With Scatter, the computation is divided evenly across every process, so the main process no longer just distributes data and collects results; it also takes part in the computation. I did not notice this at first and padded the matrix dimension to a multiple of (comm_sz - 1) instead, which made the distribution uneven. If too few rows are distributed, the result cannot be correct; if too many are distributed, all sorts of memory errors appear (most of them boiling down to heap corruption showing up at free/delete time). Chasing this bug was quite interesting: reordering statements (for example the order of the deletes) or changing the number of processes and the matrix dimension produced a different error message each time, which kept leading me down the wrong path.
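
For reference, an equivalent way to write the padding is a small rounding helper (padDim is a hypothetical name, not used in the listings):

// Hypothetical helper: round n up to the next multiple of divisor.
// Equivalent to the in-place adjustment shown in note 1.
int padDim(int n, int divisor){
    return ((n + divisor - 1) / divisor) * divisor;
}
// e.g. padDim(317, 4) == 320; here divisor would be comm_sz.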

#include <mpi.h>
#include <iostream>
#include <cstdlib>
#include <ctime>
using namespace std;

// To Run: 
// mpicxx matrix_multi_improvement1.cpp 
// mpiexec -n 4 ./a.out 64

// Improvement 1: 
// Matrix A: different processes deal with different components, so Scatter instead of Send/Recv
// Matrix B: Shared by all processes, so Bcast instead of Send/Recv
// Matrix C: receive different components from different processes, so Gather instead of Send/Recv
// The main process also takes part in the calculation

void matGene(int *A, int size, int actual_size){
    // actual size: the matrix we use may have a larger dimension than n * n
    for (int i = 0; i < actual_size; i++){
        for (int j = 0; j < actual_size; j++){
            if(i < size && j < size) A[i * actual_size + j] = rand() % 5; //A[i][j]
            else A[i * actual_size + j] = 0;
        }
    }
}

void matMulti(int *A, int *B, int*C, int row, int n){
    for (int i = 0; i < row; i++){
        for (int j = 0; j < n; j++){
            C[i*n + j] = 0;
            for (int k = 0; k < n; k++) 
                C[i*n + j] += A[i*n + k] * B[k*n + j];
        }
    }
}

int main(int argc, char *argv[]){
    // Only Deal With Square Matrices

    // Calculate Parameters Definition
    int n = atoi(argv[1]); // matrix dimension
    // int beginRow, endRow; // the range of rows calculating in certain process
    double beginTime, endTime; // time record
    srand(time(NULL));

    // MPI Common Head
    int my_rank = 0, comm_sz = 0;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);
    MPI_Status status;

    if (comm_sz == 1){ // no parallel
        // Prepare data
        int* A = new int[n * n + 2];
        int* B = new int[n * n + 2];
        int* C = new int[n * n + 2];
        int saveN = n;
        matGene(A, saveN, n);
        matGene(B, saveN, n);

        // Calculate C[i][j] & Time
        beginTime = MPI_Wtime();
        matMulti(A, B, C, n, n);
        endTime = MPI_Wtime();
        cout << "Time: " << endTime - beginTime << endl;

        // Output

        cout << "A" << endl;
        for(int i = 0; i < saveN; i++){
            for(int j = 0; j < saveN; j++)
                cout << A[i * n + j] << " ";
            cout << endl;
        }

        cout << "B" << endl;
        for(int i = 0; i < saveN; i++){
            for(int j = 0; j < saveN; j++)
                cout << B[i * n + j] << " ";
            cout << endl;
        }

        cout << "C" << endl;
        for(int i = 0; i < saveN; i++){
            for(int j = 0; j < saveN; j++)
                cout << C[i * n + j] << " ";
            cout << endl;
        }

        delete[] A;
        delete[] B;
        delete[] C;
    }

    else{ // parallel: the main process collects the result and also takes part in the calculation

        int saveN = n;

        // Scatter needs equal-sized chunks: pad n up to a multiple of comm_sz (the actual n may be larger than the input)
        if(n % comm_sz != 0){
            n -= n % comm_sz;
            n += comm_sz;
        }

        int each_row = n / comm_sz;

        // Matrixs
        int* A = new int[n * n + 2];
        int* B = new int[n * n + 2];
        int* C = new int[n * n + 2];
        // after the Scatter, process my_rank handles rows [each_row * my_rank, each_row * (my_rank + 1))
        int* partA = new int[each_row * n + 2]; // A[beginRow:endRow, :]
        int* partC = new int[each_row * n + 2]; // C[beginRow:endRow, :]
        
        if (my_rank == 0){

            // Prepare data
            cout << "n = " << n << endl;
            matGene(A, saveN, n);
            matGene(B, saveN, n);  

            beginTime = MPI_Wtime();

        }

        // data distribution & calculate results & collect 

        // Send: Scatter A, Bcast whole B
        MPI_Scatter(&A[0 * n + 0], each_row * n, MPI_INT, &partA[0 * n + 0], each_row * n, MPI_INT, 0, MPI_COMM_WORLD);
        MPI_Bcast(&B[0 * n + 0], n * n, MPI_INT, 0, MPI_COMM_WORLD);

        // All processes involve calculation
        matMulti(partA, B, partC, each_row, n);

        // Recv: Gather C
        MPI_Gather(&partC[0 * n + 0], each_row * n, MPI_INT, &C[0 * n + 0], each_row * n, MPI_INT, 0, MPI_COMM_WORLD);

        if (my_rank == 0){

            endTime = MPI_Wtime();
            cout << "Time: " << endTime - beginTime << endl;

            // Output
            
            cout << "A" << endl;
            for(int i = 0; i < saveN; i++){
                for(int j = 0; j < saveN; j++)
                    cout << A[i * n + j] << " ";
                cout << endl;
            }   

            cout << "B" << endl;
            for(int i = 0; i < saveN; i++){
                for(int j = 0; j < saveN; j++)
                    cout << B[i * n + j] << " ";
                cout << endl;
            }   

            cout << "C" << endl;
            for(int i = 0; i < saveN; i++){
                for(int j = 0; j < saveN; j++)
                    cout << C[i * n + j] << " ";
                cout << endl;
            }
        
        }

        delete[] A;
        delete[] B;
        delete[] C;
        delete[] partA;
        delete[] partC;

    }

    MPI_Finalize();
    return 0;
}

Second improvement: block multiplication

The number of processes available for computation, (comm_sz - 1), is factored as a * b. The rows are split into a parts and the columns into b parts, which divides the whole matrix into (comm_sz - 1) equally sized blocks. Each worker process computes one block of the final result, so it only needs the corresponding rows of A and the corresponding columns of B instead of the whole matrices. The main process distributes the data and assembles the results (the rank-to-block index mapping is sketched after the notes below).
Notes:
1. To keep the split even, the matrix still has to be padded, exactly as in the first improvement.
2. This second improvement is based on the original version, so communication still goes through Send/Recv and the process numbering has to be managed carefully. Also, because the main process does not communicate with itself here, it does not take part in the local computation, unlike in the first improvement.
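
As a quick reference for the indexing used in the listing below, this sketch (a hypothetical helper; the listing computes the same values inline) maps a worker rank to the block of C it is responsible for:

// Sketch: worker rank r (1 <= r < comm_sz) computes the block of C with
// rows [beginRow, beginRow + each_row) and columns [beginColumn, beginColumn + each_column),
// where comm_sz - 1 = a * b, each_row = n / a and each_column = n / b.
void blockRange(int r, int b, int each_row, int each_column,
                int &beginRow, int &beginColumn){
    beginRow    = ((r - 1) / b) * each_row;
    beginColumn = ((r - 1) % b) * each_column;
}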

#include <mpi.h>
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <cmath>
using namespace std;

// To Run: 
// mpicxx matrix_multi_improvement2.cpp 
// mpiexec -n 4 ./a.out 64

// Improvement 2: Block Multiplication, based on Original
// Main process: process 0, distributes data & collects results, does no calculation
// Workers: each calculates one block of A * B

void matGene(int *A, int size, int actual_size){
    // actual size: the matrix we use may have a larger dimension than n * n
    for (int i = 0; i < actual_size; i++){
        for (int j = 0; j < actual_size; j++){
            if(i < size && j < size) A[i * actual_size + j] = 1; //A[i][j]
            else A[i * actual_size + j] = 0;
        }
    }
}

void matMulti(int *A, int *B, int*C, int m, int n, int p){
    for (int i = 0; i < m; i++){
        for (int j = 0; j < p; j++){
            C[i*p + j] = 0;
            for (int k = 0; k < n; k++) 
                C[i*p + j] += A[i*n + k] * B[k*p + j];
        }
    }
}

int factor(int n){
    // return the largest factor of n that is not larger than sqrt(n)
    int sqr_root = (int)sqrt((double)n);
    for(int i = sqr_root; i >= 1; i--){
        if(n % i == 0) return i;
    }
    return 1; // unreachable for n >= 1, but keeps the compiler happy
}

int main(int argc, char *argv[]){
    // Only Deal With Square Matrices

    // Calculate Parameters Definition
    int n = atoi(argv[1]); // matrix dimension
    // int beginRow, endRow; // the range of rows calculating in certain process
    double beginTime, endTime; // time record
    srand(time(NULL));

    // MPI Common Head
    int my_rank = 0, comm_sz = 0;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);
    MPI_Status status;

    if (comm_sz == 1){ // no parallel
        // Prepare data
        int* A = new int[n * n + 2];
        int* B = new int[n * n + 2];
        int* C = new int[n * n + 2];
        int saveN = n;
        matGene(A, saveN, n);
        matGene(B, saveN, n);

        // Calculate C[i][j] & Time
        beginTime = MPI_Wtime();
        matMulti(A, B, C, n, n, n);
        endTime = MPI_Wtime();
        cout << "Time: " << endTime - beginTime << endl;

        // Output

        cout << "A" << endl;
        for(int i = 0; i < saveN; i++){
            for(int j = 0; j < saveN; j++)
                cout << A[i * n + j] << " ";
            cout << endl;
        }

        cout << "B" << endl;
        for(int i = 0; i < saveN; i++){
            for(int j = 0; j < saveN; j++)
                cout << B[i * n + j] << " ";
            cout << endl;
        }

        cout << "C" << endl;
        for(int i = 0; i < saveN; i++){
            for(int j = 0; j < saveN; j++)
                cout << C[i * n + j] << " ";
            cout << endl;
        }

        delete[] A;
        delete[] B;
        delete[] C;
    }

    else{ // parallel: the main process distributes data and collects results; each worker computes one block

        int saveN = n;

        // blocks must be equal in size: pad n up to a multiple of (comm_sz - 1) = a * b
        if(n % (comm_sz - 1) != 0){
            n -= n % (comm_sz - 1);
            n += (comm_sz - 1);
        }

        int a = (comm_sz - 1) / factor(comm_sz - 1);
        int b = factor(comm_sz - 1);
        int each_row = n / a;
        int each_column = n / b;
        if(my_rank == 0) cout << each_row << "\t" << each_column << endl;
        
        if (my_rank == 0){

            int* A = new int[n * n + 2];
            int* B = new int[n * n + 2];
            int* C = new int[n * n + 2];

            // Prepare data
            cout << "n = " << n << endl;
            matGene(A, saveN, n);
            matGene(B, saveN, n);  

            beginTime = MPI_Wtime(); 

            // ranges include the begin index, exclude the end index
            // beginRow = ((my_rank - 1) / b) * each_row, endRow = beginRow + each_row;
            // beginColumn = ((my_rank - 1) % b) * each_column, endColumn = beginColumn + each_column;
            
            // Send: A[beginRow:endRow, :], B[:, beginColumn:endColumn]
            for(int i = 1; i < comm_sz; i++){
                int beginRow = ((i - 1) / b) * each_row;
                int beginColumn = ((i - 1) % b) * each_column;
                // A: beginRow ~ endRow
                MPI_Send(&A[beginRow * n + 0], each_row * n, MPI_INT, i, i, MPI_COMM_WORLD);
                // B: n times, once a row
                for(int j = 0; j < n; j++){
                    MPI_Send(&B[j * n + beginColumn], each_column, MPI_INT, i, i * n + j + comm_sz + 2, MPI_COMM_WORLD);
                } 
            }

            // Recv: C[beginRow:endRow, beginColumn:endColumn]
            for (int i = 1; i < comm_sz; i++){
                int beginRow = ((i - 1) / b) * each_row;
                int endRow = beginRow + each_row;
                int beginColumn = ((i - 1) % b) * each_column;
                for(int j = beginRow; j < endRow; j++){
                    MPI_Recv(&C[j * n + beginColumn], each_column, MPI_INT, i, each_row * i + (j - beginRow), MPI_COMM_WORLD, &status);
                }
            }

            endTime = MPI_Wtime();
            cout << "Time: " << endTime - beginTime << endl;

            // Output
            
            cout << "A" << endl;
            for(int i = 0; i < saveN; i++){
                for(int j = 0; j < saveN; j++)
                    cout << A[i * n + j] << " ";
                cout << endl;
            }   

            cout << "B" << endl;
            for(int i = 0; i < saveN; i++){
                for(int j = 0; j < saveN; j++)
                    cout << B[i * n + j] << " ";
                cout << endl;
            }   

            cout << "C" << endl;
            for(int i = 0; i < saveN; i++){
                for(int j = 0; j < saveN; j++)
                    cout << C[i * n + j] << " ";
                cout << endl;
            }

            delete[] A;
            delete[] B;
            delete[] C;
        
        }

        else{

            int* partA = new int[each_row * n + 2]; // A[beginRow:endRow, :]
            int* partB = new int[n * each_column + 2]; // B[:, beginColumn:endColumn]
            int* partC = new int[each_row * each_column + 2]; // C[beginRow:endRow, beginColumn:endColumn]

            // ranges include the begin index, exclude the end index
            // beginRow = ((my_rank - 1) / b) * each_row, endRow = beginRow + each_row;
            // beginColumn = ((my_rank - 1) % b) * each_column, endColumn = beginColumn + each_column;

            // Recv: partA, partB
            MPI_Recv(&partA[0 * n + 0], each_row * n, MPI_INT, 0, my_rank, MPI_COMM_WORLD, &status);
            for(int j = 0; j < n; j++){
                MPI_Recv(&partB[j * each_column + 0], each_column, MPI_INT, 0, my_rank * n + j + comm_sz + 2, MPI_COMM_WORLD, &status);
            }
            
            matMulti(partA, partB, partC, each_row, n, each_column);    

            // Send: partC
            for(int j = 0; j < each_row; j++){
                MPI_Send(&partC[j * each_column + 0], each_column, MPI_INT, 0, each_row * my_rank + j, MPI_COMM_WORLD);
            } 

            delete[] partA;
            delete[] partB;
            delete[] partC;
        }

    }

    MPI_Finalize();
    return 0;
}