我的并行编程07 mpi并行归并算法
程序员文章站
2022-07-12 21:34:38
...
本程序结合两种方法实现 MPI 并行排序:各进程先用归并排序对本地数据排序,再通过进程间二分配对(树形归并)逐轮合并,最终由 0 号进程得到全局有序结果。
#include <stdio.h>
#include<stdlib.h>
#include<mpi.h>
/*
 * Read n doubles from "in.txt" into a freshly allocated buffer.
 * Only rank 0 performs the read; all other ranks return `data` unchanged
 * (NULL as called from main). Caller owns the returned buffer.
 * Exits the process on open/alloc/parse failure.
 * NOTE(review): in an MPI program MPI_Abort would be the cleaner way to
 * bail out; exit() is kept to preserve the original error strategy.
 */
double* readata(double* data, int n, int my_rank, int comm_sz)
{
    if (my_rank == 0)
    {
        FILE* fp = fopen("in.txt", "r");
        if (fp == NULL)
        {
            printf("open failed");
            exit(1);  /* nonzero status: abnormal termination */
        }
        data = (double*)malloc(sizeof(double) * n);
        if (data == NULL)
        {
            printf("malloc failed");
            fclose(fp);
            exit(1);
        }
        for (int i = 0; i < n; i++)
        {
            /* %le reads a double; bail out if the file is short/malformed */
            if (fscanf(fp, "%le", &data[i]) != 1)
            {
                printf("read failed");
                fclose(fp);
                exit(1);
            }
        }
        fclose(fp);  /* BUG FIX: original leaked the FILE handle */
    }
    return data;
}
/*
 * Distribute the n global doubles (valid on rank 0 only) across comm_sz
 * processes. Each rank receives n / comm_sz elements; when the division is
 * uneven the last rank additionally takes the n % comm_sz remainder via
 * MPI_Scatterv. On return *local_n holds this rank's element count and the
 * returned buffer (owned by the caller) holds its slice.
 * Takes ownership of `data`: rank 0's input array is freed before returning.
 */
double* scatterdata(double* data, int n, double* local_data, int* local_n, int my_rank, int comm_sz, MPI_Comm comm)
{
    if (n % comm_sz == 0)
    {
        /* even split: a plain MPI_Scatter suffices */
        *local_n = n / comm_sz;
        local_data = (double*)malloc(sizeof(double) * (*local_n));
        MPI_Scatter(data, *local_n, MPI_DOUBLE, local_data, *local_n, MPI_DOUBLE, 0, comm);
    }
    else
    {
        *local_n = n / comm_sz;
        if (my_rank == comm_sz - 1) (*local_n) += n % comm_sz; /* last rank absorbs the remainder */
        local_data = (double*)malloc(sizeof(double) * (*local_n));
        int* send_count = NULL; /* element count per destination rank (root only) */
        int* displ = NULL;      /* starting offset in data per destination rank (root only) */
        if (my_rank == 0)
        {
            send_count = (int*)malloc(sizeof(int) * comm_sz);
            displ = (int*)malloc(sizeof(int) * comm_sz);
            int offset = 0;
            for (int i = 0; i < comm_sz; i++)
            {
                send_count[i] = n / comm_sz;
                displ[i] = offset;
                offset += send_count[i];
                if (i == comm_sz - 1) send_count[i] += n % comm_sz;
            }
        }
        /* send_count/displ are ignored on non-root ranks, so NULL is fine there */
        MPI_Scatterv(data, send_count, displ, MPI_DOUBLE, local_data, *local_n, MPI_DOUBLE, 0, comm);
        if (my_rank == 0)
        {
            free(send_count);
            free(displ);
        }
    }
    /* BUG FIX: the original freed data only in the uneven branch, leaking the
     * entire input array on rank 0 whenever n % comm_sz == 0. */
    if (my_rank == 0) free(data);
    return local_data;
}
/*
 * Merge two individually sorted runs into one newly allocated sorted array.
 * Takes ownership of both input buffers (frees them). On return *local_n
 * holds the combined length and the caller owns the returned array.
 */
