aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-02-19 12:39:15 +0100
committerChristian Grothoff <christian@grothoff.org>2017-02-19 12:39:15 +0100
commitaf4e2e306bd703958ab0b8de1ab25fcc0a528eea (patch)
treef0a0454e1efeb748e05c813a64c5624b29fcea29 /src/consensus
parent8a946bb7c2bc6bb863980ecf16607c57c2b077ec (diff)
downloadgnunet-af4e2e306bd703958ab0b8de1ab25fcc0a528eea.tar.gz
gnunet-af4e2e306bd703958ab0b8de1ab25fcc0a528eea.zip
converting consensus service to new SERVICE API
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/gnunet-service-consensus.c490
1 files changed, 234 insertions, 256 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 64decc29e..16ca6a57f 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2012, 2013 GNUnet e.V. 3 Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -451,7 +451,7 @@ struct ConsensusSession
451 /** 451 /**
452 * Client that inhabits the session 452 * Client that inhabits the session
453 */ 453 */
454 struct GNUNET_SERVER_Client *client; 454 struct GNUNET_SERVICE_Client *client;
455 455
456 /** 456 /**
457 * Queued messages to the client. 457 * Queued messages to the client.
@@ -510,11 +510,6 @@ static struct ConsensusSession *sessions_tail;
510static const struct GNUNET_CONFIGURATION_Handle *cfg; 510static const struct GNUNET_CONFIGURATION_Handle *cfg;
511 511
512/** 512/**
513 * Handle to the server for this service.
514 */
515static struct GNUNET_SERVER_Handle *srv;
516
517/**
518 * Peer that runs this service. 513 * Peer that runs this service.
519 */ 514 */
520static struct GNUNET_PeerIdentity my_peer; 515static struct GNUNET_PeerIdentity my_peer;
@@ -528,9 +523,11 @@ struct GNUNET_STATISTICS_Handle *statistics;
528static void 523static void
529finish_task (struct TaskEntry *task); 524finish_task (struct TaskEntry *task);
530 525
526
531static void 527static void
532run_ready_steps (struct ConsensusSession *session); 528run_ready_steps (struct ConsensusSession *session);
533 529
530
534static const char * 531static const char *
535phasename (uint16_t phase) 532phasename (uint16_t phase)
536{ 533{
@@ -653,36 +650,6 @@ debug_str_rfn_key (const struct RfnKey *rk)
653 650
654 651
655/** 652/**
656 * Destroy a session, free all resources associated with it.
657 *
658 * @param session the session to destroy
659 */
660static void
661destroy_session (struct ConsensusSession *session)
662{
663 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
664 if (NULL != session->set_listener)
665 {
666 GNUNET_SET_listen_cancel (session->set_listener);
667 session->set_listener = NULL;
668 }
669 if (NULL != session->client_mq)
670 {
671 GNUNET_MQ_destroy (session->client_mq);
672 session->client_mq = NULL;
673 /* The MQ cleanup will also disconnect the underlying client. */
674 session->client = NULL;
675 }
676 if (NULL != session->client)
677 {
678 GNUNET_SERVER_client_disconnect (session->client);
679 session->client = NULL;
680 }
681 GNUNET_free (session);
682}
683
684
685/**
686 * Send the final result set of the consensus to the client, element by 653 * Send the final result set of the consensus to the client, element by
687 * element. 654 * element.
688 * 655 *
@@ -1511,12 +1478,14 @@ rfn_create (uint16_t size)
1511} 1478}
1512 1479
1513 1480
1514void 1481#if UNUSED
1482static void
1515diff_destroy (struct DiffEntry *diff) 1483diff_destroy (struct DiffEntry *diff)
1516{ 1484{
1517 GNUNET_CONTAINER_multihashmap_destroy (diff->changes); 1485 GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
1518 GNUNET_free (diff); 1486 GNUNET_free (diff);
1519} 1487}
1488#endif
1520 1489
1521 1490
1522/** 1491/**
@@ -2328,45 +2297,48 @@ peer_id_cmp (const void *h1, const void *h2)
2328/** 2297/**
2329 * Create the sorted list of peers for the session, 2298 * Create the sorted list of peers for the session,
2330 * add the local peer if not in the join message. 2299 * add the local peer if not in the join message.
2300 *
2301 * @param session session to initialize
2302 * @param join_msg join message with the list of peers participating at the end
2331 */ 2303 */
2332static void 2304static void
2333initialize_session_peer_list (struct ConsensusSession *session, 2305initialize_session_peer_list (struct ConsensusSession *session,
2334 struct GNUNET_CONSENSUS_JoinMessage *join_msg) 2306 const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2335{ 2307{
2336 unsigned int local_peer_in_list; 2308 const struct GNUNET_PeerIdentity *msg_peers
2337 uint32_t listed_peers; 2309 = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2338 const struct GNUNET_PeerIdentity *msg_peers; 2310 int local_peer_in_list;
2339 unsigned int i;
2340
2341 GNUNET_assert (NULL != join_msg);
2342
2343 /* peers in the join message, may or may not include the local peer */
2344 listed_peers = ntohl (join_msg->num_peers);
2345
2346 session->num_peers = listed_peers;
2347 2311
2348 msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1]; 2312 session->num_peers = ntohl (join_msg->num_peers);
2349 2313
2314 /* Peers in the join message, may or may not include the local peer,
2315 Add it if it is missing. */
2350 local_peer_in_list = GNUNET_NO; 2316 local_peer_in_list = GNUNET_NO;
2351 for (i = 0; i < listed_peers; i++) 2317 for (unsigned int i = 0; i < session->num_peers; i++)
2352 { 2318 {
2353 if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity))) 2319 if (0 == memcmp (&msg_peers[i],
2320 &my_peer,
2321 sizeof (struct GNUNET_PeerIdentity)))
2354 { 2322 {
2355 local_peer_in_list = GNUNET_YES; 2323 local_peer_in_list = GNUNET_YES;
2356 break; 2324 break;
2357 } 2325 }
2358 } 2326 }
2359
2360 if (GNUNET_NO == local_peer_in_list) 2327 if (GNUNET_NO == local_peer_in_list)
2361 session->num_peers++; 2328 session->num_peers++;
2362 2329
2363 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); 2330 session->peers = GNUNET_new_array (session->num_peers,
2364 2331 struct GNUNET_PeerIdentity);
2365 if (GNUNET_NO == local_peer_in_list) 2332 if (GNUNET_NO == local_peer_in_list)
2366 session->peers[session->num_peers - 1] = my_peer; 2333 session->peers[session->num_peers - 1] = my_peer;
2367 2334
2368 GNUNET_memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); 2335 GNUNET_memcpy (session->peers,
2369 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp); 2336 msg_peers,
2337 ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2338 qsort (session->peers,
2339 session->num_peers,
2340 sizeof (struct GNUNET_PeerIdentity),
2341 &peer_id_cmp);
2370} 2342}
2371 2343
2372 2344
@@ -2924,177 +2896,160 @@ construct_task_graph (struct ConsensusSession *session)
2924} 2896}
2925 2897
2926 2898
2899
2927/** 2900/**
2928 * Initialize the session, continue receiving messages from the owning client 2901 * Check join message.
2929 * 2902 *
2930 * @param session the session to initialize 2903 * @param cls session of client that sent the message
2931 * @param join_msg the join message from the client 2904 * @param m message sent by the client
2905 * @return #GNUNET_OK if @a m is well-formed
2906 */
2907static int
2908check_client_join (void *cls,
2909 const struct GNUNET_CONSENSUS_JoinMessage *m)
2910{
2911 uint32_t listed_peers = ntohl (m->num_peers);
2912
2913 if ( (ntohs (m->header.size) - sizeof (*m)) !=
2914 listed_peers * sizeof (struct GNUNET_PeerIdentity))
2915 {
2916 GNUNET_break (0);
2917 return GNUNET_SYSERR;
2918 }
2919 return GNUNET_OK;
2920}
2921
2922
2923/**
2924 * Called when a client wants to join a consensus session.
2925 *
2926 * @param cls session of client that sent the message
2927 * @param m message sent by the client
2932 */ 2928 */
2933static void 2929static void
2934initialize_session (struct ConsensusSession *session, 2930handle_client_join (void *cls,
2935 struct GNUNET_CONSENSUS_JoinMessage *join_msg) 2931 const struct GNUNET_CONSENSUS_JoinMessage *m)
2936{ 2932{
2933 struct ConsensusSession *session = cls;
2937 struct ConsensusSession *other_session; 2934 struct ConsensusSession *other_session;
2938 2935
2939 initialize_session_peer_list (session, join_msg); 2936 initialize_session_peer_list (session,
2940 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers); 2937 m);
2941 compute_global_id (session, &join_msg->session_id); 2938 compute_global_id (session,
2939 &m->session_id);
2942 2940
2943 /* Check if some local client already owns the session. 2941 /* Check if some local client already owns the session.
2944 It is only legal to have a session with an existing global id 2942 It is only legal to have a session with an existing global id
2945 if all other sessions with this global id are finished.*/ 2943 if all other sessions with this global id are finished.*/
2946 other_session = sessions_head; 2944 for (other_session = sessions_head;
2947 while (NULL != other_session) 2945 NULL != other_session;
2946 other_session = other_session->next)
2948 { 2947 {
2949 if ((other_session != session) && 2948 if ( (other_session != session) &&
2950 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) 2949 (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
2951 { 2950 &other_session->global_id)) )
2952 //if (CONSENSUS_ROUND_FINISH != other_session->current_round)
2953 //{
2954 // GNUNET_break (0);
2955 // destroy_session (session);
2956 // return;
2957 //}
2958 break; 2951 break;
2959 }
2960 other_session = other_session->next;
2961 } 2952 }
2962 2953
2963 session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline); 2954 session->conclude_deadline
2964 session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start); 2955 = GNUNET_TIME_absolute_ntoh (m->deadline);
2965 2956 session->conclude_start
2966 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %llums created\n", 2957 = GNUNET_TIME_absolute_ntoh (m->start);
2967 (long long) (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000); 2958 session->local_peer_idx = get_peer_idx (&my_peer,
2968 2959 session);
2969 session->local_peer_idx = get_peer_idx (&my_peer, session);
2970 GNUNET_assert (-1 != session->local_peer_idx); 2960 GNUNET_assert (-1 != session->local_peer_idx);
2971 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
2972 &session->global_id,
2973 set_listen_cb, session);
2974 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx);
2975 2961
2976 session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 2962 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2977 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 2963 "Joining consensus session %s containing %u peers as %u with timeout %s\n",
2978 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 2964 GNUNET_h2s (&m->session_id),
2979 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 2965 session->num_peers,
2966 session->local_peer_idx,
2967 GNUNET_STRINGS_relative_time_to_string
2968 (GNUNET_TIME_absolute_get_difference (session->conclude_start,
2969 session->conclude_deadline),
2970 GNUNET_YES));
2971
2972 session->set_listener
2973 = GNUNET_SET_listen (cfg,
2974 GNUNET_SET_OPERATION_UNION,
2975 &session->global_id,
2976 &set_listen_cb,
2977 session);
2978
2979 session->setmap = GNUNET_CONTAINER_multihashmap_create (1,
2980 GNUNET_NO);
2981 session->taskmap = GNUNET_CONTAINER_multihashmap_create (1,
2982 GNUNET_NO);
2983 session->diffmap = GNUNET_CONTAINER_multihashmap_create (1,
2984 GNUNET_NO);
2985 session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1,
2986 GNUNET_NO);
2980 2987
2981 { 2988 {
2982 struct SetEntry *client_set; 2989 struct SetEntry *client_set;
2990
2983 client_set = GNUNET_new (struct SetEntry); 2991 client_set = GNUNET_new (struct SetEntry);
2984 client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); 2992 client_set->h = GNUNET_SET_create (cfg,
2993 GNUNET_SET_OPERATION_UNION);
2985 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 }); 2994 client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
2986 put_set (session, client_set); 2995 put_set (session,
2996 client_set);
2987 } 2997 }
2988 2998
2989 session->peers_blacklisted = GNUNET_new_array (session->num_peers, int); 2999 session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3000 int);
2990 3001
2991 /* Just construct the task graph, 3002 /* Just construct the task graph,
2992 but don't run anything until the client calls conclude. */ 3003 but don't run anything until the client calls conclude. */
2993 construct_task_graph (session); 3004 construct_task_graph (session);
2994 3005 GNUNET_SERVICE_client_continue (session->client);
2995 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id));
2996} 3006}
2997 3007
2998 3008
2999static struct ConsensusSession * 3009static void
3000get_session_by_client (struct GNUNET_SERVER_Client *client) 3010client_insert_done (void *cls)
3001{ 3011{
3002 struct ConsensusSession *session; 3012 // FIXME: implement
3003
3004 session = sessions_head;
3005 while (NULL != session)
3006 {
3007 if (session->client == client)
3008 return session;
3009 session = session->next;
3010 }
3011 return NULL;
3012} 3013}
3013 3014
3014 3015
3015/** 3016/**
3016 * Called when a client wants to join a consensus session. 3017 * Called when a client performs an insert operation.
3017 * 3018 *
3018 * @param cls unused 3019 * @param cls client handle
3019 * @param client client that sent the message 3020 * @param msg message sent by the client
3020 * @param m message sent by the client 3021 * @return #GNUNET_OK (always well-formed)
3021 */ 3022 */
3022static void 3023static int
3023client_join (void *cls, 3024check_client_insert (void *cls,
3024 struct GNUNET_SERVER_Client *client, 3025 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3025 const struct GNUNET_MessageHeader *m)
3026{
3027 struct ConsensusSession *session;
3028
3029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n");
3030
3031 session = get_session_by_client (client);
3032 if (NULL != session)
3033 {
3034 GNUNET_break (0);
3035 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
3036 return;
3037 }
3038 session = GNUNET_new (struct ConsensusSession);
3039 session->client = client;
3040 session->client_mq = GNUNET_MQ_queue_for_server_client (client);
3041 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
3042 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
3043 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3044
3045 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n");
3046}
3047
3048
3049static void
3050client_insert_done (void *cls)
3051{ 3026{
3052 // FIXME: implement 3027 return GNUNET_OK;
3053} 3028}
3054 3029
3055 3030
3056/** 3031/**
3057 * Called when a client performs an insert operation. 3032 * Called when a client performs an insert operation.
3058 * 3033 *
3059 * @param cls (unused) 3034 * @param cls client handle
3060 * @param client client handle 3035 * @param msg message sent by the client
3061 * @param m message sent by the client
3062 */ 3036 */
3063void 3037static void
3064client_insert (void *cls, 3038handle_client_insert (void *cls,
3065 struct GNUNET_SERVER_Client *client, 3039 const struct GNUNET_CONSENSUS_ElementMessage *msg)
3066 const struct GNUNET_MessageHeader *m)
3067{ 3040{
3068 struct ConsensusSession *session; 3041 struct ConsensusSession *session = cls;
3069 struct GNUNET_CONSENSUS_ElementMessage *msg;
3070 struct GNUNET_SET_Element *element; 3042 struct GNUNET_SET_Element *element;
3071 ssize_t element_size; 3043 ssize_t element_size;
3072 struct GNUNET_SET_Handle *initial_set; 3044 struct GNUNET_SET_Handle *initial_set;
3073 3045
3074 session = get_session_by_client (client);
3075
3076 if (NULL == session)
3077 {
3078 GNUNET_break (0);
3079 GNUNET_SERVER_client_disconnect (client);
3080 return;
3081 }
3082
3083 if (GNUNET_YES == session->conclude_started) 3046 if (GNUNET_YES == session->conclude_started)
3084 { 3047 {
3085 GNUNET_break (0); 3048 GNUNET_break (0);
3086 GNUNET_SERVER_client_disconnect (client); 3049 GNUNET_SERVICE_client_drop (session->client);
3087 return; 3050 return;
3088 } 3051 }
3089
3090 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
3091 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); 3052 element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3092 if (element_size < 0)
3093 {
3094 GNUNET_break (0);
3095 return;
3096 }
3097
3098 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); 3053 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
3099 element->element_type = msg->element_type; 3054 element->element_type = msg->element_type;
3100 element->size = element_size; 3055 element->size = element_size;
@@ -3103,71 +3058,61 @@ client_insert (void *cls,
3103 { 3058 {
3104 struct SetKey key = { SET_KIND_CURRENT, 0, 0 }; 3059 struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3105 struct SetEntry *entry; 3060 struct SetEntry *entry;
3106 entry = lookup_set (session, &key); 3061
3062 entry = lookup_set (session,
3063 &key);
3107 GNUNET_assert (NULL != entry); 3064 GNUNET_assert (NULL != entry);
3108 initial_set = entry->h; 3065 initial_set = entry->h;
3109 } 3066 }
3110 session->num_client_insert_pending++; 3067 session->num_client_insert_pending++;
3111 GNUNET_SET_add_element (initial_set, element, client_insert_done, session); 3068 GNUNET_SET_add_element (initial_set,
3069 element,
3070 &client_insert_done,
3071 session);
3112 3072
3113#ifdef GNUNET_EXTRA_LOGGING 3073#ifdef GNUNET_EXTRA_LOGGING
3114 { 3074 {
3115 struct GNUNET_HashCode hash; 3075 struct GNUNET_HashCode hash;
3116 3076
3117 GNUNET_SET_element_hash (element, &hash); 3077 GNUNET_SET_element_hash (element,
3078 &hash);
3118 3079
3119 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n", 3080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3081 "P%u: element %s added\n",
3120 session->local_peer_idx, 3082 session->local_peer_idx,
3121 GNUNET_h2s (&hash)); 3083 GNUNET_h2s (&hash));
3122 } 3084 }
3123#endif 3085#endif
3124
3125 GNUNET_free (element); 3086 GNUNET_free (element);
3126 GNUNET_SERVER_receive_done (client, GNUNET_OK); 3087 GNUNET_SERVICE_client_continue (session->client);
3127} 3088}
3128 3089
3129 3090
3130/** 3091/**
3131 * Called when a client performs the conclude operation. 3092 * Called when a client performs the conclude operation.
3132 * 3093 *
3133 * @param cls (unused) 3094 * @param cls client handle
3134 * @param client client handle
3135 * @param message message sent by the client 3095 * @param message message sent by the client
3136 */ 3096 */
3137static void 3097static void
3138client_conclude (void *cls, 3098handle_client_conclude (void *cls,
3139 struct GNUNET_SERVER_Client *client, 3099 const struct GNUNET_MessageHeader *message)
3140 const struct GNUNET_MessageHeader *message)
3141{ 3100{
3142 struct ConsensusSession *session; 3101 struct ConsensusSession *session = cls;
3143
3144 session = get_session_by_client (client);
3145 if (NULL == session)
3146 {
3147 /* client not found */
3148 GNUNET_break (0);
3149 GNUNET_SERVER_client_disconnect (client);
3150 return;
3151 }
3152 3102
3153 if (GNUNET_YES == session->conclude_started) 3103 if (GNUNET_YES == session->conclude_started)
3154 { 3104 {
3155 /* conclude started twice */ 3105 /* conclude started twice */
3156 GNUNET_break (0); 3106 GNUNET_break (0);
3157 GNUNET_SERVER_client_disconnect (client); 3107 GNUNET_SERVICE_client_drop (session->client);
3158 destroy_session (session);
3159 return; 3108 return;
3160 } 3109 }
3161 3110 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n"); 3111 "conclude requested\n");
3163
3164 session->conclude_started = GNUNET_YES; 3112 session->conclude_started = GNUNET_YES;
3165
3166 install_step_timeouts (session); 3113 install_step_timeouts (session);
3167 run_ready_steps (session); 3114 run_ready_steps (session);
3168 3115 GNUNET_SERVICE_client_continue (session->client);
3169
3170 GNUNET_SERVER_receive_done (client, GNUNET_OK);
3171} 3116}
3172 3117
3173 3118
@@ -3179,82 +3124,115 @@ client_conclude (void *cls,
3179static void 3124static void
3180shutdown_task (void *cls) 3125shutdown_task (void *cls)
3181{ 3126{
3182 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n"); 3127 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3183 while (NULL != sessions_head) 3128 "shutting down\n");
3184 destroy_session (sessions_head); 3129 GNUNET_STATISTICS_destroy (statistics,
3185 3130 GNUNET_NO);
3186 GNUNET_STATISTICS_destroy (statistics, GNUNET_NO); 3131 statistics = NULL;
3187} 3132}
3188 3133
3189 3134
3190/** 3135/**
3191 * Clean up after a client after it is 3136 * Start processing consensus requests.
3192 * disconnected (either by us or by itself)
3193 * 3137 *
3194 * @param cls closure, unused 3138 * @param cls closure
3195 * @param client the client to clean up after 3139 * @param c configuration to use
3140 * @param service the initialized service
3196 */ 3141 */
3197void 3142static void
3198handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) 3143run (void *cls,
3144 const struct GNUNET_CONFIGURATION_Handle *c,
3145 struct GNUNET_SERVICE_Handle *service)
3199{ 3146{
3200 struct ConsensusSession *session; 3147 cfg = c;
3201 3148 if (GNUNET_OK !=
3202 session = get_session_by_client (client); 3149 GNUNET_CRYPTO_get_peer_identity (cfg,
3203 if (NULL == session) 3150 &my_peer))
3151 {
3152 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3153 "Could not retrieve host identity\n");
3154 GNUNET_SCHEDULER_shutdown ();
3204 return; 3155 return;
3205 // FIXME: destroy if we can 3156 }
3157 statistics = GNUNET_STATISTICS_create ("consensus",
3158 cfg);
3159 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
3160 NULL);
3206} 3161}
3207 3162
3208 3163
3164/**
3165 * Callback called when a client connects to the service.
3166 *
3167 * @param cls closure for the service
3168 * @param c the new client that connected to the service
3169 * @param mq the message queue used to send messages to the client
3170 * @return @a c
3171 */
3172static void *
3173client_connect_cb (void *cls,
3174 struct GNUNET_SERVICE_Client *c,
3175 struct GNUNET_MQ_Handle *mq)
3176{
3177 struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3178
3179 session->client = c;
3180 session->client_mq = mq;
3181 GNUNET_CONTAINER_DLL_insert (sessions_head,
3182 sessions_tail,
3183 session);
3184 return session;
3185}
3186
3209 3187
3210/** 3188/**
3211 * Start processing consensus requests. 3189 * Callback called when a client disconnected from the service
3212 * 3190 *
3213 * @param cls closure 3191 * @param cls closure for the service
3214 * @param server the initialized server 3192 * @param c the client that disconnected
3215 * @param c configuration to use 3193 * @param internal_cls should be equal to @a c
3216 */ 3194 */
3217static void 3195static void
3218run (void *cls, struct GNUNET_SERVER_Handle *server, 3196client_disconnect_cb (void *cls,
3219 const struct GNUNET_CONFIGURATION_Handle *c) 3197 struct GNUNET_SERVICE_Client *c,
3198 void *internal_cls)
3220{ 3199{
3221 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { 3200 struct ConsensusSession *session = internal_cls;
3222 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3223 sizeof (struct GNUNET_MessageHeader)},
3224 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
3225 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
3226 {NULL, NULL, 0, 0}
3227 };
3228 3201
3229 cfg = c; 3202 if (NULL != session->set_listener)
3230 srv = server;
3231 if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer))
3232 { 3203 {
3233 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n"); 3204 GNUNET_SET_listen_cancel (session->set_listener);
3234 GNUNET_break (0); 3205 session->set_listener = NULL;
3235 GNUNET_SCHEDULER_shutdown ();
3236 return;
3237 } 3206 }
3238 statistics = GNUNET_STATISTICS_create ("consensus", cfg); 3207 GNUNET_CONTAINER_DLL_remove (sessions_head,
3239 GNUNET_SERVER_add_handlers (server, server_handlers); 3208 sessions_tail,
3240 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); 3209 session);
3241 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL); 3210 GNUNET_free (session);
3242 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
3243} 3211}
3244 3212
3245 3213
3246/** 3214/**
3247 * The main function for the consensus service. 3215 * Define "main" method using service macro.
3248 *
3249 * @param argc number of arguments from the command line
3250 * @param argv command line arguments
3251 * @return 0 ok, 1 on error
3252 */ 3216 */
3253int 3217GNUNET_SERVICE_MAIN
3254main (int argc, char *const *argv) 3218("consensus",
3255{ 3219 GNUNET_SERVICE_OPTION_NONE,
3256 int ret; 3220 &run,
3257 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); 3221 &client_connect_cb,
3258 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret); 3222 &client_disconnect_cb,
3259 return (GNUNET_OK == ret) ? 0 : 1; 3223 NULL,
3260} 3224 GNUNET_MQ_hd_fixed_size (client_conclude,
3225 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
3226 struct GNUNET_MessageHeader,
3227 NULL),
3228 GNUNET_MQ_hd_var_size (client_insert,
3229 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT,
3230 struct GNUNET_CONSENSUS_ElementMessage,
3231 NULL),
3232 GNUNET_MQ_hd_var_size (client_join,
3233 GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN,
3234 struct GNUNET_CONSENSUS_JoinMessage,
3235 NULL),
3236 GNUNET_MQ_handler_end ());
3237
3238/* end of gnunet-service-consensus.c */