diff options
Diffstat (limited to 'src/core/core_api.c')
-rw-r--r-- | src/core/core_api.c | 1537 |
1 files changed, 1134 insertions, 403 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c index a1e6aea65..9500a1316 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c | |||
@@ -23,6 +23,9 @@ | |||
23 | * @brief core service; this is the main API for encrypted P2P | 23 | * @brief core service; this is the main API for encrypted P2P |
24 | * communications | 24 | * communications |
25 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
26 | * | ||
27 | * TODO: | ||
28 | * - implement atsi parsing and passing | ||
26 | */ | 29 | */ |
27 | #include "platform.h" | 30 | #include "platform.h" |
28 | #include "gnunet_constants.h" | 31 | #include "gnunet_constants.h" |
@@ -31,12 +34,124 @@ | |||
31 | 34 | ||
32 | 35 | ||
33 | /** | 36 | /** |
37 | * Information we track for each peer. | ||
38 | */ | ||
39 | struct PeerRecord | ||
40 | { | ||
41 | |||
42 | /** | ||
43 | * We generally do NOT keep peer records in a DLL; this | ||
44 | * DLL is only used IF this peer's 'pending_head' message | ||
45 | * is ready for transmission. | ||
46 | */ | ||
47 | struct PeerRecord *prev; | ||
48 | |||
49 | /** | ||
50 | * We generally do NOT keep peer records in a DLL; this | ||
51 | * DLL is only used IF this peer's 'pending_head' message | ||
52 | * is ready for transmission. | ||
53 | */ | ||
54 | struct PeerRecord *next; | ||
55 | |||
56 | /** | ||
57 | * Peer the record is about. | ||
58 | */ | ||
59 | struct GNUNET_PeerIdentity peer; | ||
60 | |||
61 | /** | ||
62 | * Corresponding core handle. | ||
63 | */ | ||
64 | struct GNUNET_CORE_Handle *ch; | ||
65 | |||
66 | /** | ||
67 | * Head of doubly-linked list of pending requests. | ||
68 | * Requests are sorted by deadline *except* for HEAD, | ||
69 | * which is only modified upon transmission to core. | ||
70 | */ | ||
71 | struct GNUNET_CORE_TransmitHandle *pending_head; | ||
72 | |||
73 | /** | ||
74 | * Tail of doubly-linked list of pending requests. | ||
75 | */ | ||
76 | struct GNUNET_CORE_TransmitHandle *pending_tail; | ||
77 | |||
78 | /** | ||
79 | * Pending callback waiting for peer information, or NULL for none. | ||
80 | */ | ||
81 | GNUNET_CORE_PeerConfigurationInfoCallback pcic; | ||
82 | |||
83 | /** | ||
84 | * Closure for pcic. | ||
85 | */ | ||
86 | void *pcic_cls; | ||
87 | |||
88 | /** | ||
89 | * Request information ID for the given pcic (needed in case a | ||
90 | * request is cancelled after being submitted to core and a new | ||
91 | * one is generated; in this case, we need to avoid matching the | ||
92 | * reply to the first (cancelled) request to the second request). | ||
93 | */ | ||
94 | uint32_t rim_id; | ||
95 | |||
96 | /** | ||
97 | * ID of timeout task for the 'pending_head' handle | ||
98 | * which is the one with the smallest timeout. | ||
99 | */ | ||
100 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | ||
101 | |||
102 | /** | ||
103 | * Current size of the queue of pending requests. | ||
104 | */ | ||
105 | unsigned int queue_size; | ||
106 | |||
107 | /** | ||
108 | * SendMessageRequest ID generator for this peer. | ||
109 | */ | ||
110 | uint16_t smr_id_gen; | ||
111 | |||
112 | }; | ||
113 | |||
114 | |||
115 | /** | ||
116 | * Entry in a doubly-linked list of control messages to be transmitted | ||
117 | * to the core service. Control messages include traffic allocation, | ||
118 | * connection requests and of course our initial 'init' request. | ||
119 | * | ||
120 | * The actual message is allocated at the end of this struct. | ||
121 | */ | ||
122 | struct ControlMessage | ||
123 | { | ||
124 | /** | ||
125 | * This is a doubly-linked list. | ||
126 | */ | ||
127 | struct ControlMessage *next; | ||
128 | |||
129 | /** | ||
130 | * This is a doubly-linked list. | ||
131 | */ | ||
132 | struct ControlMessage *prev; | ||
133 | |||
134 | /** | ||
135 | * Function to run after successful transmission (or call with | ||
136 | * reason 'TIMEOUT' on error). | ||
137 | */ | ||
138 | GNUNET_SCHEDULER_Task cont; | ||
139 | |||
140 | /** | ||
141 | * Closure for 'cont'. | ||
142 | */ | ||
143 | void *cont_cls; | ||
144 | |||
145 | }; | ||
146 | |||
147 | |||
148 | |||
149 | /** | ||
34 | * Context for the core service connection. | 150 | * Context for the core service connection. |
35 | */ | 151 | */ |
36 | struct GNUNET_CORE_Handle | 152 | struct GNUNET_CORE_Handle |
37 | { | 153 | { |
38 | 154 | ||
39 | |||
40 | /** | 155 | /** |
41 | * Configuration we're using. | 156 | * Configuration we're using. |
42 | */ | 157 | */ |
@@ -83,9 +198,9 @@ struct GNUNET_CORE_Handle | |||
83 | const struct GNUNET_CORE_MessageHandler *handlers; | 198 | const struct GNUNET_CORE_MessageHandler *handlers; |
84 | 199 | ||
85 | /** | 200 | /** |
86 | * Our connection to the service for notifications. | 201 | * Our connection to the service. |
87 | */ | 202 | */ |
88 | struct GNUNET_CLIENT_Connection *client_notifications; | 203 | struct GNUNET_CLIENT_Connection *client; |
89 | 204 | ||
90 | /** | 205 | /** |
91 | * Handle for our current transmission request. | 206 | * Handle for our current transmission request. |
@@ -95,41 +210,43 @@ struct GNUNET_CORE_Handle | |||
95 | /** | 210 | /** |
96 | * Head of doubly-linked list of pending requests. | 211 | * Head of doubly-linked list of pending requests. |
97 | */ | 212 | */ |
98 | struct GNUNET_CORE_TransmitHandle *pending_head; | 213 | struct ControlMessage *pending_head; |
99 | 214 | ||
100 | /** | 215 | /** |
101 | * Tail of doubly-linked list of pending requests. | 216 | * Tail of doubly-linked list of pending requests. |
102 | */ | 217 | */ |
103 | struct GNUNET_CORE_TransmitHandle *pending_tail; | 218 | struct ControlMessage *pending_tail; |
104 | 219 | ||
105 | /** | 220 | /** |
106 | * Currently submitted request (or NULL) | 221 | * Head of doubly-linked list of peers that are core-approved |
222 | * to send their next message. | ||
107 | */ | 223 | */ |
108 | struct GNUNET_CORE_TransmitHandle *submitted; | 224 | struct PeerRecord *ready_peer_head; |
109 | 225 | ||
110 | /** | 226 | /** |
111 | * Currently submitted request based on solicitation (or NULL) | 227 | * Tail of doubly-linked list of peers that are core-approved |
228 | * to send their next message. | ||
112 | */ | 229 | */ |
113 | struct GNUNET_CORE_TransmitHandle *solicit_transmit_req; | 230 | struct PeerRecord *ready_peer_tail; |
114 | 231 | ||
115 | /** | 232 | /** |
116 | * Buffer where we store a message for transmission in response | 233 | * Hash map listing all of the peers that we are currently |
117 | * to a traffic solicitation (or NULL). | 234 | * connected to. |
118 | */ | 235 | */ |
119 | char *solicit_buffer; | 236 | struct GNUNET_CONTAINER_MultiHashMap *peers; |
120 | 237 | ||
121 | /** | 238 | /** |
122 | * How long to wait until we time out the connection attempt? | 239 | * ID of reconnect task (if any). |
123 | */ | 240 | */ |
124 | struct GNUNET_TIME_Absolute startup_timeout; | 241 | GNUNET_SCHEDULER_TaskIdentifier reconnect_task; |
125 | 242 | ||
126 | /** | 243 | /** |
127 | * ID of reconnect task (if any). | 244 | * Request information ID generator. |
128 | */ | 245 | */ |
129 | GNUNET_SCHEDULER_TaskIdentifier reconnect_task; | 246 | uint32_t rim_id_gen; |
130 | 247 | ||
131 | /** | 248 | /** |
132 | * Number of messages we should queue per target. | 249 | * Number of messages we are allowed to queue per target. |
133 | */ | 250 | */ |
134 | unsigned int queue_size; | 251 | unsigned int queue_size; |
135 | 252 | ||
@@ -155,6 +272,7 @@ struct GNUNET_CORE_Handle | |||
155 | * requests? | 272 | * requests? |
156 | */ | 273 | */ |
157 | int currently_down; | 274 | int currently_down; |
275 | |||
158 | }; | 276 | }; |
159 | 277 | ||
160 | 278 | ||
@@ -175,9 +293,15 @@ struct GNUNET_CORE_TransmitHandle | |||
175 | struct GNUNET_CORE_TransmitHandle *prev; | 293 | struct GNUNET_CORE_TransmitHandle *prev; |
176 | 294 | ||
177 | /** | 295 | /** |
178 | * Corresponding core handle. | 296 | * Corresponding peer record. |
179 | */ | 297 | */ |
180 | struct GNUNET_CORE_Handle *ch; | 298 | struct PeerRecord *peer; |
299 | |||
300 | /** | ||
301 | * Corresponding SEND_REQUEST message. Only non-NULL | ||
302 | * while SEND_REQUEST message is pending. | ||
303 | */ | ||
304 | struct ControlMessage *cm; | ||
181 | 305 | ||
182 | /** | 306 | /** |
183 | * Function that will be called to get the actual request | 307 | * Function that will be called to get the actual request |
@@ -193,32 +317,11 @@ struct GNUNET_CORE_TransmitHandle | |||
193 | void *get_message_cls; | 317 | void *get_message_cls; |
194 | 318 | ||
195 | /** | 319 | /** |
196 | * If this entry is for a transmission request, pointer | ||
197 | * to the notify callback; otherwise NULL. | ||
198 | */ | ||
199 | GNUNET_CONNECTION_TransmitReadyNotify notify; | ||
200 | |||
201 | /** | ||
202 | * Closure for notify. | ||
203 | */ | ||
204 | void *notify_cls; | ||
205 | |||
206 | /** | ||
207 | * Peer the request is about. | ||
208 | */ | ||
209 | struct GNUNET_PeerIdentity peer; | ||
210 | |||
211 | /** | ||
212 | * Timeout for this handle. | 320 | * Timeout for this handle. |
213 | */ | 321 | */ |
214 | struct GNUNET_TIME_Absolute timeout; | 322 | struct GNUNET_TIME_Absolute timeout; |
215 | 323 | ||
216 | /** | 324 | /** |
217 | * ID of timeout task. | ||
218 | */ | ||
219 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | ||
220 | |||
221 | /** | ||
222 | * How important is this message? | 325 | * How important is this message? |
223 | */ | 326 | */ |
224 | uint32_t priority; | 327 | uint32_t priority; |
@@ -228,54 +331,127 @@ struct GNUNET_CORE_TransmitHandle | |||
228 | */ | 331 | */ |
229 | uint16_t msize; | 332 | uint16_t msize; |
230 | 333 | ||
334 | /** | ||
335 | * Send message request ID for this request. | ||
336 | */ | ||
337 | uint16_t smr_id; | ||
231 | 338 | ||
232 | }; | 339 | }; |
233 | 340 | ||
234 | 341 | ||
342 | /** | ||
343 | * Our current client connection went down. Clean it up | ||
344 | * and try to reconnect! | ||
345 | * | ||
346 | * @param h our handle to the core service | ||
347 | */ | ||
348 | static void | ||
349 | reconnect (struct GNUNET_CORE_Handle *h); | ||
350 | |||
351 | |||
352 | /** | ||
353 | * Task schedule to try to re-connect to core. | ||
354 | * | ||
355 | * @param cls the 'struct GNUNET_CORE_Handle' | ||
356 | * @param tc task context | ||
357 | */ | ||
358 | static void | ||
359 | reconnect_task (void *cls, | ||
360 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
361 | { | ||
362 | struct GNUNET_CORE_Handle *h = cls; | ||
363 | |||
364 | h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
365 | reconnect (h); | ||
366 | } | ||
367 | |||
368 | |||
369 | /** | ||
370 | * Check the list of pending requests, send the next | ||
371 | * one to the core. | ||
372 | * | ||
373 | * @param h core handle | ||
374 | */ | ||
235 | static void | 375 | static void |
236 | reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | 376 | trigger_next_request (struct GNUNET_CORE_Handle *h); |
237 | 377 | ||
238 | 378 | ||
239 | /** | 379 | /** |
240 | * Function called when we are ready to transmit our | 380 | * The given request hit its timeout. Remove from the |
241 | * "START" message (or when this operation timed out). | 381 | * doubly-linked list and call the respective continuation. |
242 | * | 382 | * |
243 | * @param cls closure | 383 | * @param cls the transmit handle of the request that timed out |
244 | * @param size number of bytes available in buf | 384 | * @param tc context, can be NULL (!) |
245 | * @param buf where the callee should write the message | ||
246 | * @return number of bytes written to buf | ||
247 | */ | 385 | */ |
248 | static size_t transmit_start (void *cls, size_t size, void *buf); | 386 | static void |
387 | transmission_timeout (void *cls, | ||
388 | const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
249 | 389 | ||
250 | 390 | ||
251 | /** | 391 | /** |
252 | * Our current client connection went down. Clean it up | 392 | * Control message was sent, mark it as such. |
253 | * and try to reconnect! | ||
254 | * | 393 | * |
255 | * @param h our handle to the core service | 394 | * @param cls the 'struct GNUNET_CORE_TransmitHandle*' |
395 | * @param tc scheduler context | ||
256 | */ | 396 | */ |
257 | static void | 397 | static void |
258 | reconnect (struct GNUNET_CORE_Handle *h) | 398 | mark_control_message_sent (void *cls, |
399 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
259 | { | 400 | { |
260 | #if DEBUG_CORE | 401 | struct GNUNET_CORE_TransmitHandle *th = cls; |
261 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 402 | |
262 | "Reconnecting to CORE service\n"); | 403 | th->cm = NULL; |
263 | #endif | 404 | } |
264 | if (h->client_notifications != NULL) | 405 | |
265 | GNUNET_CLIENT_disconnect (h->client_notifications, GNUNET_NO); | 406 | |
266 | h->currently_down = GNUNET_YES; | 407 | /** |
267 | h->client_notifications = GNUNET_CLIENT_connect ("core", h->cfg); | 408 | * Send a control message to the peer asking for transmission |
268 | if (h->client_notifications == NULL) | 409 | * of the message in the given peer record. |
269 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, | 410 | * |
270 | &reconnect_task, | 411 | * @param pr peer to request transmission to |
271 | h); | 412 | */ |
272 | else | 413 | static void |
273 | h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client_notifications, | 414 | request_next_transmission (struct PeerRecord *pr) |
274 | sizeof (struct InitMessage) + | 415 | { |
275 | sizeof (uint16_t) * h->hcnt, | 416 | struct GNUNET_CORE_Handle *h = pr->ch; |
276 | GNUNET_TIME_UNIT_SECONDS, | 417 | struct ControlMessage *cm; |
277 | GNUNET_NO, | 418 | struct SendMessageRequest *smr; |
278 | &transmit_start, h); | 419 | struct GNUNET_CORE_TransmitHandle *th; |
420 | |||
421 | if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK) | ||
422 | { | ||
423 | GNUNET_SCHEDULER_cancel (pr->timeout_task); | ||
424 | pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
425 | } | ||
426 | if (NULL == (th = pr->pending_head)) | ||
427 | { | ||
428 | trigger_next_request (h); | ||
429 | return; | ||
430 | } | ||
431 | GNUNET_assert (pr->prev == NULL); | ||
432 | GNUNET_assert (pr->next == NULL); | ||
433 | pr->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (th->timeout), | ||
434 | &transmission_timeout, | ||
435 | pr); | ||
436 | cm = GNUNET_malloc (sizeof (struct ControlMessage) + | ||
437 | sizeof (struct SendMessageRequest)); | ||
438 | cm->cont = &mark_control_message_sent; | ||
439 | cm->cont_cls = th; | ||
440 | th->cm = cm; | ||
441 | smr = (struct SendMessageRequest*) &cm[1]; | ||
442 | smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); | ||
443 | smr->header.size = htons (sizeof (struct SendMessageRequest)); | ||
444 | smr->priority = htonl (th->priority); | ||
445 | smr->deadline = GNUNET_TIME_absolute_hton (th->timeout); | ||
446 | smr->peer = pr->peer; | ||
447 | smr->queue_size = htonl (pr->queue_size); | ||
448 | smr->size = htons (th->msize); | ||
449 | smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); | ||
450 | GNUNET_CONTAINER_DLL_insert_after (h->pending_head, | ||
451 | h->pending_tail, | ||
452 | h->pending_tail, | ||
453 | cm); | ||
454 | trigger_next_request (h); | ||
279 | } | 455 | } |
280 | 456 | ||
281 | 457 | ||
@@ -287,97 +463,225 @@ reconnect (struct GNUNET_CORE_Handle *h) | |||
287 | * @param tc context, can be NULL (!) | 463 | * @param tc context, can be NULL (!) |
288 | */ | 464 | */ |
289 | static void | 465 | static void |
290 | timeout_request (void *cls, | 466 | transmission_timeout (void *cls, |
291 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 467 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
292 | { | 468 | { |
293 | struct GNUNET_CORE_TransmitHandle *th = cls; | 469 | struct PeerRecord *pr = cls; |
470 | struct GNUNET_CORE_TransmitHandle *th; | ||
294 | 471 | ||
295 | th->timeout_task = GNUNET_SCHEDULER_NO_TASK; | 472 | pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; |
296 | GNUNET_CONTAINER_DLL_remove (th->ch->pending_head, | 473 | th = pr->pending_head; |
297 | th->ch->pending_tail, | 474 | GNUNET_CONTAINER_DLL_remove (pr->pending_head, |
475 | pr->pending_tail, | ||
298 | th); | 476 | th); |
477 | pr->queue_size--; | ||
299 | #if DEBUG_CORE | 478 | #if DEBUG_CORE |
300 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 479 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
301 | "Signalling timeout of request for transmission to CORE service\n"); | 480 | "Signalling timeout of request for transmission to CORE service\n"); |
302 | #endif | 481 | #endif |
303 | GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL)); | 482 | GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL)); |
483 | request_next_transmission (pr); | ||
304 | } | 484 | } |
305 | 485 | ||
306 | 486 | ||
307 | /** | 487 | /** |
308 | * Function called when we are ready to transmit a request from our | 488 | * Transmit the next message to the core service. |
309 | * request list (or when this operation timed out). | ||
310 | * | ||
311 | * @param cls closure | ||
312 | * @param size number of bytes available in buf | ||
313 | * @param buf where the callee should write the message | ||
314 | * @return number of bytes written to buf | ||
315 | */ | 489 | */ |
316 | static size_t | 490 | static size_t |
317 | request_start (void *cls, size_t size, void *buf) | 491 | transmit_message (void *cls, |
492 | size_t size, | ||
493 | void *buf) | ||
318 | { | 494 | { |
319 | struct GNUNET_CORE_Handle *h = cls; | 495 | struct GNUNET_CORE_Handle *h = cls; |
496 | struct ControlMessage *cm; | ||
320 | struct GNUNET_CORE_TransmitHandle *th; | 497 | struct GNUNET_CORE_TransmitHandle *th; |
498 | struct PeerRecord *pr; | ||
499 | struct SendMessage *sm; | ||
500 | const struct GNUNET_MessageHeader *hdr; | ||
501 | uint16_t msize; | ||
321 | size_t ret; | 502 | size_t ret; |
322 | 503 | ||
323 | h->cth = NULL; | 504 | h->cth = NULL; |
324 | th = h->pending_head; | ||
325 | if (th == NULL) | ||
326 | return 0; | ||
327 | if (buf == NULL) | 505 | if (buf == NULL) |
328 | { | 506 | { |
329 | if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK) | 507 | reconnect (h); |
508 | return 0; | ||
509 | } | ||
510 | /* first check for control messages */ | ||
511 | if (NULL != (cm = h->pending_head)) | ||
512 | { | ||
513 | hdr = (const struct GNUNET_MessageHeader*) &cm[1]; | ||
514 | msize = ntohs (hdr->size); | ||
515 | if (size < msize) | ||
330 | { | 516 | { |
331 | GNUNET_SCHEDULER_cancel(th->timeout_task); | 517 | trigger_next_request (h); |
332 | th->timeout_task = GNUNET_SCHEDULER_NO_TASK; | 518 | return 0; |
333 | } | 519 | } |
334 | timeout_request (th, NULL); | 520 | memcpy (buf, hdr, msize); |
335 | return 0; | 521 | GNUNET_CONTAINER_DLL_remove (h->pending_head, |
522 | h->pending_tail, | ||
523 | cm); | ||
524 | if (NULL != cm->cont) | ||
525 | GNUNET_SCHEDULER_add_now (cm->cont, cm->cont_cls); | ||
526 | GNUNET_free (cm); | ||
527 | trigger_next_request (h); | ||
528 | return msize; | ||
336 | } | 529 | } |
337 | GNUNET_CONTAINER_DLL_remove (h->pending_head, | 530 | /* now check for 'ready' P2P messages */ |
338 | h->pending_tail, | 531 | if (NULL != (pr = h->ready_peer_head)) |
339 | th); | 532 | { |
340 | GNUNET_assert (h->submitted == NULL); | 533 | th = pr->pending_head; |
341 | h->submitted = th; | 534 | if (size < th->msize + sizeof (struct SendMessage)) |
342 | GNUNET_assert (size >= th->msize); | 535 | { |
343 | ret = th->get_message (th->get_message_cls, size, buf); | 536 | trigger_next_request (h); |
344 | GNUNET_assert (ret <= size); | 537 | return 0; |
538 | } | ||
539 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, | ||
540 | h->ready_peer_tail, | ||
541 | pr); | ||
542 | GNUNET_CONTAINER_DLL_remove (pr->pending_head, | ||
543 | pr->pending_tail, | ||
544 | th); | ||
545 | pr->queue_size--; | ||
546 | if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK) | ||
547 | { | ||
548 | GNUNET_SCHEDULER_cancel (pr->timeout_task); | ||
549 | pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
550 | } | ||
551 | |||
552 | sm = (struct SendMessage *) buf; | ||
553 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); | ||
554 | sm->priority = htonl (th->priority); | ||
555 | sm->deadline = GNUNET_TIME_absolute_hton (th->timeout); | ||
556 | sm->peer = pr->peer; | ||
557 | ret = th->get_message (th->get_message_cls, | ||
558 | size - sizeof (struct SendMessage), | ||
559 | &sm[1]); | ||
560 | |||
561 | if (0 == ret) | ||
562 | { | ||
345 | #if DEBUG_CORE | 563 | #if DEBUG_CORE |
346 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 564 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
347 | "Transmitting %u bytes to core\n", | 565 | "Size of clients message to peer %s is 0!\n", |
348 | ret); | 566 | GNUNET_i2s(&pr->peer)); |
349 | #endif | 567 | #endif |
350 | return ret; | 568 | /* client decided to send nothing! */ |
569 | request_next_transmission (pr); | ||
570 | return 0; | ||
571 | } | ||
572 | #if DEBUG_CORE | ||
573 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
574 | "Produced SEND message to core with %u bytes payload\n", | ||
575 | (unsigned int) ret); | ||
576 | #endif | ||
577 | GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader)); | ||
578 | if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | ||
579 | { | ||
580 | GNUNET_break (0); | ||
581 | request_next_transmission (pr); | ||
582 | return 0; | ||
583 | } | ||
584 | ret += sizeof (struct SendMessage); | ||
585 | sm->header.size = htons (ret); | ||
586 | GNUNET_assert (ret <= size); | ||
587 | GNUNET_free (th); | ||
588 | request_next_transmission (pr); | ||
589 | return ret; | ||
590 | } | ||
591 | return 0; | ||
351 | } | 592 | } |
352 | 593 | ||
353 | 594 | ||
354 | /** | 595 | /** |
355 | * Check the list of pending requests, send the next | 596 | * Check the list of pending requests, send the next |
356 | * one to the core. | 597 | * one to the core. |
598 | * | ||
599 | * @param h core handle | ||
357 | */ | 600 | */ |
358 | static void | 601 | static void |
359 | trigger_next_request (struct GNUNET_CORE_Handle *h) | 602 | trigger_next_request (struct GNUNET_CORE_Handle *h) |
360 | { | 603 | { |
604 | uint16_t msize; | ||
605 | |||
606 | if (GNUNET_YES == h->currently_down) | ||
607 | return; | ||
608 | if (NULL != h->cth) | ||
609 | return; | ||
610 | if (h->pending_head != NULL) | ||
611 | msize = ntohs (((struct GNUNET_MessageHeader*) &h->pending_head[1])->size); | ||
612 | else if (h->ready_peer_head != NULL) | ||
613 | msize = h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage); | ||
614 | else | ||
615 | return; /* no pending message */ | ||
616 | h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client, | ||
617 | msize, | ||
618 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
619 | GNUNET_NO, | ||
620 | &transmit_message, h); | ||
621 | } | ||
622 | |||
623 | |||
624 | |||
625 | |||
626 | /** | ||
627 | * Notify clients about disconnect and free | ||
628 | * the entry for connected peer. | ||
629 | * | ||
630 | * @param cls the 'struct GNUNET_CORE_Handle*' | ||
631 | * @param key the peer identity (not used) | ||
632 | * @param value the 'struct PeerRecord' to free. | ||
633 | * @return GNUNET_YES (continue) | ||
634 | */ | ||
635 | static int | ||
636 | disconnect_and_free_peer_entry (void *cls, | ||
637 | const GNUNET_HashCode *key, | ||
638 | void *value) | ||
639 | { | ||
640 | struct GNUNET_CORE_Handle *h = cls; | ||
361 | struct GNUNET_CORE_TransmitHandle *th; | 641 | struct GNUNET_CORE_TransmitHandle *th; |
642 | struct PeerRecord *pr = value; | ||
362 | 643 | ||
363 | if (h->currently_down) | 644 | while (NULL != (th = pr->pending_head)) |
364 | { | 645 | { |
365 | #if DEBUG_CORE | 646 | GNUNET_CONTAINER_DLL_remove (pr->pending_head, |
366 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 647 | pr->pending_tail, |
367 | "In trigger_next_request, connection currently down...\n"); | 648 | th); |
368 | #endif | 649 | pr->queue_size--; |
369 | return; /* connection temporarily down */ | 650 | GNUNET_assert (0 == |
651 | th->get_message (th->get_message_cls, | ||
652 | 0, NULL)); | ||
653 | GNUNET_free (th); | ||
370 | } | 654 | } |
371 | if (NULL == (th = h->pending_head)) | 655 | if (pr->pcic != NULL) |
372 | return; /* no requests pending */ | 656 | { |
373 | GNUNET_assert (NULL == h->cth); | 657 | // FIXME: call pcic callback! |
374 | h->cth = GNUNET_CLIENT_notify_transmit_ready (h->client_notifications, | 658 | } |
375 | th->msize, | 659 | if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK) |
376 | GNUNET_TIME_absolute_get_remaining | 660 | { |
377 | (th->timeout), | 661 | GNUNET_SCHEDULER_cancel (pr->timeout_task); |
378 | GNUNET_NO, | 662 | pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; |
379 | &request_start, | 663 | } |
380 | h); | 664 | GNUNET_assert (pr->queue_size == 0); |
665 | if ( (pr->prev != NULL) || | ||
666 | (pr->next != NULL) || | ||
667 | (h->ready_peer_head == pr) ) | ||
668 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, | ||
669 | h->ready_peer_tail, | ||
670 | pr); | ||
671 | if (h->disconnects != NULL) | ||
672 | h->disconnects (h->cls, | ||
673 | &pr->peer); | ||
674 | GNUNET_assert (GNUNET_YES == | ||
675 | GNUNET_CONTAINER_multihashmap_remove (h->peers, | ||
676 | key, | ||
677 | pr)); | ||
678 | GNUNET_assert (pr->pending_head == NULL); | ||
679 | GNUNET_assert (pr->pending_tail == NULL); | ||
680 | GNUNET_assert (pr->ch = h); | ||
681 | GNUNET_assert (pr->queue_size == 0); | ||
682 | GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK); | ||
683 | GNUNET_free (pr); | ||
684 | return GNUNET_YES; | ||
381 | } | 685 | } |
382 | 686 | ||
383 | 687 | ||
@@ -388,18 +692,28 @@ trigger_next_request (struct GNUNET_CORE_Handle *h) | |||
388 | * @param msg the message received from the core service | 692 | * @param msg the message received from the core service |
389 | */ | 693 | */ |
390 | static void | 694 | static void |
391 | main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) | 695 | main_notify_handler (void *cls, |
696 | const struct GNUNET_MessageHeader *msg) | ||
392 | { | 697 | { |
393 | struct GNUNET_CORE_Handle *h = cls; | 698 | struct GNUNET_CORE_Handle *h = cls; |
394 | unsigned int hpos; | 699 | const struct InitReplyMessage *m; |
395 | const struct ConnectNotifyMessage *cnm; | 700 | const struct ConnectNotifyMessage *cnm; |
396 | const struct DisconnectNotifyMessage *dnm; | 701 | const struct DisconnectNotifyMessage *dnm; |
397 | const struct NotifyTrafficMessage *ntm; | 702 | const struct NotifyTrafficMessage *ntm; |
398 | const struct GNUNET_MessageHeader *em; | 703 | const struct GNUNET_MessageHeader *em; |
704 | const struct ConfigurationInfoMessage *cim; | ||
399 | const struct PeerStatusNotifyMessage *psnm; | 705 | const struct PeerStatusNotifyMessage *psnm; |
706 | const struct SendMessageReady *smr; | ||
707 | const struct GNUNET_CORE_MessageHandler *mh; | ||
708 | GNUNET_CORE_StartupCallback init; | ||
709 | GNUNET_CORE_PeerConfigurationInfoCallback pcic; | ||
710 | struct GNUNET_PeerIdentity my_identity; | ||
711 | struct PeerRecord *pr; | ||
712 | struct GNUNET_CORE_TransmitHandle *th; | ||
713 | unsigned int hpos; | ||
714 | int trigger; | ||
400 | uint16_t msize; | 715 | uint16_t msize; |
401 | uint16_t et; | 716 | uint16_t et; |
402 | const struct GNUNET_CORE_MessageHandler *mh; | ||
403 | 717 | ||
404 | if (msg == NULL) | 718 | if (msg == NULL) |
405 | { | 719 | { |
@@ -417,37 +731,85 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
417 | #endif | 731 | #endif |
418 | switch (ntohs (msg->type)) | 732 | switch (ntohs (msg->type)) |
419 | { | 733 | { |
734 | case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY: | ||
735 | if (ntohs (msg->size) != sizeof (struct InitReplyMessage)) | ||
736 | { | ||
737 | GNUNET_break (0); | ||
738 | reconnect (h); | ||
739 | return; | ||
740 | } | ||
741 | m = (const struct InitReplyMessage *) msg; | ||
742 | GNUNET_break (0 == ntohl (m->reserved)); | ||
743 | /* start our message processing loop */ | ||
744 | #if DEBUG_CORE | ||
745 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
746 | "Successfully connected to core service, starting processing loop.\n"); | ||
747 | #endif | ||
748 | if (GNUNET_YES == h->currently_down) | ||
749 | { | ||
750 | h->currently_down = GNUNET_NO; | ||
751 | trigger_next_request (h); | ||
752 | } | ||
753 | if (NULL != (init = h->init)) | ||
754 | { | ||
755 | /* mark so we don't call init on reconnect */ | ||
756 | h->init = NULL; | ||
757 | GNUNET_CRYPTO_hash (&m->publicKey, | ||
758 | sizeof (struct | ||
759 | GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), | ||
760 | &my_identity.hashPubKey); | ||
761 | init (h->cls, h, &my_identity, &m->publicKey); | ||
762 | } | ||
763 | break; | ||
420 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT: | 764 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT: |
421 | if (NULL == h->connects) | ||
422 | { | ||
423 | GNUNET_break (0); | ||
424 | break; | ||
425 | } | ||
426 | if (msize != sizeof (struct ConnectNotifyMessage)) | 765 | if (msize != sizeof (struct ConnectNotifyMessage)) |
427 | { | 766 | { |
428 | GNUNET_break (0); | 767 | GNUNET_break (0); |
429 | break; | 768 | break; |
430 | } | 769 | } |
431 | cnm = (const struct ConnectNotifyMessage *) msg; | 770 | cnm = (const struct ConnectNotifyMessage *) msg; |
432 | h->connects (h->cls, | 771 | pr = GNUNET_CONTAINER_multihashmap_get (h->peers, |
433 | &cnm->peer, | 772 | &cnm->peer.hashPubKey); |
434 | GNUNET_TIME_relative_ntoh (cnm->latency), | 773 | if (pr != NULL) |
435 | ntohl (cnm->distance)); | 774 | { |
775 | GNUNET_break (0); | ||
776 | reconnect (h); | ||
777 | return; | ||
778 | } | ||
779 | pr = GNUNET_malloc (sizeof (struct PeerRecord)); | ||
780 | pr->peer = cnm->peer; | ||
781 | pr->ch = h; | ||
782 | GNUNET_assert (GNUNET_YES == | ||
783 | GNUNET_CONTAINER_multihashmap_put (h->peers, | ||
784 | &cnm->peer.hashPubKey, | ||
785 | pr, | ||
786 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); | ||
787 | if (NULL != h->connects) | ||
788 | h->connects (h->cls, | ||
789 | &cnm->peer, | ||
790 | NULL /* FIXME: atsi! */); | ||
436 | break; | 791 | break; |
437 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT: | 792 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT: |
438 | if (NULL == h->disconnects) | ||
439 | { | ||
440 | GNUNET_break (0); | ||
441 | break; | ||
442 | } | ||
443 | if (msize != sizeof (struct DisconnectNotifyMessage)) | 793 | if (msize != sizeof (struct DisconnectNotifyMessage)) |
444 | { | 794 | { |
445 | GNUNET_break (0); | 795 | GNUNET_break (0); |
446 | break; | 796 | break; |
447 | } | 797 | } |
448 | dnm = (const struct DisconnectNotifyMessage *) msg; | 798 | dnm = (const struct DisconnectNotifyMessage *) msg; |
449 | h->disconnects (h->cls, | 799 | pr = GNUNET_CONTAINER_multihashmap_get (h->peers, |
450 | &dnm->peer); | 800 | &dnm->peer.hashPubKey); |
801 | if (pr == NULL) | ||
802 | { | ||
803 | GNUNET_break (0); | ||
804 | reconnect (h); | ||
805 | return; | ||
806 | } | ||
807 | trigger = ( (pr->prev != NULL) || | ||
808 | (pr->next != NULL) || | ||
809 | (h->ready_peer_head == pr) ); | ||
810 | disconnect_and_free_peer_entry (h, &dnm->peer.hashPubKey, pr); | ||
811 | if (trigger) | ||
812 | trigger_next_request (h); | ||
451 | break; | 813 | break; |
452 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE: | 814 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE: |
453 | if (NULL == h->status_events) | 815 | if (NULL == h->status_events) |
@@ -461,13 +823,20 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
461 | break; | 823 | break; |
462 | } | 824 | } |
463 | psnm = (const struct PeerStatusNotifyMessage *) msg; | 825 | psnm = (const struct PeerStatusNotifyMessage *) msg; |
826 | pr = GNUNET_CONTAINER_multihashmap_get (h->peers, | ||
827 | &psnm->peer.hashPubKey); | ||
828 | if (pr == NULL) | ||
829 | { | ||
830 | GNUNET_break (0); | ||
831 | reconnect (h); | ||
832 | return; | ||
833 | } | ||
464 | h->status_events (h->cls, | 834 | h->status_events (h->cls, |
465 | &psnm->peer, | 835 | &psnm->peer, |
466 | GNUNET_TIME_relative_ntoh (psnm->latency), | ||
467 | ntohl (psnm->distance), | ||
468 | psnm->bandwidth_in, | 836 | psnm->bandwidth_in, |
469 | psnm->bandwidth_out, | 837 | psnm->bandwidth_out, |
470 | GNUNET_TIME_absolute_ntoh (psnm->timeout)); | 838 | GNUNET_TIME_absolute_ntoh (psnm->timeout), |
839 | NULL /* FIXME: atsi */); | ||
471 | break; | 840 | break; |
472 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND: | 841 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND: |
473 | if (msize < | 842 | if (msize < |
@@ -486,6 +855,14 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
486 | ntohs (em->size), | 855 | ntohs (em->size), |
487 | GNUNET_i2s (&ntm->peer)); | 856 | GNUNET_i2s (&ntm->peer)); |
488 | #endif | 857 | #endif |
858 | pr = GNUNET_CONTAINER_multihashmap_get (h->peers, | ||
859 | &ntm->peer.hashPubKey); | ||
860 | if (pr == NULL) | ||
861 | { | ||
862 | GNUNET_break (0); | ||
863 | reconnect (h); | ||
864 | return; | ||
865 | } | ||
489 | if ((GNUNET_NO == h->inbound_hdr_only) && | 866 | if ((GNUNET_NO == h->inbound_hdr_only) && |
490 | (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) | 867 | (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) |
491 | { | 868 | { |
@@ -506,8 +883,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
506 | } | 883 | } |
507 | if (GNUNET_OK != | 884 | if (GNUNET_OK != |
508 | h->handlers[hpos].callback (h->cls, &ntm->peer, em, | 885 | h->handlers[hpos].callback (h->cls, &ntm->peer, em, |
509 | GNUNET_TIME_relative_ntoh (ntm->latency), | 886 | NULL /* FIXME: atsi */)) |
510 | ntohl (ntm->distance))) | ||
511 | { | 887 | { |
512 | /* error in processing, do not process other messages! */ | 888 | /* error in processing, do not process other messages! */ |
513 | break; | 889 | break; |
@@ -515,8 +891,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
515 | } | 891 | } |
516 | if (NULL != h->inbound_notify) | 892 | if (NULL != h->inbound_notify) |
517 | h->inbound_notify (h->cls, &ntm->peer, em, | 893 | h->inbound_notify (h->cls, &ntm->peer, em, |
518 | GNUNET_TIME_relative_ntoh (ntm->latency), | 894 | NULL /* FIXME: atsi */); |
519 | ntohl (ntm->distance)); | ||
520 | break; | 895 | break; |
521 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND: | 896 | case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND: |
522 | if (msize < | 897 | if (msize < |
@@ -528,6 +903,14 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
528 | } | 903 | } |
529 | ntm = (const struct NotifyTrafficMessage *) msg; | 904 | ntm = (const struct NotifyTrafficMessage *) msg; |
530 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; | 905 | em = (const struct GNUNET_MessageHeader *) &ntm[1]; |
906 | pr = GNUNET_CONTAINER_multihashmap_get (h->peers, | ||
907 | &ntm->peer.hashPubKey); | ||
908 | if (pr == NULL) | ||
909 | { | ||
910 | GNUNET_break (0); | ||
911 | reconnect (h); | ||
912 | return; | ||
913 | } | ||
531 | if ((GNUNET_NO == h->outbound_hdr_only) && | 914 | if ((GNUNET_NO == h->outbound_hdr_only) && |
532 | (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) | 915 | (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) |
533 | { | 916 | { |
@@ -540,159 +923,157 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
540 | break; | 923 | break; |
541 | } | 924 | } |
542 | h->outbound_notify (h->cls, &ntm->peer, em, | 925 | h->outbound_notify (h->cls, &ntm->peer, em, |
543 | GNUNET_TIME_relative_ntoh (ntm->latency), | 926 | NULL /* FIXME: atsi? */); |
544 | ntohl (ntm->distance)); | 927 | break; |
928 | case GNUNET_MESSAGE_TYPE_CORE_SEND_READY: | ||
929 | if (msize != sizeof (struct SendMessageReady)) | ||
930 | { | ||
931 | GNUNET_break (0); | ||
932 | break; | ||
933 | } | ||
934 | smr = (const struct SendMessageReady *) msg; | ||
935 | pr = GNUNET_CONTAINER_multihashmap_get (h->peers, | ||
936 | &smr->peer.hashPubKey); | ||
937 | if (pr == NULL) | ||
938 | { | ||
939 | GNUNET_break (0); | ||
940 | reconnect (h); | ||
941 | return; | ||
942 | } | ||
943 | th = pr->pending_head; | ||
944 | if (ntohs (smr->smr_id) != th->smr_id) | ||
945 | { | ||
946 | /* READY message is for expired or cancelled message, | ||
947 | ignore! (we should have already sent another request) */ | ||
948 | break; | ||
949 | } | ||
950 | if ( (pr->prev != NULL) || | ||
951 | (pr->next != NULL) || | ||
952 | (h->ready_peer_head == pr) ) | ||
953 | { | ||
954 | /* we should not already be on the ready list... */ | ||
955 | GNUNET_break (0); | ||
956 | reconnect (h); | ||
957 | return; | ||
958 | } | ||
959 | GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, | ||
960 | h->ready_peer_tail, | ||
961 | pr); | ||
962 | trigger_next_request (h); | ||
963 | break; | ||
964 | case GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO: | ||
965 | if (ntohs (msg->size) != sizeof (struct ConfigurationInfoMessage)) | ||
966 | { | ||
967 | GNUNET_break (0); | ||
968 | break; | ||
969 | } | ||
970 | cim = (const struct ConfigurationInfoMessage*) msg; | ||
971 | pr = GNUNET_CONTAINER_multihashmap_get (h->peers, | ||
972 | &cim->peer.hashPubKey); | ||
973 | if (pr == NULL) | ||
974 | { | ||
975 | GNUNET_break (0); | ||
976 | reconnect (h); | ||
977 | return; | ||
978 | } | ||
979 | if (pr->rim_id != ntohl (cim->rim_id)) | ||
980 | break; | ||
981 | pcic = pr->pcic; | ||
982 | pr->pcic = NULL; | ||
983 | if (pcic != NULL) | ||
984 | pcic (pr->pcic_cls, | ||
985 | &pr->peer, | ||
986 | cim->bw_out, | ||
987 | ntohl (cim->reserved_amount), | ||
988 | GNUNET_ntohll (cim->preference)); | ||
545 | break; | 989 | break; |
546 | default: | 990 | default: |
547 | GNUNET_break (0); | 991 | GNUNET_break (0); |
548 | break; | 992 | break; |
549 | } | 993 | } |
550 | GNUNET_CLIENT_receive (h->client_notifications, | 994 | GNUNET_CLIENT_receive (h->client, |
551 | &main_notify_handler, h, GNUNET_TIME_UNIT_FOREVER_REL); | 995 | &main_notify_handler, h, GNUNET_TIME_UNIT_FOREVER_REL); |
552 | } | 996 | } |
553 | 997 | ||
554 | 998 | ||
555 | /** | 999 | /** |
556 | * Function called when we are ready to transmit our | 1000 | * Task executed once we are done transmitting the INIT message. |
557 | * "START" message (or when this operation timed out). | 1001 | * Starts our 'receive' loop. |
558 | * | 1002 | * |
559 | * @param cls closure | 1003 | * @param cls the 'struct GNUNET_CORE_Handle' |
560 | * @param size number of bytes available in buf | 1004 | * @param tc task context |
561 | * @param buf where the callee should write the message | ||
562 | * @return number of bytes written to buf | ||
563 | */ | ||
564 | static size_t transmit_start (void *cls, size_t size, void *buf); | ||
565 | |||
566 | |||
567 | /** | ||
568 | * Function called on the first message received from | ||
569 | * the service (contains our public key, etc.). | ||
570 | * Should trigger calling the init callback | ||
571 | * and then start our regular message processing. | ||
572 | * | ||
573 | * @param cls closure | ||
574 | * @param msg message received, NULL on timeout or fatal error | ||
575 | */ | 1005 | */ |
576 | static void | 1006 | static void |
577 | init_reply_handler (void *cls, const struct GNUNET_MessageHeader *msg) | 1007 | init_done_task (void *cls, |
1008 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
578 | { | 1009 | { |
579 | struct GNUNET_CORE_Handle *h = cls; | 1010 | struct GNUNET_CORE_Handle *h = cls; |
580 | const struct InitReplyMessage *m; | ||
581 | GNUNET_CORE_StartupCallback init; | ||
582 | struct GNUNET_PeerIdentity my_identity; | ||
583 | 1011 | ||
584 | if ((msg == NULL) || | 1012 | if (0 == (tc->reason & GNUNET_SCHEDULER_REASON_PREREQ_DONE)) |
585 | (ntohs (msg->size) != sizeof (struct InitReplyMessage)) || | ||
586 | (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY)) | ||
587 | { | 1013 | { |
588 | if (msg != NULL) | 1014 | if (h->client != NULL) |
589 | { | ||
590 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
591 | _ | ||
592 | ("Error connecting to core service (failed to receive `%s' message, got message of type %u and size %u).\n"), | ||
593 | "INIT_REPLY", | ||
594 | ntohs (msg->type), | ||
595 | ntohs (msg->size)); | ||
596 | GNUNET_break (0); | ||
597 | } | ||
598 | else | ||
599 | { | 1015 | { |
600 | #if DEBUG_CORE | 1016 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); |
601 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1017 | h->client = NULL; |
602 | _("Failed to connect to core service, will retry.\n")); | ||
603 | #endif | ||
604 | } | 1018 | } |
605 | transmit_start (h, 0, NULL); | 1019 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, |
1020 | &reconnect_task, | ||
1021 | h); | ||
606 | return; | 1022 | return; |
607 | } | 1023 | } |
608 | m = (const struct InitReplyMessage *) msg; | 1024 | GNUNET_CLIENT_receive (h->client, |
609 | GNUNET_break (0 == ntohl (m->reserved)); | 1025 | &main_notify_handler, |
610 | /* start our message processing loop */ | 1026 | h, |
611 | #if DEBUG_CORE | 1027 | GNUNET_TIME_UNIT_FOREVER_REL); |
612 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
613 | "Successfully connected to core service, starting processing loop.\n"); | ||
614 | #endif | ||
615 | h->currently_down = GNUNET_NO; | ||
616 | trigger_next_request (h); | ||
617 | GNUNET_CLIENT_receive (h->client_notifications, | ||
618 | &main_notify_handler, h, GNUNET_TIME_UNIT_FOREVER_REL); | ||
619 | if (NULL != (init = h->init)) | ||
620 | { | ||
621 | /* mark so we don't call init on reconnect */ | ||
622 | h->init = NULL; | ||
623 | GNUNET_CRYPTO_hash (&m->publicKey, | ||
624 | sizeof (struct | ||
625 | GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), | ||
626 | &my_identity.hashPubKey); | ||
627 | init (h->cls, h, &my_identity, &m->publicKey); | ||
628 | } | ||
629 | } | ||
630 | |||
631 | |||
632 | static void | ||
633 | reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
634 | { | ||
635 | struct GNUNET_CORE_Handle *h = cls; | ||
636 | h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | ||
637 | reconnect (h); | ||
638 | } | 1028 | } |
639 | 1029 | ||
640 | 1030 | ||
641 | /** | 1031 | /** |
642 | * Function called when we are ready to transmit our | 1032 | * Our current client connection went down. Clean it up |
643 | * "START" message (or when this operation timed out). | 1033 | * and try to reconnect! |
644 | * | 1034 | * |
645 | * @param cls closure | 1035 | * @param h our handle to the core service |
646 | * @param size number of bytes available in buf | ||
647 | * @param buf where the callee should write the message | ||
648 | * @return number of bytes written to buf | ||
649 | */ | 1036 | */ |
650 | static size_t | 1037 | static void |
651 | transmit_start (void *cls, size_t size, void *buf) | 1038 | reconnect (struct GNUNET_CORE_Handle *h) |
652 | { | 1039 | { |
653 | struct GNUNET_CORE_Handle *h = cls; | 1040 | struct ControlMessage *cm; |
654 | struct InitMessage *init; | 1041 | struct InitMessage *init; |
655 | uint16_t *ts; | ||
656 | uint16_t msize; | ||
657 | uint32_t opt; | 1042 | uint32_t opt; |
1043 | uint16_t msize; | ||
1044 | uint16_t *ts; | ||
658 | unsigned int hpos; | 1045 | unsigned int hpos; |
659 | struct GNUNET_TIME_Relative delay; | ||
660 | 1046 | ||
661 | h->cth = NULL; | 1047 | #if DEBUG_CORE |
662 | if (size == 0) | 1048 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1049 | "Reconnecting to CORE service\n"); | ||
1050 | #endif | ||
1051 | if (h->client != NULL) | ||
663 | { | 1052 | { |
664 | if ((h->init == NULL) || | 1053 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); |
665 | (GNUNET_TIME_absolute_get ().abs_value < h->startup_timeout.abs_value)) | 1054 | h->client = NULL; |
666 | { | 1055 | GNUNET_CONTAINER_multihashmap_iterate (h->peers, |
667 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1056 | &disconnect_and_free_peer_entry, |
668 | _("Failed to connect to core service, retrying.\n")); | 1057 | h); |
669 | delay = GNUNET_TIME_absolute_get_remaining (h->startup_timeout); | 1058 | } |
670 | if ((h->init == NULL) || (delay.rel_value > 1000)) | 1059 | h->currently_down = GNUNET_YES; |
671 | delay = GNUNET_TIME_UNIT_SECONDS; | 1060 | h->client = GNUNET_CLIENT_connect ("core", h->cfg); |
672 | if (h->init == NULL) | 1061 | if (h->client == NULL) |
673 | h->startup_timeout = | 1062 | { |
674 | GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_MINUTES); | 1063 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, |
675 | h->reconnect_task = | 1064 | &reconnect_task, |
676 | GNUNET_SCHEDULER_add_delayed (delay, &reconnect_task, h); | 1065 | h); |
677 | return 0; | 1066 | return; |
678 | } | ||
679 | /* timeout on initial connect */ | ||
680 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
681 | _("Failed to connect to core service, giving up.\n")); | ||
682 | h->init (h->cls, NULL, NULL, NULL); | ||
683 | GNUNET_CORE_disconnect (h); | ||
684 | return 0; | ||
685 | } | 1067 | } |
686 | msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage); | 1068 | msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage); |
687 | GNUNET_assert (size >= msize); | 1069 | cm = GNUNET_malloc (sizeof (struct ControlMessage) + |
688 | init = buf; | 1070 | msize); |
1071 | cm->cont = &init_done_task; | ||
1072 | cm->cont_cls = h; | ||
1073 | init = (struct InitMessage*) &cm[1]; | ||
689 | init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT); | 1074 | init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT); |
690 | init->header.size = htons (msize); | 1075 | init->header.size = htons (msize); |
691 | opt = GNUNET_CORE_OPTION_NOTHING; | 1076 | opt = GNUNET_CORE_OPTION_SEND_CONNECT | GNUNET_CORE_OPTION_SEND_DISCONNECT; |
692 | if (h->connects != NULL) | ||
693 | opt |= GNUNET_CORE_OPTION_SEND_CONNECT; | ||
694 | if (h->disconnects != NULL) | ||
695 | opt |= GNUNET_CORE_OPTION_SEND_DISCONNECT; | ||
696 | if (h->status_events != NULL) | 1077 | if (h->status_events != NULL) |
697 | opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE; | 1078 | opt |= GNUNET_CORE_OPTION_SEND_STATUS_CHANGE; |
698 | if (h->inbound_notify != NULL) | 1079 | if (h->inbound_notify != NULL) |
@@ -710,25 +1091,23 @@ transmit_start (void *cls, size_t size, void *buf) | |||
710 | opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND; | 1091 | opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND; |
711 | } | 1092 | } |
712 | init->options = htonl (opt); | 1093 | init->options = htonl (opt); |
713 | ts = (uint16_t *) & init[1]; | 1094 | ts = (uint16_t *) &init[1]; |
714 | for (hpos = 0; hpos < h->hcnt; hpos++) | 1095 | for (hpos = 0; hpos < h->hcnt; hpos++) |
715 | ts[hpos] = htons (h->handlers[hpos].type); | 1096 | ts[hpos] = htons (h->handlers[hpos].type); |
716 | GNUNET_CLIENT_receive (h->client_notifications, | 1097 | GNUNET_CONTAINER_DLL_insert (h->pending_head, |
717 | &init_reply_handler, | 1098 | h->pending_tail, |
718 | h, | 1099 | cm); |
719 | GNUNET_TIME_absolute_get_remaining | 1100 | trigger_next_request (h); |
720 | (h->startup_timeout)); | ||
721 | return sizeof (struct InitMessage) + h->hcnt * sizeof (uint16_t); | ||
722 | } | 1101 | } |
723 | 1102 | ||
724 | 1103 | ||
1104 | |||
725 | /** | 1105 | /** |
726 | * Connect to the core service. Note that the connection may | 1106 | * Connect to the core service. Note that the connection may |
727 | * complete (or fail) asynchronously. | 1107 | * complete (or fail) asynchronously. |
728 | * | 1108 | * |
729 | * @param cfg configuration to use | 1109 | * @param cfg configuration to use |
730 | * @param queue_size size of the per-peer message queue | 1110 | * @param queue_size size of the per-peer message queue |
731 | * @param timeout after how long should we give up trying to connect to the core service? | ||
732 | * @param cls closure for the various callbacks that follow (including handlers in the handlers array) | 1111 | * @param cls closure for the various callbacks that follow (including handlers in the handlers array) |
733 | * @param init callback to call on timeout or once we have successfully | 1112 | * @param init callback to call on timeout or once we have successfully |
734 | * connected to the core service; note that timeout is only meaningful if init is not NULL | 1113 | * connected to the core service; note that timeout is only meaningful if init is not NULL |
@@ -750,7 +1129,6 @@ transmit_start (void *cls, size_t size, void *buf) | |||
750 | struct GNUNET_CORE_Handle * | 1129 | struct GNUNET_CORE_Handle * |
751 | GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | 1130 | GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, |
752 | unsigned int queue_size, | 1131 | unsigned int queue_size, |
753 | struct GNUNET_TIME_Relative timeout, | ||
754 | void *cls, | 1132 | void *cls, |
755 | GNUNET_CORE_StartupCallback init, | 1133 | GNUNET_CORE_StartupCallback init, |
756 | GNUNET_CORE_ConnectEventHandler connects, | 1134 | GNUNET_CORE_ConnectEventHandler connects, |
@@ -766,6 +1144,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
766 | 1144 | ||
767 | h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle)); | 1145 | h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle)); |
768 | h->cfg = cfg; | 1146 | h->cfg = cfg; |
1147 | h->queue_size = queue_size; | ||
769 | h->cls = cls; | 1148 | h->cls = cls; |
770 | h->init = init; | 1149 | h->init = init; |
771 | h->connects = connects; | 1150 | h->connects = connects; |
@@ -776,133 +1155,57 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
776 | h->inbound_hdr_only = inbound_hdr_only; | 1155 | h->inbound_hdr_only = inbound_hdr_only; |
777 | h->outbound_hdr_only = outbound_hdr_only; | 1156 | h->outbound_hdr_only = outbound_hdr_only; |
778 | h->handlers = handlers; | 1157 | h->handlers = handlers; |
779 | h->queue_size = queue_size; | ||
780 | h->client_notifications = GNUNET_CLIENT_connect ("core", cfg); | ||
781 | if (h->client_notifications == NULL) | ||
782 | { | ||
783 | GNUNET_free (h); | ||
784 | return NULL; | ||
785 | } | ||
786 | h->startup_timeout = GNUNET_TIME_relative_to_absolute (timeout); | ||
787 | h->hcnt = 0; | 1158 | h->hcnt = 0; |
788 | while (handlers[h->hcnt].callback != NULL) | 1159 | while (handlers[h->hcnt].callback != NULL) |
789 | h->hcnt++; | 1160 | h->hcnt++; |
790 | GNUNET_assert (h->hcnt < | 1161 | GNUNET_assert (h->hcnt < |
791 | (GNUNET_SERVER_MAX_MESSAGE_SIZE - | 1162 | (GNUNET_SERVER_MAX_MESSAGE_SIZE - |
792 | sizeof (struct InitMessage)) / sizeof (uint16_t)); | 1163 | sizeof (struct InitMessage)) / sizeof (uint16_t)); |
793 | #if DEBUG_CORE | 1164 | reconnect (h); |
794 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
795 | "Trying to connect to core service in next %llu ms.\n", | ||
796 | timeout.rel_value); | ||
797 | #endif | ||
798 | h->cth = | ||
799 | GNUNET_CLIENT_notify_transmit_ready (h->client_notifications, | ||
800 | sizeof (struct InitMessage) + | ||
801 | sizeof (uint16_t) * h->hcnt, timeout, | ||
802 | GNUNET_YES, | ||
803 | &transmit_start, h); | ||
804 | return h; | 1165 | return h; |
805 | } | 1166 | } |
806 | 1167 | ||
807 | 1168 | ||
808 | /** | 1169 | /** |
809 | * Disconnect from the core service. | 1170 | * Disconnect from the core service. This function can only |
1171 | * be called *after* all pending 'GNUNET_CORE_notify_transmit_ready' | ||
1172 | * requests have been explicitly cancelled. | ||
810 | * | 1173 | * |
811 | * @param handle connection to core to disconnect | 1174 | * @param handle connection to core to disconnect |
812 | */ | 1175 | */ |
813 | void | 1176 | void |
814 | GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) | 1177 | GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) |
815 | { | 1178 | { |
816 | if (handle->cth != NULL) | 1179 | struct ControlMessage *cm; |
817 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); | 1180 | |
818 | if (handle->solicit_transmit_req != NULL) | ||
819 | GNUNET_CORE_notify_transmit_ready_cancel (handle->solicit_transmit_req); | ||
820 | if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) | 1181 | if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) |
821 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); | ||
822 | if (handle->client_notifications != NULL) | ||
823 | GNUNET_CLIENT_disconnect (handle->client_notifications, GNUNET_NO); | ||
824 | GNUNET_break (handle->pending_head == NULL); | ||
825 | GNUNET_free_non_null (handle->solicit_buffer); | ||
826 | GNUNET_free (handle); | ||
827 | } | ||
828 | |||
829 | |||
830 | /** | ||
831 | * Build the message requesting data transmission. | ||
832 | */ | ||
833 | static size_t | ||
834 | produce_send (void *cls, size_t size, void *buf) | ||
835 | { | ||
836 | struct GNUNET_CORE_TransmitHandle *th = cls; | ||
837 | struct GNUNET_CORE_Handle *h; | ||
838 | struct SendMessage *sm; | ||
839 | size_t dt; | ||
840 | GNUNET_CONNECTION_TransmitReadyNotify notify; | ||
841 | void *notify_cls; | ||
842 | |||
843 | h = th->ch; | ||
844 | if (buf == NULL) | ||
845 | { | 1182 | { |
846 | /* timeout or error */ | 1183 | GNUNET_SCHEDULER_cancel (handle->reconnect_task); |
847 | #if DEBUG_CORE | 1184 | handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK; |
848 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
849 | "P2P transmission request for `%4s' timed out.\n", | ||
850 | GNUNET_i2s(&th->peer)); | ||
851 | #endif | ||
852 | GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); | ||
853 | GNUNET_CORE_notify_transmit_ready_cancel (th); | ||
854 | if ((h->pending_head == th) && (h->cth != NULL)) /* Request hasn't been canceled yet! */ | ||
855 | { | ||
856 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); | ||
857 | h->cth = NULL; | ||
858 | trigger_next_request (h); | ||
859 | } | ||
860 | /* Otherwise this request timed out, but another is actually queued for sending, so don't try to send another! */ | ||
861 | return 0; | ||
862 | } | 1185 | } |
863 | sm = (struct SendMessage *) buf; | 1186 | if (handle->cth != NULL) |
864 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); | ||
865 | sm->priority = htonl (th->priority); | ||
866 | sm->deadline = GNUNET_TIME_absolute_hton (th->timeout); | ||
867 | sm->peer = th->peer; | ||
868 | notify = th->notify; | ||
869 | notify_cls = th->notify_cls; | ||
870 | GNUNET_CORE_notify_transmit_ready_cancel (th); | ||
871 | trigger_next_request (h); | ||
872 | size = GNUNET_MIN (size, | ||
873 | GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); | ||
874 | GNUNET_assert (size >= sizeof (struct SendMessage)); | ||
875 | dt = notify (notify_cls, size - sizeof (struct SendMessage), &sm[1]); | ||
876 | if (0 == dt) | ||
877 | { | 1187 | { |
878 | #if DEBUG_CORE | 1188 | GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); |
879 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1189 | handle->cth = NULL; |
880 | "Size of clients message to peer %s is 0!\n", | ||
881 | GNUNET_i2s(&sm->peer)); | ||
882 | #endif | ||
883 | /* client decided to send nothing! */ | ||
884 | return 0; | ||
885 | } | 1190 | } |
886 | #if DEBUG_CORE | 1191 | if (handle->client != NULL) |
887 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
888 | "Produced SEND message to core with %u bytes payload\n", | ||
889 | dt); | ||
890 | #endif | ||
891 | GNUNET_assert (dt >= sizeof (struct GNUNET_MessageHeader)); | ||
892 | if (dt + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | ||
893 | { | 1192 | { |
894 | GNUNET_break (0); | 1193 | GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); |
895 | return 0; | 1194 | handle->client = NULL; |
896 | } | 1195 | } |
897 | #if DEBUG_CORE | 1196 | while (NULL != (cm = handle->pending_head)) |
898 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1197 | { |
899 | "Preparing for P2P transmission of %u bytes to `%4s'.\n", | 1198 | GNUNET_CONTAINER_DLL_remove (handle->pending_head, |
900 | dt, | 1199 | handle->pending_tail, |
901 | GNUNET_i2s(&sm->peer)); | 1200 | cm); |
902 | #endif | 1201 | GNUNET_free (cm); |
903 | sm->header.size = htons (dt + sizeof (struct SendMessage)); | 1202 | } |
904 | GNUNET_assert (dt + sizeof (struct SendMessage) <= size); | 1203 | GNUNET_CONTAINER_multihashmap_iterate (handle->peers, |
905 | return dt + sizeof (struct SendMessage); | 1204 | &disconnect_and_free_peer_entry, |
1205 | handle); | ||
1206 | GNUNET_CONTAINER_multihashmap_destroy (handle->peers); | ||
1207 | GNUNET_break (handle->ready_peer_head == NULL); | ||
1208 | GNUNET_free (handle); | ||
906 | } | 1209 | } |
907 | 1210 | ||
908 | 1211 | ||
@@ -926,61 +1229,489 @@ produce_send (void *cls, size_t size, void *buf) | |||
926 | */ | 1229 | */ |
927 | struct GNUNET_CORE_TransmitHandle * | 1230 | struct GNUNET_CORE_TransmitHandle * |
928 | GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, | 1231 | GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, |
929 | unsigned int priority, | 1232 | uint32_t priority, |
930 | struct GNUNET_TIME_Relative maxdelay, | 1233 | struct GNUNET_TIME_Relative maxdelay, |
931 | const struct GNUNET_PeerIdentity *target, | 1234 | const struct GNUNET_PeerIdentity *target, |
932 | size_t notify_size, | 1235 | size_t notify_size, |
933 | GNUNET_CONNECTION_TransmitReadyNotify notify, | 1236 | GNUNET_CONNECTION_TransmitReadyNotify notify, |
934 | void *notify_cls) | 1237 | void *notify_cls) |
935 | { | 1238 | { |
1239 | struct PeerRecord *pr; | ||
936 | struct GNUNET_CORE_TransmitHandle *th; | 1240 | struct GNUNET_CORE_TransmitHandle *th; |
1241 | struct GNUNET_CORE_TransmitHandle *pos; | ||
1242 | struct GNUNET_CORE_TransmitHandle *prev; | ||
1243 | struct GNUNET_CORE_TransmitHandle *minp; | ||
937 | 1244 | ||
1245 | pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, | ||
1246 | &target->hashPubKey); | ||
1247 | if (NULL == pr) | ||
1248 | { | ||
1249 | /* attempt to send to peer that is not connected */ | ||
1250 | GNUNET_break (0); | ||
1251 | return NULL; | ||
1252 | } | ||
938 | GNUNET_assert (notify_size + sizeof (struct SendMessage) < | 1253 | GNUNET_assert (notify_size + sizeof (struct SendMessage) < |
939 | GNUNET_SERVER_MAX_MESSAGE_SIZE); | 1254 | GNUNET_SERVER_MAX_MESSAGE_SIZE); |
940 | th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle)); | 1255 | th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle)); |
941 | th->ch = handle; | 1256 | th->peer = pr; |
942 | GNUNET_CONTAINER_DLL_insert_after (handle->pending_head, | 1257 | th->get_message = notify; |
943 | handle->pending_tail, | 1258 | th->get_message_cls = notify_cls; |
944 | handle->pending_tail, | ||
945 | th); | ||
946 | th->get_message = &produce_send; | ||
947 | th->get_message_cls = th; | ||
948 | th->notify = notify; | ||
949 | th->notify_cls = notify_cls; | ||
950 | th->peer = *target; | ||
951 | th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); | 1259 | th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); |
952 | th->timeout_task = GNUNET_SCHEDULER_add_delayed (maxdelay, | ||
953 | &timeout_request, th); | ||
954 | th->priority = priority; | 1260 | th->priority = priority; |
955 | th->msize = sizeof (struct SendMessage) + notify_size; | 1261 | th->msize = notify_size; |
1262 | /* bound queue size */ | ||
1263 | if (pr->queue_size == handle->queue_size) | ||
1264 | { | ||
1265 | /* find lowest-priority entry */ | ||
1266 | minp = pr->pending_head; | ||
1267 | prev = minp->next; | ||
1268 | while (prev != NULL) | ||
1269 | { | ||
1270 | if (prev->priority < minp->priority) | ||
1271 | minp = prev; | ||
1272 | prev = prev->next; | ||
1273 | } | ||
1274 | if (minp == NULL) | ||
1275 | { | ||
1276 | GNUNET_break (handle->queue_size != 0); | ||
1277 | GNUNET_break (pr->queue_size == 0); | ||
1278 | return NULL; | ||
1279 | } | ||
1280 | if (priority <= minp->priority) | ||
1281 | return NULL; /* priority too low */ | ||
1282 | GNUNET_CONTAINER_DLL_remove (pr->pending_head, | ||
1283 | pr->pending_tail, | ||
1284 | minp); | ||
1285 | pr->queue_size--; | ||
1286 | GNUNET_assert (0 == | ||
1287 | minp->get_message (minp->get_message_cls, | ||
1288 | 0, NULL)); | ||
1289 | GNUNET_free (minp); | ||
1290 | } | ||
1291 | |||
1292 | /* Order entries by deadline, but SKIP 'HEAD' if | ||
1293 | we're in the 'ready_peer_*' DLL */ | ||
1294 | pos = pr->pending_head; | ||
1295 | if ( (pr->prev != NULL) || | ||
1296 | (pr->next != NULL) || | ||
1297 | (pr == handle->ready_peer_head) ) | ||
1298 | { | ||
1299 | GNUNET_assert (pos != NULL); | ||
1300 | pos = pos->next; /* skip head */ | ||
1301 | } | ||
1302 | |||
1303 | /* insertion sort */ | ||
1304 | prev = pos; | ||
1305 | while ( (pos != NULL) && | ||
1306 | (pos->timeout.abs_value < th->timeout.abs_value) ) | ||
1307 | { | ||
1308 | prev = pos; | ||
1309 | pos = pos->next; | ||
1310 | } | ||
1311 | GNUNET_CONTAINER_DLL_insert_after (pr->pending_head, | ||
1312 | pr->pending_tail, | ||
1313 | prev, | ||
1314 | th); | ||
1315 | pr->queue_size++; | ||
956 | /* was the request queue previously empty? */ | 1316 | /* was the request queue previously empty? */ |
957 | if ( (handle->pending_head == th) && | 1317 | if (pr->pending_head == th) |
958 | (handle->cth == NULL) ) | 1318 | request_next_transmission (pr); |
959 | trigger_next_request (handle); | ||
960 | return th; | 1319 | return th; |
961 | } | 1320 | } |
962 | 1321 | ||
963 | 1322 | ||
964 | /** | 1323 | /** |
965 | * Cancel the specified transmission-ready notification. | 1324 | * Cancel the specified transmission-ready notification. |
966 | *s | 1325 | * |
967 | * @param th handle that was returned by "notify_transmit_ready". | 1326 | * @param th handle that was returned by "notify_transmit_ready". |
968 | */ | 1327 | */ |
969 | void | 1328 | void |
970 | GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle | 1329 | GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle |
971 | *th) | 1330 | *th) |
972 | { | 1331 | { |
973 | struct GNUNET_CORE_Handle *h = th->ch; | 1332 | struct PeerRecord *pr = th->peer; |
1333 | struct GNUNET_CORE_Handle *h = pr->ch; | ||
1334 | int was_head; | ||
974 | 1335 | ||
975 | if (h->submitted == th) | 1336 | was_head = (pr->pending_head == th); |
976 | h->submitted = NULL; | 1337 | GNUNET_CONTAINER_DLL_remove (pr->pending_head, |
977 | else | 1338 | pr->pending_tail, |
978 | GNUNET_CONTAINER_DLL_remove (h->pending_head, | 1339 | th); |
979 | h->pending_tail, | 1340 | if (th->cm != NULL) |
980 | th); | 1341 | { |
981 | if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK) | 1342 | /* we're currently in the control queue, remove */ |
982 | GNUNET_SCHEDULER_cancel (th->timeout_task); | 1343 | GNUNET_CONTAINER_DLL_remove (h->pending_head, |
1344 | h->pending_tail, | ||
1345 | th->cm); | ||
1346 | GNUNET_free (th->cm); | ||
1347 | } | ||
983 | GNUNET_free (th); | 1348 | GNUNET_free (th); |
1349 | if (was_head) | ||
1350 | { | ||
1351 | if ( (pr->prev != NULL) || | ||
1352 | (pr->next != NULL) || | ||
1353 | (pr == h->ready_peer_head) ) | ||
1354 | { | ||
1355 | /* the request that was 'approved' by core was | ||
1356 | cancelled before it could be transmitted; remove | ||
1357 | us from the 'ready' list */ | ||
1358 | GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, | ||
1359 | h->ready_peer_tail, | ||
1360 | pr); | ||
1361 | } | ||
1362 | request_next_transmission (pr); | ||
1363 | } | ||
1364 | } | ||
1365 | |||
1366 | |||
1367 | /* ****************** GNUNET_CORE_peer_request_connect ******************** */ | ||
1368 | |||
1369 | /** | ||
1370 | * Handle for a request to the core to connect to | ||
1371 | * a particular peer. Can be used to cancel the request | ||
1372 | * (before the 'cont'inuation is called). | ||
1373 | */ | ||
1374 | struct GNUNET_CORE_PeerRequestHandle | ||
1375 | { | ||
1376 | |||
1377 | /** | ||
1378 | * Link to control message. | ||
1379 | */ | ||
1380 | struct ControlMessage *cm; | ||
1381 | |||
1382 | /** | ||
1383 | * Core handle used. | ||
1384 | */ | ||
1385 | struct GNUNET_CORE_Handle *h; | ||
1386 | |||
1387 | /** | ||
1388 | * Continuation to run when done. | ||
1389 | */ | ||
1390 | GNUNET_SCHEDULER_Task cont; | ||
1391 | |||
1392 | /** | ||
1393 | * Closure for 'cont'. | ||
1394 | */ | ||
1395 | void *cont_cls; | ||
1396 | |||
1397 | }; | ||
1398 | |||
1399 | |||
1400 | |||
1401 | /** | ||
1402 | * Continuation called when the control message was transmitted. | ||
1403 | * Calls the original continuation and frees the remaining | ||
1404 | * resources. | ||
1405 | * | ||
1406 | * @param cls the 'struct GNUNET_CORE_PeerRequestHandle' | ||
1407 | * @param tc scheduler context | ||
1408 | */ | ||
1409 | static void | ||
1410 | peer_request_connect_cont (void *cls, | ||
1411 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1412 | { | ||
1413 | struct GNUNET_CORE_PeerRequestHandle *ret = cls; | ||
1414 | |||
1415 | if (ret->cont != NULL) | ||
1416 | ret->cont (ret->cont_cls, tc); | ||
1417 | GNUNET_free (ret); | ||
1418 | } | ||
1419 | |||
1420 | |||
1421 | /** | ||
1422 | * Request that the core should try to connect to a particular peer. | ||
1423 | * Once the request has been transmitted to the core, the continuation | ||
1424 | * function will be called. Note that this does NOT mean that a | ||
1425 | * connection was successfully established -- it only means that the | ||
1426 | * core will now try. Successful establishment of the connection | ||
1427 | * will be signalled to the 'connects' callback argument of | ||
1428 | * 'GNUNET_CORE_connect' only. If the core service does not respond | ||
1429 | * to our connection attempt within the given time frame, 'cont' will | ||
1430 | * be called with the TIMEOUT reason code. | ||
1431 | * | ||
1432 | * @param h core handle | ||
1433 | * @param timeout how long to try to talk to core | ||
1434 | * @param peer who should we connect to | ||
1435 | * @param cont function to call once the request has been completed (or timed out) | ||
1436 | * @param cont_cls closure for cont | ||
1437 | * @return NULL on error (cont will not be called), otherwise handle for cancellation | ||
1438 | */ | ||
1439 | struct GNUNET_CORE_PeerRequestHandle * | ||
1440 | GNUNET_CORE_peer_request_connect (struct GNUNET_CORE_Handle *h, | ||
1441 | struct GNUNET_TIME_Relative timeout, | ||
1442 | const struct GNUNET_PeerIdentity * peer, | ||
1443 | GNUNET_SCHEDULER_Task cont, | ||
1444 | void *cont_cls) | ||
1445 | { | ||
1446 | struct GNUNET_CORE_PeerRequestHandle *ret; | ||
1447 | struct ControlMessage *cm; | ||
1448 | struct ConnectMessage *msg; | ||
1449 | |||
1450 | cm = GNUNET_malloc (sizeof (struct ControlMessage) + | ||
1451 | sizeof (struct ConnectMessage)); | ||
1452 | msg = (struct ConnectMessage*) &cm[1]; | ||
1453 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT); | ||
1454 | msg->header.size = htons (sizeof (struct ConnectMessage)); | ||
1455 | msg->reserved = htonl (0); | ||
1456 | msg->timeout = GNUNET_TIME_relative_hton (timeout); | ||
1457 | msg->peer = *peer; | ||
1458 | GNUNET_CONTAINER_DLL_insert (h->pending_head, | ||
1459 | h->pending_tail, | ||
1460 | cm); | ||
1461 | ret = GNUNET_malloc (sizeof (struct GNUNET_CORE_PeerRequestHandle)); | ||
1462 | ret->h = h; | ||
1463 | ret->cm = cm; | ||
1464 | ret->cont = cont; | ||
1465 | ret->cont_cls = cont_cls; | ||
1466 | cm->cont = &peer_request_connect_cont; | ||
1467 | cm->cont_cls = ret; | ||
1468 | if (h->pending_head == cm) | ||
1469 | trigger_next_request (h); | ||
1470 | return ret; | ||
1471 | } | ||
1472 | |||
1473 | |||
1474 | /** | ||
1475 | * Cancel a pending request to connect to a particular peer. Must not | ||
1476 | * be called after the 'cont' function was invoked. | ||
1477 | * | ||
1478 | * @param req request handle that was returned for the original request | ||
1479 | */ | ||
1480 | void | ||
1481 | GNUNET_CORE_peer_request_connect_cancel (struct GNUNET_CORE_PeerRequestHandle *req) | ||
1482 | { | ||
1483 | struct GNUNET_CORE_Handle *h = req->h; | ||
1484 | struct ControlMessage *cm = req->cm; | ||
1485 | |||
1486 | GNUNET_CONTAINER_DLL_remove (h->pending_head, | ||
1487 | h->pending_tail, | ||
1488 | cm); | ||
1489 | GNUNET_free (cm); | ||
1490 | GNUNET_free (req); | ||
1491 | } | ||
1492 | |||
1493 | |||
1494 | /* ****************** GNUNET_CORE_peer_change_preference ******************** */ | ||
1495 | |||
1496 | |||
1497 | struct GNUNET_CORE_InformationRequestContext | ||
1498 | { | ||
1499 | |||
1500 | /** | ||
1501 | * Our connection to the service. | ||
1502 | */ | ||
1503 | struct GNUNET_CORE_Handle *h; | ||
1504 | |||
1505 | /** | ||
1506 | * Function to call with the information. | ||
1507 | */ | ||
1508 | GNUNET_CORE_PeerConfigurationInfoCallback info; | ||
1509 | |||
1510 | /** | ||
1511 | * Closure for info. | ||
1512 | */ | ||
1513 | void *info_cls; | ||
1514 | |||
1515 | /** | ||
1516 | * Link to control message, NULL if CM was sent. | ||
1517 | */ | ||
1518 | struct ControlMessage *cm; | ||
1519 | |||
1520 | /** | ||
1521 | * Link to peer record. | ||
1522 | */ | ||
1523 | struct PeerRecord *pr; | ||
1524 | }; | ||
1525 | |||
1526 | |||
1527 | /** | ||
1528 | * CM was sent, remove link so we don't double-free. | ||
1529 | * | ||
1530 | * @param cls the 'struct GNUNET_CORE_InformationRequestContext' | ||
1531 | * @param tc scheduler context | ||
1532 | */ | ||
1533 | static void | ||
1534 | change_preference_send_continuation (void *cls, | ||
1535 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1536 | { | ||
1537 | struct GNUNET_CORE_InformationRequestContext *irc = cls; | ||
1538 | |||
1539 | irc->cm = NULL; | ||
1540 | } | ||
1541 | |||
1542 | |||
1543 | /** | ||
1544 | * Obtain statistics and/or change preferences for the given peer. | ||
1545 | * | ||
1546 | * @param h core handle | ||
1547 | * @param peer identifies the peer | ||
1548 | * @param timeout after how long should we give up (and call "info" with NULL | ||
1549 | * for "peer" to signal an error)? | ||
1550 | * @param bw_out set to the current bandwidth limit (sending) for this peer, | ||
1551 | * caller should set "bw_out" to "-1" to avoid changing | ||
1552 | * the current value; otherwise "bw_out" will be lowered to | ||
1553 | * the specified value; passing a pointer to "0" can be used to force | ||
1554 | * us to disconnect from the peer; "bw_out" might not increase | ||
1555 | * as specified since the upper bound is generally | ||
1556 | * determined by the other peer! | ||
1557 | * @param amount reserve N bytes for receiving, negative | ||
1558 | * amounts can be used to undo a (recent) reservation; | ||
1559 | * @param preference increase incoming traffic share preference by this amount; | ||
1560 | * in the absence of "amount" reservations, we use this | ||
1561 | * preference value to assign proportional bandwidth shares | ||
1562 | * to all connected peers | ||
1563 | * @param info function to call with the resulting configuration information | ||
1564 | * @param info_cls closure for info | ||
1565 | * @return NULL on error | ||
1566 | */ | ||
1567 | struct GNUNET_CORE_InformationRequestContext * | ||
1568 | GNUNET_CORE_peer_change_preference (struct GNUNET_CORE_Handle *h, | ||
1569 | const struct GNUNET_PeerIdentity *peer, | ||
1570 | struct GNUNET_TIME_Relative timeout, | ||
1571 | struct GNUNET_BANDWIDTH_Value32NBO bw_out, | ||
1572 | int32_t amount, | ||
1573 | uint64_t preference, | ||
1574 | GNUNET_CORE_PeerConfigurationInfoCallback info, | ||
1575 | void *info_cls) | ||
1576 | { | ||
1577 | struct GNUNET_CORE_InformationRequestContext *irc; | ||
1578 | struct PeerRecord *pr; | ||
1579 | struct RequestInfoMessage *rim; | ||
1580 | struct ControlMessage *cm; | ||
1581 | |||
1582 | pr = GNUNET_CONTAINER_multihashmap_get (h->peers, | ||
1583 | &peer->hashPubKey); | ||
1584 | if (NULL == pr) | ||
1585 | { | ||
1586 | /* attempt to change preference on peer that is not connected */ | ||
1587 | GNUNET_break (0); | ||
1588 | return NULL; | ||
1589 | } | ||
1590 | if (pr->pcic != NULL) | ||
1591 | { | ||
1592 | /* second change before first one is done */ | ||
1593 | GNUNET_break (0); | ||
1594 | return NULL; | ||
1595 | } | ||
1596 | irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext)); | ||
1597 | irc->h = h; | ||
1598 | irc->info = info; | ||
1599 | irc->info_cls = info_cls; | ||
1600 | cm = GNUNET_malloc (sizeof (struct ControlMessage) + | ||
1601 | sizeof (struct RequestInfoMessage)); | ||
1602 | cm->cont = &change_preference_send_continuation; | ||
1603 | cm->cont_cls = irc; | ||
1604 | irc->cm = cm; | ||
1605 | rim = (struct RequestInfoMessage*) &cm[1]; | ||
1606 | rim->header.size = htons (sizeof (struct RequestInfoMessage)); | ||
1607 | rim->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO); | ||
1608 | rim->rim_id = htonl (pr->rim_id = h->rim_id_gen++); | ||
1609 | rim->limit_outbound = bw_out; | ||
1610 | rim->reserve_inbound = htonl (amount); | ||
1611 | rim->preference_change = GNUNET_htonll(preference); | ||
1612 | rim->peer = *peer; | ||
1613 | GNUNET_CONTAINER_DLL_insert (h->pending_head, | ||
1614 | h->pending_tail, | ||
1615 | cm); | ||
1616 | pr->pcic = info; | ||
1617 | pr->pcic_cls = info_cls; | ||
1618 | return irc; | ||
1619 | } | ||
1620 | |||
1621 | |||
1622 | /** | ||
1623 | * Cancel request for getting information about a peer. | ||
1624 | * Note that an eventual change in preference, trust or bandwidth | ||
1625 | * assignment MAY have already been committed at the time, | ||
1626 | * so cancelling a request is NOT sure to undo the original | ||
1627 | * request. The original request may or may not still commit. | ||
1628 | * The only thing cancellation ensures is that the callback | ||
1629 | * from the original request will no longer be called. | ||
1630 | * | ||
1631 | * @param irc context returned by the original GNUNET_CORE_peer_get_info call | ||
1632 | */ | ||
1633 | void | ||
1634 | GNUNET_CORE_peer_change_preference_cancel (struct GNUNET_CORE_InformationRequestContext *irc) | ||
1635 | { | ||
1636 | struct GNUNET_CORE_Handle *h = irc->h; | ||
1637 | struct PeerRecord *pr = irc->pr; | ||
1638 | |||
1639 | if (irc->cm != NULL) | ||
1640 | { | ||
1641 | GNUNET_CONTAINER_DLL_remove (h->pending_head, | ||
1642 | h->pending_tail, | ||
1643 | irc->cm); | ||
1644 | GNUNET_free (irc->cm); | ||
1645 | } | ||
1646 | pr->pcic = NULL; | ||
1647 | pr->pcic_cls = NULL; | ||
1648 | GNUNET_free (irc); | ||
1649 | } | ||
1650 | |||
1651 | |||
1652 | /* ********************* GNUNET_CORE_iterate_peers *********************** */ | ||
1653 | |||
1654 | /** | ||
1655 | * Context for 'iterate_peers' helper function. | ||
1656 | */ | ||
1657 | struct IterationContext | ||
1658 | { | ||
1659 | /** | ||
1660 | * Callback to call. | ||
1661 | */ | ||
1662 | GNUNET_CORE_ConnectEventHandler peer_cb; | ||
1663 | |||
1664 | /** | ||
1665 | * Closure for 'peer_cb'. | ||
1666 | */ | ||
1667 | void *cb_cls; | ||
1668 | }; | ||
1669 | |||
1670 | |||
1671 | /** | ||
1672 | * Call callback for each peer. | ||
1673 | * | ||
1674 | * @param cls the 'struct IterationContext' | ||
1675 | * @param hc peer identity, not used | ||
1676 | * @param value the 'struct PeerRecord' | ||
1677 | * @return GNUNET_YES (continue iteration) | ||
1678 | */ | ||
1679 | static int | ||
1680 | iterate_peers (void *cls, | ||
1681 | const GNUNET_HashCode *hc, | ||
1682 | void *value) | ||
1683 | { | ||
1684 | struct IterationContext *ic = cls; | ||
1685 | struct PeerRecord *pr = value; | ||
1686 | |||
1687 | ic->peer_cb (ic->cb_cls, | ||
1688 | &pr->peer, | ||
1689 | NULL /* FIXME: pass atsi? */); | ||
1690 | return GNUNET_YES; | ||
1691 | } | ||
1692 | |||
1693 | |||
1694 | /** | ||
1695 | * Obtain statistics and/or change preferences for the given peer. | ||
1696 | * | ||
1697 | * @param h handle to core | ||
1698 | * @param peer_cb function to call with the peer information | ||
1699 | * @param cb_cls closure for peer_cb | ||
1700 | * @return GNUNET_OK if iterating, GNUNET_SYSERR on error | ||
1701 | */ | ||
1702 | int | ||
1703 | GNUNET_CORE_iterate_peers (struct GNUNET_CORE_Handle *h, | ||
1704 | GNUNET_CORE_ConnectEventHandler peer_cb, | ||
1705 | void *cb_cls) | ||
1706 | { | ||
1707 | struct IterationContext ic; | ||
1708 | |||
1709 | ic.peer_cb = peer_cb; | ||
1710 | ic.cb_cls = cb_cls; | ||
1711 | GNUNET_CONTAINER_multihashmap_iterate (h->peers, | ||
1712 | &iterate_peers, | ||
1713 | &ic); | ||
1714 | return GNUNET_OK; | ||
984 | } | 1715 | } |
985 | 1716 | ||
986 | 1717 | ||