aboutsummaryrefslogtreecommitdiff
path: root/src/core/core_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-19 21:29:20 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-19 21:29:20 +0000
commit8b1a351c049deec3226aa40a883b97c76916bea7 (patch)
treed507edb09031a9c9025ad5eb927fc1a9c36452d7 /src/core/core_api.c
parent80c4558f3742ce78f1f511ea13d8941c46b0e88b (diff)
downloadgnunet-8b1a351c049deec3226aa40a883b97c76916bea7.tar.gz
gnunet-8b1a351c049deec3226aa40a883b97c76916bea7.zip
refactoring core API to use new MQ lib
Diffstat (limited to 'src/core/core_api.c')
-rw-r--r--src/core/core_api.c1183
1 files changed, 423 insertions, 760 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c
index 7b423b6a0..af78ab4f9 100644
--- a/src/core/core_api.c
+++ b/src/core/core_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2009-2014 GNUnet e.V. 3 Copyright (C) 2009-2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -44,12 +44,6 @@ struct GNUNET_CORE_TransmitHandle
44 struct PeerRecord *peer; 44 struct PeerRecord *peer;
45 45
46 /** 46 /**
47 * Corresponding SEND_REQUEST message. Only non-NULL
48 * while SEND_REQUEST message is pending.
49 */
50 struct ControlMessage *cm;
51
52 /**
53 * Function that will be called to get the actual request 47 * Function that will be called to get the actual request
54 * (once we are ready to transmit this request to the core). 48 * (once we are ready to transmit this request to the core).
55 * The function will be called with a NULL buffer to signal 49 * The function will be called with a NULL buffer to signal
@@ -103,26 +97,12 @@ struct PeerRecord
103{ 97{
104 98
105 /** 99 /**
106 * We generally do NOT keep peer records in a DLL; this 100 * Corresponding CORE handle.
107 * DLL is only used IF this peer's 'pending_head' message
108 * is ready for transmission.
109 */
110 struct PeerRecord *prev;
111
112 /**
113 * We generally do NOT keep peer records in a DLL; this
114 * DLL is only used IF this peer's 'pending_head' message
115 * is ready for transmission.
116 */
117 struct PeerRecord *next;
118
119 /**
120 * Corresponding core handle.
121 */ 101 */
122 struct GNUNET_CORE_Handle *ch; 102 struct GNUNET_CORE_Handle *ch;
123 103
124 /** 104 /**
125 * Pending request, if any. 'th->peer' is set to NULL if the 105 * Pending request, if any. 'th->peer' is set to NULL if the
126 * request is not active. 106 * request is not active.
127 */ 107 */
128 struct GNUNET_CORE_TransmitHandle th; 108 struct GNUNET_CORE_TransmitHandle th;
@@ -133,11 +113,6 @@ struct PeerRecord
133 struct GNUNET_PeerIdentity peer; 113 struct GNUNET_PeerIdentity peer;
134 114
135 /** 115 /**
136 * ID of task to run #run_request_next_transmission().
137 */
138 struct GNUNET_SCHEDULER_Task *ntr_task;
139
140 /**
141 * SendMessageRequest ID generator for this peer. 116 * SendMessageRequest ID generator for this peer.
142 */ 117 */
143 uint16_t smr_id_gen; 118 uint16_t smr_id_gen;
@@ -146,58 +121,6 @@ struct PeerRecord
146 121
147 122
148/** 123/**
149 * Type of function called upon completion.
150 *
151 * @param cls closure
152 * @param success #GNUNET_OK on success (which for request_connect
153 * ONLY means that we transmitted the connect request to CORE,
154 * it does not mean that we are actually now connected!);
155 * #GNUNET_NO on timeout,
156 * #GNUNET_SYSERR if core was shut down
157 */
158typedef void
159(*GNUNET_CORE_ControlContinuation) (void *cls,
160 int success);
161
162
163/**
164 * Entry in a doubly-linked list of control messages to be transmitted
165 * to the core service. Control messages include traffic allocation,
166 * connection requests and of course our initial 'init' request.
167 *
168 * The actual message is allocated at the end of this struct.
169 */
170struct ControlMessage
171{
172 /**
173 * This is a doubly-linked list.
174 */
175 struct ControlMessage *next;
176
177 /**
178 * This is a doubly-linked list.
179 */
180 struct ControlMessage *prev;
181
182 /**
183 * Function to run after transmission failed/succeeded.
184 */
185 GNUNET_CORE_ControlContinuation cont;
186
187 /**
188 * Closure for @e cont.
189 */
190 void *cont_cls;
191
192 /**
193 * Transmit handle (if one is associated with this ControlMessage), or NULL.
194 */
195 struct GNUNET_CORE_TransmitHandle *th;
196};
197
198
199
200/**
201 * Context for the core service connection. 124 * Context for the core service connection.
202 */ 125 */
203struct GNUNET_CORE_Handle 126struct GNUNET_CORE_Handle
@@ -241,39 +164,12 @@ struct GNUNET_CORE_Handle
241 /** 164 /**
242 * Function handlers for messages of particular type. 165 * Function handlers for messages of particular type.
243 */ 166 */
244 const struct GNUNET_CORE_MessageHandler *handlers; 167 struct GNUNET_CORE_MessageHandler *handlers;
245
246 /**
247 * Our connection to the service.
248 */
249 struct GNUNET_CLIENT_Connection *client;
250
251 /**
252 * Handle for our current transmission request.
253 */
254 struct GNUNET_CLIENT_TransmitHandle *cth;
255
256 /**
257 * Head of doubly-linked list of pending requests.
258 */
259 struct ControlMessage *control_pending_head;
260
261 /**
262 * Tail of doubly-linked list of pending requests.
263 */
264 struct ControlMessage *control_pending_tail;
265
266 /**
267 * Head of doubly-linked list of peers that are core-approved
268 * to send their next message.
269 */
270 struct PeerRecord *ready_peer_head;
271 168
272 /** 169 /**
273 * Tail of doubly-linked list of peers that are core-approved 170 * Our message queue for transmissions to the service.
274 * to send their next message.
275 */ 171 */
276 struct PeerRecord *ready_peer_tail; 172 struct GNUNET_MQ_Handle *mq;
277 173
278 /** 174 /**
279 * Hash map listing all of the peers that we are currently 175 * Hash map listing all of the peers that we are currently
@@ -289,7 +185,7 @@ struct GNUNET_CORE_Handle
289 /** 185 /**
290 * ID of reconnect task (if any). 186 * ID of reconnect task (if any).
291 */ 187 */
292 struct GNUNET_SCHEDULER_Task * reconnect_task; 188 struct GNUNET_SCHEDULER_Task *reconnect_task;
293 189
294 /** 190 /**
295 * Current delay we use for re-trying to connect to core. 191 * Current delay we use for re-trying to connect to core.
@@ -351,8 +247,8 @@ reconnect_task (void *cls)
351 247
352 248
353/** 249/**
354 * Notify clients about disconnect and free 250 * Notify clients about disconnect and free the entry for connected
355 * the entry for connected peer. 251 * peer.
356 * 252 *
357 * @param cls the `struct GNUNET_CORE_Handle *` 253 * @param cls the `struct GNUNET_CORE_Handle *`
358 * @param key the peer identity (not used) 254 * @param key the peer identity (not used)
@@ -368,17 +264,6 @@ disconnect_and_free_peer_entry (void *cls,
368 struct GNUNET_CORE_TransmitHandle *th; 264 struct GNUNET_CORE_TransmitHandle *th;
369 struct PeerRecord *pr = value; 265 struct PeerRecord *pr = value;
370 266
371 if (NULL != pr->ntr_task)
372 {
373 GNUNET_SCHEDULER_cancel (pr->ntr_task);
374 pr->ntr_task = NULL;
375 }
376 if ( (NULL != pr->prev) ||
377 (NULL != pr->next) ||
378 (h->ready_peer_head == pr) )
379 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
380 h->ready_peer_tail,
381 pr);
382 if (NULL != h->disconnects) 267 if (NULL != h->disconnects)
383 h->disconnects (h->cls, 268 h->disconnects (h->cls,
384 &pr->peer); 269 &pr->peer);
@@ -388,14 +273,13 @@ disconnect_and_free_peer_entry (void *cls,
388 { 273 {
389 GNUNET_break (0); 274 GNUNET_break (0);
390 th->peer = NULL; 275 th->peer = NULL;
391 if (NULL != th->cm)
392 th->cm->th = NULL;
393 } 276 }
394 /* done with 'voluntary' cleanups, now on to normal freeing */ 277 /* done with 'voluntary' cleanups, now on to normal freeing */
395 GNUNET_assert (GNUNET_YES == 278 GNUNET_assert (GNUNET_YES ==
396 GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr)); 279 GNUNET_CONTAINER_multipeermap_remove (h->peers,
280 key,
281 pr));
397 GNUNET_assert (pr->ch == h); 282 GNUNET_assert (pr->ch == h);
398 GNUNET_assert (NULL == pr->ntr_task);
399 GNUNET_free (pr); 283 GNUNET_free (pr);
400 return GNUNET_YES; 284 return GNUNET_YES;
401} 285}
@@ -410,659 +294,490 @@ disconnect_and_free_peer_entry (void *cls,
410static void 294static void
411reconnect_later (struct GNUNET_CORE_Handle *h) 295reconnect_later (struct GNUNET_CORE_Handle *h)
412{ 296{
413 struct ControlMessage *cm;
414 struct PeerRecord *pr;
415
416 GNUNET_assert (NULL == h->reconnect_task); 297 GNUNET_assert (NULL == h->reconnect_task);
417 if (NULL != h->cth) 298 if (NULL != h->mq)
418 {
419 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
420 h->cth = NULL;
421 }
422 if (NULL != h->client)
423 { 299 {
424 GNUNET_CLIENT_disconnect (h->client); 300 GNUNET_MQ_destroy (h->mq);
425 h->client = NULL; 301 h->mq = NULL;
426 } 302 }
427 h->currently_down = GNUNET_YES; 303 h->currently_down = GNUNET_YES;
428 GNUNET_assert (h->reconnect_task == NULL); 304 GNUNET_assert (h->reconnect_task == NULL);
429 h->reconnect_task = 305 h->reconnect_task =
430 GNUNET_SCHEDULER_add_delayed (h->retry_backoff, 306 GNUNET_SCHEDULER_add_delayed (h->retry_backoff,
431 &reconnect_task, h); 307 &reconnect_task,
432 while (NULL != (cm = h->control_pending_head)) 308 h);
433 {
434 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
435 h->control_pending_tail,
436 cm);
437 if (NULL != cm->th)
438 cm->th->cm = NULL;
439 if (NULL != cm->cont)
440 cm->cont (cm->cont_cls, GNUNET_NO);
441 GNUNET_free (cm);
442 }
443 GNUNET_CONTAINER_multipeermap_iterate (h->peers, 309 GNUNET_CONTAINER_multipeermap_iterate (h->peers,
444 &disconnect_and_free_peer_entry, h); 310 &disconnect_and_free_peer_entry,
445 while (NULL != (pr = h->ready_peer_head)) 311 h);
446 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
447 h->ready_peer_tail,
448 pr);
449 GNUNET_assert (NULL == h->control_pending_head);
450 h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); 312 h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
451} 313}
452 314
453 315
454/** 316/**
455 * Check the list of pending requests, send the next 317 * Generic error handler, called with the appropriate error code and
456 * one to the core. 318 * the same closure specified at the creation of the message queue.
319 * Not every message queue implementation supports an error handler.
457 * 320 *
458 * @param h core handle 321 * @param cls closure, a `struct GNUNET_CORE_Handle *`
459 * @param ignore_currently_down transmit message even if not initialized? 322 * @param error error code
460 */ 323 */
461static void 324static void
462trigger_next_request (struct GNUNET_CORE_Handle *h, 325handle_mq_error (void *cls,
463 int ignore_currently_down); 326 enum GNUNET_MQ_Error error)
327{
328 struct GNUNET_CORE_Handle *h = cls;
329
330 reconnect_later (h);
331}
464 332
465 333
466/** 334/**
467 * Send a control message to the peer asking for transmission 335 * Handle init reply message received from CORE service. Notify
468 * of the message in the given peer record. 336 * application that we are now connected to the CORE. Also fake
337 * loopback connection.
469 * 338 *
470 * @param pr peer to request transmission to 339 * @param cls the `struct GNUNET_CORE_Handle`
340 * @param m the init reply
471 */ 341 */
472static void 342static void
473request_next_transmission (struct PeerRecord *pr) 343handle_init_reply (void *cls,
344 const struct InitReplyMessage *m)
474{ 345{
475 struct GNUNET_CORE_Handle *h = pr->ch; 346 struct GNUNET_CORE_Handle *h = cls;
476 struct ControlMessage *cm; 347 GNUNET_CORE_StartupCallback init;
477 struct SendMessageRequest *smr; 348 struct PeerRecord *pr;
478 struct GNUNET_CORE_TransmitHandle *th;
479 349
480 th = &pr->th; 350 GNUNET_break (0 == ntohl (m->reserved));
481 if (NULL == th->peer) 351 GNUNET_break (GNUNET_YES == h->currently_down);
352 h->currently_down = GNUNET_NO;
353 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
354 if (NULL != (init = h->init))
482 { 355 {
483 trigger_next_request (h, GNUNET_NO); 356 /* mark so we don't call init on reconnect */
484 return; 357 h->init = NULL;
358 h->me = m->my_identity;
359 LOG (GNUNET_ERROR_TYPE_DEBUG,
360 "Connected to core service of peer `%s'.\n",
361 GNUNET_i2s (&h->me));
362 init (h->cls,
363 &h->me);
485 } 364 }
486 if (NULL != th->cm) 365 else
487 return; /* already done */ 366 {
488 GNUNET_assert (NULL == pr->prev); 367 LOG (GNUNET_ERROR_TYPE_DEBUG,
489 GNUNET_assert (NULL == pr->next); 368 "Successfully reconnected to core service.\n");
490 cm = GNUNET_malloc (sizeof (struct ControlMessage) + 369 GNUNET_break (0 == memcmp (&h->me,
491 sizeof (struct SendMessageRequest)); 370 &m->my_identity,
492 th->cm = cm; 371 sizeof (struct GNUNET_PeerIdentity)));
493 cm->th = th; 372 }
494 smr = (struct SendMessageRequest *) &cm[1]; 373 /* fake 'connect to self' */
495 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); 374 pr = GNUNET_new (struct PeerRecord);
496 smr->header.size = htons (sizeof (struct SendMessageRequest)); 375 pr->peer = h->me;
497 smr->priority = htonl ((uint32_t) th->priority); 376 pr->ch = h;
498 smr->deadline = GNUNET_TIME_absolute_hton (th->deadline); 377 GNUNET_assert (GNUNET_YES ==
499 smr->peer = pr->peer; 378 GNUNET_CONTAINER_multipeermap_put (h->peers,
500 smr->reserved = htonl (0); 379 &h->me,
501 smr->size = htons (th->msize); 380 pr,
502 smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); 381 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
503 GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head, 382 if (NULL != h->connects)
504 h->control_pending_tail, cm); 383 h->connects (h->cls,
505 LOG (GNUNET_ERROR_TYPE_DEBUG, 384 &pr->peer);
506 "Adding SEND REQUEST for peer `%s' to message queue\n",
507 GNUNET_i2s (&pr->peer));
508 trigger_next_request (h, GNUNET_NO);
509} 385}
510 386
511 387
512/** 388/**
513 * Transmit the next message to the core service. 389 * Handle connect message received from CORE service.
390 * Notify the application about the new connection.
514 * 391 *
515 * @param cls closure with the `struct GNUNET_CORE_Handle` 392 * @param cls the `struct GNUNET_CORE_Handle`
516 * @param size number of bytes available in @a buf 393 * @param cnm the connect message
517 * @param buf where the callee should write the message
518 * @return number of bytes written to @a buf
519 */ 394 */
520static size_t 395static void
521transmit_message (void *cls, 396handle_connect_notify (void *cls,
522 size_t size, 397 const struct ConnectNotifyMessage * cnm)
523 void *buf)
524{ 398{
525 struct GNUNET_CORE_Handle *h = cls; 399 struct GNUNET_CORE_Handle *h = cls;
526 struct ControlMessage *cm;
527 struct GNUNET_CORE_TransmitHandle *th;
528 struct GNUNET_TIME_Relative delay;
529 struct GNUNET_TIME_Relative overdue;
530 struct PeerRecord *pr; 400 struct PeerRecord *pr;
531 struct SendMessage *sm;
532 const struct GNUNET_MessageHeader *hdr;
533 uint16_t msize;
534 size_t ret;
535 401
536 GNUNET_assert (h->reconnect_task == NULL); 402 GNUNET_break (GNUNET_NO == h->currently_down);
537 h->cth = NULL;
538 if (NULL == buf)
539 {
540 LOG (GNUNET_ERROR_TYPE_DEBUG,
541 "Transmission failed, initiating reconnect\n");
542 reconnect_later (h);
543 return 0;
544 }
545 /* first check for control messages */
546 if (NULL != (cm = h->control_pending_head))
547 {
548 hdr = (const struct GNUNET_MessageHeader *) &cm[1];
549 msize = ntohs (hdr->size);
550 if (size < msize)
551 {
552 trigger_next_request (h, GNUNET_NO);
553 return 0;
554 }
555 LOG (GNUNET_ERROR_TYPE_DEBUG,
556 "Transmitting control message with %u bytes of type %u to core.\n",
557 (unsigned int) msize,
558 (unsigned int) ntohs (hdr->type));
559 memcpy (buf, hdr, msize);
560 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
561 h->control_pending_tail, cm);
562 if (NULL != cm->th)
563 cm->th->cm = NULL;
564 if (NULL != cm->cont)
565 cm->cont (cm->cont_cls, GNUNET_OK);
566 GNUNET_free (cm);
567 trigger_next_request (h, GNUNET_NO);
568 return msize;
569 }
570 /* now check for 'ready' P2P messages */
571 if (NULL == (pr = h->ready_peer_head))
572 return 0;
573 GNUNET_assert (NULL != pr->th.peer);
574 th = &pr->th;
575 if (size < th->msize + sizeof (struct SendMessage))
576 {
577 trigger_next_request (h, GNUNET_NO);
578 return 0;
579 }
580 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
581 h->ready_peer_tail,
582 pr);
583 th->peer = NULL;
584 LOG (GNUNET_ERROR_TYPE_DEBUG, 403 LOG (GNUNET_ERROR_TYPE_DEBUG,
585 "Transmitting SEND request to `%s' with %u bytes.\n", 404 "Received notification about connection from `%s'.\n",
586 GNUNET_i2s (&pr->peer), 405 GNUNET_i2s (&cnm->peer));
587 (unsigned int) th->msize); 406 if (0 == memcmp (&h->me,
588 sm = (struct SendMessage *) buf; 407 &cnm->peer,
589 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); 408 sizeof (struct GNUNET_PeerIdentity)))
590 sm->priority = htonl ((uint32_t) th->priority);
591 sm->deadline = GNUNET_TIME_absolute_hton (th->deadline);
592 sm->peer = pr->peer;
593 sm->cork = htonl ((uint32_t) th->cork);
594 sm->reserved = htonl (0);
595 ret =
596 th->get_message (th->get_message_cls,
597 size - sizeof (struct SendMessage),
598 &sm[1]);
599 delay = GNUNET_TIME_absolute_get_duration (th->request_time);
600 overdue = GNUNET_TIME_absolute_get_duration (th->deadline);
601 if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
602 LOG (GNUNET_ERROR_TYPE_WARNING,
603 "Transmitting overdue %u bytes to `%s' at priority %u with %s delay%s\n",
604 ret,
605 GNUNET_i2s (&pr->peer),
606 (unsigned int) th->priority,
607 GNUNET_STRINGS_relative_time_to_string (delay,
608 GNUNET_YES),
609 (th->cork) ? " (corked)" : "");
610 else
611 LOG (GNUNET_ERROR_TYPE_DEBUG,
612 "Transmitting %u bytes to `%s' at priority %u with %s delay%s\n",
613 ret,
614 GNUNET_i2s (&pr->peer),
615 (unsigned int) th->priority,
616 GNUNET_STRINGS_relative_time_to_string (delay,
617 GNUNET_YES),
618 (th->cork) ? " (corked)" : "");
619 if ( (0 == ret) &&
620 (GNUNET_CORE_PRIO_BACKGROUND == th->priority) )
621 { 409 {
622 /* client decided to send nothing; as the priority was 410 /* connect to self!? */
623 BACKGROUND, we can just not send anything to core. 411 GNUNET_break (0);
624 For higher-priority messages, we must give an 412 return;
625 empty message to CORE so that it knows that this
626 message is no longer pending. */
627 LOG (GNUNET_ERROR_TYPE_DEBUG,
628 "Size of clients message to peer %s is 0!\n",
629 GNUNET_i2s (&pr->peer));
630 request_next_transmission (pr);
631 return 0;
632 } 413 }
633 LOG (GNUNET_ERROR_TYPE_DEBUG, 414 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
634 "Produced SEND message to core with %u bytes payload\n", 415 &cnm->peer);
635 (unsigned int) ret); 416 if (NULL != pr)
636 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
637 { 417 {
638 GNUNET_break (0); 418 GNUNET_break (0);
639 request_next_transmission (pr); 419 reconnect_later (h);
640 return 0; 420 return;
641 } 421 }
642 ret += sizeof (struct SendMessage); 422 pr = GNUNET_new (struct PeerRecord);
643 sm->header.size = htons (ret); 423 pr->peer = cnm->peer;
644 GNUNET_assert (ret <= size); 424 pr->ch = h;
645 request_next_transmission (pr); 425 GNUNET_assert (GNUNET_YES ==
646 return ret; 426 GNUNET_CONTAINER_multipeermap_put (h->peers,
427 &cnm->peer,
428 pr,
429 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
430 if (NULL != h->connects)
431 h->connects (h->cls,
432 &pr->peer);
647} 433}
648 434
649 435
650/** 436/**
651 * Check the list of pending requests, send the next one to the core. 437 * Handle disconnect message received from CORE service.
438 * Notify the application about the lost connection.
652 * 439 *
653 * @param h core handle 440 * @param cls the `struct GNUNET_CORE_Handle`
654 * @param ignore_currently_down transmit message even if not initialized? 441 * @param dnm message about the disconnect event
655 */ 442 */
656static void 443static void
657trigger_next_request (struct GNUNET_CORE_Handle *h, 444handle_disconnect_notify (void *cls,
658 int ignore_currently_down) 445 const struct DisconnectNotifyMessage * dnm)
659{ 446{
660 uint16_t msize; 447 struct GNUNET_CORE_Handle *h = cls;
448 struct PeerRecord *pr;
661 449
662 if ( (GNUNET_YES == h->currently_down) && 450 GNUNET_break (GNUNET_NO == h->currently_down);
663 (GNUNET_NO == ignore_currently_down) ) 451 if (0 == memcmp (&h->me,
452 &dnm->peer,
453 sizeof (struct GNUNET_PeerIdentity)))
664 { 454 {
665 LOG (GNUNET_ERROR_TYPE_DEBUG, 455 /* connection to self!? */
666 "Core connection down, not processing queue\n"); 456 GNUNET_break (0);
667 return; 457 return;
668 } 458 }
669 if (NULL != h->cth) 459 GNUNET_break (0 == ntohl (dnm->reserved));
460 LOG (GNUNET_ERROR_TYPE_DEBUG,
461 "Received notification about disconnect from `%s'.\n",
462 GNUNET_i2s (&dnm->peer));
463 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
464 &dnm->peer);
465 if (NULL == pr)
670 { 466 {
671 LOG (GNUNET_ERROR_TYPE_DEBUG, 467 GNUNET_break (0);
672 "Request pending, not processing queue\n"); 468 reconnect_later (h);
673 return; 469 return;
674 } 470 }
675 if (NULL != h->control_pending_head) 471 disconnect_and_free_peer_entry (h,
676 msize = 472 &dnm->peer,
677 ntohs (((struct GNUNET_MessageHeader *) &h-> 473 pr);
678 control_pending_head[1])->size); 474}
679 else if (h->ready_peer_head != NULL) 475
680 msize = 476
681 h->ready_peer_head->th.msize + sizeof (struct SendMessage); 477/**
682 else 478 * Check that message received from CORE service is well-formed.
479 *
480 * @param cls the `struct GNUNET_CORE_Handle`
481 * @param ntm the message we got
482 * @return #GNUNET_OK if the message is well-formed
483 */
484static int
485check_notify_inbound (void *cls,
486 const struct NotifyTrafficMessage *ntm)
487{
488 struct GNUNET_CORE_Handle *h = cls;
489 uint16_t msize;
490 const struct GNUNET_MessageHeader *em;
491
492 GNUNET_break (GNUNET_NO == h->currently_down);
493 msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage);
494 if (msize < sizeof (struct GNUNET_MessageHeader))
683 { 495 {
684 LOG (GNUNET_ERROR_TYPE_DEBUG, 496 GNUNET_break (0);
685 "Request queue empty, not processing queue\n"); 497 return GNUNET_SYSERR;
686 return; /* no pending message */
687 } 498 }
688 h->cth = 499 em = (const struct GNUNET_MessageHeader *) &ntm[1];
689 GNUNET_CLIENT_notify_transmit_ready (h->client, msize, 500 if ( (GNUNET_NO == h->inbound_hdr_only) &&
690 GNUNET_TIME_UNIT_FOREVER_REL, 501 (msize != ntohs (em->size)) )
691 GNUNET_NO, 502 {
692 &transmit_message, h); 503 GNUNET_break (0);
504 return GNUNET_SYSERR;
505 }
506 return GNUNET_OK;
693} 507}
694 508
695 509
696/** 510/**
697 * Handler for notification messages received from the core. 511 * Handle inbound message received from CORE service. If applicable,
512 * notify the application.
698 * 513 *
699 * @param cls our `struct GNUNET_CORE_Handle` 514 * @param cls the `struct GNUNET_CORE_Handle`
700 * @param msg the message received from the core service 515 * @param ntm the message we got from CORE.
701 */ 516 */
702static void 517static void
703main_notify_handler (void *cls, 518handle_notify_inbound (void *cls,
704 const struct GNUNET_MessageHeader *msg) 519 const struct NotifyTrafficMessage *ntm)
705{ 520{
706 struct GNUNET_CORE_Handle *h = cls; 521 struct GNUNET_CORE_Handle *h = cls;
707 const struct InitReplyMessage *m;
708 const struct ConnectNotifyMessage *cnm;
709 const struct DisconnectNotifyMessage *dnm;
710 const struct NotifyTrafficMessage *ntm;
711 const struct GNUNET_MessageHeader *em; 522 const struct GNUNET_MessageHeader *em;
712 const struct SendMessageReady *smr;
713 const struct GNUNET_CORE_MessageHandler *mh;
714 GNUNET_CORE_StartupCallback init;
715 struct PeerRecord *pr; 523 struct PeerRecord *pr;
716 struct GNUNET_CORE_TransmitHandle *th;
717 unsigned int hpos;
718 int trigger;
719 uint16_t msize;
720 uint16_t et; 524 uint16_t et;
721 525
722 if (NULL == msg) 526 GNUNET_break (GNUNET_NO == h->currently_down);
723 {
724 LOG (GNUNET_ERROR_TYPE_INFO,
725 _("Client was disconnected from core service, trying to reconnect.\n"));
726 reconnect_later (h);
727 return;
728 }
729 msize = ntohs (msg->size);
730 LOG (GNUNET_ERROR_TYPE_DEBUG, 527 LOG (GNUNET_ERROR_TYPE_DEBUG,
731 "Processing message of type %u and size %u from core service\n", 528 "Received inbound message from `%s'.\n",
732 ntohs (msg->type), msize); 529 GNUNET_i2s (&ntm->peer));
733 switch (ntohs (msg->type)) 530 em = (const struct GNUNET_MessageHeader *) &ntm[1];
531 et = ntohs (em->type);
532 for (unsigned int hpos = 0; NULL != h->handlers[hpos].callback; hpos++)
734 { 533 {
735 case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY: 534 const struct GNUNET_CORE_MessageHandler *mh;
736 if (ntohs (msg->size) != sizeof (struct InitReplyMessage)) 535
737 { 536 mh = &h->handlers[hpos];
738 GNUNET_break (0); 537 if (mh->type != et)
739 reconnect_later (h); 538 continue;
740 return; 539 if ( (mh->expected_size != ntohs (em->size)) &&
741 } 540 (0 != mh->expected_size) )
742 m = (const struct InitReplyMessage *) msg;
743 GNUNET_break (0 == ntohl (m->reserved));
744 /* start our message processing loop */
745 if (GNUNET_YES == h->currently_down)
746 {
747 h->currently_down = GNUNET_NO;
748 trigger_next_request (h, GNUNET_NO);
749 }
750 h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
751 h->me = m->my_identity;
752 if (NULL != (init = h->init))
753 {
754 /* mark so we don't call init on reconnect */
755 h->init = NULL;
756 LOG (GNUNET_ERROR_TYPE_DEBUG,
757 "Connected to core service of peer `%s'.\n",
758 GNUNET_i2s (&h->me));
759 init (h->cls, &h->me);
760 }
761 else
762 {
763 LOG (GNUNET_ERROR_TYPE_DEBUG,
764 "Successfully reconnected to core service.\n");
765 }
766 /* fake 'connect to self' */
767 pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &h->me);
768 GNUNET_assert (NULL == pr);
769 pr = GNUNET_new (struct PeerRecord);
770 pr->peer = h->me;
771 pr->ch = h;
772 GNUNET_assert (GNUNET_YES ==
773 GNUNET_CONTAINER_multipeermap_put (h->peers,
774 &h->me, pr,
775 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
776 if (NULL != h->connects)
777 h->connects (h->cls, &pr->peer);
778 break;
779 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
780 if (msize < sizeof (struct ConnectNotifyMessage))
781 {
782 GNUNET_break (0);
783 reconnect_later (h);
784 return;
785 }
786 cnm = (const struct ConnectNotifyMessage *) msg;
787 if (msize !=
788 sizeof (struct ConnectNotifyMessage))
789 {
790 GNUNET_break (0);
791 reconnect_later (h);
792 return;
793 }
794 LOG (GNUNET_ERROR_TYPE_DEBUG,
795 "Received notification about connection from `%s'.\n",
796 GNUNET_i2s (&cnm->peer));
797 if (0 == memcmp (&h->me,
798 &cnm->peer,
799 sizeof (struct GNUNET_PeerIdentity)))
800 {
801 /* connect to self!? */
802 GNUNET_break (0);
803 return;
804 }
805 pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &cnm->peer);
806 if (NULL != pr)
807 {
808 GNUNET_break (0);
809 reconnect_later (h);
810 return;
811 }
812 pr = GNUNET_new (struct PeerRecord);
813 pr->peer = cnm->peer;
814 pr->ch = h;
815 GNUNET_assert (GNUNET_YES ==
816 GNUNET_CONTAINER_multipeermap_put (h->peers,
817 &cnm->peer, pr,
818 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
819 if (NULL != h->connects)
820 h->connects (h->cls, &pr->peer);
821 break;
822 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
823 if (msize != sizeof (struct DisconnectNotifyMessage))
824 {
825 GNUNET_break (0);
826 reconnect_later (h);
827 return;
828 }
829 dnm = (const struct DisconnectNotifyMessage *) msg;
830 if (0 == memcmp (&h->me,
831 &dnm->peer,
832 sizeof (struct GNUNET_PeerIdentity)))
833 {
834 /* connection to self!? */
835 GNUNET_break (0);
836 return;
837 }
838 GNUNET_break (0 == ntohl (dnm->reserved));
839 LOG (GNUNET_ERROR_TYPE_DEBUG,
840 "Received notification about disconnect from `%s'.\n",
841 GNUNET_i2s (&dnm->peer));
842 pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &dnm->peer);
843 if (NULL == pr)
844 {
845 GNUNET_break (0);
846 reconnect_later (h);
847 return;
848 }
849 trigger = ((pr->prev != NULL) || (pr->next != NULL) ||
850 (h->ready_peer_head == pr));
851 disconnect_and_free_peer_entry (h, &dnm->peer, pr);
852 if (trigger)
853 trigger_next_request (h, GNUNET_NO);
854 break;
855 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
856 if (msize < sizeof (struct NotifyTrafficMessage))
857 {
858 GNUNET_break (0);
859 reconnect_later (h);
860 return;
861 }
862 ntm = (const struct NotifyTrafficMessage *) msg;
863 if ((msize <
864 sizeof (struct NotifyTrafficMessage) +
865 sizeof (struct GNUNET_MessageHeader)) )
866 {
867 GNUNET_break (0);
868 reconnect_later (h);
869 return;
870 }
871 em = (const struct GNUNET_MessageHeader *) &ntm[1];
872 LOG (GNUNET_ERROR_TYPE_DEBUG,
873 "Received message of type %u and size %u from peer `%s'\n",
874 ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
875 if ((GNUNET_NO == h->inbound_hdr_only) &&
876 (msize !=
877 ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
878 {
879 GNUNET_break (0);
880 reconnect_later (h);
881 return;
882 }
883 et = ntohs (em->type);
884 for (hpos = 0; hpos < h->hcnt; hpos++)
885 {
886 mh = &h->handlers[hpos];
887 if (mh->type != et)
888 continue;
889 if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0))
890 {
891 LOG (GNUNET_ERROR_TYPE_ERROR,
892 "Unexpected message size %u for message of type %u from peer `%s'\n",
893 htons (em->size), mh->type, GNUNET_i2s (&ntm->peer));
894 GNUNET_break_op (0);
895 continue;
896 }
897 pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &ntm->peer);
898 if (NULL == pr)
899 {
900 GNUNET_break (0);
901 reconnect_later (h);
902 return;
903 }
904 if (GNUNET_OK !=
905 h->handlers[hpos].callback (h->cls, &ntm->peer, em))
906 {
907 /* error in processing, do not process other messages! */
908 break;
909 }
910 }
911 if (NULL != h->inbound_notify)
912 h->inbound_notify (h->cls, &ntm->peer, em);
913 break;
914 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
915 if (msize < sizeof (struct NotifyTrafficMessage))
916 {
917 GNUNET_break (0);
918 reconnect_later (h);
919 return;
920 }
921 ntm = (const struct NotifyTrafficMessage *) msg;
922 if ((msize <
923 sizeof (struct NotifyTrafficMessage) +
924 sizeof (struct GNUNET_MessageHeader)) )
925 {
926 GNUNET_break (0);
927 reconnect_later (h);
928 return;
929 }
930 em = (const struct GNUNET_MessageHeader *) &ntm[1];
931 LOG (GNUNET_ERROR_TYPE_DEBUG,
932 "Received notification about transmission to `%s'.\n",
933 GNUNET_i2s (&ntm->peer));
934 if ((GNUNET_NO == h->outbound_hdr_only) &&
935 (msize !=
936 ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
937 {
938 GNUNET_break (0);
939 reconnect_later (h);
940 return;
941 }
942 if (NULL == h->outbound_notify)
943 {
944 GNUNET_break (0);
945 break;
946 }
947 h->outbound_notify (h->cls, &ntm->peer, em);
948 break;
949 case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
950 if (msize != sizeof (struct SendMessageReady))
951 { 541 {
952 GNUNET_break (0); 542 LOG (GNUNET_ERROR_TYPE_ERROR,
953 reconnect_later (h); 543 "Unexpected message size %u for message of type %u from peer `%s'\n",
954 return; 544 htons (em->size),
545 mh->type,
546 GNUNET_i2s (&ntm->peer));
547 GNUNET_break_op (0);
548 continue;
955 } 549 }
956 smr = (const struct SendMessageReady *) msg;
957 pr = GNUNET_CONTAINER_multipeermap_get (h->peers, 550 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
958 &smr->peer); 551 &ntm->peer);
959 if (NULL == pr) 552 if (NULL == pr)
960 { 553 {
961 GNUNET_break (0); 554 GNUNET_break (0);
962 reconnect_later (h); 555 reconnect_later (h);
963 return; 556 return;
964 } 557 }
965 LOG (GNUNET_ERROR_TYPE_DEBUG, 558 if (GNUNET_OK !=
966 "Received notification about transmission readiness to `%s'.\n", 559 h->handlers[hpos].callback (h->cls,
967 GNUNET_i2s (&smr->peer)); 560 &ntm->peer,
968 if (NULL == pr->th.peer) 561 em))
969 { 562 {
970 /* request must have been cancelled between the original request 563 /* error in processing, do not process other messages! */
971 * and the response from core, ignore core's readiness */
972 break; 564 break;
973 } 565 }
566 }
567 if (NULL != h->inbound_notify)
568 h->inbound_notify (h->cls,
569 &ntm->peer,
570 em);
571}
974 572
975 th = &pr->th; 573
976 if (ntohs (smr->smr_id) != th->smr_id) 574/**
977 { 575 * Check that message received from CORE service is well-formed.
978 /* READY message is for expired or cancelled message, 576 *
979 * ignore! (we should have already sent another request) */ 577 * @param cls the `struct GNUNET_CORE_Handle`
980 break; 578 * @param ntm the message we got
981 } 579 * @return #GNUNET_OK if the message is well-formed
982 if ( (NULL != pr->prev) || 580 */
983 (NULL != pr->next) || 581static int
984 (h->ready_peer_head == pr) ) 582check_notify_outbound (void *cls,
985 { 583 const struct NotifyTrafficMessage *ntm)
986 /* we should not already be on the ready list... */ 584{
987 GNUNET_break (0); 585 struct GNUNET_CORE_Handle *h = cls;
988 reconnect_later (h); 586 uint16_t msize;
989 return; 587 const struct GNUNET_MessageHeader *em;
990 } 588
991 GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, 589 GNUNET_break (GNUNET_NO == h->currently_down);
992 h->ready_peer_tail, 590 LOG (GNUNET_ERROR_TYPE_DEBUG,
993 pr); 591 "Received outbound message from `%s'.\n",
994 trigger_next_request (h, GNUNET_NO); 592 GNUNET_i2s (&ntm->peer));
995 break; 593 msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage);
996 default: 594 if (msize < sizeof (struct GNUNET_MessageHeader))
997 reconnect_later (h); 595 {
596 GNUNET_break (0);
597 return GNUNET_SYSERR;
598 }
599 em = (const struct GNUNET_MessageHeader *) &ntm[1];
600 if ( (GNUNET_NO == h->outbound_hdr_only) &&
601 (msize != ntohs (em->size)) )
602 {
603 GNUNET_break (0);
604 return GNUNET_SYSERR;
605 }
606 return GNUNET_OK;
607}
608
609
610/**
611 * Handle outbound message received from CORE service. If applicable,
612 * notify the application.
613 *
614 * @param cls the `struct GNUNET_CORE_Handle`
615 * @param ntm the message we got
616 */
617static void
618handle_notify_outbound (void *cls,
619 const struct NotifyTrafficMessage *ntm)
620{
621 struct GNUNET_CORE_Handle *h = cls;
622 const struct GNUNET_MessageHeader *em;
623
624 GNUNET_break (GNUNET_NO == h->currently_down);
625 em = (const struct GNUNET_MessageHeader *) &ntm[1];
626 LOG (GNUNET_ERROR_TYPE_DEBUG,
627 "Received notification about transmission to `%s'.\n",
628 GNUNET_i2s (&ntm->peer));
629 if (NULL == h->outbound_notify)
630 {
631 GNUNET_break (0);
998 return; 632 return;
999 } 633 }
1000 GNUNET_CLIENT_receive (h->client, 634 h->outbound_notify (h->cls,
1001 &main_notify_handler, h, 635 &ntm->peer,
1002 GNUNET_TIME_UNIT_FOREVER_REL); 636 em);
1003} 637}
1004 638
1005 639
1006/** 640/**
1007 * Task executed once we are done transmitting the INIT message. 641 * Handle message received from CORE service notifying us that we are
1008 * Starts our 'receive' loop. 642 * now allowed to send a message to a peer. If that message is still
643 * pending, put it into the queue to be transmitted.
1009 * 644 *
1010 * @param cls the 'struct GNUNET_CORE_Handle' 645 * @param cls the `struct GNUNET_CORE_Handle`
1011 * @param success were we successful 646 * @param ntm the message we got
1012 */ 647 */
1013static void 648static void
1014init_done_task (void *cls, int success) 649handle_send_ready (void *cls,
650 const struct SendMessageReady *smr)
1015{ 651{
1016 struct GNUNET_CORE_Handle *h = cls; 652 struct GNUNET_CORE_Handle *h = cls;
653 struct PeerRecord *pr;
654 struct GNUNET_CORE_TransmitHandle *th;
655 struct SendMessage *sm;
656 struct GNUNET_MQ_Envelope *env;
657 struct GNUNET_TIME_Relative delay;
658 struct GNUNET_TIME_Relative overdue;
659 unsigned int ret;
1017 660
1018 if (GNUNET_SYSERR == success) 661 GNUNET_break (GNUNET_NO == h->currently_down);
1019 return; /* shutdown */ 662 pr = GNUNET_CONTAINER_multipeermap_get (h->peers,
1020 if (GNUNET_NO == success) 663 &smr->peer);
664 if (NULL == pr)
1021 { 665 {
1022 LOG (GNUNET_ERROR_TYPE_DEBUG, 666 GNUNET_break (0);
1023 "Failed to exchange INIT with core, retrying\n"); 667 reconnect_later (h);
1024 if (h->reconnect_task == NULL) 668 return;
1025 reconnect_later (h); 669 }
670 LOG (GNUNET_ERROR_TYPE_DEBUG,
671 "Received notification about transmission readiness to `%s'.\n",
672 GNUNET_i2s (&smr->peer));
673 if (NULL == pr->th.peer)
674 {
675 /* request must have been cancelled between the original request
676 * and the response from CORE, ignore CORE's readiness */
1026 return; 677 return;
1027 } 678 }
1028 GNUNET_CLIENT_receive (h->client, 679 th = &pr->th;
1029 &main_notify_handler, h, 680 if (ntohs (smr->smr_id) != th->smr_id)
1030 GNUNET_TIME_UNIT_FOREVER_REL); 681 {
682 /* READY message is for expired or cancelled message,
683 * ignore! (we should have already sent another request) */
684 return;
685 }
686 /* ok, all good, send message out! */
687 th->peer = NULL;
688 env = GNUNET_MQ_msg_extra (sm,
689 th->msize,
690 GNUNET_MESSAGE_TYPE_CORE_SEND);
691 sm->priority = htonl ((uint32_t) th->priority);
692 sm->deadline = GNUNET_TIME_absolute_hton (th->deadline);
693 sm->peer = pr->peer;
694 sm->cork = htonl ((uint32_t) th->cork);
695 sm->reserved = htonl (0);
696 ret = th->get_message (th->get_message_cls,
697 th->msize,
698 &sm[1]);
699 GNUNET_assert (ret == th->msize); /* NOTE: API change! */
700 delay = GNUNET_TIME_absolute_get_duration (th->request_time);
701 overdue = GNUNET_TIME_absolute_get_duration (th->deadline);
702 if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
703 LOG (GNUNET_ERROR_TYPE_WARNING,
704 "Transmitting overdue %u bytes to `%s' at priority %u with %s delay %s\n",
705 ret,
706 GNUNET_i2s (&pr->peer),
707 (unsigned int) th->priority,
708 GNUNET_STRINGS_relative_time_to_string (delay,
709 GNUNET_YES),
710 (th->cork) ? " (corked)" : "");
711 else
712 LOG (GNUNET_ERROR_TYPE_DEBUG,
713 "Transmitting %u bytes to `%s' at priority %u with %s delay %s\n",
714 ret,
715 GNUNET_i2s (&pr->peer),
716 (unsigned int) th->priority,
717 GNUNET_STRINGS_relative_time_to_string (delay,
718 GNUNET_YES),
719 (th->cork) ? " (corked)" : "");
720 GNUNET_MQ_send (h->mq,
721 env);
1031} 722}
1032 723
1033 724
1034/** 725/**
1035 * Our current client connection went down. Clean it up 726 * Our current client connection went down. Clean it up and try to
1036 * and try to reconnect! 727 * reconnect!
1037 * 728 *
1038 * @param h our handle to the core service 729 * @param h our handle to the core service
1039 */ 730 */
1040static void 731static void
1041reconnect (struct GNUNET_CORE_Handle *h) 732reconnect (struct GNUNET_CORE_Handle *h)
1042{ 733{
1043 struct ControlMessage *cm; 734 GNUNET_MQ_hd_fixed_size (init_reply,
735 GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY,
736 struct InitReplyMessage);
737 GNUNET_MQ_hd_fixed_size (connect_notify,
738 GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT,
739 struct ConnectNotifyMessage);
740 GNUNET_MQ_hd_fixed_size (disconnect_notify,
741 GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT,
742 struct DisconnectNotifyMessage);
743 GNUNET_MQ_hd_var_size (notify_inbound,
744 GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND,
745 struct NotifyTrafficMessage);
746 GNUNET_MQ_hd_var_size (notify_outbound,
747 GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND,
748 struct NotifyTrafficMessage);
749 GNUNET_MQ_hd_fixed_size (send_ready,
750 GNUNET_MESSAGE_TYPE_CORE_SEND_READY,
751 struct SendMessageReady);
752 struct GNUNET_MQ_MessageHandler handlers[] = {
753 make_init_reply_handler (h),
754 make_connect_notify_handler (h),
755 make_disconnect_notify_handler (h),
756 make_notify_inbound_handler (h),
757 make_notify_outbound_handler (h),
758 make_send_ready_handler (h),
759 GNUNET_MQ_handler_end ()
760 };
1044 struct InitMessage *init; 761 struct InitMessage *init;
762 struct GNUNET_MQ_Envelope *env;
1045 uint32_t opt; 763 uint32_t opt;
1046 uint16_t msize;
1047 uint16_t *ts; 764 uint16_t *ts;
1048 unsigned int hpos;
1049 765
1050 GNUNET_assert (NULL == h->client); 766 GNUNET_assert (NULL == h->mq);
1051 GNUNET_assert (GNUNET_YES == h->currently_down); 767 GNUNET_assert (GNUNET_YES == h->currently_down);
1052 GNUNET_assert (NULL != h->cfg); 768 h->mq = GNUNET_CLIENT_connecT (h->cfg,
1053 h->client = GNUNET_CLIENT_connect ("core", h->cfg); 769 "core",
1054 if (NULL == h->client) 770 handlers,
771 &handle_mq_error,
772 h);
773 if (NULL == h->mq)
1055 { 774 {
1056 reconnect_later (h); 775 reconnect_later (h);
1057 return; 776 return;
1058 } 777 }
1059 msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage); 778 env = GNUNET_MQ_msg_extra (init,
1060 cm = GNUNET_malloc (sizeof (struct ControlMessage) + msize); 779 sizeof (uint16_t) * h->hcnt,
1061 cm->cont = &init_done_task; 780 GNUNET_MESSAGE_TYPE_CORE_INIT);
1062 cm->cont_cls = h;
1063 init = (struct InitMessage *) &cm[1];
1064 init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
1065 init->header.size = htons (msize);
1066 opt = 0; 781 opt = 0;
1067 if (NULL != h->inbound_notify) 782 if (NULL != h->inbound_notify)
1068 { 783 {
@@ -1081,22 +796,18 @@ reconnect (struct GNUNET_CORE_Handle *h)
1081 LOG (GNUNET_ERROR_TYPE_INFO, 796 LOG (GNUNET_ERROR_TYPE_INFO,
1082 "(Re)connecting to CORE service, monitoring messages of type %u\n", 797 "(Re)connecting to CORE service, monitoring messages of type %u\n",
1083 opt); 798 opt);
1084
1085 init->options = htonl (opt); 799 init->options = htonl (opt);
1086 ts = (uint16_t *) & init[1]; 800 ts = (uint16_t *) &init[1];
1087 for (hpos = 0; hpos < h->hcnt; hpos++) 801 for (unsigned int hpos = 0; hpos < h->hcnt; hpos++)
1088 ts[hpos] = htons (h->handlers[hpos].type); 802 ts[hpos] = htons (h->handlers[hpos].type);
1089 GNUNET_CONTAINER_DLL_insert (h->control_pending_head, 803 GNUNET_MQ_send (h->mq,
1090 h->control_pending_tail, 804 env);
1091 cm);
1092 trigger_next_request (h, GNUNET_YES);
1093} 805}
1094 806
1095 807
1096
1097/** 808/**
1098 * Connect to the core service. Note that the connection may 809 * Connect to the core service. Note that the connection may complete
1099 * complete (or fail) asynchronously. 810 * (or fail) asynchronously.
1100 * 811 *
1101 * @param cfg configuration to use 812 * @param cfg configuration to use
1102 * @param cls closure for the various callbacks that follow (including handlers in the handlers array) 813 * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
@@ -1129,8 +840,8 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1129 const struct GNUNET_CORE_MessageHandler *handlers) 840 const struct GNUNET_CORE_MessageHandler *handlers)
1130{ 841{
1131 struct GNUNET_CORE_Handle *h; 842 struct GNUNET_CORE_Handle *h;
843 unsigned int hcnt;
1132 844
1133 GNUNET_assert (NULL != cfg);
1134 h = GNUNET_new (struct GNUNET_CORE_Handle); 845 h = GNUNET_new (struct GNUNET_CORE_Handle);
1135 h->cfg = cfg; 846 h->cfg = cfg;
1136 h->cls = cls; 847 h->cls = cls;
@@ -1141,14 +852,20 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1141 h->outbound_notify = outbound_notify; 852 h->outbound_notify = outbound_notify;
1142 h->inbound_hdr_only = inbound_hdr_only; 853 h->inbound_hdr_only = inbound_hdr_only;
1143 h->outbound_hdr_only = outbound_hdr_only; 854 h->outbound_hdr_only = outbound_hdr_only;
1144 h->handlers = handlers;
1145 h->hcnt = 0;
1146 h->currently_down = GNUNET_YES; 855 h->currently_down = GNUNET_YES;
1147 h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); 856 h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
857 hcnt = 0;
858 if (NULL != handlers)
859 while (NULL != handlers[hcnt].callback)
860 hcnt++;
861 h->handlers = GNUNET_new_array (hcnt + 1,
862 struct GNUNET_CORE_MessageHandler);
1148 if (NULL != handlers) 863 if (NULL != handlers)
1149 while (NULL != handlers[h->hcnt].callback) 864 memcpy (h->handlers,
1150 h->hcnt++; 865 handlers,
1151 GNUNET_assert (h->hcnt < 866 hcnt * sizeof (struct GNUNET_CORE_MessageHandler));
867 h->hcnt = hcnt;
868 GNUNET_assert (hcnt <
1152 (GNUNET_SERVER_MAX_MESSAGE_SIZE - 869 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
1153 sizeof (struct InitMessage)) / sizeof (uint16_t)); 870 sizeof (struct InitMessage)) / sizeof (uint16_t));
1154 LOG (GNUNET_ERROR_TYPE_DEBUG, 871 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1168,63 +885,29 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1168void 885void
1169GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) 886GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
1170{ 887{
1171 struct ControlMessage *cm;
1172
1173 GNUNET_assert (NULL != handle);
1174 LOG (GNUNET_ERROR_TYPE_DEBUG, 888 LOG (GNUNET_ERROR_TYPE_DEBUG,
1175 "Disconnecting from CORE service\n"); 889 "Disconnecting from CORE service\n");
1176 if (NULL != handle->cth)
1177 {
1178 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
1179 handle->cth = NULL;
1180 }
1181 while (NULL != (cm = handle->control_pending_head))
1182 {
1183 GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
1184 handle->control_pending_tail,
1185 cm);
1186 if (NULL != cm->th)
1187 cm->th->cm = NULL;
1188 if (NULL != cm->cont)
1189 cm->cont (cm->cont_cls, GNUNET_SYSERR);
1190 GNUNET_free (cm);
1191 }
1192 if (NULL != handle->client)
1193 {
1194 GNUNET_CLIENT_disconnect (handle->client);
1195 handle->client = NULL;
1196 }
1197 GNUNET_CONTAINER_multipeermap_iterate (handle->peers, 890 GNUNET_CONTAINER_multipeermap_iterate (handle->peers,
1198 &disconnect_and_free_peer_entry, 891 &disconnect_and_free_peer_entry,
1199 handle); 892 handle);
893 GNUNET_CONTAINER_multipeermap_destroy (handle->peers);
894 handle->peers = NULL;
1200 if (NULL != handle->reconnect_task) 895 if (NULL != handle->reconnect_task)
1201 { 896 {
1202 GNUNET_SCHEDULER_cancel (handle->reconnect_task); 897 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1203 handle->reconnect_task = NULL; 898 handle->reconnect_task = NULL;
1204 } 899 }
1205 GNUNET_CONTAINER_multipeermap_destroy (handle->peers); 900 if (NULL != handle->mq)
1206 handle->peers = NULL; 901 {
1207 GNUNET_break (NULL == handle->ready_peer_head); 902 GNUNET_MQ_destroy (handle->mq);
903 handle->mq = NULL;
904 }
905 GNUNET_free (handle->handlers);
1208 GNUNET_free (handle); 906 GNUNET_free (handle);
1209} 907}
1210 908
1211 909
1212/** 910/**
1213 * Task that calls #request_next_transmission().
1214 *
1215 * @param cls the `struct PeerRecord *`
1216 */
1217static void
1218run_request_next_transmission (void *cls)
1219{
1220 struct PeerRecord *pr = cls;
1221
1222 pr->ntr_task = NULL;
1223 request_next_transmission (pr);
1224}
1225
1226
1227/**
1228 * Ask the core to call @a notify once it is ready to transmit the 911 * Ask the core to call @a notify once it is ready to transmit the
1229 * given number of bytes to the specified @a target. Must only be 912 * given number of bytes to the specified @a target. Must only be
1230 * called after a connection to the respective peer has been 913 * called after a connection to the respective peer has been
@@ -1261,13 +944,16 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1261{ 944{
1262 struct PeerRecord *pr; 945 struct PeerRecord *pr;
1263 struct GNUNET_CORE_TransmitHandle *th; 946 struct GNUNET_CORE_TransmitHandle *th;
947 struct SendMessageRequest *smr;
948 struct GNUNET_MQ_Envelope *env;
1264 949
1265 if (notify_size > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) 950 GNUNET_assert (NULL != notify);
951 if ( (notify_size > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) ||
952 (notify_size + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1266 { 953 {
1267 GNUNET_break (0); 954 GNUNET_break (0);
1268 return NULL; 955 return NULL;
1269 } 956 }
1270 GNUNET_assert (NULL != notify);
1271 LOG (GNUNET_ERROR_TYPE_DEBUG, 957 LOG (GNUNET_ERROR_TYPE_DEBUG,
1272 "Asking core for transmission of %u bytes to `%s'\n", 958 "Asking core for transmission of %u bytes to `%s'\n",
1273 (unsigned int) notify_size, 959 (unsigned int) notify_size,
@@ -1286,10 +972,10 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1286 GNUNET_break (0); 972 GNUNET_break (0);
1287 return NULL; 973 return NULL;
1288 } 974 }
1289 GNUNET_assert (notify_size + sizeof (struct SendMessage) <
1290 GNUNET_SERVER_MAX_MESSAGE_SIZE);
1291 th = &pr->th; 975 th = &pr->th;
1292 memset (th, 0, sizeof (struct GNUNET_CORE_TransmitHandle)); 976 memset (th,
977 0,
978 sizeof (struct GNUNET_CORE_TransmitHandle));
1293 th->peer = pr; 979 th->peer = pr;
1294 th->get_message = notify; 980 th->get_message = notify;
1295 th->get_message_cls = notify_cls; 981 th->get_message_cls = notify_cls;
@@ -1301,9 +987,16 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
1301 th->priority = priority; 987 th->priority = priority;
1302 th->msize = notify_size; 988 th->msize = notify_size;
1303 th->cork = cork; 989 th->cork = cork;
1304 GNUNET_assert (NULL == pr->ntr_task); 990 env = GNUNET_MQ_msg (smr,
1305 pr->ntr_task = 991 GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
1306 GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr); 992 smr->priority = htonl ((uint32_t) th->priority);
993 smr->deadline = GNUNET_TIME_absolute_hton (th->deadline);
994 smr->peer = pr->peer;
995 smr->reserved = htonl (0);
996 smr->size = htons (th->msize);
997 smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
998 GNUNET_MQ_send (handle->mq,
999 env);
1307 LOG (GNUNET_ERROR_TYPE_DEBUG, 1000 LOG (GNUNET_ERROR_TYPE_DEBUG,
1308 "Transmission request added to queue\n"); 1001 "Transmission request added to queue\n");
1309 return th; 1002 return th;
@@ -1319,41 +1012,12 @@ void
1319GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) 1012GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
1320{ 1013{
1321 struct PeerRecord *pr = th->peer; 1014 struct PeerRecord *pr = th->peer;
1322 struct GNUNET_CORE_Handle *h;
1323 1015
1324 GNUNET_assert (NULL != th);
1325 GNUNET_assert (NULL != pr);
1326 LOG (GNUNET_ERROR_TYPE_DEBUG, 1016 LOG (GNUNET_ERROR_TYPE_DEBUG,
1327 "Aborting transmission request to core for %u bytes to `%s'\n", 1017 "Aborting transmission request to core for %u bytes to `%s'\n",
1328 (unsigned int) th->msize, 1018 (unsigned int) th->msize,
1329 GNUNET_i2s (&pr->peer)); 1019 GNUNET_i2s (&pr->peer));
1330 th->peer = NULL; 1020 th->peer = NULL;
1331 h = pr->ch;
1332 if (NULL != th->cm)
1333 {
1334 /* we're currently in the control queue, remove */
1335 GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
1336 h->control_pending_tail,
1337 th->cm);
1338 GNUNET_free (th->cm);
1339 th->cm = NULL;
1340 }
1341 if ( (NULL != pr->prev) ||
1342 (NULL != pr->next) ||
1343 (pr == h->ready_peer_head) )
1344 {
1345 /* the request that was 'approved' by core was
1346 * canceled before it could be transmitted; remove
1347 * us from the 'ready' list */
1348 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
1349 h->ready_peer_tail,
1350 pr);
1351 }
1352 if (NULL != pr->ntr_task)
1353 {
1354 GNUNET_SCHEDULER_cancel (pr->ntr_task);
1355 pr->ntr_task = NULL;
1356 }
1357} 1021}
1358 1022
1359 1023
@@ -1376,9 +1040,8 @@ int
1376GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, 1040GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h,
1377 const struct GNUNET_PeerIdentity *pid) 1041 const struct GNUNET_PeerIdentity *pid)
1378{ 1042{
1379 GNUNET_assert (NULL != h); 1043 return GNUNET_CONTAINER_multipeermap_contains (h->peers,
1380 GNUNET_assert (NULL != pid); 1044 pid);
1381 return GNUNET_CONTAINER_multipeermap_contains (h->peers, pid);
1382} 1045}
1383 1046
1384 1047