diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-08-12 14:34:16 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-08-12 14:34:16 +0000 |
commit | a9e630d6cbc7f777f157a034b9ebf0c26c93fa68 (patch) | |
tree | 32daa00f1fe63ad1cd7dae6e35377ca0bacc8d0b | |
parent | e0af01758cb7eec5aea8e942c3d54a71b2c2100b (diff) | |
download | gnunet-a9e630d6cbc7f777f157a034b9ebf0c26c93fa68.tar.gz gnunet-a9e630d6cbc7f777f157a034b9ebf0c26c93fa68.zip |
- listener re-connects transparently
- bugs
-rw-r--r-- | src/include/gnunet_set_service.h | 9 | ||||
-rw-r--r-- | src/set/gnunet-service-set.c | 25 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 53 | ||||
-rw-r--r-- | src/set/set.h | 2 | ||||
-rw-r--r-- | src/set/set_api.c | 106 |
5 files changed, 146 insertions, 49 deletions
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h index 43c481035..da6ba1301 100644 --- a/src/include/gnunet_set_service.h +++ b/src/include/gnunet_set_service.h | |||
@@ -208,8 +208,7 @@ typedef int (*GNUNET_SET_ElementIterator) (void *cls, | |||
208 | * @param other_peer the other peer | 208 | * @param other_peer the other peer |
209 | * @param context_msg message with application specific information from | 209 | * @param context_msg message with application specific information from |
210 | * the other peer | 210 | * the other peer |
211 | * @param request request from the other peer, use GNUNET_SET_accept | 211 | * @param request request from the other peer (never NULL), use GNUNET_SET_accept |
212 | * Will be NULL if the listener failed. | ||
213 | * to accept it, otherwise the request will be refused | 212 | * to accept it, otherwise the request will be refused |
214 | * Note that we can't just return value from the listen callback, | 213 | * Note that we can't just return value from the listen callback, |
215 | * as it is also necessary to specify the set we want to do the | 214 | * as it is also necessary to specify the set we want to do the |
@@ -315,7 +314,9 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, | |||
315 | 314 | ||
316 | 315 | ||
317 | /** | 316 | /** |
318 | * Wait for set operation requests for the given application id | 317 | * Wait for set operation requests for the given application ID. |
318 | * If the connection to the set service is lost, the listener is | ||
319 | * re-created transparently with exponential backoff. | ||
319 | * | 320 | * |
320 | * @param cfg configuration to use for connecting to | 321 | * @param cfg configuration to use for connecting to |
321 | * the set service | 322 | * the set service |
@@ -336,6 +337,8 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
336 | 337 | ||
337 | /** | 338 | /** |
338 | * Cancel the given listen operation. | 339 | * Cancel the given listen operation. |
340 | * After calling cancel, the listen callback for this listen handle | ||
341 | * will not be called again. | ||
339 | * | 342 | * |
340 | * @param lh handle for the listen operation | 343 | * @param lh handle for the listen operation |
341 | */ | 344 | */ |
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 443694d08..cfa0ab711 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -442,8 +442,8 @@ handle_incoming_msg (struct OperationState *op, | |||
442 | } | 442 | } |
443 | 443 | ||
444 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u, app %s)\n", | 444 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u, app %s)\n", |
445 | ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); | 445 | ntohl (msg->operation), GNUNET_h2s (&msg->app_id)); |
446 | listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id); | 446 | listener = listener_get_by_target (ntohl (msg->operation), &msg->app_id); |
447 | if (NULL == listener) | 447 | if (NULL == listener) |
448 | { | 448 | { |
449 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 449 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -477,6 +477,7 @@ handle_client_iterate (void *cls, | |||
477 | return; | 477 | return; |
478 | } | 478 | } |
479 | 479 | ||
480 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
480 | set->vt->iterate (set); | 481 | set->vt->iterate (set); |
481 | } | 482 | } |
482 | 483 | ||
@@ -557,21 +558,30 @@ handle_client_listen (void *cls, | |||
557 | listener->client = client; | 558 | listener->client = client; |
558 | listener->client_mq = GNUNET_MQ_queue_for_server_client (client); | 559 | listener->client_mq = GNUNET_MQ_queue_for_server_client (client); |
559 | listener->app_id = msg->app_id; | 560 | listener->app_id = msg->app_id; |
560 | listener->operation = ntohs (msg->operation); | 561 | listener->operation = ntohl (msg->operation); |
561 | GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); | 562 | GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); |
562 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n", | 563 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n", |
563 | listener->operation, GNUNET_h2s (&listener->app_id)); | 564 | listener->operation, GNUNET_h2s (&listener->app_id)); |
564 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) | 565 | for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) |
565 | { | 566 | { |
566 | if ( (NULL == incoming->spec) || | 567 | if (NULL == incoming->spec) |
567 | (0 != incoming->suggest_id) ) | 568 | { |
569 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n"); | ||
570 | continue; | ||
571 | } | ||
572 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s, suggest: %u)\n", | ||
573 | incoming->spec->operation, GNUNET_h2s (&incoming->spec->app_id), incoming->suggest_id); | ||
574 | |||
575 | if (0 != incoming->suggest_id) | ||
568 | continue; | 576 | continue; |
569 | if (listener->operation != incoming->spec->operation) | 577 | if (listener->operation != incoming->spec->operation) |
570 | continue; | 578 | continue; |
571 | if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->spec->app_id)) | 579 | if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->spec->app_id)) |
572 | continue; | 580 | continue; |
581 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n"); | ||
573 | incoming_suggest (incoming, listener); | 582 | incoming_suggest (incoming, listener); |
574 | } | 583 | } |
584 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n"); | ||
575 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 585 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
576 | } | 586 | } |
577 | 587 | ||
@@ -942,8 +952,9 @@ dispatch_p2p_message (void *cls, | |||
942 | 952 | ||
943 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n", | 953 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n", |
944 | ntohs (message->type)); | 954 | ntohs (message->type)); |
945 | ret = tc->vt->msg_handler (tc->op, message); | 955 | /* FIXME: do this before or after the handler? */ |
946 | GNUNET_MESH_receive_done (tunnel); | 956 | GNUNET_MESH_receive_done (tunnel); |
957 | ret = tc->vt->msg_handler (tc->op, message); | ||
947 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n", | 958 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n", |
948 | ntohs (message->type)); | 959 | ntohs (message->type)); |
949 | return ret; | 960 | return ret; |
@@ -1023,7 +1034,7 @@ main (int argc, char *const *argv) | |||
1023 | int ret; | 1034 | int ret; |
1024 | ret = GNUNET_SERVICE_run (argc, argv, "set", | 1035 | ret = GNUNET_SERVICE_run (argc, argv, "set", |
1025 | GNUNET_SERVICE_OPTION_NONE, &run, NULL); | 1036 | GNUNET_SERVICE_OPTION_NONE, &run, NULL); |
1026 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); | 1037 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret); |
1027 | return (GNUNET_OK == ret) ? 0 : 1; | 1038 | return (GNUNET_OK == ret) ? 0 : 1; |
1028 | } | 1039 | } |
1029 | 1040 | ||
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 644f975b6..1980cdacb 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -147,6 +147,8 @@ struct OperationState | |||
147 | 147 | ||
148 | /** | 148 | /** |
149 | * Maps IBF-Keys (specific to the current salt) to elements. | 149 | * Maps IBF-Keys (specific to the current salt) to elements. |
150 | * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. | ||
151 | * Colliding IBF-Keys are linked. | ||
150 | */ | 152 | */ |
151 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; | 153 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; |
152 | 154 | ||
@@ -493,7 +495,7 @@ send_operation_request (struct OperationState *eo) | |||
493 | GNUNET_SERVER_client_disconnect (eo->spec->set->client); | 495 | GNUNET_SERVER_client_disconnect (eo->spec->set->client); |
494 | return; | 496 | return; |
495 | } | 497 | } |
496 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); | 498 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); |
497 | msg->app_id = eo->spec->app_id; | 499 | msg->app_id = eo->spec->app_id; |
498 | msg->salt = htonl (eo->spec->salt); | 500 | msg->salt = htonl (eo->spec->salt); |
499 | GNUNET_MQ_send (eo->mq, ev); | 501 | GNUNET_MQ_send (eo->mq, ev); |
@@ -524,7 +526,7 @@ send_operation_request (struct OperationState *eo) | |||
524 | * GNUNET_NO if not. | 526 | * GNUNET_NO if not. |
525 | */ | 527 | */ |
526 | static int | 528 | static int |
527 | insert_element_iterator (void *cls, | 529 | op_register_element_iterator (void *cls, |
528 | uint32_t key, | 530 | uint32_t key, |
529 | void *value) | 531 | void *value) |
530 | { | 532 | { |
@@ -549,12 +551,16 @@ insert_element_iterator (void *cls, | |||
549 | /** | 551 | /** |
550 | * Insert an element into the union operation's | 552 | * Insert an element into the union operation's |
551 | * key-to-element mapping. Takes ownership of 'ee'. | 553 | * key-to-element mapping. Takes ownership of 'ee'. |
554 | * Note that this does not insert the element in the set, | ||
555 | * only in the operation's key-element mapping. | ||
556 | * This is done to speed up re-tried operations, if some elements | ||
557 | * were transmitted, and then the IBF fails to decode. | ||
552 | * | 558 | * |
553 | * @param eo the union operation | 559 | * @param eo the union operation |
554 | * @param ee the element entry | 560 | * @param ee the element entry |
555 | */ | 561 | */ |
556 | static void | 562 | static void |
557 | insert_element (struct OperationState *eo, struct ElementEntry *ee) | 563 | op_register_element (struct OperationState *eo, struct ElementEntry *ee) |
558 | { | 564 | { |
559 | int ret; | 565 | int ret; |
560 | struct IBF_Key ibf_key; | 566 | struct IBF_Key ibf_key; |
@@ -566,14 +572,14 @@ insert_element (struct OperationState *eo, struct ElementEntry *ee) | |||
566 | k->ibf_key = ibf_key; | 572 | k->ibf_key = ibf_key; |
567 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, | 573 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, |
568 | (uint32_t) ibf_key.key_val, | 574 | (uint32_t) ibf_key.key_val, |
569 | insert_element_iterator, k); | 575 | op_register_element_iterator, k); |
570 | 576 | ||
571 | /* was the element inserted into a colliding bucket? */ | 577 | /* was the element inserted into a colliding bucket? */ |
572 | if (GNUNET_SYSERR == ret) | 578 | if (GNUNET_SYSERR == ret) |
573 | return; | 579 | return; |
574 | 580 | ||
575 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, | 581 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, |
576 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 582 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
577 | } | 583 | } |
578 | 584 | ||
579 | 585 | ||
@@ -623,7 +629,7 @@ init_key_to_element_iterator (void *cls, | |||
623 | 629 | ||
624 | e->remote = GNUNET_NO; | 630 | e->remote = GNUNET_NO; |
625 | 631 | ||
626 | insert_element (eo, e); | 632 | op_register_element (eo, e); |
627 | return GNUNET_YES; | 633 | return GNUNET_YES; |
628 | } | 634 | } |
629 | 635 | ||
@@ -861,27 +867,32 @@ decode_and_send (struct OperationState *eo) | |||
861 | ibf_destroy (eo->remote_ibf); | 867 | ibf_destroy (eo->remote_ibf); |
862 | eo->remote_ibf = NULL; | 868 | eo->remote_ibf = NULL; |
863 | 869 | ||
864 | num_decoded = 0; | ||
865 | |||
866 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); | 870 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); |
867 | 871 | ||
872 | num_decoded = 0; | ||
873 | last_key.key_val = 0; | ||
874 | |||
868 | while (1) | 875 | while (1) |
869 | { | 876 | { |
870 | int res; | 877 | int res; |
878 | int cycle_detected = GNUNET_NO; | ||
871 | 879 | ||
872 | if (num_decoded > 0) | 880 | last_key = key; |
873 | last_key = key; | ||
874 | 881 | ||
875 | res = ibf_decode (diff_ibf, &side, &key); | 882 | res = ibf_decode (diff_ibf, &side, &key); |
876 | if (res == GNUNET_OK) | 883 | if (res == GNUNET_OK) |
884 | { | ||
877 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n", | 885 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n", |
878 | key.key_val); | 886 | key.key_val); |
879 | num_decoded += 1; | 887 | num_decoded += 1; |
880 | if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val)) | 888 | if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val)) |
881 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", | 889 | { |
882 | num_decoded, diff_ibf->size); | 890 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", |
883 | if ((GNUNET_SYSERR == res) || (num_decoded > diff_ibf->size) || | 891 | num_decoded, diff_ibf->size); |
884 | (num_decoded > 1 && last_key.key_val == key.key_val)) | 892 | cycle_detected = GNUNET_YES; |
893 | } | ||
894 | } | ||
895 | if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected)) | ||
885 | { | 896 | { |
886 | int next_order; | 897 | int next_order; |
887 | next_order = 0; | 898 | next_order = 0; |
@@ -922,6 +933,8 @@ decode_and_send (struct OperationState *eo) | |||
922 | 933 | ||
923 | /* FIXME: before sending the request, check if we may just have the element */ | 934 | /* FIXME: before sending the request, check if we may just have the element */ |
924 | /* FIXME: merge multiple requests */ | 935 | /* FIXME: merge multiple requests */ |
936 | /* FIXME: remember somewhere that we already requested the element, | ||
937 | * so that we don't request it again with the next ibf if decoding fails */ | ||
925 | ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), | 938 | ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), |
926 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); | 939 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); |
927 | 940 | ||
@@ -1089,7 +1102,9 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | |||
1089 | ee->remote = GNUNET_YES; | 1102 | ee->remote = GNUNET_YES; |
1090 | GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); | 1103 | GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); |
1091 | 1104 | ||
1092 | insert_element (eo, ee); | 1105 | /* FIXME: see if the element has already been inserted! */ |
1106 | |||
1107 | op_register_element (eo, ee); | ||
1093 | send_client_element (eo, &ee->element); | 1108 | send_client_element (eo, &ee->element); |
1094 | } | 1109 | } |
1095 | 1110 | ||
@@ -1386,6 +1401,8 @@ int | |||
1386 | union_handle_p2p_message (struct OperationState *eo, | 1401 | union_handle_p2p_message (struct OperationState *eo, |
1387 | const struct GNUNET_MessageHeader *mh) | 1402 | const struct GNUNET_MessageHeader *mh) |
1388 | { | 1403 | { |
1404 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", | ||
1405 | ntohs (mh->type), ntohs (mh->size)); | ||
1389 | switch (ntohs (mh->type)) | 1406 | switch (ntohs (mh->type)) |
1390 | { | 1407 | { |
1391 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: | 1408 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: |
@@ -1490,6 +1507,8 @@ union_iterate (struct Set *set) | |||
1490 | { | 1507 | { |
1491 | struct GNUNET_MQ_Envelope *ev; | 1508 | struct GNUNET_MQ_Envelope *ev; |
1492 | 1509 | ||
1510 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "iterating union set with %u elements\n", | ||
1511 | GNUNET_CONTAINER_multihashmap_size (set->state->elements)); | ||
1493 | GNUNET_CONTAINER_multihashmap_iterate (set->state->elements, send_iter_element_iter, set); | 1512 | GNUNET_CONTAINER_multihashmap_iterate (set->state->elements, send_iter_element_iter, set); |
1494 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); | 1513 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); |
1495 | GNUNET_MQ_send (set->client_mq, ev); | 1514 | GNUNET_MQ_send (set->client_mq, ev); |
diff --git a/src/set/set.h b/src/set/set.h index 9f6eb3642..ec8390448 100644 --- a/src/set/set.h +++ b/src/set/set.h | |||
@@ -59,7 +59,7 @@ struct GNUNET_SET_ListenMessage | |||
59 | /** | 59 | /** |
60 | * Operation type, values of enum GNUNET_SET_OperationType | 60 | * Operation type, values of enum GNUNET_SET_OperationType |
61 | */ | 61 | */ |
62 | uint16_t operation GNUNET_PACKED; | 62 | uint32_t operation GNUNET_PACKED; |
63 | 63 | ||
64 | /** | 64 | /** |
65 | * application id | 65 | * application id |
diff --git a/src/set/set_api.c b/src/set/set_api.c index 3e317a939..7f52fb0a9 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c | |||
@@ -169,6 +169,13 @@ struct GNUNET_SET_ListenHandle | |||
169 | struct GNUNET_MQ_Handle* mq; | 169 | struct GNUNET_MQ_Handle* mq; |
170 | 170 | ||
171 | /** | 171 | /** |
172 | * Configuration handle for the listener, stored | ||
173 | * here to be able to reconnect transparently on | ||
174 | * connection failure. | ||
175 | */ | ||
176 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
177 | |||
178 | /** | ||
172 | * Function to call on a new incoming request, | 179 | * Function to call on a new incoming request, |
173 | * or on error. | 180 | * or on error. |
174 | */ | 181 | */ |
@@ -178,9 +185,30 @@ struct GNUNET_SET_ListenHandle | |||
178 | * Closure for listen_cb. | 185 | * Closure for listen_cb. |
179 | */ | 186 | */ |
180 | void *listen_cls; | 187 | void *listen_cls; |
188 | |||
189 | /** | ||
190 | * Operation we listen for. | ||
191 | */ | ||
192 | enum GNUNET_SET_OperationType operation; | ||
193 | |||
194 | /** | ||
195 | * Application ID we listen for. | ||
196 | */ | ||
197 | struct GNUNET_HashCode app_id; | ||
198 | |||
199 | /** | ||
200 | * Time to wait until we try to reconnect on failure. | ||
201 | */ | ||
202 | struct GNUNET_TIME_Relative reconnect_backoff; | ||
181 | }; | 203 | }; |
182 | 204 | ||
183 | 205 | ||
206 | /* forward declaration */ | ||
207 | static void | ||
208 | listen_connect (void *cls, | ||
209 | const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
210 | |||
211 | |||
184 | /** | 212 | /** |
185 | * Handle element for iteration over the set. | 213 | * Handle element for iteration over the set. |
186 | * | 214 | * |
@@ -198,7 +226,8 @@ handle_iter_element (void *cls, const struct GNUNET_MessageHeader *mh) | |||
198 | if (NULL == set->iterator) | 226 | if (NULL == set->iterator) |
199 | return; | 227 | return; |
200 | 228 | ||
201 | element.type = htons (mh->type); | 229 | element.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_IterResponseMessage); |
230 | element.type = htons (msg->element_type); | ||
202 | element.data = &msg[1]; | 231 | element.data = &msg[1]; |
203 | set->iterator (set->iterator_cls, &element); | 232 | set->iterator (set->iterator_cls, &element); |
204 | } | 233 | } |
@@ -266,6 +295,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) | |||
266 | oh->result_cb (oh->result_cls, &e, result_status); | 295 | oh->result_cb (oh->result_cls, &e, result_status); |
267 | } | 296 | } |
268 | 297 | ||
298 | |||
269 | /** | 299 | /** |
270 | * Handle request message for a listen operation | 300 | * Handle request message for a listen operation |
271 | * | 301 | * |
@@ -297,9 +327,9 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh) | |||
297 | amsg->request_id = htonl (0); | 327 | amsg->request_id = htonl (0); |
298 | amsg->accept_reject_id = msg->accept_id; | 328 | amsg->accept_reject_id = msg->accept_id; |
299 | GNUNET_MQ_send (lh->mq, mqm); | 329 | GNUNET_MQ_send (lh->mq, mqm); |
300 | GNUNET_free (req); | ||
301 | LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n"); | 330 | LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n"); |
302 | } | 331 | } |
332 | GNUNET_free (req); | ||
303 | 333 | ||
304 | LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n"); | 334 | LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n"); |
305 | 335 | ||
@@ -313,8 +343,14 @@ handle_client_listener_error (void *cls, enum GNUNET_MQ_Error error) | |||
313 | { | 343 | { |
314 | struct GNUNET_SET_ListenHandle *lh = cls; | 344 | struct GNUNET_SET_ListenHandle *lh = cls; |
315 | 345 | ||
316 | /* FIXME: why do you do this? */ | 346 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listener broke down, re-connecting\n"); |
317 | lh->listen_cb (lh->listen_cls, NULL, NULL, NULL); | 347 | GNUNET_CLIENT_disconnect (lh->client); |
348 | lh->client = NULL; | ||
349 | GNUNET_MQ_destroy (lh->mq); | ||
350 | lh->mq = NULL; | ||
351 | |||
352 | GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, listen_connect, lh); | ||
353 | lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff); | ||
318 | } | 354 | } |
319 | 355 | ||
320 | 356 | ||
@@ -465,6 +501,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) | |||
465 | set->client = NULL; | 501 | set->client = NULL; |
466 | GNUNET_MQ_destroy (set->mq); | 502 | GNUNET_MQ_destroy (set->mq); |
467 | set->mq = NULL; | 503 | set->mq = NULL; |
504 | GNUNET_free (set); | ||
468 | } | 505 | } |
469 | 506 | ||
470 | 507 | ||
@@ -514,11 +551,49 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, | |||
514 | return oh; | 551 | return oh; |
515 | } | 552 | } |
516 | 553 | ||
554 | |||
555 | /** | ||
556 | * Connect to the set service in order to listen | ||
557 | * for request. | ||
558 | * | ||
559 | * @param cls the listen handle to connect | ||
560 | * @param tc task context if invoked as a task, NULL otherwise | ||
561 | */ | ||
562 | static void | ||
563 | listen_connect (void *cls, | ||
564 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
565 | { | ||
566 | struct GNUNET_MQ_Envelope *mqm; | ||
567 | struct GNUNET_SET_ListenMessage *msg; | ||
568 | struct GNUNET_SET_ListenHandle *lh = cls; | ||
569 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { | ||
570 | {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, | ||
571 | GNUNET_MQ_HANDLERS_END | ||
572 | }; | ||
573 | |||
574 | GNUNET_assert (NULL == lh->client); | ||
575 | lh->client = GNUNET_CLIENT_connect ("set", lh->cfg); | ||
576 | if (NULL == lh->client) | ||
577 | { | ||
578 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
579 | "could not connect to set (wrong configuration?), giving up listening\n"); | ||
580 | return; | ||
581 | } | ||
582 | GNUNET_assert (NULL == lh->mq); | ||
583 | lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, | ||
584 | handle_client_listener_error, lh); | ||
585 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); | ||
586 | msg->operation = htonl (lh->operation); | ||
587 | msg->app_id = lh->app_id; | ||
588 | GNUNET_MQ_send (lh->mq, mqm); | ||
589 | } | ||
590 | |||
591 | |||
517 | /** | 592 | /** |
518 | * Wait for set operation requests for the given application id | 593 | * Wait for set operation requests for the given application id |
519 | * | 594 | * |
520 | * @param cfg configuration to use for connecting to | 595 | * @param cfg configuration to use for connecting to |
521 | * the set service | 596 | * the set service, needs to be valid for the lifetime of the listen handle |
522 | * @param operation operation we want to listen for | 597 | * @param operation operation we want to listen for |
523 | * @param app_id id of the application that handles set operation requests | 598 | * @param app_id id of the application that handles set operation requests |
524 | * @param listen_cb called for each incoming request matching the operation | 599 | * @param listen_cb called for each incoming request matching the operation |
@@ -534,25 +609,15 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
534 | void *listen_cls) | 609 | void *listen_cls) |
535 | { | 610 | { |
536 | struct GNUNET_SET_ListenHandle *lh; | 611 | struct GNUNET_SET_ListenHandle *lh; |
537 | struct GNUNET_MQ_Envelope *mqm; | ||
538 | struct GNUNET_SET_ListenMessage *msg; | ||
539 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { | ||
540 | {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, | ||
541 | GNUNET_MQ_HANDLERS_END | ||
542 | }; | ||
543 | 612 | ||
544 | lh = GNUNET_new (struct GNUNET_SET_ListenHandle); | 613 | lh = GNUNET_new (struct GNUNET_SET_ListenHandle); |
545 | lh->client = GNUNET_CLIENT_connect ("set", cfg); | ||
546 | lh->listen_cb = listen_cb; | 614 | lh->listen_cb = listen_cb; |
547 | lh->listen_cls = listen_cls; | 615 | lh->listen_cls = listen_cls; |
548 | GNUNET_assert (NULL != lh->client); | 616 | lh->cfg = cfg; |
549 | lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, | 617 | lh->operation = operation; |
550 | handle_client_listener_error, lh); | 618 | lh->app_id = *app_id; |
551 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); | 619 | lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; |
552 | msg->operation = htons (operation); | 620 | listen_connect (lh, NULL); |
553 | msg->app_id = *app_id; | ||
554 | GNUNET_MQ_send (lh->mq, mqm); | ||
555 | |||
556 | return lh; | 621 | return lh; |
557 | } | 622 | } |
558 | 623 | ||
@@ -680,7 +745,6 @@ GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh, | |||
680 | } | 745 | } |
681 | 746 | ||
682 | 747 | ||
683 | |||
684 | /** | 748 | /** |
685 | * Iterate over all elements in the given set. | 749 | * Iterate over all elements in the given set. |
686 | * Note that this operation involves transferring every element of the set | 750 | * Note that this operation involves transferring every element of the set |