double* mergesort_parallel(double* local_data, int* local_n, double* local_partner_data, int local_partner_n)
{
    int total = *local_n + local_partner_n;
    double* merged = (double*)malloc(sizeof(double) * total);
    int a = 0, b = 0;
    for (int out = 0; out < total; out++)
    {
        /* take from the local run when the partner run is exhausted or
         * the local head is the smaller (ties favor local: stable order) */
        if (b >= local_partner_n || (a < *local_n && local_data[a] <= local_partner_data[b]))
            merged[out] = local_data[a++];
        else
            merged[out] = local_partner_data[b++];
    }
    free(local_data);
    free(local_partner_data);
    *local_n = total;
    return merged;
}
/*
 * Merge the adjacent sorted slices a[l1..r1] and a[l2..r2] (with l2 == r1+1,
 * as called from mergesort) back into a[l1..r2] via a temporary buffer.
 */
void merge(double* a, int l1, int r1, int l2, int r2)
{
    int total = (r1 - l1 + 1) + (r2 - l2 + 1);
    double* buf = (double*)malloc(sizeof(double) * total);
    int p = l1, q = l2;
    for (int k = 0; k < total; k++)
    {
        /* pull from the left slice while it has elements and its head is <=;
         * ties favor the left slice, keeping the merge stable */
        if (q > r2 || (p <= r1 && a[p] <= a[q]))
            buf[k] = a[p++];
        else
            buf[k] = a[q++];
    }
    for (int k = 0; k < total; k++)
        a[l1 + k] = buf[k];
    free(buf);
}
/*
 * Sequential top-down merge sort of a[left..right] (inclusive bounds).
 * Recursion depth is O(log n); base case is a slice of length <= 1.
 */
void mergesort(double *a, int left, int right)
{
    if (left < right)
    {
        /* BUG FIX: left + (right - left) / 2 cannot overflow signed int,
         * unlike the original (left + right) / 2 for large indices */
        int mid = left + (right - left) / 2;
        mergesort(a, left, mid);
        mergesort(a, mid + 1, right);
        merge(a, left, mid, mid + 1, right);
    }
}
/*
 * Parallel sort over ranks 0..comm_sz-1: each rank first merge-sorts its
 * local slice, then the ranks merge pairwise in a binary (tree) reduction.
 * Each round, active ranks are 0..last; a rank in the upper half sends its
 * run to partner rank (last - my_rank) in the lower half, which merges it.
 * When `last` is even, the middle rank has no partner and carries its data
 * into the next round. A rank drops out once it has sent its data. After
 * the final round rank 0 holds the fully sorted array.
 * Returns the (possibly reallocated) local buffer; *local_n is updated.
 */
