diff options
author | Christian Grothoff <christian@grothoff.org> | 2018-11-15 15:43:41 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2018-11-15 15:43:41 +0100 |
commit | 700359ed8ce18fd6ddfc0940a5d0e5ba145c1fd1 (patch) | |
tree | d6122fa6aee9c1d9de4aebb29158054964d84ea3 /src/transport | |
parent | c359c3f7d928abf47a0c29d735bb7e30fb55e4dc (diff) | |
download | gnunet-700359ed8ce18fd6ddfc0940a5d0e5ba145c1fd1.tar.gz gnunet-700359ed8ce18fd6ddfc0940a5d0e5ba145c1fd1.zip |
getting data structures in place for gnunet-service-tng
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 518 |
1 files changed, 491 insertions, 27 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 73b295442..1e638377a 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -75,6 +75,147 @@ enum ClientType | |||
75 | /** | 75 | /** |
76 | * Client connected to the transport service. | 76 | * Client connected to the transport service. |
77 | */ | 77 | */ |
78 | struct TransportClient; | ||
79 | |||
80 | |||
81 | /** | ||
82 | * A neighbour that at least one communicator is connected to. | ||
83 | */ | ||
84 | struct Neighbour; | ||
85 | |||
86 | |||
87 | /** | ||
88 | * List of available queues for a particular neighbour. | ||
89 | */ | ||
90 | struct Queue | ||
91 | { | ||
92 | /** | ||
93 | * Kept in a MDLL. | ||
94 | */ | ||
95 | struct Queue *next_neighbour; | ||
96 | |||
97 | /** | ||
98 | * Kept in a MDLL. | ||
99 | */ | ||
100 | struct Queue *prev_neighbour; | ||
101 | |||
102 | /** | ||
103 | * Kept in a MDLL. | ||
104 | */ | ||
105 | struct Queue *prev_client; | ||
106 | |||
107 | /** | ||
108 | * Kept in a MDLL. | ||
109 | */ | ||
110 | struct Queue *next_client; | ||
111 | |||
112 | /** | ||
113 | * Which neighbour is this queue for? | ||
114 | */ | ||
115 | struct Neighbour *neighbour; | ||
116 | |||
117 | /** | ||
118 | * Which communicator offers this queue? | ||
119 | */ | ||
120 | struct TransportClient *tc; | ||
121 | |||
122 | /** | ||
123 | * Unique identifier of this queue with the communicator. | ||
124 | */ | ||
125 | uint32_t qid; | ||
126 | |||
127 | /** | ||
128 | * Network type offered by this queue. | ||
129 | */ | ||
130 | enum GNUNET_ATS_Network_Type nt; | ||
131 | |||
132 | /** | ||
133 | * Address served by the queue. | ||
134 | */ | ||
135 | const char *address; | ||
136 | }; | ||
137 | |||
138 | |||
139 | /** | ||
140 | * A neighbour that at least one communicator is connected to. | ||
141 | */ | ||
142 | struct Neighbour | ||
143 | { | ||
144 | |||
145 | /** | ||
146 | * Which peer is this about? | ||
147 | */ | ||
148 | struct GNUNET_PeerIdentity pid; | ||
149 | |||
150 | /** | ||
151 | * Head of list of messages pending for this neighbour. | ||
152 | */ | ||
153 | struct PendingMessage *pending_msg_head; | ||
154 | |||
155 | /** | ||
156 | * Tail of list of messages pending for this neighbour. | ||
157 | */ | ||
158 | struct PendingMessage *pending_msg_tail; | ||
159 | |||
160 | /** | ||
161 | * Head of DLL of queues to this peer. | ||
162 | */ | ||
163 | struct Queue *queue_head; | ||
164 | |||
165 | /** | ||
166 | * Tail of DLL of queues to this peer. | ||
167 | */ | ||
168 | struct Queue *queue_tail; | ||
169 | |||
170 | }; | ||
171 | |||
172 | |||
173 | /** | ||
174 | * Transmission request from CORE that is awaiting delivery. | ||
175 | */ | ||
176 | struct PendingMessage | ||
177 | { | ||
178 | /** | ||
179 | * Kept in a MDLL of messages for this @a target. | ||
180 | */ | ||
181 | struct PendingMessage *next_neighbour; | ||
182 | |||
183 | /** | ||
184 | * Kept in a MDLL of messages for this @a target. | ||
185 | */ | ||
186 | struct PendingMessage *prev_neighbour; | ||
187 | |||
188 | /** | ||
189 | * Kept in a MDLL of messages from this @a client. | ||
190 | */ | ||
191 | struct PendingMessage *next_client; | ||
192 | |||
193 | /** | ||
194 | * Kept in a MDLL of messages from this @a client. | ||
195 | */ | ||
196 | struct PendingMessage *prev_client; | ||
197 | |||
198 | /** | ||
199 | * Target of the request. | ||
200 | */ | ||
201 | struct Neighbour *target; | ||
202 | |||
203 | /** | ||
204 | * Client that issued the transmission request. | ||
205 | */ | ||
206 | struct TransportClient *client; | ||
207 | |||
208 | /** | ||
209 | * Size of the original message. | ||
210 | */ | ||
211 | uint32_t bytes_msg; | ||
212 | |||
213 | }; | ||
214 | |||
215 | |||
216 | /** | ||
217 | * Client connected to the transport service. | ||
218 | */ | ||
78 | struct TransportClient | 219 | struct TransportClient |
79 | { | 220 | { |
80 | 221 | ||
@@ -107,17 +248,63 @@ struct TransportClient | |||
107 | { | 248 | { |
108 | 249 | ||
109 | /** | 250 | /** |
110 | * Peer identity to monitor the addresses of. | 251 | * Information for @e type #CT_CORE. |
111 | * Zero to monitor all neighbours. Valid if | ||
112 | * @e type is #CT_MONITOR. | ||
113 | */ | 252 | */ |
114 | struct GNUNET_PeerIdentity monitor_peer; | 253 | struct { |
254 | |||
255 | /** | ||
256 | * Head of list of messages pending for this client. | ||
257 | */ | ||
258 | struct PendingMessage *pending_msg_head; | ||
259 | |||
260 | /** | ||
261 | * Tail of list of messages pending for this client. | ||
262 | */ | ||
263 | struct PendingMessage *pending_msg_tail; | ||
264 | |||
265 | } core; | ||
266 | |||
267 | /** | ||
268 | * Information for @e type #CT_MONITOR. | ||
269 | */ | ||
270 | struct { | ||
271 | |||
272 | /** | ||
273 | * Peer identity to monitor the addresses of. | ||
274 | * Zero to monitor all neighbours. Valid if | ||
275 | * @e type is #CT_MONITOR. | ||
276 | */ | ||
277 | struct GNUNET_PeerIdentity peer; | ||
278 | |||
279 | /** | ||
280 | * Is this a one-shot monitor? | ||
281 | */ | ||
282 | int one_shot; | ||
283 | |||
284 | } monitor; | ||
285 | |||
115 | 286 | ||
116 | /** | 287 | /** |
117 | * If @e type is #CT_COMMUNICATOR, this communicator | 288 | * Information for @e type #CT_COMMUNICATOR. |
118 | * supports communicating using these addresses. | ||
119 | */ | 289 | */ |
120 | const char *address_prefix; | 290 | struct { |
291 | /** | ||
292 | * If @e type is #CT_COMMUNICATOR, this communicator | ||
293 | * supports communicating using these addresses. | ||
294 | */ | ||
295 | char *address_prefix; | ||
296 | |||
297 | /** | ||
298 | * Head of DLL of queues offered by this communicator. | ||
299 | */ | ||
300 | struct Queue *queue_head; | ||
301 | |||
302 | /** | ||
303 | * Tail of DLL of queues offered by this communicator. | ||
304 | */ | ||
305 | struct Queue *queue_tail; | ||
306 | |||
307 | } communicator; | ||
121 | 308 | ||
122 | } details; | 309 | } details; |
123 | 310 | ||
@@ -154,6 +341,26 @@ struct GNUNET_PeerIdentity GST_my_identity; | |||
154 | */ | 341 | */ |
155 | struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key; | 342 | struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key; |
156 | 343 | ||
344 | /** | ||
345 | * Map from PIDs to `struct Neighbour` entries. A peer is | ||
346 | * a neighbour if we have an MQ to it from some communicator. | ||
347 | */ | ||
348 | static struct GNUNET_CONTAINER_MultiPeerMap *neighbours; | ||
349 | |||
350 | |||
351 | /** | ||
352 | * Lookup neighbour record for peer @a pid. | ||
353 | * | ||
354 | * @param pid neighbour to look for | ||
355 | * @return NULL if we do not have this peer as a neighbour | ||
356 | */ | ||
357 | static struct Neighbour * | ||
358 | lookup_neighbour (const struct GNUNET_PeerIdentity *pid) | ||
359 | { | ||
360 | return GNUNET_CONTAINER_multipeermap_get (neighbours, | ||
361 | pid); | ||
362 | } | ||
363 | |||
157 | 364 | ||
158 | /** | 365 | /** |
159 | * Called whenever a client connects. Allocates our | 366 | * Called whenever a client connects. Allocates our |
@@ -210,10 +417,23 @@ client_disconnect_cb (void *cls, | |||
210 | case CT_NONE: | 417 | case CT_NONE: |
211 | break; | 418 | break; |
212 | case CT_CORE: | 419 | case CT_CORE: |
420 | { | ||
421 | struct PendingMessage *pm; | ||
422 | |||
423 | while (NULL != (pm = tc->details.core.pending_msg_head)) | ||
424 | { | ||
425 | GNUNET_CONTAINER_MDLL_remove (client, | ||
426 | tc->details.core.pending_msg_head, | ||
427 | tc->details.core.pending_msg_tail, | ||
428 | pm); | ||
429 | pm->client = NULL; | ||
430 | } | ||
431 | } | ||
213 | break; | 432 | break; |
214 | case CT_MONITOR: | 433 | case CT_MONITOR: |
215 | break; | 434 | break; |
216 | case CT_COMMUNICATOR: | 435 | case CT_COMMUNICATOR: |
436 | GNUNET_free (tc->details.communicator.address_prefix); | ||
217 | break; | 437 | break; |
218 | } | 438 | } |
219 | GNUNET_free (tc); | 439 | GNUNET_free (tc); |
@@ -268,10 +488,15 @@ static int | |||
268 | check_client_send (void *cls, | 488 | check_client_send (void *cls, |
269 | const struct OutboundMessage *obm) | 489 | const struct OutboundMessage *obm) |
270 | { | 490 | { |
491 | struct TransportClient *tc = cls; | ||
271 | uint16_t size; | 492 | uint16_t size; |
272 | const struct GNUNET_MessageHeader *obmm; | 493 | const struct GNUNET_MessageHeader *obmm; |
273 | 494 | ||
274 | (void) cls; | 495 | if (CT_CORE != tc->type) |
496 | { | ||
497 | GNUNET_break (0); | ||
498 | return GNUNET_SYSERR; | ||
499 | } | ||
275 | size = ntohs (obm->header.size) - sizeof (struct OutboundMessage); | 500 | size = ntohs (obm->header.size) - sizeof (struct OutboundMessage); |
276 | if (size < sizeof (struct GNUNET_MessageHeader)) | 501 | if (size < sizeof (struct GNUNET_MessageHeader)) |
277 | { | 502 | { |
@@ -289,6 +514,51 @@ check_client_send (void *cls, | |||
289 | 514 | ||
290 | 515 | ||
291 | /** | 516 | /** |
517 | * Send a response to the @a pm that we have processed a | ||
518 | * "send" request with status @a success. We | ||
519 | * transmitted @a bytes_physical on the actual wire. | ||
520 | * Sends a confirmation to the "core" client responsible | ||
521 | * for the original request and free's @a pm. | ||
522 | * | ||
523 | * @param pm handle to the original pending message | ||
524 | * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR | ||
525 | * for transmission failure | ||
526 | * @param bytes_physical amount of bandwidth consumed | ||
527 | */ | ||
528 | static void | ||
529 | client_send_response (struct PendingMessage *pm, | ||
530 | int success, | ||
531 | uint32_t bytes_physical) | ||
532 | { | ||
533 | struct TransportClient *tc = pm->client; | ||
534 | struct Neighbour *target = pm->target; | ||
535 | struct GNUNET_MQ_Envelope *env; | ||
536 | struct SendOkMessage *som; | ||
537 | |||
538 | if (NULL != tc) | ||
539 | { | ||
540 | env = GNUNET_MQ_msg (som, | ||
541 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); | ||
542 | som->success = htonl ((uint32_t) success); | ||
543 | som->bytes_msg = htonl (pm->bytes_msg); | ||
544 | som->bytes_physical = htonl (bytes_physical); | ||
545 | som->peer = target->pid; | ||
546 | GNUNET_MQ_send (tc->mq, | ||
547 | env); | ||
548 | GNUNET_CONTAINER_MDLL_remove (client, | ||
549 | tc->details.core.pending_msg_head, | ||
550 | tc->details.core.pending_msg_tail, | ||
551 | pm); | ||
552 | } | ||
553 | GNUNET_CONTAINER_MDLL_remove (neighbour, | ||
554 | target->pending_msg_head, | ||
555 | target->pending_msg_tail, | ||
556 | pm); | ||
557 | GNUNET_free (pm); | ||
558 | } | ||
559 | |||
560 | |||
561 | /** | ||
292 | * Client asked for transmission to a peer. Process the request. | 562 | * Client asked for transmission to a peer. Process the request. |
293 | * | 563 | * |
294 | * @param cls the client | 564 | * @param cls the client |
@@ -299,9 +569,55 @@ handle_client_send (void *cls, | |||
299 | const struct OutboundMessage *obm) | 569 | const struct OutboundMessage *obm) |
300 | { | 570 | { |
301 | struct TransportClient *tc = cls; | 571 | struct TransportClient *tc = cls; |
572 | struct PendingMessage *pm; | ||
302 | const struct GNUNET_MessageHeader *obmm; | 573 | const struct GNUNET_MessageHeader *obmm; |
574 | struct Neighbour *target; | ||
575 | uint32_t bytes_msg; | ||
303 | 576 | ||
577 | GNUNET_assert (CT_CORE == tc->type); | ||
304 | obmm = (const struct GNUNET_MessageHeader *) &obm[1]; | 578 | obmm = (const struct GNUNET_MessageHeader *) &obm[1]; |
579 | bytes_msg = ntohs (obmm->size); | ||
580 | target = lookup_neighbour (&obm->peer); | ||
581 | if (NULL == target) | ||
582 | { | ||
583 | /* Failure: don't have this peer as a neighbour (anymore). | ||
584 | Might have gone down asynchronously, so this is NOT | ||
585 | a protocol violation by CORE. Still count the event, | ||
586 | as this should be rare. */ | ||
587 | struct GNUNET_MQ_Envelope *env; | ||
588 | struct SendOkMessage *som; | ||
589 | |||
590 | env = GNUNET_MQ_msg (som, | ||
591 | GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); | ||
592 | som->success = htonl (GNUNET_SYSERR); | ||
593 | som->bytes_msg = htonl (bytes_msg); | ||
594 | som->bytes_physical = htonl (0); | ||
595 | som->peer = obm->peer; | ||
596 | GNUNET_MQ_send (tc->mq, | ||
597 | env); | ||
598 | GNUNET_SERVICE_client_continue (tc->client); | ||
599 | GNUNET_STATISTICS_update (GST_stats, | ||
600 | "# messages dropped (neighbour unknown)", | ||
601 | 1, | ||
602 | GNUNET_NO); | ||
603 | return; | ||
604 | } | ||
605 | pm = GNUNET_new (struct PendingMessage); | ||
606 | pm->client = tc; | ||
607 | pm->target = target; | ||
608 | pm->bytes_msg = bytes_msg; | ||
609 | GNUNET_CONTAINER_MDLL_insert (neighbour, | ||
610 | target->pending_msg_head, | ||
611 | target->pending_msg_tail, | ||
612 | pm); | ||
613 | GNUNET_CONTAINER_MDLL_insert (client, | ||
614 | tc->details.core.pending_msg_head, | ||
615 | tc->details.core.pending_msg_tail, | ||
616 | pm); | ||
617 | // FIXME: do the work, continuation with: | ||
618 | client_send_response (pm, | ||
619 | GNUNET_NO, | ||
620 | 0); | ||
305 | } | 621 | } |
306 | 622 | ||
307 | 623 | ||
@@ -315,10 +631,16 @@ static int | |||
315 | check_communicator_available (void *cls, | 631 | check_communicator_available (void *cls, |
316 | const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam) | 632 | const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam) |
317 | { | 633 | { |
634 | struct TransportClient *tc = cls; | ||
318 | const char *addr; | 635 | const char *addr; |
319 | uint16_t size; | 636 | uint16_t size; |
320 | 637 | ||
321 | (void) cls; | 638 | if (CT_NONE != tc->type) |
639 | { | ||
640 | GNUNET_break (0); | ||
641 | return GNUNET_SYSERR; | ||
642 | } | ||
643 | tc->type = CT_COMMUNICATOR; | ||
322 | size = ntohs (cam->header.size) - sizeof (*cam); | 644 | size = ntohs (cam->header.size) - sizeof (*cam); |
323 | if (0 == size) | 645 | if (0 == size) |
324 | return GNUNET_OK; /* receive-only communicator */ | 646 | return GNUNET_OK; /* receive-only communicator */ |
@@ -345,17 +667,10 @@ handle_communicator_available (void *cls, | |||
345 | struct TransportClient *tc = cls; | 667 | struct TransportClient *tc = cls; |
346 | uint16_t size; | 668 | uint16_t size; |
347 | 669 | ||
348 | if (CT_NONE != tc->type) | ||
349 | { | ||
350 | GNUNET_break (0); | ||
351 | GNUNET_SERVICE_client_drop (tc->client); | ||
352 | return; | ||
353 | } | ||
354 | tc->type = CT_COMMUNICATOR; | ||
355 | size = ntohs (cam->header.size) - sizeof (*cam); | 670 | size = ntohs (cam->header.size) - sizeof (*cam); |
356 | if (0 == size) | 671 | if (0 == size) |
357 | return; /* receive-only communicator */ | 672 | return; /* receive-only communicator */ |
358 | tc->details.address_prefix = GNUNET_strdup ((const char *) &cam[1]); | 673 | tc->details.communicator.address_prefix = GNUNET_strdup ((const char *) &cam[1]); |
359 | GNUNET_SERVICE_client_continue (tc->client); | 674 | GNUNET_SERVICE_client_continue (tc->client); |
360 | } | 675 | } |
361 | 676 | ||
@@ -370,10 +685,15 @@ static int | |||
370 | check_add_address (void *cls, | 685 | check_add_address (void *cls, |
371 | const struct GNUNET_TRANSPORT_AddAddressMessage *aam) | 686 | const struct GNUNET_TRANSPORT_AddAddressMessage *aam) |
372 | { | 687 | { |
688 | struct TransportClient *tc = cls; | ||
373 | const char *addr; | 689 | const char *addr; |
374 | uint16_t size; | 690 | uint16_t size; |
375 | 691 | ||
376 | (void) cls; | 692 | if (CT_COMMUNICATOR != tc->type) |
693 | { | ||
694 | GNUNET_break (0); | ||
695 | return GNUNET_SYSERR; | ||
696 | } | ||
377 | size = ntohs (aam->header.size) - sizeof (*aam); | 697 | size = ntohs (aam->header.size) - sizeof (*aam); |
378 | if (0 == size) | 698 | if (0 == size) |
379 | { | 699 | { |
@@ -418,12 +738,19 @@ handle_del_address (void *cls, | |||
418 | { | 738 | { |
419 | struct TransportClient *tc = cls; | 739 | struct TransportClient *tc = cls; |
420 | 740 | ||
741 | if (CT_COMMUNICATOR != tc->type) | ||
742 | { | ||
743 | GNUNET_break (0); | ||
744 | GNUNET_SERVICE_client_drop (tc->client); | ||
745 | return; | ||
746 | } | ||
747 | |||
421 | GNUNET_SERVICE_client_continue (tc->client); | 748 | GNUNET_SERVICE_client_continue (tc->client); |
422 | } | 749 | } |
423 | 750 | ||
424 | 751 | ||
425 | /** | 752 | /** |
426 | * Client asked for transmission to a peer. Process the request. | 753 | * Client notified us about transmission from a peer. Process the request. |
427 | * | 754 | * |
428 | * @param cls the client | 755 | * @param cls the client |
429 | * @param obm the send message that was sent | 756 | * @param obm the send message that was sent |
@@ -432,10 +759,15 @@ static int | |||
432 | check_incoming_msg (void *cls, | 759 | check_incoming_msg (void *cls, |
433 | const struct GNUNET_TRANSPORT_IncomingMessage *im) | 760 | const struct GNUNET_TRANSPORT_IncomingMessage *im) |
434 | { | 761 | { |
762 | struct TransportClient *tc = cls; | ||
435 | uint16_t size; | 763 | uint16_t size; |
436 | const struct GNUNET_MessageHeader *obmm; | 764 | const struct GNUNET_MessageHeader *obmm; |
437 | 765 | ||
438 | (void) cls; | 766 | if (CT_COMMUNICATOR != tc->type) |
767 | { | ||
768 | GNUNET_break (0); | ||
769 | return GNUNET_SYSERR; | ||
770 | } | ||
439 | size = ntohs (im->header.size) - sizeof (*im); | 771 | size = ntohs (im->header.size) - sizeof (*im); |
440 | if (size < sizeof (struct GNUNET_MessageHeader)) | 772 | if (size < sizeof (struct GNUNET_MessageHeader)) |
441 | { | 773 | { |
@@ -478,10 +810,15 @@ static int | |||
478 | check_add_queue_message (void *cls, | 810 | check_add_queue_message (void *cls, |
479 | const struct GNUNET_TRANSPORT_AddQueueMessage *aqm) | 811 | const struct GNUNET_TRANSPORT_AddQueueMessage *aqm) |
480 | { | 812 | { |
813 | struct TransportClient *tc = cls; | ||
481 | const char *addr; | 814 | const char *addr; |
482 | uint16_t size; | 815 | uint16_t size; |
483 | 816 | ||
484 | (void) cls; | 817 | if (CT_COMMUNICATOR != tc->type) |
818 | { | ||
819 | GNUNET_break (0); | ||
820 | return GNUNET_SYSERR; | ||
821 | } | ||
485 | size = ntohs (aqm->header.size) - sizeof (*aqm); | 822 | size = ntohs (aqm->header.size) - sizeof (*aqm); |
486 | if (0 == size) | 823 | if (0 == size) |
487 | { | 824 | { |
@@ -509,12 +846,66 @@ handle_add_queue_message (void *cls, | |||
509 | const struct GNUNET_TRANSPORT_AddQueueMessage *aqm) | 846 | const struct GNUNET_TRANSPORT_AddQueueMessage *aqm) |
510 | { | 847 | { |
511 | struct TransportClient *tc = cls; | 848 | struct TransportClient *tc = cls; |
849 | struct Queue *queue; | ||
850 | struct Neighbour *neighbour; | ||
851 | const char *addr; | ||
852 | uint16_t addr_len; | ||
512 | 853 | ||
854 | neighbour = lookup_neighbour (&aqm->receiver); | ||
855 | if (NULL == neighbour) | ||
856 | { | ||
857 | neighbour = GNUNET_new (struct Neighbour); | ||
858 | neighbour->pid = aqm->receiver; | ||
859 | GNUNET_assert (GNUNET_OK == | ||
860 | GNUNET_CONTAINER_multipeermap_put (neighbours, | ||
861 | &neighbour->pid, | ||
862 | neighbour, | ||
863 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
864 | // FIXME: notify cores/monitors! | ||
865 | } | ||
866 | addr_len = ntohs (aqm->header.size) - sizeof (*aqm); | ||
867 | addr = (const char *) &aqm[1]; | ||
868 | |||
869 | queue = GNUNET_malloc (sizeof (struct Queue) + addr_len); | ||
870 | queue->qid = aqm->qid; | ||
871 | queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt); | ||
872 | queue->tc = tc; | ||
873 | queue->neighbour = neighbour; | ||
874 | queue->address = (const char *) &queue[1]; | ||
875 | memcpy (&queue[1], | ||
876 | addr, | ||
877 | addr_len); | ||
878 | GNUNET_CONTAINER_MDLL_insert (neighbour, | ||
879 | neighbour->queue_head, | ||
880 | neighbour->queue_tail, | ||
881 | queue); | ||
882 | GNUNET_CONTAINER_MDLL_insert (client, | ||
883 | tc->details.communicator.queue_head, | ||
884 | tc->details.communicator.queue_tail, | ||
885 | queue); | ||
886 | // FIXME: possibly transmit queued messages? | ||
513 | GNUNET_SERVICE_client_continue (tc->client); | 887 | GNUNET_SERVICE_client_continue (tc->client); |
514 | } | 888 | } |
515 | 889 | ||
516 | 890 | ||
517 | /** | 891 | /** |
892 | * Release memory used by @a neighbour. | ||
893 | * | ||
894 | * @param neighbour neighbour entry to free | ||
895 | */ | ||
896 | static void | ||
897 | free_neighbour (struct Neighbour *neighbour) | ||
898 | { | ||
899 | GNUNET_assert (NULL == neighbour->queue_head); | ||
900 | GNUNET_assert (GNUNET_YES == | ||
901 | GNUNET_CONTAINER_multipeermap_remove (neighbours, | ||
902 | &neighbour->pid, | ||
903 | neighbour)); | ||
904 | GNUNET_free (neighbour); | ||
905 | } | ||
906 | |||
907 | |||
908 | /** | ||
518 | * Queue to a peer went down. Process the request. | 909 | * Queue to a peer went down. Process the request. |
519 | * | 910 | * |
520 | * @param cls the client | 911 | * @param cls the client |
@@ -526,7 +917,42 @@ handle_del_queue_message (void *cls, | |||
526 | { | 917 | { |
527 | struct TransportClient *tc = cls; | 918 | struct TransportClient *tc = cls; |
528 | 919 | ||
529 | GNUNET_SERVICE_client_continue (tc->client); | 920 | if (CT_COMMUNICATOR != tc->type) |
921 | { | ||
922 | GNUNET_break (0); | ||
923 | GNUNET_SERVICE_client_drop (tc->client); | ||
924 | return; | ||
925 | } | ||
926 | for (struct Queue *queue = tc->details.communicator.queue_head; | ||
927 | NULL != queue; | ||
928 | queue = queue->next_client) | ||
929 | { | ||
930 | struct Neighbour *neighbour = queue->neighbour; | ||
931 | |||
932 | if ( (dqm->qid != queue->qid) || | ||
933 | (0 != memcmp (&dqm->receiver, | ||
934 | &neighbour->pid, | ||
935 | sizeof (struct GNUNET_PeerIdentity))) ) | ||
936 | continue; | ||
937 | GNUNET_CONTAINER_MDLL_remove (neighbour, | ||
938 | neighbour->queue_head, | ||
939 | neighbour->queue_tail, | ||
940 | queue); | ||
941 | GNUNET_CONTAINER_MDLL_remove (client, | ||
942 | tc->details.communicator.queue_head, | ||
943 | tc->details.communicator.queue_tail, | ||
944 | queue); | ||
945 | GNUNET_free (queue); | ||
946 | if (NULL == neighbour->queue_head) | ||
947 | { | ||
948 | // FIXME: notify cores/monitors! | ||
949 | free_neighbour (neighbour); | ||
950 | } | ||
951 | GNUNET_SERVICE_client_continue (tc->client); | ||
952 | return; | ||
953 | } | ||
954 | GNUNET_break (0); | ||
955 | GNUNET_SERVICE_client_drop (tc->client); | ||
530 | } | 956 | } |
531 | 957 | ||
532 | 958 | ||
@@ -542,6 +968,12 @@ handle_send_message_ack (void *cls, | |||
542 | { | 968 | { |
543 | struct TransportClient *tc = cls; | 969 | struct TransportClient *tc = cls; |
544 | 970 | ||
971 | if (CT_COMMUNICATOR != tc->type) | ||
972 | { | ||
973 | GNUNET_break (0); | ||
974 | GNUNET_SERVICE_client_drop (tc->client); | ||
975 | return; | ||
976 | } | ||
545 | GNUNET_SERVICE_client_continue (tc->client); | 977 | GNUNET_SERVICE_client_continue (tc->client); |
546 | } | 978 | } |
547 | 979 | ||
@@ -565,20 +997,45 @@ handle_monitor_start (void *cls, | |||
565 | return; | 997 | return; |
566 | } | 998 | } |
567 | tc->type = CT_MONITOR; | 999 | tc->type = CT_MONITOR; |
568 | tc->details.monitor_peer = start->peer; | 1000 | tc->details.monitor.peer = start->peer; |
569 | // FIXME: remember also the one_shot flag! | 1001 | tc->details.monitor.one_shot = ntohl (start->one_shot); |
1002 | // FIXME: do work! | ||
570 | GNUNET_SERVICE_client_continue (tc->client); | 1003 | GNUNET_SERVICE_client_continue (tc->client); |
571 | } | 1004 | } |
572 | 1005 | ||
573 | 1006 | ||
574 | /** | 1007 | /** |
1008 | * Free neighbour entry. | ||
1009 | * | ||
1010 | * @param cls NULL | ||
1011 | * @param pid unused | ||
1012 | * @param value a `struct Neighbour` | ||
1013 | * @return #GNUNET_OK (always) | ||
1014 | */ | ||
1015 | static int | ||
1016 | free_neighbour_cb (void *cls, | ||
1017 | const struct GNUNET_PeerIdentity *pid, | ||
1018 | void *value) | ||
1019 | { | ||
1020 | struct Neighbour *neighbour = value; | ||
1021 | |||
1022 | (void) cls; | ||
1023 | (void) pid; | ||
1024 | GNUNET_break (0); // should this ever happen? | ||
1025 | free_neighbour (neighbour); | ||
1026 | |||
1027 | return GNUNET_OK; | ||
1028 | } | ||
1029 | |||
1030 | |||
1031 | /** | ||
575 | * Function called when the service shuts down. Unloads our plugins | 1032 | * Function called when the service shuts down. Unloads our plugins |
576 | * and cancels pending validations. | 1033 | * and cancels pending validations. |
577 | * | 1034 | * |
578 | * @param cls closure, unused | 1035 | * @param cls closure, unused |
579 | */ | 1036 | */ |
580 | static void | 1037 | static void |
581 | shutdown_task (void *cls) | 1038 | do_shutdown (void *cls) |
582 | { | 1039 | { |
583 | (void) cls; | 1040 | (void) cls; |
584 | 1041 | ||
@@ -593,6 +1050,10 @@ shutdown_task (void *cls) | |||
593 | GNUNET_free (GST_my_private_key); | 1050 | GNUNET_free (GST_my_private_key); |
594 | GST_my_private_key = NULL; | 1051 | GST_my_private_key = NULL; |
595 | } | 1052 | } |
1053 | GNUNET_CONTAINER_multipeermap_iterate (neighbours, | ||
1054 | &free_neighbour_cb, | ||
1055 | NULL); | ||
1056 | GNUNET_CONTAINER_multipeermap_destroy (neighbours); | ||
596 | } | 1057 | } |
597 | 1058 | ||
598 | 1059 | ||
@@ -608,8 +1069,11 @@ run (void *cls, | |||
608 | const struct GNUNET_CONFIGURATION_Handle *c, | 1069 | const struct GNUNET_CONFIGURATION_Handle *c, |
609 | struct GNUNET_SERVICE_Handle *service) | 1070 | struct GNUNET_SERVICE_Handle *service) |
610 | { | 1071 | { |
1072 | (void) cls; | ||
611 | /* setup globals */ | 1073 | /* setup globals */ |
612 | GST_cfg = c; | 1074 | GST_cfg = c; |
1075 | neighbours = GNUNET_CONTAINER_multipeermap_create (1024, | ||
1076 | GNUNET_YES); | ||
613 | GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg); | 1077 | GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg); |
614 | if (NULL == GST_my_private_key) | 1078 | if (NULL == GST_my_private_key) |
615 | { | 1079 | { |
@@ -626,7 +1090,7 @@ run (void *cls, | |||
626 | 1090 | ||
627 | GST_stats = GNUNET_STATISTICS_create ("transport", | 1091 | GST_stats = GNUNET_STATISTICS_create ("transport", |
628 | GST_cfg); | 1092 | GST_cfg); |
629 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, | 1093 | GNUNET_SCHEDULER_add_shutdown (&do_shutdown, |
630 | NULL); | 1094 | NULL); |
631 | /* start subsystems */ | 1095 | /* start subsystems */ |
632 | } | 1096 | } |