aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-06-19 10:48:54 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-06-19 10:48:54 +0000
commita900b29ddaa9ea46c731b054b5e3ef3e725b95a8 (patch)
tree52e1a9697b0abf4618cd5684359ec5f0a040898a /src/set/gnunet-service-set.c
parent17353bc0a47c89bda205f23e7995377c9bfe7769 (diff)
downloadgnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.tar.gz
gnunet-a900b29ddaa9ea46c731b054b5e3ef3e725b95a8.zip
- opaque mq structs
- mq for mesh - faster hashing for IBFs - mesh replaces stream in set - new set profiler (work in progress)
Diffstat (limited to 'src/set/gnunet-service-set.c')
-rw-r--r--src/set/gnunet-service-set.c278
1 files changed, 194 insertions, 84 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index d2f0b48d5..bd934de84 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -29,13 +29,16 @@
29 29
30/** 30/**
31 * Configuration of our local peer. 31 * Configuration of our local peer.
32 * (Not declared 'static' as also needed in gnunet-service-set_union.c)
32 */ 33 */
33const struct GNUNET_CONFIGURATION_Handle *configuration; 34const struct GNUNET_CONFIGURATION_Handle *configuration;
34 35
35/** 36/**
36 * Socket listening for other peers via stream. 37 * Handle to the mesh service, used
38 * to listen for and connect to remote peers.
39 * (Not declared 'static' as also needed in gnunet-service-set_union.c)
37 */ 40 */
38static struct GNUNET_STREAM_ListenSocket *stream_listen_socket; 41struct GNUNET_MESH_Handle *mesh;
39 42
40/** 43/**
41 * Sets are held in a doubly linked list. 44 * Sets are held in a doubly linked list.
@@ -78,14 +81,14 @@ static uint32_t accept_id = 1;
78 81
79 82
80/** 83/**
81 * Get set that is owned by the client, if any. 84 * Get set that is owned by the given client, if any.
82 * 85 *
83 * @param client client to look for 86 * @param client client to look for
84 * @return set that the client owns, NULL if the client 87 * @return set that the client owns, NULL if the client
85 * does not own a set 88 * does not own a set
86 */ 89 */
87static struct Set * 90static struct Set *
88get_set (struct GNUNET_SERVER_Client *client) 91set_get (struct GNUNET_SERVER_Client *client)
89{ 92{
90 struct Set *set; 93 struct Set *set;
91 for (set = sets_head; NULL != set; set = set->next) 94 for (set = sets_head; NULL != set; set = set->next)
@@ -137,7 +140,7 @@ get_incoming (uint32_t id)
137 * @param listener listener to destroy 140 * @param listener listener to destroy
138 */ 141 */
139static void 142static void
140destroy_listener (struct Listener *listener) 143listener_destroy (struct Listener *listener)
141{ 144{
142 if (NULL != listener->client_mq) 145 if (NULL != listener->client_mq)
143 { 146 {
@@ -155,7 +158,7 @@ destroy_listener (struct Listener *listener)
155 * @param set the set to destroy 158 * @param set the set to destroy
156 */ 159 */
157static void 160static void
158destroy_set (struct Set *set) 161set_destroy (struct Set *set)
159{ 162{
160 switch (set->operation) 163 switch (set->operation)
161 { 164 {
@@ -187,12 +190,12 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
187 struct Set *set; 190 struct Set *set;
188 struct Listener *listener; 191 struct Listener *listener;
189 192
190 set = get_set (client); 193 set = set_get (client);
191 if (NULL != set) 194 if (NULL != set)
192 destroy_set (set); 195 set_destroy (set);
193 listener = get_listener (client); 196 listener = get_listener (client);
194 if (NULL != listener) 197 if (NULL != listener)
195 destroy_listener (listener); 198 listener_destroy (listener);
196} 199}
197 200
198 201
@@ -202,17 +205,14 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
202 * @param incoming remote request to destroy 205 * @param incoming remote request to destroy
203 */ 206 */
204static void 207static void
205destroy_incoming (struct Incoming *incoming) 208incoming_destroy (struct Incoming *incoming)
206{ 209{
207 if (NULL != incoming->mq) 210 if (NULL != incoming->tc)
208 { 211 {
209 GNUNET_MQ_destroy (incoming->mq); 212 GNUNET_free (incoming->tc);
210 incoming->mq = NULL; 213 GNUNET_assert (NULL != incoming->tc->tunnel);
211 } 214 GNUNET_MESH_tunnel_destroy (incoming->tc->tunnel);
212 if (NULL != incoming->socket) 215 incoming->tc = NULL;
213 {
214 GNUNET_STREAM_close (incoming->socket);
215 incoming->socket = NULL;
216 } 216 }
217 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); 217 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
218 GNUNET_free (incoming); 218 GNUNET_free (incoming);
@@ -237,6 +237,15 @@ get_listener_by_target (enum GNUNET_SET_OperationType op,
237} 237}
238 238
239 239
240
241static void
242tunnel_context_destroy (struct TunnelContext *tc)
243{
244 GNUNET_free (tc);
245 /* FIXME destroy the rest */
246}
247
248
240/** 249/**
241 * Handle a request for a set operation from 250 * Handle a request for a set operation from
242 * another peer. 251 * another peer.
@@ -244,16 +253,31 @@ get_listener_by_target (enum GNUNET_SET_OperationType op,
244 * @param cls the incoming socket 253 * @param cls the incoming socket
245 * @param mh the message 254 * @param mh the message
246 */ 255 */
247static void 256static int
248handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) 257handle_p2p_operation_request (void *cls,
258 struct GNUNET_MESH_Tunnel *tunnel,
259 void **tunnel_ctx,
260 const struct GNUNET_PeerIdentity *sender,
261 const struct GNUNET_MessageHeader *mh)
249{ 262{
250 struct Incoming *incoming = cls; 263 struct TunnelContext *tc = *tunnel_ctx;
264 struct Incoming *incoming;
251 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; 265 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
252 struct GNUNET_MQ_Message *mqm; 266 struct GNUNET_MQ_Envelope *mqm;
253 struct GNUNET_SET_RequestMessage *cmsg; 267 struct GNUNET_SET_RequestMessage *cmsg;
254 struct Listener *listener; 268 struct Listener *listener;
255 const struct GNUNET_MessageHeader *context_msg; 269 const struct GNUNET_MessageHeader *context_msg;
256 270
271 if (CONTEXT_INCOMING != tc->type)
272 {
273 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected operation request\n");
274 tunnel_context_destroy (tc);
275 /* don't kill the whole mesh connection */
276 return GNUNET_OK;
277 }
278
279 incoming = tc->data;
280
257 context_msg = GNUNET_MQ_extract_nested_mh (msg); 281 context_msg = GNUNET_MQ_extract_nested_mh (msg);
258 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", 282 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
259 ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); 283 ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
@@ -263,20 +287,26 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
263 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 287 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
264 "set operation request from peer failed: " 288 "set operation request from peer failed: "
265 "no set with matching application ID and operation type\n"); 289 "no set with matching application ID and operation type\n");
266 return; 290 tunnel_context_destroy (tc);
291 /* don't kill the whole mesh connection */
292 return GNUNET_OK;
267 } 293 }
268 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg); 294 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg);
269 if (NULL == mqm) 295 if (NULL == mqm)
270 { 296 {
271 /* FIXME: disconnect the peer */ 297 /* FIXME: disconnect the peer */
272 GNUNET_break_op (0); 298 GNUNET_break_op (0);
273 return; 299 tunnel_context_destroy (tc);
300 /* don't kill the whole mesh connection */
301 return GNUNET_OK;
274 } 302 }
275 incoming->accept_id = accept_id++; 303 incoming->accept_id = accept_id++;
276 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id); 304 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id);
277 cmsg->accept_id = htonl (incoming->accept_id); 305 cmsg->accept_id = htonl (incoming->accept_id);
278 cmsg->peer_id = incoming->peer; 306 cmsg->peer_id = incoming->tc->peer;
279 GNUNET_MQ_send (listener->client_mq, mqm); 307 GNUNET_MQ_send (listener->client_mq, mqm);
308
309 return GNUNET_OK;
280} 310}
281 311
282 312
@@ -298,7 +328,7 @@ handle_client_create (void *cls,
298 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", 328 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n",
299 ntohs (msg->operation)); 329 ntohs (msg->operation));
300 330
301 if (NULL != get_set (client)) 331 if (NULL != set_get (client))
302 { 332 {
303 GNUNET_break (0); 333 GNUNET_break (0);
304 GNUNET_SERVER_client_disconnect (client); 334 GNUNET_SERVER_client_disconnect (client);
@@ -379,7 +409,7 @@ handle_client_remove (void *cls,
379{ 409{
380 struct Set *set; 410 struct Set *set;
381 411
382 set = get_set (client); 412 set = set_get (client);
383 if (NULL == set) 413 if (NULL == set)
384 { 414 {
385 GNUNET_break (0); 415 GNUNET_break (0);
@@ -428,7 +458,7 @@ handle_client_reject (void *cls,
428 return; 458 return;
429 } 459 }
430 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); 460 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
431 destroy_incoming (incoming); 461 incoming_destroy (incoming);
432 GNUNET_SERVER_receive_done (client, GNUNET_OK); 462 GNUNET_SERVER_receive_done (client, GNUNET_OK);
433} 463}
434 464
@@ -449,7 +479,7 @@ handle_client_add (void *cls,
449{ 479{
450 struct Set *set; 480 struct Set *set;
451 481
452 set = get_set (client); 482 set = set_get (client);
453 if (NULL == set) 483 if (NULL == set)
454 { 484 {
455 GNUNET_break (0); 485 GNUNET_break (0);
@@ -486,7 +516,7 @@ handle_client_evaluate (void *cls,
486{ 516{
487 struct Set *set; 517 struct Set *set;
488 518
489 set = get_set (client); 519 set = set_get (client);
490 if (NULL == set) 520 if (NULL == set)
491 { 521 {
492 GNUNET_break (0); 522 GNUNET_break (0);
@@ -558,8 +588,7 @@ handle_client_accept (void *cls,
558 return; 588 return;
559 } 589 }
560 590
561 591 set = set_get (client);
562 set = get_set (client);
563 592
564 if (NULL == set) 593 if (NULL == set)
565 { 594 {
@@ -584,51 +613,12 @@ handle_client_accept (void *cls,
584 613
585 /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL, 614 /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL,
586 * otherwise they will be destroyed and disconnected */ 615 * otherwise they will be destroyed and disconnected */
587 destroy_incoming (incoming); 616 incoming_destroy (incoming);
588 GNUNET_SERVER_receive_done (client, GNUNET_OK); 617 GNUNET_SERVER_receive_done (client, GNUNET_OK);
589} 618}
590 619
591 620
592/** 621/**
593 * Functions of this type are called upon new stream connection from other peers
594 * or upon binding error which happen when the app_port given in
595 * GNUNET_STREAM_listen() is already taken.
596 *
597 * @param cls the closure from GNUNET_STREAM_listen
598 * @param socket the socket representing the stream; NULL on binding error
599 * @param initiator the identity of the peer who wants to establish a stream
600 * with us; NULL on binding error
601 * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
602 * stream (the socket will be invalid after the call)
603 */
604static int
605stream_listen_cb (void *cls,
606 struct GNUNET_STREAM_Socket *socket,
607 const struct GNUNET_PeerIdentity *initiator)
608{
609 struct Incoming *incoming;
610 static const struct GNUNET_MQ_Handler handlers[] = {
611 {handle_p2p_operation_request, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST},
612 GNUNET_MQ_HANDLERS_END
613 };
614
615 if (NULL == socket)
616 {
617 GNUNET_break (0);
618 return GNUNET_SYSERR;
619 }
620
621 incoming = GNUNET_new (struct Incoming);
622 incoming->peer = *initiator;
623 incoming->socket = socket;
624 incoming->mq = GNUNET_STREAM_mq_create (incoming->socket, handlers, NULL, incoming);
625 /* FIXME: timeout for peers that only connect but don't send anything */
626 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
627 return GNUNET_OK;
628}
629
630
631/**
632 * Called to clean up, after a shutdown has been requested. 622 * Called to clean up, after a shutdown has been requested.
633 * 623 *
634 * @param cls closure 624 * @param cls closure
@@ -638,31 +628,126 @@ static void
638shutdown_task (void *cls, 628shutdown_task (void *cls,
639 const struct GNUNET_SCHEDULER_TaskContext *tc) 629 const struct GNUNET_SCHEDULER_TaskContext *tc)
640{ 630{
641 if (NULL != stream_listen_socket) 631 if (NULL != mesh)
642 { 632 {
643 GNUNET_STREAM_listen_close (stream_listen_socket); 633 GNUNET_MESH_disconnect (mesh);
644 stream_listen_socket = NULL; 634 mesh = NULL;
645 } 635 }
646 636
647 while (NULL != incoming_head) 637 while (NULL != incoming_head)
648 { 638 {
649 destroy_incoming (incoming_head); 639 incoming_destroy (incoming_head);
650 } 640 }
651 641
652 while (NULL != listeners_head) 642 while (NULL != listeners_head)
653 { 643 {
654 destroy_listener (listeners_head); 644 listener_destroy (listeners_head);
655 } 645 }
656 646
657 while (NULL != sets_head) 647 while (NULL != sets_head)
658 { 648 {
659 destroy_set (sets_head); 649 set_destroy (sets_head);
660 } 650 }
661 651
662 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); 652 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
663} 653}
664 654
665 655
656
657/**
658 * Signature of the main function of a task.
659 *
660 * @param cls closure
661 * @param tc context information (why was this task triggered now)
662 */
663static void
664incoming_timeout_cb (void *cls,
665 const struct GNUNET_SCHEDULER_TaskContext *tc)
666{
667 struct Incoming *incoming = cls;
668
669 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "remote peer timed out");
670 incoming_destroy (incoming);
671}
672
673
674/**
675 * Method called whenever another peer has added us to a tunnel
676 * the other peer initiated.
677 * Only called (once) upon reception of data with a message type which was
678 * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
679 * causes te tunnel to be ignored and no further notifications are sent about
680 * the same tunnel.
681 *
682 * @param cls closure
683 * @param tunnel new handle to the tunnel
684 * @param initiator peer that started the tunnel
685 * @param port Port this tunnel is for.
686 * @return initial tunnel context for the tunnel
687 * (can be NULL -- that's not an error)
688 */
689static void *
690tunnel_new_cb (void *cls,
691 struct GNUNET_MESH_Tunnel *tunnel,
692 const struct GNUNET_PeerIdentity *initiator,
693 uint32_t port)
694{
695 struct Incoming *incoming;
696 struct TunnelContext *tc;
697
698 GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
699 tc = GNUNET_new (struct TunnelContext);
700 incoming = GNUNET_new (struct Incoming);
701 incoming->tc = tc;
702 tc->peer = *initiator;
703 tc->tunnel = tunnel;
704 tc->mq = GNUNET_MESH_mq_create (tunnel);
705 tc->data = incoming;
706 tc->type = CONTEXT_INCOMING;
707 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming);
708 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
709
710 return tc;
711}
712
713
714/**
715 * Function called whenever a tunnel is destroyed. Should clean up
716 * any associated state. This function is NOT called if the client has
717 * explicitly asked for the tunnel to be destroyed using
718 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
719 * the tunnel.
720 *
721 * @param cls closure (set from GNUNET_MESH_connect)
722 * @param tunnel connection to the other end (henceforth invalid)
723 * @param tunnel_ctx place where local state associated
724 * with the tunnel is stored
725 */
726static void
727tunnel_end_cb (void *cls,
728 const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
729{
730 struct TunnelContext *ctx = tunnel_ctx;
731
732 switch (ctx->type)
733 {
734 case CONTEXT_INCOMING:
735 incoming_destroy ((struct Incoming *) ctx->data);
736 break;
737 case CONTEXT_OPERATION_UNION:
738 _GSS_union_operation_destroy ((struct UnionEvaluateOperation *) ctx->data);
739 break;
740 case CONTEXT_OPERATION_INTERSECTION:
741 GNUNET_assert (0);
742 /* FIXME: cfuchs */
743 break;
744 default:
745 GNUNET_assert (0);
746 }
747
748}
749
750
666/** 751/**
667 * Function called by the service's run 752 * Function called by the service's run
668 * method to run service-specific setup code. 753 * method to run service-specific setup code.
@@ -686,16 +771,40 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
686 {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, 771 {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
687 {NULL, NULL, 0, 0} 772 {NULL, NULL, 0, 0}
688 }; 773 };
774 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
775 {handle_p2p_operation_request,
776 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
777 /* messages for the union operation */
778 {_GSS_union_handle_p2p_message,
779 GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0},
780 {_GSS_union_handle_p2p_message,
781 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
782 {_GSS_union_handle_p2p_message,
783 GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
784 {_GSS_union_handle_p2p_message,
785 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
786 {_GSS_union_handle_p2p_message,
787 GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0},
788 /* FIXME: messages for intersection operation */
789 {NULL, 0, 0}
790 };
791 static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
689 792
690 configuration = cfg; 793 configuration = cfg;
691 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); 794 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
795 &shutdown_task, NULL);
692 GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); 796 GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
693 GNUNET_SERVER_add_handlers (server, server_handlers); 797 GNUNET_SERVER_add_handlers (server, server_handlers);
694 stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
695 &stream_listen_cb, NULL,
696 GNUNET_STREAM_OPTION_END);
697 798
698 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n"); 799 mesh = GNUNET_MESH_connect (cfg, NULL, tunnel_new_cb, tunnel_end_cb,
800 mesh_handlers, mesh_ports);
801 if (NULL == mesh)
802 {
803 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not connect to mesh\n");
804 return;
805 }
806
807 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "service started\n");
699} 808}
700 809
701 810
@@ -710,7 +819,8 @@ int
710main (int argc, char *const *argv) 819main (int argc, char *const *argv)
711{ 820{
712 int ret; 821 int ret;
713 ret = GNUNET_SERVICE_run (argc, argv, "set", GNUNET_SERVICE_OPTION_NONE, &run, NULL); 822 ret = GNUNET_SERVICE_run (argc, argv, "set",
823 GNUNET_SERVICE_OPTION_NONE, &run, NULL);
714 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); 824 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
715 return (GNUNET_OK == ret) ? 0 : 1; 825 return (GNUNET_OK == ret) ? 0 : 1;
716} 826}