| 81 | | message.tag = tag; |
| 82 | | message.size = size; |
| 83 | | message.data = $malloc<top>(&mp_heap, size); |
| 84 | | memcpy(message.data, buf, size); |
| 85 | | // enqueue on comm->buf_front[i][j] … |
| 86 | | // update message.next, message.prev, buf_back[i][j] |
| 87 | | } |
| | 81 | message.tag = tag; |
| | 82 | message.size = count*sizeofDatatype(type); |
| | 83 | message.data = $malloc(&mp_heap, size); |
| | 84 | memcpy(message.data, buf, size); |
| | 85 | // enqueue on comm->buf_front[i][j] … |
| | 86 | // update message.next, message.prev, buf_back[i][j] |
| | 87 | } |
| 89 | | // can I use a function as a guard? not now |
| 90 | | // can always use busy-wait loop? |
| 91 | | boolean recv_guard(int source, int dest, int tag, Comm comm) { |
| | 89 | boolean match(MPI_Message *p, int source, int tag, Comm comm) { /* does message *p satisfy a request for (source, tag, comm)? */ |
| | 90 | boolean source_ok = (source == MPI_ANY_SOURCE) || (p->source == source); /* wildcard accepts any sender */ |
| | 91 | boolean tag_ok = (tag == MPI_ANY_TAG) || (p->tag == tag); /* wildcard accepts any tag */ |
| | 92 | return source_ok && tag_ok && (p->comm == comm); /* no wildcard for comm: it must be equal */ |
| | 93 | } |
| | 94 | |
| | 95 | boolean probe(int source, int tag, Comm comm) { /* true iff the queue comm->buf_front[source][pid] holds a message accepted by match() */ |
| | 96 | for (MPI_Message *p = comm->buf_front[source][pid]; p != NULL; p=p->next) |
| | 97 | if (match(p, source, tag, comm)) return true; /* arguments follow match()'s (p, source, tag, comm) order */ |
| | 98 | return false; |
| | 99 | } |
| | 100 | |
| | 101 | boolean recv_guard(int source, int tag, Comm comm) { /* true iff probe() succeeds for the requested source, or for some rank when source is MPI_ANY_SOURCE */ |
| | 102 | // a specific sender: only that sender's queue is probed |
| | 103 | if (source != MPI_ANY_SOURCE) |
| | 104 | return probe(source, tag, comm); |
| | 105 | // wildcard sender: probe ranks 0..NPROCS-1 in order, stopping at the first hit |
| | 106 | boolean found = false; |
| | 107 | for (int rank = 0; rank < NPROCS && !found; rank++) |
| | 108 | found = probe(rank, tag, comm); |
| | 109 | return found; |
| | 110 | } |
| | 111 | |
| | 112 | void recv(int dest, void *buf, int size, int source, int tag, Comm comm, Status *status) { |
| | 113 | int message_source; |
| | 114 | int message_dest; |
| | 115 | MPI_Message *message; |
| | 116 | |
| | 117 | when (recv_guard(source, tag, comm)) ; |
| | 118 | if (source == MPI_ANY_SOURCE) { |
| | 119 | for (int i=0; i<NPROCS; i++) { |
| | 120 | message = comm->buf_front[source][dest]; |
| | 121 | ... |
| | 122 | // search the queue looking for the message |
| | 123 | // also set message_source, message_dest; |
| | 124 | // and remove message from the queue |
| | 125 | } |
| | 126 | } else { |
| | 127 | … |
| | 128 | } |
| | 129 | |
| | 130 | if (message->size > size) error…; |
| | 131 | memcpy(buf, message->data, message.size); |
| | 132 | status->size = message.size; |
| | 133 | status->source = message_source; |
| | 134 | status->dest = message_dest; |
| | 135 | $free(&mp_heap, message->data); |
| | 136 | } |
| | 137 | |
| | 138 | // main body of process... |
| 95 | | void recv(int dest, void *buf, int size, int tag, int source, Comm comm, Status *status) { /* NOTE(review): older-revision sketch; this signature takes tag before source */ |
| 96 | | Message *message; |
| 97 | | int message_source; |
| 98 | | int message_dest; |
| 99 | | Message *queue = comm->buf_front[source][dest]; |
| 100 | | |
| 101 | | // first, the guard: |
| 102 | | when ($exists {int i | 0<= i && |
| 103 | | // search the queue looking for the message |
| 104 | | // also set message_source, message_dest; |
| 105 | | // and remove message from the queue |
| 106 | | |
| 107 | | if (message.size > size) error…; /* NOTE(review): message is declared as Message*, so `.size` is inconsistent with the `->data` used below -- presumably meant message->size */ |
| 108 | | memcpy(buf, message.data, message.size); |
| 109 | | status->size = message.size; |
| 110 | | status->source = message_source; |
| 111 | | status->dest = message_dest; |
| 112 | | $free(message->data); |
| 113 | | } |
| 114 | | |