diff options
author | Gabor X Toth <*@tg-x.net> | 2014-01-06 00:09:43 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-01-06 00:09:43 +0000 |
commit | 1a0ffe2288b97b47a5b2bfbda2f9438680429422 (patch) | |
tree | 72db4cd67f06253a60bf3e2966fd0b1bf55eba5c /src/psyc/psyc_api.c | |
parent | 43d497d7c4ebb6efae37ae4bb2f812a68aa64a32 (diff) | |
download | gnunet-1a0ffe2288b97b47a5b2bfbda2f9438680429422.tar.gz gnunet-1a0ffe2288b97b47a5b2bfbda2f9438680429422.zip |
psyc: ipc messages, notify callback for modifiers, tests
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r-- | src/psyc/psyc_api.c | 443 |
1 files changed, 245 insertions, 198 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index a5a01fa92..e904e00b5 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -45,7 +45,7 @@ struct OperationHandle | |||
45 | { | 45 | { |
46 | struct OperationHandle *prev; | 46 | struct OperationHandle *prev; |
47 | struct OperationHandle *next; | 47 | struct OperationHandle *next; |
48 | const struct GNUNET_MessageHeader *msg; | 48 | struct GNUNET_MessageHeader *msg; |
49 | }; | 49 | }; |
50 | 50 | ||
51 | /** | 51 | /** |
@@ -79,6 +79,11 @@ struct GNUNET_PSYC_Channel | |||
79 | struct OperationHandle *tmit_tail; | 79 | struct OperationHandle *tmit_tail; |
80 | 80 | ||
81 | /** | 81 | /** |
82 | * Message being transmitted to the PSYC service. | ||
83 | */ | ||
84 | struct OperationHandle *tmit_msg; | ||
85 | |||
86 | /** | ||
82 | * Message to send on reconnect. | 87 | * Message to send on reconnect. |
83 | */ | 88 | */ |
84 | struct GNUNET_MessageHeader *reconnect_msg; | 89 | struct GNUNET_MessageHeader *reconnect_msg; |
@@ -139,11 +144,6 @@ struct GNUNET_PSYC_Channel | |||
139 | uint32_t recv_mod_value_size; | 144 | uint32_t recv_mod_value_size; |
140 | 145 | ||
141 | /** | 146 | /** |
142 | * Buffer space available for transmitting the next data fragment. | ||
143 | */ | ||
144 | uint16_t tmit_size; // FIXME | ||
145 | |||
146 | /** | ||
147 | * Is transmission paused? | 147 | * Is transmission paused? |
148 | */ | 148 | */ |
149 | uint8_t tmit_paused; | 149 | uint8_t tmit_paused; |
@@ -151,7 +151,7 @@ struct GNUNET_PSYC_Channel | |||
151 | /** | 151 | /** |
152 | * Are we still waiting for a PSYC_TRANSMIT_ACK? | 152 | * Are we still waiting for a PSYC_TRANSMIT_ACK? |
153 | */ | 153 | */ |
154 | uint8_t tmit_ack_pending; // FIXME | 154 | uint8_t tmit_ack_pending; |
155 | 155 | ||
156 | /** | 156 | /** |
157 | * Are we polling for incoming messages right now? | 157 | * Are we polling for incoming messages right now? |
@@ -176,7 +176,7 @@ struct GNUNET_PSYC_Channel | |||
176 | struct GNUNET_PSYC_MasterTransmitHandle | 176 | struct GNUNET_PSYC_MasterTransmitHandle |
177 | { | 177 | { |
178 | struct GNUNET_PSYC_Master *master; | 178 | struct GNUNET_PSYC_Master *master; |
179 | GNUNET_PSYC_MasterTransmitNotify notify_mod; | 179 | GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod; |
180 | GNUNET_PSYC_MasterTransmitNotify notify_data; | 180 | GNUNET_PSYC_MasterTransmitNotify notify_data; |
181 | void *notify_cls; | 181 | void *notify_cls; |
182 | enum MessageState state; | 182 | enum MessageState state; |
@@ -246,16 +246,14 @@ struct GNUNET_PSYC_StateQuery | |||
246 | }; | 246 | }; |
247 | 247 | ||
248 | 248 | ||
249 | /** | ||
250 | * Try again to connect to the PSYC service. | ||
251 | * | ||
252 | * @param cls Handle to the PSYC service. | ||
253 | * @param tc Scheduler context | ||
254 | */ | ||
255 | static void | 249 | static void |
256 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | 250 | reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); |
257 | 251 | ||
258 | 252 | ||
253 | static void | ||
254 | master_transmit_data (struct GNUNET_PSYC_Master *mst); | ||
255 | |||
256 | |||
259 | /** | 257 | /** |
260 | * Reschedule a connect attempt to the service. | 258 | * Reschedule a connect attempt to the service. |
261 | * | 259 | * |
@@ -323,6 +321,79 @@ recv_error (struct GNUNET_PSYC_Channel *ch) | |||
323 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); | 321 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); |
324 | } | 322 | } |
325 | 323 | ||
324 | |||
325 | /** | ||
326 | * Queue an incoming message part for transmission to the PSYC service. | ||
327 | * | ||
328 | * The message part is added to the current message buffer. | ||
329 | * When this buffer is full, it is added to the transmission queue. | ||
330 | * | ||
331 | * @param ch Channel struct for the client. | ||
332 | * @param msg Modifier message part, or NULL when there's no more modifiers. | ||
333 | * @param end End of message. | ||
334 | */ | ||
335 | static void | ||
336 | queue_message (struct GNUNET_PSYC_Channel *ch, | ||
337 | const struct GNUNET_MessageHeader *msg, | ||
338 | uint8_t end) | ||
339 | { | ||
340 | uint16_t size = msg ? ntohs (msg->size) : 0; | ||
341 | |||
342 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
343 | "Queueing message of type %u and size %u (end: %u)).\n", | ||
344 | ntohs (msg->type), size, end); | ||
345 | |||
346 | struct OperationHandle *op = ch->tmit_msg; | ||
347 | if (NULL != op) | ||
348 | { | ||
349 | if (NULL == msg | ||
350 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size) | ||
351 | { | ||
352 | /* End of message or buffer is full, add it to transmission queue | ||
353 | * and start with empty buffer */ | ||
354 | op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
355 | op->msg->size = htons (op->msg->size); | ||
356 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
357 | ch->tmit_msg = op = NULL; | ||
358 | ch->tmit_ack_pending++; | ||
359 | } | ||
360 | else | ||
361 | { | ||
362 | /* Message fits in current buffer, append */ | ||
363 | ch->tmit_msg = op | ||
364 | = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size); | ||
365 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
366 | memcpy ((char *) op->msg + op->msg->size, msg, size); | ||
367 | op->msg->size += size; | ||
368 | } | ||
369 | } | ||
370 | |||
371 | if (NULL == op && NULL != msg) | ||
372 | { | ||
373 | /* Empty buffer, copy over message. */ | ||
374 | ch->tmit_msg = op | ||
375 | = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size); | ||
376 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
377 | op->msg->size = sizeof (*op->msg) + size; | ||
378 | memcpy (&op->msg[1], msg, size); | ||
379 | } | ||
380 | |||
381 | if (NULL != op | ||
382 | && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD | ||
383 | < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) | ||
384 | { | ||
385 | /* End of message or buffer is full, add it to transmission queue. */ | ||
386 | op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
387 | op->msg->size = htons (op->msg->size); | ||
388 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
389 | ch->tmit_msg = op = NULL; | ||
390 | ch->tmit_ack_pending++; | ||
391 | } | ||
392 | |||
393 | transmit_next (ch); | ||
394 | } | ||
395 | |||
396 | |||
326 | /** | 397 | /** |
327 | * Request a modifier from a client to transmit. | 398 | * Request a modifier from a client to transmit. |
328 | * | 399 | * |
@@ -332,32 +403,71 @@ static void | |||
332 | master_transmit_mod (struct GNUNET_PSYC_Master *mst) | 403 | master_transmit_mod (struct GNUNET_PSYC_Master *mst) |
333 | { | 404 | { |
334 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | 405 | struct GNUNET_PSYC_Channel *ch = &mst->ch; |
335 | uint16_t max_data_size | 406 | uint16_t max_data_size, data_size; |
336 | = ch->tmit_size > sizeof (struct GNUNET_MessageHeader) | 407 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; |
337 | ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size | 408 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; |
338 | : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size; | 409 | int notify_ret; |
339 | uint16_t data_size = max_data_size; | ||
340 | 410 | ||
341 | struct GNUNET_MessageHeader *msg; | 411 | switch (mst->tmit->state) |
342 | struct OperationHandle *op | 412 | { |
343 | = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); | 413 | case MSG_STATE_MODIFIER: |
344 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | 414 | { |
345 | msg->type | 415 | struct GNUNET_PSYC_MessageModifier *mod |
346 | = MSG_STATE_MODIFIER == mst->tmit->state | 416 | = (struct GNUNET_PSYC_MessageModifier *) msg; |
347 | ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER) | 417 | max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; |
348 | : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | 418 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); |
419 | msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
420 | notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, | ||
421 | &data_size, &mod[1], &mod->oper); | ||
422 | mod->name_size = strnlen ((char *) &mod[1], data_size); | ||
423 | if (mod->name_size < data_size) | ||
424 | { | ||
425 | mod->oper = htons (mod->oper); | ||
426 | mod->value_size = htons (data_size - 1 - mod->name_size); | ||
427 | mod->name_size = htons (mod->name_size); | ||
428 | } | ||
429 | else if (0 < data_size) | ||
430 | { | ||
431 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n"); | ||
432 | notify_ret = GNUNET_SYSERR; | ||
433 | } | ||
434 | break; | ||
435 | } | ||
436 | case MSG_STATE_MOD_CONT: | ||
437 | { | ||
438 | max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; | ||
439 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | ||
440 | msg->size = sizeof (struct GNUNET_MessageHeader); | ||
441 | notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, | ||
442 | &data_size, &msg[1], NULL); | ||
443 | break; | ||
444 | } | ||
445 | default: | ||
446 | GNUNET_assert (0); | ||
447 | } | ||
349 | 448 | ||
350 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, | ||
351 | &data_size, &msg[1]); | ||
352 | switch (notify_ret) | 449 | switch (notify_ret) |
353 | { | 450 | { |
354 | case GNUNET_NO: | 451 | case GNUNET_NO: |
355 | if (0 != data_size) | 452 | if (0 == data_size) |
356 | mst->tmit->state = MSG_STATE_MOD_CONT; | 453 | { /* Transmission paused, nothing to send. */ |
454 | ch->tmit_paused = GNUNET_YES; | ||
455 | return; | ||
456 | } | ||
457 | mst->tmit->state = MSG_STATE_MOD_CONT; | ||
357 | break; | 458 | break; |
358 | 459 | ||
359 | case GNUNET_YES: | 460 | case GNUNET_YES: |
360 | mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER; | 461 | if (0 == data_size) |
462 | { | ||
463 | /* End of modifiers. */ | ||
464 | mst->tmit->state = MSG_STATE_DATA; | ||
465 | if (0 == ch->tmit_ack_pending) | ||
466 | master_transmit_data (mst); | ||
467 | |||
468 | return; | ||
469 | } | ||
470 | mst->tmit->state = MSG_STATE_MODIFIER; | ||
361 | break; | 471 | break; |
362 | 472 | ||
363 | default: | 473 | default: |
@@ -368,36 +478,18 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst) | |||
368 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 478 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
369 | msg->size = htons (sizeof (*msg)); | 479 | msg->size = htons (sizeof (*msg)); |
370 | 480 | ||
371 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 481 | queue_message (ch, msg, GNUNET_YES); |
372 | transmit_next (ch); | ||
373 | return; | 482 | return; |
374 | } | 483 | } |
375 | 484 | ||
376 | if ((GNUNET_NO == notify_ret && 0 == data_size)) | ||
377 | { | ||
378 | /* Transmission paused, nothing to send. */ | ||
379 | ch->tmit_paused = GNUNET_YES; | ||
380 | GNUNET_free (op); | ||
381 | } | ||
382 | |||
383 | if (0 < data_size) | 485 | if (0 < data_size) |
384 | { | 486 | { |
385 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | 487 | GNUNET_assert (data_size <= max_data_size); |
386 | msg->size = htons (sizeof (*msg) + data_size); | 488 | msg->size = htons (msg->size + data_size); |
387 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 489 | queue_message (ch, msg, GNUNET_NO); |
388 | } | ||
389 | |||
390 | /* End of message. */ | ||
391 | if (GNUNET_YES == notify_ret) | ||
392 | { | ||
393 | op = GNUNET_malloc (sizeof *(op) + sizeof (*msg)); | ||
394 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
395 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | ||
396 | msg->size = htons (sizeof (*msg)); | ||
397 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
398 | } | 490 | } |
399 | 491 | ||
400 | transmit_next (ch); | 492 | master_transmit_mod (mst); |
401 | } | 493 | } |
402 | 494 | ||
403 | 495 | ||
@@ -410,11 +502,10 @@ static void | |||
410 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | 502 | master_transmit_data (struct GNUNET_PSYC_Master *mst) |
411 | { | 503 | { |
412 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | 504 | struct GNUNET_PSYC_Channel *ch = &mst->ch; |
413 | struct GNUNET_MessageHeader *msg; | ||
414 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | 505 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; |
415 | struct OperationHandle *op | 506 | char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; |
416 | = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); | 507 | struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; |
417 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | 508 | |
418 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | 509 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); |
419 | 510 | ||
420 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, | 511 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, |
@@ -426,7 +517,7 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
426 | { | 517 | { |
427 | /* Transmission paused, nothing to send. */ | 518 | /* Transmission paused, nothing to send. */ |
428 | ch->tmit_paused = GNUNET_YES; | 519 | ch->tmit_paused = GNUNET_YES; |
429 | GNUNET_free (op); | 520 | return; |
430 | } | 521 | } |
431 | break; | 522 | break; |
432 | 523 | ||
@@ -441,9 +532,7 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
441 | mst->tmit->state = MSG_STATE_START; | 532 | mst->tmit->state = MSG_STATE_START; |
442 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | 533 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); |
443 | msg->size = htons (sizeof (*msg)); | 534 | msg->size = htons (sizeof (*msg)); |
444 | 535 | queue_message (ch, msg, GNUNET_YES); | |
445 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
446 | transmit_next (ch); | ||
447 | return; | 536 | return; |
448 | } | 537 | } |
449 | 538 | ||
@@ -451,20 +540,16 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
451 | { | 540 | { |
452 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | 541 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); |
453 | msg->size = htons (sizeof (*msg) + data_size); | 542 | msg->size = htons (sizeof (*msg) + data_size); |
454 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 543 | queue_message (ch, msg, !notify_ret); |
455 | } | 544 | } |
456 | 545 | ||
457 | /* End of message. */ | 546 | /* End of message. */ |
458 | if (GNUNET_YES == notify_ret) | 547 | if (GNUNET_YES == notify_ret) |
459 | { | 548 | { |
460 | op = GNUNET_malloc (sizeof *(op) + sizeof (*msg)); | ||
461 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
462 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | 549 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); |
463 | msg->size = htons (sizeof (*msg)); | 550 | msg->size = htons (sizeof (*msg)); |
464 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 551 | queue_message (ch, msg, GNUNET_YES); |
465 | } | 552 | } |
466 | |||
467 | transmit_next (ch); | ||
468 | } | 553 | } |
469 | 554 | ||
470 | 555 | ||
@@ -476,57 +561,55 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst) | |||
476 | */ | 561 | */ |
477 | static void | 562 | static void |
478 | handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | 563 | handle_psyc_message (struct GNUNET_PSYC_Channel *ch, |
479 | const struct GNUNET_PSYC_MessageHeader *pmsg) | 564 | const struct GNUNET_PSYC_MessageHeader *msg) |
480 | { | 565 | { |
481 | const struct GNUNET_MessageHeader *msg; | 566 | uint16_t size = ntohs (msg->header.size); |
482 | uint16_t msize = ntohs (pmsg->header.size); | ||
483 | uint16_t pos = 0; | ||
484 | uint16_t size = 0; | ||
485 | uint16_t type, size_eq, size_min; | ||
486 | 567 | ||
487 | if (MSG_STATE_START == ch->recv_state) | 568 | if (MSG_STATE_START == ch->recv_state) |
488 | { | 569 | { |
489 | ch->recv_message_id = GNUNET_ntohll (pmsg->message_id); | 570 | ch->recv_message_id = GNUNET_ntohll (msg->message_id); |
490 | ch->recv_flags = ntohl (pmsg->flags); | 571 | ch->recv_flags = ntohl (msg->flags); |
491 | } | 572 | } |
492 | else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id) | 573 | else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) |
493 | { | 574 | { |
494 | LOG (GNUNET_ERROR_TYPE_WARNING, | 575 | LOG (GNUNET_ERROR_TYPE_WARNING, |
495 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", | 576 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", |
496 | GNUNET_ntohll (pmsg->message_id), ch->recv_message_id); | 577 | GNUNET_ntohll (msg->message_id), ch->recv_message_id); |
497 | GNUNET_break_op (0); | 578 | GNUNET_break_op (0); |
498 | recv_error (ch); | 579 | recv_error (ch); |
499 | } | 580 | } |
500 | else if (ntohl (pmsg->flags) != ch->recv_flags) | 581 | else if (ntohl (msg->flags) != ch->recv_flags) |
501 | { | 582 | { |
502 | LOG (GNUNET_ERROR_TYPE_WARNING, | 583 | LOG (GNUNET_ERROR_TYPE_WARNING, |
503 | "Unexpected message flags. Got: %lu, expected: %lu\n", | 584 | "Unexpected message flags. Got: %lu, expected: %lu\n", |
504 | ntohl (pmsg->flags), ch->recv_flags); | 585 | ntohl (msg->flags), ch->recv_flags); |
505 | GNUNET_break_op (0); | 586 | GNUNET_break_op (0); |
506 | recv_error (ch); | 587 | recv_error (ch); |
507 | } | 588 | } |
508 | 589 | ||
509 | for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size) | 590 | uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; |
591 | |||
592 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | ||
510 | { | 593 | { |
511 | msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | 594 | const struct GNUNET_MessageHeader *pmsg |
512 | size = ntohs (msg->size); | 595 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); |
513 | type = ntohs (msg->type); | 596 | psize = ntohs (pmsg->size); |
597 | ptype = ntohs (pmsg->type); | ||
514 | size_eq = size_min = 0; | 598 | size_eq = size_min = 0; |
515 | 599 | ||
516 | if (msize < sizeof (*pmsg) + pos + size) | 600 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
601 | "Received message part of type %u and size %u from PSYC.\n", | ||
602 | ptype, psize); | ||
603 | |||
604 | if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) | ||
517 | { | 605 | { |
518 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 606 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
519 | "Discarding message of type %u with invalid size. " | 607 | "Discarding message of type %u with invalid size %u.\n", |
520 | "(%u < %u + %u + %u)\n", ntohs (msg->type), | 608 | ptype, psize); |
521 | msize, sizeof (*msg), pos, size); | ||
522 | break; | 609 | break; |
523 | } | 610 | } |
524 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
525 | "Received message part of type %u and size %u from PSYC.\n", | ||
526 | ntohs (msg->type), size); | ||
527 | 611 | ||
528 | 612 | switch (ptype) | |
529 | switch (type) | ||
530 | { | 613 | { |
531 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 614 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: |
532 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | 615 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); |
@@ -534,6 +617,7 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
534 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | 617 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: |
535 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | 618 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); |
536 | break; | 619 | break; |
620 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
537 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | 621 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: |
538 | size_min = sizeof (struct GNUNET_MessageHeader); | 622 | size_min = sizeof (struct GNUNET_MessageHeader); |
539 | break; | 623 | break; |
@@ -543,22 +627,22 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
543 | break; | 627 | break; |
544 | } | 628 | } |
545 | 629 | ||
546 | if (! ((0 < size_eq && size == size_eq) | 630 | if (! ((0 < size_eq && psize == size_eq) |
547 | || (0 < size_min && size_min <= size))) | 631 | || (0 < size_min && size_min <= psize))) |
548 | { | 632 | { |
549 | GNUNET_break (0); | 633 | GNUNET_break (0); |
550 | reschedule_connect (ch); | 634 | reschedule_connect (ch); |
551 | return; | 635 | return; |
552 | } | 636 | } |
553 | 637 | ||
554 | switch (type) | 638 | switch (ptype) |
555 | { | 639 | { |
556 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 640 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: |
557 | { | 641 | { |
558 | struct GNUNET_PSYC_MessageMethod *meth | 642 | struct GNUNET_PSYC_MessageMethod *meth |
559 | = (struct GNUNET_PSYC_MessageMethod *) msg; | 643 | = (struct GNUNET_PSYC_MessageMethod *) pmsg; |
560 | 644 | ||
561 | if (MSG_STATE_HEADER != ch->recv_state) | 645 | if (MSG_STATE_START != ch->recv_state) |
562 | { | 646 | { |
563 | LOG (GNUNET_ERROR_TYPE_WARNING, | 647 | LOG (GNUNET_ERROR_TYPE_WARNING, |
564 | "Discarding out of order message method.\n"); | 648 | "Discarding out of order message method.\n"); |
@@ -568,89 +652,66 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
568 | */ | 652 | */ |
569 | GNUNET_break_op (0); | 653 | GNUNET_break_op (0); |
570 | recv_error (ch); | 654 | recv_error (ch); |
571 | break; | 655 | return; |
572 | } | 656 | } |
573 | 657 | ||
574 | if ('\0' != (char *) meth + msg->size - 1) | 658 | if ('\0' != *((char *) meth + psize - 1)) |
575 | { | 659 | { |
576 | LOG (GNUNET_ERROR_TYPE_WARNING, | 660 | LOG (GNUNET_ERROR_TYPE_WARNING, |
577 | "Discarding message with malformed method. " | 661 | "Discarding message with malformed method. " |
578 | "Message ID: %" PRIu64 "\n", ch->recv_message_id); | 662 | "Message ID: %" PRIu64 "\n", ch->recv_message_id); |
579 | GNUNET_break_op (0); | 663 | GNUNET_break_op (0); |
580 | recv_error (ch); | 664 | recv_error (ch); |
581 | break; | 665 | return; |
582 | } | 666 | } |
583 | GNUNET_PSYC_MessageCallback message_cb | ||
584 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
585 | ? ch->hist_message_cb | ||
586 | : ch->message_cb; | ||
587 | |||
588 | if (NULL != message_cb) | ||
589 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
590 | |||
591 | ch->recv_state = MSG_STATE_METHOD; | 667 | ch->recv_state = MSG_STATE_METHOD; |
592 | break; | 668 | break; |
593 | } | 669 | } |
594 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | 670 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: |
595 | { | 671 | { |
596 | if (MSG_STATE_MODIFIER != ch->recv_state) | 672 | if (!(MSG_STATE_METHOD == ch->recv_state |
673 | || MSG_STATE_MODIFIER == ch->recv_state | ||
674 | || MSG_STATE_MOD_CONT == ch->recv_state)) | ||
597 | { | 675 | { |
598 | LOG (GNUNET_ERROR_TYPE_WARNING, | 676 | LOG (GNUNET_ERROR_TYPE_WARNING, |
599 | "Discarding out of order message modifier.\n"); | 677 | "Discarding out of order message modifier.\n"); |
600 | GNUNET_break_op (0); | 678 | GNUNET_break_op (0); |
601 | recv_error (ch); | 679 | recv_error (ch); |
602 | break; | 680 | return; |
603 | } | 681 | } |
604 | 682 | ||
605 | struct GNUNET_PSYC_MessageModifier *mod | 683 | struct GNUNET_PSYC_MessageModifier *mod |
606 | = (struct GNUNET_PSYC_MessageModifier *) msg; | 684 | = (struct GNUNET_PSYC_MessageModifier *) pmsg; |
607 | 685 | ||
608 | uint16_t name_size = ntohs (mod->name_size); | 686 | uint16_t name_size = ntohs (mod->name_size); |
609 | ch->recv_mod_value_size_expected = ntohs (mod->value_size); | 687 | ch->recv_mod_value_size_expected = ntohs (mod->value_size); |
610 | ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1; | 688 | ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; |
611 | 689 | ||
612 | if (size < sizeof (*mod) + name_size + 1 | 690 | if (psize < sizeof (*mod) + name_size + 1 |
613 | || '\0' != (char *) &mod[1] + mod->name_size | 691 | || '\0' != *((char *) &mod[1] + name_size) |
614 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | 692 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) |
615 | { | 693 | { |
616 | LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); | 694 | LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); |
617 | GNUNET_break_op (0); | 695 | GNUNET_break_op (0); |
618 | break; | 696 | return; |
619 | } | 697 | } |
620 | |||
621 | ch->recv_state = MSG_STATE_MODIFIER; | 698 | ch->recv_state = MSG_STATE_MODIFIER; |
622 | |||
623 | GNUNET_PSYC_MessageCallback message_cb | ||
624 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
625 | ? ch->hist_message_cb | ||
626 | : ch->message_cb; | ||
627 | |||
628 | if (NULL != message_cb) | ||
629 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
630 | |||
631 | break; | 699 | break; |
632 | } | 700 | } |
633 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | 701 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: |
634 | { | 702 | { |
635 | ch->recv_mod_value_size += size - sizeof (*msg); | 703 | ch->recv_mod_value_size += psize - sizeof (*pmsg); |
636 | 704 | ||
637 | if (MSG_STATE_MODIFIER != ch->recv_state | 705 | if (!(MSG_STATE_MODIFIER == ch->recv_state |
706 | || MSG_STATE_MOD_CONT == ch->recv_state) | ||
638 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | 707 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) |
639 | { | 708 | { |
640 | LOG (GNUNET_ERROR_TYPE_WARNING, | 709 | LOG (GNUNET_ERROR_TYPE_WARNING, |
641 | "Discarding out of order message modifier continuation.\n"); | 710 | "Discarding out of order message modifier continuation.\n"); |
642 | GNUNET_break_op (0); | 711 | GNUNET_break_op (0); |
643 | recv_reset (ch); | 712 | recv_reset (ch); |
644 | break; | 713 | return; |
645 | } | 714 | } |
646 | |||
647 | GNUNET_PSYC_MessageCallback message_cb | ||
648 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
649 | ? ch->hist_message_cb | ||
650 | : ch->message_cb; | ||
651 | |||
652 | if (NULL != message_cb) | ||
653 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
654 | break; | 715 | break; |
655 | } | 716 | } |
656 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | 717 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: |
@@ -662,12 +723,23 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | |||
662 | "Discarding out of order message data fragment.\n"); | 723 | "Discarding out of order message data fragment.\n"); |
663 | GNUNET_break_op (0); | 724 | GNUNET_break_op (0); |
664 | recv_reset (ch); | 725 | recv_reset (ch); |
665 | break; | 726 | return; |
666 | } | 727 | } |
667 | |||
668 | ch->recv_state = MSG_STATE_DATA; | 728 | ch->recv_state = MSG_STATE_DATA; |
669 | break; | 729 | break; |
670 | } | 730 | } |
731 | } | ||
732 | |||
733 | GNUNET_PSYC_MessageCallback message_cb | ||
734 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
735 | ? ch->hist_message_cb | ||
736 | : ch->message_cb; | ||
737 | |||
738 | if (NULL != message_cb) | ||
739 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg); | ||
740 | |||
741 | switch (ptype) | ||
742 | { | ||
671 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | 743 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: |
672 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | 744 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: |
673 | recv_reset (ch); | 745 | recv_reset (ch); |
@@ -717,18 +789,7 @@ message_handler (void *cls, | |||
717 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: | 789 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: |
718 | size_min = sizeof (struct GNUNET_PSYC_MessageHeader); | 790 | size_min = sizeof (struct GNUNET_PSYC_MessageHeader); |
719 | break; | 791 | break; |
720 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 792 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: |
721 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | ||
722 | break; | ||
723 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
724 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
725 | break; | ||
726 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
727 | size_min = sizeof (struct GNUNET_MessageHeader); | ||
728 | break; | ||
729 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
730 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
731 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | ||
732 | size_eq = sizeof (struct GNUNET_MessageHeader); | 793 | size_eq = sizeof (struct GNUNET_MessageHeader); |
733 | break; | 794 | break; |
734 | } | 795 | } |
@@ -761,9 +822,15 @@ message_handler (void *cls, | |||
761 | #endif | 822 | #endif |
762 | break; | 823 | break; |
763 | } | 824 | } |
764 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | 825 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: |
765 | { | 826 | { |
766 | ch->tmit_ack_pending = GNUNET_NO; | 827 | if (0 == ch->tmit_ack_pending) |
828 | { | ||
829 | LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); | ||
830 | GNUNET_break (0); | ||
831 | break; | ||
832 | } | ||
833 | ch->tmit_ack_pending--; | ||
767 | 834 | ||
768 | if (ch->is_master) | 835 | if (ch->is_master) |
769 | { | 836 | { |
@@ -771,10 +838,6 @@ message_handler (void *cls, | |||
771 | switch (mst->tmit->state) | 838 | switch (mst->tmit->state) |
772 | { | 839 | { |
773 | case MSG_STATE_MODIFIER: | 840 | case MSG_STATE_MODIFIER: |
774 | if (GNUNET_NO == ch->tmit_paused) | ||
775 | master_transmit_mod (mst); | ||
776 | break; | ||
777 | |||
778 | case MSG_STATE_MOD_CONT: | 841 | case MSG_STATE_MOD_CONT: |
779 | if (GNUNET_NO == ch->tmit_paused) | 842 | if (GNUNET_NO == ch->tmit_paused) |
780 | master_transmit_mod (mst); | 843 | master_transmit_mod (mst); |
@@ -795,12 +858,13 @@ message_handler (void *cls, | |||
795 | else | 858 | else |
796 | { | 859 | { |
797 | LOG (GNUNET_ERROR_TYPE_WARNING, | 860 | LOG (GNUNET_ERROR_TYPE_WARNING, |
798 | "Ignoring transmit ack, there's no transmission going on.\n"); | 861 | "Ignoring message ACK, there's no transmission going on.\n"); |
862 | GNUNET_break (0); | ||
799 | } | 863 | } |
800 | break; | 864 | break; |
801 | default: | 865 | default: |
802 | LOG (GNUNET_ERROR_TYPE_WARNING, | 866 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
803 | "Ignoring unexpected transmit ack.\n"); | 867 | "Ignoring message ACK in state %u.\n", mst->tmit->state); |
804 | } | 868 | } |
805 | } | 869 | } |
806 | else | 870 | else |
@@ -811,12 +875,15 @@ message_handler (void *cls, | |||
811 | } | 875 | } |
812 | 876 | ||
813 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: | 877 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: |
814 | handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg); | 878 | handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg); |
815 | break; | 879 | break; |
816 | } | 880 | } |
817 | 881 | ||
818 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | 882 | if (NULL != ch->client) |
819 | GNUNET_TIME_UNIT_FOREVER_REL); | 883 | { |
884 | GNUNET_CLIENT_receive (ch->client, &message_handler, ch, | ||
885 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
886 | } | ||
820 | } | 887 | } |
821 | 888 | ||
822 | 889 | ||
@@ -1029,6 +1096,8 @@ void | |||
1029 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) | 1096 | GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) |
1030 | { | 1097 | { |
1031 | disconnect (master); | 1098 | disconnect (master); |
1099 | if (NULL != master->tmit) | ||
1100 | GNUNET_free (master->tmit); | ||
1032 | GNUNET_free (master); | 1101 | GNUNET_free (master); |
1033 | } | 1102 | } |
1034 | 1103 | ||
@@ -1069,30 +1138,6 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
1069 | } | 1138 | } |
1070 | 1139 | ||
1071 | 1140 | ||
1072 | /* FIXME: split up value into <64K chunks and transmit the continuations in | ||
1073 | * MOD_CONT msgs */ | ||
1074 | static int | ||
1075 | send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) | ||
1076 | { | ||
1077 | struct GNUNET_PSYC_Channel *ch = cls; | ||
1078 | size_t name_size = strlen (mod->name) + 1; | ||
1079 | struct GNUNET_PSYC_MessageModifier *pmod; | ||
1080 | struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod) | ||
1081 | + name_size + mod->value_size); | ||
1082 | pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; | ||
1083 | op->msg = (struct GNUNET_MessageHeader *) pmod; | ||
1084 | |||
1085 | pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); | ||
1086 | pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size); | ||
1087 | pmod->name_size = htons (name_size); | ||
1088 | memcpy (&pmod[1], mod->name, name_size); | ||
1089 | memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); | ||
1090 | |||
1091 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
1092 | return GNUNET_YES; | ||
1093 | } | ||
1094 | |||
1095 | |||
1096 | /** | 1141 | /** |
1097 | * Send a message to call a method to all members in the PSYC channel. | 1142 | * Send a message to call a method to all members in the PSYC channel. |
1098 | * | 1143 | * |
@@ -1107,7 +1152,7 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) | |||
1107 | struct GNUNET_PSYC_MasterTransmitHandle * | 1152 | struct GNUNET_PSYC_MasterTransmitHandle * |
1108 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | 1153 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, |
1109 | const char *method_name, | 1154 | const char *method_name, |
1110 | GNUNET_PSYC_MasterTransmitNotify notify_mod, | 1155 | GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, |
1111 | GNUNET_PSYC_MasterTransmitNotify notify_data, | 1156 | GNUNET_PSYC_MasterTransmitNotify notify_data, |
1112 | void *notify_cls, | 1157 | void *notify_cls, |
1113 | enum GNUNET_PSYC_MasterTransmitFlags flags) | 1158 | enum GNUNET_PSYC_MasterTransmitFlags flags) |
@@ -1120,25 +1165,27 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | |||
1120 | 1165 | ||
1121 | size_t size = strlen (method_name) + 1; | 1166 | size_t size = strlen (method_name) + 1; |
1122 | struct GNUNET_PSYC_MessageMethod *pmeth; | 1167 | struct GNUNET_PSYC_MessageMethod *pmeth; |
1123 | struct OperationHandle *op | 1168 | struct OperationHandle *op; |
1124 | = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size); | ||
1125 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1]; | ||
1126 | op->msg = (struct GNUNET_MessageHeader *) pmeth; | ||
1127 | 1169 | ||
1170 | ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) | ||
1171 | + sizeof (*pmeth) + size); | ||
1172 | op->msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
1173 | op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size; | ||
1174 | |||
1175 | pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1]; | ||
1128 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | 1176 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); |
1129 | pmeth->header.size = htons (sizeof (*pmeth) + size); | 1177 | pmeth->header.size = htons (sizeof (*pmeth) + size); |
1130 | pmeth->flags = htonl (flags); | 1178 | pmeth->flags = htonl (flags); |
1131 | memcpy (&pmeth[1], method_name, size); | 1179 | memcpy (&pmeth[1], method_name, size); |
1132 | 1180 | ||
1133 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
1134 | transmit_next (ch); | ||
1135 | |||
1136 | master->tmit = GNUNET_malloc (sizeof (*master->tmit)); | 1181 | master->tmit = GNUNET_malloc (sizeof (*master->tmit)); |
1137 | master->tmit->master = master; | 1182 | master->tmit->master = master; |
1138 | master->tmit->notify_mod = notify_mod; | 1183 | master->tmit->notify_mod = notify_mod; |
1139 | master->tmit->notify_data = notify_data; | 1184 | master->tmit->notify_data = notify_data; |
1140 | master->tmit->notify_cls = notify_cls; | 1185 | master->tmit->notify_cls = notify_cls; |
1141 | master->tmit->state = MSG_STATE_START; // FIXME | 1186 | master->tmit->state = MSG_STATE_MODIFIER; |
1187 | |||
1188 | master_transmit_mod (master); | ||
1142 | return master->tmit; | 1189 | return master->tmit; |
1143 | } | 1190 | } |
1144 | 1191 | ||
@@ -1152,7 +1199,7 @@ void | |||
1152 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) | 1199 | GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) |
1153 | { | 1200 | { |
1154 | struct GNUNET_PSYC_Channel *ch = &th->master->ch; | 1201 | struct GNUNET_PSYC_Channel *ch = &th->master->ch; |
1155 | if (GNUNET_NO == ch->tmit_ack_pending) | 1202 | if (0 == ch->tmit_ack_pending) |
1156 | { | 1203 | { |
1157 | ch->tmit_paused = GNUNET_NO; | 1204 | ch->tmit_paused = GNUNET_NO; |
1158 | master_transmit_data (th->master); | 1205 | master_transmit_data (th->master); |