欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

我的并行编程07 mpi并行归并算法

程序员文章站 2022-07-12 21:34:38
...

这里用了二分法和归并排序

						


#include <stdio.h>
#include<stdlib.h>
#include<mpi.h>


double* readata(double* data, int n, int my_rank, int comm_sz)
{
	if (my_rank == 0)
	{
		FILE* fp;
		fp = fopen("in.txt", "r");
		if (fp == NULL)
		{
			printf("open failed");
			exit(0);
		}
		int i = 0;
		data = (double*)malloc(sizeof(double)*n);
		for (i = 0; i < n; i++)fscanf(fp, "%le", &data[i]);
		//printf("process %d readata\n", my_rank);

	}
	return data;



}
double* scatterdata(double* data, int n, double* local_data, int* local_n, int my_rank, int comm_sz, MPI_Comm comm)
{
	
	if (n%comm_sz == 0)
	{
		//printf("zhengchu ok\n");
		*local_n = n / comm_sz;
		local_data = (double*)malloc(sizeof(double)*(*local_n));
		MPI_Scatter(data, *local_n, MPI_DOUBLE, local_data, *local_n, MPI_DOUBLE, 0, comm);

	}
	else
	{
		//printf("no zhengchu%d\n", my_rank);
		//printf("n is %ld\n", n);
		*local_n = n / comm_sz;
		if (my_rank == comm_sz - 1)  (*local_n) += n % comm_sz;
		local_data = (double*)malloc(sizeof(double)*(*local_n));
		int* send_count=NULL;
		int* displ=NULL;
		
		
		//output(data, n, my_rank);
		if (my_rank == 0)
		{
			send_count = (int*)malloc(sizeof(int)*comm_sz);//放每个进程要获得的数据量
			displ = (int*)malloc(sizeof(int)*comm_sz);//放每个进程要读取的第一个数据在data中的位置 
			int flag = 0;
			for (int i = 0; i < comm_sz; i++)
			{
				send_count[i] = n / comm_sz;

				if (i == 0)flag = 0;
				else flag += send_count[i - 1];
				displ[i] = flag;
				if (i == comm_sz - 1)send_count[i] += n % comm_sz;
				//printf("process%d send_count[%d] is %d\n", my_rank, i, send_count[i]);

			}
			//printf("first %d\n", send_count[0]);
			//output(send_count,comm_sz, my_rank);
		}
		//printf("qian%d,%d\n", my_rank,*local_n);
		MPI_Scatterv(data, send_count, displ, MPI_DOUBLE, local_data, *local_n, MPI_DOUBLE, 0, comm);
		
		//printf("hou%d,%d\n", my_rank, *local_n);
		//output(local_data, *local_n, my_rank);
		if (my_rank == 0)
		{
			free(send_count);
			free(displ);
			free(data);
		}
		
	}
	
	
	return local_data;

}


double* mergesort_parallel(double* local_data, int* local_n, double* local_partner_data, int local_partner_n)
{
	int n = *local_n + local_partner_n, i = 0, j = 0, k = 0;
	double* alldata = (double*)malloc(sizeof(double)*n);
	while (i < *local_n && j < local_partner_n)
	{
		if (local_data[i] <= local_partner_data[j])alldata[k++] = local_data[i++];
		else alldata[k++] = local_partner_data[j++];
	}
	while (i < *local_n)alldata[k++] = local_data[i++];
	while (j < local_partner_n)alldata[k++] = local_partner_data[j++];
	free(local_data);
	free(local_partner_data);
	*local_n = n;
	
	return alldata;
	



}


void merge(double* a, int l1, int r1, int l2, int r2)
{
	int i = l1, j = l2;
	double* temp;
	int index = 0;
	temp = (double*)malloc(sizeof(double)*(r1 - l1 + r2 - l2 + 2));
	while (i <= r1 && j <= r2)
	{
		if (a[i] <= a[j])
		{
			temp[index++] = a[i++];
		}
		else
		{
			temp[index++] = a[j++];
		}
	}
	while (i <= r1) temp[index++] = a[i++];
	while (j <= r2) temp[index++] = a[j++];
	for (i = 0; i < index; i++)
	{
		a[l1 + i] = temp[i];
	}
	free(temp);
}

void mergesort(double *a,int left, int right)
{
	if (left < right)
	{
		int mid = (left + right) / 2;
		mergesort(a, left, mid);
		mergesort(a, mid + 1, right);
		merge(a, left, mid, mid + 1, right);
	}
}







