diff options
author | Gabor X Toth <*@tg-x.net> | 2013-10-10 18:08:53 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2013-10-10 18:08:53 +0000 |
commit | 1b5d4aba9d8bbcb62c93f96782e3567f6f79d0cb (patch) | |
tree | 3cd28bfee831af0417c2dcbb543c03481517ad00 /src/psyc | |
parent | 67a8e21eedb6d35fec76841d4a1a6b4b41b37879 (diff) | |
download | gnunet-1b5d4aba9d8bbcb62c93f96782e3567f6f79d0cb.tar.gz gnunet-1b5d4aba9d8bbcb62c93f96782e3567f6f79d0cb.zip |
PSYC: master msg transmission
Diffstat (limited to 'src/psyc')
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 271 | ||||
-rw-r--r-- | src/psyc/psyc.h | 7 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 218 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 90 |
4 files changed, 472 insertions, 114 deletions
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 7f5189ab8..d3f203ebf 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -34,8 +34,6 @@ | |||
34 | #include "gnunet_psyc_service.h" | 34 | #include "gnunet_psyc_service.h" |
35 | #include "psyc.h" | 35 | #include "psyc.h" |
36 | 36 | ||
37 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) | ||
38 | |||
39 | 37 | ||
40 | /** | 38 | /** |
41 | * Handle to our current configuration. | 39 | * Handle to our current configuration. |
@@ -58,6 +56,11 @@ static struct GNUNET_SERVER_NotificationContext *nc; | |||
58 | static struct GNUNET_PSYCSTORE_Handle *store; | 56 | static struct GNUNET_PSYCSTORE_Handle *store; |
59 | 57 | ||
60 | /** | 58 | /** |
59 | * channel's pub_key_hash -> struct Channel | ||
60 | */ | ||
61 | static struct GNUNET_CONTAINER_MultiHashMap *clients; | ||
62 | |||
63 | /** | ||
61 | * Message in the transmission queue. | 64 | * Message in the transmission queue. |
62 | */ | 65 | */ |
63 | struct TransmitMessage | 66 | struct TransmitMessage |
@@ -81,6 +84,7 @@ struct Channel | |||
81 | struct TransmitMessage *tmit_tail; | 84 | struct TransmitMessage *tmit_tail; |
82 | 85 | ||
83 | char *tmit_buf; | 86 | char *tmit_buf; |
87 | GNUNET_SCHEDULER_TaskIdentifier tmit_task; | ||
84 | uint32_t tmit_mod_count; | 88 | uint32_t tmit_mod_count; |
85 | uint32_t tmit_mod_recvd; | 89 | uint32_t tmit_mod_recvd; |
86 | uint16_t tmit_size; | 90 | uint16_t tmit_size; |
@@ -96,8 +100,9 @@ struct Channel | |||
96 | struct Master | 100 | struct Master |
97 | { | 101 | { |
98 | struct Channel channel; | 102 | struct Channel channel; |
99 | struct GNUNET_CRYPTO_EccPrivateKey private_key; | 103 | struct GNUNET_CRYPTO_EccPrivateKey priv_key; |
100 | struct GNUNET_CRYPTO_EccPublicSignKey public_key; | 104 | struct GNUNET_CRYPTO_EccPublicSignKey pub_key; |
105 | struct GNUNET_HashCode pub_key_hash; | ||
101 | 106 | ||
102 | struct GNUNET_MULTICAST_Origin *origin; | 107 | struct GNUNET_MULTICAST_Origin *origin; |
103 | struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; | 108 | struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; |
@@ -120,13 +125,20 @@ struct Slave | |||
120 | { | 125 | { |
121 | struct Channel channel; | 126 | struct Channel channel; |
122 | struct GNUNET_CRYPTO_EccPrivateKey slave_key; | 127 | struct GNUNET_CRYPTO_EccPrivateKey slave_key; |
123 | struct GNUNET_CRYPTO_EccPublicSignKey channel_key; | 128 | struct GNUNET_CRYPTO_EccPublicSignKey chan_key; |
129 | struct GNUNET_HashCode chan_key_hash; | ||
124 | 130 | ||
125 | struct GNUNET_MULTICAST_Member *member; | 131 | struct GNUNET_MULTICAST_Member *member; |
126 | struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; | 132 | struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; |
127 | 133 | ||
134 | struct GNUNET_PeerIdentity origin; | ||
135 | struct GNUNET_PeerIdentity *relays; | ||
136 | struct GNUNET_MessageHeader *join_req; | ||
137 | |||
128 | uint64_t max_message_id; | 138 | uint64_t max_message_id; |
129 | uint64_t max_request_id; | 139 | uint64_t max_request_id; |
140 | |||
141 | uint32_t relay_count; | ||
130 | }; | 142 | }; |
131 | 143 | ||
132 | 144 | ||
@@ -166,41 +178,151 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
166 | 178 | ||
167 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client); | 179 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client); |
168 | 180 | ||
169 | struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, | 181 | struct Channel *ch |
170 | struct Channel); | 182 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
171 | GNUNET_assert (NULL != ch); | 183 | if (NULL == ch) |
184 | { | ||
185 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
186 | "User context is NULL in client_disconnect()\n"); | ||
187 | GNUNET_break (0); | ||
188 | return; | ||
189 | } | ||
172 | 190 | ||
173 | if (NULL != ch->tmit_buf) | 191 | if (NULL != ch->tmit_buf) |
174 | { | 192 | { |
175 | GNUNET_free (ch->tmit_buf); | 193 | GNUNET_free (ch->tmit_buf); |
176 | ch->tmit_buf = NULL; | 194 | ch->tmit_buf = NULL; |
177 | } | 195 | } |
196 | |||
197 | if (ch->is_master) | ||
198 | { | ||
199 | struct Master *mst = (struct Master *) ch; | ||
200 | if (NULL != mst->origin) | ||
201 | GNUNET_MULTICAST_origin_stop (mst->origin); | ||
202 | } | ||
203 | else | ||
204 | { | ||
205 | struct Slave *slv = (struct Slave *) ch; | ||
206 | if (NULL != slv->join_req) | ||
207 | GNUNET_free (slv->join_req); | ||
208 | if (NULL != slv->relays) | ||
209 | GNUNET_free (slv->relays); | ||
210 | if (NULL != slv->member) | ||
211 | GNUNET_MULTICAST_member_part (slv->member); | ||
212 | } | ||
213 | |||
178 | GNUNET_free (ch); | 214 | GNUNET_free (ch); |
179 | } | 215 | } |
180 | 216 | ||
217 | void | ||
218 | join_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key, | ||
219 | const struct GNUNET_MessageHeader *join_req, | ||
220 | struct GNUNET_MULTICAST_JoinHandle *jh) | ||
221 | { | ||
222 | |||
223 | } | ||
181 | 224 | ||
182 | void | 225 | void |
183 | counters_cb (void *cls, uint64_t max_fragment_id, uint64_t max_message_id, | 226 | membership_test_cb (void *cls, |
184 | uint64_t max_group_generation, uint64_t max_state_message_id) | 227 | const struct GNUNET_CRYPTO_EccPublicSignKey *member_key, |
228 | uint64_t message_id, uint64_t group_generation, | ||
229 | struct GNUNET_MULTICAST_MembershipTestHandle *mth) | ||
185 | { | 230 | { |
186 | struct Channel *ch = cls; | 231 | |
232 | } | ||
233 | |||
234 | void | ||
235 | replay_fragment_cb (void *cls, | ||
236 | const struct GNUNET_CRYPTO_EccPublicSignKey *member_key, | ||
237 | uint64_t fragment_id, uint64_t flags, | ||
238 | struct GNUNET_MULTICAST_ReplayHandle *rh) | ||
239 | { | ||
240 | |||
241 | } | ||
242 | |||
243 | void | ||
244 | replay_message_cb (void *cls, | ||
245 | const struct GNUNET_CRYPTO_EccPublicSignKey *member_key, | ||
246 | uint64_t message_id, | ||
247 | uint64_t fragment_offset, | ||
248 | uint64_t flags, | ||
249 | struct GNUNET_MULTICAST_ReplayHandle *rh) | ||
250 | { | ||
251 | |||
252 | } | ||
253 | |||
254 | void | ||
255 | request_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key, | ||
256 | const struct GNUNET_MessageHeader *req, | ||
257 | enum GNUNET_MULTICAST_MessageFlags flags) | ||
258 | { | ||
259 | |||
260 | } | ||
261 | |||
262 | void | ||
263 | message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | ||
264 | { | ||
265 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
266 | "Received message of type %u from multicast.\n", | ||
267 | ntohs (msg->type)); | ||
268 | } | ||
269 | |||
270 | void | ||
271 | master_counters_cb (void *cls, int result, uint64_t max_fragment_id, | ||
272 | uint64_t max_message_id, uint64_t max_group_generation, | ||
273 | uint64_t max_state_message_id) | ||
274 | { | ||
275 | struct Master *mst = cls; | ||
276 | struct Channel *ch = &mst->channel; | ||
187 | struct CountersResult *res = GNUNET_malloc (sizeof (*res)); | 277 | struct CountersResult *res = GNUNET_malloc (sizeof (*res)); |
278 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); | ||
188 | res->header.size = htons (sizeof (*res)); | 279 | res->header.size = htons (sizeof (*res)); |
280 | res->result_code = htonl (result); | ||
189 | res->max_message_id = GNUNET_htonll (max_message_id); | 281 | res->max_message_id = GNUNET_htonll (max_message_id); |
190 | 282 | ||
191 | if (ch->is_master) | 283 | if (GNUNET_OK == result || GNUNET_NO == result) |
192 | { | 284 | { |
193 | struct Master *mst = cls; | ||
194 | mst->max_message_id = max_message_id; | 285 | mst->max_message_id = max_message_id; |
195 | mst->max_state_message_id = max_state_message_id; | 286 | mst->max_state_message_id = max_state_message_id; |
196 | mst->max_group_generation = max_group_generation; | 287 | mst->max_group_generation = max_group_generation; |
197 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); | 288 | mst->origin |
289 | = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, | ||
290 | max_fragment_id + 1, | ||
291 | join_cb, membership_test_cb, | ||
292 | replay_fragment_cb, replay_message_cb, | ||
293 | request_cb, message_cb, ch); | ||
198 | } | 294 | } |
199 | else | 295 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
296 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, | ||
297 | GNUNET_NO); | ||
298 | GNUNET_free (res); | ||
299 | } | ||
300 | |||
301 | |||
302 | void | ||
303 | slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, | ||
304 | uint64_t max_message_id, uint64_t max_group_generation, | ||
305 | uint64_t max_state_message_id) | ||
306 | { | ||
307 | struct Slave *slv = cls; | ||
308 | struct Channel *ch = &slv->channel; | ||
309 | struct CountersResult *res = GNUNET_malloc (sizeof (*res)); | ||
310 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); | ||
311 | res->header.size = htons (sizeof (*res)); | ||
312 | res->result_code = htonl (result); | ||
313 | res->max_message_id = GNUNET_htonll (max_message_id); | ||
314 | |||
315 | if (GNUNET_OK == result || GNUNET_NO == result) | ||
200 | { | 316 | { |
201 | struct Slave *slv = cls; | ||
202 | slv->max_message_id = max_message_id; | 317 | slv->max_message_id = max_message_id; |
203 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); | 318 | slv->member |
319 | = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key, | ||
320 | &slv->origin, | ||
321 | slv->relay_count, slv->relays, | ||
322 | slv->join_req, join_cb, | ||
323 | membership_test_cb, | ||
324 | replay_fragment_cb, replay_message_cb, | ||
325 | message_cb, ch); | ||
204 | } | 326 | } |
205 | 327 | ||
206 | GNUNET_SERVER_notification_context_add (nc, ch->client); | 328 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
@@ -220,14 +342,17 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, | |||
220 | mst->channel.client = client; | 342 | mst->channel.client = client; |
221 | mst->channel.is_master = GNUNET_YES; | 343 | mst->channel.is_master = GNUNET_YES; |
222 | mst->policy = ntohl (req->policy); | 344 | mst->policy = ntohl (req->policy); |
223 | mst->private_key = req->channel_key; | 345 | mst->priv_key = req->channel_key; |
224 | GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->private_key, | 346 | GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->priv_key, |
225 | &mst->public_key); | 347 | &mst->pub_key); |
348 | GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash); | ||
226 | 349 | ||
227 | GNUNET_PSYCSTORE_counters_get (store, &mst->public_key, | 350 | GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key, |
228 | counters_cb, mst); | 351 | master_counters_cb, mst); |
229 | 352 | ||
230 | GNUNET_SERVER_client_set_user_context (client, mst); | 353 | GNUNET_SERVER_client_set_user_context (client, &mst->channel); |
354 | GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst, | ||
355 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
231 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 356 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
232 | } | 357 | } |
233 | 358 | ||
@@ -241,13 +366,25 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
241 | struct Slave *slv = GNUNET_new (struct Slave); | 366 | struct Slave *slv = GNUNET_new (struct Slave); |
242 | slv->channel.client = client; | 367 | slv->channel.client = client; |
243 | slv->channel.is_master = GNUNET_NO; | 368 | slv->channel.is_master = GNUNET_NO; |
244 | slv->channel_key = req->channel_key; | ||
245 | slv->slave_key = req->slave_key; | 369 | slv->slave_key = req->slave_key; |
246 | 370 | slv->chan_key = req->channel_key; | |
247 | GNUNET_PSYCSTORE_counters_get (store, &slv->channel_key, | 371 | GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key), |
248 | counters_cb, slv); | 372 | &slv->chan_key_hash); |
249 | 373 | slv->origin = req->origin; | |
250 | GNUNET_SERVER_client_set_user_context (client, slv); | 374 | slv->relay_count = ntohl (req->relay_count); |
375 | |||
376 | const struct GNUNET_PeerIdentity *relays | ||
377 | = (const struct GNUNET_PeerIdentity *) &req[1]; | ||
378 | slv->relays | ||
379 | = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity)); | ||
380 | uint32_t i; | ||
381 | for (i = 0; i < slv->relay_count; i++) | ||
382 | memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); | ||
383 | |||
384 | GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key, | ||
385 | slave_counters_cb, slv); | ||
386 | |||
387 | GNUNET_SERVER_client_set_user_context (client, &slv->channel); | ||
251 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 388 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
252 | } | 389 | } |
253 | 390 | ||
@@ -268,34 +405,40 @@ send_transmit_ack (struct Channel *ch) | |||
268 | 405 | ||
269 | 406 | ||
270 | static int | 407 | static int |
271 | transmit_notify (void *cls, uint64_t fragment_id, size_t *data_size, void *data) | 408 | transmit_notify (void *cls, size_t *data_size, void *data) |
272 | { | 409 | { |
410 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n"); | ||
273 | struct Channel *ch = cls; | 411 | struct Channel *ch = cls; |
274 | struct TransmitMessage *msg = ch->tmit_head; | 412 | struct TransmitMessage *msg = ch->tmit_head; |
275 | 413 | ||
276 | if (NULL == msg || *data_size < msg->size) | 414 | if (NULL == msg || *data_size < ntohs (msg->size)) |
277 | { | 415 | { |
278 | *data_size = 0; | 416 | *data_size = 0; |
279 | return GNUNET_NO; | 417 | return GNUNET_NO; |
280 | } | 418 | } |
281 | 419 | ||
282 | memcpy (data, msg->buf, msg->size); | 420 | *data_size = ntohs (msg->size); |
283 | *data_size = msg->size; | 421 | memcpy (data, msg->buf, *data_size); |
284 | 422 | ||
285 | GNUNET_free (ch->tmit_buf); | 423 | GNUNET_free (ch->tmit_buf); |
424 | ch->tmit_buf = NULL; | ||
286 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); | 425 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); |
287 | 426 | ||
288 | return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; | 427 | return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; |
289 | } | 428 | } |
290 | 429 | ||
291 | 430 | ||
292 | static int | 431 | static void |
293 | master_transmit_message (struct Master *mst) | 432 | master_transmit_message (void *cls, |
433 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
294 | { | 434 | { |
435 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n"); | ||
436 | struct Master *mst = cls; | ||
437 | mst->channel.tmit_task = 0; | ||
295 | if (NULL == mst->tmit_handle) | 438 | if (NULL == mst->tmit_handle) |
296 | { | 439 | { |
297 | mst->tmit_handle | 440 | mst->tmit_handle |
298 | = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, | 441 | = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id, |
299 | mst->max_group_generation, | 442 | mst->max_group_generation, |
300 | transmit_notify, mst); | 443 | transmit_notify, mst); |
301 | } | 444 | } |
@@ -303,24 +446,25 @@ master_transmit_message (struct Master *mst) | |||
303 | { | 446 | { |
304 | GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle); | 447 | GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle); |
305 | } | 448 | } |
306 | return GNUNET_OK; | ||
307 | } | 449 | } |
308 | 450 | ||
309 | 451 | ||
310 | static int | 452 | static void |
311 | slave_transmit_message (struct Slave *slv) | 453 | slave_transmit_message (void *cls, |
454 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
312 | { | 455 | { |
456 | struct Slave *slv = cls; | ||
457 | slv->channel.tmit_task = 0; | ||
313 | if (NULL == slv->tmit_handle) | 458 | if (NULL == slv->tmit_handle) |
314 | { | 459 | { |
315 | slv->tmit_handle | 460 | slv->tmit_handle |
316 | = GNUNET_MULTICAST_member_to_origin(slv->member, slv->max_request_id, | 461 | = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id, |
317 | transmit_notify, slv); | 462 | transmit_notify, slv); |
318 | } | 463 | } |
319 | else | 464 | else |
320 | { | 465 | { |
321 | GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle); | 466 | GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle); |
322 | } | 467 | } |
323 | return GNUNET_OK; | ||
324 | } | 468 | } |
325 | 469 | ||
326 | 470 | ||
@@ -328,6 +472,7 @@ static int | |||
328 | buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) | 472 | buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) |
329 | { | 473 | { |
330 | uint16_t size = ntohs (msg->size); | 474 | uint16_t size = ntohs (msg->size); |
475 | struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO; | ||
331 | 476 | ||
332 | if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) | 477 | if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) |
333 | return GNUNET_SYSERR; | 478 | return GNUNET_SYSERR; |
@@ -353,12 +498,17 @@ buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) | |||
353 | tmit_msg->size = size; | 498 | tmit_msg->size = size; |
354 | tmit_msg->status = ch->tmit_status; | 499 | tmit_msg->status = ch->tmit_status; |
355 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | 500 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); |
356 | 501 | tmit_delay = GNUNET_TIME_UNIT_ZERO; | |
357 | ch->is_master | ||
358 | ? master_transmit_message ((struct Master *) ch) | ||
359 | : slave_transmit_message ((struct Slave *) ch); | ||
360 | } | 502 | } |
361 | 503 | ||
504 | if (0 != ch->tmit_task) | ||
505 | GNUNET_SCHEDULER_cancel (ch->tmit_task); | ||
506 | |||
507 | ch->tmit_task | ||
508 | = ch->is_master | ||
509 | ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch) | ||
510 | : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch); | ||
511 | |||
362 | return GNUNET_OK; | 512 | return GNUNET_OK; |
363 | } | 513 | } |
364 | 514 | ||
@@ -368,8 +518,8 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, | |||
368 | { | 518 | { |
369 | const struct GNUNET_PSYC_MessageMethod *meth | 519 | const struct GNUNET_PSYC_MessageMethod *meth |
370 | = (const struct GNUNET_PSYC_MessageMethod *) msg; | 520 | = (const struct GNUNET_PSYC_MessageMethod *) msg; |
371 | struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, | 521 | struct Channel *ch |
372 | struct Channel); | 522 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
373 | GNUNET_assert (NULL != ch); | 523 | GNUNET_assert (NULL != ch); |
374 | 524 | ||
375 | if (GNUNET_NO != ch->in_transmit) | 525 | if (GNUNET_NO != ch->in_transmit) |
@@ -378,6 +528,7 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, | |||
378 | return; | 528 | return; |
379 | } | 529 | } |
380 | 530 | ||
531 | ch->in_transmit = GNUNET_YES; | ||
381 | ch->tmit_buf = NULL; | 532 | ch->tmit_buf = NULL; |
382 | ch->tmit_size = 0; | 533 | ch->tmit_size = 0; |
383 | ch->tmit_mod_recvd = 0; | 534 | ch->tmit_mod_recvd = 0; |
@@ -388,6 +539,8 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, | |||
388 | 539 | ||
389 | if (0 == ch->tmit_mod_count) | 540 | if (0 == ch->tmit_mod_count) |
390 | send_transmit_ack (ch); | 541 | send_transmit_ack (ch); |
542 | |||
543 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
391 | }; | 544 | }; |
392 | 545 | ||
393 | 546 | ||
@@ -397,8 +550,8 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, | |||
397 | { | 550 | { |
398 | const struct GNUNET_PSYC_MessageModifier *mod | 551 | const struct GNUNET_PSYC_MessageModifier *mod |
399 | = (const struct GNUNET_PSYC_MessageModifier *) msg; | 552 | = (const struct GNUNET_PSYC_MessageModifier *) msg; |
400 | struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, | 553 | struct Channel *ch |
401 | struct Channel); | 554 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
402 | GNUNET_assert (NULL != ch); | 555 | GNUNET_assert (NULL != ch); |
403 | 556 | ||
404 | ch->tmit_mod_recvd++; | 557 | ch->tmit_mod_recvd++; |
@@ -406,6 +559,8 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, | |||
406 | 559 | ||
407 | if (ch->tmit_mod_recvd == ch->tmit_mod_count) | 560 | if (ch->tmit_mod_recvd == ch->tmit_mod_count) |
408 | send_transmit_ack (ch); | 561 | send_transmit_ack (ch); |
562 | |||
563 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
409 | }; | 564 | }; |
410 | 565 | ||
411 | 566 | ||
@@ -415,13 +570,18 @@ handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, | |||
415 | { | 570 | { |
416 | const struct GNUNET_PSYC_MessageData *data | 571 | const struct GNUNET_PSYC_MessageData *data |
417 | = (const struct GNUNET_PSYC_MessageData *) msg; | 572 | = (const struct GNUNET_PSYC_MessageData *) msg; |
418 | struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, | 573 | struct Channel *ch |
419 | struct Channel); | 574 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
420 | GNUNET_assert (NULL != ch); | 575 | GNUNET_assert (NULL != ch); |
421 | 576 | ||
422 | ch->tmit_status = data->status; | 577 | ch->tmit_status = ntohs (data->status); |
423 | buffer_message (ch, msg); | 578 | buffer_message (ch, msg); |
424 | send_transmit_ack (ch); | 579 | send_transmit_ack (ch); |
580 | |||
581 | if (GNUNET_PSYC_DATA_CONT != ch->tmit_status) | ||
582 | ch->in_transmit = GNUNET_NO; | ||
583 | |||
584 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
425 | }; | 585 | }; |
426 | 586 | ||
427 | 587 | ||
@@ -444,13 +604,13 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
444 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, | 604 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, |
445 | 605 | ||
446 | { &handle_transmit_method, NULL, | 606 | { &handle_transmit_method, NULL, |
447 | GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD, 0 }, | 607 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 }, |
448 | 608 | ||
449 | { &handle_transmit_modifier, NULL, | 609 | { &handle_transmit_modifier, NULL, |
450 | GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER, 0 }, | 610 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 }, |
451 | 611 | ||
452 | { &handle_transmit_data, NULL, | 612 | { &handle_transmit_data, NULL, |
453 | GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA, 0 }, | 613 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 }, |
454 | 614 | ||
455 | { NULL, NULL, 0, 0 } | 615 | { NULL, NULL, 0, 0 } |
456 | }; | 616 | }; |
@@ -458,6 +618,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
458 | cfg = c; | 618 | cfg = c; |
459 | store = GNUNET_PSYCSTORE_connect (cfg); | 619 | store = GNUNET_PSYCSTORE_connect (cfg); |
460 | stats = GNUNET_STATISTICS_create ("psyc", cfg); | 620 | stats = GNUNET_STATISTICS_create ("psyc", cfg); |
621 | clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); | ||
461 | nc = GNUNET_SERVER_notification_context_create (server, 1); | 622 | nc = GNUNET_SERVER_notification_context_create (server, 1); |
462 | GNUNET_SERVER_add_handlers (server, handlers); | 623 | GNUNET_SERVER_add_handlers (server, handlers); |
463 | GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); | 624 | GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); |
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 35e9ae800..6a8826337 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h | |||
@@ -65,6 +65,11 @@ struct CountersResult | |||
65 | */ | 65 | */ |
66 | struct GNUNET_MessageHeader header; | 66 | struct GNUNET_MessageHeader header; |
67 | 67 | ||
68 | /** | ||
69 | * Status code for the operation. | ||
70 | */ | ||
71 | int32_t result_code GNUNET_PACKED; | ||
72 | |||
68 | uint64_t max_message_id; | 73 | uint64_t max_message_id; |
69 | }; | 74 | }; |
70 | 75 | ||
@@ -121,6 +126,8 @@ struct SlaveJoinRequest | |||
121 | 126 | ||
122 | struct GNUNET_CRYPTO_EccPrivateKey slave_key; | 127 | struct GNUNET_CRYPTO_EccPrivateKey slave_key; |
123 | 128 | ||
129 | struct GNUNET_PeerIdentity origin; | ||
130 | |||
124 | /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */ | 131 | /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */ |
125 | }; | 132 | }; |
126 | 133 | ||
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index abe7bb028..4178d920b 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -106,6 +106,28 @@ struct GNUNET_PSYC_Channel | |||
106 | * Are we currently transmitting a message? | 106 | * Are we currently transmitting a message? |
107 | */ | 107 | */ |
108 | int in_transmit; | 108 | int in_transmit; |
109 | |||
110 | /** | ||
111 | * Is this a master or slave channel? | ||
112 | */ | ||
113 | int is_master; | ||
114 | |||
115 | /** | ||
116 | * Buffer space available for transmitting the next data fragment. | ||
117 | */ | ||
118 | uint16_t tmit_buf_avail; | ||
119 | }; | ||
120 | |||
121 | |||
122 | /** | ||
123 | * Handle for a pending PSYC transmission operation. | ||
124 | */ | ||
125 | struct GNUNET_PSYC_MasterTransmitHandle | ||
126 | { | ||
127 | struct GNUNET_PSYC_Master *master; | ||
128 | GNUNET_PSYC_MasterTransmitNotify notify; | ||
129 | void *notify_cls; | ||
130 | enum GNUNET_PSYC_DataStatus status; | ||
109 | }; | 131 | }; |
110 | 132 | ||
111 | 133 | ||
@@ -116,6 +138,8 @@ struct GNUNET_PSYC_Master | |||
116 | { | 138 | { |
117 | struct GNUNET_PSYC_Channel ch; | 139 | struct GNUNET_PSYC_Channel ch; |
118 | 140 | ||
141 | struct GNUNET_PSYC_MasterTransmitHandle *tmit; | ||
142 | |||
119 | GNUNET_PSYC_MasterStartCallback start_cb; | 143 | GNUNET_PSYC_MasterStartCallback start_cb; |
120 | 144 | ||
121 | uint64_t max_message_id; | 145 | uint64_t max_message_id; |
@@ -146,19 +170,6 @@ struct GNUNET_PSYC_JoinHandle | |||
146 | /** | 170 | /** |
147 | * Handle for a pending PSYC transmission operation. | 171 | * Handle for a pending PSYC transmission operation. |
148 | */ | 172 | */ |
149 | struct GNUNET_PSYC_MasterTransmitHandle | ||
150 | { | ||
151 | struct GNUNET_PSYC_Master *master; | ||
152 | const struct GNUNET_ENV_Environment *env; | ||
153 | GNUNET_PSYC_MasterTransmitNotify notify; | ||
154 | void *notify_cls; | ||
155 | enum GNUNET_PSYC_MasterTransmitFlags flags; | ||
156 | }; | ||
157 | |||
158 | |||
159 | /** | ||
160 | * Handle for a pending PSYC transmission operation. | ||
161 | */ | ||
162 | struct GNUNET_PSYC_SlaveTransmitHandle | 173 | struct GNUNET_PSYC_SlaveTransmitHandle |
163 | { | 174 | { |
164 | 175 | ||
@@ -184,10 +195,10 @@ struct GNUNET_PSYC_StateQuery | |||
184 | 195 | ||
185 | 196 | ||
186 | /** | 197 | /** |
187 | * Try again to connect to the PSYCstore service. | 198 | * Try again to connect to the PSYC service. |
188 | * | 199 | * |
189 | * @param cls handle to the PSYCstore service. | 200 | * @param cls Handle to the PSYC service. |
190 | * @param tc scheduler context | 201 | * @param tc Scheduler context |
191 | */ | 202 | */ |
192 | static void | 203 | static void |
193 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | 204 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); |
@@ -215,7 +226,7 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c) | |||
215 | } | 226 | } |
216 | c->in_receive = GNUNET_NO; | 227 | c->in_receive = GNUNET_NO; |
217 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 228 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
218 | "Scheduling task to reconnect to PSYCstore service in %s.\n", | 229 | "Scheduling task to reconnect to PSYC service in %s.\n", |
219 | GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES)); | 230 | GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES)); |
220 | c->reconnect_task = | 231 | c->reconnect_task = |
221 | GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c); | 232 | GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c); |
@@ -226,12 +237,56 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c) | |||
226 | /** | 237 | /** |
227 | * Schedule transmission of the next message from our queue. | 238 | * Schedule transmission of the next message from our queue. |
228 | * | 239 | * |
229 | * @param h PSYCstore handle | 240 | * @param h PSYC handle |
230 | */ | 241 | */ |
231 | static void | 242 | static void |
232 | transmit_next (struct GNUNET_PSYC_Channel *c); | 243 | transmit_next (struct GNUNET_PSYC_Channel *c); |
233 | 244 | ||
234 | 245 | ||
246 | void | ||
247 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | ||
248 | { | ||
249 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | ||
250 | size_t data_size = ch->tmit_buf_avail; | ||
251 | struct GNUNET_PSYC_MessageData *pdata; | ||
252 | struct OperationHandle *op | ||
253 | = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size); | ||
254 | pdata = (struct GNUNET_PSYC_MessageData *) &op[1]; | ||
255 | op->msg = (struct GNUNET_MessageHeader *) pdata; | ||
256 | pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | ||
257 | |||
258 | switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1])) | ||
259 | { | ||
260 | case GNUNET_NO: | ||
261 | mst->tmit->status = GNUNET_PSYC_DATA_CONT; | ||
262 | break; | ||
263 | |||
264 | case GNUNET_YES: | ||
265 | mst->tmit->status = GNUNET_PSYC_DATA_END; | ||
266 | break; | ||
267 | |||
268 | default: | ||
269 | mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; | ||
270 | data_size = 0; | ||
271 | LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n"); | ||
272 | } | ||
273 | |||
274 | if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) | ||
275 | { | ||
276 | /* Transmission paused, nothing to send. */ | ||
277 | GNUNET_free (op); | ||
278 | } | ||
279 | else | ||
280 | { | ||
281 | GNUNET_assert (data_size <= ch->tmit_buf_avail); | ||
282 | pdata->header.size = htons (sizeof (*pdata) + data_size); | ||
283 | pdata->status = htons (mst->tmit->status); | ||
284 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); | ||
285 | transmit_next (ch); | ||
286 | } | ||
287 | } | ||
288 | |||
289 | |||
235 | /** | 290 | /** |
236 | * Type of a function to call when we receive a message | 291 | * Type of a function to call when we receive a message |
237 | * from the service. | 292 | * from the service. |
@@ -253,8 +308,8 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
253 | } | 308 | } |
254 | uint16_t size_eq = 0; | 309 | uint16_t size_eq = 0; |
255 | uint16_t size_min = 0; | 310 | uint16_t size_min = 0; |
256 | const uint16_t size = ntohs (msg->size); | 311 | uint16_t size = ntohs (msg->size); |
257 | const uint16_t type = ntohs (msg->type); | 312 | uint16_t type = ntohs (msg->type); |
258 | 313 | ||
259 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 314 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
260 | "Received message of type %d from PSYC service\n", type); | 315 | "Received message of type %d from PSYC service\n", type); |
@@ -265,6 +320,9 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
265 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | 320 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: |
266 | size_eq = sizeof (struct CountersResult); | 321 | size_eq = sizeof (struct CountersResult); |
267 | break; | 322 | break; |
323 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | ||
324 | size_eq = sizeof (struct TransmitAck); | ||
325 | break; | ||
268 | } | 326 | } |
269 | 327 | ||
270 | if (! ((0 < size_eq && size == size_eq) | 328 | if (! ((0 < size_eq && size == size_eq) |
@@ -276,6 +334,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
276 | } | 334 | } |
277 | 335 | ||
278 | struct CountersResult *cres; | 336 | struct CountersResult *cres; |
337 | struct TransmitAck *tack; | ||
279 | 338 | ||
280 | switch (type) | 339 | switch (type) |
281 | { | 340 | { |
@@ -294,17 +353,39 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
294 | mst->join_ack_cb (ch->cb_cls, mst->max_message_id); | 353 | mst->join_ack_cb (ch->cb_cls, mst->max_message_id); |
295 | #endif | 354 | #endif |
296 | break; | 355 | break; |
356 | |||
357 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | ||
358 | tack = (struct TransmitAck *) msg; | ||
359 | if (ch->is_master) | ||
360 | { | ||
361 | GNUNET_assert (NULL != mst->tmit); | ||
362 | if (GNUNET_PSYC_DATA_CONT != mst->tmit->status | ||
363 | || NULL == mst->tmit->notify) | ||
364 | { | ||
365 | GNUNET_free (mst->tmit); | ||
366 | mst->tmit = NULL; | ||
367 | } | ||
368 | else | ||
369 | { | ||
370 | ch->tmit_buf_avail = ntohs (tack->buf_avail); | ||
371 | master_transmit_data (mst); | ||
372 | } | ||
373 | } | ||
374 | else | ||
375 | { | ||
376 | /* TODO: slave */ | ||
377 | } | ||
378 | break; | ||
297 | } | 379 | } |
298 | 380 | ||
299 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | 381 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, |
300 | GNUNET_TIME_UNIT_FOREVER_REL); | 382 | GNUNET_TIME_UNIT_FOREVER_REL); |
301 | } | 383 | } |
302 | 384 | ||
303 | |||
304 | /** | 385 | /** |
305 | * Transmit next message to service. | 386 | * Transmit next message to service. |
306 | * | 387 | * |
307 | * @param cls The 'struct GNUNET_PSYCSTORE_Handle'. | 388 | * @param cls The 'struct GNUNET_PSYC_Channel'. |
308 | * @param size Number of bytes available in buf. | 389 | * @param size Number of bytes available in buf. |
309 | * @param buf Where to copy the message. | 390 | * @param buf Where to copy the message. |
310 | * @return Number of bytes copied to buf. | 391 | * @return Number of bytes copied to buf. |
@@ -326,7 +407,7 @@ send_next_message (void *cls, size_t size, void *buf) | |||
326 | return 0; | 407 | return 0; |
327 | } | 408 | } |
328 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 409 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
329 | "Sending message of type %d to PSYCstore service\n", | 410 | "Sending message of type %d to PSYC service\n", |
330 | ntohs (op->msg->type)); | 411 | ntohs (op->msg->type)); |
331 | memcpy (buf, op->msg, ret); | 412 | memcpy (buf, op->msg, ret); |
332 | 413 | ||
@@ -349,7 +430,7 @@ send_next_message (void *cls, size_t size, void *buf) | |||
349 | /** | 430 | /** |
350 | * Schedule transmission of the next message from our queue. | 431 | * Schedule transmission of the next message from our queue. |
351 | * | 432 | * |
352 | * @param h PSYCstore handle. | 433 | * @param h PSYC handle. |
353 | */ | 434 | */ |
354 | static void | 435 | static void |
355 | transmit_next (struct GNUNET_PSYC_Channel *ch) | 436 | transmit_next (struct GNUNET_PSYC_Channel *ch) |
@@ -391,14 +472,12 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
391 | if (NULL == ch->transmit_head || | 472 | if (NULL == ch->transmit_head || |
392 | ch->transmit_head->msg->type != ch->reconnect_msg->type) | 473 | ch->transmit_head->msg->type != ch->reconnect_msg->type) |
393 | { | 474 | { |
394 | struct OperationHandle *op | 475 | uint16_t reconn_size = ntohs (ch->reconnect_msg->size); |
395 | = GNUNET_malloc (sizeof (struct OperationHandle) | 476 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); |
396 | + ntohs (ch->reconnect_msg->size)); | 477 | memcpy (&op[1], ch->reconnect_msg, reconn_size); |
397 | memcpy (&op[1], ch->reconnect_msg, ntohs (ch->reconnect_msg->size)); | ||
398 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | 478 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; |
399 | GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); | 479 | GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); |
400 | } | 480 | } |
401 | |||
402 | transmit_next (ch); | 481 | transmit_next (ch); |
403 | } | 482 | } |
404 | 483 | ||
@@ -414,7 +493,12 @@ disconnect (void *c) | |||
414 | { | 493 | { |
415 | struct GNUNET_PSYC_Channel *ch = c; | 494 | struct GNUNET_PSYC_Channel *ch = c; |
416 | GNUNET_assert (NULL != ch); | 495 | GNUNET_assert (NULL != ch); |
417 | GNUNET_assert (ch->transmit_head == ch->transmit_tail); | 496 | if (ch->transmit_head != ch->transmit_tail) |
497 | { | ||
498 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
499 | "Disconnecting while there are still outstanding messages!\n"); | ||
500 | GNUNET_break (0); | ||
501 | } | ||
418 | if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK) | 502 | if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK) |
419 | { | 503 | { |
420 | GNUNET_SCHEDULER_cancel (ch->reconnect_task); | 504 | GNUNET_SCHEDULER_cancel (ch->reconnect_task); |
@@ -431,7 +515,10 @@ disconnect (void *c) | |||
431 | ch->client = NULL; | 515 | ch->client = NULL; |
432 | } | 516 | } |
433 | if (NULL != ch->reconnect_msg) | 517 | if (NULL != ch->reconnect_msg) |
518 | { | ||
519 | GNUNET_free (ch->reconnect_msg); | ||
434 | ch->reconnect_msg = NULL; | 520 | ch->reconnect_msg = NULL; |
521 | } | ||
435 | } | 522 | } |
436 | 523 | ||
437 | 524 | ||
@@ -475,12 +562,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
475 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | 562 | struct GNUNET_PSYC_Channel *ch = &mst->ch; |
476 | struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); | 563 | struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); |
477 | 564 | ||
478 | req->header.size = htons (sizeof (*req) + sizeof (*channel_key)); | 565 | req->header.size = htons (sizeof (*req)); |
479 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); | 566 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); |
480 | req->channel_key = *channel_key; | 567 | req->channel_key = *channel_key; |
481 | req->policy = policy; | 568 | req->policy = policy; |
482 | 569 | ||
483 | ch->cfg = cfg; | 570 | ch->cfg = cfg; |
571 | ch->is_master = GNUNET_YES; | ||
484 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; | 572 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; |
485 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 573 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
486 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); | 574 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); |
@@ -532,7 +620,7 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst) | |||
532 | void | 620 | void |
533 | GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | 621 | GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, |
534 | int is_admitted, | 622 | int is_admitted, |
535 | unsigned int relay_count, | 623 | uint32_t relay_count, |
536 | const struct GNUNET_PeerIdentity *relays, | 624 | const struct GNUNET_PeerIdentity *relays, |
537 | const char *method_name, | 625 | const char *method_name, |
538 | const struct GNUNET_ENV_Environment *env, | 626 | const struct GNUNET_ENV_Environment *env, |
@@ -556,13 +644,13 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) | |||
556 | pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; | 644 | pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; |
557 | op->msg = (struct GNUNET_MessageHeader *) pmod; | 645 | op->msg = (struct GNUNET_MessageHeader *) pmod; |
558 | 646 | ||
559 | pmod->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER; | 647 | pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); |
560 | pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size); | 648 | pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size); |
561 | pmod->name_size = htons (name_size); | 649 | pmod->name_size = htons (name_size); |
562 | memcpy (&pmod[1], mod->name, name_size); | 650 | memcpy (&pmod[1], mod->name, name_size); |
563 | memcpy ((void *) &pmod[1] + name_size, mod->value, mod->value_size); | 651 | memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); |
564 | 652 | ||
565 | GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); | 653 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); |
566 | return GNUNET_YES; | 654 | return GNUNET_YES; |
567 | } | 655 | } |
568 | 656 | ||
@@ -594,29 +682,41 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst, | |||
594 | return NULL; | 682 | return NULL; |
595 | ch->in_transmit = GNUNET_YES; | 683 | ch->in_transmit = GNUNET_YES; |
596 | 684 | ||
685 | size_t size = strlen (method_name) + 1; | ||
597 | struct GNUNET_PSYC_MessageMethod *pmeth; | 686 | struct GNUNET_PSYC_MessageMethod *pmeth; |
598 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth)); | 687 | struct OperationHandle *op |
688 | = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size); | ||
599 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1]; | 689 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1]; |
600 | op->msg = (struct GNUNET_MessageHeader *) pmeth; | 690 | op->msg = (struct GNUNET_MessageHeader *) pmeth; |
601 | 691 | ||
602 | pmeth->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD; | 692 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); |
603 | size_t size = strlen (method_name) + 1; | ||
604 | pmeth->header.size = htons (sizeof (*pmeth) + size); | 693 | pmeth->header.size = htons (sizeof (*pmeth) + size); |
605 | pmeth->flags = htonl (flags); | 694 | pmeth->flags = htonl (flags); |
606 | pmeth->mod_count | 695 | pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); |
607 | = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); | ||
608 | memcpy (&pmeth[1], method_name, size); | 696 | memcpy (&pmeth[1], method_name, size); |
609 | 697 | ||
610 | GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); | 698 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); |
611 | |||
612 | GNUNET_ENV_environment_iterate (env, send_modifier, mst); | 699 | GNUNET_ENV_environment_iterate (env, send_modifier, mst); |
700 | transmit_next (ch); | ||
701 | |||
702 | mst->tmit = GNUNET_malloc (sizeof (*mst->tmit)); | ||
703 | mst->tmit->master = mst; | ||
704 | mst->tmit->notify = notify; | ||
705 | mst->tmit->notify_cls = notify_cls; | ||
706 | mst->tmit->status = GNUNET_PSYC_DATA_CONT; | ||
707 | return mst->tmit; | ||
708 | } | ||
709 | |||
613 | 710 | ||
614 | struct GNUNET_PSYC_MasterTransmitHandle *th = GNUNET_malloc (sizeof (*th)); | 711 | /** |
615 | th->master = mst; | 712 | * Resume transmission to the channel. |
616 | th->env = env; | 713 | * |
617 | th->notify = notify; | 714 | * @param th Handle of the request that is being resumed. |
618 | th->notify_cls = notify_cls; | 715 | */ |
619 | return th; | 716 | void |
717 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | ||
718 | { | ||
719 | master_transmit_data (th->master); | ||
620 | } | 720 | } |
621 | 721 | ||
622 | 722 | ||
@@ -671,7 +771,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
671 | const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, | 771 | const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, |
672 | const struct GNUNET_CRYPTO_EccPrivateKey *slave_key, | 772 | const struct GNUNET_CRYPTO_EccPrivateKey *slave_key, |
673 | const struct GNUNET_PeerIdentity *origin, | 773 | const struct GNUNET_PeerIdentity *origin, |
674 | size_t relay_count, | 774 | uint32_t relay_count, |
675 | const struct GNUNET_PeerIdentity *relays, | 775 | const struct GNUNET_PeerIdentity *relays, |
676 | GNUNET_PSYC_Method method, | 776 | GNUNET_PSYC_Method method, |
677 | GNUNET_PSYC_JoinCallback join_cb, | 777 | GNUNET_PSYC_JoinCallback join_cb, |
@@ -680,7 +780,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
680 | const char *method_name, | 780 | const char *method_name, |
681 | const struct GNUNET_ENV_Environment *env, | 781 | const struct GNUNET_ENV_Environment *env, |
682 | const void *data, | 782 | const void *data, |
683 | size_t data_size) | 783 | uint16_t data_size) |
684 | { | 784 | { |
685 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); | 785 | struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); |
686 | struct GNUNET_PSYC_Channel *ch = &slv->ch; | 786 | struct GNUNET_PSYC_Channel *ch = &slv->ch; |
@@ -692,10 +792,12 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
692 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); | 792 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); |
693 | req->channel_key = *channel_key; | 793 | req->channel_key = *channel_key; |
694 | req->slave_key = *slave_key; | 794 | req->slave_key = *slave_key; |
795 | req->origin = *origin; | ||
695 | req->relay_count = relay_count; | 796 | req->relay_count = relay_count; |
696 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); | 797 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); |
697 | 798 | ||
698 | ch->cfg = cfg; | 799 | ch->cfg = cfg; |
800 | ch->is_master = GNUNET_NO; | ||
699 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; | 801 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; |
700 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 802 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
701 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); | 803 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); |
@@ -746,6 +848,18 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, | |||
746 | 848 | ||
747 | 849 | ||
748 | /** | 850 | /** |
851 | * Resume transmission to the master. | ||
852 | * | ||
853 | * @param th Handle of the request that is being resumed. | ||
854 | */ | ||
855 | void | ||
856 | GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | ||
857 | { | ||
858 | |||
859 | } | ||
860 | |||
861 | |||
862 | /** | ||
749 | * Abort transmission request to master. | 863 | * Abort transmission request to master. |
750 | * | 864 | * |
751 | * @param th Handle of the request that is being aborted. | 865 | * @param th Handle of the request that is being aborted. |
@@ -822,7 +936,7 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *ch, | |||
822 | slvadd->announced_at = GNUNET_htonll (announced_at); | 936 | slvadd->announced_at = GNUNET_htonll (announced_at); |
823 | slvadd->effective_since = GNUNET_htonll (effective_since); | 937 | slvadd->effective_since = GNUNET_htonll (effective_since); |
824 | 938 | ||
825 | GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); | 939 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); |
826 | transmit_next (ch); | 940 | transmit_next (ch); |
827 | } | 941 | } |
828 | 942 | ||
@@ -863,7 +977,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *ch, | |||
863 | slvrm->header.size = htons (sizeof (*slvrm)); | 977 | slvrm->header.size = htons (sizeof (*slvrm)); |
864 | slvrm->announced_at = GNUNET_htonll (announced_at); | 978 | slvrm->announced_at = GNUNET_htonll (announced_at); |
865 | 979 | ||
866 | GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); | 980 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); |
867 | transmit_next (ch); | 981 | transmit_next (ch); |
868 | } | 982 | } |
869 | 983 | ||
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index b5bc6d135..1d7035a87 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -19,8 +19,8 @@ | |||
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file psycstore/test_psycstore.c | 22 | * @file psyc/test_psyc.c |
23 | * @brief Test for the PSYCstore service. | 23 | * @brief Test for the PSYC service. |
24 | * @author Gabor X Toth | 24 | * @author Gabor X Toth |
25 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
26 | */ | 26 | */ |
@@ -30,6 +30,7 @@ | |||
30 | #include "gnunet_common.h" | 30 | #include "gnunet_common.h" |
31 | #include "gnunet_util_lib.h" | 31 | #include "gnunet_util_lib.h" |
32 | #include "gnunet_testing_lib.h" | 32 | #include "gnunet_testing_lib.h" |
33 | #include "gnunet_env_lib.h" | ||
33 | #include "gnunet_psyc_service.h" | 34 | #include "gnunet_psyc_service.h" |
34 | 35 | ||
35 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) | 36 | #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) |
@@ -59,6 +60,8 @@ static struct GNUNET_CRYPTO_EccPrivateKey *slave_key; | |||
59 | static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key; | 60 | static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key; |
60 | static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key; | 61 | static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key; |
61 | 62 | ||
63 | struct GNUNET_PSYC_MasterTransmitHandle *mth; | ||
64 | |||
62 | /** | 65 | /** |
63 | * Clean up all resources used. | 66 | * Clean up all resources used. |
64 | */ | 67 | */ |
@@ -120,11 +123,14 @@ end () | |||
120 | 123 | ||
121 | static int | 124 | static int |
122 | method (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, | 125 | method (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, |
123 | uint64_t message_id, const char *method_name, | 126 | uint64_t message_id, const char *name, |
124 | size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers, | 127 | size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers, |
125 | uint64_t data_offset, const void *data, size_t data_size, | 128 | uint64_t data_offset, const void *data, size_t data_size, |
126 | enum GNUNET_PSYC_MessageFlags flags) | 129 | enum GNUNET_PSYC_MessageFlags flags) |
127 | { | 130 | { |
131 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
132 | "Method: %s, modifiers: %lu, flags: %u\n%.*s\n", | ||
133 | name, modifier_count, flags, data_size, data); | ||
128 | return GNUNET_OK; | 134 | return GNUNET_OK; |
129 | } | 135 | } |
130 | 136 | ||
@@ -138,11 +144,72 @@ join (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, | |||
138 | return GNUNET_OK; | 144 | return GNUNET_OK; |
139 | } | 145 | } |
140 | 146 | ||
147 | struct TransmitClosure | ||
148 | { | ||
149 | struct GNUNET_PSYC_MasterTransmitHandle *handle; | ||
150 | uint8_t n; | ||
151 | uint8_t fragment_count; | ||
152 | char *fragments[16]; | ||
153 | uint16_t fragment_sizes[16]; | ||
154 | }; | ||
155 | |||
156 | |||
157 | static void | ||
158 | transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
159 | { | ||
160 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n"); | ||
161 | struct TransmitClosure *tmit = cls; | ||
162 | GNUNET_PSYC_master_transmit_resume (tmit->handle); | ||
163 | } | ||
164 | |||
165 | |||
166 | static int | ||
167 | transmit_notify (void *cls, size_t *data_size, void *data) | ||
168 | { | ||
169 | struct TransmitClosure *tmit = cls; | ||
170 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
171 | "Transmit notify: %lu bytes\n", *data_size); | ||
172 | |||
173 | if (tmit->fragment_count <= tmit->n) | ||
174 | return GNUNET_YES; | ||
175 | |||
176 | GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); | ||
177 | |||
178 | *data_size = tmit->fragment_sizes[tmit->n]; | ||
179 | memcpy (data, tmit->fragments[tmit->n], *data_size); | ||
180 | tmit->n++; | ||
181 | |||
182 | if (tmit->n == tmit->fragment_count - 1) | ||
183 | { | ||
184 | /* Send last fragment later. */ | ||
185 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume, | ||
186 | tmit); | ||
187 | *data_size = 0; | ||
188 | return GNUNET_NO; | ||
189 | } | ||
190 | return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES; | ||
191 | } | ||
141 | 192 | ||
142 | void | 193 | void |
143 | master_started (void *cls, uint64_t max_message_id) | 194 | master_started (void *cls, uint64_t max_message_id) |
144 | { | 195 | { |
145 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id); | 196 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id); |
197 | |||
198 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); | ||
199 | GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, | ||
200 | "_foo", "bar baz", 7); | ||
201 | GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, | ||
202 | "_foo_bar", "foo bar baz", 11); | ||
203 | |||
204 | struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); | ||
205 | tmit->fragment_count = 2; | ||
206 | tmit->fragments[0] = "foo bar"; | ||
207 | tmit->fragment_sizes[0] = 7; | ||
208 | tmit->fragments[1] = "baz!"; | ||
209 | tmit->fragment_sizes[1] = 4; | ||
210 | tmit->handle | ||
211 | = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit, | ||
212 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); | ||
146 | } | 213 | } |
147 | 214 | ||
148 | 215 | ||
@@ -157,7 +224,7 @@ slave_joined (void *cls, uint64_t max_message_id) | |||
157 | * Main function of the test, run from scheduler. | 224 | * Main function of the test, run from scheduler. |
158 | * | 225 | * |
159 | * @param cls NULL | 226 | * @param cls NULL |
160 | * @param cfg configuration we use (also to connect to PSYCstore service) | 227 | * @param cfg configuration we use (also to connect to PSYC service) |
161 | * @param peer handle to access more of the peer (not used) | 228 | * @param peer handle to access more of the peer (not used) |
162 | */ | 229 | */ |
163 | static void | 230 | static void |
@@ -182,9 +249,18 @@ run (void *cls, | |||
182 | mst = GNUNET_PSYC_master_start (cfg, channel_key, | 249 | mst = GNUNET_PSYC_master_start (cfg, channel_key, |
183 | GNUNET_PSYC_CHANNEL_PRIVATE, | 250 | GNUNET_PSYC_CHANNEL_PRIVATE, |
184 | &method, &join, &master_started, NULL); | 251 | &method, &join, &master_started, NULL); |
185 | 252 | return; | |
186 | slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, | 253 | struct GNUNET_PeerIdentity origin; |
187 | &method, &join, &slave_joined, NULL); | 254 | struct GNUNET_PeerIdentity relays[16]; |
255 | struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); | ||
256 | GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, | ||
257 | "_foo", "bar baz", 7); | ||
258 | GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, | ||
259 | "_foo_bar", "foo bar baz", 11); | ||
260 | slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, | ||
261 | 16, relays, &method, &join, &slave_joined, | ||
262 | NULL, "_request_join", env, "some data", 9); | ||
263 | GNUNET_ENV_environment_destroy (env); | ||
188 | } | 264 | } |
189 | 265 | ||
190 | 266 | ||