aboutsummaryrefslogtreecommitdiff
path: root/src/multicast
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-05-13 12:08:14 +0000
committerGabor X Toth <*@tg-x.net>2014-05-13 12:08:14 +0000
commit093f0291be26fa3dfc6fc98a536028ef99517832 (patch)
treec10078bfe4136f940183d8dfde85617ab75acf46 /src/multicast
parent783fc956a05c0f321fa63fbcaeab00bc1865a069 (diff)
downloadgnunet-093f0291be26fa3dfc6fc98a536028ef99517832.tar.gz
gnunet-093f0291be26fa3dfc6fc98a536028ef99517832.zip
multicast: send messages between client lib & service
Diffstat (limited to 'src/multicast')
-rw-r--r--src/multicast/Makefile.am6
-rw-r--r--src/multicast/gnunet-service-multicast.c261
-rw-r--r--src/multicast/multicast.h31
-rw-r--r--src/multicast/multicast_api.c785
4 files changed, 827 insertions, 256 deletions
diff --git a/src/multicast/Makefile.am b/src/multicast/Makefile.am
index f8d49034e..752736c49 100644
--- a/src/multicast/Makefile.am
+++ b/src/multicast/Makefile.am
@@ -39,12 +39,18 @@ gnunet_multicast_SOURCES = \
39gnunet_multicast_LDADD = \ 39gnunet_multicast_LDADD = \
40 $(top_builddir)/src/util/libgnunetutil.la \ 40 $(top_builddir)/src/util/libgnunetutil.la \
41 $(GN_LIBINTL) 41 $(GN_LIBINTL)
42gnunet_multicast_DEPENDENCIES = \
43 $(top_builddir)/src/util/libgnunetutil.la
42 44
43gnunet_service_multicast_SOURCES = \ 45gnunet_service_multicast_SOURCES = \
44 gnunet-service-multicast.c 46 gnunet-service-multicast.c
45gnunet_service_multicast_LDADD = \ 47gnunet_service_multicast_LDADD = \
46 $(top_builddir)/src/util/libgnunetutil.la \ 48 $(top_builddir)/src/util/libgnunetutil.la \
49 $(top_builddir)/src/statistics/libgnunetstatistics.la \
47 $(GN_LIBINTL) 50 $(GN_LIBINTL)
51gnunet_service_multicast_DEPENDENCIES = \
52 $(top_builddir)/src/util/libgnunetutil.la \
53 $(top_builddir)/src/statistics/libgnunetstatistics.la
48 54
49 55
50check_PROGRAMS = \ 56check_PROGRAMS = \
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index 47c8bce36..0265660e1 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -25,6 +25,102 @@
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
28#include "gnunet_signatures.h"
29#include "gnunet_statistics_service.h"
30#include "gnunet_multicast_service.h"
31#include "multicast.h"
32
33/**
34 * Handle to our current configuration.
35 */
36static const struct GNUNET_CONFIGURATION_Handle *cfg;
37
38/**
39 * Handle to the statistics service.
40 */
41static struct GNUNET_STATISTICS_Handle *stats;
42/**
43 * Notification context, simplifies client broadcasts.
44 */
45static struct GNUNET_SERVER_NotificationContext *nc;
46
47/**
48 * All connected origins.
49 * Group's pub_key_hash -> struct Group
50 */
51static struct GNUNET_CONTAINER_MultiHashMap *origins;
52
53/**
54 * All connected members.
55 * Group's pub_key_hash -> struct Group
56 */
57static struct GNUNET_CONTAINER_MultiHashMap *members;
58
59/**
60 * Common part of the client context for both an origin and member.
61 */
62struct Group
63{
64 struct GNUNET_SERVER_Client *client;
65
66 /**
67 * Public key of the group.
68 */
69 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
70
71 /**
72 * Hash of @a pub_key.
73 */
74 struct GNUNET_HashCode pub_key_hash;
75
76 /**
77 * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
78 */
79 uint8_t is_origin;
80
81 /**
82 * Is the client disconnected? #GNUNET_YES or #GNUNET_NO
83 */
84 uint8_t disconnected;
85};
86
87
88/**
89 * Client context for a group's origin.
90 */
91struct Origin
92{
93 struct Group grp;
94
95 /**
96 * Private key of the group.
97 */
98 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
99
100 /**
101 * Last message fragment ID sent to the group.
102 */
103 uint64_t max_fragment_id;
104};
105
106
107/**
108 * Client context for a group member.
109 */
110struct Member
111{
112 struct Group grp;
113
114 /**
115 * Private key of the member.
116 */
117 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
118
119 /**
120 * Last request fragment ID sent to the origin.
121 */
122 uint64_t max_fragment_id;
123};
28 124
29 125
30/** 126/**
@@ -41,13 +137,86 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
41 137
42 138
43/** 139/**
140 * Iterator callback for sending a message to clients.
141 */
142static int
143message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
144 void *group)
145{
146 const struct GNUNET_MessageHeader *msg = cls;
147 struct Group *grp = group;
148
149 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
150 "%p Sending message to client.\n", grp);
151
152 GNUNET_SERVER_notification_context_add (nc, grp->client);
153 GNUNET_SERVER_notification_context_unicast (nc, grp->client, msg, GNUNET_NO);
154
155 return GNUNET_YES;
156}
157
158
159/**
160 * Send message to all origin and member clients connected to the group.
161 *
162 * @param grp The group to send @a msg to.
163 * @param msg Message to send.
164 */
165static void
166message_to_group (struct Group *grp, const struct GNUNET_MessageHeader *msg)
167{
168 if (origins != NULL)
169 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
170 message_callback, (void *) msg);
171 if (members != NULL)
172 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
173 message_callback, (void *) msg);
174}
175
176
177/**
178 * Send message to all origin clients connected to the group.
179 *
180 * @param grp The group to send @a msg to.
181 * @param msg Message to send.
182 */
183static void
184message_to_origin (struct Group *grp, const struct GNUNET_MessageHeader *msg)
185{
186 if (origins != NULL)
187 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
188 message_callback, (void *) msg);
189}
190
191
192/**
44 * Handle a connecting client starting an origin. 193 * Handle a connecting client starting an origin.
45 */ 194 */
46static void 195static void
47handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client, 196handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
48 const struct GNUNET_MessageHeader *msg) 197 const struct GNUNET_MessageHeader *m)
49{ 198{
199 const struct MulticastOriginStartMessage *
200 msg = (const struct MulticastOriginStartMessage *) m;
201
202 struct Origin *orig = GNUNET_new (struct Origin);
203 orig->priv_key = msg->group_key;
50 204
205 struct Group *grp = &orig->grp;
206 grp->is_origin = GNUNET_YES;
207 grp->client = client;
208
209 GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
210 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
211
212 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
213 "%p Client connected as origin to group %s.\n",
214 orig, GNUNET_h2s (&grp->pub_key_hash));
215
216 GNUNET_SERVER_client_set_user_context (client, grp);
217 GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
218 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
219 GNUNET_SERVER_receive_done (client, GNUNET_OK);
51} 220}
52 221
53 222
@@ -58,7 +227,6 @@ static void
58handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client, 227handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
59 const struct GNUNET_MessageHeader *msg) 228 const struct GNUNET_MessageHeader *msg)
60{ 229{
61
62} 230}
63 231
64 232
@@ -67,9 +235,28 @@ handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
67 */ 235 */
68static void 236static void
69handle_member_join (void *cls, struct GNUNET_SERVER_Client *client, 237handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
70 const struct GNUNET_MessageHeader *msg) 238 const struct GNUNET_MessageHeader *m)
71{ 239{
72 240 struct MulticastMemberJoinMessage *
241 msg = (struct MulticastMemberJoinMessage *) m;
242
243 struct Member *mem = GNUNET_new (struct Member);
244 mem->priv_key = msg->member_key;
245
246 struct Group *grp = &mem->grp;
247 grp->is_origin = GNUNET_NO;
248 grp->client = client;
249 grp->pub_key = msg->group_key;
250 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
251
252 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
253 "%p Client connected as member to group %s.\n",
254 mem, GNUNET_h2s (&grp->pub_key_hash));
255
256 GNUNET_SERVER_client_set_user_context (client, grp);
257 GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
258 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
259 GNUNET_SERVER_receive_done (client, GNUNET_OK);
73} 260}
74 261
75 262
@@ -89,9 +276,33 @@ handle_member_part (void *cls, struct GNUNET_SERVER_Client *client,
89 */ 276 */
90static void 277static void
91handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, 278handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
92 const struct GNUNET_MessageHeader *msg) 279 const struct GNUNET_MessageHeader *m)
93{ 280{
94 281 struct Group *
282 grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
283 GNUNET_assert (GNUNET_YES == grp->is_origin);
284 struct Origin *orig = (struct Origin *) grp;
285 struct GNUNET_MULTICAST_MessageHeader *
286 msg = (struct GNUNET_MULTICAST_MessageHeader *) m;
287
288 msg->fragment_id = GNUNET_htonll (orig->max_fragment_id++);
289 msg->purpose.size = htonl (sizeof (*msg) + ntohs (m->size)
290 - sizeof (msg->header)
291 - sizeof (msg->hop_counter)
292 - sizeof (msg->signature));
293 msg->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
294
295 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &msg->purpose,
296 &msg->signature))
297 {
298 /* FIXME: handle error */
299 return;
300 }
301
302 /* FIXME: send to remote members */
303
304 message_to_group (grp, m);
305 GNUNET_SERVER_receive_done (client, GNUNET_OK);
95} 306}
96 307
97 308
@@ -100,9 +311,35 @@ handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
100 */ 311 */
101static void 312static void
102handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, 313handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
103 const struct GNUNET_MessageHeader *msg) 314 const struct GNUNET_MessageHeader *m)
104{ 315{
316 struct Group *
317 grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
318 GNUNET_assert (GNUNET_NO == grp->is_origin);
319 struct Member *mem = (struct Member *) grp;
320
321 struct GNUNET_MULTICAST_RequestHeader *
322 req = (struct GNUNET_MULTICAST_RequestHeader *) m;
323
324 req->fragment_id = GNUNET_ntohll (mem->max_fragment_id++);
325
326 req->purpose.size = htonl (sizeof (*req) + ntohs (m->size)
327 - sizeof (req->header)
328 - sizeof (req->member_key)
329 - sizeof (req->signature));
330 req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
105 331
332 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose,
333 &req->signature))
334 {
335 /* FIXME: handle error */
336 return;
337 }
338
339 /* FIXME: send to remote origin */
340
341 message_to_origin (grp, m);
342 GNUNET_SERVER_receive_done (client, GNUNET_OK);
106} 343}
107 344
108/** 345/**
@@ -114,7 +351,7 @@ handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
114 */ 351 */
115static void 352static void
116run (void *cls, struct GNUNET_SERVER_Handle *server, 353run (void *cls, struct GNUNET_SERVER_Handle *server,
117 const struct GNUNET_CONFIGURATION_Handle *cfg) 354 const struct GNUNET_CONFIGURATION_Handle *c)
118{ 355{
119 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 356 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
120 { &handle_origin_start, NULL, 357 { &handle_origin_start, NULL,
@@ -137,7 +374,13 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
137 374
138 {NULL, NULL, 0, 0} 375 {NULL, NULL, 0, 0}
139 }; 376 };
140 /* FIXME: do setup here */ 377
378 cfg = c;
379 stats = GNUNET_STATISTICS_create ("multicast", cfg);
380 origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
381 members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
382 nc = GNUNET_SERVER_notification_context_create (server, 1);
383
141 GNUNET_SERVER_add_handlers (server, handlers); 384 GNUNET_SERVER_add_handlers (server, handlers);
142 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task, 385 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task,
143 NULL); 386 NULL);
diff --git a/src/multicast/multicast.h b/src/multicast/multicast.h
index facf8f54e..daa79e260 100644
--- a/src/multicast/multicast.h
+++ b/src/multicast/multicast.h
@@ -183,9 +183,8 @@ struct MulticastReplayEndMessage
183 */ 183 */
184struct MulticastOriginStartMessage 184struct MulticastOriginStartMessage
185{ 185{
186
187 /** 186 /**
188 * 187 * Type: GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START
189 */ 188 */
190 struct GNUNET_MessageHeader header; 189 struct GNUNET_MessageHeader header;
191 190
@@ -195,16 +194,36 @@ struct MulticastOriginStartMessage
195 uint32_t reserved; 194 uint32_t reserved;
196 195
197 /** 196 /**
198 * Private, non-ephemeral key for the mutlicast group. 197 * Private, non-ephemeral key for the multicast group.
199 */ 198 */
200 struct GNUNET_CRYPTO_EddsaPrivateKey group_key; 199 struct GNUNET_CRYPTO_EddsaPrivateKey group_key;
201 200
202 /** 201 /**
203 * Last fragment ID, used to continue counting fragments if we resume operating 202 * Last fragment ID sent to the group, used to continue counting fragments if
204 * a group. 203 * we resume operating * a group.
204 */
205 uint64_t max_fragment_id;
206
207};
208
209
210struct MulticastMemberJoinMessage
211{
212 /**
213 * Type: GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN
205 */ 214 */
206 uint64_t last_fragment_id; 215 struct GNUNET_MessageHeader header;
216
217 uint32_t relay_count GNUNET_PACKED;
218
219 struct GNUNET_CRYPTO_EddsaPublicKey group_key;
220
221 struct GNUNET_CRYPTO_EddsaPrivateKey member_key;
222
223 struct GNUNET_PeerIdentity origin;
207 224
225 /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */
226 /* Followed by struct GNUNET_MessageHeader join_request */
208}; 227};
209 228
210 229
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index da81de486..d42f438ae 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -26,7 +26,6 @@
26 */ 26 */
27#include "platform.h" 27#include "platform.h"
28#include "gnunet_util_lib.h" 28#include "gnunet_util_lib.h"
29#include "gnunet_signatures.h"
30#include "gnunet_multicast_service.h" 29#include "gnunet_multicast_service.h"
31#include "multicast.h" 30#include "multicast.h"
32 31
@@ -46,11 +45,18 @@ static struct GNUNET_CONTAINER_MultiHashMap *origins;
46static struct GNUNET_CONTAINER_MultiHashMap *members; 45static struct GNUNET_CONTAINER_MultiHashMap *members;
47 46
48 47
48struct MessageQueue
49{
50 struct MessageQueue *prev;
51 struct MessageQueue *next;
52};
53
54
49/** 55/**
50 * Handle for a request to send a message to all multicast group members 56 * Handle for a request to send a message to all multicast group members
51 * (from the origin). 57 * (from the origin).
52 */ 58 */
53struct GNUNET_MULTICAST_OriginMessageHandle 59struct GNUNET_MULTICAST_OriginTransmitHandle
54{ 60{
55 GNUNET_MULTICAST_OriginTransmitNotify notify; 61 GNUNET_MULTICAST_OriginTransmitNotify notify;
56 void *notify_cls; 62 void *notify_cls;
@@ -62,47 +68,104 @@ struct GNUNET_MULTICAST_OriginMessageHandle
62}; 68};
63 69
64 70
65struct GNUNET_MULTICAST_Group 71/**
72 * Handle for a message to be delivered from a member to the origin.
73 */
74struct GNUNET_MULTICAST_MemberTransmitHandle
66{ 75{
67 uint8_t is_origin; 76 GNUNET_MULTICAST_MemberTransmitNotify notify;
77 void *notify_cls;
78 struct GNUNET_MULTICAST_Member *member;
79
80 uint64_t request_id;
81 uint64_t fragment_offset;
68}; 82};
69 83
70/** 84
71 * Handle for the origin of a multicast group. 85struct GNUNET_MULTICAST_Group
72 */
73struct GNUNET_MULTICAST_Origin
74{ 86{
75 struct GNUNET_MULTICAST_Group grp; 87 /**
88 * Configuration to use.
89 */
90 const struct GNUNET_CONFIGURATION_Handle *cfg;
76 91
77 struct GNUNET_MULTICAST_OriginMessageHandle msg_handle; 92 /**
78 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; 93 * Socket (if available).
94 */
95 struct GNUNET_CLIENT_Connection *client;
96
97 /**
98 * Currently pending transmission request, or NULL for none.
99 */
100 struct GNUNET_CLIENT_TransmitHandle *th;
101
102 /**
103 * Head of operations to transmit.
104 */
105 struct MessageQueue *tmit_head;
106
107 /**
108 * Tail of operations to transmit.
109 */
110 struct MessageQueue *tmit_tail;
111
112 /**
113 * Message being transmitted to the Multicast service.
114 */
115 struct MessageQueue *tmit_msg;
116
117 /**
118 * Message to send on reconnect.
119 */
120 struct GNUNET_MessageHeader *reconnect_msg;
121
122 /**
123 * Task doing exponential back-off trying to reconnect.
124 */
125 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
126
127 /**
128 * Time for next connect retry.
129 */
130 struct GNUNET_TIME_Relative reconnect_delay;
131
132 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
133 struct GNUNET_HashCode pub_key_hash;
79 134
80 GNUNET_MULTICAST_JoinCallback join_cb; 135 GNUNET_MULTICAST_JoinCallback join_cb;
81 GNUNET_MULTICAST_MembershipTestCallback mem_test_cb; 136 GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
82 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; 137 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
83 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; 138 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
84 GNUNET_MULTICAST_RequestCallback request_cb;
85 GNUNET_MULTICAST_MessageCallback message_cb; 139 GNUNET_MULTICAST_MessageCallback message_cb;
86 void *cls; 140 void *cb_cls;
87 141
88 uint64_t next_fragment_id; 142 /**
143 * Are we polling for incoming messages right now?
144 */
145 uint8_t in_receive;
89 146
90 struct GNUNET_CRYPTO_EddsaPublicKey pub_key; 147 /**
91 struct GNUNET_HashCode pub_key_hash; 148 * Are we currently transmitting a message?
149 */
150 uint8_t in_transmit;
151
152 /**
153 * Is this the origin or a member?
154 */
155 uint8_t is_origin;
92}; 156};
93 157
94 158
95/** 159/**
96 * Handle for a message to be delivered from a member to the origin. 160 * Handle for the origin of a multicast group.
97 */ 161 */
98struct GNUNET_MULTICAST_MemberRequestHandle 162struct GNUNET_MULTICAST_Origin
99{ 163{
100 GNUNET_MULTICAST_MemberTransmitNotify notify; 164 struct GNUNET_MULTICAST_Group grp;
101 void *notify_cls; 165 struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
102 struct GNUNET_MULTICAST_Member *member; 166 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
103 167
104 uint64_t request_id; 168 GNUNET_MULTICAST_RequestCallback request_cb;
105 uint64_t fragment_offset;
106}; 169};
107 170
108 171
@@ -112,24 +175,16 @@ struct GNUNET_MULTICAST_MemberRequestHandle
112struct GNUNET_MULTICAST_Member 175struct GNUNET_MULTICAST_Member
113{ 176{
114 struct GNUNET_MULTICAST_Group grp; 177 struct GNUNET_MULTICAST_Group grp;
178 struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
115 179
116 struct GNUNET_MULTICAST_MemberRequestHandle req_handle; 180 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
117
118 struct GNUNET_CRYPTO_EddsaPublicKey group_key;
119 struct GNUNET_CRYPTO_EddsaPrivateKey member_key;
120 struct GNUNET_PeerIdentity origin; 181 struct GNUNET_PeerIdentity origin;
121 struct GNUNET_PeerIdentity relays; 182 struct GNUNET_PeerIdentity relays;
122 uint32_t relay_count; 183 uint32_t relay_count;
184
123 struct GNUNET_MessageHeader *join_request; 185 struct GNUNET_MessageHeader *join_request;
124 GNUNET_MULTICAST_JoinCallback join_cb;
125 GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
126 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
127 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
128 GNUNET_MULTICAST_MessageCallback message_cb;
129 void *cls;
130 186
131 uint64_t next_fragment_id; 187 uint64_t next_fragment_id;
132 struct GNUNET_HashCode group_key_hash;
133}; 188};
134 189
135 190
@@ -168,34 +223,233 @@ struct GNUNET_MULTICAST_MemberReplayHandle
168}; 223};
169 224
170 225
226static void
227reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
228
229
230static void
231reschedule_connect (struct GNUNET_MULTICAST_Group *grp);
232
233
234/**
235 * Schedule transmission of the next message from our queue.
236 *
237 * @param grp PSYC channel handle
238 */
239static void
240transmit_next (struct GNUNET_MULTICAST_Group *grp);
241
242
243static void
244message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
245
246
247/**
248 * Reschedule a connect attempt to the service.
249 *
250 * @param c channel to reconnect
251 */
252static void
253reschedule_connect (struct GNUNET_MULTICAST_Group *grp)
254{
255 GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
256
257 if (NULL != grp->th)
258 {
259 GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
260 grp->th = NULL;
261 }
262 if (NULL != grp->client)
263 {
264 GNUNET_CLIENT_disconnect (grp->client);
265 grp->client = NULL;
266 }
267 grp->in_receive = GNUNET_NO;
268 LOG (GNUNET_ERROR_TYPE_DEBUG,
269 "Scheduling task to reconnect to Multicast service in %s.\n",
270 GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, GNUNET_YES));
271 grp->reconnect_task =
272 GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp);
273 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
274}
275
276
277/**
278 * Reset stored data related to the last received message.
279 */
280static void
281recv_reset (struct GNUNET_MULTICAST_Group *grp)
282{
283}
284
285
286static void
287recv_error (struct GNUNET_MULTICAST_Group *grp)
288{
289 if (NULL != grp->message_cb)
290 grp->message_cb (grp->cb_cls, NULL);
291
292 recv_reset (grp);
293}
294
295
296/**
297 * Transmit next message to service.
298 *
299 * @param cls The struct GNUNET_MULTICAST_Group.
300 * @param size Number of bytes available in @a buf.
301 * @param buf Where to copy the message.
302 *
303 * @return Number of bytes copied to @a buf.
304 */
305static size_t
306send_next_message (void *cls, size_t size, void *buf)
307{
308 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
309 struct GNUNET_MULTICAST_Group *grp = cls;
310 struct MessageQueue *mq = grp->tmit_head;
311 if (NULL == mq)
312 return 0;
313 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
314 size_t ret = ntohs (qmsg->size);
315 grp->th = NULL;
316 if (ret > size)
317 {
318 reschedule_connect (grp);
319 return 0;
320 }
321 memcpy (buf, qmsg, ret);
322
323 GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq);
324 GNUNET_free (mq);
325
326 if (NULL != grp->tmit_head)
327 transmit_next (grp);
328
329 if (GNUNET_NO == grp->in_receive)
330 {
331 grp->in_receive = GNUNET_YES;
332 GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
333 GNUNET_TIME_UNIT_FOREVER_REL);
334 }
335 return ret;
336}
337
338
339/**
340 * Schedule transmission of the next message from our queue.
341 *
342 * @param grp Multicast group handle.
343 */
344static void
345transmit_next (struct GNUNET_MULTICAST_Group *grp)
346{
347 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
348 if (NULL != grp->th || NULL == grp->client)
349 return;
350
351 struct MessageQueue *mq = grp->tmit_head;
352 if (NULL == mq)
353 return;
354 struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
355
356 grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client,
357 ntohs (qmsg->size),
358 GNUNET_TIME_UNIT_FOREVER_REL,
359 GNUNET_NO,
360 &send_next_message,
361 grp);
362}
363
364
365/**
366 * Try again to connect to the Multicast service.
367 *
368 * @param cls Channel handle.
369 * @param tc Scheduler context.
370 */
371static void
372reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
373{
374 struct GNUNET_MULTICAST_Group *grp = cls;
375
376 recv_reset (grp);
377 grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
378 LOG (GNUNET_ERROR_TYPE_DEBUG,
379 "Connecting to Multicast service.\n");
380 GNUNET_assert (NULL == grp->client);
381 grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg);
382 GNUNET_assert (NULL != grp->client);
383 uint16_t reconn_size = ntohs (grp->reconnect_msg->size);
384
385 if (NULL == grp->tmit_head ||
386 0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size))
387 {
388 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
389 memcpy (&mq[1], grp->reconnect_msg, reconn_size);
390 GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq);
391 }
392 transmit_next (grp);
393}
394
395
396/**
397 * Disconnect from the Multicast service.
398 *
399 * @param g Group handle to disconnect.
400 */
401static void
402disconnect (void *g)
403{
404 struct GNUNET_MULTICAST_Group *grp = g;
405
406 GNUNET_assert (NULL != grp);
407 if (grp->tmit_head != grp->tmit_tail)
408 {
409 LOG (GNUNET_ERROR_TYPE_ERROR,
410 "Disconnecting while there are still outstanding messages!\n");
411 GNUNET_break (0);
412 }
413 if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
414 {
415 GNUNET_SCHEDULER_cancel (grp->reconnect_task);
416 grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
417 }
418 if (NULL != grp->th)
419 {
420 GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
421 grp->th = NULL;
422 }
423 if (NULL != grp->client)
424 {
425 GNUNET_CLIENT_disconnect (grp->client);
426 grp->client = NULL;
427 }
428 if (NULL != grp->reconnect_msg)
429 {
430 GNUNET_free (grp->reconnect_msg);
431 grp->reconnect_msg = NULL;
432 }
433}
434
435
171/** 436/**
172 * Iterator callback for calling message callbacks for all groups. 437 * Iterator callback for calling message callbacks for all groups.
173 */ 438 */
174static int 439static int
175message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, 440message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
176 void *group) 441 void *group)
177{ 442{
178 const struct GNUNET_MessageHeader *msg = cls; 443 const struct GNUNET_MessageHeader *msg = cls;
179 struct GNUNET_MULTICAST_Group *grp = group; 444 struct GNUNET_MULTICAST_Group *grp = group;
180 445
181 if (GNUNET_YES == grp->is_origin) 446 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
182 { 447 "Calling message callback with a message "
183 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 448 "of type %u and size %u.\n",
184 "Calling origin's message callback "
185 "with a message of type %u and size %u.\n",
186 ntohs (msg->type), ntohs (msg->size)); 449 ntohs (msg->type), ntohs (msg->size));
187 struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp; 450
188 orig->message_cb (orig->cls, msg); 451 if (NULL != grp->message_cb)
189 } 452 grp->message_cb (grp->cb_cls, msg);
190 else
191 {
192 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
193 "Calling member's message callback "
194 "with a message of type %u and size %u.\n",
195 ntohs (msg->type), ntohs (msg->size));
196 struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp;
197 mem->message_cb (mem->cls, msg);
198 }
199 453
200 return GNUNET_YES; 454 return GNUNET_YES;
201} 455}
@@ -213,25 +467,12 @@ static void
213handle_multicast_message (struct GNUNET_MULTICAST_Group *grp, 467handle_multicast_message (struct GNUNET_MULTICAST_Group *grp,
214 const struct GNUNET_MULTICAST_MessageHeader *msg) 468 const struct GNUNET_MULTICAST_MessageHeader *msg)
215{ 469{
216 struct GNUNET_HashCode *hash;
217
218 if (GNUNET_YES == grp->is_origin)
219 {
220 struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp;
221 hash = &orig->pub_key_hash;
222 }
223 else
224 {
225 struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp;
226 hash = &mem->group_key_hash;
227 }
228
229 if (origins != NULL) 470 if (origins != NULL)
230 GNUNET_CONTAINER_multihashmap_get_multiple (origins, hash, message_callback, 471 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
231 (void *) msg); 472 message_callback, (void *) msg);
232 if (members != NULL) 473 if (members != NULL)
233 GNUNET_CONTAINER_multihashmap_get_multiple (members, hash, message_callback, 474 GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
234 (void *) msg); 475 message_callback, (void *) msg);
235} 476}
236 477
237 478
@@ -249,7 +490,7 @@ request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
249 "Calling request callback for a request of type %u and size %u.\n", 490 "Calling request callback for a request of type %u and size %u.\n",
250 ntohs (req->header.type), ntohs (req->header.size)); 491 ntohs (req->header.type), ntohs (req->header.size));
251 492
252 orig->request_cb (orig->cls, &req->member_key, 493 orig->request_cb (orig->grp.cb_cls, &req->member_key,
253 (const struct GNUNET_MessageHeader *) req, 0); 494 (const struct GNUNET_MessageHeader *) req, 0);
254 return GNUNET_YES; 495 return GNUNET_YES;
255} 496}
@@ -264,16 +505,94 @@ request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
264 * @param msg The message. 505 * @param msg The message.
265 */ 506 */
266static void 507static void
267handle_multicast_request (const struct GNUNET_HashCode *group_key_hash, 508handle_multicast_request (struct GNUNET_MULTICAST_Group *grp,
268 const struct GNUNET_MULTICAST_RequestHeader *req) 509 const struct GNUNET_MULTICAST_RequestHeader *req)
269{ 510{
270 if (NULL != origins) 511 if (NULL != origins)
271 GNUNET_CONTAINER_multihashmap_get_multiple (origins, group_key_hash, 512 GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
272 request_callback, (void *) req); 513 request_callback, (void *) req);
273} 514}
274 515
275 516
276/** 517/**
518 * Function called when we receive a message from the service.
519 *
520 * @param cls struct GNUNET_MULTICAST_Group
521 * @param msg Message received, NULL on timeout or fatal error.
522 */
523static void
524message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
525{
526 struct GNUNET_MULTICAST_Group *grp = cls;
527
528 if (NULL == msg)
529 {
530 // timeout / disconnected from service, reconnect
531 reschedule_connect (grp);
532 return;
533 }
534
535 uint16_t size_eq = 0;
536 uint16_t size_min = 0;
537 uint16_t size = ntohs (msg->size);
538 uint16_t type = ntohs (msg->type);
539
540 LOG (GNUNET_ERROR_TYPE_DEBUG,
541 "Received message of type %d and size %u from Multicast service\n",
542 type, size);
543
544 switch (type)
545 {
546 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
547 size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader);
548 break;
549
550 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
551 size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
552 break;
553
554 default:
555 GNUNET_break_op (0);
556 return;
557 }
558
559 if (! ((0 < size_eq && size == size_eq)
560 || (0 < size_min && size_min <= size)))
561 {
562 GNUNET_break_op (0);
563 return;
564 }
565
566 switch (type)
567 {
568 case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
569 handle_multicast_message (grp, (struct GNUNET_MULTICAST_MessageHeader *) msg);
570 break;
571
572 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
573 if (GNUNET_YES != grp->is_origin)
574 {
575 GNUNET_break (0);
576 break;
577 }
578
579 handle_multicast_request (grp, (struct GNUNET_MULTICAST_RequestHeader *) msg);
580 break;
581
582 default:
583 GNUNET_break_op (0);
584 return;
585 }
586
587 if (NULL != grp->client)
588 {
589 GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
590 GNUNET_TIME_UNIT_FOREVER_REL);
591 }
592}
593
594
595/**
277 * Function to call with the decision made for a join request. 596 * Function to call with the decision made for a join request.
278 * 597 *
279 * Must be called once and only once in response to an invocation of the 598 * Must be called once and only once in response to an invocation of the
@@ -375,28 +694,29 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
375 * candidate will be given a response. Members in the group can send messages 694 * candidate will be given a response. Members in the group can send messages
376 * to the origin (one at a time). 695 * to the origin (one at a time).
377 * 696 *
378 * @param cfg Configuration to use. 697 * @param cfg Configuration to use.
379 * @param priv_key ECC key that will be used to sign messages for this 698 * @param priv_key ECC key that will be used to sign messages for this
380 * multicast session; public key is used to identify the multicast group; 699 * multicast session; public key is used to identify the multicast group;
381 * @param next_fragment_id Next fragment ID to continue counting fragments from 700 * @param max_fragment_id Maximum fragment ID already sent to the group.
382 * when restarting the origin. 0 for a new group. 701 * 0 for a new group.
383 * @param join_cb Function called to approve / disapprove joining of a peer. 702 * @param join_cb Function called to approve / disapprove joining of a peer.
384 * @param mem_test_cb Function multicast can use to test group membership. 703 * @param member_test_cb Function multicast can use to test group membership.
385 * @param replay_frag_cb Function that can be called to replay a message fragment. 704 * @param replay_frag_cb Function that can be called to replay a message fragment.
386 * @param replay_msg_cb Function that can be called to replay a message. 705 * @param replay_msg_cb Function that can be called to replay a message.
387 * @param request_cb Function called with message fragments from group members. 706 * @param request_cb Function called with message fragments from group members.
388 * @param message_cb Function called with the message fragments sent to the 707 * @param message_cb Function called with the message fragments sent to the
389 * network by GNUNET_MULTICAST_origin_to_all(). These message fragments 708 * network by GNUNET_MULTICAST_origin_to_all(). These message fragments
390 * should be stored for answering replay requests later. 709 * should be stored for answering replay requests later.
391 * @param cls Closure for the various callbacks that follow. 710 * @param cls Closure for the various callbacks that follow.
711 *
392 * @return Handle for the origin, NULL on error. 712 * @return Handle for the origin, NULL on error.
393 */ 713 */
394struct GNUNET_MULTICAST_Origin * 714struct GNUNET_MULTICAST_Origin *
395GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, 715GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
396 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key, 716 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
397 uint64_t next_fragment_id, 717 uint64_t max_fragment_id,
398 GNUNET_MULTICAST_JoinCallback join_cb, 718 GNUNET_MULTICAST_JoinCallback join_cb,
399 GNUNET_MULTICAST_MembershipTestCallback mem_test_cb, 719 GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
400 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, 720 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
401 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, 721 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
402 GNUNET_MULTICAST_RequestCallback request_cb, 722 GNUNET_MULTICAST_RequestCallback request_cb,
@@ -404,28 +724,40 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
404 void *cls) 724 void *cls)
405{ 725{
406 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig)); 726 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
407 orig->grp.is_origin = GNUNET_YES; 727 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
408 orig->priv_key = *priv_key; 728 struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start));
409 orig->next_fragment_id = next_fragment_id; 729
410 orig->join_cb = join_cb; 730 start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
411 orig->mem_test_cb = mem_test_cb; 731 start->header.size = htons (sizeof (*start));
412 orig->replay_frag_cb = replay_frag_cb; 732 start->max_fragment_id = max_fragment_id;
413 orig->replay_msg_cb = replay_msg_cb; 733 memcpy (&start->group_key, priv_key, sizeof (*priv_key));
734
735 grp->reconnect_msg = (struct GNUNET_MessageHeader *) start;
736 grp->is_origin = GNUNET_YES;
737 grp->cfg = cfg;
738
739 grp->cb_cls = cls;
740 grp->join_cb = join_cb;
741 grp->member_test_cb = member_test_cb;
742 grp->replay_frag_cb = replay_frag_cb;
743 grp->replay_msg_cb = replay_msg_cb;
744 grp->message_cb = message_cb;
745
414 orig->request_cb = request_cb; 746 orig->request_cb = request_cb;
415 orig->message_cb = message_cb; 747 orig->priv_key = *priv_key;
416 orig->cls = cls;
417 748
418 GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &orig->pub_key); 749 GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
419 GNUNET_CRYPTO_hash (&orig->pub_key, sizeof (orig->pub_key), 750 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
420 &orig->pub_key_hash); 751 &grp->pub_key_hash);
421 752
422 if (NULL == origins) 753 if (NULL == origins)
423 origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 754 origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
424 755
425 GNUNET_CONTAINER_multihashmap_put (origins, &orig->pub_key_hash, orig, 756 GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
426 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 757 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
427 758
428 /* FIXME: send ORIGIN_START to service */ 759 grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
760 grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
429 761
430 return orig; 762 return orig;
431} 763}
@@ -439,25 +771,26 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
439void 771void
440GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig) 772GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
441{ 773{
442 GNUNET_CONTAINER_multihashmap_remove (origins, &orig->pub_key_hash, orig); 774 disconnect (&orig->grp);
775 GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, orig);
443 GNUNET_free (orig); 776 GNUNET_free (orig);
444} 777}
445 778
446 779
447/* FIXME: for now just call clients' callbacks
448 * without sending anything to multicast. */
449static void 780static void
450schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 781origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
451{ 782{
452 LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_origin_to_all()\n"); 783 LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n");
453 struct GNUNET_MULTICAST_Origin *orig = cls; 784 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
454 struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; 785 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
455 786
456 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; 787 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
457 char buf[GNUNET_MULTICAST_FRAGMENT_MAX_SIZE] = ""; 788 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
458 struct GNUNET_MULTICAST_MessageHeader *msg 789 GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
459 = (struct GNUNET_MULTICAST_MessageHeader *) buf; 790
460 int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); 791 struct GNUNET_MULTICAST_MessageHeader *
792 msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1];
793 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
461 794
462 if (! (GNUNET_YES == ret || GNUNET_NO == ret) 795 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
463 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) 796 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
@@ -465,98 +798,78 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
465 LOG (GNUNET_ERROR_TYPE_ERROR, 798 LOG (GNUNET_ERROR_TYPE_ERROR,
466 "OriginTransmitNotify() returned error or invalid message size.\n"); 799 "OriginTransmitNotify() returned error or invalid message size.\n");
467 /* FIXME: handle error */ 800 /* FIXME: handle error */
801 GNUNET_free (mq);
468 return; 802 return;
469 } 803 }
470 804
471 if (GNUNET_NO == ret && 0 == buf_size) 805 if (GNUNET_NO == ret && 0 == buf_size)
806 {
807 GNUNET_free (mq);
472 return; /* Transmission paused. */ 808 return; /* Transmission paused. */
809 }
473 810
474 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); 811 msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
475 msg->header.size = htons (sizeof (*msg) + buf_size); 812 msg->header.size = htons (sizeof (*msg) + buf_size);
476 msg->message_id = GNUNET_htonll (mh->message_id); 813 msg->message_id = GNUNET_htonll (tmit->message_id);
477 msg->group_generation = mh->group_generation; 814 msg->group_generation = tmit->group_generation;
478 815 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
479 /* FIXME: add fragment ID and signature in the service instead of here */ 816 tmit->fragment_offset += sizeof (*msg) + buf_size;
480 msg->fragment_id = GNUNET_htonll (orig->next_fragment_id++);
481 msg->fragment_offset = GNUNET_htonll (mh->fragment_offset);
482 mh->fragment_offset += sizeof (*msg) + buf_size;
483 msg->purpose.size = htonl (sizeof (*msg) + buf_size
484 - sizeof (msg->header)
485 - sizeof (msg->hop_counter)
486 - sizeof (msg->signature));
487 msg->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
488
489 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &msg->purpose,
490 &msg->signature))
491 {
492 /* FIXME: handle error */
493 return;
494 }
495
496 /* FIXME: send msg to the service and only then call handle_multicast_message
497 * with the returned signed message.
498 */
499 handle_multicast_message (&orig->grp, msg);
500 817
501 if (GNUNET_NO == ret) 818 transmit_next (grp);
502 GNUNET_SCHEDULER_add_delayed (
503 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
504 schedule_origin_to_all, orig);
505} 819}
506 820
507 821
508/** 822/**
509 * Send a message to the multicast group. 823 * Send a message to the multicast group.
510 * 824 *
511 * @param origin Handle to the multicast group. 825 * @param orig Handle to the multicast group.
512 * @param message_id Application layer ID for the message. Opaque to multicast. 826 * @param message_id Application layer ID for the message. Opaque to multicast.
513 * @param group_generation Group generation of the message. Documented in 827 * @param group_generation Group generation of the message.
514 * `struct GNUNET_MULTICAST_MessageHeader`. 828 * Documented in struct GNUNET_MULTICAST_MessageHeader.
515 * @param notify Function to call to get the message. 829 * @param notify Function to call to get the message.
516 * @param notify_cls Closure for @a notify. 830 * @param notify_cls Closure for @a notify.
517 * @return NULL on error (i.e. request already pending). 831 *
832 * @return Message handle on success,
833 * NULL on error (i.e. another request is already pending).
518 */ 834 */
519struct GNUNET_MULTICAST_OriginMessageHandle * 835struct GNUNET_MULTICAST_OriginTransmitHandle *
520GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin, 836GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
521 uint64_t message_id, 837 uint64_t message_id,
522 uint64_t group_generation, 838 uint64_t group_generation,
523 GNUNET_MULTICAST_OriginTransmitNotify notify, 839 GNUNET_MULTICAST_OriginTransmitNotify notify,
524 void *notify_cls) 840 void *notify_cls)
525{ 841{
526 struct GNUNET_MULTICAST_OriginMessageHandle *mh = &origin->msg_handle; 842 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
527 mh->origin = origin; 843 tmit->origin = orig;
528 mh->message_id = message_id; 844 tmit->message_id = message_id;
529 mh->group_generation = group_generation; 845 tmit->group_generation = group_generation;
530 mh->notify = notify; 846 tmit->notify = notify;
531 mh->notify_cls = notify_cls; 847 tmit->notify_cls = notify_cls;
532 848
533 /* add some delay for testing */ 849 origin_to_all (orig);
534 GNUNET_SCHEDULER_add_delayed ( 850 return tmit;
535 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
536 schedule_origin_to_all, origin);
537 return &origin->msg_handle;
538} 851}
539 852
540 853
541/** 854/**
542 * Resume message transmission to multicast group. 855 * Resume message transmission to multicast group.
543 * 856 *
544 * @param mh Request to cancel. 857 * @param th Transmission to cancel.
545 */ 858 */
546void 859void
547GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginMessageHandle *mh) 860GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
548{ 861{
549 GNUNET_SCHEDULER_add_now (schedule_origin_to_all, mh->origin); 862 origin_to_all (th->origin);
550} 863}
551 864
552 865
553/** 866/**
554 * Cancel request for message transmission to multicast group. 867 * Cancel request for message transmission to multicast group.
555 * 868 *
556 * @param mh Request to cancel. 869 * @param th Transmission to cancel.
557 */ 870 */
558void 871void
559GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginMessageHandle *mh) 872GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
560{ 873{
561} 874}
562 875
@@ -584,12 +897,12 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginMessageHand
584 * @param relays Peer identities of members of the group, which serve as relays 897 * @param relays Peer identities of members of the group, which serve as relays
585 * and can be used to join the group at. and send the @a join_request to. 898 * and can be used to join the group at. and send the @a join_request to.
586 * If empty, the @a join_request is sent directly to the @a origin. 899 * If empty, the @a join_request is sent directly to the @a origin.
587 * @param join_request Application-dependent join request to be passed to the peer 900 * @param join_req Application-dependent join request to be passed to the peer
588 * @a relay (might, for example, contain a user, bind user 901 * @a relay (might, for example, contain a user, bind user
589 * identity/pseudonym to peer identity, application-level message to 902 * identity/pseudonym to peer identity, application-level message to
590 * origin, etc.). 903 * origin, etc.).
591 * @param join_cb Function called to approve / disapprove joining of a peer. 904 * @param join_cb Function called to approve / disapprove joining of a peer.
592 * @param mem_test_cb Function multicast can use to test group membership. 905 * @param member_test_cb Function multicast can use to test group membership.
593 * @param replay_frag_cb Function that can be called to replay message fragments 906 * @param replay_frag_cb Function that can be called to replay message fragments
594 * this peer already knows from this group. NULL if this 907 * this peer already knows from this group. NULL if this
595 * client is unable to support replay. 908 * client is unable to support replay.
@@ -609,7 +922,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
609 const struct GNUNET_PeerIdentity *origin, 922 const struct GNUNET_PeerIdentity *origin,
610 uint32_t relay_count, 923 uint32_t relay_count,
611 const struct GNUNET_PeerIdentity *relays, 924 const struct GNUNET_PeerIdentity *relays,
612 const struct GNUNET_MessageHeader *join_request, 925 const struct GNUNET_MessageHeader *join_req,
613 GNUNET_MULTICAST_JoinCallback join_cb, 926 GNUNET_MULTICAST_JoinCallback join_cb,
614 GNUNET_MULTICAST_MembershipTestCallback member_test_cb, 927 GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
615 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, 928 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
@@ -618,33 +931,47 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
618 void *cls) 931 void *cls)
619{ 932{
620 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem)); 933 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
621 mem->group_key = *group_key; 934 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
622 mem->member_key = *member_key; 935
936 uint16_t relay_size = relay_count * sizeof (*relays);
937 uint16_t join_req_size = (NULL != join_req) ? ntohs (join_req->size) : 0;
938 struct MulticastMemberJoinMessage *
939 join = GNUNET_malloc (sizeof (*join) + relay_size + join_req_size);
940 join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
941 join->header.size = htons (sizeof (*join) + relay_size + join_req_size);
942 join->group_key = *group_key;
943 join->member_key = *member_key;
944 join->origin = *origin;
945 memcpy (&join[1], relays, relay_size);
946 memcpy (((char *) &join[1]) + relay_size, join_req, join_req_size);
947
948 grp->reconnect_msg = (struct GNUNET_MessageHeader *) join;
949 grp->is_origin = GNUNET_NO;
950 grp->cfg = cfg;
951 grp->pub_key = *group_key;
952
953 grp->join_cb = join_cb;
954 grp->member_test_cb = member_test_cb;
955 grp->replay_frag_cb = replay_frag_cb;
956 grp->message_cb = message_cb;
957 grp->cb_cls = cls;
958
623 mem->origin = *origin; 959 mem->origin = *origin;
624 mem->relay_count = relay_count; 960 mem->relay_count = relay_count;
625 mem->relays = *relays; 961 mem->relays = *relays;
626 mem->join_cb = join_cb; 962 mem->priv_key = *member_key;
627 mem->member_test_cb = member_test_cb;
628 mem->replay_frag_cb = replay_frag_cb;
629 mem->message_cb = message_cb;
630 mem->cls = cls;
631
632 if (NULL != join_request)
633 {
634 uint16_t size = ntohs (join_request->size);
635 mem->join_request = GNUNET_malloc (size);
636 memcpy (mem->join_request, join_request, size);
637 }
638 963
639 GNUNET_CRYPTO_hash (&mem->group_key, sizeof (mem->group_key), &mem->group_key_hash); 964 GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &grp->pub_key);
965 GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
640 966
641 if (NULL == members) 967 if (NULL == members)
642 members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 968 members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
643 969
644 GNUNET_CONTAINER_multihashmap_put (members, &mem->group_key_hash, mem, 970 GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
645 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 971 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
646 972
647 /* FIXME: send MEMBER_JOIN to service */ 973 grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
974 grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
648 975
649 return mem; 976 return mem;
650} 977}
@@ -663,7 +990,8 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
663void 990void
664GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) 991GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
665{ 992{
666 GNUNET_CONTAINER_multihashmap_remove (members, &mem->group_key_hash, mem); 993 disconnect (&mem->grp);
994 GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem);
667 GNUNET_free (mem); 995 GNUNET_free (mem);
668} 996}
669 997
@@ -729,19 +1057,20 @@ GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandl
729} 1057}
730 1058
731 1059
732/* FIXME: for now just send back to the client what it sent. */
733static void 1060static void
734schedule_member_to_origin (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 1061member_to_origin (struct GNUNET_MULTICAST_Member *mem)
735{ 1062{
736 LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_member_to_origin()\n"); 1063 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
737 struct GNUNET_MULTICAST_Member *mem = cls; 1064 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
738 struct GNUNET_MULTICAST_MemberRequestHandle *rh = &mem->req_handle; 1065 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
739 1066
740 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; 1067 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
741 char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 1068 struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
742 struct GNUNET_MULTICAST_RequestHeader *req 1069 GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
743 = (struct GNUNET_MULTICAST_RequestHeader *) buf; 1070
744 int ret = rh->notify (rh->notify_cls, &buf_size, &req[1]); 1071 struct GNUNET_MULTICAST_RequestHeader *
1072 req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1];
1073 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
745 1074
746 if (! (GNUNET_YES == ret || GNUNET_NO == ret) 1075 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
747 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) 1076 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
@@ -757,73 +1086,47 @@ schedule_member_to_origin (void *cls, const struct GNUNET_SCHEDULER_TaskContext
757 1086
758 req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); 1087 req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
759 req->header.size = htons (sizeof (*req) + buf_size); 1088 req->header.size = htons (sizeof (*req) + buf_size);
760 req->request_id = GNUNET_htonll (rh->request_id); 1089 req->request_id = GNUNET_htonll (tmit->request_id);
761 1090 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
762 /* FIXME: add fragment ID and signature in the service instead of here */ 1091 tmit->fragment_offset += sizeof (*req) + buf_size;
763 req->fragment_id = GNUNET_ntohll (mem->next_fragment_id++);
764 req->fragment_offset = GNUNET_ntohll (rh->fragment_offset);
765 rh->fragment_offset += sizeof (*req) + buf_size;
766 req->purpose.size = htonl (sizeof (*req) + buf_size
767 - sizeof (req->header)
768 - sizeof (req->member_key)
769 - sizeof (req->signature));
770 req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
771
772 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->member_key, &req->purpose,
773 &req->signature))
774 {
775 /* FIXME: handle error */
776 return;
777 }
778
779 /* FIXME: send req to the service and only then call handle_multicast_request
780 * with the returned request.
781 */
782 handle_multicast_request (&mem->group_key_hash, req);
783 1092
784 if (GNUNET_NO == ret) 1093 transmit_next (grp);
785 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
786 (GNUNET_TIME_UNIT_SECONDS, 1),
787 schedule_member_to_origin, mem);
788} 1094}
789 1095
790 1096
791/** 1097/**
792 * Send a message to the origin of the multicast group. 1098 * Send a message to the origin of the multicast group.
793 * 1099 *
794 * @param member Membership handle. 1100 * @param mem Membership handle.
795 * @param request_id Application layer ID for the request. Opaque to multicast. 1101 * @param request_id Application layer ID for the request. Opaque to multicast.
796 * @param notify Callback to call to get the message. 1102 * @param notify Callback to call to get the message.
797 * @param notify_cls Closure for @a notify. 1103 * @param notify_cls Closure for @a notify.
798 * @return Handle to cancel request, NULL on error (i.e. request already pending). 1104 * @return Handle to cancel request, NULL on error (i.e. request already pending).
799 */ 1105 */
800struct GNUNET_MULTICAST_MemberRequestHandle * 1106struct GNUNET_MULTICAST_MemberTransmitHandle *
801GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member, 1107GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
802 uint64_t request_id, 1108 uint64_t request_id,
803 GNUNET_MULTICAST_MemberTransmitNotify notify, 1109 GNUNET_MULTICAST_MemberTransmitNotify notify,
804 void *notify_cls) 1110 void *notify_cls)
805{ 1111{
806 struct GNUNET_MULTICAST_MemberRequestHandle *rh = &member->req_handle; 1112 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
807 rh->member = member; 1113 tmit->member = mem;
808 rh->request_id = request_id; 1114 tmit->request_id = request_id;
809 rh->notify = notify; 1115 tmit->notify = notify;
810 rh->notify_cls = notify_cls; 1116 tmit->notify_cls = notify_cls;
811 1117
812 /* FIXME: remove delay, it's there only for testing */ 1118 member_to_origin (mem);
813 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 1119 return tmit;
814 (GNUNET_TIME_UNIT_SECONDS, 1),
815 schedule_member_to_origin, member);
816 return &member->req_handle;
817} 1120}
818 1121
819 1122
820/** 1123/**
821 * Resume message transmission to origin. 1124 * Resume message transmission to origin.
822 * 1125 *
823 * @param rh Request to cancel. 1126 * @param th Transmission to cancel.
824 */ 1127 */
825void 1128void
826GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberRequestHandle *rh) 1129GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
827{ 1130{
828 1131
829} 1132}
@@ -832,10 +1135,10 @@ GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberRequestH
832/** 1135/**
833 * Cancel request for message transmission to origin. 1136 * Cancel request for message transmission to origin.
834 * 1137 *
835 * @param rh Request to cancel. 1138 * @param th Transmission to cancel.
836 */ 1139 */
837void 1140void
838GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberRequestHandle *rh) 1141GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
839{ 1142{
840} 1143}
841 1144