source: CIVL/examples/mpi-omp/AMG2013/utilities/exchange_data.c

main
Last change on this file was ea777aa, checked in by Alex Wilton <awilton@…>, 3 years ago

Moved examples, include, build_default.properties, common.xml, and README out from dev.civl.com into the root of the repo.

git-svn-id: svn://vsl.cis.udel.edu/civl/trunk@5704 fb995dde-84ed-4084-dfe6-e5aef3e2452c

  • Property mode set to 100644
File size: 19.8 KB
Line 
1/*BHEADER**********************************************************************
2 * Copyright (c) 2008, Lawrence Livermore National Security, LLC.
3 * Produced at the Lawrence Livermore National Laboratory.
4 * This file is part of HYPRE. See file COPYRIGHT for details.
5 *
6 * HYPRE is free software; you can redistribute it and/or modify it under the
7 * terms of the GNU Lesser General Public License (as published by the Free
8 * Software Foundation) version 2.1 dated February 1999.
9 *
10 * $Revision: 2.4 $
11 ***********************************************************************EHEADER*/
12
13/* see exchange_data.README for additional information */
14/* AHB 6/04 */
15
16#include <stdlib.h>
17#include <stdio.h>
18#include <math.h>
19
20#include "utilities.h"
21
22
/*---------------------------------------------------
 * hypre_CreateBinaryTree()
 * (finds a processor's position in a binary tree:
 *  its children and parent processor ids)
 *----------------------------------------------------*/
27
28int hypre_CreateBinaryTree(int myid, int num_procs, hypre_BinaryTree *tree)
29{
30
31 int i, proc, size=0;
32 int ierr = 0;
33 int *tmp_child_id;
34 int num=0, parent = 0;
35
36 /* initialize*/
37 proc = myid;
38
39
40
41 /*how many children can a processor have?*/
42 for (i = 1; i < num_procs; i *= 2)
43 {
44 size++;
45 }
46
47 /* allocate space */
48 tmp_child_id = hypre_TAlloc(int, size);
49
50 /* find children and parent */
51 for (i = 1; i < num_procs; i *= 2)
52 {
53 if ( (proc % 2) == 0)
54 {
55 if( (myid + i) < num_procs )
56 {
57 tmp_child_id[num] = myid + i;
58 num++;
59 }
60 proc /= 2;
61 }
62 else
63 {
64 parent = myid - i;
65 break;
66 }
67
68 }
69
70 hypre_BinaryTreeParentId(tree) = parent;
71 hypre_BinaryTreeNumChild(tree) = num;
72 hypre_BinaryTreeChildIds(tree) = tmp_child_id;
73
74 return(ierr);
75
76}
77
78/*---------------------------------------------------
79 * hypre_DestroyBinaryTree()
80 * Destroy storage created by createBinaryTree
81 *----------------------------------------------------*/
82int hypre_DestroyBinaryTree(hypre_BinaryTree *tree)
83{
84
85 int ierr = 0;
86
87 hypre_TFree(hypre_BinaryTreeChildIds(tree));
88
89 return ierr;
90}
91
92
/*---------------------------------------------------
 * hypre_DataExchangeList()
 * This function is for sending a list of messages ("contacts")
 * to a list of processors.  The receiving processors
 * do not know how many messages they are getting.  The
 * sending process expects a "response" (either a confirmation or
 * some sort of data back from the receiving processor).
 *----------------------------------------------------*/
