aboutsummaryrefslogtreecommitdiff
path: root/src/core/core_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/core_api.c')
-rw-r--r--src/core/core_api.c1537
1 files changed, 1134 insertions, 403 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c
index a1e6aea65..9500a1316 100644
--- a/src/core/core_api.c
+++ b/src/core/core_api.c
@@ -23,6 +23,9 @@
23 * @brief core service; this is the main API for encrypted P2P 23 * @brief core service; this is the main API for encrypted P2P
24 * communications 24 * communications
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 *
27 * TODO:
28 * - implement atsi parsing and passing
26 */ 29 */
27#include "platform.h" 30#include "platform.h"
28#include "gnunet_constants.h" 31#include "gnunet_constants.h"
@@ -31,12 +34,124 @@
31 34
32 35
33/** 36/**
37 * Information we track for each peer.
38 */
39struct PeerRecord
40{
41
42 /**
43 * We generally do NOT keep peer records in a DLL; this
44 * DLL is only used IF this peer's 'pending_head' message
45 * is ready for transmission.
46 */
47 struct PeerRecord *prev;
48
49 /**
50 * We generally do NOT keep peer records in a DLL; this
51 * DLL is only used IF this peer's 'pending_head' message
52 * is ready for transmission.
53 */
54 struct PeerRecord *next;
55
56 /**
57 * Peer the record is about.
58 */
59 struct GNUNET_PeerIdentity peer;
60
61 /**
62 * Corresponding core handle.
63 */
64 struct GNUNET_CORE_Handle *ch;
65
66 /**
67 * Head of doubly-linked list of pending requests.
68 * Requests are sorted by deadline *except* for HEAD,
69 * which is only modified upon transmission to core.
70 */
71 struct GNUNET_CORE_TransmitHandle *pending_head;
72
73 /**
74 * Tail of doubly-linked list of pending requests.
75 */
76 struct GNUNET_CORE_TransmitHandle *pending_tail;
77
78 /**
79 * Pending callback waiting for peer information, or NULL for none.
80 */
81 GNUNET_CORE_PeerConfigurationInfoCallback pcic;
82
83 /**
84 * Closure for pcic.
85 */
86 void *pcic_cls;
87
88 /**
89 * Request information ID for the given pcic (needed in case a
90 * request is cancelled after being submitted to core and a new
91 * one is generated; in this case, we need to avoid matching the
92 * reply to the first (cancelled) request to the second request).
93 */
94 uint32_t rim_id;
95
96 /**
97 * ID of timeout task for the 'pending_head' handle
98 * which is the one with the smallest timeout.
99 */
100 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
101
102 /**
103 * Current size of the queue of pending requests.
104 */
105 unsigned int queue_size;
106
107 /**
108 * SendMessageRequest ID generator for this peer.
109 */
110 uint16_t smr_id_gen;
111
112};
113
114
115/**
116 * Entry in a doubly-linked list of control messages to be transmitted
117 * to the core service. Control messages include traffic allocation,
118 * connection requests and of course our initial 'init' request.
119 *
120 * The actual message is allocated at the end of this struct.
121 */
122struct ControlMessage
123{
124 /**
125 * This is a doubly-linked list.
126 */
127 struct ControlMessage *next;
128
129 /**
130 * This is a doubly-linked list.
131 */
132 struct ControlMessage *prev;
133
134 /**
135 * Function to run after successful transmission (or call with
136 * reason 'TIMEOUT' on error).
137 */
138 GNUNET_SCHEDULER_Task cont;
139
140 /**
141 * Closure for 'cont'.
142 */
143 void *cont_cls;
144
145};
146
147
148
149/**
34 * Context for the core service connection. 150 * Context for the core service connection.
35 */ 151 */
36struct GNUNET_CORE_Handle 152struct GNUNET_CORE_Handle
37{ 153{
38 154
39
40 /** 155 /**
41 * Configuration we're using. 156 * Configuration we're using.
42 */ 157 */
@@ -83,9 +198,9 @@ struct GNUNET_CORE_Handle
83 const struct GNUNET_CORE_MessageHandler *handlers; 198 const struct GNUNET_CORE_MessageHandler *handlers;
84 199
85 /** 200 /**
86 * Our connection to the service for notifications. 201 * Our connection to the service.
87 */ 202 */
88 struct GNUNET_CLIENT_Connection *client_notifications; 203 struct GNUNET_CLIENT_Connection *client;
89 204
90 /** 205 /**
91 * Handle for our current transmission request. 206 * Handle for our current transmission request.
@@ -95,41 +210,43 @@ struct GNUNET_CORE_Handle
95 /** 210 /**
96 * Head of doubly-linked list of pending requests. 211 * Head of doubly-linked list of pending requests.
97 */ 212 */
98 struct GNUNET_CORE_TransmitHandle *pending_head; 213 struct ControlMessage *pending_head;
99 214
100 /** 215 /**
101 * Tail of doubly-linked list of pending requests. 216 * Tail of doubly-linked list of pending requests.
102 */ 217 */
103 struct GNUNET_CORE_TransmitHandle *pending_tail; 218 struct ControlMessage *pending_tail;
104 219
105 /** 220 /**
106 * Currently submitted request (or NULL) 221 * Head of doubly-linked list of peers that are core-approved
222 * to send their next message.
107 */ 223 */
108 struct GNUNET_CORE_TransmitHandle *submitted; 224 struct PeerRecord *ready_peer_head;
109 225
110 /** 226 /**
111 * Currently submitted request based on solicitation (or NULL) 227 * Tail of doubly-linked list of peers that are core-approved
228 * to send their next message.
112 */ 229 */
113 struct GNUNET_CORE_TransmitHandle *solicit_transmit_req; 230 struct PeerRecord *ready_peer_tail;
114 231
115 /** 232 /**
116 * Buffer where we store a message for transmission in response 233 * Hash map listing all of the peers that we are currently
117 * to a traffic solicitation (or NULL). 234 * connected to.
118 */ 235 */
119 char *solicit_buffer; 236 struct GNUNET_CONTAINER_MultiHashMap *peers;
120 237
121 /** 238 /**
122 * How long to wait until we time out the connection attempt? 239 * ID of reconnect task (if any).
123 */ 240 */
124 struct GNUNET_TIME_Absolute startup_timeout; 241 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
125 242
126 /** 243 /**
127 * ID of reconnect task (if any). 244 * Request information ID generator.
128 */ 245 */
129 GNUNET_SCHEDULER_TaskIdentifier reconnect_task; 246 uint32_t rim_id_gen;
130 247
131 /** 248 /**
132 * Number of messages we should queue per target. 249 * Number of messages we are allowed to queue per target.
133 */ 250 */
134 unsigned int queue_size; 251 unsigned int queue_size;
135 252
@@ -155,6 +272,7 @@ struct GNUNET_CORE_Handle
155 * requests? 272 * requests?
156 */ 273 */
157 int currently_down; 274 int currently_down;
275
158}; 276};
159 277
160 278
@@ -175,9 +293,15 @@ struct GNUNET_CORE_TransmitHandle
175 struct GNUNET_CORE_TransmitHandle *prev; 293 struct GNUNET_CORE_TransmitHandle *prev;
176 294
177 /** 295 /**
178 * Corresponding core handle. 296 * Corresponding peer record.
179 */ 297 */
180 struct GNUNET_CORE_Handle *ch; 298 struct PeerRecord *peer;
299
300 /**
301 * Corresponding SEND_REQUEST message. Only non-NULL
302 * while SEND_REQUEST message is pending.
303 */
304 struct ControlMessage *cm;
181 305
182 /** 306 /**
183 * Function that will be called to get the actual request 307 * Function that will be called to get the actual request
@@ -193,32 +317,11 @@ struct GNUNET_CORE_TransmitHandle
193 void *get_message_cls; 317 void *get_message_cls;
194 318
195 /** 319 /**
196 * If this entry is for a transmission request, pointer
197 * to the notify callback; otherwise NULL.
198 */
199 GNUNET_CONNECTION_TransmitReadyNotify notify;
200
201 /**
202 * Closure for notify.
203 */
204 void *notify_cls;
205
206 /**
207 * Peer the request is about.
208 */
209 struct GNUNET_PeerIdentity peer;
210
211 /**
212 * Timeout for this handle. 320 * Timeout for this handle.
213 */ 321 */
214 struct GNUNET_TIME_Absolute timeout; 322 struct GNUNET_TIME_Absolute timeout;
215 323
216 /** 324 /**
217 * ID of timeout task.
218 */
219 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
220
221 /**
222 * How important is this message? 325 * How important is this message?
223 */ 326 */
224 uint32_t priority; 327 uint32_t priority;
@@ -228,54 +331,127 @@ struct GNUNET_CORE_TransmitHandle
228 */ 331 */
229 uint16_t msize; 332 uint16_t msize;
230 333
334 /**
335 * Send message request ID for this request.
336 */
337 uint16_t smr_id;
231 338
232}; 339};
233 340
234 341
342/**
343 * Our current client connection went down. Clean it up
344 * and try to reconnect!
345 *
346 * @param h our handle to the core service
347 */
348static void
349reconnect (struct GNUNET_CORE_Handle *h);
350
351
352/**
353 * Task schedule to try to re-connect to core.
354 *
355 * @param cls the 'struct GNUNET_CORE_Handle'
356 * @param tc task context
357 */
358static void
359reconnect_task (void *cls,
360 const struct GNUNET_SCHEDULER_TaskContext *tc)
361{
362 struct GNUNET_CORE_Handle *h = cls;
363
364 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
365 reconnect (h);
366}
367
368
369/**
370 * Check the list of pending requests, send the next
371 * one to the core.
372 *
373 * @param h core handle
374 */
235static void 375static void
236reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); 376trigger_next_request (struct GNUNET_CORE_Handle *h);
237 377
238 378
239/** 379/**
240 * Function called when we are ready to transmit our 380 * The given request hit its timeout. Remove from the
241 * "START" message (or when this operation timed out). 381 * doubly-linked list and call the respective continuation.
242 * 382 *
243 * @param cls closure 383 * @param cls the transmit handle of the request that timed out
244 * @param size number of bytes available in buf 384 * @param tc context, can be NULL (!)
245 * @param buf where the callee should write the message
246 * @return number of bytes written to buf
247 */ 385 */
248static size_t transmit_start (void *cls, size_t size, void *buf); 386static void
387transmission_timeout (void *cls,
388 const struct GNUNET_SCHEDULER_TaskContext *tc);
249 389
250 390
251/** 391/**
252 * Our current client connection went down. Clean it up 392 * Control message was sent, mark it as such.
253 * and try to reconnect!
254 * 393 *
255 * @param h our handle to the core service 394 * @param cls the 'struct GNUNET_CORE_TransmitHandle*'
395 * @param tc scheduler context
256 */ 396 */
257static void 397static void
258reconnect (struct GNUNET_CORE_Handle *h) 398mark_control_message_sent (void *cls,
399 const struct GNUNET_SCHEDULER_TaskContext *tc)
259{ 400{
260#if DEBUG_CORE 401 struct GNUNET_CORE_TransmitHandle *th = cls;
261 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 402
262 "Reconnecting to CORE service\n"); 403 th->cm = NULL;
263#endif 404}
264 if (h->client_notifications != NULL) 405
265 GNUNET_CLIENT_disconnect (h->client_notifications, GNUNET_NO); 406
266 h->currently_down = GNUNET_YES; 407/**
267 h->client_notifications = GNUNET_CLIENT_connect ("core", h->cfg); 408 * Send a control message to the peer asking for transmission
268 if (h->client_notifications == NULL) 409 * of the message in the given peer record.
269 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, 410 *
270 &reconnect_task, 411 * @param pr peer to request transmission to
271 h); 412 */
272 else 413static void
273 h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client_notifications, 414request_next_transmission (struct PeerRecord *pr)
274 sizeof (struct InitMessage) + 415{
275 sizeof (uint16_t) * h->hcnt, 416 struct GNUNET_CORE_Handle *h = pr->ch;
276 GNUNET_TIME_UNIT_SECONDS, 417 struct ControlMessage *cm;
277 GNUNET_NO, 418 struct SendMessageRequest *smr;
278 &transmit_start, h); 419 struct GNUNET_CORE_TransmitHandle *th;
420
421 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
422 {
423 GNUNET_SCHEDULER_cancel (pr->timeout_task);
424 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
425 }
426 if (NULL == (th = pr->pending_head))
427 {
428 trigger_next_request (h);
429 return;
430 }
431 GNUNET_assert (pr->prev == NULL);
432 GNUNET_assert (pr->next == NULL);
433 pr->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout),
434 &transmission_timeout,
435 pr);
436 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
437 sizeof (struct SendMessageRequest));
438 cm->cont = &mark_control_message_sent;
439 cm->cont_cls = th;
440 th->cm = cm;
441 smr = (struct SendMessageRequest*) &cm[1];
442 smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST);
443 smr->header.size = htons (sizeof (struct SendMessageRequest));
444 smr->priority = htonl (th->priority);
445 smr->deadline = GNUNET_TIME_absolute_hton (th->timeout);
446 smr->peer = pr->peer;
447 smr->queue_size = htonl (pr->queue_size);
448 smr->size = htons (th->msize);
449 smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
450 GNUNET_CONTAINER_DLL_insert_after (h->pending_head,
451 h->pending_tail,
452 h->pending_tail,
453 cm);
454 trigger_next_request (h);
279} 455}
280 456
281 457
@@ -287,97 +463,225 @@ reconnect (struct GNUNET_CORE_Handle *h)
287 * @param tc context, can be NULL (!) 463 * @param tc context, can be NULL (!)
288 */ 464 */
289static void 465static void
290timeout_request (void *cls, 466transmission_timeout (void *cls,
291 const struct GNUNET_SCHEDULER_TaskContext *tc) 467 const struct GNUNET_SCHEDULER_TaskContext *tc)
292{ 468{
293 struct GNUNET_CORE_TransmitHandle *th = cls; 469 struct PeerRecord *pr = cls;
470 struct GNUNET_CORE_TransmitHandle *th;
294 471
295 th->timeout_task = GNUNET_SCHEDULER_NO_TASK; 472 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
296 GNUNET_CONTAINER_DLL_remove (th->ch->pending_head, 473 th = pr->pending_head;
297 th->ch->pending_tail, 474 GNUNET_CONTAINER_DLL_remove (pr->pending_head,
475 pr->pending_tail,
298 th); 476 th);
477 pr->queue_size--;
299#if DEBUG_CORE 478#if DEBUG_CORE
300 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
301 "Signalling timeout of request for transmission to CORE service\n"); 480 "Signalling timeout of request for transmission to CORE service\n");
302#endif 481#endif
303 GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL)); 482 GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
483 request_next_transmission (pr);
304} 484}
305 485
306 486
307/** 487/**
308 * Function called when we are ready to transmit a request from our 488 * Transmit the next message to the core service.
309 * request list (or when this operation timed out).
310 *
311 * @param cls closure
312 * @param size number of bytes available in buf
313 * @param buf where the callee should write the message
314 * @return number of bytes written to buf
315 */ 489 */
316static size_t 490static size_t
317request_start (void *cls, size_t size, void *buf) 491transmit_message (void *cls,
492 size_t size,
493 void *buf)
318{ 494{
319 struct GNUNET_CORE_Handle *h = cls; 495 struct GNUNET_CORE_Handle *h = cls;
496 struct ControlMessage *cm;
320 struct GNUNET_CORE_TransmitHandle *th; 497 struct GNUNET_CORE_TransmitHandle *th;
498 struct PeerRecord *pr;
499 struct SendMessage *sm;
500 const struct GNUNET_MessageHeader *hdr;
501 uint16_t msize;
321 size_t ret; 502 size_t ret;
322 503
323 h->cth = NULL; 504 h->cth = NULL;
324 th = h->pending_head;
325 if (th == NULL)
326 return 0;
327 if (buf == NULL) 505 if (buf == NULL)
328 { 506 {
329 if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK) 507 reconnect (h);
508 return 0;
509 }
510 /* first check for control messages */
511 if (NULL != (cm = h->pending_head))
512 {
513 hdr = (const struct GNUNET_MessageHeader*) &cm[1];
514 msize = ntohs (hdr->size);
515 if (size < msize)
330 { 516 {
331 GNUNET_SCHEDULER_cancel(th->timeout_task); 517 trigger_next_request (h);
332 th->timeout_task = GNUNET_SCHEDULER_NO_TASK; 518 return 0;
333 } 519 }
334 timeout_request (th, NULL); 520 memcpy (buf, hdr, msize);
335 return 0; 521 GNUNET_CONTAINER_DLL_remove (h->pending_head,
522 h->pending_tail,
523 cm);
524 if (NULL != cm->cont)
525 GNUNET_SCHEDULER_add_now (cm->cont, cm->cont_cls);
526 GNUNET_free (cm);
527 trigger_next_request (h);
528 return msize;
336 } 529 }
337 GNUNET_CONTAINER_DLL_remove (h->pending_head, 530 /* now check for 'ready' P2P messages */
338 h->pending_tail, 531 if (NULL != (pr = h->ready_peer_head))
339 th); 532 {
340 GNUNET_assert (h->submitted == NULL); 533 th = pr->pending_head;
341 h->submitted = th; 534 if (size < th->msize + sizeof (struct SendMessage))
342 GNUNET_assert (size >= th->msize); 535 {
343 ret = th->get_message (th->get_message_cls, size, buf); 536 trigger_next_request (h);
344 GNUNET_assert (ret <= size); 537 return 0;
538 }
539 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
540 h->ready_peer_tail,
541 pr);
542 GNUNET_CONTAINER_DLL_remove (pr->pending_head,
543 pr->pending_tail,
544 th);
545 pr->queue_size--;
546 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
547 {
548 GNUNET_SCHEDULER_cancel (pr->timeout_task);
549 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
550 }
551
552 sm = (struct SendMessage *) buf;
553 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
554 sm->priority = htonl (th->priority);
555 sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
556 sm->peer = pr->peer;
557 ret = th->get_message (th->get_message_cls,
558 size - sizeof (struct SendMessage),
559 &sm[1]);
560
561 if (0 == ret)
562 {
345#if DEBUG_CORE 563#if DEBUG_CORE
346 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 564 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
347 "Transmitting %u bytes to core\n", 565 "Size of clients message to peer %s is 0!\n",
348 ret); 566 GNUNET_i2s(&pr->peer));
349#endif 567#endif
350 return ret; 568 /* client decided to send nothing! */
569 request_next_transmission (pr);
570 return 0;
571 }
572#if DEBUG_CORE
573 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
574 "Produced SEND message to core with %u bytes payload\n",
575 (unsigned int) ret);
576#endif
577 GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
578 if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
579 {
580 GNUNET_break (0);
581 request_next_transmission (pr);
582 return 0;
583 }
584 ret += sizeof (struct SendMessage);
585 sm->header.size = htons (ret);
586 GNUNET_assert (ret <= size);
587 GNUNET_free (th);
588 request_next_transmission (pr);
589 return ret;
590 }
591 return 0;
351} 592}
352 593
353 594
354/** 595/**
355 * Check the list of pending requests, send the next 596 * Check the list of pending requests, send the next
356 * one to the core. 597 * one to the core.
598 *
599 * @param h core handle
357 */ 600 */
358static void 601static void
359trigger_next_request (struct GNUNET_CORE_Handle *h) 602trigger_next_request (struct GNUNET_CORE_Handle *h)
360{ 603{
604 uint16_t msize;
605
606 if (GNUNET_YES == h->currently_down)
607 return;
608 if (NULL != h->cth)
609 return;
610 if (h->pending_head != NULL)
611 msize = ntohs (((struct GNUNET_MessageHeader*) &h->pending_head[1])->size);
612 else if (h->ready_peer_head != NULL)
613 msize = h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);
614 else
615 return; /* no pending message */
616 h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client,
617 msize,
618 GNUNET_TIME_UNIT_FOREVER_REL,
619 GNUNET_NO,
620 &transmit_message, h);
621}
622
623
624
625
626/**
627 * Notify clients about disconnect and free
628 * the entry for connected peer.
629 *
630 * @param cls the 'struct GNUNET_CORE_Handle*'
631 * @param key the peer identity (not used)
632 * @param value the 'struct PeerRecord' to free.
633 * @return GNUNET_YES (continue)
634 */
635static int
636disconnect_and_free_peer_entry (void *cls,
637 const GNUNET_HashCode *key,
638 void *value)
639{
640 struct GNUNET_CORE_Handle *h = cls;
361 struct GNUNET_CORE_TransmitHandle *th; 641 struct GNUNET_CORE_TransmitHandle *th;
642 struct PeerRecord *pr = value;
362 643
363 if (h->currently_down) 644 while (NULL != (th = pr->pending_head))
364 { 645 {
365#if DEBUG_CORE 646 GNUNET_CONTAINER_DLL_remove (pr->pending_head,
366 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 647 pr->pending_tail,
367 "In trigger_next_request, connection currently down...\n"); 648 th);
368#endif 649 pr->queue_size--;
369 return; /* connection temporarily down */ 650 GNUNET_assert (0 ==
651 th->get_message (th->get_message_cls,
652 0, NULL));
653 GNUNET_free (th);
370 } 654 }
371 if (NULL == (th = h->pending_head)) 655 if (pr->pcic != NULL)
372 return; /* no requests pending */ 656 {
373 GNUNET_assert (NULL == h->cth); 657 // FIXME: call pcic callback!
374 h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client_notifications, 658 }
375 th->msize, 659 if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK)
376 GNUNET_TIME_absolute_get_remaining 660 {
377 (th->timeout), 661 GNUNET_SCHEDULER_cancel (pr->timeout_task);
378 GNUNET_NO, 662 pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
379 &request_start, 663 }
380 h); 664 GNUNET_assert (pr->queue_size == 0);
665 if ( (pr->prev != NULL) ||
666 (pr->next != NULL) ||
667 (h->ready_peer_head == pr) )
668 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
669 h->ready_peer_tail,
670 pr);
671 if (h->disconnects != NULL)
672 h->disconnects (h->cls,
673 &pr->peer);
674 GNUNET_assert (GNUNET_YES ==
675 GNUNET_CONTAINER_multihashmap_remove (h->peers,
676 key,
677 pr));
678 GNUNET_assert (pr->pending_head == NULL);
679 GNUNET_assert (pr->pending_tail == NULL);
680 GNUNET_assert (pr->ch = h);
681 GNUNET_assert (pr->queue_size == 0);
682 GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK);
683 GNUNET_free (pr);
684 return GNUNET_YES;
381} 685}
382 686
383 687
@@ -388,18 +692,28 @@ trigger_next_request (struct GNUNET_CORE_Handle *h)
388 * @param msg the message received from the core service 692 * @param msg the message received from the core service
389 */ 693 */
390static void 694static void
391main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) 695main_notify_handler (void *cls,
696 const struct GNUNET_MessageHeader *msg)
392{ 697{
393 struct GNUNET_CORE_Handle *h = cls; 698 struct GNUNET_CORE_Handle *h = cls;
394 unsigned int hpos; 699 const struct InitReplyMessage *m;
395 const struct ConnectNotifyMessage *cnm; 700 const struct ConnectNotifyMessage *cnm;
396 const struct DisconnectNotifyMessage *dnm; 701 const struct DisconnectNotifyMessage *dnm;
397 const struct NotifyTrafficMessage *ntm; 702 const struct NotifyTrafficMessage *ntm;
398 const struct GNUNET_MessageHeader *em; 703 const struct GNUNET_MessageHeader *em;
704 const struct ConfigurationInfoMessage *cim;
399 const struct PeerStatusNotifyMessage *psnm; 705 const struct PeerStatusNotifyMessage *psnm;
706 const struct SendMessageReady *smr;
707 const struct GNUNET_CORE_MessageHandler *mh;
708 GNUNET_CORE_StartupCallback init;
709 GNUNET_CORE_PeerConfigurationInfoCallback pcic;
710 struct GNUNET_PeerIdentity my_identity;
711 struct PeerRecord *pr;
712 struct GNUNET_CORE_TransmitHandle *th;
713 unsigned int hpos;
714 int trigger;
400 uint16_t msize; 715 uint16_t msize;
401 uint16_t et; 716 uint16_t et;
402 const struct GNUNET_CORE_MessageHandler *mh;
403 717
404 if (msg == NULL) 718 if (msg == NULL)
405 { 719 {
@@ -417,37 +731,85 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
417#endif 731#endif
418 switch (ntohs (msg->type)) 732 switch (ntohs (msg->type))
419 { 733 {
734 case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
735 if (ntohs (msg->size) != sizeof (struct InitReplyMessage))
736 {
737 GNUNET_break (0);
738 reconnect (h);
739 return;
740 }
741 m = (const struct InitReplyMessage *) msg;
742 GNUNET_break (0 == ntohl (m->reserved));
743 /* start our message processing loop */
744#if DEBUG_CORE
745 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
746 "Successfully connected to core service, starting processing loop.\n");
747#endif
748 if (GNUNET_YES == h->currently_down)
749 {
750 h->currently_down = GNUNET_NO;
751 trigger_next_request (h);
752 }
753 if (NULL != (init = h->init))
754 {
755 /* mark so we don't call init on reconnect */
756 h->init = NULL;
757 GNUNET_CRYPTO_hash (&m->publicKey,
758 sizeof (struct
759 GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
760 &my_identity.hashPubKey);
761 init (h->cls, h, &my_identity, &m->publicKey);
762 }
763 break;
420 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT: 764 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT:
421 if (NULL == h->connects)
422 {
423 GNUNET_break (0);
424 break;
425 }
426 if (msize != sizeof (struct ConnectNotifyMessage)) 765 if (msize != sizeof (struct ConnectNotifyMessage))
427 { 766 {
428 GNUNET_break (0); 767 GNUNET_break (0);
429 break; 768 break;
430 } 769 }
431 cnm = (const struct ConnectNotifyMessage *) msg; 770 cnm = (const struct ConnectNotifyMessage *) msg;
432 h->connects (h->cls, 771 pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
433 &cnm->peer, 772 &cnm->peer.hashPubKey);
434 GNUNET_TIME_relative_ntoh (cnm->latency), 773 if (pr != NULL)
435 ntohl (cnm->distance)); 774 {
775 GNUNET_break (0);
776 reconnect (h);
777 return;
778 }
779 pr = GNUNET_malloc (sizeof (struct PeerRecord));
780 pr->peer = cnm->peer;
781 pr->ch = h;
782 GNUNET_assert (GNUNET_YES ==
783 GNUNET_CONTAINER_multihashmap_put (h->peers,
784 &cnm->peer.hashPubKey,
785 pr,
786 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
787 if (NULL != h->connects)
788 h->connects (h->cls,
789 &cnm->peer,
790 NULL /* FIXME: atsi! */);
436 break; 791 break;
437 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT: 792 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT:
438 if (NULL == h->disconnects)
439 {
440 GNUNET_break (0);
441 break;
442 }
443 if (msize != sizeof (struct DisconnectNotifyMessage)) 793 if (msize != sizeof (struct DisconnectNotifyMessage))
444 { 794 {
445 GNUNET_break (0); 795 GNUNET_break (0);
446 break; 796 break;
447 } 797 }
448 dnm = (const struct DisconnectNotifyMessage *) msg; 798 dnm = (const struct DisconnectNotifyMessage *) msg;
449 h->disconnects (h->cls, 799 pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
450 &dnm->peer); 800 &dnm->peer.hashPubKey);
801 if (pr == NULL)
802 {
803 GNUNET_break (0);
804 reconnect (h);
805 return;
806 }
807 trigger = ( (pr->prev != NULL) ||
808 (pr->next != NULL) ||
809 (h->ready_peer_head == pr) );
810 disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr);
811 if (trigger)
812 trigger_next_request (h);
451 break; 813 break;
452 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE: 814 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE:
453 if (NULL == h->status_events) 815 if (NULL == h->status_events)
@@ -461,13 +823,20 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
461 break; 823 break;
462 } 824 }
463 psnm = (const struct PeerStatusNotifyMessage *) msg; 825 psnm = (const struct PeerStatusNotifyMessage *) msg;
826 pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
827 &psnm->peer.hashPubKey);
828 if (pr == NULL)
829 {
830 GNUNET_break (0);
831 reconnect (h);
832 return;
833 }
464 h->status_events (h->cls, 834 h->status_events (h->cls,
465 &psnm->peer, 835 &psnm->peer,
466 GNUNET_TIME_relative_ntoh (psnm->latency),
467 ntohl (psnm->distance),
468 psnm->bandwidth_in, 836 psnm->bandwidth_in,
469 psnm->bandwidth_out, 837 psnm->bandwidth_out,
470 GNUNET_TIME_absolute_ntoh (psnm->timeout)); 838 GNUNET_TIME_absolute_ntoh (psnm->timeout),
839 NULL /* FIXME: atsi */);
471 break; 840 break;
472 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND: 841 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND:
473 if (msize < 842 if (msize <
@@ -486,6 +855,14 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
486 ntohs (em->size), 855 ntohs (em->size),
487 GNUNET_i2s (&ntm->peer)); 856 GNUNET_i2s (&ntm->peer));
488#endif 857#endif
858 pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
859 &ntm->peer.hashPubKey);
860 if (pr == NULL)
861 {
862 GNUNET_break (0);
863 reconnect (h);
864 return;
865 }
489 if ((GNUNET_NO == h->inbound_hdr_only) && 866 if ((GNUNET_NO == h->inbound_hdr_only) &&
490 (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) 867 (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
491 { 868 {
@@ -506,8 +883,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
506 } 883 }
507 if (GNUNET_OK != 884 if (GNUNET_OK !=
508 h->handlers[hpos].callback (h->cls, &ntm->peer, em, 885 h->handlers[hpos].callback (h->cls, &ntm->peer, em,
509 GNUNET_TIME_relative_ntoh (ntm->latency), 886 NULL /* FIXME: atsi */))
510 ntohl (ntm->distance)))
511 { 887 {
512 /* error in processing, do not process other messages! */ 888 /* error in processing, do not process other messages! */
513 break; 889 break;
@@ -515,8 +891,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
515 } 891 }
516 if (NULL != h->inbound_notify) 892 if (NULL != h->inbound_notify)
517 h->inbound_notify (h->cls, &ntm->peer, em, 893 h->inbound_notify (h->cls, &ntm->peer, em,
518 GNUNET_TIME_relative_ntoh (ntm->latency), 894 NULL /* FIXME: atsi */);
519 ntohl (ntm->distance));
520 break; 895 break;
521 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND: 896 case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
522 if (msize < 897 if (msize <
@@ -528,6 +903,14 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
528 } 903 }
529 ntm = (const struct NotifyTrafficMessage *) msg; 904 ntm = (const struct NotifyTrafficMessage *) msg;
530 em = (const struct GNUNET_MessageHeader *) &ntm[1]; 905 em = (const struct GNUNET_MessageHeader *) &ntm[1];
906 pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
907 &ntm->peer.hashPubKey);
908 if (pr == NULL)
909 {
910 GNUNET_break (0);
911 reconnect (h);
912 return;
913 }
531 if ((GNUNET_NO == h->outbound_hdr_only) && 914 if ((GNUNET_NO == h->outbound_hdr_only) &&
532 (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) 915 (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage)))
533 { 916 {
@@ -540,159 +923,157 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
540 break; 923 break;
541 } 924 }
542 h->outbound_notify (h->cls, &ntm->peer, em, 925 h->outbound_notify (h->cls, &ntm->peer, em,
543 GNUNET_TIME_relative_ntoh (ntm->latency), 926 NULL /* FIXME: atsi? */);
544 ntohl (ntm->distance)); 927 break;
928 case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
929 if (msize != sizeof (struct SendMessageReady))
930 {
931 GNUNET_break (0);
932 break;
933 }
934 smr = (const struct SendMessageReady *) msg;
935 pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
936 &smr->peer.hashPubKey);
937 if (pr == NULL)
938 {
939 GNUNET_break (0);
940 reconnect (h);
941 return;
942 }
943 th = pr->pending_head;
944 if (ntohs (smr->smr_id) != th->smr_id)
945 {
946 /* READY message is for expired or cancelled message,
947 ignore! (we should have already sent another request) */
948 break;
949 }
950 if ( (pr->prev != NULL) ||
951 (pr->next != NULL) ||
952 (h->ready_peer_head == pr) )
953 {
954 /* we should not already be on the ready list... */
955 GNUNET_break (0);
956 reconnect (h);
957 return;
958 }
959 GNUNET_CONTAINER_DLL_insert (h->ready_peer_head,
960 h->ready_peer_tail,
961 pr);
962 trigger_next_request (h);
963 break;
964 case GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO:
965 if (ntohs (msg->size) != sizeof (struct ConfigurationInfoMessage))
966 {
967 GNUNET_break (0);
968 break;
969 }
970 cim = (const struct ConfigurationInfoMessage*) msg;
971 pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
972 &cim->peer.hashPubKey);
973 if (pr == NULL)
974 {
975 GNUNET_break (0);
976 reconnect (h);
977 return;
978 }
979 if (pr->rim_id != ntohl (cim->rim_id))
980 break;
981 pcic = pr->pcic;
982 pr->pcic = NULL;
983 if (pcic != NULL)
984 pcic (pr->pcic_cls,
985 &pr->peer,
986 cim->bw_out,
987 ntohl (cim->reserved_amount),
988 GNUNET_ntohll (cim->preference));
545 break; 989 break;
546 default: 990 default:
547 GNUNET_break (0); 991 GNUNET_break (0);
548 break; 992 break;
549 } 993 }
550 GNUNET_CLIENT_receive (h->client_notifications, 994 GNUNET_CLIENT_receive (h->client,
551 &main_notify_handler, h, GNUNET_TIME_UNIT_FOREVER_REL); 995 &main_notify_handler, h, GNUNET_TIME_UNIT_FOREVER_REL);
552} 996}
553 997
554 998
555/** 999/**
556 * Function called when we are ready to transmit our 1000 * Task executed once we are done transmitting the INIT message.
557 * "START" message (or when this operation timed out). 1001 * Starts our 'receive' loop.
558 * 1002 *
559 * @param cls closure 1003 * @param cls the 'struct GNUNET_CORE_Handle'
560 * @param size number of bytes available in buf 1004 * @param tc task context
561 * @param buf where the callee should write the message
562 * @return number of bytes written to buf
563 */
564static size_t transmit_start (void *cls, size_t size, void *buf);
565
566
567/**
568 * Function called on the first message received from
569 * the service (contains our public key, etc.).
570 * Should trigger calling the init callback
571 * and then start our regular message processing.
572 *
573 * @param cls closure
574 * @param msg message received, NULL on timeout or fatal error
575 */ 1005 */
576static void 1006static void
577init_reply_handler (void *cls, const struct GNUNET_MessageHeader *msg) 1007init_done_task (void *cls,
1008 const struct GNUNET_SCHEDULER_TaskContext *tc)
578{ 1009{
579 struct GNUNET_CORE_Handle *h = cls; 1010 struct GNUNET_CORE_Handle *h = cls;
580 const struct InitReplyMessage *m;
581 GNUNET_CORE_StartupCallback init;
582 struct GNUNET_PeerIdentity my_identity;
583 1011
584 if ((msg == NULL) || 1012 if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_PREREQ_DONE))
585 (ntohs (msg->size) != sizeof (struct InitReplyMessage)) ||
586 (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY))
587 { 1013 {
588 if (msg != NULL) 1014 if (h->client != NULL)
589 {
590 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
591 _
592 ("Error connecting to core service (failed to receive `%s' message, got message of type %u and size %u).\n"),
593 "INIT_REPLY",
594 ntohs (msg->type),
595 ntohs (msg->size));
596 GNUNET_break (0);
597 }
598 else
599 { 1015 {
600#if DEBUG_CORE 1016 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
601 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1017 h->client = NULL;
602 _("Failed to connect to core service, will retry.\n"));
603#endif
604 } 1018 }
605 transmit_start (h, 0, NULL); 1019 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1020 &reconnect_task,
1021 h);
606 return; 1022 return;
607 } 1023 }
608 m = (const struct InitReplyMessage *) msg; 1024 GNUNET_CLIENT_receive (h->client,
609 GNUNET_break (0 == ntohl (m->reserved)); 1025 &main_notify_handler,
610 /* start our message processing loop */ 1026 h,
611#if DEBUG_CORE 1027 GNUNET_TIME_UNIT_FOREVER_REL);
612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
613 "Successfully connected to core service, starting processing loop.\n");
614#endif
615 h->currently_down = GNUNET_NO;
616 trigger_next_request (h);
617 GNUNET_CLIENT_receive (h->client_notifications,
618 &main_notify_handler, h, GNUNET_TIME_UNIT_FOREVER_REL);
619 if (NULL != (init = h->init))
620 {
621 /* mark so we don't call init on reconnect */
622 h->init = NULL;
623 GNUNET_CRYPTO_hash (&m->publicKey,
624 sizeof (struct
625 GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
626 &my_identity.hashPubKey);
627 init (h->cls, h, &my_identity, &m->publicKey);
628 }
629}
630
631
632static void
633reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
634{
635 struct GNUNET_CORE_Handle *h = cls;
636 h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
637 reconnect (h);
638} 1028}
639 1029
640 1030
641/** 1031/**
642 * Function called when we are ready to transmit our 1032 * Our current client connection went down. Clean it up
643 * "START" message (or when this operation timed out). 1033 * and try to reconnect!
644 * 1034 *
645 * @param cls closure 1035 * @param h our handle to the core service
646 * @param size number of bytes available in buf
647 * @param buf where the callee should write the message
648 * @return number of bytes written to buf
649 */ 1036 */
650static size_t 1037static void
651transmit_start (void *cls, size_t size, void *buf) 1038reconnect (struct GNUNET_CORE_Handle *h)
652{ 1039{
653 struct GNUNET_CORE_Handle *h = cls; 1040 struct ControlMessage *cm;
654 struct InitMessage *init; 1041 struct InitMessage *init;
655 uint16_t *ts;
656 uint16_t msize;
657 uint32_t opt; 1042 uint32_t opt;
1043 uint16_t msize;
1044 uint16_t *ts;
658 unsigned int hpos; 1045 unsigned int hpos;
659 struct GNUNET_TIME_Relative delay;
660 1046
661 h->cth = NULL; 1047#if DEBUG_CORE
662 if (size == 0) 1048 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049 "Reconnecting to CORE service\n");
1050#endif
1051 if (h->client != NULL)
663 { 1052 {
664 if ((h->init == NULL) || 1053 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
665 (GNUNET_TIME_absolute_get ().abs_value < h->startup_timeout.abs_value)) 1054 h->client = NULL;
666 { 1055 GNUNET_CONTAINER_multihashmap_iterate (h->peers,
667 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1056 &disconnect_and_free_peer_entry,
668 _("Failed to connect to core service, retrying.\n")); 1057 h);
669 delay = GNUNET_TIME_absolute_get_remaining (h->startup_timeout); 1058 }
670 if ((h->init == NULL) || (delay.rel_value > 1000)) 1059 h->currently_down = GNUNET_YES;
671 delay = GNUNET_TIME_UNIT_SECONDS; 1060 h->client = GNUNET_CLIENT_connect ("core", h->cfg);
672 if (h->init == NULL) 1061 if (h->client == NULL)
673 h->startup_timeout = 1062 {
674 GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_MINUTES); 1063 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
675 h->reconnect_task = 1064 &reconnect_task,
676 GNUNET_SCHEDULER_add_delayed (delay, &reconnect_task, h); 1065 h);
677 return 0; 1066 return;
678 }
679 /* timeout on initial connect */
680 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
681 _("Failed to connect to core service, giving up.\n"));
682 h->init (h->cls, NULL, NULL, NULL);
683 GNUNET_CORE_disconnect (h);
684 return 0;
685 } 1067 }
686 msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage); 1068 msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage);
687 GNUNET_assert (size >= msize); 1069 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
688 init = buf; 1070 msize);
1071 cm->cont = &init_done_task;
1072 cm->cont_cls = h;
1073 init = (struct InitMessage*) &cm[1];
689 init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT); 1074 init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT);
690 init->header.size = htons (msize); 1075 init->header.size = htons (msize);
691 opt = GNUNET_CORE_OPTION_NOTHING; 1076 opt = GNUNET_CORE_OPTION_SEND_CONNECT | GNUNET_CORE_OPTION_SEND_DISCONNECT;
692 if (h->connects != NULL)
693 opt |= GNUNET_CORE_OPTION_SEND_CONNECT;
694 if (h->disconnects != NULL)
695 opt |= GNUNET_CORE_OPTION_SEND_DISCONNECT;
696 if (h->status_events != NULL) 1077 if (h->status_events != NULL)
697 opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE; 1078 opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE;
698 if (h->inbound_notify != NULL) 1079 if (h->inbound_notify != NULL)
@@ -710,25 +1091,23 @@ transmit_start (void *cls, size_t size, void *buf)
710 opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND; 1091 opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
711 } 1092 }
712 init->options = htonl (opt); 1093 init->options = htonl (opt);
713 ts = (uint16_t *) & init[1]; 1094 ts = (uint16_t *) &init[1];
714 for (hpos = 0; hpos < h->hcnt; hpos++) 1095 for (hpos = 0; hpos < h->hcnt; hpos++)
715 ts[hpos] = htons (h->handlers[hpos].type); 1096 ts[hpos] = htons (h->handlers[hpos].type);
716 GNUNET_CLIENT_receive (h->client_notifications, 1097 GNUNET_CONTAINER_DLL_insert (h->pending_head,
717 &init_reply_handler, 1098 h->pending_tail,
718 h, 1099 cm);
719 GNUNET_TIME_absolute_get_remaining 1100 trigger_next_request (h);
720 (h->startup_timeout));
721 return sizeof (struct InitMessage) + h->hcnt * sizeof (uint16_t);
722} 1101}
723 1102
724 1103
1104
725/** 1105/**
726 * Connect to the core service. Note that the connection may 1106 * Connect to the core service. Note that the connection may
727 * complete (or fail) asynchronously. 1107 * complete (or fail) asynchronously.
728 * 1108 *
729 * @param cfg configuration to use 1109 * @param cfg configuration to use
730 * @param queue_size size of the per-peer message queue 1110 * @param queue_size size of the per-peer message queue
731 * @param timeout after how long should we give up trying to connect to the core service?
732 * @param cls closure for the various callbacks that follow (including handlers in the handlers array) 1111 * @param cls closure for the various callbacks that follow (including handlers in the handlers array)
733 * @param init callback to call on timeout or once we have successfully 1112 * @param init callback to call on timeout or once we have successfully
734 * connected to the core service; note that timeout is only meaningful if init is not NULL 1113 * connected to the core service; note that timeout is only meaningful if init is not NULL
@@ -750,7 +1129,6 @@ transmit_start (void *cls, size_t size, void *buf)
750struct GNUNET_CORE_Handle * 1129struct GNUNET_CORE_Handle *
751GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, 1130GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
752 unsigned int queue_size, 1131 unsigned int queue_size,
753 struct GNUNET_TIME_Relative timeout,
754 void *cls, 1132 void *cls,
755 GNUNET_CORE_StartupCallback init, 1133 GNUNET_CORE_StartupCallback init,
756 GNUNET_CORE_ConnectEventHandler connects, 1134 GNUNET_CORE_ConnectEventHandler connects,
@@ -766,6 +1144,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
766 1144
767 h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle)); 1145 h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle));
768 h->cfg = cfg; 1146 h->cfg = cfg;
1147 h->queue_size = queue_size;
769 h->cls = cls; 1148 h->cls = cls;
770 h->init = init; 1149 h->init = init;
771 h->connects = connects; 1150 h->connects = connects;
@@ -776,133 +1155,57 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
776 h->inbound_hdr_only = inbound_hdr_only; 1155 h->inbound_hdr_only = inbound_hdr_only;
777 h->outbound_hdr_only = outbound_hdr_only; 1156 h->outbound_hdr_only = outbound_hdr_only;
778 h->handlers = handlers; 1157 h->handlers = handlers;
779 h->queue_size = queue_size;
780 h->client_notifications = GNUNET_CLIENT_connect ("core", cfg);
781 if (h->client_notifications == NULL)
782 {
783 GNUNET_free (h);
784 return NULL;
785 }
786 h->startup_timeout = GNUNET_TIME_relative_to_absolute (timeout);
787 h->hcnt = 0; 1158 h->hcnt = 0;
788 while (handlers[h->hcnt].callback != NULL) 1159 while (handlers[h->hcnt].callback != NULL)
789 h->hcnt++; 1160 h->hcnt++;
790 GNUNET_assert (h->hcnt < 1161 GNUNET_assert (h->hcnt <
791 (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1162 (GNUNET_SERVER_MAX_MESSAGE_SIZE -
792 sizeof (struct InitMessage)) / sizeof (uint16_t)); 1163 sizeof (struct InitMessage)) / sizeof (uint16_t));
793#if DEBUG_CORE 1164 reconnect (h);
794 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
795 "Trying to connect to core service in next %llu ms.\n",
796 timeout.rel_value);
797#endif
798 h->cth =
799 GNUNET_CLIENT_notify_transmit_ready (h->client_notifications,
800 sizeof (struct InitMessage) +
801 sizeof (uint16_t) * h->hcnt, timeout,
802 GNUNET_YES,
803 &transmit_start, h);
804 return h; 1165 return h;
805} 1166}
806 1167
807 1168
808/** 1169/**
809 * Disconnect from the core service. 1170 * Disconnect from the core service. This function can only
1171 * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready'
1172 * requests have been explicitly cancelled.
810 * 1173 *
811 * @param handle connection to core to disconnect 1174 * @param handle connection to core to disconnect
812 */ 1175 */
813void 1176void
814GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) 1177GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
815{ 1178{
816 if (handle->cth != NULL) 1179 struct ControlMessage *cm;
817 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); 1180
818 if (handle->solicit_transmit_req != NULL)
819 GNUNET_CORE_notify_transmit_ready_cancel (handle->solicit_transmit_req);
820 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) 1181 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
821 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
822 if (handle->client_notifications != NULL)
823 GNUNET_CLIENT_disconnect (handle->client_notifications, GNUNET_NO);
824 GNUNET_break (handle->pending_head == NULL);
825 GNUNET_free_non_null (handle->solicit_buffer);
826 GNUNET_free (handle);
827}
828
829
830/**
831 * Build the message requesting data transmission.
832 */
833static size_t
834produce_send (void *cls, size_t size, void *buf)
835{
836 struct GNUNET_CORE_TransmitHandle *th = cls;
837 struct GNUNET_CORE_Handle *h;
838 struct SendMessage *sm;
839 size_t dt;
840 GNUNET_CONNECTION_TransmitReadyNotify notify;
841 void *notify_cls;
842
843 h = th->ch;
844 if (buf == NULL)
845 { 1182 {
846 /* timeout or error */ 1183 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
847#if DEBUG_CORE 1184 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
848 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849 "P2P transmission request for `%4s' timed out.\n",
850 GNUNET_i2s(&th->peer));
851#endif
852 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
853 GNUNET_CORE_notify_transmit_ready_cancel (th);
854 if ((h->pending_head == th) && (h->cth != NULL)) /* Request hasn't been canceled yet! */
855 {
856 GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
857 h->cth = NULL;
858 trigger_next_request (h);
859 }
860 /* Otherwise this request timed out, but another is actually queued for sending, so don't try to send another! */
861 return 0;
862 } 1185 }
863 sm = (struct SendMessage *) buf; 1186 if (handle->cth != NULL)
864 sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
865 sm->priority = htonl (th->priority);
866 sm->deadline = GNUNET_TIME_absolute_hton (th->timeout);
867 sm->peer = th->peer;
868 notify = th->notify;
869 notify_cls = th->notify_cls;
870 GNUNET_CORE_notify_transmit_ready_cancel (th);
871 trigger_next_request (h);
872 size = GNUNET_MIN (size,
873 GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
874 GNUNET_assert (size >= sizeof (struct SendMessage));
875 dt = notify (notify_cls, size - sizeof (struct SendMessage), &sm[1]);
876 if (0 == dt)
877 { 1187 {
878#if DEBUG_CORE 1188 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
879 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1189 handle->cth = NULL;
880 "Size of clients message to peer %s is 0!\n",
881 GNUNET_i2s(&sm->peer));
882#endif
883 /* client decided to send nothing! */
884 return 0;
885 } 1190 }
886#if DEBUG_CORE 1191 if (handle->client != NULL)
887 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
888 "Produced SEND message to core with %u bytes payload\n",
889 dt);
890#endif
891 GNUNET_assert (dt >= sizeof (struct GNUNET_MessageHeader));
892 if (dt + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
893 { 1192 {
894 GNUNET_break (0); 1193 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
895 return 0; 1194 handle->client = NULL;
896 } 1195 }
897#if DEBUG_CORE 1196 while (NULL != (cm = handle->pending_head))
898 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1197 {
899 "Preparing for P2P transmission of %u bytes to `%4s'.\n", 1198 GNUNET_CONTAINER_DLL_remove (handle->pending_head,
900 dt, 1199 handle->pending_tail,
901 GNUNET_i2s(&sm->peer)); 1200 cm);
902#endif 1201 GNUNET_free (cm);
903 sm->header.size = htons (dt + sizeof (struct SendMessage)); 1202 }
904 GNUNET_assert (dt + sizeof (struct SendMessage) <= size); 1203 GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
905 return dt + sizeof (struct SendMessage); 1204 &disconnect_and_free_peer_entry,
1205 handle);
1206 GNUNET_CONTAINER_multihashmap_destroy (handle->peers);
1207 GNUNET_break (handle->ready_peer_head == NULL);
1208 GNUNET_free (handle);
906} 1209}
907 1210
908 1211
@@ -926,61 +1229,489 @@ produce_send (void *cls, size_t size, void *buf)
926 */ 1229 */
927struct GNUNET_CORE_TransmitHandle * 1230struct GNUNET_CORE_TransmitHandle *
928GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, 1231GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle,
929 unsigned int priority, 1232 uint32_t priority,
930 struct GNUNET_TIME_Relative maxdelay, 1233 struct GNUNET_TIME_Relative maxdelay,
931 const struct GNUNET_PeerIdentity *target, 1234 const struct GNUNET_PeerIdentity *target,
932 size_t notify_size, 1235 size_t notify_size,
933 GNUNET_CONNECTION_TransmitReadyNotify notify, 1236 GNUNET_CONNECTION_TransmitReadyNotify notify,
934 void *notify_cls) 1237 void *notify_cls)
935{ 1238{
1239 struct PeerRecord *pr;
936 struct GNUNET_CORE_TransmitHandle *th; 1240 struct GNUNET_CORE_TransmitHandle *th;
1241 struct GNUNET_CORE_TransmitHandle *pos;
1242 struct GNUNET_CORE_TransmitHandle *prev;
1243 struct GNUNET_CORE_TransmitHandle *minp;
937 1244
1245 pr = GNUNET_CONTAINER_multihashmap_get (handle->peers,
1246 &target->hashPubKey);
1247 if (NULL == pr)
1248 {
1249 /* attempt to send to peer that is not connected */
1250 GNUNET_break (0);
1251 return NULL;
1252 }
938 GNUNET_assert (notify_size + sizeof (struct SendMessage) < 1253 GNUNET_assert (notify_size + sizeof (struct SendMessage) <
939 GNUNET_SERVER_MAX_MESSAGE_SIZE); 1254 GNUNET_SERVER_MAX_MESSAGE_SIZE);
940 th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle)); 1255 th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle));
941 th->ch = handle; 1256 th->peer = pr;
942 GNUNET_CONTAINER_DLL_insert_after (handle->pending_head, 1257 th->get_message = notify;
943 handle->pending_tail, 1258 th->get_message_cls = notify_cls;
944 handle->pending_tail,
945 th);
946 th->get_message = &produce_send;
947 th->get_message_cls = th;
948 th->notify = notify;
949 th->notify_cls = notify_cls;
950 th->peer = *target;
951 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); 1259 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay);
952 th->timeout_task = GNUNET_SCHEDULER_add_delayed (maxdelay,
953 &timeout_request, th);
954 th->priority = priority; 1260 th->priority = priority;
955 th->msize = sizeof (struct SendMessage) + notify_size; 1261 th->msize = notify_size;
1262 /* bound queue size */
1263 if (pr->queue_size == handle->queue_size)
1264 {
1265 /* find lowest-priority entry */
1266 minp = pr->pending_head;
1267 prev = minp->next;
1268 while (prev != NULL)
1269 {
1270 if (prev->priority < minp->priority)
1271 minp = prev;
1272 prev = prev->next;
1273 }
1274 if (minp == NULL)
1275 {
1276 GNUNET_break (handle->queue_size != 0);
1277 GNUNET_break (pr->queue_size == 0);
1278 return NULL;
1279 }
1280 if (priority <= minp->priority)
1281 return NULL; /* priority too low */
1282 GNUNET_CONTAINER_DLL_remove (pr->pending_head,
1283 pr->pending_tail,
1284 minp);
1285 pr->queue_size--;
1286 GNUNET_assert (0 ==
1287 minp->get_message (minp->get_message_cls,
1288 0, NULL));
1289 GNUNET_free (minp);
1290 }
1291
1292 /* Order entries by deadline, but SKIP 'HEAD' if
1293 we're in the 'ready_peer_*' DLL */
1294 pos = pr->pending_head;
1295 if ( (pr->prev != NULL) ||
1296 (pr->next != NULL) ||
1297 (pr == handle->ready_peer_head) )
1298 {
1299 GNUNET_assert (pos != NULL);
1300 pos = pos->next; /* skip head */
1301 }
1302
1303 /* insertion sort */
1304 prev = pos;
1305 while ( (pos != NULL) &&
1306 (pos->timeout.abs_value < th->timeout.abs_value) )
1307 {
1308 prev = pos;
1309 pos = pos->next;
1310 }
1311 GNUNET_CONTAINER_DLL_insert_after (pr->pending_head,
1312 pr->pending_tail,
1313 prev,
1314 th);
1315 pr->queue_size++;
956 /* was the request queue previously empty? */ 1316 /* was the request queue previously empty? */
957 if ( (handle->pending_head == th) && 1317 if (pr->pending_head == th)
958 (handle->cth == NULL) ) 1318 request_next_transmission (pr);
959 trigger_next_request (handle);
960 return th; 1319 return th;
961} 1320}
962 1321
963 1322
964/** 1323/**
965 * Cancel the specified transmission-ready notification. 1324 * Cancel the specified transmission-ready notification.
966 *s 1325 *
967 * @param th handle that was returned by "notify_transmit_ready". 1326 * @param th handle that was returned by "notify_transmit_ready".
968 */ 1327 */
969void 1328void
970GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle 1329GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle
971 *th) 1330 *th)
972{ 1331{
973 struct GNUNET_CORE_Handle *h = th->ch; 1332 struct PeerRecord *pr = th->peer;
1333 struct GNUNET_CORE_Handle *h = pr->ch;
1334 int was_head;
974 1335
975 if (h->submitted == th) 1336 was_head = (pr->pending_head == th);
976 h->submitted = NULL; 1337 GNUNET_CONTAINER_DLL_remove (pr->pending_head,
977 else 1338 pr->pending_tail,
978 GNUNET_CONTAINER_DLL_remove (h->pending_head, 1339 th);
979 h->pending_tail, 1340 if (th->cm != NULL)
980 th); 1341 {
981 if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK) 1342 /* we're currently in the control queue, remove */
982 GNUNET_SCHEDULER_cancel (th->timeout_task); 1343 GNUNET_CONTAINER_DLL_remove (h->pending_head,
1344 h->pending_tail,
1345 th->cm);
1346 GNUNET_free (th->cm);
1347 }
983 GNUNET_free (th); 1348 GNUNET_free (th);
1349 if (was_head)
1350 {
1351 if ( (pr->prev != NULL) ||
1352 (pr->next != NULL) ||
1353 (pr == h->ready_peer_head) )
1354 {
1355 /* the request that was 'approved' by core was
1356 cancelled before it could be transmitted; remove
1357 us from the 'ready' list */
1358 GNUNET_CONTAINER_DLL_remove (h->ready_peer_head,
1359 h->ready_peer_tail,
1360 pr);
1361 }
1362 request_next_transmission (pr);
1363 }
1364}
1365
1366
1367/* ****************** GNUNET_CORE_peer_request_connect ******************** */
1368
1369/**
1370 * Handle for a request to the core to connect to
1371 * a particular peer. Can be used to cancel the request
1372 * (before the 'cont'inuation is called).
1373 */
1374struct GNUNET_CORE_PeerRequestHandle
1375{
1376
1377 /**
1378 * Link to control message.
1379 */
1380 struct ControlMessage *cm;
1381
1382 /**
1383 * Core handle used.
1384 */
1385 struct GNUNET_CORE_Handle *h;
1386
1387 /**
1388 * Continuation to run when done.
1389 */
1390 GNUNET_SCHEDULER_Task cont;
1391
1392 /**
1393 * Closure for 'cont'.
1394 */
1395 void *cont_cls;
1396
1397};
1398
1399
1400
1401/**
1402 * Continuation called when the control message was transmitted.
1403 * Calls the original continuation and frees the remaining
1404 * resources.
1405 *
1406 * @param cls the 'struct GNUNET_CORE_PeerRequestHandle'
1407 * @param tc scheduler context
1408 */
1409static void
1410peer_request_connect_cont (void *cls,
1411 const struct GNUNET_SCHEDULER_TaskContext *tc)
1412{
1413 struct GNUNET_CORE_PeerRequestHandle *ret = cls;
1414
1415 if (ret->cont != NULL)
1416 ret->cont (ret->cont_cls, tc);
1417 GNUNET_free (ret);
1418}
1419
1420
1421/**
1422 * Request that the core should try to connect to a particular peer.
1423 * Once the request has been transmitted to the core, the continuation
1424 * function will be called. Note that this does NOT mean that a
1425 * connection was successfully established -- it only means that the
1426 * core will now try. Successful establishment of the connection
1427 * will be signalled to the 'connects' callback argument of
1428 * 'GNUNET_CORE_connect' only. If the core service does not respond
1429 * to our connection attempt within the given time frame, 'cont' will
1430 * be called with the TIMEOUT reason code.
1431 *
1432 * @param h core handle
1433 * @param timeout how long to try to talk to core
1434 * @param peer who should we connect to
1435 * @param cont function to call once the request has been completed (or timed out)
1436 * @param cont_cls closure for cont
1437 * @return NULL on error (cont will not be called), otherwise handle for cancellation
1438 */
1439struct GNUNET_CORE_PeerRequestHandle *
1440GNUNET_CORE_peer_request_connect (struct GNUNET_CORE_Handle *h,
1441 struct GNUNET_TIME_Relative timeout,
1442 const struct GNUNET_PeerIdentity * peer,
1443 GNUNET_SCHEDULER_Task cont,
1444 void *cont_cls)
1445{
1446 struct GNUNET_CORE_PeerRequestHandle *ret;
1447 struct ControlMessage *cm;
1448 struct ConnectMessage *msg;
1449
1450 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1451 sizeof (struct ConnectMessage));
1452 msg = (struct ConnectMessage*) &cm[1];
1453 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT);
1454 msg->header.size = htons (sizeof (struct ConnectMessage));
1455 msg->reserved = htonl (0);
1456 msg->timeout = GNUNET_TIME_relative_hton (timeout);
1457 msg->peer = *peer;
1458 GNUNET_CONTAINER_DLL_insert (h->pending_head,
1459 h->pending_tail,
1460 cm);
1461 ret = GNUNET_malloc (sizeof (struct GNUNET_CORE_PeerRequestHandle));
1462 ret->h = h;
1463 ret->cm = cm;
1464 ret->cont = cont;
1465 ret->cont_cls = cont_cls;
1466 cm->cont = &peer_request_connect_cont;
1467 cm->cont_cls = ret;
1468 if (h->pending_head == cm)
1469 trigger_next_request (h);
1470 return ret;
1471}
1472
1473
1474/**
1475 * Cancel a pending request to connect to a particular peer. Must not
1476 * be called after the 'cont' function was invoked.
1477 *
1478 * @param req request handle that was returned for the original request
1479 */
1480void
1481GNUNET_CORE_peer_request_connect_cancel (struct GNUNET_CORE_PeerRequestHandle *req)
1482{
1483 struct GNUNET_CORE_Handle *h = req->h;
1484 struct ControlMessage *cm = req->cm;
1485
1486 GNUNET_CONTAINER_DLL_remove (h->pending_head,
1487 h->pending_tail,
1488 cm);
1489 GNUNET_free (cm);
1490 GNUNET_free (req);
1491}
1492
1493
1494/* ****************** GNUNET_CORE_peer_change_preference ******************** */
1495
1496
1497struct GNUNET_CORE_InformationRequestContext
1498{
1499
1500 /**
1501 * Our connection to the service.
1502 */
1503 struct GNUNET_CORE_Handle *h;
1504
1505 /**
1506 * Function to call with the information.
1507 */
1508 GNUNET_CORE_PeerConfigurationInfoCallback info;
1509
1510 /**
1511 * Closure for info.
1512 */
1513 void *info_cls;
1514
1515 /**
1516 * Link to control message, NULL if CM was sent.
1517 */
1518 struct ControlMessage *cm;
1519
1520 /**
1521 * Link to peer record.
1522 */
1523 struct PeerRecord *pr;
1524};
1525
1526
1527/**
1528 * CM was sent, remove link so we don't double-free.
1529 *
1530 * @param cls the 'struct GNUNET_CORE_InformationRequestContext'
1531 * @param tc scheduler context
1532 */
1533static void
1534change_preference_send_continuation (void *cls,
1535 const struct GNUNET_SCHEDULER_TaskContext *tc)
1536{
1537 struct GNUNET_CORE_InformationRequestContext *irc = cls;
1538
1539 irc->cm = NULL;
1540}
1541
1542
1543/**
1544 * Obtain statistics and/or change preferences for the given peer.
1545 *
1546 * @param h core handle
1547 * @param peer identifies the peer
1548 * @param timeout after how long should we give up (and call "info" with NULL
1549 * for "peer" to signal an error)?
1550 * @param bw_out set to the current bandwidth limit (sending) for this peer,
1551 * caller should set "bw_out" to "-1" to avoid changing
1552 * the current value; otherwise "bw_out" will be lowered to
1553 * the specified value; passing a pointer to "0" can be used to force
1554 * us to disconnect from the peer; "bw_out" might not increase
1555 * as specified since the upper bound is generally
1556 * determined by the other peer!
1557 * @param amount reserve N bytes for receiving, negative
1558 * amounts can be used to undo a (recent) reservation;
1559 * @param preference increase incoming traffic share preference by this amount;
1560 * in the absence of "amount" reservations, we use this
1561 * preference value to assign proportional bandwidth shares
1562 * to all connected peers
1563 * @param info function to call with the resulting configuration information
1564 * @param info_cls closure for info
1565 * @return NULL on error
1566 */
1567struct GNUNET_CORE_InformationRequestContext *
1568GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h,
1569 const struct GNUNET_PeerIdentity *peer,
1570 struct GNUNET_TIME_Relative timeout,
1571 struct GNUNET_BANDWIDTH_Value32NBO bw_out,
1572 int32_t amount,
1573 uint64_t preference,
1574 GNUNET_CORE_PeerConfigurationInfoCallback info,
1575 void *info_cls)
1576{
1577 struct GNUNET_CORE_InformationRequestContext *irc;
1578 struct PeerRecord *pr;
1579 struct RequestInfoMessage *rim;
1580 struct ControlMessage *cm;
1581
1582 pr = GNUNET_CONTAINER_multihashmap_get (h->peers,
1583 &peer->hashPubKey);
1584 if (NULL == pr)
1585 {
1586 /* attempt to change preference on peer that is not connected */
1587 GNUNET_break (0);
1588 return NULL;
1589 }
1590 if (pr->pcic != NULL)
1591 {
1592 /* second change before first one is done */
1593 GNUNET_break (0);
1594 return NULL;
1595 }
1596 irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext));
1597 irc->h = h;
1598 irc->info = info;
1599 irc->info_cls = info_cls;
1600 cm = GNUNET_malloc (sizeof (struct ControlMessage) +
1601 sizeof (struct RequestInfoMessage));
1602 cm->cont = &change_preference_send_continuation;
1603 cm->cont_cls = irc;
1604 irc->cm = cm;
1605 rim = (struct RequestInfoMessage*) &cm[1];
1606 rim->header.size = htons (sizeof (struct RequestInfoMessage));
1607 rim->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO);
1608 rim->rim_id = htonl (pr->rim_id = h->rim_id_gen++);
1609 rim->limit_outbound = bw_out;
1610 rim->reserve_inbound = htonl (amount);
1611 rim->preference_change = GNUNET_htonll(preference);
1612 rim->peer = *peer;
1613 GNUNET_CONTAINER_DLL_insert (h->pending_head,
1614 h->pending_tail,
1615 cm);
1616 pr->pcic = info;
1617 pr->pcic_cls = info_cls;
1618 return irc;
1619}
1620
1621
1622/**
1623 * Cancel request for getting information about a peer.
1624 * Note that an eventual change in preference, trust or bandwidth
1625 * assignment MAY have already been committed at the time,
1626 * so cancelling a request is NOT sure to undo the original
1627 * request. The original request may or may not still commit.
1628 * The only thing cancellation ensures is that the callback
1629 * from the original request will no longer be called.
1630 *
1631 * @param irc context returned by the original GNUNET_CORE_peer_get_info call
1632 */
1633void
1634GNUNET_CORE_peer_change_preference_cancel (struct GNUNET_CORE_InformationRequestContext *irc)
1635{
1636 struct GNUNET_CORE_Handle *h = irc->h;
1637 struct PeerRecord *pr = irc->pr;
1638
1639 if (irc->cm != NULL)
1640 {
1641 GNUNET_CONTAINER_DLL_remove (h->pending_head,
1642 h->pending_tail,
1643 irc->cm);
1644 GNUNET_free (irc->cm);
1645 }
1646 pr->pcic = NULL;
1647 pr->pcic_cls = NULL;
1648 GNUNET_free (irc);
1649}
1650
1651
1652/* ********************* GNUNET_CORE_iterate_peers *********************** */
1653
1654/**
1655 * Context for 'iterate_peers' helper function.
1656 */
1657struct IterationContext
1658{
1659 /**
1660 * Callback to call.
1661 */
1662 GNUNET_CORE_ConnectEventHandler peer_cb;
1663
1664 /**
1665 * Closure for 'peer_cb'.
1666 */
1667 void *cb_cls;
1668};
1669
1670
1671/**
1672 * Call callback for each peer.
1673 *
1674 * @param cls the 'struct IterationContext'
1675 * @param hc peer identity, not used
1676 * @param value the 'struct PeerRecord'
1677 * @return GNUNET_YES (continue iteration)
1678 */
1679static int
1680iterate_peers (void *cls,
1681 const GNUNET_HashCode *hc,
1682 void *value)
1683{
1684 struct IterationContext *ic = cls;
1685 struct PeerRecord *pr = value;
1686
1687 ic->peer_cb (ic->cb_cls,
1688 &pr->peer,
1689 NULL /* FIXME: pass atsi? */);
1690 return GNUNET_YES;
1691}
1692
1693
1694/**
1695 * Obtain statistics and/or change preferences for the given peer.
1696 *
1697 * @param h handle to core
1698 * @param peer_cb function to call with the peer information
1699 * @param cb_cls closure for peer_cb
1700 * @return GNUNET_OK if iterating, GNUNET_SYSERR on error
1701 */
1702int
1703GNUNET_CORE_iterate_peers (struct GNUNET_CORE_Handle *h,
1704 GNUNET_CORE_ConnectEventHandler peer_cb,
1705 void *cb_cls)
1706{
1707 struct IterationContext ic;
1708
1709 ic.peer_cb = peer_cb;
1710 ic.cb_cls = cb_cls;
1711 GNUNET_CONTAINER_multihashmap_iterate (h->peers,
1712 &iterate_peers,
1713 &ic);
1714 return GNUNET_OK;
984} 1715}
985 1716
986 1717