8#include "../mpiwrap/cp_mpi.h"
9#include "../offload/offload_mempool.h"
17#define DBM_MULTIPLY_COMM_MEMPOOL
24static int gcd(
const int a,
const int b) {
35static int lcm(
const int a,
const int b) {
return (a * b) / gcd(a, b); }
42 assert(0 <= nelements);
43 assert(element_size <= INT_MAX);
44 assert(nelements <= INT_MAX / (
int)element_size);
45 return nelements * (int)element_size;
52static inline int isum(
const int n,
const int input[n]) {
54 for (
int i = 0;
i < n;
i++) {
64static inline void icumsum(
const int n,
const int input[n],
int output[n]) {
65 int oval = output[0] = 0, ival = input[0];
66 for (
int i = 1;
i < n;
i++) {
67 output[
i] = (oval += ival);
77 const int blks_recv_count[nranks],
78 const int blks_recv_displ[nranks],
79 const int free_index_sizes[],
80 const int sum_index_sizes[],
82 int data_recv_count[nranks]) {
83 memset(data_recv_count, 0, nranks *
sizeof(
int));
84 for (
int irank = 0; irank < nranks; irank++) {
85 for (
int i = 0;
i < blks_recv_count[irank];
i++) {
87 &blks_recv[blks_recv_displ[irank] +
i];
88 const int block_size =
90 assert(block_size >= 0);
91 assert(data_recv_count[irank] <= INT_MAX - block_size);
92 data_recv_count[irank] += block_size;
115 return ((
unsigned long long)sum_index * 1021ULL) % (
unsigned long long)nticks;
127 const int npacks,
plan_t *plans_per_pack[npacks],
128 int nblks_per_pack[npacks],
129 int ndata_per_pack[npacks]) {
130 memset(nblks_per_pack, 0, npacks *
sizeof(
int));
131 memset(ndata_per_pack, 0, npacks *
sizeof(
int));
136 int nblks_mythread[npacks];
137 memset(nblks_mythread, 0, npacks *
sizeof(
int));
138#pragma omp for schedule(static)
141 for (
int iblock = 0; iblock < shard->
nblocks; iblock++) {
143 const int sum_index = (trans_matrix) ? blk->
row : blk->
col;
145 const int ipack = itick64 / dist_ticks->
nranks;
146 nblks_mythread[ipack]++;
152 for (
int ipack = 0; ipack < npacks; ipack++) {
153 nblks_per_pack[ipack] += nblks_mythread[ipack];
154 nblks_mythread[ipack] = nblks_per_pack[ipack];
158 for (
int ipack = 0; ipack < npacks; ipack++) {
159 const int nblks = nblks_per_pack[ipack];
160 plans_per_pack[ipack] = malloc(nblks *
sizeof(
plan_t));
161 assert(plans_per_pack[ipack] != NULL || nblks == 0);
165 int ndata_mythread[npacks];
166 memset(ndata_mythread, 0, npacks *
sizeof(
int));
167#pragma omp for schedule(static)
170 for (
int iblock = 0; iblock < shard->
nblocks; iblock++) {
172 const int free_index = (trans_matrix) ? blk->
col : blk->
row;
173 const int sum_index = (trans_matrix) ? blk->
row : blk->
col;
175 const int ipack = itick64 / dist_ticks->
nranks;
177 const int coord_free_idx = dist_indices->
index2coord[free_index];
178 const int coord_sum_idx = itick64 % dist_ticks->
nranks;
179 const int coords[2] = {(trans_dist) ? coord_sum_idx : coord_free_idx,
180 (trans_dist) ? coord_free_idx : coord_sum_idx};
184 ndata_mythread[ipack] += row_size * col_size;
186 const int iplan = --nblks_mythread[ipack];
187 plans_per_pack[ipack][iplan].blk = blk;
188 plans_per_pack[ipack][iplan].rank = rank;
189 plans_per_pack[ipack][iplan].row_size = row_size;
190 plans_per_pack[ipack][iplan].col_size = col_size;
194 for (
int ipack = 0; ipack < npacks; ipack++) {
195 ndata_per_pack[ipack] += ndata_mythread[ipack];
205 const dbm_matrix_t *matrix,
const bool trans_matrix,
const int nblks_send,
206 const int ndata_send,
plan_t plans[nblks_send],
const int nranks,
207 int blks_send_count[nranks],
int data_send_count[nranks],
208 int blks_send_displ[nranks],
int data_send_displ[nranks],
210 memset(blks_send_count, 0, nranks *
sizeof(
int));
211 memset(data_send_count, 0, nranks *
sizeof(
int));
216 int nblks_mythread[nranks], ndata_mythread[nranks];
217 memset(nblks_mythread, 0, nranks *
sizeof(
int));
218 memset(ndata_mythread, 0, nranks *
sizeof(
int));
219#pragma omp for schedule(static)
220 for (
int iblock = 0; iblock < nblks_send; iblock++) {
221 const plan_t *plan = &plans[iblock];
222 nblks_mythread[plan->
rank] += 1;
228 for (
int irank = 0; irank < nranks; irank++) {
229 blks_send_count[irank] += nblks_mythread[irank];
230 data_send_count[irank] += ndata_mythread[irank];
231 nblks_mythread[irank] = blks_send_count[irank];
232 ndata_mythread[irank] = data_send_count[irank];
239 icumsum(nranks, blks_send_count, blks_send_displ);
240 icumsum(nranks, data_send_count, data_send_displ);
241 const int m = nranks - 1;
242 assert(nblks_send == blks_send_displ[m] + blks_send_count[m]);
243 assert(ndata_send == data_send_displ[m] + data_send_count[m]);
248#pragma omp for schedule(static)
249 for (
int iblock = 0; iblock < nblks_send; iblock++) {
250 const plan_t *
const plan = &plans[iblock];
254 const double *blk_data = &shard->
data[blk->
offset];
256 const int plan_size = row_size * col_size;
257 const int irank = plan->
rank;
262 nblks_mythread[irank] -= 1;
263 ndata_mythread[irank] -= plan_size;
264 const int offset = data_send_displ[irank] + ndata_mythread[irank];
265 const int jblock = blks_send_displ[irank] + nblks_mythread[irank];
270 for (
int i = 0;
i < row_size;
i++) {
271 for (
int j = 0; j < col_size; j++) {
272 const double element = blk_data[j * row_size +
i];
273 data_send[offset +
i * col_size + j] = element;
274 norm += element * element;
277 blks_send[jblock].free_index = plan->
blk->
col;
278 blks_send[jblock].sum_index = plan->
blk->
row;
280 for (
int i = 0;
i < plan_size;
i++) {
281 const double element = blk_data[
i];
282 data_send[offset +
i] = element;
283 norm += element * element;
285 blks_send[jblock].free_index = plan->
blk->
row;
286 blks_send[jblock].sum_index = plan->
blk->
col;
288 blks_send[jblock].norm = (float)norm;
291 blks_send[jblock].offset = offset - data_send_displ[irank];
311 const int nranks,
const int nshards,
const int nblocks_recv,
312 const int blks_recv_count[nranks],
const int blks_recv_displ[nranks],
313 const int data_recv_displ[nranks],
315 int nblocks_per_shard[nshards], shard_start[nshards];
316 memset(nblocks_per_shard, 0, nshards *
sizeof(
int));
319 assert(blocks_tmp != NULL || nblocks_recv == 0);
324 for (
int irank = 0; irank < nranks; irank++) {
326 for (
int i = 0;
i < blks_recv_count[irank];
i++) {
327 blks_recv[blks_recv_displ[irank] +
i].offset += data_recv_displ[irank];
332 int nblocks_mythread[nshards];
333 memset(nblocks_mythread, 0, nshards *
sizeof(
int));
334#pragma omp for schedule(static)
335 for (
int iblock = 0; iblock < nblocks_recv; iblock++) {
336 blocks_tmp[iblock] = blks_recv[iblock];
337 const int ishard = blks_recv[iblock].
free_index % nshards;
338 nblocks_mythread[ishard]++;
341 for (
int ishard = 0; ishard < nshards; ishard++) {
342 nblocks_per_shard[ishard] += nblocks_mythread[ishard];
343 nblocks_mythread[ishard] = nblocks_per_shard[ishard];
347 icumsum(nshards, nblocks_per_shard, shard_start);
349#pragma omp for schedule(static)
350 for (
int iblock = 0; iblock < nblocks_recv; iblock++) {
351 const int ishard = blocks_tmp[iblock].
free_index % nshards;
352 const int jblock = --nblocks_mythread[ishard] + shard_start[ishard];
353 blks_recv[jblock] = blocks_tmp[iblock];
358 for (
int ishard = 0; ishard < nshards; ishard++) {
359 if (nblocks_per_shard[ishard] > 1) {
360 qsort(&blks_recv[shard_start[ishard]], nblocks_per_shard[ishard],
374 const bool trans_dist,
382 const dbm_dist_1d_t *dist_indices = (trans_dist) ? &dist->cols : &dist->rows;
383 const dbm_dist_1d_t *dist_ticks = (trans_dist) ? &dist->rows : &dist->cols;
384 const int *free_index_sizes =
385 (trans_matrix) ? matrix->col_sizes : matrix->row_sizes;
386 const int *sum_index_sizes =
387 (trans_matrix) ? matrix->row_sizes : matrix->col_sizes;
390 const int nsend_packs = nticks / dist_ticks->
nranks;
391 assert(nsend_packs * dist_ticks->
nranks == nticks);
397 assert(packed.
send_packs != NULL || nsend_packs == 0);
400 plan_t *plans_per_pack[nsend_packs];
401 int nblks_send_per_pack[nsend_packs], ndata_send_per_pack[nsend_packs];
403 dist_ticks, nticks, nsend_packs, plans_per_pack,
404 nblks_send_per_pack, ndata_send_per_pack);
407 int nblks_send_max = 0, ndata_send_max = 0;
408 for (
int ipack = 0; ipack < nsend_packs; ++ipack) {
409 nblks_send_max =
imax(nblks_send_max, nblks_send_per_pack[ipack]);
410 ndata_send_max =
imax(ndata_send_max, ndata_send_per_pack[ipack]);
417 for (
int ipack = 0; ipack < nsend_packs; ipack++) {
419 const int nranks = dist->nranks;
420 int blks_send_count[nranks], data_send_count[nranks];
421 int blks_send_displ[nranks], data_send_displ[nranks];
423 ndata_send_per_pack[ipack], plans_per_pack[ipack], nranks,
424 blks_send_count, data_send_count, blks_send_displ,
425 data_send_displ, blks_send, data_send);
426 free(plans_per_pack[ipack]);
429 int blks_recv_count[nranks], blks_recv_displ[nranks];
431 icumsum(nranks, blks_recv_count, blks_recv_displ);
432 const int nblocks_recv =
isum(nranks, blks_recv_count);
437 int blks_send_count_byte[nranks], blks_send_displ_byte[nranks];
438 int blks_recv_count_byte[nranks], blks_recv_displ_byte[nranks];
439 for (
int i = 0;
i < nranks;
i++) {
440 blks_send_count_byte[
i] =
442 blks_send_displ_byte[
i] =
444 blks_recv_count_byte[
i] =
446 blks_recv_displ_byte[
i] =
450 blks_recv, blks_recv_count_byte, blks_recv_displ_byte,
454 int data_recv_count[nranks], data_recv_displ[nranks];
456 free_index_sizes, sum_index_sizes, blks_recv,
458 icumsum(nranks, data_recv_count, data_recv_displ);
459 const int ndata_recv =
isum(nranks, data_recv_count);
462#if defined(DBM_MULTIPLY_COMM_MEMPOOL)
469 data_recv, data_recv_count, data_recv_displ,
474 blks_recv_count, blks_recv_displ,
475 data_recv_displ, blks_recv);
487 int max_nblocks = 0, max_data_size = 0;
488 for (
int ipack = 0; ipack < packed.
nsend_packs; ipack++) {
498#if defined(DBM_MULTIPLY_COMM_MEMPOOL)
519 const int itick_of_rank0 = (itick + nticks - my_rank) % nticks;
520 const int send_rank = (my_rank + nticks - itick_of_rank0) % nranks;
521 const int send_itick = (itick_of_rank0 + send_rank) % nticks;
522 const int send_ipack = send_itick / nranks;
523 assert(send_itick % nranks == my_rank);
526 const int recv_rank = itick % nranks;
527 const int recv_ipack = itick / nranks;
530 if (send_rank == my_rank) {
531 assert(send_rank == recv_rank && send_ipack == recv_ipack);
573#if defined(DBM_MULTIPLY_COMM_MEMPOOL)
578 for (
int ipack = 0; ipack < packed->
nsend_packs; ipack++) {
580#if defined(DBM_MULTIPLY_COMM_MEMPOOL)
599 assert(iter != NULL);
629 const int itick = (iter->
itick + shift) % iter->
nticks;
void cp_mpi_free_mem(void *mem)
Wrapper around MPI_Free_mem.
void cp_mpi_max_int(int *values, const int count, const cp_mpi_comm_t comm)
Wrapper around MPI_Allreduce for op MPI_MAX and datatype MPI_INT.
int cp_mpi_sendrecv_byte(const void *sendbuf, const int sendcount, const int dest, const int sendtag, void *recvbuf, const int recvcount, const int source, const int recvtag, const cp_mpi_comm_t comm)
Wrapper around MPI_Sendrecv for datatype MPI_BYTE.
void cp_mpi_alltoallv_double(const double *sendbuf, const int *sendcounts, const int *sdispls, double *recvbuf, const int *recvcounts, const int *rdispls, const cp_mpi_comm_t comm)
Wrapper around MPI_Alltoallv for datatype MPI_DOUBLE.
int cp_mpi_cart_rank(const cp_mpi_comm_t comm, const int coords[])
Wrapper around MPI_Cart_rank.
void * cp_mpi_alloc_mem(size_t size)
Wrapper around MPI_Alloc_mem.
void cp_mpi_alltoall_int(const int *sendbuf, const int sendcount, int *recvbuf, const int recvcount, const cp_mpi_comm_t comm)
Wrapper around MPI_Alltoall for datatype MPI_INT.
bool cp_mpi_comms_are_similar(const cp_mpi_comm_t comm1, const cp_mpi_comm_t comm2)
Wrapper around MPI_Comm_compare.
void cp_mpi_alltoallv_byte(const void *sendbuf, const int *sendcounts, const int *sdispls, void *recvbuf, const int *recvcounts, const int *rdispls, const cp_mpi_comm_t comm)
Wrapper around MPI_Alltoallv for datatype MPI_BYTE.
int cp_mpi_sendrecv_double(const double *sendbuf, const int sendcount, const int dest, const int sendtag, double *recvbuf, const int recvcount, const int source, const int recvtag, const cp_mpi_comm_t comm)
Wrapper around MPI_Sendrecv for datatype MPI_DOUBLE.
static int imax(int x, int y)
Returns the larger of two given integers (missing from the C standard)
static int dbm_get_shard_index(const dbm_matrix_t *matrix, const int row, const int col)
Internal routine for getting a block's shard index.
static int dbm_get_num_shards(const dbm_matrix_t *matrix)
Internal routine that returns the number of shards for given matrix.
static void free_packed_matrix(dbm_packed_matrix_t *packed)
Private routine for releasing a packed matrix.
static void icumsum(const int n, const int input[n], int output[n])
Private routine for computing the cumulative sums of given numbers.
static void create_pack_plans(const bool trans_matrix, const bool trans_dist, const dbm_matrix_t *matrix, const cp_mpi_comm_t comm, const dbm_dist_1d_t *dist_indices, const dbm_dist_1d_t *dist_ticks, const int nticks, const int npacks, plan_t *plans_per_pack[npacks], int nblks_per_pack[npacks], int ndata_per_pack[npacks])
Private routine for planing packs.
static void postprocess_received_blocks(const int nranks, const int nshards, const int nblocks_recv, const int blks_recv_count[nranks], const int blks_recv_displ[nranks], const int data_recv_displ[nranks], dbm_pack_block_t blks_recv[nblocks_recv])
Private routine for post-processing received blocks.
dbm_comm_iterator_t * dbm_comm_iterator_start(const bool transa, const bool transb, const dbm_matrix_t *matrix_a, const dbm_matrix_t *matrix_b, const dbm_matrix_t *matrix_c)
Internal routine for creating a communication iterator.
static void fill_send_buffers(const dbm_matrix_t *matrix, const bool trans_matrix, const int nblks_send, const int ndata_send, plan_t plans[nblks_send], const int nranks, int blks_send_count[nranks], int data_send_count[nranks], int blks_send_displ[nranks], int data_send_displ[nranks], dbm_pack_block_t blks_send[nblks_send], double data_send[ndata_send])
Private routine for filling send buffers.
void dbm_comm_iterator_stop(dbm_comm_iterator_t *iter)
Internal routine for releasing the given communication iterator.
static dbm_packed_matrix_t pack_matrix(const bool trans_matrix, const bool trans_dist, const dbm_matrix_t *restrict matrix, const dbm_distribution_t *restrict dist, const int nticks)
Private routine for redistributing a matrix along selected dimensions.
static dbm_pack_t * sendrecv_pack(const int itick, const int nticks, dbm_packed_matrix_t *packed)
Private routine for sending and receiving the pack for the given tick.
static int compare_pack_blocks_by_sum_index(const void *a, const void *b)
Private comperator passed to qsort to compare two blocks by sum_index.
bool dbm_comm_iterator_next(dbm_comm_iterator_t *iter, dbm_pack_t **pack_a, dbm_pack_t **pack_b)
Internal routine for retrieving next pair of packs of given iterator.
static unsigned long long calculate_tick_index(int sum_index, int nticks)
Private routine for calculating tick indices in pack plans.
static void compute_data_recv_count(const int nranks, const int blks_recv_count[nranks], const int blks_recv_displ[nranks], const int free_index_sizes[], const int sum_index_sizes[], const dbm_pack_block_t blks_recv[], int data_recv_count[nranks])
Private routine computing received data counts from block metadata.
static int isum(const int n, const int input[n])
Private routine for computing the sum of the given integers.
static int checked_byte_count(const int nelements, const size_t element_size)
Private routine for converting element counts to byte counts.
static void const int const int i
void offload_mempool_host_free(const void *memory)
Internal routine for releasing memory back to the pool.
void * offload_mempool_host_malloc(const size_t size)
Internal routine for allocating host memory from the pool.
Internal struct for storing a block's metadata.
Internal struct for storing a communication iterator.
dbm_packed_matrix_t packed_a
dbm_distribution_t * dist
dbm_packed_matrix_t packed_b
Internal struct for storing a one dimensional distribution.
Internal struct for storing a two dimensional distribution.
Internal struct for storing a matrix.
dbm_distribution_t * dist
Internal struct for storing a dbm_block_t plus its norm.
Internal struct for storing a pack - essentially a shard for MPI.
dbm_pack_block_t * blocks
Internal struct for storing a packed matrix.
const dbm_dist_1d_t * dist_ticks
const dbm_dist_1d_t * dist_indices
Internal struct for storing a matrix shard.
Private struct used for planing during pack_matrix.