diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-02-19 12:39:15 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-02-19 12:39:15 +0100 |
commit | af4e2e306bd703958ab0b8de1ab25fcc0a528eea (patch) | |
tree | f0a0454e1efeb748e05c813a64c5624b29fcea29 /src/consensus/gnunet-service-consensus.c | |
parent | 8a946bb7c2bc6bb863980ecf16607c57c2b077ec (diff) | |
download | gnunet-af4e2e306bd703958ab0b8de1ab25fcc0a528eea.tar.gz gnunet-af4e2e306bd703958ab0b8de1ab25fcc0a528eea.zip |
converting consensus service to new SERVICE API
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 490 |
1 files changed, 234 insertions, 256 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 64decc29e..16ca6a57f 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet | 2 | This file is part of GNUnet |
3 | Copyright (C) 2012, 2013 GNUnet e.V. | 3 | Copyright (C) 2012, 2013, 2017 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -451,7 +451,7 @@ struct ConsensusSession | |||
451 | /** | 451 | /** |
452 | * Client that inhabits the session | 452 | * Client that inhabits the session |
453 | */ | 453 | */ |
454 | struct GNUNET_SERVER_Client *client; | 454 | struct GNUNET_SERVICE_Client *client; |
455 | 455 | ||
456 | /** | 456 | /** |
457 | * Queued messages to the client. | 457 | * Queued messages to the client. |
@@ -510,11 +510,6 @@ static struct ConsensusSession *sessions_tail; | |||
510 | static const struct GNUNET_CONFIGURATION_Handle *cfg; | 510 | static const struct GNUNET_CONFIGURATION_Handle *cfg; |
511 | 511 | ||
512 | /** | 512 | /** |
513 | * Handle to the server for this service. | ||
514 | */ | ||
515 | static struct GNUNET_SERVER_Handle *srv; | ||
516 | |||
517 | /** | ||
518 | * Peer that runs this service. | 513 | * Peer that runs this service. |
519 | */ | 514 | */ |
520 | static struct GNUNET_PeerIdentity my_peer; | 515 | static struct GNUNET_PeerIdentity my_peer; |
@@ -528,9 +523,11 @@ struct GNUNET_STATISTICS_Handle *statistics; | |||
528 | static void | 523 | static void |
529 | finish_task (struct TaskEntry *task); | 524 | finish_task (struct TaskEntry *task); |
530 | 525 | ||
526 | |||
531 | static void | 527 | static void |
532 | run_ready_steps (struct ConsensusSession *session); | 528 | run_ready_steps (struct ConsensusSession *session); |
533 | 529 | ||
530 | |||
534 | static const char * | 531 | static const char * |
535 | phasename (uint16_t phase) | 532 | phasename (uint16_t phase) |
536 | { | 533 | { |
@@ -653,36 +650,6 @@ debug_str_rfn_key (const struct RfnKey *rk) | |||
653 | 650 | ||
654 | 651 | ||
655 | /** | 652 | /** |
656 | * Destroy a session, free all resources associated with it. | ||
657 | * | ||
658 | * @param session the session to destroy | ||
659 | */ | ||
660 | static void | ||
661 | destroy_session (struct ConsensusSession *session) | ||
662 | { | ||
663 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); | ||
664 | if (NULL != session->set_listener) | ||
665 | { | ||
666 | GNUNET_SET_listen_cancel (session->set_listener); | ||
667 | session->set_listener = NULL; | ||
668 | } | ||
669 | if (NULL != session->client_mq) | ||
670 | { | ||
671 | GNUNET_MQ_destroy (session->client_mq); | ||
672 | session->client_mq = NULL; | ||
673 | /* The MQ cleanup will also disconnect the underlying client. */ | ||
674 | session->client = NULL; | ||
675 | } | ||
676 | if (NULL != session->client) | ||
677 | { | ||
678 | GNUNET_SERVER_client_disconnect (session->client); | ||
679 | session->client = NULL; | ||
680 | } | ||
681 | GNUNET_free (session); | ||
682 | } | ||
683 | |||
684 | |||
685 | /** | ||
686 | * Send the final result set of the consensus to the client, element by | 653 | * Send the final result set of the consensus to the client, element by |
687 | * element. | 654 | * element. |
688 | * | 655 | * |
@@ -1511,12 +1478,14 @@ rfn_create (uint16_t size) | |||
1511 | } | 1478 | } |
1512 | 1479 | ||
1513 | 1480 | ||
1514 | void | 1481 | #if UNUSED |
1482 | static void | ||
1515 | diff_destroy (struct DiffEntry *diff) | 1483 | diff_destroy (struct DiffEntry *diff) |
1516 | { | 1484 | { |
1517 | GNUNET_CONTAINER_multihashmap_destroy (diff->changes); | 1485 | GNUNET_CONTAINER_multihashmap_destroy (diff->changes); |
1518 | GNUNET_free (diff); | 1486 | GNUNET_free (diff); |
1519 | } | 1487 | } |
1488 | #endif | ||
1520 | 1489 | ||
1521 | 1490 | ||
1522 | /** | 1491 | /** |
@@ -2328,45 +2297,48 @@ peer_id_cmp (const void *h1, const void *h2) | |||
2328 | /** | 2297 | /** |
2329 | * Create the sorted list of peers for the session, | 2298 | * Create the sorted list of peers for the session, |
2330 | * add the local peer if not in the join message. | 2299 | * add the local peer if not in the join message. |
2300 | * | ||
2301 | * @param session session to initialize | ||
2302 | * @param join_msg join message with the list of peers participating at the end | ||
2331 | */ | 2303 | */ |
2332 | static void | 2304 | static void |
2333 | initialize_session_peer_list (struct ConsensusSession *session, | 2305 | initialize_session_peer_list (struct ConsensusSession *session, |
2334 | struct GNUNET_CONSENSUS_JoinMessage *join_msg) | 2306 | const struct GNUNET_CONSENSUS_JoinMessage *join_msg) |
2335 | { | 2307 | { |
2336 | unsigned int local_peer_in_list; | 2308 | const struct GNUNET_PeerIdentity *msg_peers |
2337 | uint32_t listed_peers; | 2309 | = (const struct GNUNET_PeerIdentity *) &join_msg[1]; |
2338 | const struct GNUNET_PeerIdentity *msg_peers; | 2310 | int local_peer_in_list; |
2339 | unsigned int i; | ||
2340 | |||
2341 | GNUNET_assert (NULL != join_msg); | ||
2342 | |||
2343 | /* peers in the join message, may or may not include the local peer */ | ||
2344 | listed_peers = ntohl (join_msg->num_peers); | ||
2345 | |||
2346 | session->num_peers = listed_peers; | ||
2347 | 2311 | ||
2348 | msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1]; | 2312 | session->num_peers = ntohl (join_msg->num_peers); |
2349 | 2313 | ||
2314 | /* Peers in the join message, may or may not include the local peer, | ||
2315 | Add it if it is missing. */ | ||
2350 | local_peer_in_list = GNUNET_NO; | 2316 | local_peer_in_list = GNUNET_NO; |
2351 | for (i = 0; i < listed_peers; i++) | 2317 | for (unsigned int i = 0; i < session->num_peers; i++) |
2352 | { | 2318 | { |
2353 | if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity))) | 2319 | if (0 == memcmp (&msg_peers[i], |
2320 | &my_peer, | ||
2321 | sizeof (struct GNUNET_PeerIdentity))) | ||
2354 | { | 2322 | { |
2355 | local_peer_in_list = GNUNET_YES; | 2323 | local_peer_in_list = GNUNET_YES; |
2356 | break; | 2324 | break; |
2357 | } | 2325 | } |
2358 | } | 2326 | } |
2359 | |||
2360 | if (GNUNET_NO == local_peer_in_list) | 2327 | if (GNUNET_NO == local_peer_in_list) |
2361 | session->num_peers++; | 2328 | session->num_peers++; |
2362 | 2329 | ||
2363 | session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); | 2330 | session->peers = GNUNET_new_array (session->num_peers, |
2364 | 2331 | struct GNUNET_PeerIdentity); | |
2365 | if (GNUNET_NO == local_peer_in_list) | 2332 | if (GNUNET_NO == local_peer_in_list) |
2366 | session->peers[session->num_peers - 1] = my_peer; | 2333 | session->peers[session->num_peers - 1] = my_peer; |
2367 | 2334 | ||
2368 | GNUNET_memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); | 2335 | GNUNET_memcpy (session->peers, |
2369 | qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp); | 2336 | msg_peers, |
2337 | ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity)); | ||
2338 | qsort (session->peers, | ||
2339 | session->num_peers, | ||
2340 | sizeof (struct GNUNET_PeerIdentity), | ||
2341 | &peer_id_cmp); | ||
2370 | } | 2342 | } |
2371 | 2343 | ||
2372 | 2344 | ||
@@ -2924,177 +2896,160 @@ construct_task_graph (struct ConsensusSession *session) | |||
2924 | } | 2896 | } |
2925 | 2897 | ||
2926 | 2898 | ||
2899 | |||
2927 | /** | 2900 | /** |
2928 | * Initialize the session, continue receiving messages from the owning client | 2901 | * Check join message. |
2929 | * | 2902 | * |
2930 | * @param session the session to initialize | 2903 | * @param cls session of client that sent the message |
2931 | * @param join_msg the join message from the client | 2904 | * @param m message sent by the client |
2905 | * @return #GNUNET_OK if @a m is well-formed | ||
2906 | */ | ||
2907 | static int | ||
2908 | check_client_join (void *cls, | ||
2909 | const struct GNUNET_CONSENSUS_JoinMessage *m) | ||
2910 | { | ||
2911 | uint32_t listed_peers = ntohl (m->num_peers); | ||
2912 | |||
2913 | if ( (ntohs (m->header.size) - sizeof (*m)) != | ||
2914 | listed_peers * sizeof (struct GNUNET_PeerIdentity)) | ||
2915 | { | ||
2916 | GNUNET_break (0); | ||
2917 | return GNUNET_SYSERR; | ||
2918 | } | ||
2919 | return GNUNET_OK; | ||
2920 | } | ||
2921 | |||
2922 | |||
2923 | /** | ||
2924 | * Called when a client wants to join a consensus session. | ||
2925 | * | ||
2926 | * @param cls session of client that sent the message | ||
2927 | * @param m message sent by the client | ||
2932 | */ | 2928 | */ |
2933 | static void | 2929 | static void |
2934 | initialize_session (struct ConsensusSession *session, | 2930 | handle_client_join (void *cls, |
2935 | struct GNUNET_CONSENSUS_JoinMessage *join_msg) | 2931 | const struct GNUNET_CONSENSUS_JoinMessage *m) |
2936 | { | 2932 | { |
2933 | struct ConsensusSession *session = cls; | ||
2937 | struct ConsensusSession *other_session; | 2934 | struct ConsensusSession *other_session; |
2938 | 2935 | ||
2939 | initialize_session_peer_list (session, join_msg); | 2936 | initialize_session_peer_list (session, |
2940 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers); | 2937 | m); |
2941 | compute_global_id (session, &join_msg->session_id); | 2938 | compute_global_id (session, |
2939 | &m->session_id); | ||
2942 | 2940 | ||
2943 | /* Check if some local client already owns the session. | 2941 | /* Check if some local client already owns the session. |
2944 | It is only legal to have a session with an existing global id | 2942 | It is only legal to have a session with an existing global id |
2945 | if all other sessions with this global id are finished.*/ | 2943 | if all other sessions with this global id are finished.*/ |
2946 | other_session = sessions_head; | 2944 | for (other_session = sessions_head; |
2947 | while (NULL != other_session) | 2945 | NULL != other_session; |
2946 | other_session = other_session->next) | ||
2948 | { | 2947 | { |
2949 | if ((other_session != session) && | 2948 | if ( (other_session != session) && |
2950 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) | 2949 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, |
2951 | { | 2950 | &other_session->global_id)) ) |
2952 | //if (CONSENSUS_ROUND_FINISH != other_session->current_round) | ||
2953 | //{ | ||
2954 | // GNUNET_break (0); | ||
2955 | // destroy_session (session); | ||
2956 | // return; | ||
2957 | //} | ||
2958 | break; | 2951 | break; |
2959 | } | ||
2960 | other_session = other_session->next; | ||
2961 | } | 2952 | } |
2962 | 2953 | ||
2963 | session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline); | 2954 | session->conclude_deadline |
2964 | session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start); | 2955 | = GNUNET_TIME_absolute_ntoh (m->deadline); |
2965 | 2956 | session->conclude_start | |
2966 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %llums created\n", | 2957 | = GNUNET_TIME_absolute_ntoh (m->start); |
2967 | (long long) (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000); | 2958 | session->local_peer_idx = get_peer_idx (&my_peer, |
2968 | 2959 | session); | |
2969 | session->local_peer_idx = get_peer_idx (&my_peer, session); | ||
2970 | GNUNET_assert (-1 != session->local_peer_idx); | 2960 | GNUNET_assert (-1 != session->local_peer_idx); |
2971 | session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, | ||
2972 | &session->global_id, | ||
2973 | set_listen_cb, session); | ||
2974 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx); | ||
2975 | 2961 | ||
2976 | session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | 2962 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2977 | session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | 2963 | "Joining consensus session %s containing %u peers as %u with timeout %s\n", |
2978 | session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | 2964 | GNUNET_h2s (&m->session_id), |
2979 | session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | 2965 | session->num_peers, |
2966 | session->local_peer_idx, | ||
2967 | GNUNET_STRINGS_relative_time_to_string | ||
2968 | (GNUNET_TIME_absolute_get_difference (session->conclude_start, | ||
2969 | session->conclude_deadline), | ||
2970 | GNUNET_YES)); | ||
2971 | |||
2972 | session->set_listener | ||
2973 | = GNUNET_SET_listen (cfg, | ||
2974 | GNUNET_SET_OPERATION_UNION, | ||
2975 | &session->global_id, | ||
2976 | &set_listen_cb, | ||
2977 | session); | ||
2978 | |||
2979 | session->setmap = GNUNET_CONTAINER_multihashmap_create (1, | ||
2980 | GNUNET_NO); | ||
2981 | session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, | ||
2982 | GNUNET_NO); | ||
2983 | session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, | ||
2984 | GNUNET_NO); | ||
2985 | session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, | ||
2986 | GNUNET_NO); | ||
2980 | 2987 | ||
2981 | { | 2988 | { |
2982 | struct SetEntry *client_set; | 2989 | struct SetEntry *client_set; |
2990 | |||
2983 | client_set = GNUNET_new (struct SetEntry); | 2991 | client_set = GNUNET_new (struct SetEntry); |
2984 | client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); | 2992 | client_set->h = GNUNET_SET_create (cfg, |
2993 | GNUNET_SET_OPERATION_UNION); | ||
2985 | client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 }); | 2994 | client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 }); |
2986 | put_set (session, client_set); | 2995 | put_set (session, |
2996 | client_set); | ||
2987 | } | 2997 | } |
2988 | 2998 | ||
2989 | session->peers_blacklisted = GNUNET_new_array (session->num_peers, int); | 2999 | session->peers_blacklisted = GNUNET_new_array (session->num_peers, |
3000 | int); | ||
2990 | 3001 | ||
2991 | /* Just construct the task graph, | 3002 | /* Just construct the task graph, |
2992 | but don't run anything until the client calls conclude. */ | 3003 | but don't run anything until the client calls conclude. */ |
2993 | construct_task_graph (session); | 3004 | construct_task_graph (session); |
2994 | 3005 | GNUNET_SERVICE_client_continue (session->client); | |
2995 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id)); | ||
2996 | } | 3006 | } |
2997 | 3007 | ||
2998 | 3008 | ||
2999 | static struct ConsensusSession * | 3009 | static void |
3000 | get_session_by_client (struct GNUNET_SERVER_Client *client) | 3010 | client_insert_done (void *cls) |
3001 | { | 3011 | { |
3002 | struct ConsensusSession *session; | 3012 | // FIXME: implement |
3003 | |||
3004 | session = sessions_head; | ||
3005 | while (NULL != session) | ||
3006 | { | ||
3007 | if (session->client == client) | ||
3008 | return session; | ||
3009 | session = session->next; | ||
3010 | } | ||
3011 | return NULL; | ||
3012 | } | 3013 | } |
3013 | 3014 | ||
3014 | 3015 | ||
3015 | /** | 3016 | /** |
3016 | * Called when a client wants to join a consensus session. | 3017 | * Called when a client performs an insert operation. |
3017 | * | 3018 | * |
3018 | * @param cls unused | 3019 | * @param cls client handle |
3019 | * @param client client that sent the message | 3020 | * @param msg message sent by the client |
3020 | * @param m message sent by the client | 3021 | * @return #GNUNET_OK (always well-formed) |
3021 | */ | 3022 | */ |
3022 | static void | 3023 | static int |
3023 | client_join (void *cls, | 3024 | check_client_insert (void *cls, |
3024 | struct GNUNET_SERVER_Client *client, | 3025 | const struct GNUNET_CONSENSUS_ElementMessage *msg) |
3025 | const struct GNUNET_MessageHeader *m) | ||
3026 | { | ||
3027 | struct ConsensusSession *session; | ||
3028 | |||
3029 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n"); | ||
3030 | |||
3031 | session = get_session_by_client (client); | ||
3032 | if (NULL != session) | ||
3033 | { | ||
3034 | GNUNET_break (0); | ||
3035 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
3036 | return; | ||
3037 | } | ||
3038 | session = GNUNET_new (struct ConsensusSession); | ||
3039 | session->client = client; | ||
3040 | session->client_mq = GNUNET_MQ_queue_for_server_client (client); | ||
3041 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | ||
3042 | initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); | ||
3043 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
3044 | |||
3045 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n"); | ||
3046 | } | ||
3047 | |||
3048 | |||
3049 | static void | ||
3050 | client_insert_done (void *cls) | ||
3051 | { | 3026 | { |
3052 | // FIXME: implement | 3027 | return GNUNET_OK; |
3053 | } | 3028 | } |
3054 | 3029 | ||
3055 | 3030 | ||
3056 | /** | 3031 | /** |
3057 | * Called when a client performs an insert operation. | 3032 | * Called when a client performs an insert operation. |
3058 | * | 3033 | * |
3059 | * @param cls (unused) | 3034 | * @param cls client handle |
3060 | * @param client client handle | 3035 | * @param msg message sent by the client |
3061 | * @param m message sent by the client | ||
3062 | */ | 3036 | */ |
3063 | void | 3037 | static void |
3064 | client_insert (void *cls, | 3038 | handle_client_insert (void *cls, |
3065 | struct GNUNET_SERVER_Client *client, | 3039 | const struct GNUNET_CONSENSUS_ElementMessage *msg) |
3066 | const struct GNUNET_MessageHeader *m) | ||
3067 | { | 3040 | { |
3068 | struct ConsensusSession *session; | 3041 | struct ConsensusSession *session = cls; |
3069 | struct GNUNET_CONSENSUS_ElementMessage *msg; | ||
3070 | struct GNUNET_SET_Element *element; | 3042 | struct GNUNET_SET_Element *element; |
3071 | ssize_t element_size; | 3043 | ssize_t element_size; |
3072 | struct GNUNET_SET_Handle *initial_set; | 3044 | struct GNUNET_SET_Handle *initial_set; |
3073 | 3045 | ||
3074 | session = get_session_by_client (client); | ||
3075 | |||
3076 | if (NULL == session) | ||
3077 | { | ||
3078 | GNUNET_break (0); | ||
3079 | GNUNET_SERVER_client_disconnect (client); | ||
3080 | return; | ||
3081 | } | ||
3082 | |||
3083 | if (GNUNET_YES == session->conclude_started) | 3046 | if (GNUNET_YES == session->conclude_started) |
3084 | { | 3047 | { |
3085 | GNUNET_break (0); | 3048 | GNUNET_break (0); |
3086 | GNUNET_SERVER_client_disconnect (client); | 3049 | GNUNET_SERVICE_client_drop (session->client); |
3087 | return; | 3050 | return; |
3088 | } | 3051 | } |
3089 | |||
3090 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; | ||
3091 | element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); | 3052 | element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); |
3092 | if (element_size < 0) | ||
3093 | { | ||
3094 | GNUNET_break (0); | ||
3095 | return; | ||
3096 | } | ||
3097 | |||
3098 | element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); | 3053 | element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); |
3099 | element->element_type = msg->element_type; | 3054 | element->element_type = msg->element_type; |
3100 | element->size = element_size; | 3055 | element->size = element_size; |
@@ -3103,71 +3058,61 @@ client_insert (void *cls, | |||
3103 | { | 3058 | { |
3104 | struct SetKey key = { SET_KIND_CURRENT, 0, 0 }; | 3059 | struct SetKey key = { SET_KIND_CURRENT, 0, 0 }; |
3105 | struct SetEntry *entry; | 3060 | struct SetEntry *entry; |
3106 | entry = lookup_set (session, &key); | 3061 | |
3062 | entry = lookup_set (session, | ||
3063 | &key); | ||
3107 | GNUNET_assert (NULL != entry); | 3064 | GNUNET_assert (NULL != entry); |
3108 | initial_set = entry->h; | 3065 | initial_set = entry->h; |
3109 | } | 3066 | } |
3110 | session->num_client_insert_pending++; | 3067 | session->num_client_insert_pending++; |
3111 | GNUNET_SET_add_element (initial_set, element, client_insert_done, session); | 3068 | GNUNET_SET_add_element (initial_set, |
3069 | element, | ||
3070 | &client_insert_done, | ||
3071 | session); | ||
3112 | 3072 | ||
3113 | #ifdef GNUNET_EXTRA_LOGGING | 3073 | #ifdef GNUNET_EXTRA_LOGGING |
3114 | { | 3074 | { |
3115 | struct GNUNET_HashCode hash; | 3075 | struct GNUNET_HashCode hash; |
3116 | 3076 | ||
3117 | GNUNET_SET_element_hash (element, &hash); | 3077 | GNUNET_SET_element_hash (element, |
3078 | &hash); | ||
3118 | 3079 | ||
3119 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n", | 3080 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
3081 | "P%u: element %s added\n", | ||
3120 | session->local_peer_idx, | 3082 | session->local_peer_idx, |
3121 | GNUNET_h2s (&hash)); | 3083 | GNUNET_h2s (&hash)); |
3122 | } | 3084 | } |
3123 | #endif | 3085 | #endif |
3124 | |||
3125 | GNUNET_free (element); | 3086 | GNUNET_free (element); |
3126 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 3087 | GNUNET_SERVICE_client_continue (session->client); |
3127 | } | 3088 | } |
3128 | 3089 | ||
3129 | 3090 | ||
3130 | /** | 3091 | /** |
3131 | * Called when a client performs the conclude operation. | 3092 | * Called when a client performs the conclude operation. |
3132 | * | 3093 | * |
3133 | * @param cls (unused) | 3094 | * @param cls client handle |
3134 | * @param client client handle | ||
3135 | * @param message message sent by the client | 3095 | * @param message message sent by the client |
3136 | */ | 3096 | */ |
3137 | static void | 3097 | static void |
3138 | client_conclude (void *cls, | 3098 | handle_client_conclude (void *cls, |
3139 | struct GNUNET_SERVER_Client *client, | 3099 | const struct GNUNET_MessageHeader *message) |
3140 | const struct GNUNET_MessageHeader *message) | ||
3141 | { | 3100 | { |
3142 | struct ConsensusSession *session; | 3101 | struct ConsensusSession *session = cls; |
3143 | |||
3144 | session = get_session_by_client (client); | ||
3145 | if (NULL == session) | ||
3146 | { | ||
3147 | /* client not found */ | ||
3148 | GNUNET_break (0); | ||
3149 | GNUNET_SERVER_client_disconnect (client); | ||
3150 | return; | ||
3151 | } | ||
3152 | 3102 | ||
3153 | if (GNUNET_YES == session->conclude_started) | 3103 | if (GNUNET_YES == session->conclude_started) |
3154 | { | 3104 | { |
3155 | /* conclude started twice */ | 3105 | /* conclude started twice */ |
3156 | GNUNET_break (0); | 3106 | GNUNET_break (0); |
3157 | GNUNET_SERVER_client_disconnect (client); | 3107 | GNUNET_SERVICE_client_drop (session->client); |
3158 | destroy_session (session); | ||
3159 | return; | 3108 | return; |
3160 | } | 3109 | } |
3161 | 3110 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |
3162 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n"); | 3111 | "conclude requested\n"); |
3163 | |||
3164 | session->conclude_started = GNUNET_YES; | 3112 | session->conclude_started = GNUNET_YES; |
3165 | |||
3166 | install_step_timeouts (session); | 3113 | install_step_timeouts (session); |
3167 | run_ready_steps (session); | 3114 | run_ready_steps (session); |
3168 | 3115 | GNUNET_SERVICE_client_continue (session->client); | |
3169 | |||
3170 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
3171 | } | 3116 | } |
3172 | 3117 | ||
3173 | 3118 | ||
@@ -3179,82 +3124,115 @@ client_conclude (void *cls, | |||
3179 | static void | 3124 | static void |
3180 | shutdown_task (void *cls) | 3125 | shutdown_task (void *cls) |
3181 | { | 3126 | { |
3182 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n"); | 3127 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
3183 | while (NULL != sessions_head) | 3128 | "shutting down\n"); |
3184 | destroy_session (sessions_head); | 3129 | GNUNET_STATISTICS_destroy (statistics, |
3185 | 3130 | GNUNET_NO); | |
3186 | GNUNET_STATISTICS_destroy (statistics, GNUNET_NO); | 3131 | statistics = NULL; |
3187 | } | 3132 | } |
3188 | 3133 | ||
3189 | 3134 | ||
3190 | /** | 3135 | /** |
3191 | * Clean up after a client after it is | 3136 | * Start processing consensus requests. |
3192 | * disconnected (either by us or by itself) | ||
3193 | * | 3137 | * |
3194 | * @param cls closure, unused | 3138 | * @param cls closure |
3195 | * @param client the client to clean up after | 3139 | * @param c configuration to use |
3140 | * @param service the initialized service | ||
3196 | */ | 3141 | */ |
3197 | void | 3142 | static void |
3198 | handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | 3143 | run (void *cls, |
3144 | const struct GNUNET_CONFIGURATION_Handle *c, | ||
3145 | struct GNUNET_SERVICE_Handle *service) | ||
3199 | { | 3146 | { |
3200 | struct ConsensusSession *session; | 3147 | cfg = c; |
3201 | 3148 | if (GNUNET_OK != | |
3202 | session = get_session_by_client (client); | 3149 | GNUNET_CRYPTO_get_peer_identity (cfg, |
3203 | if (NULL == session) | 3150 | &my_peer)) |
3151 | { | ||
3152 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
3153 | "Could not retrieve host identity\n"); | ||
3154 | GNUNET_SCHEDULER_shutdown (); | ||
3204 | return; | 3155 | return; |
3205 | // FIXME: destroy if we can | 3156 | } |
3157 | statistics = GNUNET_STATISTICS_create ("consensus", | ||
3158 | cfg); | ||
3159 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, | ||
3160 | NULL); | ||
3206 | } | 3161 | } |
3207 | 3162 | ||
3208 | 3163 | ||
3164 | /** | ||
3165 | * Callback called when a client connects to the service. | ||
3166 | * | ||
3167 | * @param cls closure for the service | ||
3168 | * @param c the new client that connected to the service | ||
3169 | * @param mq the message queue used to send messages to the client | ||
3170 | * @return @a c | ||
3171 | */ | ||
3172 | static void * | ||
3173 | client_connect_cb (void *cls, | ||
3174 | struct GNUNET_SERVICE_Client *c, | ||
3175 | struct GNUNET_MQ_Handle *mq) | ||
3176 | { | ||
3177 | struct ConsensusSession *session = GNUNET_new (struct ConsensusSession); | ||
3178 | |||
3179 | session->client = c; | ||
3180 | session->client_mq = mq; | ||
3181 | GNUNET_CONTAINER_DLL_insert (sessions_head, | ||
3182 | sessions_tail, | ||
3183 | session); | ||
3184 | return session; | ||
3185 | } | ||
3186 | |||
3209 | 3187 | ||
3210 | /** | 3188 | /** |
3211 | * Start processing consensus requests. | 3189 | * Callback called when a client disconnected from the service |
3212 | * | 3190 | * |
3213 | * @param cls closure | 3191 | * @param cls closure for the service |
3214 | * @param server the initialized server | 3192 | * @param c the client that disconnected |
3215 | * @param c configuration to use | 3193 | * @param internal_cls should be equal to @a c |
3216 | */ | 3194 | */ |
3217 | static void | 3195 | static void |
3218 | run (void *cls, struct GNUNET_SERVER_Handle *server, | 3196 | client_disconnect_cb (void *cls, |
3219 | const struct GNUNET_CONFIGURATION_Handle *c) | 3197 | struct GNUNET_SERVICE_Client *c, |
3198 | void *internal_cls) | ||
3220 | { | 3199 | { |
3221 | static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { | 3200 | struct ConsensusSession *session = internal_cls; |
3222 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, | ||
3223 | sizeof (struct GNUNET_MessageHeader)}, | ||
3224 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, | ||
3225 | {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, | ||
3226 | {NULL, NULL, 0, 0} | ||
3227 | }; | ||
3228 | 3201 | ||
3229 | cfg = c; | 3202 | if (NULL != session->set_listener) |
3230 | srv = server; | ||
3231 | if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer)) | ||
3232 | { | 3203 | { |
3233 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n"); | 3204 | GNUNET_SET_listen_cancel (session->set_listener); |
3234 | GNUNET_break (0); | 3205 | session->set_listener = NULL; |
3235 | GNUNET_SCHEDULER_shutdown (); | ||
3236 | return; | ||
3237 | } | 3206 | } |
3238 | statistics = GNUNET_STATISTICS_create ("consensus", cfg); | 3207 | GNUNET_CONTAINER_DLL_remove (sessions_head, |
3239 | GNUNET_SERVER_add_handlers (server, server_handlers); | 3208 | sessions_tail, |
3240 | GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); | 3209 | session); |
3241 | GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL); | 3210 | GNUNET_free (session); |
3242 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); | ||
3243 | } | 3211 | } |
3244 | 3212 | ||
3245 | 3213 | ||
3246 | /** | 3214 | /** |
3247 | * The main function for the consensus service. | 3215 | * Define "main" method using service macro. |
3248 | * | ||
3249 | * @param argc number of arguments from the command line | ||
3250 | * @param argv command line arguments | ||
3251 | * @return 0 ok, 1 on error | ||
3252 | */ | 3216 | */ |
3253 | int | 3217 | GNUNET_SERVICE_MAIN |
3254 | main (int argc, char *const *argv) | 3218 | ("consensus", |
3255 | { | 3219 | GNUNET_SERVICE_OPTION_NONE, |
3256 | int ret; | 3220 | &run, |
3257 | ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); | 3221 | &client_connect_cb, |
3258 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret); | 3222 | &client_disconnect_cb, |
3259 | return (GNUNET_OK == ret) ? 0 : 1; | 3223 | NULL, |
3260 | } | 3224 | GNUNET_MQ_hd_fixed_size (client_conclude, |
3225 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, | ||
3226 | struct GNUNET_MessageHeader, | ||
3227 | NULL), | ||
3228 | GNUNET_MQ_hd_var_size (client_insert, | ||
3229 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, | ||
3230 | struct GNUNET_CONSENSUS_ElementMessage, | ||
3231 | NULL), | ||
3232 | GNUNET_MQ_hd_var_size (client_join, | ||
3233 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, | ||
3234 | struct GNUNET_CONSENSUS_JoinMessage, | ||
3235 | NULL), | ||
3236 | GNUNET_MQ_handler_end ()); | ||
3237 | |||
3238 | /* end of gnunet-service-consensus.c */ | ||