diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-19 21:29:20 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-19 21:29:20 +0000 |
commit | 8b1a351c049deec3226aa40a883b97c76916bea7 (patch) | |
tree | d507edb09031a9c9025ad5eb927fc1a9c36452d7 /src/core/core_api.c | |
parent | 80c4558f3742ce78f1f511ea13d8941c46b0e88b (diff) | |
download | gnunet-8b1a351c049deec3226aa40a883b97c76916bea7.tar.gz gnunet-8b1a351c049deec3226aa40a883b97c76916bea7.zip |
refactoring core API to use new MQ lib
Diffstat (limited to 'src/core/core_api.c')
-rw-r--r-- | src/core/core_api.c | 1183 |
1 files changed, 423 insertions, 760 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c index 7b423b6a0..af78ab4f9 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2009-2014 GNUnet e.V. | 3 | Copyright (C) 2009-2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -44,12 +44,6 @@ struct GNUNET_CORE_TransmitHandle | |||
44 | struct PeerRecord *peer; | 44 | struct PeerRecord *peer; |
45 | 45 | ||
46 | /** | 46 | /** |
47 | * Corresponding SEND_REQUEST message. Only non-NULL | ||
48 | * while SEND_REQUEST message is pending. | ||
49 | */ | ||
50 | struct ControlMessage *cm; | ||
51 | |||
52 | /** | ||
53 | * Function that will be called to get the actual request | 47 | * Function that will be called to get the actual request |
54 | * (once we are ready to transmit this request to the core). | 48 | * (once we are ready to transmit this request to the core). |
55 | * The function will be called with a NULL buffer to signal | 49 | * The function will be called with a NULL buffer to signal |
@@ -103,26 +97,12 @@ struct PeerRecord | |||
103 | { | 97 | { |
104 | 98 | ||
105 | /** | 99 | /** |
106 | * We generally do NOT keep peer records in a DLL; this | 100 | * Corresponding CORE handle. |
107 | * DLL is only used IF this peer's 'pending_head' message | ||
108 | * is ready for transmission. | ||
109 | */ | ||
110 | struct PeerRecord *prev; | ||
111 | |||
112 | /** | ||
113 | * We generally do NOT keep peer records in a DLL; this | ||
114 | * DLL is only used IF this peer's 'pending_head' message | ||
115 | * is ready for transmission. | ||
116 | */ | ||
117 | struct PeerRecord *next; | ||
118 | |||
119 | /** | ||
120 | * Corresponding core handle. | ||
121 | */ | 101 | */ |
122 | struct GNUNET_CORE_Handle *ch; | 102 | struct GNUNET_CORE_Handle *ch; |
123 | 103 | ||
124 | /** | 104 | /** |
125 | * Pending request, if any. 'th->peer' is set to NULL if the | 105 | * Pending request, if any. 'th->peer' is set to NULL if the |
126 | * request is not active. | 106 | * request is not active. |
127 | */ | 107 | */ |
128 | struct GNUNET_CORE_TransmitHandle th; | 108 | struct GNUNET_CORE_TransmitHandle th; |
@@ -133,11 +113,6 @@ struct PeerRecord | |||
133 | struct GNUNET_PeerIdentity peer; | 113 | struct GNUNET_PeerIdentity peer; |
134 | 114 | ||
135 | /** | 115 | /** |
136 | * ID of task to run #run_request_next_transmission(). | ||
137 | */ | ||
138 | struct GNUNET_SCHEDULER_Task *ntr_task; | ||
139 | |||
140 | /** | ||
141 | * SendMessageRequest ID generator for this peer. | 116 | * SendMessageRequest ID generator for this peer. |
142 | */ | 117 | */ |
143 | uint16_t smr_id_gen; | 118 | uint16_t smr_id_gen; |
@@ -146,58 +121,6 @@ struct PeerRecord | |||
146 | 121 | ||
147 | 122 | ||
148 | /** | 123 | /** |
149 | * Type of function called upon completion. | ||
150 | * | ||
151 | * @param cls closure | ||
152 | * @param success #GNUNET_OK on success (which for request_connect | ||
153 | * ONLY means that we transmitted the connect request to CORE, | ||
154 | * it does not mean that we are actually now connected!); | ||
155 | * #GNUNET_NO on timeout, | ||
156 | * #GNUNET_SYSERR if core was shut down | ||
157 | */ | ||
158 | typedef void | ||
159 | (*GNUNET_CORE_ControlContinuation) (void *cls, | ||
160 | int success); | ||
161 | |||
162 | |||
163 | /** | ||
164 | * Entry in a doubly-linked list of control messages to be transmitted | ||
165 | * to the core service. Control messages include traffic allocation, | ||
166 | * connection requests and of course our initial 'init' request. | ||
167 | * | ||
168 | * The actual message is allocated at the end of this struct. | ||
169 | */ | ||
170 | struct ControlMessage | ||
171 | { | ||
172 | /** | ||
173 | * This is a doubly-linked list. | ||
174 | */ | ||
175 | struct ControlMessage *next; | ||
176 | |||
177 | /** | ||
178 | * This is a doubly-linked list. | ||
179 | */ | ||
180 | struct ControlMessage *prev; | ||
181 | |||
182 | /** | ||
183 | * Function to run after transmission failed/succeeded. | ||
184 | */ | ||
185 | GNUNET_CORE_ControlContinuation cont; | ||
186 | |||
187 | /** | ||
188 | * Closure for @e cont. | ||
189 | */ | ||
190 | void *cont_cls; | ||
191 | |||
192 | /** | ||
193 | * Transmit handle (if one is associated with this ControlMessage), or NULL. | ||
194 | */ | ||
195 | struct GNUNET_CORE_TransmitHandle *th; | ||
196 | }; | ||
197 | |||
198 | |||
199 | |||
200 | /** | ||
201 | * Context for the core service connection. | 124 | * Context for the core service connection. |
202 | */ | 125 | */ |
203 | struct GNUNET_CORE_Handle | 126 | struct GNUNET_CORE_Handle |
@@ -241,39 +164,12 @@ struct GNUNET_CORE_Handle | |||
241 | /** | 164 | /** |
242 | * Function handlers for messages of particular type. | 165 | * Function handlers for messages of particular type. |
243 | */ | 166 | */ |
244 | const struct GNUNET_CORE_MessageHandler *handlers; | 167 | struct GNUNET_CORE_MessageHandler *handlers; |
245 | |||
246 | /** | ||
247 | * Our connection to the service. | ||
248 | */ | ||
249 | struct GNUNET_CLIENT_Connection *client; | ||
250 | |||
251 | /** | ||
252 | * Handle for our current transmission request. | ||
253 | */ | ||
254 | struct GNUNET_CLIENT_TransmitHandle *cth; | ||
255 | |||
256 | /** | ||
257 | * Head of doubly-linked list of pending requests. | ||
258 | */ | ||
259 | struct ControlMessage *control_pending_head; | ||
260 | |||
261 | /** | ||
262 | * Tail of doubly-linked list of pending requests. | ||
263 | */ | ||
264 | struct ControlMessage *control_pending_tail; | ||
265 | |||
266 | /** | ||
267 | * Head of doubly-linked list of peers that are core-approved | ||
268 | * to send their next message. | ||
269 | */ | ||
270 | struct PeerRecord *ready_peer_head; | ||
271 | 168 | ||
272 | /** | 169 | /** |
273 | * Tail of doubly-linked list of peers that are core-approved | 170 | * Our message queue for transmissions to the service. |
274 | * to send their next message. | ||
275 | */ | 171 | */ |
276 | struct PeerRecord *ready_peer_tail; | 172 | struct GNUNET_MQ_Handle *mq; |
277 | 173 | ||
278 | /** | 174 | /** |
279 | * Hash map listing all of the peers that we are currently | 175 | * Hash map listing all of the peers that we are currently |
@@ -289,7 +185,7 @@ struct GNUNET_CORE_Handle | |||
289 | /** | 185 | /** |
290 | * ID of reconnect task (if any). | 186 | * ID of reconnect task (if any). |
291 | */ | 187 | */ |
292 | struct GNUNET_SCHEDULER_Task * reconnect_task; | 188 | struct GNUNET_SCHEDULER_Task *reconnect_task; |
293 | 189 | ||
294 | /** | 190 | /** |
295 | * Current delay we use for re-trying to connect to core. | 191 | * Current delay we use for re-trying to connect to core. |
@@ -351,8 +247,8 @@ reconnect_task (void *cls) | |||
351 | 247 | ||
352 | 248 | ||
353 | /** | 249 | /** |
354 | * Notify clients about disconnect and free | 250 | * Notify clients about disconnect and free the entry for connected |
355 | * the entry for connected peer. | 251 | * peer. |
356 | * | 252 | * |
357 | * @param cls the `struct GNUNET_CORE_Handle *` | 253 | * @param cls the `struct GNUNET_CORE_Handle *` |
358 | * @param key the peer identity (not used) | 254 | * @param key the peer identity (not used) |
@@ -368,17 +264,6 @@ disconnect_and_free_peer_entry (void *cls, | |||
368 | struct GNUNET_CORE_TransmitHandle *th; | 264 | struct GNUNET_CORE_TransmitHandle *th; |
369 | struct PeerRecord *pr = value; | 265 | struct PeerRecord *pr = value; |
370 | 266 | ||
371 | if (NULL != pr->ntr_task) | ||
372 | { | ||
373 | GNUNET_SCHEDULER_cancel (pr->ntr_task); | ||
374 | pr->ntr_task = NULL; | ||
375 | } | ||
376 | if ( (NULL != pr->prev) || | ||
377 | (NULL != pr->next) || | ||
378 | (h->ready_peer_head == pr) ) | ||
379 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, | ||
380 | h->ready_peer_tail, | ||
381 | pr); | ||
382 | if (NULL != h->disconnects) | 267 | if (NULL != h->disconnects) |
383 | h->disconnects (h->cls, | 268 | h->disconnects (h->cls, |
384 | &pr->peer); | 269 | &pr->peer); |
@@ -388,14 +273,13 @@ disconnect_and_free_peer_entry (void *cls, | |||
388 | { | 273 | { |
389 | GNUNET_break (0); | 274 | GNUNET_break (0); |
390 | th->peer = NULL; | 275 | th->peer = NULL; |
391 | if (NULL != th->cm) | ||
392 | th->cm->th = NULL; | ||
393 | } | 276 | } |
394 | /* done with 'voluntary' cleanups, now on to normal freeing */ | 277 | /* done with 'voluntary' cleanups, now on to normal freeing */ |
395 | GNUNET_assert (GNUNET_YES == | 278 | GNUNET_assert (GNUNET_YES == |
396 | GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr)); | 279 | GNUNET_CONTAINER_multipeermap_remove (h->peers, |
280 | key, | ||
281 | pr)); | ||
397 | GNUNET_assert (pr->ch == h); | 282 | GNUNET_assert (pr->ch == h); |
398 | GNUNET_assert (NULL == pr->ntr_task); | ||
399 | GNUNET_free (pr); | 283 | GNUNET_free (pr); |
400 | return GNUNET_YES; | 284 | return GNUNET_YES; |
401 | } | 285 | } |
@@ -410,659 +294,490 @@ disconnect_and_free_peer_entry (void *cls, | |||
410 | static void | 294 | static void |
411 | reconnect_later (struct GNUNET_CORE_Handle *h) | 295 | reconnect_later (struct GNUNET_CORE_Handle *h) |
412 | { | 296 | { |
413 | struct ControlMessage *cm; | ||
414 | struct PeerRecord *pr; | ||
415 | |||
416 | GNUNET_assert (NULL == h->reconnect_task); | 297 | GNUNET_assert (NULL == h->reconnect_task); |
417 | if (NULL != h->cth) | 298 | if (NULL != h->mq) |
418 | { | ||
419 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); | ||
420 | h->cth = NULL; | ||
421 | } | ||
422 | if (NULL != h->client) | ||
423 | { | 299 | { |
424 | GNUNET_CLIENT_disconnect (h->client); | 300 | GNUNET_MQ_destroy (h->mq); |
425 | h->client = NULL; | 301 | h->mq = NULL; |
426 | } | 302 | } |
427 | h->currently_down = GNUNET_YES; | 303 | h->currently_down = GNUNET_YES; |
428 | GNUNET_assert (h->reconnect_task == NULL); | 304 | GNUNET_assert (h->reconnect_task == NULL); |
429 | h->reconnect_task = | 305 | h->reconnect_task = |
430 | GNUNET_SCHEDULER_add_delayed (h->retry_backoff, | 306 | GNUNET_SCHEDULER_add_delayed (h->retry_backoff, |
431 | &reconnect_task, h); | 307 | &reconnect_task, |
432 | while (NULL != (cm = h->control_pending_head)) | 308 | h); |
433 | { | ||
434 | GNUNET_CONTAINER_DLL_remove (h->control_pending_head, | ||
435 | h->control_pending_tail, | ||
436 | cm); | ||
437 | if (NULL != cm->th) | ||
438 | cm->th->cm = NULL; | ||
439 | if (NULL != cm->cont) | ||
440 | cm->cont (cm->cont_cls, GNUNET_NO); | ||
441 | GNUNET_free (cm); | ||
442 | } | ||
443 | GNUNET_CONTAINER_multipeermap_iterate (h->peers, | 309 | GNUNET_CONTAINER_multipeermap_iterate (h->peers, |
444 | &disconnect_and_free_peer_entry, h); | 310 | &disconnect_and_free_peer_entry, |
445 | while (NULL != (pr = h->ready_peer_head)) | 311 | h); |
446 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, | ||
447 | h->ready_peer_tail, | ||
448 | pr); | ||
449 | GNUNET_assert (NULL == h->control_pending_head); | ||
450 | h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); | 312 | h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); |
451 | } | 313 | } |
452 | 314 | ||
453 | 315 | ||
454 | /** | 316 | /** |
455 | * Check the list of pending requests, send the next | 317 | * Generic error handler, called with the appropriate error code and |
456 | * one to the core. | 318 | * the same closure specified at the creation of the message queue. |
319 | * Not every message queue implementation supports an error handler. | ||
457 | * | 320 | * |
458 | * @param h core handle | 321 | * @param cls closure, a `struct GNUNET_CORE_Handle *` |
459 | * @param ignore_currently_down transmit message even if not initialized? | 322 | * @param error error code |
460 | */ | 323 | */ |
461 | static void | 324 | static void |
462 | trigger_next_request (struct GNUNET_CORE_Handle *h, | 325 | handle_mq_error (void *cls, |
463 | int ignore_currently_down); | 326 | enum GNUNET_MQ_Error error) |
327 | { | ||
328 | struct GNUNET_CORE_Handle *h = cls; | ||
329 | |||
330 | reconnect_later (h); | ||
331 | } | ||
464 | 332 | ||
465 | 333 | ||
466 | /** | 334 | /** |
467 | * Send a control message to the peer asking for transmission | 335 | * Handle init reply message received from CORE service. Notify |
468 | * of the message in the given peer record. | 336 | * application that we are now connected to the CORE. Also fake |
337 | * loopback connection. | ||
469 | * | 338 | * |
470 | * @param pr peer to request transmission to | 339 | * @param cls the `struct GNUNET_CORE_Handle` |
340 | * @param m the init reply | ||
471 | */ | 341 | */ |
472 | static void | 342 | static void |
473 | request_next_transmission (struct PeerRecord *pr) | 343 | handle_init_reply (void *cls, |
344 | const struct InitReplyMessage *m) | ||
474 | { | 345 | { |
475 | struct GNUNET_CORE_Handle *h = pr->ch; | 346 | struct GNUNET_CORE_Handle *h = cls; |
476 | struct ControlMessage *cm; | 347 | GNUNET_CORE_StartupCallback init; |
477 | struct SendMessageRequest *smr; | 348 | struct PeerRecord *pr; |
478 | struct GNUNET_CORE_TransmitHandle *th; | ||
479 | 349 | ||
480 | th = &pr->th; | 350 | GNUNET_break (0 == ntohl (m->reserved)); |
481 | if (NULL == th->peer) | 351 | GNUNET_break (GNUNET_YES == h->currently_down); |
352 | h->currently_down = GNUNET_NO; | ||
353 | h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
354 | if (NULL != (init = h->init)) | ||
482 | { | 355 | { |
483 | trigger_next_request (h, GNUNET_NO); | 356 | /* mark so we don't call init on reconnect */ |
484 | return; | 357 | h->init = NULL; |
358 | h->me = m->my_identity; | ||
359 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
360 | "Connected to core service of peer `%s'.\n", | ||
361 | GNUNET_i2s (&h->me)); | ||
362 | init (h->cls, | ||
363 | &h->me); | ||
485 | } | 364 | } |
486 | if (NULL != th->cm) | 365 | else |
487 | return; /* already done */ | 366 | { |
488 | GNUNET_assert (NULL == pr->prev); | 367 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
489 | GNUNET_assert (NULL == pr->next); | 368 | "Successfully reconnected to core service.\n"); |
490 | cm = GNUNET_malloc (sizeof (struct ControlMessage) + | 369 | GNUNET_break (0 == memcmp (&h->me, |
491 | sizeof (struct SendMessageRequest)); | 370 | &m->my_identity, |
492 | th->cm = cm; | 371 | sizeof (struct GNUNET_PeerIdentity))); |
493 | cm->th = th; | 372 | } |
494 | smr = (struct SendMessageRequest *) &cm[1]; | 373 | /* fake 'connect to self' */ |
495 | smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); | 374 | pr = GNUNET_new (struct PeerRecord); |
496 | smr->header.size = htons (sizeof (struct SendMessageRequest)); | 375 | pr->peer = h->me; |
497 | smr->priority = htonl ((uint32_t) th->priority); | 376 | pr->ch = h; |
498 | smr->deadline = GNUNET_TIME_absolute_hton (th->deadline); | 377 | GNUNET_assert (GNUNET_YES == |
499 | smr->peer = pr->peer; | 378 | GNUNET_CONTAINER_multipeermap_put (h->peers, |
500 | smr->reserved = htonl (0); | 379 | &h->me, |
501 | smr->size = htons (th->msize); | 380 | pr, |
502 | smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); | 381 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
503 | GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head, | 382 | if (NULL != h->connects) |
504 | h->control_pending_tail, cm); | 383 | h->connects (h->cls, |
505 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 384 | &pr->peer); |
506 | "Adding SEND REQUEST for peer `%s' to message queue\n", | ||
507 | GNUNET_i2s (&pr->peer)); | ||
508 | trigger_next_request (h, GNUNET_NO); | ||
509 | } | 385 | } |
510 | 386 | ||
511 | 387 | ||
512 | /** | 388 | /** |
513 | * Transmit the next message to the core service. | 389 | * Handle connect message received from CORE service. |
390 | * Notify the application about the new connection. | ||
514 | * | 391 | * |
515 | * @param cls closure with the `struct GNUNET_CORE_Handle` | 392 | * @param cls the `struct GNUNET_CORE_Handle` |
516 | * @param size number of bytes available in @a buf | 393 | * @param cnm the connect message |
517 | * @param buf where the callee should write the message | ||
518 | * @return number of bytes written to @a buf | ||
519 | */ | 394 | */ |
520 | static size_t | 395 | static void |
521 | transmit_message (void *cls, | 396 | handle_connect_notify (void *cls, |
522 | size_t size, | 397 | const struct ConnectNotifyMessage * cnm) |
523 | void *buf) | ||
524 | { | 398 | { |
525 | struct GNUNET_CORE_Handle *h = cls; | 399 | struct GNUNET_CORE_Handle *h = cls; |
526 | struct ControlMessage *cm; | ||
527 | struct GNUNET_CORE_TransmitHandle *th; | ||
528 | struct GNUNET_TIME_Relative delay; | ||
529 | struct GNUNET_TIME_Relative overdue; | ||
530 | struct PeerRecord *pr; | 400 | struct PeerRecord *pr; |
531 | struct SendMessage *sm; | ||
532 | const struct GNUNET_MessageHeader *hdr; | ||
533 | uint16_t msize; | ||
534 | size_t ret; | ||
535 | 401 | ||
536 | GNUNET_assert (h->reconnect_task == NULL); | 402 | GNUNET_break (GNUNET_NO == h->currently_down); |
537 | h->cth = NULL; | ||
538 | if (NULL == buf) | ||
539 | { | ||
540 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
541 | "Transmission failed, initiating reconnect\n"); | ||
542 | reconnect_later (h); | ||
543 | return 0; | ||
544 | } | ||
545 | /* first check for control messages */ | ||
546 | if (NULL != (cm = h->control_pending_head)) | ||
547 | { | ||
548 | hdr = (const struct GNUNET_MessageHeader *) &cm[1]; | ||
549 | msize = ntohs (hdr->size); | ||
550 | if (size < msize) | ||
551 | { | ||
552 | trigger_next_request (h, GNUNET_NO); | ||
553 | return 0; | ||
554 | } | ||
555 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
556 | "Transmitting control message with %u bytes of type %u to core.\n", | ||
557 | (unsigned int) msize, | ||
558 | (unsigned int) ntohs (hdr->type)); | ||
559 | memcpy (buf, hdr, msize); | ||
560 | GNUNET_CONTAINER_DLL_remove (h->control_pending_head, | ||
561 | h->control_pending_tail, cm); | ||
562 | if (NULL != cm->th) | ||
563 | cm->th->cm = NULL; | ||
564 | if (NULL != cm->cont) | ||
565 | cm->cont (cm->cont_cls, GNUNET_OK); | ||
566 | GNUNET_free (cm); | ||
567 | trigger_next_request (h, GNUNET_NO); | ||
568 | return msize; | ||
569 | } | ||
570 | /* now check for 'ready' P2P messages */ | ||
571 | if (NULL == (pr = h->ready_peer_head)) | ||
572 | return 0; | ||
573 | GNUNET_assert (NULL != pr->th.peer); | ||
574 | th = &pr->th; | ||
575 | if (size < th->msize + sizeof (struct SendMessage)) | ||
576 | { | ||
577 | trigger_next_request (h, GNUNET_NO); | ||
578 | return 0; | ||
579 | } | ||
580 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, | ||
581 | h->ready_peer_tail, | ||
582 | pr); | ||
583 | th->peer = NULL; | ||
584 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 403 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
585 | "Transmitting SEND request to `%s' with %u bytes.\n", | 404 | "Received notification about connection from `%s'.\n", |
586 | GNUNET_i2s (&pr->peer), | 405 | GNUNET_i2s (&cnm->peer)); |
587 | (unsigned int) th->msize); | 406 | if (0 == memcmp (&h->me, |
588 | sm = (struct SendMessage *) buf; | 407 | &cnm->peer, |
589 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); | 408 | sizeof (struct GNUNET_PeerIdentity))) |
590 | sm->priority = htonl ((uint32_t) th->priority); | ||
591 | sm->deadline = GNUNET_TIME_absolute_hton (th->deadline); | ||
592 | sm->peer = pr->peer; | ||
593 | sm->cork = htonl ((uint32_t) th->cork); | ||
594 | sm->reserved = htonl (0); | ||
595 | ret = | ||
596 | th->get_message (th->get_message_cls, | ||
597 | size - sizeof (struct SendMessage), | ||
598 | &sm[1]); | ||
599 | delay = GNUNET_TIME_absolute_get_duration (th->request_time); | ||
600 | overdue = GNUNET_TIME_absolute_get_duration (th->deadline); | ||
601 | if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) | ||
602 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
603 | "Transmitting overdue %u bytes to `%s' at priority %u with %s delay%s\n", | ||
604 | ret, | ||
605 | GNUNET_i2s (&pr->peer), | ||
606 | (unsigned int) th->priority, | ||
607 | GNUNET_STRINGS_relative_time_to_string (delay, | ||
608 | GNUNET_YES), | ||
609 | (th->cork) ? " (corked)" : ""); | ||
610 | else | ||
611 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
612 | "Transmitting %u bytes to `%s' at priority %u with %s delay%s\n", | ||
613 | ret, | ||
614 | GNUNET_i2s (&pr->peer), | ||
615 | (unsigned int) th->priority, | ||
616 | GNUNET_STRINGS_relative_time_to_string (delay, | ||
617 | GNUNET_YES), | ||
618 | (th->cork) ? " (corked)" : ""); | ||
619 | if ( (0 == ret) && | ||
620 | (GNUNET_CORE_PRIO_BACKGROUND == th->priority) ) | ||
621 | { | 409 | { |
622 | /* client decided to send nothing; as the priority was | 410 | /* connect to self!? */ |
623 | BACKGROUND, we can just not send anything to core. | 411 | GNUNET_break (0); |
624 | For higher-priority messages, we must give an | 412 | return; |
625 | empty message to CORE so that it knows that this | ||
626 | message is no longer pending. */ | ||
627 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
628 | "Size of clients message to peer %s is 0!\n", | ||
629 | GNUNET_i2s (&pr->peer)); | ||
630 | request_next_transmission (pr); | ||
631 | return 0; | ||
632 | } | 413 | } |
633 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 414 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, |
634 | "Produced SEND message to core with %u bytes payload\n", | 415 | &cnm->peer); |
635 | (unsigned int) ret); | 416 | if (NULL != pr) |
636 | if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | ||
637 | { | 417 | { |
638 | GNUNET_break (0); | 418 | GNUNET_break (0); |
639 | request_next_transmission (pr); | 419 | reconnect_later (h); |
640 | return 0; | 420 | return; |
641 | } | 421 | } |
642 | ret += sizeof (struct SendMessage); | 422 | pr = GNUNET_new (struct PeerRecord); |
643 | sm->header.size = htons (ret); | 423 | pr->peer = cnm->peer; |
644 | GNUNET_assert (ret <= size); | 424 | pr->ch = h; |
645 | request_next_transmission (pr); | 425 | GNUNET_assert (GNUNET_YES == |
646 | return ret; | 426 | GNUNET_CONTAINER_multipeermap_put (h->peers, |
427 | &cnm->peer, | ||
428 | pr, | ||
429 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
430 | if (NULL != h->connects) | ||
431 | h->connects (h->cls, | ||
432 | &pr->peer); | ||
647 | } | 433 | } |
648 | 434 | ||
649 | 435 | ||
650 | /** | 436 | /** |
651 | * Check the list of pending requests, send the next one to the core. | 437 | * Handle disconnect message received from CORE service. |
438 | * Notify the application about the lost connection. | ||
652 | * | 439 | * |
653 | * @param h core handle | 440 | * @param cls the `struct GNUNET_CORE_Handle` |
654 | * @param ignore_currently_down transmit message even if not initialized? | 441 | * @param dnm message about the disconnect event |
655 | */ | 442 | */ |
656 | static void | 443 | static void |
657 | trigger_next_request (struct GNUNET_CORE_Handle *h, | 444 | handle_disconnect_notify (void *cls, |
658 | int ignore_currently_down) | 445 | const struct DisconnectNotifyMessage * dnm) |
659 | { | 446 | { |
660 | uint16_t msize; | 447 | struct GNUNET_CORE_Handle *h = cls; |
448 | struct PeerRecord *pr; | ||
661 | 449 | ||
662 | if ( (GNUNET_YES == h->currently_down) && | 450 | GNUNET_break (GNUNET_NO == h->currently_down); |
663 | (GNUNET_NO == ignore_currently_down) ) | 451 | if (0 == memcmp (&h->me, |
452 | &dnm->peer, | ||
453 | sizeof (struct GNUNET_PeerIdentity))) | ||
664 | { | 454 | { |
665 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 455 | /* connection to self!? */ |
666 | "Core connection down, not processing queue\n"); | 456 | GNUNET_break (0); |
667 | return; | 457 | return; |
668 | } | 458 | } |
669 | if (NULL != h->cth) | 459 | GNUNET_break (0 == ntohl (dnm->reserved)); |
460 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
461 | "Received notification about disconnect from `%s'.\n", | ||
462 | GNUNET_i2s (&dnm->peer)); | ||
463 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, | ||
464 | &dnm->peer); | ||
465 | if (NULL == pr) | ||
670 | { | 466 | { |
671 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 467 | GNUNET_break (0); |
672 | "Request pending, not processing queue\n"); | 468 | reconnect_later (h); |
673 | return; | 469 | return; |
674 | } | 470 | } |
675 | if (NULL != h->control_pending_head) | 471 | disconnect_and_free_peer_entry (h, |
676 | msize = | 472 | &dnm->peer, |
677 | ntohs (((struct GNUNET_MessageHeader *) &h-> | 473 | pr); |
678 | control_pending_head[1])->size); | 474 | } |
679 | else if (h->ready_peer_head != NULL) | 475 | |
680 | msize = | 476 | |
681 | h->ready_peer_head->th.msize + sizeof (struct SendMessage); | 477 | /** |
682 | else | 478 | * Check that message received from CORE service is well-formed. |
479 | * | ||
480 | * @param cls the `struct GNUNET_CORE_Handle` | ||
481 | * @param ntm the message we got | ||
482 | * @return #GNUNET_OK if the message is well-formed | ||
483 | */ | ||
484 | static int | ||
485 | check_notify_inbound (void *cls, | ||
486 | const struct NotifyTrafficMessage *ntm) | ||
487 | { | ||
488 | struct GNUNET_CORE_Handle *h = cls; | ||
489 | uint16_t msize; | ||
490 | const struct GNUNET_MessageHeader *em; | ||
491 | |||
492 | GNUNET_break (GNUNET_NO == h->currently_down); | ||
493 | msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage); | ||
494 | if (msize < sizeof (struct GNUNET_MessageHeader)) | ||
683 | { | 495 | { |
684 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 496 | GNUNET_break (0); |
685 | "Request queue empty, not processing queue\n"); | 497 | return GNUNET_SYSERR; |
686 | return; /* no pending message */ | ||
687 | } | 498 | } |
688 | h->cth = | 499 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; |
689 | GNUNET_CLIENT_notify_transmit_ready (h->client, msize, | 500 | if ( (GNUNET_NO == h->inbound_hdr_only) && |
690 | GNUNET_TIME_UNIT_FOREVER_REL, | 501 | (msize != ntohs (em->size)) ) |
691 | GNUNET_NO, | 502 | { |
692 | &transmit_message, h); | 503 | GNUNET_break (0); |
504 | return GNUNET_SYSERR; | ||
505 | } | ||
506 | return GNUNET_OK; | ||
693 | } | 507 | } |
694 | 508 | ||
695 | 509 | ||
696 | /** | 510 | /** |
697 | * Handler for notification messages received from the core. | 511 | * Handle inbound message received from CORE service. If applicable, |
512 | * notify the application. | ||
698 | * | 513 | * |
699 | * @param cls our `struct GNUNET_CORE_Handle` | 514 | * @param cls the `struct GNUNET_CORE_Handle` |
700 | * @param msg the message received from the core service | 515 | * @param ntm the message we got from CORE. |
701 | */ | 516 | */ |
702 | static void | 517 | static void |
703 | main_notify_handler (void *cls, | 518 | handle_notify_inbound (void *cls, |
704 | const struct GNUNET_MessageHeader *msg) | 519 | const struct NotifyTrafficMessage *ntm) |
705 | { | 520 | { |
706 | struct GNUNET_CORE_Handle *h = cls; | 521 | struct GNUNET_CORE_Handle *h = cls; |
707 | const struct InitReplyMessage *m; | ||
708 | const struct ConnectNotifyMessage *cnm; | ||
709 | const struct DisconnectNotifyMessage *dnm; | ||
710 | const struct NotifyTrafficMessage *ntm; | ||
711 | const struct GNUNET_MessageHeader *em; | 522 | const struct GNUNET_MessageHeader *em; |
712 | const struct SendMessageReady *smr; | ||
713 | const struct GNUNET_CORE_MessageHandler *mh; | ||
714 | GNUNET_CORE_StartupCallback init; | ||
715 | struct PeerRecord *pr; | 523 | struct PeerRecord *pr; |
716 | struct GNUNET_CORE_TransmitHandle *th; | ||
717 | unsigned int hpos; | ||
718 | int trigger; | ||
719 | uint16_t msize; | ||
720 | uint16_t et; | 524 | uint16_t et; |
721 | 525 | ||
722 | if (NULL == msg) | 526 | GNUNET_break (GNUNET_NO == h->currently_down); |
723 | { | ||
724 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
725 | _("Client was disconnected from core service, trying to reconnect.\n")); | ||
726 | reconnect_later (h); | ||
727 | return; | ||
728 | } | ||
729 | msize = ntohs (msg->size); | ||
730 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 527 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
731 | "Processing message of type %u and size %u from core service\n", | 528 | "Received inbound message from `%s'.\n", |
732 | ntohs (msg->type), msize); | 529 | GNUNET_i2s (&ntm->peer)); |
733 | switch (ntohs (msg->type)) | 530 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; |
531 | et = ntohs (em->type); | ||
532 | for (unsigned int hpos = 0; NULL != h->handlers[hpos].callback; hpos++) | ||
734 | { | 533 | { |
735 | case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY: | 534 | const struct GNUNET_CORE_MessageHandler *mh; |
736 | if (ntohs (msg->size) != sizeof (struct InitReplyMessage)) | 535 | |
737 | { | 536 | mh = &h->handlers[hpos]; |
738 | GNUNET_break (0); | 537 | if (mh->type != et) |
739 | reconnect_later (h); | 538 | continue; |
740 | return; | 539 | if ( (mh->expected_size != ntohs (em->size)) && |
741 | } | 540 | (0 != mh->expected_size) ) |
742 | m = (const struct InitReplyMessage *) msg; | ||
743 | GNUNET_break (0 == ntohl (m->reserved)); | ||
744 | /* start our message processing loop */ | ||
745 | if (GNUNET_YES == h->currently_down) | ||
746 | { | ||
747 | h->currently_down = GNUNET_NO; | ||
748 | trigger_next_request (h, GNUNET_NO); | ||
749 | } | ||
750 | h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | ||
751 | h->me = m->my_identity; | ||
752 | if (NULL != (init = h->init)) | ||
753 | { | ||
754 | /* mark so we don't call init on reconnect */ | ||
755 | h->init = NULL; | ||
756 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
757 | "Connected to core service of peer `%s'.\n", | ||
758 | GNUNET_i2s (&h->me)); | ||
759 | init (h->cls, &h->me); | ||
760 | } | ||
761 | else | ||
762 | { | ||
763 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
764 | "Successfully reconnected to core service.\n"); | ||
765 | } | ||
766 | /* fake 'connect to self' */ | ||
767 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &h->me); | ||
768 | GNUNET_assert (NULL == pr); | ||
769 | pr = GNUNET_new (struct PeerRecord); | ||
770 | pr->peer = h->me; | ||
771 | pr->ch = h; | ||
772 | GNUNET_assert (GNUNET_YES == | ||
773 | GNUNET_CONTAINER_multipeermap_put (h->peers, | ||
774 | &h->me, pr, | ||
775 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
776 | if (NULL != h->connects) | ||
777 | h->connects (h->cls, &pr->peer); | ||
778 | break; | ||
779 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT: | ||
780 | if (msize < sizeof (struct ConnectNotifyMessage)) | ||
781 | { | ||
782 | GNUNET_break (0); | ||
783 | reconnect_later (h); | ||
784 | return; | ||
785 | } | ||
786 | cnm = (const struct ConnectNotifyMessage *) msg; | ||
787 | if (msize != | ||
788 | sizeof (struct ConnectNotifyMessage)) | ||
789 | { | ||
790 | GNUNET_break (0); | ||
791 | reconnect_later (h); | ||
792 | return; | ||
793 | } | ||
794 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
795 | "Received notification about connection from `%s'.\n", | ||
796 | GNUNET_i2s (&cnm->peer)); | ||
797 | if (0 == memcmp (&h->me, | ||
798 | &cnm->peer, | ||
799 | sizeof (struct GNUNET_PeerIdentity))) | ||
800 | { | ||
801 | /* connect to self!? */ | ||
802 | GNUNET_break (0); | ||
803 | return; | ||
804 | } | ||
805 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &cnm->peer); | ||
806 | if (NULL != pr) | ||
807 | { | ||
808 | GNUNET_break (0); | ||
809 | reconnect_later (h); | ||
810 | return; | ||
811 | } | ||
812 | pr = GNUNET_new (struct PeerRecord); | ||
813 | pr->peer = cnm->peer; | ||
814 | pr->ch = h; | ||
815 | GNUNET_assert (GNUNET_YES == | ||
816 | GNUNET_CONTAINER_multipeermap_put (h->peers, | ||
817 | &cnm->peer, pr, | ||
818 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
819 | if (NULL != h->connects) | ||
820 | h->connects (h->cls, &pr->peer); | ||
821 | break; | ||
822 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT: | ||
823 | if (msize != sizeof (struct DisconnectNotifyMessage)) | ||
824 | { | ||
825 | GNUNET_break (0); | ||
826 | reconnect_later (h); | ||
827 | return; | ||
828 | } | ||
829 | dnm = (const struct DisconnectNotifyMessage *) msg; | ||
830 | if (0 == memcmp (&h->me, | ||
831 | &dnm->peer, | ||
832 | sizeof (struct GNUNET_PeerIdentity))) | ||
833 | { | ||
834 | /* connection to self!? */ | ||
835 | GNUNET_break (0); | ||
836 | return; | ||
837 | } | ||
838 | GNUNET_break (0 == ntohl (dnm->reserved)); | ||
839 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
840 | "Received notification about disconnect from `%s'.\n", | ||
841 | GNUNET_i2s (&dnm->peer)); | ||
842 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &dnm->peer); | ||
843 | if (NULL == pr) | ||
844 | { | ||
845 | GNUNET_break (0); | ||
846 | reconnect_later (h); | ||
847 | return; | ||
848 | } | ||
849 | trigger = ((pr->prev != NULL) || (pr->next != NULL) || | ||
850 | (h->ready_peer_head == pr)); | ||
851 | disconnect_and_free_peer_entry (h, &dnm->peer, pr); | ||
852 | if (trigger) | ||
853 | trigger_next_request (h, GNUNET_NO); | ||
854 | break; | ||
855 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND: | ||
856 | if (msize < sizeof (struct NotifyTrafficMessage)) | ||
857 | { | ||
858 | GNUNET_break (0); | ||
859 | reconnect_later (h); | ||
860 | return; | ||
861 | } | ||
862 | ntm = (const struct NotifyTrafficMessage *) msg; | ||
863 | if ((msize < | ||
864 | sizeof (struct NotifyTrafficMessage) + | ||
865 | sizeof (struct GNUNET_MessageHeader)) ) | ||
866 | { | ||
867 | GNUNET_break (0); | ||
868 | reconnect_later (h); | ||
869 | return; | ||
870 | } | ||
871 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; | ||
872 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
873 | "Received message of type %u and size %u from peer `%s'\n", | ||
874 | ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer)); | ||
875 | if ((GNUNET_NO == h->inbound_hdr_only) && | ||
876 | (msize != | ||
877 | ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) | ||
878 | { | ||
879 | GNUNET_break (0); | ||
880 | reconnect_later (h); | ||
881 | return; | ||
882 | } | ||
883 | et = ntohs (em->type); | ||
884 | for (hpos = 0; hpos < h->hcnt; hpos++) | ||
885 | { | ||
886 | mh = &h->handlers[hpos]; | ||
887 | if (mh->type != et) | ||
888 | continue; | ||
889 | if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0)) | ||
890 | { | ||
891 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
892 | "Unexpected message size %u for message of type %u from peer `%s'\n", | ||
893 | htons (em->size), mh->type, GNUNET_i2s (&ntm->peer)); | ||
894 | GNUNET_break_op (0); | ||
895 | continue; | ||
896 | } | ||
897 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &ntm->peer); | ||
898 | if (NULL == pr) | ||
899 | { | ||
900 | GNUNET_break (0); | ||
901 | reconnect_later (h); | ||
902 | return; | ||
903 | } | ||
904 | if (GNUNET_OK != | ||
905 | h->handlers[hpos].callback (h->cls, &ntm->peer, em)) | ||
906 | { | ||
907 | /* error in processing, do not process other messages! */ | ||
908 | break; | ||
909 | } | ||
910 | } | ||
911 | if (NULL != h->inbound_notify) | ||
912 | h->inbound_notify (h->cls, &ntm->peer, em); | ||
913 | break; | ||
914 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND: | ||
915 | if (msize < sizeof (struct NotifyTrafficMessage)) | ||
916 | { | ||
917 | GNUNET_break (0); | ||
918 | reconnect_later (h); | ||
919 | return; | ||
920 | } | ||
921 | ntm = (const struct NotifyTrafficMessage *) msg; | ||
922 | if ((msize < | ||
923 | sizeof (struct NotifyTrafficMessage) + | ||
924 | sizeof (struct GNUNET_MessageHeader)) ) | ||
925 | { | ||
926 | GNUNET_break (0); | ||
927 | reconnect_later (h); | ||
928 | return; | ||
929 | } | ||
930 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; | ||
931 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
932 | "Received notification about transmission to `%s'.\n", | ||
933 | GNUNET_i2s (&ntm->peer)); | ||
934 | if ((GNUNET_NO == h->outbound_hdr_only) && | ||
935 | (msize != | ||
936 | ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) | ||
937 | { | ||
938 | GNUNET_break (0); | ||
939 | reconnect_later (h); | ||
940 | return; | ||
941 | } | ||
942 | if (NULL == h->outbound_notify) | ||
943 | { | ||
944 | GNUNET_break (0); | ||
945 | break; | ||
946 | } | ||
947 | h->outbound_notify (h->cls, &ntm->peer, em); | ||
948 | break; | ||
949 | case GNUNET_MESSAGE_TYPE_CORE_SEND_READY: | ||
950 | if (msize != sizeof (struct SendMessageReady)) | ||
951 | { | 541 | { |
952 | GNUNET_break (0); | 542 | LOG (GNUNET_ERROR_TYPE_ERROR, |
953 | reconnect_later (h); | 543 | "Unexpected message size %u for message of type %u from peer `%s'\n", |
954 | return; | 544 | htons (em->size), |
545 | mh->type, | ||
546 | GNUNET_i2s (&ntm->peer)); | ||
547 | GNUNET_break_op (0); | ||
548 | continue; | ||
955 | } | 549 | } |
956 | smr = (const struct SendMessageReady *) msg; | ||
957 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, | 550 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, |
958 | &smr->peer); | 551 | &ntm->peer); |
959 | if (NULL == pr) | 552 | if (NULL == pr) |
960 | { | 553 | { |
961 | GNUNET_break (0); | 554 | GNUNET_break (0); |
962 | reconnect_later (h); | 555 | reconnect_later (h); |
963 | return; | 556 | return; |
964 | } | 557 | } |
965 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 558 | if (GNUNET_OK != |
966 | "Received notification about transmission readiness to `%s'.\n", | 559 | h->handlers[hpos].callback (h->cls, |
967 | GNUNET_i2s (&smr->peer)); | 560 | &ntm->peer, |
968 | if (NULL == pr->th.peer) | 561 | em)) |
969 | { | 562 | { |
970 | /* request must have been cancelled between the original request | 563 | /* error in processing, do not process other messages! */ |
971 | * and the response from core, ignore core's readiness */ | ||
972 | break; | 564 | break; |
973 | } | 565 | } |
566 | } | ||
567 | if (NULL != h->inbound_notify) | ||
568 | h->inbound_notify (h->cls, | ||
569 | &ntm->peer, | ||
570 | em); | ||
571 | } | ||
974 | 572 | ||
975 | th = &pr->th; | 573 | |
976 | if (ntohs (smr->smr_id) != th->smr_id) | 574 | /** |
977 | { | 575 | * Check that message received from CORE service is well-formed. |
978 | /* READY message is for expired or cancelled message, | 576 | * |
979 | * ignore! (we should have already sent another request) */ | 577 | * @param cls the `struct GNUNET_CORE_Handle` |
980 | break; | 578 | * @param ntm the message we got |
981 | } | 579 | * @return #GNUNET_OK if the message is well-formed |
982 | if ( (NULL != pr->prev) || | 580 | */ |
983 | (NULL != pr->next) || | 581 | static int |
984 | (h->ready_peer_head == pr) ) | 582 | check_notify_outbound (void *cls, |
985 | { | 583 | const struct NotifyTrafficMessage *ntm) |
986 | /* we should not already be on the ready list... */ | 584 | { |
987 | GNUNET_break (0); | 585 | struct GNUNET_CORE_Handle *h = cls; |
988 | reconnect_later (h); | 586 | uint16_t msize; |
989 | return; | 587 | const struct GNUNET_MessageHeader *em; |
990 | } | 588 | |
991 | GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, | 589 | GNUNET_break (GNUNET_NO == h->currently_down); |
992 | h->ready_peer_tail, | 590 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
993 | pr); | 591 | "Received outbound message from `%s'.\n", |
994 | trigger_next_request (h, GNUNET_NO); | 592 | GNUNET_i2s (&ntm->peer)); |
995 | break; | 593 | msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage); |
996 | default: | 594 | if (msize < sizeof (struct GNUNET_MessageHeader)) |
997 | reconnect_later (h); | 595 | { |
596 | GNUNET_break (0); | ||
597 | return GNUNET_SYSERR; | ||
598 | } | ||
599 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; | ||
600 | if ( (GNUNET_NO == h->outbound_hdr_only) && | ||
601 | (msize != ntohs (em->size)) ) | ||
602 | { | ||
603 | GNUNET_break (0); | ||
604 | return GNUNET_SYSERR; | ||
605 | } | ||
606 | return GNUNET_OK; | ||
607 | } | ||
608 | |||
609 | |||
610 | /** | ||
611 | * Handle outbound message received from CORE service. If applicable, | ||
612 | * notify the application. | ||
613 | * | ||
614 | * @param cls the `struct GNUNET_CORE_Handle` | ||
615 | * @param ntm the message we got | ||
616 | */ | ||
617 | static void | ||
618 | handle_notify_outbound (void *cls, | ||
619 | const struct NotifyTrafficMessage *ntm) | ||
620 | { | ||
621 | struct GNUNET_CORE_Handle *h = cls; | ||
622 | const struct GNUNET_MessageHeader *em; | ||
623 | |||
624 | GNUNET_break (GNUNET_NO == h->currently_down); | ||
625 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; | ||
626 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
627 | "Received notification about transmission to `%s'.\n", | ||
628 | GNUNET_i2s (&ntm->peer)); | ||
629 | if (NULL == h->outbound_notify) | ||
630 | { | ||
631 | GNUNET_break (0); | ||
998 | return; | 632 | return; |
999 | } | 633 | } |
1000 | GNUNET_CLIENT_receive (h->client, | 634 | h->outbound_notify (h->cls, |
1001 | &main_notify_handler, h, | 635 | &ntm->peer, |
1002 | GNUNET_TIME_UNIT_FOREVER_REL); | 636 | em); |
1003 | } | 637 | } |
1004 | 638 | ||
1005 | 639 | ||
1006 | /** | 640 | /** |
1007 | * Task executed once we are done transmitting the INIT message. | 641 | * Handle message received from CORE service notifying us that we are |
1008 | * Starts our 'receive' loop. | 642 | * now allowed to send a message to a peer. If that message is still |
643 | * pending, put it into the queue to be transmitted. | ||
1009 | * | 644 | * |
1010 | * @param cls the 'struct GNUNET_CORE_Handle' | 645 | * @param cls the `struct GNUNET_CORE_Handle` |
1011 | * @param success were we successful | 646 | * @param ntm the message we got |
1012 | */ | 647 | */ |
1013 | static void | 648 | static void |
1014 | init_done_task (void *cls, int success) | 649 | handle_send_ready (void *cls, |
650 | const struct SendMessageReady *smr) | ||
1015 | { | 651 | { |
1016 | struct GNUNET_CORE_Handle *h = cls; | 652 | struct GNUNET_CORE_Handle *h = cls; |
653 | struct PeerRecord *pr; | ||
654 | struct GNUNET_CORE_TransmitHandle *th; | ||
655 | struct SendMessage *sm; | ||
656 | struct GNUNET_MQ_Envelope *env; | ||
657 | struct GNUNET_TIME_Relative delay; | ||
658 | struct GNUNET_TIME_Relative overdue; | ||
659 | unsigned int ret; | ||
1017 | 660 | ||
1018 | if (GNUNET_SYSERR == success) | 661 | GNUNET_break (GNUNET_NO == h->currently_down); |
1019 | return; /* shutdown */ | 662 | pr = GNUNET_CONTAINER_multipeermap_get (h->peers, |
1020 | if (GNUNET_NO == success) | 663 | &smr->peer); |
664 | if (NULL == pr) | ||
1021 | { | 665 | { |
1022 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 666 | GNUNET_break (0); |
1023 | "Failed to exchange INIT with core, retrying\n"); | 667 | reconnect_later (h); |
1024 | if (h->reconnect_task == NULL) | 668 | return; |
1025 | reconnect_later (h); | 669 | } |
670 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
671 | "Received notification about transmission readiness to `%s'.\n", | ||
672 | GNUNET_i2s (&smr->peer)); | ||
673 | if (NULL == pr->th.peer) | ||
674 | { | ||
675 | /* request must have been cancelled between the original request | ||
676 | * and the response from CORE, ignore CORE's readiness */ | ||
1026 | return; | 677 | return; |
1027 | } | 678 | } |
1028 | GNUNET_CLIENT_receive (h->client, | 679 | th = &pr->th; |
1029 | &main_notify_handler, h, | 680 | if (ntohs (smr->smr_id) != th->smr_id) |
1030 | GNUNET_TIME_UNIT_FOREVER_REL); | 681 | { |
682 | /* READY message is for expired or cancelled message, | ||
683 | * ignore! (we should have already sent another request) */ | ||
684 | return; | ||
685 | } | ||
686 | /* ok, all good, send message out! */ | ||
687 | th->peer = NULL; | ||
688 | env = GNUNET_MQ_msg_extra (sm, | ||
689 | th->msize, | ||
690 | GNUNET_MESSAGE_TYPE_CORE_SEND); | ||
691 | sm->priority = htonl ((uint32_t) th->priority); | ||
692 | sm->deadline = GNUNET_TIME_absolute_hton (th->deadline); | ||
693 | sm->peer = pr->peer; | ||
694 | sm->cork = htonl ((uint32_t) th->cork); | ||
695 | sm->reserved = htonl (0); | ||
696 | ret = th->get_message (th->get_message_cls, | ||
697 | th->msize, | ||
698 | &sm[1]); | ||
699 | GNUNET_assert (ret == th->msize); /* NOTE: API change! */ | ||
700 | delay = GNUNET_TIME_absolute_get_duration (th->request_time); | ||
701 | overdue = GNUNET_TIME_absolute_get_duration (th->deadline); | ||
702 | if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) | ||
703 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
704 | "Transmitting overdue %u bytes to `%s' at priority %u with %s delay %s\n", | ||
705 | ret, | ||
706 | GNUNET_i2s (&pr->peer), | ||
707 | (unsigned int) th->priority, | ||
708 | GNUNET_STRINGS_relative_time_to_string (delay, | ||
709 | GNUNET_YES), | ||
710 | (th->cork) ? " (corked)" : ""); | ||
711 | else | ||
712 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
713 | "Transmitting %u bytes to `%s' at priority %u with %s delay %s\n", | ||
714 | ret, | ||
715 | GNUNET_i2s (&pr->peer), | ||
716 | (unsigned int) th->priority, | ||
717 | GNUNET_STRINGS_relative_time_to_string (delay, | ||
718 | GNUNET_YES), | ||
719 | (th->cork) ? " (corked)" : ""); | ||
720 | GNUNET_MQ_send (h->mq, | ||
721 | env); | ||
1031 | } | 722 | } |
1032 | 723 | ||
1033 | 724 | ||
1034 | /** | 725 | /** |
1035 | * Our current client connection went down. Clean it up | 726 | * Our current client connection went down. Clean it up and try to |
1036 | * and try to reconnect! | 727 | * reconnect! |
1037 | * | 728 | * |
1038 | * @param h our handle to the core service | 729 | * @param h our handle to the core service |
1039 | */ | 730 | */ |
1040 | static void | 731 | static void |
1041 | reconnect (struct GNUNET_CORE_Handle *h) | 732 | reconnect (struct GNUNET_CORE_Handle *h) |
1042 | { | 733 | { |
1043 | struct ControlMessage *cm; | 734 | GNUNET_MQ_hd_fixed_size (init_reply, |
735 | GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY, | ||
736 | struct InitReplyMessage); | ||
737 | GNUNET_MQ_hd_fixed_size (connect_notify, | ||
738 | GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT, | ||
739 | struct ConnectNotifyMessage); | ||
740 | GNUNET_MQ_hd_fixed_size (disconnect_notify, | ||
741 | GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT, | ||
742 | struct DisconnectNotifyMessage); | ||
743 | GNUNET_MQ_hd_var_size (notify_inbound, | ||
744 | GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND, | ||
745 | struct NotifyTrafficMessage); | ||
746 | GNUNET_MQ_hd_var_size (notify_outbound, | ||
747 | GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND, | ||
748 | struct NotifyTrafficMessage); | ||
749 | GNUNET_MQ_hd_fixed_size (send_ready, | ||
750 | GNUNET_MESSAGE_TYPE_CORE_SEND_READY, | ||
751 | struct SendMessageReady); | ||
752 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
753 | make_init_reply_handler (h), | ||
754 | make_connect_notify_handler (h), | ||
755 | make_disconnect_notify_handler (h), | ||
756 | make_notify_inbound_handler (h), | ||
757 | make_notify_outbound_handler (h), | ||
758 | make_send_ready_handler (h), | ||
759 | GNUNET_MQ_handler_end () | ||
760 | }; | ||
1044 | struct InitMessage *init; | 761 | struct InitMessage *init; |
762 | struct GNUNET_MQ_Envelope *env; | ||
1045 | uint32_t opt; | 763 | uint32_t opt; |
1046 | uint16_t msize; | ||
1047 | uint16_t *ts; | 764 | uint16_t *ts; |
1048 | unsigned int hpos; | ||
1049 | 765 | ||
1050 | GNUNET_assert (NULL == h->client); | 766 | GNUNET_assert (NULL == h->mq); |
1051 | GNUNET_assert (GNUNET_YES == h->currently_down); | 767 | GNUNET_assert (GNUNET_YES == h->currently_down); |
1052 | GNUNET_assert (NULL != h->cfg); | 768 | h->mq = GNUNET_CLIENT_connecT (h->cfg, |
1053 | h->client = GNUNET_CLIENT_connect ("core", h->cfg); | 769 | "core", |
1054 | if (NULL == h->client) | 770 | handlers, |
771 | &handle_mq_error, | ||
772 | h); | ||
773 | if (NULL == h->mq) | ||
1055 | { | 774 | { |
1056 | reconnect_later (h); | 775 | reconnect_later (h); |
1057 | return; | 776 | return; |
1058 | } | 777 | } |
1059 | msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage); | 778 | env = GNUNET_MQ_msg_extra (init, |
1060 | cm = GNUNET_malloc (sizeof (struct ControlMessage) + msize); | 779 | sizeof (uint16_t) * h->hcnt, |
1061 | cm->cont = &init_done_task; | 780 | GNUNET_MESSAGE_TYPE_CORE_INIT); |
1062 | cm->cont_cls = h; | ||
1063 | init = (struct InitMessage *) &cm[1]; | ||
1064 | init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT); | ||
1065 | init->header.size = htons (msize); | ||
1066 | opt = 0; | 781 | opt = 0; |
1067 | if (NULL != h->inbound_notify) | 782 | if (NULL != h->inbound_notify) |
1068 | { | 783 | { |
@@ -1081,22 +796,18 @@ reconnect (struct GNUNET_CORE_Handle *h) | |||
1081 | LOG (GNUNET_ERROR_TYPE_INFO, | 796 | LOG (GNUNET_ERROR_TYPE_INFO, |
1082 | "(Re)connecting to CORE service, monitoring messages of type %u\n", | 797 | "(Re)connecting to CORE service, monitoring messages of type %u\n", |
1083 | opt); | 798 | opt); |
1084 | |||
1085 | init->options = htonl (opt); | 799 | init->options = htonl (opt); |
1086 | ts = (uint16_t *) & init[1]; | 800 | ts = (uint16_t *) &init[1]; |
1087 | for (hpos = 0; hpos < h->hcnt; hpos++) | 801 | for (unsigned int hpos = 0; hpos < h->hcnt; hpos++) |
1088 | ts[hpos] = htons (h->handlers[hpos].type); | 802 | ts[hpos] = htons (h->handlers[hpos].type); |
1089 | GNUNET_CONTAINER_DLL_insert (h->control_pending_head, | 803 | GNUNET_MQ_send (h->mq, |
1090 | h->control_pending_tail, | 804 | env); |
1091 | cm); | ||
1092 | trigger_next_request (h, GNUNET_YES); | ||
1093 | } | 805 | } |
1094 | 806 | ||
1095 | 807 | ||
1096 | |||
1097 | /** | 808 | /** |
1098 | * Connect to the core service. Note that the connection may | 809 | * Connect to the core service. Note that the connection may complete |
1099 | * complete (or fail) asynchronously. | 810 | * (or fail) asynchronously. |
1100 | * | 811 | * |
1101 | * @param cfg configuration to use | 812 | * @param cfg configuration to use |
1102 | * @param cls closure for the various callbacks that follow (including handlers in the handlers array) | 813 | * @param cls closure for the various callbacks that follow (including handlers in the handlers array) |
@@ -1129,8 +840,8 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1129 | const struct GNUNET_CORE_MessageHandler *handlers) | 840 | const struct GNUNET_CORE_MessageHandler *handlers) |
1130 | { | 841 | { |
1131 | struct GNUNET_CORE_Handle *h; | 842 | struct GNUNET_CORE_Handle *h; |
843 | unsigned int hcnt; | ||
1132 | 844 | ||
1133 | GNUNET_assert (NULL != cfg); | ||
1134 | h = GNUNET_new (struct GNUNET_CORE_Handle); | 845 | h = GNUNET_new (struct GNUNET_CORE_Handle); |
1135 | h->cfg = cfg; | 846 | h->cfg = cfg; |
1136 | h->cls = cls; | 847 | h->cls = cls; |
@@ -1141,14 +852,20 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1141 | h->outbound_notify = outbound_notify; | 852 | h->outbound_notify = outbound_notify; |
1142 | h->inbound_hdr_only = inbound_hdr_only; | 853 | h->inbound_hdr_only = inbound_hdr_only; |
1143 | h->outbound_hdr_only = outbound_hdr_only; | 854 | h->outbound_hdr_only = outbound_hdr_only; |
1144 | h->handlers = handlers; | ||
1145 | h->hcnt = 0; | ||
1146 | h->currently_down = GNUNET_YES; | 855 | h->currently_down = GNUNET_YES; |
1147 | h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); | 856 | h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); |
857 | hcnt = 0; | ||
858 | if (NULL != handlers) | ||
859 | while (NULL != handlers[hcnt].callback) | ||
860 | hcnt++; | ||
861 | h->handlers = GNUNET_new_array (hcnt + 1, | ||
862 | struct GNUNET_CORE_MessageHandler); | ||
1148 | if (NULL != handlers) | 863 | if (NULL != handlers) |
1149 | while (NULL != handlers[h->hcnt].callback) | 864 | memcpy (h->handlers, |
1150 | h->hcnt++; | 865 | handlers, |
1151 | GNUNET_assert (h->hcnt < | 866 | hcnt * sizeof (struct GNUNET_CORE_MessageHandler)); |
867 | h->hcnt = hcnt; | ||
868 | GNUNET_assert (hcnt < | ||
1152 | (GNUNET_SERVER_MAX_MESSAGE_SIZE - | 869 | (GNUNET_SERVER_MAX_MESSAGE_SIZE - |
1153 | sizeof (struct InitMessage)) / sizeof (uint16_t)); | 870 | sizeof (struct InitMessage)) / sizeof (uint16_t)); |
1154 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 871 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1168,63 +885,29 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1168 | void | 885 | void |
1169 | GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) | 886 | GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) |
1170 | { | 887 | { |
1171 | struct ControlMessage *cm; | ||
1172 | |||
1173 | GNUNET_assert (NULL != handle); | ||
1174 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 888 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1175 | "Disconnecting from CORE service\n"); | 889 | "Disconnecting from CORE service\n"); |
1176 | if (NULL != handle->cth) | ||
1177 | { | ||
1178 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); | ||
1179 | handle->cth = NULL; | ||
1180 | } | ||
1181 | while (NULL != (cm = handle->control_pending_head)) | ||
1182 | { | ||
1183 | GNUNET_CONTAINER_DLL_remove (handle->control_pending_head, | ||
1184 | handle->control_pending_tail, | ||
1185 | cm); | ||
1186 | if (NULL != cm->th) | ||
1187 | cm->th->cm = NULL; | ||
1188 | if (NULL != cm->cont) | ||
1189 | cm->cont (cm->cont_cls, GNUNET_SYSERR); | ||
1190 | GNUNET_free (cm); | ||
1191 | } | ||
1192 | if (NULL != handle->client) | ||
1193 | { | ||
1194 | GNUNET_CLIENT_disconnect (handle->client); | ||
1195 | handle->client = NULL; | ||
1196 | } | ||
1197 | GNUNET_CONTAINER_multipeermap_iterate (handle->peers, | 890 | GNUNET_CONTAINER_multipeermap_iterate (handle->peers, |
1198 | &disconnect_and_free_peer_entry, | 891 | &disconnect_and_free_peer_entry, |
1199 | handle); | 892 | handle); |
893 | GNUNET_CONTAINER_multipeermap_destroy (handle->peers); | ||
894 | handle->peers = NULL; | ||
1200 | if (NULL != handle->reconnect_task) | 895 | if (NULL != handle->reconnect_task) |
1201 | { | 896 | { |
1202 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); | 897 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); |
1203 | handle->reconnect_task = NULL; | 898 | handle->reconnect_task = NULL; |
1204 | } | 899 | } |
1205 | GNUNET_CONTAINER_multipeermap_destroy (handle->peers); | 900 | if (NULL != handle->mq) |
1206 | handle->peers = NULL; | 901 | { |
1207 | GNUNET_break (NULL == handle->ready_peer_head); | 902 | GNUNET_MQ_destroy (handle->mq); |
903 | handle->mq = NULL; | ||
904 | } | ||
905 | GNUNET_free (handle->handlers); | ||
1208 | GNUNET_free (handle); | 906 | GNUNET_free (handle); |
1209 | } | 907 | } |
1210 | 908 | ||
1211 | 909 | ||
1212 | /** | 910 | /** |
1213 | * Task that calls #request_next_transmission(). | ||
1214 | * | ||
1215 | * @param cls the `struct PeerRecord *` | ||
1216 | */ | ||
1217 | static void | ||
1218 | run_request_next_transmission (void *cls) | ||
1219 | { | ||
1220 | struct PeerRecord *pr = cls; | ||
1221 | |||
1222 | pr->ntr_task = NULL; | ||
1223 | request_next_transmission (pr); | ||
1224 | } | ||
1225 | |||
1226 | |||
1227 | /** | ||
1228 | * Ask the core to call @a notify once it is ready to transmit the | 911 | * Ask the core to call @a notify once it is ready to transmit the |
1229 | * given number of bytes to the specified @a target. Must only be | 912 | * given number of bytes to the specified @a target. Must only be |
1230 | * called after a connection to the respective peer has been | 913 | * called after a connection to the respective peer has been |
@@ -1261,13 +944,16 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, | |||
1261 | { | 944 | { |
1262 | struct PeerRecord *pr; | 945 | struct PeerRecord *pr; |
1263 | struct GNUNET_CORE_TransmitHandle *th; | 946 | struct GNUNET_CORE_TransmitHandle *th; |
947 | struct SendMessageRequest *smr; | ||
948 | struct GNUNET_MQ_Envelope *env; | ||
1264 | 949 | ||
1265 | if (notify_size > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) | 950 | GNUNET_assert (NULL != notify); |
951 | if ( (notify_size > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) || | ||
952 | (notify_size + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ) | ||
1266 | { | 953 | { |
1267 | GNUNET_break (0); | 954 | GNUNET_break (0); |
1268 | return NULL; | 955 | return NULL; |
1269 | } | 956 | } |
1270 | GNUNET_assert (NULL != notify); | ||
1271 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 957 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1272 | "Asking core for transmission of %u bytes to `%s'\n", | 958 | "Asking core for transmission of %u bytes to `%s'\n", |
1273 | (unsigned int) notify_size, | 959 | (unsigned int) notify_size, |
@@ -1286,10 +972,10 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, | |||
1286 | GNUNET_break (0); | 972 | GNUNET_break (0); |
1287 | return NULL; | 973 | return NULL; |
1288 | } | 974 | } |
1289 | GNUNET_assert (notify_size + sizeof (struct SendMessage) < | ||
1290 | GNUNET_SERVER_MAX_MESSAGE_SIZE); | ||
1291 | th = &pr->th; | 975 | th = &pr->th; |
1292 | memset (th, 0, sizeof (struct GNUNET_CORE_TransmitHandle)); | 976 | memset (th, |
977 | 0, | ||
978 | sizeof (struct GNUNET_CORE_TransmitHandle)); | ||
1293 | th->peer = pr; | 979 | th->peer = pr; |
1294 | th->get_message = notify; | 980 | th->get_message = notify; |
1295 | th->get_message_cls = notify_cls; | 981 | th->get_message_cls = notify_cls; |
@@ -1301,9 +987,16 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, | |||
1301 | th->priority = priority; | 987 | th->priority = priority; |
1302 | th->msize = notify_size; | 988 | th->msize = notify_size; |
1303 | th->cork = cork; | 989 | th->cork = cork; |
1304 | GNUNET_assert (NULL == pr->ntr_task); | 990 | env = GNUNET_MQ_msg (smr, |
1305 | pr->ntr_task = | 991 | GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); |
1306 | GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr); | 992 | smr->priority = htonl ((uint32_t) th->priority); |
993 | smr->deadline = GNUNET_TIME_absolute_hton (th->deadline); | ||
994 | smr->peer = pr->peer; | ||
995 | smr->reserved = htonl (0); | ||
996 | smr->size = htons (th->msize); | ||
997 | smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); | ||
998 | GNUNET_MQ_send (handle->mq, | ||
999 | env); | ||
1307 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1000 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1308 | "Transmission request added to queue\n"); | 1001 | "Transmission request added to queue\n"); |
1309 | return th; | 1002 | return th; |
@@ -1319,41 +1012,12 @@ void | |||
1319 | GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) | 1012 | GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) |
1320 | { | 1013 | { |
1321 | struct PeerRecord *pr = th->peer; | 1014 | struct PeerRecord *pr = th->peer; |
1322 | struct GNUNET_CORE_Handle *h; | ||
1323 | 1015 | ||
1324 | GNUNET_assert (NULL != th); | ||
1325 | GNUNET_assert (NULL != pr); | ||
1326 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1016 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1327 | "Aborting transmission request to core for %u bytes to `%s'\n", | 1017 | "Aborting transmission request to core for %u bytes to `%s'\n", |
1328 | (unsigned int) th->msize, | 1018 | (unsigned int) th->msize, |
1329 | GNUNET_i2s (&pr->peer)); | 1019 | GNUNET_i2s (&pr->peer)); |
1330 | th->peer = NULL; | 1020 | th->peer = NULL; |
1331 | h = pr->ch; | ||
1332 | if (NULL != th->cm) | ||
1333 | { | ||
1334 | /* we're currently in the control queue, remove */ | ||
1335 | GNUNET_CONTAINER_DLL_remove (h->control_pending_head, | ||
1336 | h->control_pending_tail, | ||
1337 | th->cm); | ||
1338 | GNUNET_free (th->cm); | ||
1339 | th->cm = NULL; | ||
1340 | } | ||
1341 | if ( (NULL != pr->prev) || | ||
1342 | (NULL != pr->next) || | ||
1343 | (pr == h->ready_peer_head) ) | ||
1344 | { | ||
1345 | /* the request that was 'approved' by core was | ||
1346 | * canceled before it could be transmitted; remove | ||
1347 | * us from the 'ready' list */ | ||
1348 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, | ||
1349 | h->ready_peer_tail, | ||
1350 | pr); | ||
1351 | } | ||
1352 | if (NULL != pr->ntr_task) | ||
1353 | { | ||
1354 | GNUNET_SCHEDULER_cancel (pr->ntr_task); | ||
1355 | pr->ntr_task = NULL; | ||
1356 | } | ||
1357 | } | 1021 | } |
1358 | 1022 | ||
1359 | 1023 | ||
@@ -1376,9 +1040,8 @@ int | |||
1376 | GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, | 1040 | GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, |
1377 | const struct GNUNET_PeerIdentity *pid) | 1041 | const struct GNUNET_PeerIdentity *pid) |
1378 | { | 1042 | { |
1379 | GNUNET_assert (NULL != h); | 1043 | return GNUNET_CONTAINER_multipeermap_contains (h->peers, |
1380 | GNUNET_assert (NULL != pid); | 1044 | pid); |
1381 | return GNUNET_CONTAINER_multipeermap_contains (h->peers, pid); | ||
1382 | } | 1045 | } |
1383 | 1046 | ||
1384 | 1047 | ||