double* sortdata(double* local_data, int* local_n, int my_rank, int comm_sz, MPI_Comm comm)
{
    /* local phase: sequential sort of this rank's slice */
    mergesort(local_data, 0, *local_n - 1);
    int last = comm_sz - 1, mid;
    while (last != 0)
    {
        if (last % 2 == 0)
        {
            /* odd count of active ranks: rank mid sits out this round */
            mid = last / 2;
            if (my_rank > mid)
            {
                /* upper half: ship element count, then payload, to partner */
                MPI_Send(local_n, 1, MPI_INT, last - my_rank, 0, comm);
                MPI_Send(local_data, *local_n, MPI_DOUBLE, last - my_rank, 0, comm);
            }
            else if (my_rank < mid)
            {
                /* lower half: receive partner's sorted run and merge it in.
                 * BUG FIX: the original debug printf used %ld for this int
                 * (undefined behavior); the leftover debug prints are gone. */
                int local_partner_n;
                MPI_Recv(&local_partner_n, 1, MPI_INT, last - my_rank, 0, comm, MPI_STATUS_IGNORE);
                double* local_partner_data = (double*)malloc(sizeof(double) * local_partner_n);
                MPI_Recv(local_partner_data, local_partner_n, MPI_DOUBLE, last - my_rank, 0, comm, MPI_STATUS_IGNORE);
                local_data = mergesort_parallel(local_data, local_n, local_partner_data, local_partner_n);
            }
            last = mid;
        }
        else
        {
            /* even count of active ranks: every rank is paired */
            mid = (last + 1) / 2;
            if (my_rank >= mid)
            {
                MPI_Send(local_n, 1, MPI_INT, last - my_rank, 0, comm);
                MPI_Send(local_data, *local_n, MPI_DOUBLE, last - my_rank, 0, comm);
            }
            else
            {
                int local_partner_n;
                MPI_Recv(&local_partner_n, 1, MPI_INT, last - my_rank, 0, comm, MPI_STATUS_IGNORE);
                double* local_partner_data = (double*)malloc(sizeof(double) * local_partner_n);
                MPI_Recv(local_partner_data, local_partner_n, MPI_DOUBLE, last - my_rank, 0, comm, MPI_STATUS_IGNORE);
                local_data = mergesort_parallel(local_data, local_n, local_partner_data, local_partner_n);
            }
            last = mid - 1;
        }
        if (my_rank > last) break; /* this rank has sent its data and is done */
    }
    return local_data;
}
/*
 * Write local_n doubles, one per line, to "output.txt". Only rank 0
 * writes; other ranks are a no-op. Exits the process if the file
 * cannot be opened.
 */
void writedata(double* local_data, int local_n, int my_rank, int comm_sz)
{
    if (my_rank == 0)
    {
        FILE* fp = fopen("output.txt", "w");
        if (fp == NULL)
        {
            printf("open failed");
            exit(1);  /* nonzero status: abnormal termination */
        }
        for (int i = 0; i < local_n; i++)
            fprintf(fp, "%f\n", local_data[i]);
        /* BUG FIX: original never closed the stream, so the final buffered
         * output was not guaranteed to be flushed to disk */
        fclose(fp);
    }
}
/*
 * Debug helper: print every element of local_data to stdout, tagged with
 * the owning rank, or a marker line if the buffer is NULL.
 */
void output(double* local_data, int local_n, int my_rank)
{
    if (local_data == NULL)
    {
        printf("process %d is NULL\n", my_rank);
        return;
    }
    for (int i = 0; i < local_n; i++)
    {
        /* BUG FIX: original passed a double to %d — undefined behavior and
         * garbage output; the label also wrongly said "local_n" */
        printf("process %d local_data-----------%f\n", my_rank, local_data[i]);
    }
}
/*
 * Driver: rank 0 reads n doubles from in.txt; the data is scattered across
 * all ranks, sorted with the parallel merge sort, and rank 0 writes the
 * result to output.txt plus the maximum elapsed time across ranks.
 */
int main()
{
    int my_rank, comm_sz;
    MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);
    int n = 100000, local_n;
    double *data = NULL, *local_data = NULL;
    data = readata(data, n, my_rank, comm_sz);
    /* synchronize so the timer below measures only scatter + sort,
     * not rank 0's file I/O */
    MPI_Barrier(MPI_COMM_WORLD);
    double local_start = MPI_Wtime();
    /* scatterdata takes ownership of (and frees) data on rank 0 */
    local_data = scatterdata(data, n, local_data, &local_n, my_rank, comm_sz, MPI_COMM_WORLD);
    local_data = sortdata(local_data, &local_n, my_rank, comm_sz, MPI_COMM_WORLD);
    double local_finish = MPI_Wtime();
    double local_elapsed = local_finish - local_start;
    double elapsed;
    /* report the slowest rank's time — the true parallel runtime */
    MPI_Reduce(&local_elapsed, &elapsed, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
    if (my_rank == 0) printf("elapsed time is %e s\n", elapsed);
    writedata(local_data, local_n, my_rank, comm_sz);
    free(local_data);  /* BUG FIX: the local buffer was never released */
    MPI_Finalize();
    return 0;
}
上一篇: 并行编程之MPI使用简析(1)
下一篇: DP数字三角形