double* sortdata(double* local_data, int* local_n, int my_rank, int comm_sz, MPI_Comm comm)
{
	mergesort(local_data, 0, *local_n - 1);
	//printf("process %d sortdata\n", my_rank);
	int last = comm_sz - 1, mid;
	while (last != 0)
	{
		if (last % 2 == 0)
		{
			mid = last / 2;
			if (my_rank > mid)
			{
				MPI_Send(local_n, 1, MPI_INT, last-my_rank, 0, comm);
				MPI_Send(local_data, *local_n, MPI_DOUBLE,last- my_rank , 0, comm);
			}
			else if (my_rank < mid)
			{
				int local_partner_n;
				MPI_Recv(&local_partner_n, 1, MPI_INT, last - my_rank, 0, comm, MPI_STATUS_IGNORE);
				if (my_rank == 0)printf("process %d local_partner_n is %ld\n", my_rank, local_partner_n);
				double* local_partner_data = (double*)malloc(sizeof(double)*local_partner_n);
				MPI_Recv(local_partner_data, local_partner_n, MPI_DOUBLE, last - my_rank, 0, comm, MPI_STATUS_IGNORE);
				local_data = mergesort_parallel(local_data, local_n, local_partner_data, local_partner_n);
				if (my_rank == 0)printf("new local_n is ");
				printf("here\n");
			}
			last = mid;
		}
		else
		{
			mid = (last + 1) / 2;
			if (my_rank >= mid)
			{
				MPI_Send(local_n, 1, MPI_INT, last - my_rank, 0, comm);
				MPI_Send(local_data, *local_n, MPI_DOUBLE, last - my_rank, 0, comm);
			}
			else
			{
				int local_partner_n;
				MPI_Recv(&local_partner_n, 1, MPI_INT, last - my_rank, 0, comm, MPI_STATUS_IGNORE);
				//printf("my partner number is %d\n", local_partner_n);
				double* local_partner_data = (double*)malloc(sizeof(double)*local_partner_n);
				MPI_Recv(local_partner_data, local_partner_n, MPI_DOUBLE, last - my_rank, 0, comm, MPI_STATUS_IGNORE);
				
				local_data = mergesort_parallel(local_data, local_n, local_partner_data, local_partner_n);
				
				
			}
			last = mid - 1;
		}
		

		if (my_rank > last)break;//如果我的进程是用过的,那就不要再使用了 
	}
	//if (my_rank == 0)printf("process 0 local_n%d\n", *local_n);
	return local_data;

}

void writedata(double* local_data,int local_n, int my_rank, int comm_sz)
{
	if (my_rank == 0)
	{
		FILE* fp;
		fp = fopen("output.txt", "w");
		if (fp == NULL)
		{
			printf("open failed");
			exit(0);
		}
		int i = 0;
		while (local_n--)fprintf(fp, "%f\n", local_data[i++]);
		//printf("process %d writedata\n", my_rank);
	}
}
void output(double* local_data, int local_n,int my_rank)
{
	if (local_data == NULL)
	{
		printf("process %d is NULL\n", my_rank);
		return;
	}
	for (int i = 0; i < local_n; i++)
	{
		printf("process %d local_n-----------%d\n", my_rank,local_data[i]);
		//printf("%f\n", local_data[i]);
	}
	
}


int main()
{
	int my_rank, comm_sz;
	MPI_Init(NULL, NULL);
	MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
	MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);
	int n, local_n;
	double* data = NULL, *local_data = NULL;
	n = 100000;
	data=readata(data, n, my_rank, comm_sz);

	//printf("hehe%d\n",my_rank);
	MPI_Barrier(MPI_COMM_WORLD);
	//printf("haha%d\n",my_rank);
	double local_start = MPI_Wtime();
	
	local_data=scatterdata(data, n, local_data, &local_n, my_rank, comm_sz, MPI_COMM_WORLD); //千万不要忘记传递通信子 
	//printf("after scatter process %d local_n is %d\n", my_rank, local_n);
	//output(local_data, local_n, my_rank);
	local_data=sortdata(local_data, &local_n, my_rank, comm_sz, MPI_COMM_WORLD);
	//printf("process %d sortdata\n", my_rank);
	//output(local_data, local_n, my_rank);
	
	double local_finish = MPI_Wtime();
	double local_elapsed = local_finish - local_start;
	double elapsed;
	MPI_Reduce(&local_elapsed, &elapsed, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
	if (my_rank == 0)printf("elapsed time is %e s\n", elapsed);


	writedata(local_data, local_n, my_rank, comm_sz);
	MPI_Finalize();
	return 0;

}