aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/psyc_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2013-11-09 23:12:27 +0000
committerGabor X Toth <*@tg-x.net>2013-11-09 23:12:27 +0000
commit172ab07eeb1215cc9d22dabc589f7529ac2d59ea (patch)
treed75fc75033c7e93b58c1908f06c7b856bc317b8e /src/psyc/psyc_api.c
parentd10808d7f17c5f6f1356c22ef0992965cbaf5ce1 (diff)
downloadgnunet-172ab07eeb1215cc9d22dabc589f7529ac2d59ea.tar.gz
gnunet-172ab07eeb1215cc9d22dabc589f7529ac2d59ea.zip
psyc: handling messages from multicast and passing them to clients; pause/resume fixes
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r--src/psyc/psyc_api.c94
1 files changed, 67 insertions, 27 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 18b5920b3..f971de1b8 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -69,12 +69,12 @@ struct GNUNET_PSYC_Channel
69 /** 69 /**
70 * Head of operations to transmit. 70 * Head of operations to transmit.
71 */ 71 */
72 struct OperationHandle *transmit_head; 72 struct OperationHandle *tmit_head;
73 73
74 /** 74 /**
75 * Tail of operations to transmit. 75 * Tail of operations to transmit.
76 */ 76 */
77 struct OperationHandle *transmit_tail; 77 struct OperationHandle *tmit_tail;
78 78
79 /** 79 /**
80 * Message to send on reconnect. 80 * Message to send on reconnect.
@@ -116,6 +116,16 @@ struct GNUNET_PSYC_Channel
116 * Buffer space available for transmitting the next data fragment. 116 * Buffer space available for transmitting the next data fragment.
117 */ 117 */
118 uint16_t tmit_buf_avail; 118 uint16_t tmit_buf_avail;
119
120 /**
121 * Is transmission paused?
122 */
123 uint8_t tmit_paused;
124
125 /**
126 * Are we still waiting for a PSYC_TRANSMIT_ACK?
127 */
128 uint8_t tmit_ack_pending;
119}; 129};
120 130
121 131
@@ -243,6 +253,11 @@ static void
243transmit_next (struct GNUNET_PSYC_Channel *ch); 253transmit_next (struct GNUNET_PSYC_Channel *ch);
244 254
245 255
256/**
257 * Request data from client to transmit.
258 *
259 * @param mst Master handle.
260 */
246static void 261static void
247master_transmit_data (struct GNUNET_PSYC_Master *mst) 262master_transmit_data (struct GNUNET_PSYC_Master *mst)
248{ 263{
@@ -268,12 +283,13 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
268 default: 283 default:
269 mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; 284 mst->tmit->status = GNUNET_PSYC_DATA_CANCEL;
270 data_size = 0; 285 data_size = 0;
271 LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n"); 286 LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n");
272 } 287 }
273 288
274 if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) 289 if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size))
275 { 290 {
276 /* Transmission paused, nothing to send. */ 291 /* Transmission paused, nothing to send. */
292 ch->tmit_paused = GNUNET_YES;
277 GNUNET_free (op); 293 GNUNET_free (op);
278 } 294 }
279 else 295 else
@@ -281,7 +297,8 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
281 GNUNET_assert (data_size <= ch->tmit_buf_avail); 297 GNUNET_assert (data_size <= ch->tmit_buf_avail);
282 pdata->header.size = htons (sizeof (*pdata) + data_size); 298 pdata->header.size = htons (sizeof (*pdata) + data_size);
283 pdata->status = htons (mst->tmit->status); 299 pdata->status = htons (mst->tmit->status);
284 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); 300 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
301 ch->tmit_ack_pending = GNUNET_YES;
285 transmit_next (ch); 302 transmit_next (ch);
286 } 303 }
287} 304}
@@ -305,7 +322,6 @@ message_handler (void *cls,
305 struct CountersResult *cres; 322 struct CountersResult *cres;
306 struct TransmitAck *tack; 323 struct TransmitAck *tack;
307 324
308
309 if (NULL == msg) 325 if (NULL == msg)
310 { 326 {
311 reschedule_connect (ch); 327 reschedule_connect (ch);
@@ -317,7 +333,8 @@ message_handler (void *cls,
317 uint16_t type = ntohs (msg->type); 333 uint16_t type = ntohs (msg->type);
318 334
319 LOG (GNUNET_ERROR_TYPE_DEBUG, 335 LOG (GNUNET_ERROR_TYPE_DEBUG,
320 "Received message of type %d from PSYC service\n", type); 336 "Received message of type %d and size %u from PSYC service\n",
337 type, size);
321 338
322 switch (type) 339 switch (type)
323 { 340 {
@@ -328,10 +345,16 @@ message_handler (void *cls,
328 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: 345 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
329 size_eq = sizeof (struct TransmitAck); 346 size_eq = sizeof (struct TransmitAck);
330 break; 347 break;
348 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
349 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
350 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
351 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
352 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
353 size_min = sizeof (struct GNUNET_PSYC_MessageData);
331 } 354 }
332 355
333 if (! ((0 < size_eq && size == size_eq) 356 if (! ((0 < size_eq && size == size_eq)
334 || (0 < size_min && size >= size_min))) 357 || (0 < size_min && size_min <= size)))
335 { 358 {
336 GNUNET_break (0); 359 GNUNET_break (0);
337 reschedule_connect (ch); 360 reschedule_connect (ch);
@@ -370,7 +393,9 @@ message_handler (void *cls,
370 else 393 else
371 { 394 {
372 ch->tmit_buf_avail = ntohs (tack->buf_avail); 395 ch->tmit_buf_avail = ntohs (tack->buf_avail);
373 master_transmit_data (mst); 396 ch->tmit_ack_pending = GNUNET_NO;
397 if (GNUNET_NO == ch->tmit_paused)
398 master_transmit_data (mst);
374 } 399 }
375 } 400 }
376 else 401 else
@@ -378,6 +403,18 @@ message_handler (void *cls,
378 /* TODO: slave */ 403 /* TODO: slave */
379 } 404 }
380 break; 405 break;
406
407 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
408
409 break;
410
411 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
412
413 break;
414
415 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
416
417 break;
381 } 418 }
382 419
383 GNUNET_CLIENT_receive (ch->client, &message_handler, ch, 420 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
@@ -397,9 +434,9 @@ static size_t
397send_next_message (void *cls, size_t size, void *buf) 434send_next_message (void *cls, size_t size, void *buf)
398{ 435{
399 struct GNUNET_PSYC_Channel *ch = cls; 436 struct GNUNET_PSYC_Channel *ch = cls;
400 struct OperationHandle *op = ch->transmit_head; 437 struct OperationHandle *op = ch->tmit_head;
401 size_t ret; 438 size_t ret;
402 439 LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
403 ch->th = NULL; 440 ch->th = NULL;
404 if (NULL == op->msg) 441 if (NULL == op->msg)
405 return 0; 442 return 0;
@@ -409,15 +446,12 @@ send_next_message (void *cls, size_t size, void *buf)
409 reschedule_connect (ch); 446 reschedule_connect (ch);
410 return 0; 447 return 0;
411 } 448 }
412 LOG (GNUNET_ERROR_TYPE_DEBUG,
413 "Sending message of type %d to PSYC service\n",
414 ntohs (op->msg->type));
415 memcpy (buf, op->msg, ret); 449 memcpy (buf, op->msg, ret);
416 450
417 GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op); 451 GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op);
418 GNUNET_free (op); 452 GNUNET_free (op);
419 453
420 if (NULL != ch->transmit_head) 454 if (NULL != ch->tmit_head)
421 transmit_next (ch); 455 transmit_next (ch);
422 456
423 if (GNUNET_NO == ch->in_receive) 457 if (GNUNET_NO == ch->in_receive)
@@ -438,10 +472,11 @@ send_next_message (void *cls, size_t size, void *buf)
438static void 472static void
439transmit_next (struct GNUNET_PSYC_Channel *ch) 473transmit_next (struct GNUNET_PSYC_Channel *ch)
440{ 474{
475 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
441 if (NULL != ch->th || NULL == ch->client) 476 if (NULL != ch->th || NULL == ch->client)
442 return; 477 return;
443 478
444 struct OperationHandle *op = ch->transmit_head; 479 struct OperationHandle *op = ch->tmit_head;
445 if (NULL == op) 480 if (NULL == op)
446 return; 481 return;
447 482
@@ -472,14 +507,14 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
472 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); 507 ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
473 GNUNET_assert (NULL != ch->client); 508 GNUNET_assert (NULL != ch->client);
474 509
475 if (NULL == ch->transmit_head || 510 if (NULL == ch->tmit_head ||
476 ch->transmit_head->msg->type != ch->reconnect_msg->type) 511 ch->tmit_head->msg->type != ch->reconnect_msg->type)
477 { 512 {
478 uint16_t reconn_size = ntohs (ch->reconnect_msg->size); 513 uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
479 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); 514 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size);
480 memcpy (&op[1], ch->reconnect_msg, reconn_size); 515 memcpy (&op[1], ch->reconnect_msg, reconn_size);
481 op->msg = (struct GNUNET_MessageHeader *) &op[1]; 516 op->msg = (struct GNUNET_MessageHeader *) &op[1];
482 GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); 517 GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op);
483 } 518 }
484 transmit_next (ch); 519 transmit_next (ch);
485} 520}
@@ -496,7 +531,7 @@ disconnect (void *c)
496 struct GNUNET_PSYC_Channel *ch = c; 531 struct GNUNET_PSYC_Channel *ch = c;
497 532
498 GNUNET_assert (NULL != ch); 533 GNUNET_assert (NULL != ch);
499 if (ch->transmit_head != ch->transmit_tail) 534 if (ch->tmit_head != ch->tmit_tail)
500 { 535 {
501 LOG (GNUNET_ERROR_TYPE_ERROR, 536 LOG (GNUNET_ERROR_TYPE_ERROR,
502 "Disconnecting while there are still outstanding messages!\n"); 537 "Disconnecting while there are still outstanding messages!\n");
@@ -654,7 +689,7 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
654 memcpy (&pmod[1], mod->name, name_size); 689 memcpy (&pmod[1], mod->name, name_size);
655 memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); 690 memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
656 691
657 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); 692 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
658 return GNUNET_YES; 693 return GNUNET_YES;
659} 694}
660 695
@@ -699,7 +734,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
699 pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); 734 pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
700 memcpy (&pmeth[1], method_name, size); 735 memcpy (&pmeth[1], method_name, size);
701 736
702 GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); 737 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
703 GNUNET_ENV_environment_iterate (env, send_modifier, master); 738 GNUNET_ENV_environment_iterate (env, send_modifier, master);
704 transmit_next (ch); 739 transmit_next (ch);
705 740
@@ -720,7 +755,12 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
720void 755void
721GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 756GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
722{ 757{
723 master_transmit_data (th->master); 758 struct GNUNET_PSYC_Channel *ch = &th->master->ch;
759 if (GNUNET_NO == ch->tmit_ack_pending)
760 {
761 ch->tmit_paused = GNUNET_NO;
762 master_transmit_data (th->master);
763 }
724} 764}
725 765
726 766
@@ -938,8 +978,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
938 slvadd->header.size = htons (sizeof (*slvadd)); 978 slvadd->header.size = htons (sizeof (*slvadd));
939 slvadd->announced_at = GNUNET_htonll (announced_at); 979 slvadd->announced_at = GNUNET_htonll (announced_at);
940 slvadd->effective_since = GNUNET_htonll (effective_since); 980 slvadd->effective_since = GNUNET_htonll (effective_since);
941 GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head, 981 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
942 channel->transmit_tail, 982 channel->tmit_tail,
943 op); 983 op);
944 transmit_next (channel); 984 transmit_next (channel);
945} 985}
@@ -979,8 +1019,8 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
979 slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; 1019 slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM;
980 slvrm->header.size = htons (sizeof (*slvrm)); 1020 slvrm->header.size = htons (sizeof (*slvrm));
981 slvrm->announced_at = GNUNET_htonll (announced_at); 1021 slvrm->announced_at = GNUNET_htonll (announced_at);
982 GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head, 1022 GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
983 channel->transmit_tail, 1023 channel->tmit_tail,
984 op); 1024 op);
985 transmit_next (channel); 1025 transmit_next (channel);
986} 1026}