aboutsummaryrefslogtreecommitdiff
path: root/src/reduce.c
blob: 6310b9664e81466dbc0f936795a97d5f0337667c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
 * @file reduce.c
 * @brief functionality for the reduction operations
 * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
 */

#include "common.h"
#include <gnunet/gnunet_util_lib.h>
#include <mpi.h>
#include "addressmap.h"
#include "mshd.h"
#include "mtypes.h"

/**
 * Log a message of the given kind through GNUNET_log_from, tagged with the
 * component name "mshd-reduce".
 */
#define LOG(kind,...)                           \
  GNUNET_log_from (kind, "mshd-reduce", __VA_ARGS__)

/**
 * Log a message with kind GNUNET_ERROR_TYPE_DEBUG.
 */
#define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)

/**
 * Log a message with kind GNUNET_ERROR_TYPE_ERROR.
 */
#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__)


/**
 * Perform an ntree reduction on address maps
 *
 * @return GNUNET_OK upon success; GNUNET_SYSERR upon failure
 */
int
reduce_ntree ()
{
  struct MSH_MSG_InstanceAdresses **iaddr_msgs;
  char *buf;
  int nmsg;
  unsigned int cnt;
  unsigned int grow;
  unsigned int size;
  unsigned int preamble[2];         /* 0: total size; 1: n messages */
  unsigned int nr;
  
  for (cnt = 0; cnt < nproc; cnt++)
  {
    buf = NULL;
    size = 0;
    grow = 0;
    preamble[0] = 0;
    preamble[1] = 0;
    iaddr_msgs = NULL;
    if (rank == cnt)
    {
        if (GNUNET_SYSERR == (nmsg = addressmap_pack (addrmap, &iaddr_msgs)))
        {
          GNUNET_break (0);
          return GNUNET_SYSERR;
        }
        for (nr = 0; nr < nmsg; nr++)
        {
          grow = ntohs (iaddr_msgs[nr]->header.size);
          buf = GNUNET_realloc (buf, size + grow);
          (void) memcpy (buf + size, iaddr_msgs[nr], grow);
          size += grow;
        }
        preamble[0] = size;
        preamble[1] = nmsg;
        LOG_DEBUG ("Broadcasting address map from instance %u\n", cnt);
    }
    if (MPI_SUCCESS != MPI_Bcast (preamble, 2, MPI_UNSIGNED, cnt, MPI_COMM_WORLD))
    {
      GNUNET_break (0);
      return GNUNET_SYSERR;
    }
    if (rank != cnt)
    {
      size = preamble[0];
      nmsg = preamble[1];
      buf = GNUNET_malloc (size);
    }
    GNUNET_assert (NULL != buf);
    GNUNET_assert (0 < size);
    if (MPI_SUCCESS != MPI_Bcast (buf, size, MPI_BYTE, cnt, MPI_COMM_WORLD))
    {
      GNUNET_break (0);
      return GNUNET_SYSERR;
    }
    if (rank == cnt)
    {
      GNUNET_free (buf);
      for (nr = 0; nr < nmsg; nr++)
        GNUNET_free (iaddr_msgs[nr]);
      GNUNET_free (iaddr_msgs);
      continue;
    }

    iaddr_msgs = GNUNET_malloc (sizeof (struct MSH_MSG_InstanceAdresses *) *
                                nmsg);
    grow = 0;    
    for (nr = 0; nr < nmsg; nr++)
    {
      iaddr_msgs[nr] = (struct MSH_MSG_InstanceAdresses *) (buf + grow);
      grow += ntohs (iaddr_msgs[nr]->header.size);
      if (0 >= addressmap_intersect_msg (addrmap, iaddr_msgs[nr]))
      {
        LOG_ERROR ("No common address found for instance %u\n", 
                   ntohs (iaddr_msgs[nr]->rank));
        break;
      }
    }
    GNUNET_free (buf);
    GNUNET_free (iaddr_msgs);
    if (nr == nmsg)
      LOG_DEBUG ("Intersected received addressmap from instance %u\n", cnt);
    else
      return GNUNET_SYSERR;
  }
  GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_NONCE, &shash);
  if (MPI_SUCCESS != MPI_Bcast (&shash, sizeof (struct GNUNET_HashCode),
                                MPI_BYTE, 0, MPI_COMM_WORLD))
  {
    GNUNET_break (0);
    return GNUNET_SYSERR;
  }
  return GNUNET_OK;
}