aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-08-12 14:34:16 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-08-12 14:34:16 +0000
commita9e630d6cbc7f777f157a034b9ebf0c26c93fa68 (patch)
tree32daa00f1fe63ad1cd7dae6e35377ca0bacc8d0b
parente0af01758cb7eec5aea8e942c3d54a71b2c2100b (diff)
downloadgnunet-a9e630d6cbc7f777f157a034b9ebf0c26c93fa68.tar.gz
gnunet-a9e630d6cbc7f777f157a034b9ebf0c26c93fa68.zip
- listener re-connects transparently
- bugs
-rw-r--r--src/include/gnunet_set_service.h9
-rw-r--r--src/set/gnunet-service-set.c25
-rw-r--r--src/set/gnunet-service-set_union.c53
-rw-r--r--src/set/set.h2
-rw-r--r--src/set/set_api.c106
5 files changed, 146 insertions, 49 deletions
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h
index 43c481035..da6ba1301 100644
--- a/src/include/gnunet_set_service.h
+++ b/src/include/gnunet_set_service.h
@@ -208,8 +208,7 @@ typedef int (*GNUNET_SET_ElementIterator) (void *cls,
208 * @param other_peer the other peer 208 * @param other_peer the other peer
209 * @param context_msg message with application specific information from 209 * @param context_msg message with application specific information from
210 * the other peer 210 * the other peer
211 * @param request request from the other peer, use GNUNET_SET_accept 211 * @param request request from the other peer (never NULL), use GNUNET_SET_accept
212 * Will be NULL if the listener failed.
213 * to accept it, otherwise the request will be refused 212 * to accept it, otherwise the request will be refused
214 * Note that we can't just return value from the listen callback, 213 * Note that we can't just return value from the listen callback,
215 * as it is also necessary to specify the set we want to do the 214 * as it is also necessary to specify the set we want to do the
@@ -315,7 +314,9 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
315 314
316 315
317/** 316/**
318 * Wait for set operation requests for the given application id 317 * Wait for set operation requests for the given application ID.
318 * If the connection to the set service is lost, the listener is
319 * re-created transparently with exponential backoff.
319 * 320 *
320 * @param cfg configuration to use for connecting to 321 * @param cfg configuration to use for connecting to
321 * the set service 322 * the set service
@@ -336,6 +337,8 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
336 337
337/** 338/**
338 * Cancel the given listen operation. 339 * Cancel the given listen operation.
340 * After calling cancel, the listen callback for this listen handle
341 * will not be called again.
339 * 342 *
340 * @param lh handle for the listen operation 343 * @param lh handle for the listen operation
341 */ 344 */
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 443694d08..cfa0ab711 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -442,8 +442,8 @@ handle_incoming_msg (struct OperationState *op,
442 } 442 }
443 443
444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u, app %s)\n", 444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u, app %s)\n",
445 ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); 445 ntohl (msg->operation), GNUNET_h2s (&msg->app_id));
446 listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id); 446 listener = listener_get_by_target (ntohl (msg->operation), &msg->app_id);
447 if (NULL == listener) 447 if (NULL == listener)
448 { 448 {
449 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 449 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -477,6 +477,7 @@ handle_client_iterate (void *cls,
477 return; 477 return;
478 } 478 }
479 479
480 GNUNET_SERVER_receive_done (client, GNUNET_OK);
480 set->vt->iterate (set); 481 set->vt->iterate (set);
481} 482}
482 483
@@ -557,21 +558,30 @@ handle_client_listen (void *cls,
557 listener->client = client; 558 listener->client = client;
558 listener->client_mq = GNUNET_MQ_queue_for_server_client (client); 559 listener->client_mq = GNUNET_MQ_queue_for_server_client (client);
559 listener->app_id = msg->app_id; 560 listener->app_id = msg->app_id;
560 listener->operation = ntohs (msg->operation); 561 listener->operation = ntohl (msg->operation);
561 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); 562 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n", 563 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n",
563 listener->operation, GNUNET_h2s (&listener->app_id)); 564 listener->operation, GNUNET_h2s (&listener->app_id));
564 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) 565 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
565 { 566 {
566 if ( (NULL == incoming->spec) || 567 if (NULL == incoming->spec)
567 (0 != incoming->suggest_id) ) 568 {
569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n");
570 continue;
571 }
572 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s, suggest: %u)\n",
573 incoming->spec->operation, GNUNET_h2s (&incoming->spec->app_id), incoming->suggest_id);
574
575 if (0 != incoming->suggest_id)
568 continue; 576 continue;
569 if (listener->operation != incoming->spec->operation) 577 if (listener->operation != incoming->spec->operation)
570 continue; 578 continue;
571 if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->spec->app_id)) 579 if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->spec->app_id))
572 continue; 580 continue;
581 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n");
573 incoming_suggest (incoming, listener); 582 incoming_suggest (incoming, listener);
574 } 583 }
584 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n");
575 GNUNET_SERVER_receive_done (client, GNUNET_OK); 585 GNUNET_SERVER_receive_done (client, GNUNET_OK);
576} 586}
577 587
@@ -942,8 +952,9 @@ dispatch_p2p_message (void *cls,
942 952
943 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n", 953 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n",
944 ntohs (message->type)); 954 ntohs (message->type));
945 ret = tc->vt->msg_handler (tc->op, message); 955 /* FIXME: do this before or after the handler? */
946 GNUNET_MESH_receive_done (tunnel); 956 GNUNET_MESH_receive_done (tunnel);
957 ret = tc->vt->msg_handler (tc->op, message);
947 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n", 958 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n",
948 ntohs (message->type)); 959 ntohs (message->type));
949 return ret; 960 return ret;
@@ -1023,7 +1034,7 @@ main (int argc, char *const *argv)
1023 int ret; 1034 int ret;
1024 ret = GNUNET_SERVICE_run (argc, argv, "set", 1035 ret = GNUNET_SERVICE_run (argc, argv, "set",
1025 GNUNET_SERVICE_OPTION_NONE, &run, NULL); 1036 GNUNET_SERVICE_OPTION_NONE, &run, NULL);
1026 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); 1037 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
1027 return (GNUNET_OK == ret) ? 0 : 1; 1038 return (GNUNET_OK == ret) ? 0 : 1;
1028} 1039}
1029 1040
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 644f975b6..1980cdacb 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -147,6 +147,8 @@ struct OperationState
147 147
148 /** 148 /**
149 * Maps IBF-Keys (specific to the current salt) to elements. 149 * Maps IBF-Keys (specific to the current salt) to elements.
150 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
151 * Colliding IBF-Keys are linked.
150 */ 152 */
151 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; 153 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
152 154
@@ -493,7 +495,7 @@ send_operation_request (struct OperationState *eo)
493 GNUNET_SERVER_client_disconnect (eo->spec->set->client); 495 GNUNET_SERVER_client_disconnect (eo->spec->set->client);
494 return; 496 return;
495 } 497 }
496 msg->operation = htons (GNUNET_SET_OPERATION_UNION); 498 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
497 msg->app_id = eo->spec->app_id; 499 msg->app_id = eo->spec->app_id;
498 msg->salt = htonl (eo->spec->salt); 500 msg->salt = htonl (eo->spec->salt);
499 GNUNET_MQ_send (eo->mq, ev); 501 GNUNET_MQ_send (eo->mq, ev);
@@ -524,7 +526,7 @@ send_operation_request (struct OperationState *eo)
524 * GNUNET_NO if not. 526 * GNUNET_NO if not.
525 */ 527 */
526static int 528static int
527insert_element_iterator (void *cls, 529op_register_element_iterator (void *cls,
528 uint32_t key, 530 uint32_t key,
529 void *value) 531 void *value)
530{ 532{
@@ -549,12 +551,16 @@ insert_element_iterator (void *cls,
549/** 551/**
550 * Insert an element into the union operation's 552 * Insert an element into the union operation's
551 * key-to-element mapping. Takes ownership of 'ee'. 553 * key-to-element mapping. Takes ownership of 'ee'.
554 * Note that this does not insert the element in the set,
555 * only in the operation's key-element mapping.
556 * This is done to speed up re-tried operations, if some elements
557 * were transmitted, and then the IBF fails to decode.
552 * 558 *
553 * @param eo the union operation 559 * @param eo the union operation
554 * @param ee the element entry 560 * @param ee the element entry
555 */ 561 */
556static void 562static void
557insert_element (struct OperationState *eo, struct ElementEntry *ee) 563op_register_element (struct OperationState *eo, struct ElementEntry *ee)
558{ 564{
559 int ret; 565 int ret;
560 struct IBF_Key ibf_key; 566 struct IBF_Key ibf_key;
@@ -566,14 +572,14 @@ insert_element (struct OperationState *eo, struct ElementEntry *ee)
566 k->ibf_key = ibf_key; 572 k->ibf_key = ibf_key;
567 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, 573 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
568 (uint32_t) ibf_key.key_val, 574 (uint32_t) ibf_key.key_val,
569 insert_element_iterator, k); 575 op_register_element_iterator, k);
570 576
571 /* was the element inserted into a colliding bucket? */ 577 /* was the element inserted into a colliding bucket? */
572 if (GNUNET_SYSERR == ret) 578 if (GNUNET_SYSERR == ret)
573 return; 579 return;
574 580
575 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, 581 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
576 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 582 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
577} 583}
578 584
579 585
@@ -623,7 +629,7 @@ init_key_to_element_iterator (void *cls,
623 629
624 e->remote = GNUNET_NO; 630 e->remote = GNUNET_NO;
625 631
626 insert_element (eo, e); 632 op_register_element (eo, e);
627 return GNUNET_YES; 633 return GNUNET_YES;
628} 634}
629 635
@@ -861,27 +867,32 @@ decode_and_send (struct OperationState *eo)
861 ibf_destroy (eo->remote_ibf); 867 ibf_destroy (eo->remote_ibf);
862 eo->remote_ibf = NULL; 868 eo->remote_ibf = NULL;
863 869
864 num_decoded = 0;
865
866 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); 870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
867 871
872 num_decoded = 0;
873 last_key.key_val = 0;
874
868 while (1) 875 while (1)
869 { 876 {
870 int res; 877 int res;
878 int cycle_detected = GNUNET_NO;
871 879
872 if (num_decoded > 0) 880 last_key = key;
873 last_key = key;
874 881
875 res = ibf_decode (diff_ibf, &side, &key); 882 res = ibf_decode (diff_ibf, &side, &key);
876 if (res == GNUNET_OK) 883 if (res == GNUNET_OK)
884 {
877 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n", 885 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
878 key.key_val); 886 key.key_val);
879 num_decoded += 1; 887 num_decoded += 1;
880 if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val)) 888 if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
881 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", 889 {
882 num_decoded, diff_ibf->size); 890 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
883 if ((GNUNET_SYSERR == res) || (num_decoded > diff_ibf->size) || 891 num_decoded, diff_ibf->size);
884 (num_decoded > 1 && last_key.key_val == key.key_val)) 892 cycle_detected = GNUNET_YES;
893 }
894 }
895 if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
885 { 896 {
886 int next_order; 897 int next_order;
887 next_order = 0; 898 next_order = 0;
@@ -922,6 +933,8 @@ decode_and_send (struct OperationState *eo)
922 933
923 /* FIXME: before sending the request, check if we may just have the element */ 934 /* FIXME: before sending the request, check if we may just have the element */
924 /* FIXME: merge multiple requests */ 935 /* FIXME: merge multiple requests */
936 /* FIXME: remember somewhere that we already requested the element,
937 * so that we don't request it again with the next ibf if decoding fails */
925 ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), 938 ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
926 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); 939 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
927 940
@@ -1089,7 +1102,9 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1089 ee->remote = GNUNET_YES; 1102 ee->remote = GNUNET_YES;
1090 GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); 1103 GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1091 1104
1092 insert_element (eo, ee); 1105 /* FIXME: see if the element has already been inserted! */
1106
1107 op_register_element (eo, ee);
1093 send_client_element (eo, &ee->element); 1108 send_client_element (eo, &ee->element);
1094} 1109}
1095 1110
@@ -1386,6 +1401,8 @@ int
1386union_handle_p2p_message (struct OperationState *eo, 1401union_handle_p2p_message (struct OperationState *eo,
1387 const struct GNUNET_MessageHeader *mh) 1402 const struct GNUNET_MessageHeader *mh)
1388{ 1403{
1404 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1405 ntohs (mh->type), ntohs (mh->size));
1389 switch (ntohs (mh->type)) 1406 switch (ntohs (mh->type))
1390 { 1407 {
1391 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: 1408 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
@@ -1490,6 +1507,8 @@ union_iterate (struct Set *set)
1490{ 1507{
1491 struct GNUNET_MQ_Envelope *ev; 1508 struct GNUNET_MQ_Envelope *ev;
1492 1509
1510 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "iterating union set with %u elements\n",
1511 GNUNET_CONTAINER_multihashmap_size (set->state->elements));
1493 GNUNET_CONTAINER_multihashmap_iterate (set->state->elements, send_iter_element_iter, set); 1512 GNUNET_CONTAINER_multihashmap_iterate (set->state->elements, send_iter_element_iter, set);
1494 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); 1513 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
1495 GNUNET_MQ_send (set->client_mq, ev); 1514 GNUNET_MQ_send (set->client_mq, ev);
diff --git a/src/set/set.h b/src/set/set.h
index 9f6eb3642..ec8390448 100644
--- a/src/set/set.h
+++ b/src/set/set.h
@@ -59,7 +59,7 @@ struct GNUNET_SET_ListenMessage
59 /** 59 /**
60 * Operation type, values of enum GNUNET_SET_OperationType 60 * Operation type, values of enum GNUNET_SET_OperationType
61 */ 61 */
62 uint16_t operation GNUNET_PACKED; 62 uint32_t operation GNUNET_PACKED;
63 63
64 /** 64 /**
65 * application id 65 * application id
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 3e317a939..7f52fb0a9 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -169,6 +169,13 @@ struct GNUNET_SET_ListenHandle
169 struct GNUNET_MQ_Handle* mq; 169 struct GNUNET_MQ_Handle* mq;
170 170
171 /** 171 /**
172 * Configuration handle for the listener, stored
173 * here to be able to reconnect transparently on
174 * connection failure.
175 */
176 const struct GNUNET_CONFIGURATION_Handle *cfg;
177
178 /**
172 * Function to call on a new incoming request, 179 * Function to call on a new incoming request,
173 * or on error. 180 * or on error.
174 */ 181 */
@@ -178,9 +185,30 @@ struct GNUNET_SET_ListenHandle
178 * Closure for listen_cb. 185 * Closure for listen_cb.
179 */ 186 */
180 void *listen_cls; 187 void *listen_cls;
188
189 /**
190 * Operation we listen for.
191 */
192 enum GNUNET_SET_OperationType operation;
193
194 /**
195 * Application ID we listen for.
196 */
197 struct GNUNET_HashCode app_id;
198
199 /**
200 * Time to wait until we try to reconnect on failure.
201 */
202 struct GNUNET_TIME_Relative reconnect_backoff;
181}; 203};
182 204
183 205
206/* forward declaration */
207static void
208listen_connect (void *cls,
209 const struct GNUNET_SCHEDULER_TaskContext *tc);
210
211
184/** 212/**
185 * Handle element for iteration over the set. 213 * Handle element for iteration over the set.
186 * 214 *
@@ -198,7 +226,8 @@ handle_iter_element (void *cls, const struct GNUNET_MessageHeader *mh)
198 if (NULL == set->iterator) 226 if (NULL == set->iterator)
199 return; 227 return;
200 228
201 element.type = htons (mh->type); 229 element.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_IterResponseMessage);
230 element.type = htons (msg->element_type);
202 element.data = &msg[1]; 231 element.data = &msg[1];
203 set->iterator (set->iterator_cls, &element); 232 set->iterator (set->iterator_cls, &element);
204} 233}
@@ -266,6 +295,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
266 oh->result_cb (oh->result_cls, &e, result_status); 295 oh->result_cb (oh->result_cls, &e, result_status);
267} 296}
268 297
298
269/** 299/**
270 * Handle request message for a listen operation 300 * Handle request message for a listen operation
271 * 301 *
@@ -297,9 +327,9 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
297 amsg->request_id = htonl (0); 327 amsg->request_id = htonl (0);
298 amsg->accept_reject_id = msg->accept_id; 328 amsg->accept_reject_id = msg->accept_id;
299 GNUNET_MQ_send (lh->mq, mqm); 329 GNUNET_MQ_send (lh->mq, mqm);
300 GNUNET_free (req);
301 LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n"); 330 LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n");
302 } 331 }
332 GNUNET_free (req);
303 333
304 LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n"); 334 LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n");
305 335
@@ -313,8 +343,14 @@ handle_client_listener_error (void *cls, enum GNUNET_MQ_Error error)
313{ 343{
314 struct GNUNET_SET_ListenHandle *lh = cls; 344 struct GNUNET_SET_ListenHandle *lh = cls;
315 345
316 /* FIXME: why do you do this? */ 346 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listener broke down, re-connecting\n");
317 lh->listen_cb (lh->listen_cls, NULL, NULL, NULL); 347 GNUNET_CLIENT_disconnect (lh->client);
348 lh->client = NULL;
349 GNUNET_MQ_destroy (lh->mq);
350 lh->mq = NULL;
351
352 GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, listen_connect, lh);
353 lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
318} 354}
319 355
320 356
@@ -465,6 +501,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
465 set->client = NULL; 501 set->client = NULL;
466 GNUNET_MQ_destroy (set->mq); 502 GNUNET_MQ_destroy (set->mq);
467 set->mq = NULL; 503 set->mq = NULL;
504 GNUNET_free (set);
468} 505}
469 506
470 507
@@ -514,11 +551,49 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
514 return oh; 551 return oh;
515} 552}
516 553
554
555/**
556 * Connect to the set service in order to listen
557 * for request.
558 *
559 * @param cls the listen handle to connect
560 * @param tc task context if invoked as a task, NULL otherwise
561 */
562static void
563listen_connect (void *cls,
564 const struct GNUNET_SCHEDULER_TaskContext *tc)
565{
566 struct GNUNET_MQ_Envelope *mqm;
567 struct GNUNET_SET_ListenMessage *msg;
568 struct GNUNET_SET_ListenHandle *lh = cls;
569 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
570 {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
571 GNUNET_MQ_HANDLERS_END
572 };
573
574 GNUNET_assert (NULL == lh->client);
575 lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
576 if (NULL == lh->client)
577 {
578 LOG (GNUNET_ERROR_TYPE_ERROR,
579 "could not connect to set (wrong configuration?), giving up listening\n");
580 return;
581 }
582 GNUNET_assert (NULL == lh->mq);
583 lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
584 handle_client_listener_error, lh);
585 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
586 msg->operation = htonl (lh->operation);
587 msg->app_id = lh->app_id;
588 GNUNET_MQ_send (lh->mq, mqm);
589}
590
591
517/** 592/**
518 * Wait for set operation requests for the given application id 593 * Wait for set operation requests for the given application id
519 * 594 *
520 * @param cfg configuration to use for connecting to 595 * @param cfg configuration to use for connecting to
521 * the set service 596 * the set service, needs to be valid for the lifetime of the listen handle
522 * @param operation operation we want to listen for 597 * @param operation operation we want to listen for
523 * @param app_id id of the application that handles set operation requests 598 * @param app_id id of the application that handles set operation requests
524 * @param listen_cb called for each incoming request matching the operation 599 * @param listen_cb called for each incoming request matching the operation
@@ -534,25 +609,15 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
534 void *listen_cls) 609 void *listen_cls)
535{ 610{
536 struct GNUNET_SET_ListenHandle *lh; 611 struct GNUNET_SET_ListenHandle *lh;
537 struct GNUNET_MQ_Envelope *mqm;
538 struct GNUNET_SET_ListenMessage *msg;
539 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
540 {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
541 GNUNET_MQ_HANDLERS_END
542 };
543 612
544 lh = GNUNET_new (struct GNUNET_SET_ListenHandle); 613 lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
545 lh->client = GNUNET_CLIENT_connect ("set", cfg);
546 lh->listen_cb = listen_cb; 614 lh->listen_cb = listen_cb;
547 lh->listen_cls = listen_cls; 615 lh->listen_cls = listen_cls;
548 GNUNET_assert (NULL != lh->client); 616 lh->cfg = cfg;
549 lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, 617 lh->operation = operation;
550 handle_client_listener_error, lh); 618 lh->app_id = *app_id;
551 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); 619 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
552 msg->operation = htons (operation); 620 listen_connect (lh, NULL);
553 msg->app_id = *app_id;
554 GNUNET_MQ_send (lh->mq, mqm);
555
556 return lh; 621 return lh;
557} 622}
558 623
@@ -680,7 +745,6 @@ GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
680} 745}
681 746
682 747
683
684/** 748/**
685 * Iterate over all elements in the given set. 749 * Iterate over all elements in the given set.
686 * Note that this operation involves transferring every element of the set 750 * Note that this operation involves transferring every element of the set