aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/transport/transport_api_core.c27
-rw-r--r--src/util/client.c2
-rw-r--r--src/util/client_new.c49
-rw-r--r--src/util/mq.c135
4 files changed, 113 insertions, 100 deletions
diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c
index f6ea43db9..de18a140c 100644
--- a/src/transport/transport_api_core.c
+++ b/src/transport/transport_api_core.c
@@ -354,6 +354,25 @@ handle_hello (void *cls,
354 * @param cls the `struct Neighbour` where the message was sent 354 * @param cls the `struct Neighbour` where the message was sent
355 */ 355 */
356static void 356static void
357notify_send_done_fin (void *cls)
358{
359 struct Neighbour *n = cls;
360
361 n->timeout_task = NULL;
362 n->is_ready = GNUNET_YES;
363 GNUNET_MQ_impl_send_continue (n->mq);
364}
365
366
367/**
368 * A message from the handler's message queue to a neighbour was
369 * transmitted. Now trigger (possibly delayed) notification of the
370 * neighbour's message queue that we are done and thus ready for
371 * the next message.
372 *
373 * @param cls the `struct Neighbour` where the message was sent
374 */
375static void
357notify_send_done (void *cls) 376notify_send_done (void *cls)
358{ 377{
359 struct Neighbour *n = cls; 378 struct Neighbour *n = cls;
@@ -364,8 +383,8 @@ notify_send_done (void *cls)
364 { 383 {
365 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, 384 GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
366 n->env_size + n->traffic_overhead); 385 n->env_size + n->traffic_overhead);
367 n->traffic_overhead = 0;
368 n->env = NULL; 386 n->env = NULL;
387 n->traffic_overhead = 0;
369 } 388 }
370 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 389 delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
371 128); 390 128);
@@ -375,10 +394,11 @@ notify_send_done (void *cls)
375 GNUNET_MQ_impl_send_continue (n->mq); 394 GNUNET_MQ_impl_send_continue (n->mq);
376 return; 395 return;
377 } 396 }
397 GNUNET_MQ_impl_send_in_flight (n->mq);
378 /* cannot send even a small message without violating 398 /* cannot send even a small message without violating
379 quota, wait a before notifying MQ */ 399 quota, wait a before allowing MQ to send next message */
380 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, 400 n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
381 &notify_send_done, 401 &notify_send_done_fin,
382 n); 402 n);
383} 403}
384 404
@@ -411,6 +431,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
411 GNUNET_MQ_impl_send_continue (mq); 431 GNUNET_MQ_impl_send_continue (mq);
412 return; 432 return;
413 } 433 }
434 GNUNET_assert (NULL == n->env);
414 n->env = GNUNET_MQ_msg_nested_mh (obm, 435 n->env = GNUNET_MQ_msg_nested_mh (obm,
415 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 436 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
416 msg); 437 msg);
diff --git a/src/util/client.c b/src/util/client.c
index f40d5e6eb..47db91c8e 100644
--- a/src/util/client.c
+++ b/src/util/client.c
@@ -375,7 +375,7 @@ do_connect (const char *service_name,
375 * @return the message queue, NULL on error 375 * @return the message queue, NULL on error
376 */ 376 */
377struct GNUNET_MQ_Handle * 377struct GNUNET_MQ_Handle *
378GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg, 378GNUNET_CLIENT_connecTX (const struct GNUNET_CONFIGURATION_Handle *cfg,
379 const char *service_name, 379 const char *service_name,
380 const struct GNUNET_MQ_MessageHandler *handlers, 380 const struct GNUNET_MQ_MessageHandler *handlers,
381 GNUNET_MQ_ErrorHandler error_handler, 381 GNUNET_MQ_ErrorHandler error_handler,
diff --git a/src/util/client_new.c b/src/util/client_new.c
index 1e90470fb..593d3a268 100644
--- a/src/util/client_new.c
+++ b/src/util/client_new.c
@@ -213,10 +213,9 @@ start_connect (void *cls);
213static void 213static void
214connect_fail_continuation (struct ClientState *cstate) 214connect_fail_continuation (struct ClientState *cstate)
215{ 215{
216 LOG (GNUNET_ERROR_TYPE_INFO, 216 LOG (GNUNET_ERROR_TYPE_WARNING,
217 "Failed to establish TCP connection to `%s:%u', no further addresses to try.\n", 217 "Failed to establish connection to `%s', no further addresses to try.\n",
218 cstate->hostname, 218 cstate->service_name);
219 cstate->port);
220 GNUNET_break (NULL == cstate->ap_head); 219 GNUNET_break (NULL == cstate->ap_head);
221 GNUNET_break (NULL == cstate->ap_tail); 220 GNUNET_break (NULL == cstate->ap_tail);
222 GNUNET_break (NULL == cstate->dns_active); 221 GNUNET_break (NULL == cstate->dns_active);
@@ -245,6 +244,7 @@ transmit_ready (void *cls)
245 ssize_t ret; 244 ssize_t ret;
246 size_t len; 245 size_t len;
247 const char *pos; 246 const char *pos;
247 int notify_in_flight;
248 248
249 cstate->send_task = NULL; 249 cstate->send_task = NULL;
250 pos = (const char *) cstate->msg; 250 pos = (const char *) cstate->msg;
@@ -262,10 +262,7 @@ transmit_ready (void *cls)
262 GNUNET_MQ_ERROR_WRITE); 262 GNUNET_MQ_ERROR_WRITE);
263 return; 263 return;
264 } 264 }
265 if (0 == cstate->msg_off) 265 notify_in_flight = (0 == cstate->msg_off);
266 {
267 GNUNET_MQ_impl_send_in_flight (cstate->mq);
268 }
269 cstate->msg_off += ret; 266 cstate->msg_off += ret;
270 if (cstate->msg_off < len) 267 if (cstate->msg_off < len)
271 { 268 {
@@ -274,6 +271,8 @@ transmit_ready (void *cls)
274 cstate->sock, 271 cstate->sock,
275 &transmit_ready, 272 &transmit_ready,
276 cstate); 273 cstate);
274 if (notify_in_flight)
275 GNUNET_MQ_impl_send_in_flight (cstate->mq);
277 return; 276 return;
278 } 277 }
279 cstate->msg = NULL; 278 cstate->msg = NULL;
@@ -345,6 +344,7 @@ connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
345 { 344 {
346 /* defer destruction */ 345 /* defer destruction */
347 cstate->in_destroy = GNUNET_YES; 346 cstate->in_destroy = GNUNET_YES;
347 cstate->mq = NULL;
348 return; 348 return;
349 } 349 }
350 if (NULL != cstate->dns_active) 350 if (NULL != cstate->dns_active)
@@ -384,8 +384,12 @@ receive_ready (void *cls)
384 GNUNET_NO); 384 GNUNET_NO);
385 if (GNUNET_SYSERR == ret) 385 if (GNUNET_SYSERR == ret)
386 { 386 {
387 GNUNET_MQ_inject_error (cstate->mq, 387 if (NULL != cstate->mq)
388 GNUNET_MQ_ERROR_READ); 388 GNUNET_MQ_inject_error (cstate->mq,
389 GNUNET_MQ_ERROR_READ);
390 if (GNUNET_YES == cstate->in_destroy)
391 connection_client_destroy_impl (cstate->mq,
392 cstate);
389 return; 393 return;
390 } 394 }
391 if (GNUNET_YES == cstate->in_destroy) 395 if (GNUNET_YES == cstate->in_destroy)
@@ -723,16 +727,25 @@ start_connect (void *cls)
723#endif 727#endif
724 728
725 if ( (0 == (cstate->attempts++ % 2)) || 729 if ( (0 == (cstate->attempts++ % 2)) ||
726 (0 == cstate->port) ) 730 (0 == cstate->port) ||
731 (NULL == cstate->hostname) )
727 { 732 {
728 /* on even rounds, try UNIX first */ 733 /* on even rounds, try UNIX first, or always
734 if we do not have a DNS name and TCP port. */
729 cstate->sock = try_unixpath (cstate->service_name, 735 cstate->sock = try_unixpath (cstate->service_name,
730 cstate->cfg); 736 cstate->cfg);
731 if (NULL != cstate->sock) 737 if (NULL != cstate->sock)
732 { 738 {
733 connect_success_continuation (cstate); 739 connect_success_continuation (cstate);
734 return; 740 return;
735 } 741 }
742 }
743 if ( (NULL == cstate->hostname) ||
744 (0 == cstate->port) )
745 {
746 /* All options failed. Boo! */
747 connect_fail_continuation (cstate);
748 return;
736 } 749 }
737 cstate->dns_active 750 cstate->dns_active
738 = GNUNET_RESOLVER_ip_get (cstate->hostname, 751 = GNUNET_RESOLVER_ip_get (cstate->hostname,
@@ -807,11 +820,11 @@ connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
807 * @return the message queue, NULL on error 820 * @return the message queue, NULL on error
808 */ 821 */
809struct GNUNET_MQ_Handle * 822struct GNUNET_MQ_Handle *
810GNUNET_CLIENT_connecT2 (const struct GNUNET_CONFIGURATION_Handle *cfg, 823GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
811 const char *service_name, 824 const char *service_name,
812 const struct GNUNET_MQ_MessageHandler *handlers, 825 const struct GNUNET_MQ_MessageHandler *handlers,
813 GNUNET_MQ_ErrorHandler error_handler, 826 GNUNET_MQ_ErrorHandler error_handler,
814 void *error_handler_cls) 827 void *error_handler_cls)
815{ 828{
816 struct ClientState *cstate; 829 struct ClientState *cstate;
817 830
diff --git a/src/util/mq.c b/src/util/mq.c
index 4ba6c5ff8..ba947d5b8 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -128,6 +128,11 @@ struct GNUNET_MQ_Handle
128 void *error_handler_cls; 128 void *error_handler_cls;
129 129
130 /** 130 /**
131 * Task to asynchronously run #impl_send_continue().
132 */
133 struct GNUNET_SCHEDULER_Task *send_task;
134
135 /**
131 * Linked list of messages pending to be sent 136 * Linked list of messages pending to be sent
132 */ 137 */
133 struct GNUNET_MQ_Envelope *envelope_head; 138 struct GNUNET_MQ_Envelope *envelope_head;
@@ -145,23 +150,11 @@ struct GNUNET_MQ_Handle
145 struct GNUNET_MQ_Envelope *current_envelope; 150 struct GNUNET_MQ_Envelope *current_envelope;
146 151
147 /** 152 /**
148 * GNUNET_YES if the sent notification was called
149 * for the current envelope.
150 */
151 int send_notification_called;
152
153 /**
154 * Map of associations, lazily allocated 153 * Map of associations, lazily allocated
155 */ 154 */
156 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; 155 struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
157 156
158 /** 157 /**
159 * Task scheduled during #GNUNET_MQ_impl_send_continue
160 * or #GNUNET_MQ_impl_send_in_flight
161 */
162 struct GNUNET_SCHEDULER_Task *send_task;
163
164 /**
165 * Functions to call on queue destruction; kept in a DLL. 158 * Functions to call on queue destruction; kept in a DLL.
166 */ 159 */
167 struct GNUNET_MQ_DestroyNotificationHandle *dnh_head; 160 struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
@@ -196,9 +189,15 @@ struct GNUNET_MQ_Handle
196 unsigned int queue_length; 189 unsigned int queue_length;
197 190
198 /** 191 /**
199 * GNUNET_YES if GNUNET_MQ_impl_evacuate was called. 192 * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
193 * FIXME: is this dead?
200 */ 194 */
201 int evacuate_called; 195 int evacuate_called;
196
197 /**
198 * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
199 */
200 int in_flight;
202}; 201};
203 202
204 203
@@ -364,7 +363,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
364unsigned int 363unsigned int
365GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq) 364GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
366{ 365{
367 return mq->queue_length; 366 return mq->queue_length - (GNUNET_YES == mq->in_flight) ? 1 : 0;
368} 367}
369 368
370 369
@@ -385,7 +384,8 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
385 mq->queue_length++; 384 mq->queue_length++;
386 ev->parent_queue = mq; 385 ev->parent_queue = mq;
387 /* is the implementation busy? queue it! */ 386 /* is the implementation busy? queue it! */
388 if (NULL != mq->current_envelope) 387 if ( (NULL != mq->current_envelope) ||
388 (NULL != mq->send_task) )
389 { 389 {
390 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, 390 GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
391 mq->envelope_tail, 391 mq->envelope_tail,
@@ -428,35 +428,6 @@ GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
428 428
429 429
430/** 430/**
431 * Task run to call the send notification for the next queued
432 * message, if any. Only useful for implementing message queues,
433 * results in undefined behavior if not used carefully.
434 *
435 * @param cls message queue to send the next message with
436 */
437static void
438impl_send_in_flight (void *cls)
439{
440 struct GNUNET_MQ_Handle *mq = cls;
441 struct GNUNET_MQ_Envelope *current_envelope;
442
443 mq->send_task = NULL;
444 /* call is only valid if we're actually currently sending
445 * a message */
446 current_envelope = mq->current_envelope;
447 GNUNET_assert (NULL != current_envelope);
448 /* can't call cancel from now on anymore */
449 current_envelope->parent_queue = NULL;
450 if ( (GNUNET_NO == mq->send_notification_called) &&
451 (NULL != current_envelope->sent_cb) )
452 {
453 current_envelope->sent_cb (current_envelope->sent_cls);
454 }
455 mq->send_notification_called = GNUNET_YES;
456}
457
458
459/**
460 * Task run to call the send implementation for the next queued 431 * Task run to call the send implementation for the next queued
461 * message, if any. Only useful for implementing message queues, 432 * message, if any. Only useful for implementing message queues,
462 * results in undefined behavior if not used carefully. 433 * results in undefined behavior if not used carefully.
@@ -467,32 +438,19 @@ static void
467impl_send_continue (void *cls) 438impl_send_continue (void *cls)
468{ 439{
469 struct GNUNET_MQ_Handle *mq = cls; 440 struct GNUNET_MQ_Handle *mq = cls;
470 struct GNUNET_MQ_Envelope *current_envelope; 441
471
472 mq->send_task = NULL; 442 mq->send_task = NULL;
473 /* call is only valid if we're actually currently sending 443 /* call is only valid if we're actually currently sending
474 * a message */ 444 * a message */
475 current_envelope = mq->current_envelope;
476 GNUNET_assert (NULL != current_envelope);
477 impl_send_in_flight (mq);
478 GNUNET_assert (0 < mq->queue_length);
479 mq->queue_length--;
480 if (NULL == mq->envelope_head) 445 if (NULL == mq->envelope_head)
481 { 446 return;
482 mq->current_envelope = NULL; 447 mq->current_envelope = mq->envelope_head;
483 } 448 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
484 else 449 mq->envelope_tail,
485 { 450 mq->current_envelope);
486 mq->current_envelope = mq->envelope_head; 451 mq->send_impl (mq,
487 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 452 mq->current_envelope->mh,
488 mq->envelope_tail, 453 mq->impl_state);
489 mq->current_envelope);
490 mq->send_notification_called = GNUNET_NO;
491 mq->send_impl (mq,
492 mq->current_envelope->mh,
493 mq->impl_state);
494 }
495 GNUNET_free (current_envelope);
496} 454}
497 455
498 456
@@ -506,22 +464,32 @@ impl_send_continue (void *cls)
506void 464void
507GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) 465GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
508{ 466{
509 /* maybe #GNUNET_MQ_impl_send_in_flight was called? */ 467 struct GNUNET_MQ_Envelope *current_envelope;
510 if (NULL != mq->send_task) 468 GNUNET_MQ_NotifyCallback cb;
511 { 469
512 GNUNET_SCHEDULER_cancel (mq->send_task); 470 GNUNET_assert (0 < mq->queue_length);
513 } 471 mq->queue_length--;
472 current_envelope = mq->current_envelope;
473 current_envelope->parent_queue = NULL;
474 mq->current_envelope = NULL;
475 GNUNET_assert (NULL == mq->send_task);
514 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, 476 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
515 mq); 477 mq);
478 if (NULL != (cb = current_envelope->sent_cb))
479 {
480 current_envelope->sent_cb = NULL;
481 cb (current_envelope->sent_cls);
482 }
483 GNUNET_free (current_envelope);
516} 484}
517 485
518 486
519/** 487/**
520 * Call the send notification for the current message, but do not 488 * Call the send notification for the current message, but do not
521 * try to send the next message until #gnunet_mq_impl_send_continue 489 * try to send the next message until #GNUNET_MQ_impl_send_continue
522 * is called. 490 * is called.
523 * 491 *
524 * only useful for implementing message queues, results in undefined 492 * Only useful for implementing message queues, results in undefined
525 * behavior if not used carefully. 493 * behavior if not used carefully.
526 * 494 *
527 * @param mq message queue to send the next message with 495 * @param mq message queue to send the next message with
@@ -529,9 +497,21 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
529void 497void
530GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq) 498GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
531{ 499{
532 GNUNET_assert (NULL == mq->send_task); 500 struct GNUNET_MQ_Envelope *current_envelope;
533 mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight, 501 GNUNET_MQ_NotifyCallback cb;
534 mq); 502
503 mq->in_flight = GNUNET_YES;
504 /* call is only valid if we're actually currently sending
505 * a message */
506 current_envelope = mq->current_envelope;
507 GNUNET_assert (NULL != current_envelope);
508 /* can't call cancel from now on anymore */
509 current_envelope->parent_queue = NULL;
510 if (NULL != (cb = current_envelope->sent_cb))
511 {
512 current_envelope->sent_cb = NULL;
513 cb (current_envelope->sent_cls);
514 }
535} 515}
536 516
537 517
@@ -1187,7 +1167,6 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
1187 GNUNET_CONTAINER_DLL_remove (mq->envelope_head, 1167 GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
1188 mq->envelope_tail, 1168 mq->envelope_tail,
1189 mq->current_envelope); 1169 mq->current_envelope);
1190 mq->send_notification_called = GNUNET_NO;
1191 mq->send_impl (mq, 1170 mq->send_impl (mq,
1192 mq->current_envelope->mh, 1171 mq->current_envelope->mh,
1193 mq->impl_state); 1172 mq->impl_state);