aboutsummaryrefslogtreecommitdiff
path: root/src/core/gnunet-service-core_clients.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-10-05 13:26:24 +0000
committerChristian Grothoff <christian@grothoff.org>2011-10-05 13:26:24 +0000
commit0f29195adbd56ae10dea70c2951333c13e765f88 (patch)
tree83247d0f38a2cba50209af2325bad3d74890eb71 /src/core/gnunet-service-core_clients.c
parentf39c4e7141b1fbb4830cb24ff630a879337f98d4 (diff)
downloadgnunet-0f29195adbd56ae10dea70c2951333c13e765f88.tar.gz
gnunet-0f29195adbd56ae10dea70c2951333c13e765f88.zip
towards new core service implementation -- breaking core up into smaller modules
Diffstat (limited to 'src/core/gnunet-service-core_clients.c')
-rw-r--r--src/core/gnunet-service-core_clients.c1102
1 files changed, 1102 insertions, 0 deletions
diff --git a/src/core/gnunet-service-core_clients.c b/src/core/gnunet-service-core_clients.c
new file mode 100644
index 000000000..ffd6d294f
--- /dev/null
+++ b/src/core/gnunet-service-core_clients.c
@@ -0,0 +1,1102 @@
1
2/**
3 * Data structure for each client connected to the core service.
4 */
5struct Client
6{
7 /**
8 * Clients are kept in a linked list.
9 */
10 struct Client *next;
11
12 /**
13 * Handle for the client with the server API.
14 */
15 struct GNUNET_SERVER_Client *client_handle;
16
17 /**
18 * Array of the types of messages this peer cares
19 * about (with "tcnt" entries). Allocated as part
20 * of this client struct, do not free!
21 */
22 const uint16_t *types;
23
24 /**
25 * Map of peer identities to active transmission requests of this
26 * client to the peer (of type 'struct ClientActiveRequest').
27 */
28 struct GNUNET_CONTAINER_MultiHashMap *requests;
29
30 /**
31 * Options for messages this client cares about,
32 * see GNUNET_CORE_OPTION_ values.
33 */
34 uint32_t options;
35
36 /**
37 * Number of types of incoming messages this client
38 * specifically cares about. Size of the "types" array.
39 */
40 unsigned int tcnt;
41
42};
43
44
45/**
46 * Record kept for each request for transmission issued by a
47 * client that is still pending.
48 */
49struct ClientActiveRequest
50{
51
52 /**
53 * Active requests are kept in a doubly-linked list of
54 * the respective target peer.
55 */
56 struct ClientActiveRequest *next;
57
58 /**
59 * Active requests are kept in a doubly-linked list of
60 * the respective target peer.
61 */
62 struct ClientActiveRequest *prev;
63
64 /**
65 * Handle to the client.
66 */
67 struct Client *client;
68
69 /**
70 * By what time would the client want to see this message out?
71 */
72 struct GNUNET_TIME_Absolute deadline;
73
74 /**
75 * How important is this request.
76 */
77 uint32_t priority;
78
79 /**
80 * How many more requests does this client have?
81 */
82 uint32_t queue_size;
83
84 /**
85 * How many bytes does the client intend to send?
86 */
87 uint16_t msize;
88
89 /**
90 * Unique request ID (in big endian).
91 */
92 uint16_t smr_id;
93
94};
95
96
97
98/**
99 * Linked list of our clients.
100 */
101static struct Client *clients;
102
103/**
104 * Context for notifications we need to send to our clients.
105 */
106static struct GNUNET_SERVER_NotificationContext *notifier;
107
108
109/**
110 * Our message stream tokenizer (for encrypted payload).
111 */
112static struct GNUNET_SERVER_MessageStreamTokenizer *mst;
113
114
115
116/**
117 * Send a message to one of our clients.
118 *
119 * @param client target for the message
120 * @param msg message to transmit
121 * @param can_drop could this message be dropped if the
122 * client's queue is getting too large?
123 */
124static void
125send_to_client (struct Client *client, const struct GNUNET_MessageHeader *msg,
126 int can_drop)
127{
128#if DEBUG_CORE_CLIENT
129 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
130 "Preparing to send %u bytes of message of type %u to client.\n",
131 (unsigned int) ntohs (msg->size),
132 (unsigned int) ntohs (msg->type));
133#endif
134 GNUNET_SERVER_notification_context_unicast (notifier, client->client_handle,
135 msg, can_drop);
136}
137
138
139
140
141
142/**
143 * Send a message to all of our current clients that have
144 * the right options set.
145 *
146 * @param msg message to multicast
147 * @param can_drop can this message be discarded if the queue is too long
148 * @param options mask to use
149 */
150static void
151send_to_all_clients (const struct GNUNET_MessageHeader *msg, int can_drop,
152 int options)
153{
154 struct Client *c;
155
156 c = clients;
157 while (c != NULL)
158 {
159 if (0 != (c->options & options))
160 {
161#if DEBUG_CORE_CLIENT > 1
162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
163 "Sending message of type %u to client.\n",
164 (unsigned int) ntohs (msg->type));
165#endif
166 send_to_client (c, msg, can_drop);
167 }
168 c = c->next;
169 }
170}
171
172
173
174/**
175 * Handle CORE_SEND_REQUEST message.
176 */
177static void
178handle_client_send_request (void *cls, struct GNUNET_SERVER_Client *client,
179 const struct GNUNET_MessageHeader *message)
180{
181 const struct SendMessageRequest *req;
182 struct Neighbour *n;
183 struct Client *c;
184 struct ClientActiveRequest *car;
185
186 req = (const struct SendMessageRequest *) message;
187 if (0 ==
188 memcmp (&req->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
189 n = &self;
190 else
191 n = find_neighbour (&req->peer);
192 if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
193 (n->status != PEER_STATE_KEY_CONFIRMED))
194 {
195 /* neighbour must have disconnected since request was issued,
196 * ignore (client will realize it once it processes the
197 * disconnect notification) */
198#if DEBUG_CORE_CLIENT
199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
200 "Dropped client request for transmission (am disconnected)\n");
201#endif
202 GNUNET_STATISTICS_update (stats,
203 gettext_noop
204 ("# send requests dropped (disconnected)"), 1,
205 GNUNET_NO);
206 GNUNET_SERVER_receive_done (client, GNUNET_OK);
207 return;
208 }
209 c = clients;
210 while ((c != NULL) && (c->client_handle != client))
211 c = c->next;
212 if (c == NULL)
213 {
214 /* client did not send INIT first! */
215 GNUNET_break (0);
216 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
217 return;
218 }
219 if (c->requests == NULL)
220 c->requests = GNUNET_CONTAINER_multihashmap_create (16);
221#if DEBUG_CORE_CLIENT
222 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
223 "Received client transmission request. queueing\n");
224#endif
225 car = GNUNET_CONTAINER_multihashmap_get (c->requests, &req->peer.hashPubKey);
226 if (car == NULL)
227 {
228 /* create new entry */
229 car = GNUNET_malloc (sizeof (struct ClientActiveRequest));
230 GNUNET_assert (GNUNET_OK ==
231 GNUNET_CONTAINER_multihashmap_put (c->requests,
232 &req->peer.hashPubKey,
233 car,
234 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
235 GNUNET_CONTAINER_DLL_insert (n->active_client_request_head,
236 n->active_client_request_tail, car);
237 car->client = c;
238 }
239 car->deadline = GNUNET_TIME_absolute_ntoh (req->deadline);
240 car->priority = ntohl (req->priority);
241 car->queue_size = ntohl (req->queue_size);
242 car->msize = ntohs (req->size);
243 car->smr_id = req->smr_id;
244 schedule_peer_messages (n);
245 GNUNET_SERVER_receive_done (client, GNUNET_OK);
246}
247
248
249/**
250 * Notify client about an existing connection to one of our neighbours.
251 */
252static int
253notify_client_about_neighbour (void *cls, const GNUNET_HashCode * key,
254 void *value)
255{
256 struct Client *c = cls;
257 struct Neighbour *n = value;
258 size_t size;
259 char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
260 struct GNUNET_TRANSPORT_ATS_Information *ats;
261 struct ConnectNotifyMessage *cnm;
262
263 size =
264 sizeof (struct ConnectNotifyMessage) +
265 (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
266 if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
267 {
268 GNUNET_break (0);
269 /* recovery strategy: throw away performance data */
270 GNUNET_array_grow (n->ats, n->ats_count, 0);
271 size =
272 sizeof (struct ConnectNotifyMessage) +
273 (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
274 }
275 cnm = (struct ConnectNotifyMessage *) buf;
276 cnm->header.size = htons (size);
277 cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
278 cnm->ats_count = htonl (n->ats_count);
279 ats = &cnm->ats;
280 memcpy (ats, n->ats,
281 sizeof (struct GNUNET_TRANSPORT_ATS_Information) * n->ats_count);
282 ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
283 ats[n->ats_count].value = htonl (0);
284 if (n->status == PEER_STATE_KEY_CONFIRMED)
285 {
286#if DEBUG_CORE_CLIENT
287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
288 "NOTIFY_CONNECT");
289#endif
290 cnm->peer = n->peer;
291 send_to_client (c, &cnm->header, GNUNET_NO);
292 }
293 return GNUNET_OK;
294}
295
296
297
298/**
299 * Handle CORE_INIT request.
300 */
301static void
302handle_client_init (void *cls, struct GNUNET_SERVER_Client *client,
303 const struct GNUNET_MessageHeader *message)
304{
305 const struct InitMessage *im;
306 struct InitReplyMessage irm;
307 struct Client *c;
308 uint16_t msize;
309 const uint16_t *types;
310 uint16_t *wtypes;
311 unsigned int i;
312
313#if DEBUG_CORE_CLIENT
314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
315 "Client connecting to core service with `%s' message\n", "INIT");
316#endif
317 /* check that we don't have an entry already */
318 c = clients;
319 while (c != NULL)
320 {
321 if (client == c->client_handle)
322 {
323 GNUNET_break (0);
324 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
325 return;
326 }
327 c = c->next;
328 }
329 msize = ntohs (message->size);
330 if (msize < sizeof (struct InitMessage))
331 {
332 GNUNET_break (0);
333 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
334 return;
335 }
336 GNUNET_SERVER_notification_context_add (notifier, client);
337 im = (const struct InitMessage *) message;
338 types = (const uint16_t *) &im[1];
339 msize -= sizeof (struct InitMessage);
340 c = GNUNET_malloc (sizeof (struct Client) + msize);
341 c->client_handle = client;
342 c->next = clients;
343 clients = c;
344 c->tcnt = msize / sizeof (uint16_t);
345 c->types = (const uint16_t *) &c[1];
346 wtypes = (uint16_t *) & c[1];
347 for (i = 0; i < c->tcnt; i++)
348 {
349 wtypes[i] = ntohs (types[i]);
350 my_type_map[wtypes[i] / 32] |= (1 << (wtypes[i] % 32));
351 }
352 if (c->tcnt > 0)
353 broadcast_my_type_map ();
354 c->options = ntohl (im->options);
355#if DEBUG_CORE_CLIENT
356 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
357 "Client %p is interested in %u message types\n", c,
358 (unsigned int) c->tcnt);
359#endif
360 /* send init reply message */
361 irm.header.size = htons (sizeof (struct InitReplyMessage));
362 irm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY);
363 irm.reserved = htonl (0);
364 memcpy (&irm.publicKey, &my_public_key,
365 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
366#if DEBUG_CORE_CLIENT
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
368 "INIT_REPLY");
369#endif
370 send_to_client (c, &irm.header, GNUNET_NO);
371 if (0 != (c->options & GNUNET_CORE_OPTION_SEND_CONNECT))
372 {
373 /* notify new client about existing neighbours */
374 GNUNET_CONTAINER_multihashmap_iterate (neighbours,
375 &notify_client_about_neighbour, c);
376 }
377 GNUNET_SERVER_receive_done (client, GNUNET_OK);
378}
379
380
381/**
382 * Free client request records.
383 *
384 * @param cls NULL
385 * @param key identity of peer for which this is an active request
386 * @param value the 'struct ClientActiveRequest' to free
387 * @return GNUNET_YES (continue iteration)
388 */
389static int
390destroy_active_client_request (void *cls, const GNUNET_HashCode * key,
391 void *value)
392{
393 struct ClientActiveRequest *car = value;
394 struct Neighbour *n;
395 struct GNUNET_PeerIdentity peer;
396
397 peer.hashPubKey = *key;
398 n = find_neighbour (&peer);
399 GNUNET_assert (NULL != n);
400 GNUNET_CONTAINER_DLL_remove (n->active_client_request_head,
401 n->active_client_request_tail, car);
402 GNUNET_free (car);
403 return GNUNET_YES;
404}
405
406
407/**
408 * A client disconnected, clean up.
409 *
410 * @param cls closure
411 * @param client identification of the client
412 */
413static void
414handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
415{
416 struct Client *pos;
417 struct Client *prev;
418 unsigned int i;
419 const uint16_t *wtypes;
420
421 if (client == NULL)
422 return;
423#if DEBUG_CORE_CLIENT
424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425 "Client %p has disconnected from core service.\n", client);
426#endif
427 prev = NULL;
428 pos = clients;
429 while (pos != NULL)
430 {
431 if (client == pos->client_handle)
432 break;
433 prev = pos;
434 pos = pos->next;
435 }
436 if (pos == NULL)
437 {
438 /* client never sent INIT */
439 return;
440 }
441 if (prev == NULL)
442 clients = pos->next;
443 else
444 prev->next = pos->next;
445 if (pos->requests != NULL)
446 {
447 GNUNET_CONTAINER_multihashmap_iterate (pos->requests,
448 &destroy_active_client_request,
449 NULL);
450 GNUNET_CONTAINER_multihashmap_destroy (pos->requests);
451 }
452 GNUNET_free (pos);
453
454 /* rebuild my_type_map */
455 memset (my_type_map, 0, sizeof (my_type_map));
456 for (pos = clients; NULL != pos; pos = pos->next)
457 {
458 wtypes = (const uint16_t *) &pos[1];
459 for (i = 0; i < pos->tcnt; i++)
460 my_type_map[wtypes[i] / 32] |= (1 << (wtypes[i] % 32));
461 }
462 broadcast_my_type_map ();
463}
464
465
466
467
468
469/**
470 * Handle CORE_SEND request.
471 *
472 * @param cls unused
473 * @param client the client issuing the request
474 * @param message the "struct SendMessage"
475 */
476static void
477handle_client_send (void *cls, struct GNUNET_SERVER_Client *client,
478 const struct GNUNET_MessageHeader *message)
479{
480 const struct SendMessage *sm;
481 struct Neighbour *n;
482 struct MessageEntry *prev;
483 struct MessageEntry *pos;
484 struct MessageEntry *e;
485 struct MessageEntry *min_prio_entry;
486 struct MessageEntry *min_prio_prev;
487 unsigned int min_prio;
488 unsigned int queue_size;
489 uint16_t msize;
490
491 msize = ntohs (message->size);
492 if (msize <
493 sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
494 {
495 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
496 "msize is %u, should be at least %u (in %s:%d)\n", msize,
497 sizeof (struct SendMessage) +
498 sizeof (struct GNUNET_MessageHeader), __FILE__, __LINE__);
499 GNUNET_break (0);
500 if (client != NULL)
501 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
502 return;
503 }
504 sm = (const struct SendMessage *) message;
505 msize -= sizeof (struct SendMessage);
506 if (0 ==
507 memcmp (&sm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
508 {
509 /* loopback */
510 GNUNET_SERVER_mst_receive (mst, &self, (const char *) &sm[1], msize,
511 GNUNET_YES, GNUNET_NO);
512 if (client != NULL)
513 GNUNET_SERVER_receive_done (client, GNUNET_OK);
514 return;
515 }
516 n = find_neighbour (&sm->peer);
517 if ((n == NULL) || (GNUNET_YES != n->is_connected) ||
518 (n->status != PEER_STATE_KEY_CONFIRMED))
519 {
520 /* attempt to send message to peer that is not connected anymore
521 * (can happen due to asynchrony) */
522 GNUNET_STATISTICS_update (stats,
523 gettext_noop
524 ("# messages discarded (disconnected)"), 1,
525 GNUNET_NO);
526 if (client != NULL)
527 GNUNET_SERVER_receive_done (client, GNUNET_OK);
528 return;
529 }
530#if DEBUG_CORE
531 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
532 "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n",
533 "SEND", (unsigned int) msize, GNUNET_i2s (&sm->peer));
534#endif
535 discard_expired_messages (n);
536 /* bound queue size */
537 /* NOTE: this entire block to bound the queue size should be
538 * obsolete with the new client-request code and the
539 * 'schedule_peer_messages' mechanism; we still have this code in
540 * here for now as a sanity check for the new mechanmism;
541 * ultimately, we should probably simply reject SEND messages that
542 * are not 'approved' (or provide a new core API for very unreliable
543 * delivery that always sends with priority 0). Food for thought. */
544 min_prio = UINT32_MAX;
545 min_prio_entry = NULL;
546 min_prio_prev = NULL;
547 queue_size = 0;
548 prev = NULL;
549 pos = n->messages;
550 while (pos != NULL)
551 {
552 if (pos->priority <= min_prio)
553 {
554 min_prio_entry = pos;
555 min_prio_prev = prev;
556 min_prio = pos->priority;
557 }
558 queue_size++;
559 prev = pos;
560 pos = pos->next;
561 }
562 if (queue_size >= MAX_PEER_QUEUE_SIZE)
563 {
564 /* queue full */
565 if (ntohl (sm->priority) <= min_prio)
566 {
567 /* discard new entry; this should no longer happen! */
568 GNUNET_break (0);
569#if DEBUG_CORE
570 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
571 "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n",
572 queue_size, (unsigned int) MAX_PEER_QUEUE_SIZE,
573 (unsigned int) msize, (unsigned int) ntohs (message->type));
574#endif
575 GNUNET_STATISTICS_update (stats,
576 gettext_noop ("# discarded CORE_SEND requests"),
577 1, GNUNET_NO);
578
579 if (client != NULL)
580 GNUNET_SERVER_receive_done (client, GNUNET_OK);
581 return;
582 }
583 GNUNET_assert (min_prio_entry != NULL);
584 /* discard "min_prio_entry" */
585#if DEBUG_CORE
586 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
587 "Queue full, discarding existing older request\n");
588#endif
589 GNUNET_STATISTICS_update (stats,
590 gettext_noop
591 ("# discarded lower priority CORE_SEND requests"),
592 1, GNUNET_NO);
593 if (min_prio_prev == NULL)
594 n->messages = min_prio_entry->next;
595 else
596 min_prio_prev->next = min_prio_entry->next;
597 GNUNET_free (min_prio_entry);
598 }
599
600#if DEBUG_CORE
601 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
602 "Adding transmission request for `%4s' of size %u to queue\n",
603 GNUNET_i2s (&sm->peer), (unsigned int) msize);
604#endif
605 GNUNET_break (0 == ntohl (sm->reserved));
606 e = GNUNET_malloc (sizeof (struct MessageEntry) + msize);
607 e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline);
608 e->priority = ntohl (sm->priority);
609 e->size = msize;
610 if (GNUNET_YES != (int) ntohl (sm->cork))
611 e->got_slack = GNUNET_YES;
612 memcpy (&e[1], &sm[1], msize);
613
614 /* insert, keep list sorted by deadline */
615 prev = NULL;
616 pos = n->messages;
617 while ((pos != NULL) && (pos->deadline.abs_value < e->deadline.abs_value))
618 {
619 prev = pos;
620 pos = pos->next;
621 }
622 if (prev == NULL)
623 n->messages = e;
624 else
625 prev->next = e;
626 e->next = pos;
627
628 /* consider scheduling now */
629 process_plaintext_neighbour_queue (n);
630 if (client != NULL)
631 GNUNET_SERVER_receive_done (client, GNUNET_OK);
632}
633
634
635/**
636 * Handle CORE_REQUEST_CONNECT request.
637 *
638 * @param cls unused
639 * @param client the client issuing the request
640 * @param message the "struct ConnectMessage"
641 */
642static void
643handle_client_request_connect (void *cls, struct GNUNET_SERVER_Client *client,
644 const struct GNUNET_MessageHeader *message)
645{
646 const struct ConnectMessage *cm = (const struct ConnectMessage *) message;
647 struct Neighbour *n;
648
649 if (0 ==
650 memcmp (&cm->peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
651 {
652 /* In this case a client has asked us to connect to ourselves, not really an error! */
653 GNUNET_SERVER_receive_done (client, GNUNET_OK);
654 return;
655 }
656 GNUNET_break (ntohl (cm->reserved) == 0);
657#if DEBUG_CORE
658 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
659 "Core received `%s' request for `%4s', will try to establish connection\n",
660 "REQUEST_CONNECT", GNUNET_i2s (&cm->peer));
661#endif
662 GNUNET_STATISTICS_update (stats,
663 gettext_noop ("# connection requests received"), 1,
664 GNUNET_NO);
665 GNUNET_SERVER_receive_done (client, GNUNET_OK);
666 n = find_neighbour (&cm->peer);
667 if ((n == NULL) || (GNUNET_YES != n->is_connected))
668 {
669 GNUNET_TRANSPORT_try_connect (transport, &cm->peer);
670 }
671 else
672 {
673 GNUNET_STATISTICS_update (stats,
674 gettext_noop
675 ("# connection requests ignored (already connected)"),
676 1, GNUNET_NO);
677 }
678}
679
680
681
682/**
683 * Helper function for handle_client_iterate_peers.
684 *
685 * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies
686 * @param key identity of the connected peer
687 * @param value the 'struct Neighbour' for the peer
688 * @return GNUNET_OK (continue to iterate)
689 */
690static int
691queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value)
692{
693 struct GNUNET_SERVER_TransmitContext *tc = cls;
694 struct Neighbour *n = value;
695 char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1];
696 struct GNUNET_TRANSPORT_ATS_Information *ats;
697 size_t size;
698 struct ConnectNotifyMessage *cnm;
699
700 cnm = (struct ConnectNotifyMessage *) buf;
701 if (n->status != PEER_STATE_KEY_CONFIRMED)
702 return GNUNET_OK;
703 size =
704 sizeof (struct ConnectNotifyMessage) +
705 (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
706 if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
707 {
708 GNUNET_break (0);
709 /* recovery strategy: throw away performance data */
710 GNUNET_array_grow (n->ats, n->ats_count, 0);
711 size =
712 sizeof (struct PeerStatusNotifyMessage) +
713 n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
714 }
715 cnm = (struct ConnectNotifyMessage *) buf;
716 cnm->header.size = htons (size);
717 cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
718 cnm->ats_count = htonl (n->ats_count);
719 ats = &cnm->ats;
720 memcpy (ats, n->ats,
721 n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
722 ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
723 ats[n->ats_count].value = htonl (0);
724#if DEBUG_CORE_CLIENT
725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
726 "NOTIFY_CONNECT");
727#endif
728 cnm->peer = n->peer;
729 GNUNET_SERVER_transmit_context_append_message (tc, &cnm->header);
730 return GNUNET_OK;
731}
732
733
734/**
735 * Handle CORE_ITERATE_PEERS request.
736 *
737 * @param cls unused
738 * @param client client sending the iteration request
739 * @param message iteration request message
740 */
741static void
742handle_client_iterate_peers (void *cls, struct GNUNET_SERVER_Client *client,
743 const struct GNUNET_MessageHeader *message)
744{
745 struct GNUNET_MessageHeader done_msg;
746 struct GNUNET_SERVER_TransmitContext *tc;
747 int msize;
748
749 /* notify new client about existing neighbours */
750
751 msize = ntohs (message->size);
752 tc = GNUNET_SERVER_transmit_context_create (client);
753 if (msize == sizeof (struct GNUNET_MessageHeader))
754 GNUNET_CONTAINER_multihashmap_iterate (neighbours, &queue_connect_message,
755 tc);
756 else
757 GNUNET_break (0);
758
759 done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
760 done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
761 GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
762 GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
763}
764
765
766/**
767 * Handle CORE_PEER_CONNECTED request. Notify client about existing neighbours.
768 *
769 * @param cls unused
770 * @param client client sending the iteration request
771 * @param message iteration request message
772 */
773static void
774handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *client,
775 const struct GNUNET_MessageHeader *message)
776{
777 struct GNUNET_MessageHeader done_msg;
778 struct GNUNET_SERVER_TransmitContext *tc;
779 struct GNUNET_PeerIdentity *peer;
780
781 tc = GNUNET_SERVER_transmit_context_create (client);
782 peer = (struct GNUNET_PeerIdentity *) &message[1];
783 GNUNET_CONTAINER_multihashmap_get_multiple (neighbours, &peer->hashPubKey,
784 &queue_connect_message, tc);
785 done_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
786 done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END);
787 GNUNET_SERVER_transmit_context_append_message (tc, &done_msg);
788 GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
789}
790
791
792/**
793 * Handle REQUEST_INFO request.
794 *
795 * @param cls unused
796 * @param client client sending the request
797 * @param message iteration request message
798 */
799static void
800handle_client_request_info (void *cls, struct GNUNET_SERVER_Client *client,
801 const struct GNUNET_MessageHeader *message)
802{
803 const struct RequestInfoMessage *rcm;
804 struct Client *pos;
805 struct Neighbour *n;
806 struct ConfigurationInfoMessage cim;
807 int32_t want_reserv;
808 int32_t got_reserv;
809 unsigned long long old_preference;
810 struct GNUNET_TIME_Relative rdelay;
811
812 rdelay = GNUNET_TIME_relative_get_zero ();
813#if DEBUG_CORE_CLIENT
814 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service receives `%s' request.\n",
815 "REQUEST_INFO");
816#endif
817 pos = clients;
818 while (pos != NULL)
819 {
820 if (client == pos->client_handle)
821 break;
822 pos = pos->next;
823 }
824 if (pos == NULL)
825 {
826 GNUNET_break (0);
827 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
828 return;
829 }
830
831 rcm = (const struct RequestInfoMessage *) message;
832 n = find_neighbour (&rcm->peer);
833 memset (&cim, 0, sizeof (cim));
834 if ((n != NULL) && (GNUNET_YES == n->is_connected))
835 {
836 want_reserv = ntohl (rcm->reserve_inbound);
837 if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__)
838 {
839 n->bw_out_internal_limit = rcm->limit_outbound;
840 if (n->bw_out.value__ !=
841 GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
842 n->bw_out_external_limit).value__)
843 {
844 n->bw_out =
845 GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit,
846 n->bw_out_external_limit);
847 GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window,
848 n->bw_out);
849 GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out);
850 handle_peer_status_change (n);
851 }
852 }
853 if (want_reserv < 0)
854 {
855 got_reserv = want_reserv;
856 }
857 else if (want_reserv > 0)
858 {
859 rdelay =
860 GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window,
861 want_reserv);
862 if (rdelay.rel_value == 0)
863 got_reserv = want_reserv;
864 else
865 got_reserv = 0; /* all or nothing */
866 }
867 else
868 got_reserv = 0;
869 GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window, got_reserv);
870 old_preference = n->current_preference;
871 n->current_preference += GNUNET_ntohll (rcm->preference_change);
872 if (old_preference > n->current_preference)
873 {
874 /* overflow; cap at maximum value */
875 n->current_preference = ULLONG_MAX;
876 }
877 update_preference_sum (n->current_preference - old_preference);
878#if DEBUG_CORE_QUOTA
879 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880 "Received reservation request for %d bytes for peer `%4s', reserved %d bytes, suggesting delay of %llu ms\n",
881 (int) want_reserv, GNUNET_i2s (&rcm->peer), (int) got_reserv,
882 (unsigned long long) rdelay.rel_value);
883#endif
884 cim.reserved_amount = htonl (got_reserv);
885 cim.reserve_delay = GNUNET_TIME_relative_hton (rdelay);
886 cim.bw_out = n->bw_out;
887 cim.preference = n->current_preference;
888 }
889 else
890 {
891 /* Technically, this COULD happen (due to asynchronous behavior),
892 * but it should be rare, so we should generate an info event
893 * to help diagnosis of serious errors that might be masked by this */
894 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
895 _
896 ("Client asked for preference change with peer `%s', which is not connected!\n"),
897 GNUNET_i2s (&rcm->peer));
898 GNUNET_SERVER_receive_done (client, GNUNET_OK);
899 return;
900 }
901 cim.header.size = htons (sizeof (struct ConfigurationInfoMessage));
902 cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO);
903 cim.peer = rcm->peer;
904 cim.rim_id = rcm->rim_id;
905#if DEBUG_CORE_CLIENT
906 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n",
907 "CONFIGURATION_INFO");
908#endif
909 send_to_client (pos, &cim.header, GNUNET_NO);
910 GNUNET_SERVER_receive_done (client, GNUNET_OK);
911}
912
913
914
915
916/**
917 * Send a P2P message to a client.
918 *
919 * @param sender who sent us the message?
920 * @param client who should we give the message to?
921 * @param m contains the message to transmit
922 * @param msize number of bytes in buf to transmit
923 */
924static void
925send_p2p_message_to_client (struct Neighbour *sender, struct Client *client,
926 const void *m, size_t msize)
927{
928 size_t size =
929 msize + sizeof (struct NotifyTrafficMessage) +
930 (sender->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
931 char buf[size];
932 struct NotifyTrafficMessage *ntm;
933 struct GNUNET_TRANSPORT_ATS_Information *ats;
934
935 GNUNET_assert (GNUNET_YES == sender->is_connected);
936 GNUNET_break (sender->status == PEER_STATE_KEY_CONFIRMED);
937 if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
938 {
939 GNUNET_break (0);
940 /* recovery strategy: throw performance data away... */
941 GNUNET_array_grow (sender->ats, sender->ats_count, 0);
942 size =
943 msize + sizeof (struct NotifyTrafficMessage) +
944 (sender->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information);
945 }
946#if DEBUG_CORE
947 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948 "Core service passes message from `%4s' of type %u to client.\n",
949 GNUNET_i2s (&sender->peer),
950 (unsigned int)
951 ntohs (((const struct GNUNET_MessageHeader *) m)->type));
952#endif
953 ntm = (struct NotifyTrafficMessage *) buf;
954 ntm->header.size = htons (size);
955 ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND);
956 ntm->ats_count = htonl (sender->ats_count);
957 ntm->peer = sender->peer;
958 ats = &ntm->ats;
959 memcpy (ats, sender->ats,
960 sizeof (struct GNUNET_TRANSPORT_ATS_Information) * sender->ats_count);
961 ats[sender->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
962 ats[sender->ats_count].value = htonl (0);
963 memcpy (&ats[sender->ats_count + 1], m, msize);
964 send_to_client (client, &ntm->header, GNUNET_YES);
965}
966
967
968
969
970/**
971 * Deliver P2P message to interested clients.
972 *
973 * @param cls always NULL
974 * @param client who sent us the message (struct Neighbour)
975 * @param m the message
976 */
977static void
978deliver_message (void *cls, void *client, const struct GNUNET_MessageHeader *m)
979{
980 struct Neighbour *sender = client;
981 size_t msize = ntohs (m->size);
982 char buf[256];
983 struct Client *cpos;
984 uint16_t type;
985 unsigned int tpos;
986 int deliver_full;
987 int dropped;
988
989 GNUNET_break (sender->status == PEER_STATE_KEY_CONFIRMED);
990 type = ntohs (m->type);
991#if DEBUG_CORE > 1
992 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
993 "Received encapsulated message of type %u and size %u from `%4s'\n",
994 (unsigned int) type, ntohs (m->size), GNUNET_i2s (&sender->peer));
995#endif
996 GNUNET_snprintf (buf, sizeof (buf),
997 gettext_noop ("# bytes of messages of type %u received"),
998 (unsigned int) type);
999 GNUNET_STATISTICS_update (stats, buf, msize, GNUNET_NO);
1000 if ((GNUNET_MESSAGE_TYPE_CORE_BINARY_TYPE_MAP == type) ||
1001 (GNUNET_MESSAGE_TYPE_CORE_COMPRESSED_TYPE_MAP == type))
1002 {
1003 /* FIXME: update message type map for 'Neighbour' */
1004 return;
1005 }
1006 dropped = GNUNET_YES;
1007 cpos = clients;
1008 while (cpos != NULL)
1009 {
1010 deliver_full = GNUNET_NO;
1011 if (0 != (cpos->options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND))
1012 deliver_full = GNUNET_YES;
1013 else
1014 {
1015 for (tpos = 0; tpos < cpos->tcnt; tpos++)
1016 {
1017 if (type != cpos->types[tpos])
1018 continue;
1019 deliver_full = GNUNET_YES;
1020 break;
1021 }
1022 }
1023 if (GNUNET_YES == deliver_full)
1024 {
1025 send_p2p_message_to_client (sender, cpos, m, msize);
1026 dropped = GNUNET_NO;
1027 }
1028 else if (cpos->options & GNUNET_CORE_OPTION_SEND_HDR_INBOUND)
1029 {
1030 send_p2p_message_to_client (sender, cpos, m,
1031 sizeof (struct GNUNET_MessageHeader));
1032 }
1033 cpos = cpos->next;
1034 }
1035 if (dropped == GNUNET_YES)
1036 {
1037#if DEBUG_CORE
1038 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1039 "Message of type %u from `%4s' not delivered to any client.\n",
1040 (unsigned int) type, GNUNET_i2s (&sender->peer));
1041#endif
1042 GNUNET_STATISTICS_update (stats,
1043 gettext_noop
1044 ("# messages not delivered to any client"), 1,
1045 GNUNET_NO);
1046 }
1047}
1048
1049
1050
1051void
1052GSC_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
1053{
1054 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1055 {&handle_client_init, NULL,
1056 GNUNET_MESSAGE_TYPE_CORE_INIT, 0},
1057 {&handle_client_iterate_peers, NULL,
1058 GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS,
1059 sizeof (struct GNUNET_MessageHeader)},
1060 {&handle_client_have_peer, NULL,
1061 GNUNET_MESSAGE_TYPE_CORE_PEER_CONNECTED,
1062 sizeof (struct GNUNET_MessageHeader) +
1063 sizeof (struct GNUNET_PeerIdentity)},
1064 {&handle_client_request_info, NULL,
1065 GNUNET_MESSAGE_TYPE_CORE_REQUEST_INFO,
1066 sizeof (struct RequestInfoMessage)},
1067 {&handle_client_send_request, NULL,
1068 GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST,
1069 sizeof (struct SendMessageRequest)},
1070 {&handle_client_send, NULL,
1071 GNUNET_MESSAGE_TYPE_CORE_SEND, 0},
1072 {&handle_client_request_connect, NULL,
1073 GNUNET_MESSAGE_TYPE_CORE_REQUEST_CONNECT,
1074 sizeof (struct ConnectMessage)},
1075 {NULL, NULL, 0, 0}
1076 };
1077
1078 /* setup notification */
1079 notifier =
1080 GNUNET_SERVER_notification_context_create (server, MAX_NOTIFY_QUEUE);
1081 GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
1082 GNUNET_SERVER_add_handlers (server, handlers);
1083 mst = GNUNET_SERVER_mst_create (&deliver_message, NULL);
1084}
1085
1086
1087void
1088GSC_CLIENTS_done ()
1089{
1090 struct Client *c;
1091
1092 while (NULL != (c = clients))
1093 handle_client_disconnect (NULL, c->client_handle);
1094 GNUNET_SERVER_notification_context_destroy (notifier);
1095 notifier = NULL;
1096 if (mst != NULL)
1097 {
1098 GNUNET_SERVER_mst_destroy (mst);
1099 mst = NULL;
1100 }
1101
1102}