aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/psyc_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-01-06 00:09:43 +0000
committerGabor X Toth <*@tg-x.net>2014-01-06 00:09:43 +0000
commit1a0ffe2288b97b47a5b2bfbda2f9438680429422 (patch)
tree72db4cd67f06253a60bf3e2966fd0b1bf55eba5c /src/psyc/psyc_api.c
parent43d497d7c4ebb6efae37ae4bb2f812a68aa64a32 (diff)
downloadgnunet-1a0ffe2288b97b47a5b2bfbda2f9438680429422.tar.gz
gnunet-1a0ffe2288b97b47a5b2bfbda2f9438680429422.zip
psyc: ipc messages, notify callback for modifiers, tests
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r--src/psyc/psyc_api.c443
1 files changed, 245 insertions, 198 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index a5a01fa92..e904e00b5 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -45,7 +45,7 @@ struct OperationHandle
45{ 45{
46 struct OperationHandle *prev; 46 struct OperationHandle *prev;
47 struct OperationHandle *next; 47 struct OperationHandle *next;
48 const struct GNUNET_MessageHeader *msg; 48 struct GNUNET_MessageHeader *msg;
49}; 49};
50 50
51/** 51/**
@@ -79,6 +79,11 @@ struct GNUNET_PSYC_Channel
79 struct OperationHandle *tmit_tail; 79 struct OperationHandle *tmit_tail;
80 80
81 /** 81 /**
82 * Message being transmitted to the PSYC service.
83 */
84 struct OperationHandle *tmit_msg;
85
86 /**
82 * Message to send on reconnect. 87 * Message to send on reconnect.
83 */ 88 */
84 struct GNUNET_MessageHeader *reconnect_msg; 89 struct GNUNET_MessageHeader *reconnect_msg;
@@ -139,11 +144,6 @@ struct GNUNET_PSYC_Channel
139 uint32_t recv_mod_value_size; 144 uint32_t recv_mod_value_size;
140 145
141 /** 146 /**
142 * Buffer space available for transmitting the next data fragment.
143 */
144 uint16_t tmit_size; // FIXME
145
146 /**
147 * Is transmission paused? 147 * Is transmission paused?
148 */ 148 */
149 uint8_t tmit_paused; 149 uint8_t tmit_paused;
@@ -151,7 +151,7 @@ struct GNUNET_PSYC_Channel
151 /** 151 /**
152 * Are we still waiting for a PSYC_TRANSMIT_ACK? 152 * Are we still waiting for a PSYC_TRANSMIT_ACK?
153 */ 153 */
154 uint8_t tmit_ack_pending; // FIXME 154 uint8_t tmit_ack_pending;
155 155
156 /** 156 /**
157 * Are we polling for incoming messages right now? 157 * Are we polling for incoming messages right now?
@@ -176,7 +176,7 @@ struct GNUNET_PSYC_Channel
176struct GNUNET_PSYC_MasterTransmitHandle 176struct GNUNET_PSYC_MasterTransmitHandle
177{ 177{
178 struct GNUNET_PSYC_Master *master; 178 struct GNUNET_PSYC_Master *master;
179 GNUNET_PSYC_MasterTransmitNotify notify_mod; 179 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod;
180 GNUNET_PSYC_MasterTransmitNotify notify_data; 180 GNUNET_PSYC_MasterTransmitNotify notify_data;
181 void *notify_cls; 181 void *notify_cls;
182 enum MessageState state; 182 enum MessageState state;
@@ -246,16 +246,14 @@ struct GNUNET_PSYC_StateQuery
246}; 246};
247 247
248 248
249/**
250 * Try again to connect to the PSYC service.
251 *
252 * @param cls Handle to the PSYC service.
253 * @param tc Scheduler context
254 */
255static void 249static void
256reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); 250reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
257 251
258 252
253static void
254master_transmit_data (struct GNUNET_PSYC_Master *mst);
255
256
259/** 257/**
260 * Reschedule a connect attempt to the service. 258 * Reschedule a connect attempt to the service.
261 * 259 *
@@ -323,6 +321,79 @@ recv_error (struct GNUNET_PSYC_Channel *ch)
323 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); 321 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
324} 322}
325 323
324
325/**
326 * Queue an incoming message part for transmission to the PSYC service.
327 *
328 * The message part is added to the current message buffer.
329 * When this buffer is full, it is added to the transmission queue.
330 *
331 * @param ch Channel struct for the client.
332 * @param msg Modifier message part, or NULL when there's no more modifiers.
333 * @param end End of message.
334 */
335static void
336queue_message (struct GNUNET_PSYC_Channel *ch,
337 const struct GNUNET_MessageHeader *msg,
338 uint8_t end)
339{
340 uint16_t size = msg ? ntohs (msg->size) : 0;
341
342 LOG (GNUNET_ERROR_TYPE_DEBUG,
343 "Queueing message of type %u and size %u (end: %u)).\n",
344 ntohs (msg->type), size, end);
345
346 struct OperationHandle *op = ch->tmit_msg;
347 if (NULL != op)
348 {
349 if (NULL == msg
350 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size)
351 {
352 /* End of message or buffer is full, add it to transmission queue
353 * and start with empty buffer */
354 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
355 op->msg->size = htons (op->msg->size);
356 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
357 ch->tmit_msg = op = NULL;
358 ch->tmit_ack_pending++;
359 }
360 else
361 {
362 /* Message fits in current buffer, append */
363 ch->tmit_msg = op
364 = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size);
365 op->msg = (struct GNUNET_MessageHeader *) &op[1];
366 memcpy ((char *) op->msg + op->msg->size, msg, size);
367 op->msg->size += size;
368 }
369 }
370
371 if (NULL == op && NULL != msg)
372 {
373 /* Empty buffer, copy over message. */
374 ch->tmit_msg = op
375 = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size);
376 op->msg = (struct GNUNET_MessageHeader *) &op[1];
377 op->msg->size = sizeof (*op->msg) + size;
378 memcpy (&op->msg[1], msg, size);
379 }
380
381 if (NULL != op
382 && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
383 < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
384 {
385 /* End of message or buffer is full, add it to transmission queue. */
386 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
387 op->msg->size = htons (op->msg->size);
388 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
389 ch->tmit_msg = op = NULL;
390 ch->tmit_ack_pending++;
391 }
392
393 transmit_next (ch);
394}
395
396
326/** 397/**
327 * Request a modifier from a client to transmit. 398 * Request a modifier from a client to transmit.
328 * 399 *
@@ -332,32 +403,71 @@ static void
332master_transmit_mod (struct GNUNET_PSYC_Master *mst) 403master_transmit_mod (struct GNUNET_PSYC_Master *mst)
333{ 404{
334 struct GNUNET_PSYC_Channel *ch = &mst->ch; 405 struct GNUNET_PSYC_Channel *ch = &mst->ch;
335 uint16_t max_data_size 406 uint16_t max_data_size, data_size;
336 = ch->tmit_size > sizeof (struct GNUNET_MessageHeader) 407 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
337 ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size 408 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
338 : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size; 409 int notify_ret;
339 uint16_t data_size = max_data_size;
340 410
341 struct GNUNET_MessageHeader *msg; 411 switch (mst->tmit->state)
342 struct OperationHandle *op 412 {
343 = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); 413 case MSG_STATE_MODIFIER:
344 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; 414 {
345 msg->type 415 struct GNUNET_PSYC_MessageModifier *mod
346 = MSG_STATE_MODIFIER == mst->tmit->state 416 = (struct GNUNET_PSYC_MessageModifier *) msg;
347 ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER) 417 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
348 : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 418 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
419 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
420 notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
421 &data_size, &mod[1], &mod->oper);
422 mod->name_size = strnlen ((char *) &mod[1], data_size);
423 if (mod->name_size < data_size)
424 {
425 mod->oper = htons (mod->oper);
426 mod->value_size = htons (data_size - 1 - mod->name_size);
427 mod->name_size = htons (mod->name_size);
428 }
429 else if (0 < data_size)
430 {
431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
432 notify_ret = GNUNET_SYSERR;
433 }
434 break;
435 }
436 case MSG_STATE_MOD_CONT:
437 {
438 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
439 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
440 msg->size = sizeof (struct GNUNET_MessageHeader);
441 notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
442 &data_size, &msg[1], NULL);
443 break;
444 }
445 default:
446 GNUNET_assert (0);
447 }
349 448
350 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
351 &data_size, &msg[1]);
352 switch (notify_ret) 449 switch (notify_ret)
353 { 450 {
354 case GNUNET_NO: 451 case GNUNET_NO:
355 if (0 != data_size) 452 if (0 == data_size)
356 mst->tmit->state = MSG_STATE_MOD_CONT; 453 { /* Transmission paused, nothing to send. */
454 ch->tmit_paused = GNUNET_YES;
455 return;
456 }
457 mst->tmit->state = MSG_STATE_MOD_CONT;
357 break; 458 break;
358 459
359 case GNUNET_YES: 460 case GNUNET_YES:
360 mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER; 461 if (0 == data_size)
462 {
463 /* End of modifiers. */
464 mst->tmit->state = MSG_STATE_DATA;
465 if (0 == ch->tmit_ack_pending)
466 master_transmit_data (mst);
467
468 return;
469 }
470 mst->tmit->state = MSG_STATE_MODIFIER;
361 break; 471 break;
362 472
363 default: 473 default:
@@ -368,36 +478,18 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
368 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 478 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
369 msg->size = htons (sizeof (*msg)); 479 msg->size = htons (sizeof (*msg));
370 480
371 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 481 queue_message (ch, msg, GNUNET_YES);
372 transmit_next (ch);
373 return; 482 return;
374 } 483 }
375 484
376 if ((GNUNET_NO == notify_ret && 0 == data_size))
377 {
378 /* Transmission paused, nothing to send. */
379 ch->tmit_paused = GNUNET_YES;
380 GNUNET_free (op);
381 }
382
383 if (0 < data_size) 485 if (0 < data_size)
384 { 486 {
385 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); 487 GNUNET_assert (data_size <= max_data_size);
386 msg->size = htons (sizeof (*msg) + data_size); 488 msg->size = htons (msg->size + data_size);
387 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 489 queue_message (ch, msg, GNUNET_NO);
388 }
389
390 /* End of message. */
391 if (GNUNET_YES == notify_ret)
392 {
393 op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
394 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
395 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
396 msg->size = htons (sizeof (*msg));
397 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
398 } 490 }
399 491
400 transmit_next (ch); 492 master_transmit_mod (mst);
401} 493}
402 494
403 495
@@ -410,11 +502,10 @@ static void
410master_transmit_data (struct GNUNET_PSYC_Master *mst) 502master_transmit_data (struct GNUNET_PSYC_Master *mst)
411{ 503{
412 struct GNUNET_PSYC_Channel *ch = &mst->ch; 504 struct GNUNET_PSYC_Channel *ch = &mst->ch;
413 struct GNUNET_MessageHeader *msg;
414 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; 505 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
415 struct OperationHandle *op 506 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
416 = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); 507 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
417 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; 508
418 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); 509 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
419 510
420 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, 511 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
@@ -426,7 +517,7 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
426 { 517 {
427 /* Transmission paused, nothing to send. */ 518 /* Transmission paused, nothing to send. */
428 ch->tmit_paused = GNUNET_YES; 519 ch->tmit_paused = GNUNET_YES;
429 GNUNET_free (op); 520 return;
430 } 521 }
431 break; 522 break;
432 523
@@ -441,9 +532,7 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
441 mst->tmit->state = MSG_STATE_START; 532 mst->tmit->state = MSG_STATE_START;
442 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 533 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
443 msg->size = htons (sizeof (*msg)); 534 msg->size = htons (sizeof (*msg));
444 535 queue_message (ch, msg, GNUNET_YES);
445 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
446 transmit_next (ch);
447 return; 536 return;
448 } 537 }
449 538
@@ -451,20 +540,16 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
451 { 540 {
452 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); 541 GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
453 msg->size = htons (sizeof (*msg) + data_size); 542 msg->size = htons (sizeof (*msg) + data_size);
454 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 543 queue_message (ch, msg, !notify_ret);
455 } 544 }
456 545
457 /* End of message. */ 546 /* End of message. */
458 if (GNUNET_YES == notify_ret) 547 if (GNUNET_YES == notify_ret)
459 { 548 {
460 op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
461 op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
462 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); 549 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
463 msg->size = htons (sizeof (*msg)); 550 msg->size = htons (sizeof (*msg));
464 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); 551 queue_message (ch, msg, GNUNET_YES);
465 } 552 }
466
467 transmit_next (ch);
468} 553}
469 554
470 555
@@ -476,57 +561,55 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
476 */ 561 */
477static void 562static void
478handle_psyc_message (struct GNUNET_PSYC_Channel *ch, 563handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
479 const struct GNUNET_PSYC_MessageHeader *pmsg) 564 const struct GNUNET_PSYC_MessageHeader *msg)
480{ 565{
481 const struct GNUNET_MessageHeader *msg; 566 uint16_t size = ntohs (msg->header.size);
482 uint16_t msize = ntohs (pmsg->header.size);
483 uint16_t pos = 0;
484 uint16_t size = 0;
485 uint16_t type, size_eq, size_min;
486 567
487 if (MSG_STATE_START == ch->recv_state) 568 if (MSG_STATE_START == ch->recv_state)
488 { 569 {
489 ch->recv_message_id = GNUNET_ntohll (pmsg->message_id); 570 ch->recv_message_id = GNUNET_ntohll (msg->message_id);
490 ch->recv_flags = ntohl (pmsg->flags); 571 ch->recv_flags = ntohl (msg->flags);
491 } 572 }
492 else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id) 573 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
493 { 574 {
494 LOG (GNUNET_ERROR_TYPE_WARNING, 575 LOG (GNUNET_ERROR_TYPE_WARNING,
495 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", 576 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
496 GNUNET_ntohll (pmsg->message_id), ch->recv_message_id); 577 GNUNET_ntohll (msg->message_id), ch->recv_message_id);
497 GNUNET_break_op (0); 578 GNUNET_break_op (0);
498 recv_error (ch); 579 recv_error (ch);
499 } 580 }
500 else if (ntohl (pmsg->flags) != ch->recv_flags) 581 else if (ntohl (msg->flags) != ch->recv_flags)
501 { 582 {
502 LOG (GNUNET_ERROR_TYPE_WARNING, 583 LOG (GNUNET_ERROR_TYPE_WARNING,
503 "Unexpected message flags. Got: %lu, expected: %lu\n", 584 "Unexpected message flags. Got: %lu, expected: %lu\n",
504 ntohl (pmsg->flags), ch->recv_flags); 585 ntohl (msg->flags), ch->recv_flags);
505 GNUNET_break_op (0); 586 GNUNET_break_op (0);
506 recv_error (ch); 587 recv_error (ch);
507 } 588 }
508 589
509 for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size) 590 uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
591
592 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
510 { 593 {
511 msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); 594 const struct GNUNET_MessageHeader *pmsg
512 size = ntohs (msg->size); 595 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
513 type = ntohs (msg->type); 596 psize = ntohs (pmsg->size);
597 ptype = ntohs (pmsg->type);
514 size_eq = size_min = 0; 598 size_eq = size_min = 0;
515 599
516 if (msize < sizeof (*pmsg) + pos + size) 600 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
601 "Received message part of type %u and size %u from PSYC.\n",
602 ptype, psize);
603
604 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
517 { 605 {
518 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 606 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
519 "Discarding message of type %u with invalid size. " 607 "Discarding message of type %u with invalid size %u.\n",
520 "(%u < %u + %u + %u)\n", ntohs (msg->type), 608 ptype, psize);
521 msize, sizeof (*msg), pos, size);
522 break; 609 break;
523 } 610 }
524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525 "Received message part of type %u and size %u from PSYC.\n",
526 ntohs (msg->type), size);
527 611
528 612 switch (ptype)
529 switch (type)
530 { 613 {
531 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 614 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
532 size_min = sizeof (struct GNUNET_PSYC_MessageMethod); 615 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
@@ -534,6 +617,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
534 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: 617 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
535 size_min = sizeof (struct GNUNET_PSYC_MessageModifier); 618 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
536 break; 619 break;
620 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
537 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: 621 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
538 size_min = sizeof (struct GNUNET_MessageHeader); 622 size_min = sizeof (struct GNUNET_MessageHeader);
539 break; 623 break;
@@ -543,22 +627,22 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
543 break; 627 break;
544 } 628 }
545 629
546 if (! ((0 < size_eq && size == size_eq) 630 if (! ((0 < size_eq && psize == size_eq)
547 || (0 < size_min && size_min <= size))) 631 || (0 < size_min && size_min <= psize)))
548 { 632 {
549 GNUNET_break (0); 633 GNUNET_break (0);
550 reschedule_connect (ch); 634 reschedule_connect (ch);
551 return; 635 return;
552 } 636 }
553 637
554 switch (type) 638 switch (ptype)
555 { 639 {
556 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 640 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
557 { 641 {
558 struct GNUNET_PSYC_MessageMethod *meth 642 struct GNUNET_PSYC_MessageMethod *meth
559 = (struct GNUNET_PSYC_MessageMethod *) msg; 643 = (struct GNUNET_PSYC_MessageMethod *) pmsg;
560 644
561 if (MSG_STATE_HEADER != ch->recv_state) 645 if (MSG_STATE_START != ch->recv_state)
562 { 646 {
563 LOG (GNUNET_ERROR_TYPE_WARNING, 647 LOG (GNUNET_ERROR_TYPE_WARNING,
564 "Discarding out of order message method.\n"); 648 "Discarding out of order message method.\n");
@@ -568,89 +652,66 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
568 */ 652 */
569 GNUNET_break_op (0); 653 GNUNET_break_op (0);
570 recv_error (ch); 654 recv_error (ch);
571 break; 655 return;
572 } 656 }
573 657
574 if ('\0' != (char *) meth + msg->size - 1) 658 if ('\0' != *((char *) meth + psize - 1))
575 { 659 {
576 LOG (GNUNET_ERROR_TYPE_WARNING, 660 LOG (GNUNET_ERROR_TYPE_WARNING,
577 "Discarding message with malformed method. " 661 "Discarding message with malformed method. "
578 "Message ID: %" PRIu64 "\n", ch->recv_message_id); 662 "Message ID: %" PRIu64 "\n", ch->recv_message_id);
579 GNUNET_break_op (0); 663 GNUNET_break_op (0);
580 recv_error (ch); 664 recv_error (ch);
581 break; 665 return;
582 } 666 }
583 GNUNET_PSYC_MessageCallback message_cb
584 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
585 ? ch->hist_message_cb
586 : ch->message_cb;
587
588 if (NULL != message_cb)
589 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
590
591 ch->recv_state = MSG_STATE_METHOD; 667 ch->recv_state = MSG_STATE_METHOD;
592 break; 668 break;
593 } 669 }
594 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: 670 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
595 { 671 {
596 if (MSG_STATE_MODIFIER != ch->recv_state) 672 if (!(MSG_STATE_METHOD == ch->recv_state
673 || MSG_STATE_MODIFIER == ch->recv_state
674 || MSG_STATE_MOD_CONT == ch->recv_state))
597 { 675 {
598 LOG (GNUNET_ERROR_TYPE_WARNING, 676 LOG (GNUNET_ERROR_TYPE_WARNING,
599 "Discarding out of order message modifier.\n"); 677 "Discarding out of order message modifier.\n");
600 GNUNET_break_op (0); 678 GNUNET_break_op (0);
601 recv_error (ch); 679 recv_error (ch);
602 break; 680 return;
603 } 681 }
604 682
605 struct GNUNET_PSYC_MessageModifier *mod 683 struct GNUNET_PSYC_MessageModifier *mod
606 = (struct GNUNET_PSYC_MessageModifier *) msg; 684 = (struct GNUNET_PSYC_MessageModifier *) pmsg;
607 685
608 uint16_t name_size = ntohs (mod->name_size); 686 uint16_t name_size = ntohs (mod->name_size);
609 ch->recv_mod_value_size_expected = ntohs (mod->value_size); 687 ch->recv_mod_value_size_expected = ntohs (mod->value_size);
610 ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1; 688 ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
611 689
612 if (size < sizeof (*mod) + name_size + 1 690 if (psize < sizeof (*mod) + name_size + 1
613 || '\0' != (char *) &mod[1] + mod->name_size 691 || '\0' != *((char *) &mod[1] + name_size)
614 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) 692 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
615 { 693 {
616 LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); 694 LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n");
617 GNUNET_break_op (0); 695 GNUNET_break_op (0);
618 break; 696 return;
619 } 697 }
620
621 ch->recv_state = MSG_STATE_MODIFIER; 698 ch->recv_state = MSG_STATE_MODIFIER;
622
623 GNUNET_PSYC_MessageCallback message_cb
624 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
625 ? ch->hist_message_cb
626 : ch->message_cb;
627
628 if (NULL != message_cb)
629 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
630
631 break; 699 break;
632 } 700 }
633 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 701 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
634 { 702 {
635 ch->recv_mod_value_size += size - sizeof (*msg); 703 ch->recv_mod_value_size += psize - sizeof (*pmsg);
636 704
637 if (MSG_STATE_MODIFIER != ch->recv_state 705 if (!(MSG_STATE_MODIFIER == ch->recv_state
706 || MSG_STATE_MOD_CONT == ch->recv_state)
638 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) 707 || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
639 { 708 {
640 LOG (GNUNET_ERROR_TYPE_WARNING, 709 LOG (GNUNET_ERROR_TYPE_WARNING,
641 "Discarding out of order message modifier continuation.\n"); 710 "Discarding out of order message modifier continuation.\n");
642 GNUNET_break_op (0); 711 GNUNET_break_op (0);
643 recv_reset (ch); 712 recv_reset (ch);
644 break; 713 return;
645 } 714 }
646
647 GNUNET_PSYC_MessageCallback message_cb
648 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
649 ? ch->hist_message_cb
650 : ch->message_cb;
651
652 if (NULL != message_cb)
653 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
654 break; 715 break;
655 } 716 }
656 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: 717 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
@@ -662,12 +723,23 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
662 "Discarding out of order message data fragment.\n"); 723 "Discarding out of order message data fragment.\n");
663 GNUNET_break_op (0); 724 GNUNET_break_op (0);
664 recv_reset (ch); 725 recv_reset (ch);
665 break; 726 return;
666 } 727 }
667
668 ch->recv_state = MSG_STATE_DATA; 728 ch->recv_state = MSG_STATE_DATA;
669 break; 729 break;
670 } 730 }
731 }
732
733 GNUNET_PSYC_MessageCallback message_cb
734 = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
735 ? ch->hist_message_cb
736 : ch->message_cb;
737
738 if (NULL != message_cb)
739 message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
740
741 switch (ptype)
742 {
671 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: 743 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
672 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: 744 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
673 recv_reset (ch); 745 recv_reset (ch);
@@ -717,18 +789,7 @@ message_handler (void *cls,
717 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: 789 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
718 size_min = sizeof (struct GNUNET_PSYC_MessageHeader); 790 size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
719 break; 791 break;
720 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 792 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
721 size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
722 break;
723 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
724 size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
725 break;
726 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
727 size_min = sizeof (struct GNUNET_MessageHeader);
728 break;
729 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
730 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
731 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
732 size_eq = sizeof (struct GNUNET_MessageHeader); 793 size_eq = sizeof (struct GNUNET_MessageHeader);
733 break; 794 break;
734 } 795 }
@@ -761,9 +822,15 @@ message_handler (void *cls,
761#endif 822#endif
762 break; 823 break;
763 } 824 }
764 case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: 825 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
765 { 826 {
766 ch->tmit_ack_pending = GNUNET_NO; 827 if (0 == ch->tmit_ack_pending)
828 {
829 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
830 GNUNET_break (0);
831 break;
832 }
833 ch->tmit_ack_pending--;
767 834
768 if (ch->is_master) 835 if (ch->is_master)
769 { 836 {
@@ -771,10 +838,6 @@ message_handler (void *cls,
771 switch (mst->tmit->state) 838 switch (mst->tmit->state)
772 { 839 {
773 case MSG_STATE_MODIFIER: 840 case MSG_STATE_MODIFIER:
774 if (GNUNET_NO == ch->tmit_paused)
775 master_transmit_mod (mst);
776 break;
777
778 case MSG_STATE_MOD_CONT: 841 case MSG_STATE_MOD_CONT:
779 if (GNUNET_NO == ch->tmit_paused) 842 if (GNUNET_NO == ch->tmit_paused)
780 master_transmit_mod (mst); 843 master_transmit_mod (mst);
@@ -795,12 +858,13 @@ message_handler (void *cls,
795 else 858 else
796 { 859 {
797 LOG (GNUNET_ERROR_TYPE_WARNING, 860 LOG (GNUNET_ERROR_TYPE_WARNING,
798 "Ignoring transmit ack, there's no transmission going on.\n"); 861 "Ignoring message ACK, there's no transmission going on.\n");
862 GNUNET_break (0);
799 } 863 }
800 break; 864 break;
801 default: 865 default:
802 LOG (GNUNET_ERROR_TYPE_WARNING, 866 LOG (GNUNET_ERROR_TYPE_DEBUG,
803 "Ignoring unexpected transmit ack.\n"); 867 "Ignoring message ACK in state %u.\n", mst->tmit->state);
804 } 868 }
805 } 869 }
806 else 870 else
@@ -811,12 +875,15 @@ message_handler (void *cls,
811 } 875 }
812 876
813 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: 877 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
814 handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg); 878 handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
815 break; 879 break;
816 } 880 }
817 881
818 GNUNET_CLIENT_receive (ch->client, &message_handler, ch, 882 if (NULL != ch->client)
819 GNUNET_TIME_UNIT_FOREVER_REL); 883 {
884 GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
885 GNUNET_TIME_UNIT_FOREVER_REL);
886 }
820} 887}
821 888
822 889
@@ -1029,6 +1096,8 @@ void
1029GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) 1096GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
1030{ 1097{
1031 disconnect (master); 1098 disconnect (master);
1099 if (NULL != master->tmit)
1100 GNUNET_free (master->tmit);
1032 GNUNET_free (master); 1101 GNUNET_free (master);
1033} 1102}
1034 1103
@@ -1069,30 +1138,6 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1069} 1138}
1070 1139
1071 1140
1072/* FIXME: split up value into <64K chunks and transmit the continuations in
1073 * MOD_CONT msgs */
1074static int
1075send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
1076{
1077 struct GNUNET_PSYC_Channel *ch = cls;
1078 size_t name_size = strlen (mod->name) + 1;
1079 struct GNUNET_PSYC_MessageModifier *pmod;
1080 struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod)
1081 + name_size + mod->value_size);
1082 pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
1083 op->msg = (struct GNUNET_MessageHeader *) pmod;
1084
1085 pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
1086 pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
1087 pmod->name_size = htons (name_size);
1088 memcpy (&pmod[1], mod->name, name_size);
1089 memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
1090
1091 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
1092 return GNUNET_YES;
1093}
1094
1095
1096/** 1141/**
1097 * Send a message to call a method to all members in the PSYC channel. 1142 * Send a message to call a method to all members in the PSYC channel.
1098 * 1143 *
@@ -1107,7 +1152,7 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
1107struct GNUNET_PSYC_MasterTransmitHandle * 1152struct GNUNET_PSYC_MasterTransmitHandle *
1108GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 1153GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1109 const char *method_name, 1154 const char *method_name,
1110 GNUNET_PSYC_MasterTransmitNotify notify_mod, 1155 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod,
1111 GNUNET_PSYC_MasterTransmitNotify notify_data, 1156 GNUNET_PSYC_MasterTransmitNotify notify_data,
1112 void *notify_cls, 1157 void *notify_cls,
1113 enum GNUNET_PSYC_MasterTransmitFlags flags) 1158 enum GNUNET_PSYC_MasterTransmitFlags flags)
@@ -1120,25 +1165,27 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1120 1165
1121 size_t size = strlen (method_name) + 1; 1166 size_t size = strlen (method_name) + 1;
1122 struct GNUNET_PSYC_MessageMethod *pmeth; 1167 struct GNUNET_PSYC_MessageMethod *pmeth;
1123 struct OperationHandle *op 1168 struct OperationHandle *op;
1124 = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size);
1125 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
1126 op->msg = (struct GNUNET_MessageHeader *) pmeth;
1127 1169
1170 ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
1171 + sizeof (*pmeth) + size);
1172 op->msg = (struct GNUNET_MessageHeader *) &op[1];
1173 op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
1174
1175 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
1128 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); 1176 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
1129 pmeth->header.size = htons (sizeof (*pmeth) + size); 1177 pmeth->header.size = htons (sizeof (*pmeth) + size);
1130 pmeth->flags = htonl (flags); 1178 pmeth->flags = htonl (flags);
1131 memcpy (&pmeth[1], method_name, size); 1179 memcpy (&pmeth[1], method_name, size);
1132 1180
1133 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
1134 transmit_next (ch);
1135
1136 master->tmit = GNUNET_malloc (sizeof (*master->tmit)); 1181 master->tmit = GNUNET_malloc (sizeof (*master->tmit));
1137 master->tmit->master = master; 1182 master->tmit->master = master;
1138 master->tmit->notify_mod = notify_mod; 1183 master->tmit->notify_mod = notify_mod;
1139 master->tmit->notify_data = notify_data; 1184 master->tmit->notify_data = notify_data;
1140 master->tmit->notify_cls = notify_cls; 1185 master->tmit->notify_cls = notify_cls;
1141 master->tmit->state = MSG_STATE_START; // FIXME 1186 master->tmit->state = MSG_STATE_MODIFIER;
1187
1188 master_transmit_mod (master);
1142 return master->tmit; 1189 return master->tmit;
1143} 1190}
1144 1191
@@ -1152,7 +1199,7 @@ void
1152GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1199GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1153{ 1200{
1154 struct GNUNET_PSYC_Channel *ch = &th->master->ch; 1201 struct GNUNET_PSYC_Channel *ch = &th->master->ch;
1155 if (GNUNET_NO == ch->tmit_ack_pending) 1202 if (0 == ch->tmit_ack_pending)
1156 { 1203 {
1157 ch->tmit_paused = GNUNET_NO; 1204 ch->tmit_paused = GNUNET_NO;
1158 master_transmit_data (th->master); 1205 master_transmit_data (th->master);