diff options
author | Gabor X Toth <*@tg-x.net> | 2013-11-09 23:12:27 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2013-11-09 23:12:27 +0000 |
commit | 172ab07eeb1215cc9d22dabc589f7529ac2d59ea (patch) | |
tree | d75fc75033c7e93b58c1908f06c7b856bc317b8e /src/psyc/psyc_api.c | |
parent | d10808d7f17c5f6f1356c22ef0992965cbaf5ce1 (diff) | |
download | gnunet-172ab07eeb1215cc9d22dabc589f7529ac2d59ea.tar.gz gnunet-172ab07eeb1215cc9d22dabc589f7529ac2d59ea.zip |
psyc: handling messages from multicast and passing them to clients; pause/resume fixes
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r-- | src/psyc/psyc_api.c | 94 |
1 files changed, 67 insertions, 27 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 18b5920b3..f971de1b8 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -69,12 +69,12 @@ struct GNUNET_PSYC_Channel | |||
69 | /** | 69 | /** |
70 | * Head of operations to transmit. | 70 | * Head of operations to transmit. |
71 | */ | 71 | */ |
72 | struct OperationHandle *transmit_head; | 72 | struct OperationHandle *tmit_head; |
73 | 73 | ||
74 | /** | 74 | /** |
75 | * Tail of operations to transmit. | 75 | * Tail of operations to transmit. |
76 | */ | 76 | */ |
77 | struct OperationHandle *transmit_tail; | 77 | struct OperationHandle *tmit_tail; |
78 | 78 | ||
79 | /** | 79 | /** |
80 | * Message to send on reconnect. | 80 | * Message to send on reconnect. |
@@ -116,6 +116,16 @@ struct GNUNET_PSYC_Channel | |||
116 | * Buffer space available for transmitting the next data fragment. | 116 | * Buffer space available for transmitting the next data fragment. |
117 | */ | 117 | */ |
118 | uint16_t tmit_buf_avail; | 118 | uint16_t tmit_buf_avail; |
119 | |||
120 | /** | ||
121 | * Is transmission paused? | ||
122 | */ | ||
123 | uint8_t tmit_paused; | ||
124 | |||
125 | /** | ||
126 | * Are we still waiting for a PSYC_TRANSMIT_ACK? | ||
127 | */ | ||
128 | uint8_t tmit_ack_pending; | ||
119 | }; | 129 | }; |
120 | 130 | ||
121 | 131 | ||
@@ -243,6 +253,11 @@ static void | |||
243 | transmit_next (struct GNUNET_PSYC_Channel *ch); | 253 | transmit_next (struct GNUNET_PSYC_Channel *ch); |
244 | 254 | ||
245 | 255 | ||
256 | /** | ||
257 | * Request data from client to transmit. | ||
258 | * | ||
259 | * @param mst Master handle. | ||
260 | */ | ||
246 | static void | 261 | static void |
247 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | 262 | master_transmit_data (struct GNUNET_PSYC_Master *mst) |
248 | { | 263 | { |
@@ -268,12 +283,13 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
268 | default: | 283 | default: |
269 | mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; | 284 | mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; |
270 | data_size = 0; | 285 | data_size = 0; |
271 | LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n"); | 286 | LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n"); |
272 | } | 287 | } |
273 | 288 | ||
274 | if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) | 289 | if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) |
275 | { | 290 | { |
276 | /* Transmission paused, nothing to send. */ | 291 | /* Transmission paused, nothing to send. */ |
292 | ch->tmit_paused = GNUNET_YES; | ||
277 | GNUNET_free (op); | 293 | GNUNET_free (op); |
278 | } | 294 | } |
279 | else | 295 | else |
@@ -281,7 +297,8 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
281 | GNUNET_assert (data_size <= ch->tmit_buf_avail); | 297 | GNUNET_assert (data_size <= ch->tmit_buf_avail); |
282 | pdata->header.size = htons (sizeof (*pdata) + data_size); | 298 | pdata->header.size = htons (sizeof (*pdata) + data_size); |
283 | pdata->status = htons (mst->tmit->status); | 299 | pdata->status = htons (mst->tmit->status); |
284 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); | 300 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); |
301 | ch->tmit_ack_pending = GNUNET_YES; | ||
285 | transmit_next (ch); | 302 | transmit_next (ch); |
286 | } | 303 | } |
287 | } | 304 | } |
@@ -305,7 +322,6 @@ message_handler (void *cls, | |||
305 | struct CountersResult *cres; | 322 | struct CountersResult *cres; |
306 | struct TransmitAck *tack; | 323 | struct TransmitAck *tack; |
307 | 324 | ||
308 | |||
309 | if (NULL == msg) | 325 | if (NULL == msg) |
310 | { | 326 | { |
311 | reschedule_connect (ch); | 327 | reschedule_connect (ch); |
@@ -317,7 +333,8 @@ message_handler (void *cls, | |||
317 | uint16_t type = ntohs (msg->type); | 333 | uint16_t type = ntohs (msg->type); |
318 | 334 | ||
319 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 335 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
320 | "Received message of type %d from PSYC service\n", type); | 336 | "Received message of type %d and size %u from PSYC service\n", |
337 | type, size); | ||
321 | 338 | ||
322 | switch (type) | 339 | switch (type) |
323 | { | 340 | { |
@@ -328,10 +345,16 @@ message_handler (void *cls, | |||
328 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | 345 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: |
329 | size_eq = sizeof (struct TransmitAck); | 346 | size_eq = sizeof (struct TransmitAck); |
330 | break; | 347 | break; |
348 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
349 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | ||
350 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
351 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
352 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
353 | size_min = sizeof (struct GNUNET_PSYC_MessageData); | ||
331 | } | 354 | } |
332 | 355 | ||
333 | if (! ((0 < size_eq && size == size_eq) | 356 | if (! ((0 < size_eq && size == size_eq) |
334 | || (0 < size_min && size >= size_min))) | 357 | || (0 < size_min && size_min <= size))) |
335 | { | 358 | { |
336 | GNUNET_break (0); | 359 | GNUNET_break (0); |
337 | reschedule_connect (ch); | 360 | reschedule_connect (ch); |
@@ -370,7 +393,9 @@ message_handler (void *cls, | |||
370 | else | 393 | else |
371 | { | 394 | { |
372 | ch->tmit_buf_avail = ntohs (tack->buf_avail); | 395 | ch->tmit_buf_avail = ntohs (tack->buf_avail); |
373 | master_transmit_data (mst); | 396 | ch->tmit_ack_pending = GNUNET_NO; |
397 | if (GNUNET_NO == ch->tmit_paused) | ||
398 | master_transmit_data (mst); | ||
374 | } | 399 | } |
375 | } | 400 | } |
376 | else | 401 | else |
@@ -378,6 +403,18 @@ message_handler (void *cls, | |||
378 | /* TODO: slave */ | 403 | /* TODO: slave */ |
379 | } | 404 | } |
380 | break; | 405 | break; |
406 | |||
407 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
408 | |||
409 | break; | ||
410 | |||
411 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
412 | |||
413 | break; | ||
414 | |||
415 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
416 | |||
417 | break; | ||
381 | } | 418 | } |
382 | 419 | ||
383 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | 420 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, |
@@ -397,9 +434,9 @@ static size_t | |||
397 | send_next_message (void *cls, size_t size, void *buf) | 434 | send_next_message (void *cls, size_t size, void *buf) |
398 | { | 435 | { |
399 | struct GNUNET_PSYC_Channel *ch = cls; | 436 | struct GNUNET_PSYC_Channel *ch = cls; |
400 | struct OperationHandle *op = ch->transmit_head; | 437 | struct OperationHandle *op = ch->tmit_head; |
401 | size_t ret; | 438 | size_t ret; |
402 | 439 | LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); | |
403 | ch->th = NULL; | 440 | ch->th = NULL; |
404 | if (NULL == op->msg) | 441 | if (NULL == op->msg) |
405 | return 0; | 442 | return 0; |
@@ -409,15 +446,12 @@ send_next_message (void *cls, size_t size, void *buf) | |||
409 | reschedule_connect (ch); | 446 | reschedule_connect (ch); |
410 | return 0; | 447 | return 0; |
411 | } | 448 | } |
412 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
413 | "Sending message of type %d to PSYC service\n", | ||
414 | ntohs (op->msg->type)); | ||
415 | memcpy (buf, op->msg, ret); | 449 | memcpy (buf, op->msg, ret); |
416 | 450 | ||
417 | GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op); | 451 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op); |
418 | GNUNET_free (op); | 452 | GNUNET_free (op); |
419 | 453 | ||
420 | if (NULL != ch->transmit_head) | 454 | if (NULL != ch->tmit_head) |
421 | transmit_next (ch); | 455 | transmit_next (ch); |
422 | 456 | ||
423 | if (GNUNET_NO == ch->in_receive) | 457 | if (GNUNET_NO == ch->in_receive) |
@@ -438,10 +472,11 @@ send_next_message (void *cls, size_t size, void *buf) | |||
438 | static void | 472 | static void |
439 | transmit_next (struct GNUNET_PSYC_Channel *ch) | 473 | transmit_next (struct GNUNET_PSYC_Channel *ch) |
440 | { | 474 | { |
475 | LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); | ||
441 | if (NULL != ch->th || NULL == ch->client) | 476 | if (NULL != ch->th || NULL == ch->client) |
442 | return; | 477 | return; |
443 | 478 | ||
444 | struct OperationHandle *op = ch->transmit_head; | 479 | struct OperationHandle *op = ch->tmit_head; |
445 | if (NULL == op) | 480 | if (NULL == op) |
446 | return; | 481 | return; |
447 | 482 | ||
@@ -472,14 +507,14 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
472 | ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); | 507 | ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); |
473 | GNUNET_assert (NULL != ch->client); | 508 | GNUNET_assert (NULL != ch->client); |
474 | 509 | ||
475 | if (NULL == ch->transmit_head || | 510 | if (NULL == ch->tmit_head || |
476 | ch->transmit_head->msg->type != ch->reconnect_msg->type) | 511 | ch->tmit_head->msg->type != ch->reconnect_msg->type) |
477 | { | 512 | { |
478 | uint16_t reconn_size = ntohs (ch->reconnect_msg->size); | 513 | uint16_t reconn_size = ntohs (ch->reconnect_msg->size); |
479 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); | 514 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); |
480 | memcpy (&op[1], ch->reconnect_msg, reconn_size); | 515 | memcpy (&op[1], ch->reconnect_msg, reconn_size); |
481 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | 516 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; |
482 | GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); | 517 | GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op); |
483 | } | 518 | } |
484 | transmit_next (ch); | 519 | transmit_next (ch); |
485 | } | 520 | } |
@@ -496,7 +531,7 @@ disconnect (void *c) | |||
496 | struct GNUNET_PSYC_Channel *ch = c; | 531 | struct GNUNET_PSYC_Channel *ch = c; |
497 | 532 | ||
498 | GNUNET_assert (NULL != ch); | 533 | GNUNET_assert (NULL != ch); |
499 | if (ch->transmit_head != ch->transmit_tail) | 534 | if (ch->tmit_head != ch->tmit_tail) |
500 | { | 535 | { |
501 | LOG (GNUNET_ERROR_TYPE_ERROR, | 536 | LOG (GNUNET_ERROR_TYPE_ERROR, |
502 | "Disconnecting while there are still outstanding messages!\n"); | 537 | "Disconnecting while there are still outstanding messages!\n"); |
@@ -654,7 +689,7 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) | |||
654 | memcpy (&pmod[1], mod->name, name_size); | 689 | memcpy (&pmod[1], mod->name, name_size); |
655 | memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); | 690 | memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); |
656 | 691 | ||
657 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); | 692 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); |
658 | return GNUNET_YES; | 693 | return GNUNET_YES; |
659 | } | 694 | } |
660 | 695 | ||
@@ -699,7 +734,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | |||
699 | pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); | 734 | pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); |
700 | memcpy (&pmeth[1], method_name, size); | 735 | memcpy (&pmeth[1], method_name, size); |
701 | 736 | ||
702 | GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); | 737 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); |
703 | GNUNET_ENV_environment_iterate (env, send_modifier, master); | 738 | GNUNET_ENV_environment_iterate (env, send_modifier, master); |
704 | transmit_next (ch); | 739 | transmit_next (ch); |
705 | 740 | ||
@@ -720,7 +755,12 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | |||
720 | void | 755 | void |
721 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 756 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) |
722 | { | 757 | { |
723 | master_transmit_data (th->master); | 758 | struct GNUNET_PSYC_Channel *ch = &th->master->ch; |
759 | if (GNUNET_NO == ch->tmit_ack_pending) | ||
760 | { | ||
761 | ch->tmit_paused = GNUNET_NO; | ||
762 | master_transmit_data (th->master); | ||
763 | } | ||
724 | } | 764 | } |
725 | 765 | ||
726 | 766 | ||
@@ -938,8 +978,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, | |||
938 | slvadd->header.size = htons (sizeof (*slvadd)); | 978 | slvadd->header.size = htons (sizeof (*slvadd)); |
939 | slvadd->announced_at = GNUNET_htonll (announced_at); | 979 | slvadd->announced_at = GNUNET_htonll (announced_at); |
940 | slvadd->effective_since = GNUNET_htonll (effective_since); | 980 | slvadd->effective_since = GNUNET_htonll (effective_since); |
941 | GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head, | 981 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, |
942 | channel->transmit_tail, | 982 | channel->tmit_tail, |
943 | op); | 983 | op); |
944 | transmit_next (channel); | 984 | transmit_next (channel); |
945 | } | 985 | } |
@@ -979,8 +1019,8 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, | |||
979 | slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; | 1019 | slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; |
980 | slvrm->header.size = htons (sizeof (*slvrm)); | 1020 | slvrm->header.size = htons (sizeof (*slvrm)); |
981 | slvrm->announced_at = GNUNET_htonll (announced_at); | 1021 | slvrm->announced_at = GNUNET_htonll (announced_at); |
982 | GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head, | 1022 | GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, |
983 | channel->transmit_tail, | 1023 | channel->tmit_tail, |
984 | op); | 1024 | op); |
985 | transmit_next (channel); | 1025 | transmit_next (channel); |
986 | } | 1026 | } |