102
103
104
105/* should change to where the buffers for sending and receiving are voids
106 instead of ints - then cast accordingly */
107
108int hypre_DataExchangeList(int num_contacts,
109 int *contact_proc_list, void *contact_send_buf,
110 int *contact_send_buf_starts, int contact_obj_size,
111 int response_obj_size,
112 hypre_DataExchangeResponse *response_obj, int max_response_size,
113 int rnum, MPI_Comm comm, void **p_response_recv_buf,
114 int **p_response_recv_buf_starts)
115{
116
117
118 /*-------------------------------------------
119 * parameters:
120 *
121 * num_contacts = how many procs to contact
122 * contact_proc_list = list of processors to contact
123 * contact_send_buf = array of data to send
124 * contact_send_buf_starts = index for contact_send_buf corresponding to
125 * contact_proc_list
126 * contact_obj_size = sizeof() one obj in contact list
127
128 * response_obj_size = sizeof() one obj in response_recv_buf
129 * response_obj = this will give us the function we need to
130 * fill the reponse as well as
131 * any data we might need to accomplish that
132 * max_response_size = max size of a single response expected (do NOT
133 * need to be an absolute upper bound)
134 * rnum = two consequentive exchanges should have different
135 * rnums. Alternate rnum = 1
136 * and rnum=2 - these flags will be even (so odd
137 * numbered tags could be used in calling code)
138 * p_response_recv_buf = where to receive the reponses - will be allocated
139 * in this function
140 * p_response_recv_buf_starts = index of p_response_buf corresponding to
141 * contact_buf_list - will be allocated here
142
143 *-------------------------------------------*/
144
145 int num_procs, myid;
146 int ierr = 0;
147 int i;
148 int terminate, responses_complete;
149 int children_complete;
150 int contact_flag;
151 int proc;
152 int contact_size;
153
154 int size, post_size, copy_size;
155 int total_size, count;
156
157 void *start_ptr = NULL, *index_ptr=NULL;
158 int *int_ptr=NULL;
159
160 void *response_recv_buf = NULL;
161 void *send_response_buf = NULL;
162
163 int *response_recv_buf_starts = NULL;
164 void *initial_recv_buf = NULL;
165
166 void *recv_contact_buf = NULL;
167 int recv_contact_buf_size = 0;
168
169 int response_message_size = 0;
170
171 int overhead;
172
173 int max_response_size_bytes;
174
175 int max_response_total_bytes;
176
177 void **post_array = NULL; /*this must be set to null or realloc will crash */
178 int post_array_storage = 0;
179 int post_array_size = 0;
180 int num_post_recvs =0;
181
182 void **contact_ptrs = NULL, **response_ptrs=NULL, **post_ptrs=NULL;
183
184
185
186 hypre_BinaryTree tree;
187
188 MPI_Request *response_requests, *contact_requests;
189 MPI_Status *response_statuses, *contact_statuses;
190
191 MPI_Request *post_send_requests = NULL, *post_recv_requests = NULL;
192 MPI_Status *post_send_statuses = NULL, *post_recv_statuses = NULL;
193
194 MPI_Request *term_requests, term_request1, request_parent;
195 MPI_Status *term_statuses, term_status1, status_parent;
196 MPI_Status status, fill_status;
197
198
199 const int contact_tag = 1000*rnum;
200 const int response_tag = 1002*rnum;
201 const int term_tag = 1004*rnum;
202 const int post_tag = 1006*rnum;
203
204
205 MPI_Comm_size(comm, &num_procs );
206 MPI_Comm_rank(comm, &myid );
207
208
209 /* ---------initializations ----------------*/
210
211
212
213 /*if the response_obj_size or contact_obj_size is 0, set to sizeof(int) */
214 if (!response_obj_size) response_obj_size = sizeof(int);
215 if (!contact_obj_size) contact_obj_size = sizeof(int);
216
217 max_response_size_bytes = max_response_size*response_obj_size;
218
219
220 /* pre-allocate the max space for responding to contacts */
221 overhead = ceil((double) sizeof(int)/response_obj_size); /*for appending an integer*/
222
223 max_response_total_bytes = (max_response_size+overhead)*response_obj_size;
224
225
226
227 response_obj->send_response_overhead = overhead;
228 response_obj->send_response_storage = max_response_size;
229
230 send_response_buf = hypre_MAlloc(max_response_total_bytes);
231
232 /*allocate space for inital recv array for the responses - give each processor
233 size max_response_size */
234
235 initial_recv_buf = hypre_MAlloc(max_response_total_bytes*num_contacts);
236 response_recv_buf_starts = hypre_CTAlloc(int, num_contacts+1);
237
238
239 contact_ptrs = hypre_TAlloc( void *, num_contacts);
240 response_ptrs = hypre_TAlloc(void *, num_contacts);
241
242
243 /*-------------SEND CONTACTS AND POST RECVS FOR RESPONSES---*/
244
245 for (i=0; i<= num_contacts; i++)
246 {
247 response_recv_buf_starts[i] = i*(max_response_size+overhead);
248 }
249
250 /* Send "contact" messages to the list of processors and
251 pre-post receives to wait for their response*/
252
253 responses_complete = 1;
254 if (num_contacts > 0 )
255 {
256 responses_complete = 0;
257 response_requests = hypre_CTAlloc(MPI_Request, num_contacts);
258 response_statuses = hypre_CTAlloc(MPI_Status, num_contacts);
259 contact_requests = hypre_CTAlloc(MPI_Request, num_contacts);
260 contact_statuses = hypre_CTAlloc(MPI_Status, num_contacts);
261
262 /* post receives - could be confirmation or data*/
263 /* the size to post is max_response_total_bytes*/
264
265 for (i=0; i< num_contacts; i++)
266 {
267 /* response_ptrs[i] = initial_recv_buf + i*max_response_total_bytes ; */
268 response_ptrs[i] = (void *)((char *) initial_recv_buf + i*max_response_total_bytes) ;
269
270 MPI_Irecv(response_ptrs[i], max_response_total_bytes, MPI_BYTE, contact_proc_list[i],
271 response_tag, comm, &response_requests[i]);
272
273 }
274
275
276 /* send out contact messages */
277 start_ptr = contact_send_buf;
278 for (i=0; i< num_contacts; i++)
279 {
280 contact_ptrs[i] = start_ptr;
281 size = contact_send_buf_starts[i+1] - contact_send_buf_starts[i] ;
282 MPI_Isend(contact_ptrs[i], size*contact_obj_size, MPI_BYTE, contact_proc_list[i],
283 contact_tag, comm, &contact_requests[i]);
284 /* start_ptr += (size*contact_obj_size); */
285 start_ptr = (void *) ((char *) start_ptr + (size*contact_obj_size));
286
287
288 }
289 }
290
291
292 /*------------BINARY TREE-----------------------*/
293
294
295 /*initialize for the termination check sweep */
296 children_complete = 1;/*indicates whether we have recv. term messages
297 from our children*/
298
299 if (num_procs > 1) {
300 ierr = hypre_CreateBinaryTree(myid, num_procs, &tree);
301
302 /* we will get a message from all of our children when they
303 have received responses for all of their contacts.
304 So post receives now */
305
306
307 term_requests = hypre_CTAlloc(MPI_Request, tree.num_child);
308 term_statuses = hypre_CTAlloc(MPI_Status, tree.num_child);
309
310 for (i=0; i< tree.num_child; i++)
311 {
312 MPI_Irecv(NULL, 0, MPI_INT, tree.child_id[i], term_tag, comm,
313 &term_requests[i]);
314 }
315
316 terminate = 0;
317
318 children_complete = 0;
319
320 }
321 else if (num_procs ==1 && num_contacts > 0 ) /* added 11/08 */
322 {
323 terminate = 0;
324 }
325
326
327 /*---------PROBE LOOP-----------------------------------------*/
328
329 /*Look for incoming contact messages - don't know how many I will get!*/
330
331 while (!terminate)
332 {
333
334 /* did I receive any contact messages? */
335 MPI_Iprobe(MPI_ANY_SOURCE, contact_tag, comm, &contact_flag, &status);
336
337 while (contact_flag)
338 {
339 /* received contacts - from who and what do we do ?*/
340 proc = status.MPI_SOURCE;
341 MPI_Get_count(&status, MPI_BYTE, &contact_size);
342
343 contact_size = contact_size/contact_obj_size;
344
345
346 /*---------------FILL RESPONSE ------------------------*/
347
348 /*first receive the contact buffer - then call a function
349 to determine how to populate the send buffer for the reponse*/
350
351 /* do we have enough space to recv it? */
352 if(contact_size > recv_contact_buf_size)
353 {
354 recv_contact_buf = hypre_ReAlloc(recv_contact_buf,
355 contact_obj_size*contact_size);
356 recv_contact_buf_size = contact_size;
357 }
358
359
360 /* this must be blocking - can't fill recv without the buffer*/
361 MPI_Recv(recv_contact_buf, contact_size*contact_obj_size, MPI_BYTE, proc,
362 contact_tag, comm, &fill_status);
363
364
365
366 response_obj->fill_response(recv_contact_buf, contact_size, proc, response_obj,
367 comm, &send_response_buf, &response_message_size );
368
369
370 /* we need to append the size of the send obj */
371 /* first we copy out any part that may be needed to send later so we don't overwrite */
372 post_size = response_message_size - max_response_size;
373 if (post_size > 0) /*we will need to send the extra information later */
374 {
375 /*printf("myid = %d, post_size = %d\n", myid, post_size);*/
376
377 if (post_array_size == post_array_storage)
378
379 {
380 /* allocate room for more posts - add 20*/
381 post_array_storage += 20;
382 post_array = hypre_TReAlloc(post_array, void *, post_array_storage);
383 post_send_requests = hypre_TReAlloc(post_send_requests, MPI_Request, post_array_storage);
384 }
385 /* allocate space for the data this post only*/
386 /* this should not happen often (unless a poor max_size has been chosen)
387 - so we will allocate space for the data as needed */
388 size = post_size*response_obj_size;
389 post_array[post_array_size] = hypre_MAlloc(size);
390 /* index_ptr = send_response_buf + max_response_size_bytes */;
391 index_ptr = (void *) ((char *) send_response_buf + max_response_size_bytes);
392
393 memcpy(post_array[post_array_size], index_ptr, size);
394
395 /*now post any part of the message that is too long with a non-blocking send and
396 a different tag */
397
398 MPI_Isend(post_array[post_array_size], size, MPI_BYTE, proc, post_tag,
399 comm, &post_send_requests[post_array_size]);
400
401 post_array_size++;
402 }
403
404 /*now append the size information into the overhead storage */
405 /* index_ptr = send_response_buf + max_response_size_bytes; */
406 index_ptr = (void *) ((char *) send_response_buf + max_response_size_bytes);
407
408 memcpy(index_ptr, &response_message_size, response_obj_size);
409
410
411 /*send the block of data that includes the overhead */
412 /* this is a blocking send - the recv has already been posted */
413 MPI_Send(send_response_buf, max_response_total_bytes,
414 MPI_BYTE, proc, response_tag, comm);
415
416
417 /*--------------------------------------------------------------*/
418
419
420 /* look for any more contact messages*/
421 MPI_Iprobe(MPI_ANY_SOURCE, contact_tag, comm, &contact_flag, &status);
422
423 }
424
425 /* no more contact messages waiting - either
426 (1) check to see if we have received all of our response messages
427 (2) partitcipate in the termination sweep (check for messages from children)
428 (3) participate in termination sweep (check for message from parent)*/
429
430 if (!responses_complete)
431 {
432 ierr = MPI_Testall(num_contacts, response_requests, &responses_complete,
433 response_statuses);
434 if (responses_complete && num_procs == 1) terminate = 1; /*added 11/08 */
435
436 }
437 else if(!children_complete) /* have all of our children received all of their
438 response messages?*/
439 {
440
441
442 ierr = MPI_Testall(tree.num_child, term_requests, &children_complete,
443 term_statuses);
444
445
446
447 /* if we have gotten term messages from all of our children, send a term
448 message to our parent. Then post a receive to hear back from parent */
449 if (children_complete & (myid > 0)) /*root does not have a parent*/
450 {
451
452
453 MPI_Isend(NULL, 0, MPI_INT, tree.parent_id, term_tag, comm, &request_parent);
454
455 MPI_Irecv(NULL, 0, MPI_INT, tree.parent_id, term_tag, comm,
456 &term_request1);
457 }
458 }
459 else /*have we gotten a term message from our parent? */
460 {
461 if (myid == 0) /* root doesn't have a parent */
462 {
463 terminate = 1;
464 }
465 else
466 {
467 MPI_Test(&term_request1, &terminate, &term_status1);
468 }
469 if(terminate) /*tell children to terminate */
470 {
471 if (myid > 0 ) MPI_Wait(&request_parent, &status_parent);
472
473 for (i=0; i< tree.num_child; i++)
474 { /*a blocking send - recv has been posted already*/
475 MPI_Send(NULL, 0, MPI_INT, tree.child_id[i], term_tag, comm);
476
477 }
478 }
479 }
480 }
481
482 /* end of (!terminate) loop */
483
484
485 /* ----some clean up before post-processing ----*/
486 if (recv_contact_buf_size > 0)
487 {
488 hypre_TFree(recv_contact_buf);
489 }
490
491 hypre_Free(send_response_buf);
492 hypre_TFree(contact_ptrs);
493 hypre_TFree(response_ptrs);
494
495
496
497
498 /*-----------------POST PROCESSING------------------------------*/
499
500 /* more data to receive? */
501 /* move to recv buffer and update response_recv_buf_starts */
502
503 total_size = 0; /*total number of items in response buffer */
504 num_post_recvs = 0; /*num of post processing recvs to post */
505 start_ptr = initial_recv_buf;
506 response_recv_buf_starts[0] = 0; /*already allocated above */
507
508 /*an extra loop to determine sizes. This is better than reallocating
509 the array that will be used in posting the irecvs */
510 for (i=0; i< num_contacts; i++)
511 {
512 /* int_ptr = (int *) (start_ptr + max_response_size_bytes); */ /*the overhead int*/
513 int_ptr = (int *) ( (char *) start_ptr + max_response_size_bytes); /*the overhead int*/
514
515
516 response_message_size = *int_ptr;
517 response_recv_buf_starts[i+1] = response_recv_buf_starts[i] + response_message_size;
518 total_size += response_message_size;
519 if (max_response_size < response_message_size) num_post_recvs++;
520 /* start_ptr += max_response_total_bytes; */
521 start_ptr = (void *) ((char *) start_ptr + max_response_total_bytes);
522 }
523
524 post_recv_requests = hypre_TAlloc(MPI_Request, num_post_recvs);
525 post_recv_statuses = hypre_TAlloc(MPI_Status, num_post_recvs);
526 post_ptrs = hypre_TAlloc(void *, num_post_recvs);
527
528 /*second loop to post any recvs and set up recv_response_buf */
529 response_recv_buf = hypre_MAlloc(total_size*response_obj_size);
530 index_ptr = response_recv_buf;
531 start_ptr = initial_recv_buf;
532 count = 0;
533
534 for (i=0; i< num_contacts; i++)
535 {
536
537 response_message_size = response_recv_buf_starts[i+1] - response_recv_buf_starts[i];
538 copy_size = hypre_min(response_message_size, max_response_size);
539
540 memcpy(index_ptr, start_ptr, copy_size*response_obj_size);
541 /* index_ptr += copy_size*response_obj_size; */
542 index_ptr = (void *) ((char *) index_ptr + copy_size*response_obj_size);
543
544 if (max_response_size < response_message_size)
545 {
546 size = (response_message_size - max_response_size)*response_obj_size;
547 post_ptrs[count] = index_ptr;
548 MPI_Irecv(post_ptrs[count], size, MPI_BYTE, contact_proc_list[i] ,
549 post_tag, comm, &post_recv_requests[count]);
550 count++;
551 /* index_ptr+=size;*/
552 index_ptr= (void *) ((char *) index_ptr + size);
553 }
554
555 /* start_ptr += max_response_total_bytes; */
556 start_ptr = (void *) ((char *) start_ptr + max_response_total_bytes);
557
558 }
559
560 post_send_statuses = hypre_TAlloc(MPI_Status, post_array_size);
561
562
563 /*--------------CLEAN UP------------------- */
564
565 hypre_Free(initial_recv_buf);
566
567
568 if (num_contacts > 0 )
569 {
570 /*these should be done */
571 ierr = MPI_Waitall(num_contacts, contact_requests,
572 contact_statuses);
573
574
575 hypre_TFree(response_requests);
576 hypre_TFree(response_statuses);
577 hypre_TFree(contact_requests);
578 hypre_TFree(contact_statuses);
579
580
581 }
582
583 /* clean up from the post processing - the arrays, requests, etc. */
584
585 if (num_post_recvs)
586 {
587 ierr = MPI_Waitall(num_post_recvs, post_recv_requests,
588 post_recv_statuses);
589 hypre_TFree(post_recv_requests);
590 hypre_TFree(post_recv_statuses);
591 hypre_TFree(post_ptrs);
592 }
593
594
595 if (post_array_size)
596 {
597 ierr = MPI_Waitall(post_array_size, post_send_requests,
598 post_send_statuses);
599
600 hypre_TFree(post_send_requests);
601 hypre_TFree(post_send_statuses);
602
603 for (i=0; i< post_array_size; i++)
604 {
605 hypre_Free(post_array[i]);
606 }
607 hypre_TFree(post_array);
608
609 }
610
611
612 if (num_procs > 1)
613 {
614 hypre_TFree(term_requests);
615 hypre_TFree(term_statuses);
616
617 ierr = hypre_DestroyBinaryTree(&tree);
618
619 }
620
621 /* output */
622 *p_response_recv_buf = response_recv_buf;
623 *p_response_recv_buf_starts = response_recv_buf_starts;
624
625
626 return(ierr);
627
628
629
630}
Note: See TracBrowser for help on using the repository browser.