diff options
author | Matthias Wachs <wachs@net.in.tum.de> | 2013-08-01 14:36:10 +0000 |
---|---|---|
committer | Matthias Wachs <wachs@net.in.tum.de> | 2013-08-01 14:36:10 +0000 |
commit | c901c3c7eb667979c3ab9862927ab5e4a6b9a451 (patch) | |
tree | 8d52f6f3d952fed23936956a1f6e7379a56a005d /src/experimentation | |
parent | b9625263b024a2838fa6e2bcf8b43a6c78a07d0e (diff) | |
download | gnunet-c901c3c7eb667979c3ab9862927ab5e4a6b9a451.tar.gz gnunet-c901c3c7eb667979c3ab9862927ab5e4a6b9a451.zip |
new unified communication mechanism
Diffstat (limited to 'src/experimentation')
4 files changed, 221 insertions, 114 deletions
diff --git a/src/experimentation/gnunet-daemon-experimentation.h b/src/experimentation/gnunet-daemon-experimentation.h index 61dfd1b26..73499301a 100644 --- a/src/experimentation/gnunet-daemon-experimentation.h +++ b/src/experimentation/gnunet-daemon-experimentation.h | |||
@@ -166,8 +166,8 @@ struct Node | |||
166 | */ | 166 | */ |
167 | struct GNUNET_PeerIdentity *issuer_id; | 167 | struct GNUNET_PeerIdentity *issuer_id; |
168 | 168 | ||
169 | struct ExperimentStartCtx *e_req_head; | 169 | struct NodeComCtx *e_req_head; |
170 | struct ExperimentStartCtx *e_req_tail; | 170 | struct NodeComCtx *e_req_tail; |
171 | }; | 171 | }; |
172 | 172 | ||
173 | struct Experimentation_Issuer | 173 | struct Experimentation_Issuer |
@@ -273,6 +273,13 @@ GED_nodes_rts (struct Node *n); | |||
273 | int | 273 | int |
274 | GED_nodes_request_start (struct Node *n, struct Experiment *e); | 274 | GED_nodes_request_start (struct Node *n, struct Experiment *e); |
275 | 275 | ||
276 | /** | ||
277 | * Confirm a experiment START with a node | ||
278 | * | ||
279 | * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise | ||
280 | */ | ||
281 | int | ||
282 | GED_nodes_send_start_ack (struct Node *n, struct Experiment *e); | ||
276 | 283 | ||
277 | /** | 284 | /** |
278 | * Start the nodes management | 285 | * Start the nodes management |
diff --git a/src/experimentation/gnunet-daemon-experimentation_nodes.c b/src/experimentation/gnunet-daemon-experimentation_nodes.c index fca7e001c..8c3063ac9 100644 --- a/src/experimentation/gnunet-daemon-experimentation_nodes.c +++ b/src/experimentation/gnunet-daemon-experimentation_nodes.c | |||
@@ -63,6 +63,19 @@ struct GNUNET_CONTAINER_MultiHashMap *nodes_active; | |||
63 | */ | 63 | */ |
64 | struct GNUNET_CONTAINER_MultiHashMap *nodes_inactive; | 64 | struct GNUNET_CONTAINER_MultiHashMap *nodes_inactive; |
65 | 65 | ||
66 | struct NodeComCtx | ||
67 | { | ||
68 | struct NodeComCtx *prev; | ||
69 | struct NodeComCtx *next; | ||
70 | |||
71 | struct Node *n; | ||
72 | struct Experiment *e; | ||
73 | |||
74 | size_t size; | ||
75 | GNUNET_CONNECTION_TransmitReadyNotify notify; | ||
76 | void *notify_cls; | ||
77 | }; | ||
78 | |||
66 | 79 | ||
67 | /** | 80 | /** |
68 | * Update statistics | 81 | * Update statistics |
@@ -96,7 +109,7 @@ static void update_stats (struct GNUNET_CONTAINER_MultiHashMap *m) | |||
96 | 109 | ||
97 | 110 | ||
98 | /** | 111 | /** |
99 | * Clean up nodes | 112 | * Clean up node |
100 | * | 113 | * |
101 | * @param cls the hashmap to clean up | 114 | * @param cls the hashmap to clean up |
102 | * @param key key of the current node | 115 | * @param key key of the current node |
@@ -104,11 +117,13 @@ static void update_stats (struct GNUNET_CONTAINER_MultiHashMap *m) | |||
104 | * @return always GNUNET_OK | 117 | * @return always GNUNET_OK |
105 | */ | 118 | */ |
106 | static int | 119 | static int |
107 | cleanup_nodes (void *cls, | 120 | cleanup_node (void *cls, |
108 | const struct GNUNET_HashCode * key, | 121 | const struct GNUNET_HashCode * key, |
109 | void *value) | 122 | void *value) |
110 | { | 123 | { |
111 | struct Node *n; | 124 | struct Node *n; |
125 | struct NodeComCtx *e_cur; | ||
126 | struct NodeComCtx *e_next; | ||
112 | struct GNUNET_CONTAINER_MultiHashMap *cur = cls; | 127 | struct GNUNET_CONTAINER_MultiHashMap *cur = cls; |
113 | 128 | ||
114 | n = value; | 129 | n = value; |
@@ -117,11 +132,20 @@ cleanup_nodes (void *cls, | |||
117 | GNUNET_SCHEDULER_cancel (n->timeout_task); | 132 | GNUNET_SCHEDULER_cancel (n->timeout_task); |
118 | n->timeout_task = GNUNET_SCHEDULER_NO_TASK; | 133 | n->timeout_task = GNUNET_SCHEDULER_NO_TASK; |
119 | } | 134 | } |
135 | |||
120 | if (NULL != n->cth) | 136 | if (NULL != n->cth) |
121 | { | 137 | { |
122 | GNUNET_CORE_notify_transmit_ready_cancel (n->cth); | 138 | GNUNET_CORE_notify_transmit_ready_cancel (n->cth); |
123 | n->cth = NULL; | 139 | n->cth = NULL; |
124 | } | 140 | } |
141 | e_next = n->e_req_head; | ||
142 | while (NULL != (e_cur = e_next)) | ||
143 | { | ||
144 | e_next = e_cur->next; | ||
145 | GNUNET_CONTAINER_DLL_remove (n->e_req_head, n->e_req_tail, e_cur); | ||
146 | GNUNET_free (e_cur); | ||
147 | } | ||
148 | |||
125 | GNUNET_free_non_null (n->issuer_id); | 149 | GNUNET_free_non_null (n->issuer_id); |
126 | 150 | ||
127 | GNUNET_CONTAINER_multihashmap_remove (cur, key, value); | 151 | GNUNET_CONTAINER_multihashmap_remove (cur, key, value); |
@@ -159,6 +183,47 @@ core_startup_handler (void *cls, | |||
159 | me = *my_identity; | 183 | me = *my_identity; |
160 | } | 184 | } |
161 | 185 | ||
186 | void | ||
187 | schedule_transmisson (struct NodeComCtx *e_ctx); | ||
188 | |||
189 | size_t | ||
190 | transmit_read_wrapper (void *cls, size_t bufsize, void *buf) | ||
191 | { | ||
192 | struct NodeComCtx *e_ctx = cls; | ||
193 | struct NodeComCtx *next = NULL; | ||
194 | |||
195 | size_t res = e_ctx->notify (e_ctx->notify_cls, bufsize, buf); | ||
196 | e_ctx->n->cth = NULL; | ||
197 | |||
198 | GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx); | ||
199 | next = e_ctx->n->e_req_head; | ||
200 | GNUNET_free (e_ctx); | ||
201 | |||
202 | if (NULL != next) | ||
203 | { | ||
204 | /* Schedule next message */ | ||
205 | schedule_transmisson (next); | ||
206 | } | ||
207 | return res; | ||
208 | } | ||
209 | |||
210 | void | ||
211 | schedule_transmisson (struct NodeComCtx *e_ctx) | ||
212 | { | ||
213 | if (NULL != e_ctx->n->cth) | ||
214 | return; | ||
215 | |||
216 | e_ctx->n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT, | ||
217 | &e_ctx->n->id, e_ctx->size, transmit_read_wrapper, e_ctx); | ||
218 | if (NULL == e_ctx->n->cth) | ||
219 | { | ||
220 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Cannot send message to peer `%s' for experiment `%s'\n"), | ||
221 | GNUNET_i2s(&e_ctx->n->id), e_ctx->e->name); | ||
222 | GNUNET_free (e_ctx); | ||
223 | } | ||
224 | |||
225 | } | ||
226 | |||
162 | 227 | ||
163 | /** | 228 | /** |
164 | * Remove experimentation request due to timeout | 229 | * Remove experimentation request due to timeout |
@@ -242,6 +307,7 @@ size_t send_experimentation_request_cb (void *cls, size_t bufsize, void *buf) | |||
242 | static void send_experimentation_request (const struct GNUNET_PeerIdentity *peer) | 307 | static void send_experimentation_request (const struct GNUNET_PeerIdentity *peer) |
243 | { | 308 | { |
244 | struct Node *n; | 309 | struct Node *n; |
310 | struct NodeComCtx *e_ctx; | ||
245 | size_t size; | 311 | size_t size; |
246 | size_t c_issuers; | 312 | size_t c_issuers; |
247 | 313 | ||
@@ -253,9 +319,15 @@ static void send_experimentation_request (const struct GNUNET_PeerIdentity *peer | |||
253 | n->id = *peer; | 319 | n->id = *peer; |
254 | n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n); | 320 | n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n); |
255 | n->capabilities = NONE; | 321 | n->capabilities = NONE; |
256 | n->cth = GNUNET_CORE_notify_transmit_ready(ch, GNUNET_NO, 0, | 322 | |
257 | GNUNET_TIME_relative_get_forever_(), | 323 | e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx)); |
258 | peer, size, send_experimentation_request_cb, n); | 324 | e_ctx->n = n; |
325 | e_ctx->e = NULL; | ||
326 | e_ctx->size = size; | ||
327 | e_ctx->notify = &send_experimentation_request_cb; | ||
328 | e_ctx->notify_cls = n; | ||
329 | GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx); | ||
330 | schedule_transmisson (e_ctx); | ||
259 | 331 | ||
260 | GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (nodes_requested, | 332 | GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (nodes_requested, |
261 | &peer->hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); | 333 | &peer->hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); |
@@ -362,7 +434,7 @@ static void node_make_active (struct Node *n) | |||
362 | update_stats (nodes_active); | 434 | update_stats (nodes_active); |
363 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added peer `%s' as active node\n"), | 435 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added peer `%s' as active node\n"), |
364 | GNUNET_i2s (&n->id)); | 436 | GNUNET_i2s (&n->id)); |
365 | 437 | return; | |
366 | /* Request experiments for this node to start them */ | 438 | /* Request experiments for this node to start them */ |
367 | for (c1 = 0; c1 < n->issuer_count; c1++) | 439 | for (c1 = 0; c1 < n->issuer_count; c1++) |
368 | { | 440 | { |
@@ -382,6 +454,7 @@ static void handle_request (const struct GNUNET_PeerIdentity *peer, | |||
382 | const struct GNUNET_MessageHeader *message) | 454 | const struct GNUNET_MessageHeader *message) |
383 | { | 455 | { |
384 | struct Node *n; | 456 | struct Node *n; |
457 | struct NodeComCtx *e_ctx; | ||
385 | struct Experimentation_Request *rm = (struct Experimentation_Request *) message; | 458 | struct Experimentation_Request *rm = (struct Experimentation_Request *) message; |
386 | struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1]; | 459 | struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1]; |
387 | int c1; | 460 | int c1; |
@@ -467,12 +540,15 @@ static void handle_request (const struct GNUNET_PeerIdentity *peer, | |||
467 | node_make_active (n); | 540 | node_make_active (n); |
468 | 541 | ||
469 | /* Send response */ | 542 | /* Send response */ |
470 | n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, | 543 | e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx)); |
471 | GNUNET_TIME_relative_get_forever_(), | 544 | e_ctx->n = n; |
472 | peer, | 545 | e_ctx->e = NULL; |
473 | sizeof (struct Experimentation_Response) + | 546 | e_ctx->size = sizeof (struct Experimentation_Response) + GSE_my_issuer_count * sizeof (struct Experimentation_Issuer); |
474 | GSE_my_issuer_count * sizeof (struct Experimentation_Issuer), | 547 | e_ctx->notify = &send_response_cb; |
475 | send_response_cb, n); | 548 | e_ctx->notify_cls = n; |
549 | |||
550 | GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx); | ||
551 | schedule_transmisson (e_ctx); | ||
476 | } | 552 | } |
477 | 553 | ||
478 | 554 | ||
@@ -494,7 +570,6 @@ static void handle_response (const struct GNUNET_PeerIdentity *peer, | |||
494 | unsigned int c1; | 570 | unsigned int c1; |
495 | unsigned int c2; | 571 | unsigned int c2; |
496 | 572 | ||
497 | |||
498 | if (ntohs (message->size) < sizeof (struct Experimentation_Response)) | 573 | if (ntohs (message->size) < sizeof (struct Experimentation_Response)) |
499 | { | 574 | { |
500 | GNUNET_break (0); | 575 | GNUNET_break (0); |
@@ -642,9 +717,6 @@ static void handle_start (const struct GNUNET_PeerIdentity *peer, | |||
642 | return; | 717 | return; |
643 | } | 718 | } |
644 | 719 | ||
645 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"), | ||
646 | "START", GNUNET_i2s (peer), name); | ||
647 | |||
648 | GED_scheduler_handle_start (n, e); | 720 | GED_scheduler_handle_start (n, e); |
649 | } | 721 | } |
650 | 722 | ||
@@ -714,9 +786,6 @@ static void handle_start_ack (const struct GNUNET_PeerIdentity *peer, | |||
714 | GNUNET_break (0); | 786 | GNUNET_break (0); |
715 | return; | 787 | return; |
716 | } | 788 | } |
717 | |||
718 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"), | ||
719 | "START_ACK", GNUNET_i2s (peer), name); | ||
720 | GED_scheduler_handle_start_ack (n, e); | 789 | GED_scheduler_handle_start_ack (n, e); |
721 | } | 790 | } |
722 | 791 | ||
@@ -814,7 +883,6 @@ void core_connect_handler (void *cls, | |||
814 | return; /* This peer is known as inactive */ | 883 | return; /* This peer is known as inactive */ |
815 | 884 | ||
816 | send_experimentation_request (peer); | 885 | send_experimentation_request (peer); |
817 | |||
818 | } | 886 | } |
819 | 887 | ||
820 | 888 | ||
@@ -827,12 +895,21 @@ void core_connect_handler (void *cls, | |||
827 | void core_disconnect_handler (void *cls, | 895 | void core_disconnect_handler (void *cls, |
828 | const struct GNUNET_PeerIdentity * peer) | 896 | const struct GNUNET_PeerIdentity * peer) |
829 | { | 897 | { |
898 | struct Node *n; | ||
830 | if (GNUNET_YES == is_me(peer)) | 899 | if (GNUNET_YES == is_me(peer)) |
831 | return; | 900 | return; |
832 | 901 | ||
833 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"), | 902 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"), |
834 | GNUNET_i2s (peer)); | 903 | GNUNET_i2s (peer)); |
835 | 904 | ||
905 | if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey))) | ||
906 | cleanup_node (nodes_requested, &peer->hashPubKey, n); | ||
907 | |||
908 | if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey))) | ||
909 | cleanup_node (nodes_active, &peer->hashPubKey, n); | ||
910 | |||
911 | if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey))) | ||
912 | cleanup_node (nodes_inactive, &peer->hashPubKey, n); | ||
836 | } | 913 | } |
837 | 914 | ||
838 | 915 | ||
@@ -878,29 +955,16 @@ core_receive_handler (void *cls, | |||
878 | return GNUNET_OK; | 955 | return GNUNET_OK; |
879 | } | 956 | } |
880 | 957 | ||
881 | struct ExperimentStartCtx | ||
882 | { | ||
883 | struct ExperimentStartCtx *prev; | ||
884 | struct ExperimentStartCtx *next; | ||
885 | |||
886 | struct Node *n; | ||
887 | struct Experiment *e; | ||
888 | }; | ||
889 | 958 | ||
890 | size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf) | 959 | size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf) |
891 | { | 960 | { |
892 | struct ExperimentStartCtx *e_ctx = cls; | 961 | struct NodeComCtx *e_ctx = cls; |
893 | struct GED_start_message *msg; | 962 | struct GED_start_message *msg; |
894 | size_t name_len; | 963 | size_t name_len; |
895 | size_t size; | 964 | size_t size; |
896 | 965 | ||
897 | GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx); | ||
898 | e_ctx->n->cth = NULL; | ||
899 | if (NULL == buf) | 966 | if (NULL == buf) |
900 | { | ||
901 | GNUNET_free (e_ctx); | ||
902 | return 0; | 967 | return 0; |
903 | } | ||
904 | 968 | ||
905 | name_len = strlen(e_ctx->e->name) + 1; | 969 | name_len = strlen(e_ctx->e->name) + 1; |
906 | size = sizeof (struct GED_start_message) + name_len; | 970 | size = sizeof (struct GED_start_message) + name_len; |
@@ -915,21 +979,63 @@ size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf) | |||
915 | 979 | ||
916 | memcpy (buf, msg, size); | 980 | memcpy (buf, msg, size); |
917 | GNUNET_free (msg); | 981 | GNUNET_free (msg); |
918 | GNUNET_free (e_ctx); | ||
919 | return size; | 982 | return size; |
920 | } | 983 | } |
921 | 984 | ||
985 | size_t node_experiment_start_ack_cb (void *cls, size_t bufsize, void *buf) | ||
986 | { | ||
987 | struct NodeComCtx *e_ctx = cls; | ||
988 | struct GED_start_ack_message *msg; | ||
989 | size_t name_len; | ||
990 | size_t size; | ||
991 | if (NULL == buf) | ||
992 | return 0; | ||
922 | 993 | ||
994 | name_len = strlen(e_ctx->e->name) + 1; | ||
995 | size = sizeof (struct GED_start_ack_message) + name_len; | ||
996 | |||
997 | msg = GNUNET_malloc (size); | ||
998 | msg->header.size = htons (size); | ||
999 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK); | ||
1000 | msg->issuer = e_ctx->e->issuer; | ||
1001 | msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version); | ||
1002 | msg->len_name = htonl (name_len); | ||
1003 | memcpy (&msg[1], e_ctx->e->name, name_len); | ||
1004 | |||
1005 | memcpy (buf, msg, size); | ||
1006 | GNUNET_free (msg); | ||
1007 | return size; | ||
1008 | } | ||
1009 | |||
1010 | |||
1011 | |||
1012 | |||
1013 | /** | ||
1014 | * Confirm a experiment START with a node | ||
1015 | * | ||
1016 | * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise | ||
1017 | */ | ||
923 | int | 1018 | int |
924 | GED_nodes_rts (struct Node *n) | 1019 | GED_nodes_send_start_ack (struct Node *n, struct Experiment *e) |
925 | { | 1020 | { |
926 | if (NULL == n->cth) | 1021 | struct NodeComCtx *e_ctx; |
927 | return GNUNET_YES; | 1022 | |
928 | else | 1023 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending %s for experiment request to peer `%s' for experiment `%s'\n"), |
929 | return GNUNET_NO; | 1024 | "START_ACK" ,GNUNET_i2s(&n->id), e->name); |
1025 | |||
1026 | e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx)); | ||
1027 | e_ctx->n = n; | ||
1028 | e_ctx->e = e; | ||
1029 | e_ctx->size = sizeof (struct GED_start_ack_message) + strlen (e->name) + 1; | ||
1030 | e_ctx->notify = &node_experiment_start_ack_cb; | ||
1031 | e_ctx->notify_cls = e_ctx; | ||
930 | 1032 | ||
1033 | GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx); | ||
1034 | schedule_transmisson (e_ctx); | ||
1035 | return GNUNET_OK; | ||
931 | } | 1036 | } |
932 | 1037 | ||
1038 | |||
933 | /** | 1039 | /** |
934 | * Request a experiment to start with a node | 1040 | * Request a experiment to start with a node |
935 | * | 1041 | * |
@@ -938,31 +1044,20 @@ GED_nodes_rts (struct Node *n) | |||
938 | int | 1044 | int |
939 | GED_nodes_request_start (struct Node *n, struct Experiment *e) | 1045 | GED_nodes_request_start (struct Node *n, struct Experiment *e) |
940 | { | 1046 | { |
941 | struct ExperimentStartCtx *e_ctx; | 1047 | struct NodeComCtx *e_ctx; |
942 | |||
943 | if (NULL != n->cth) | ||
944 | { | ||
945 | GNUNET_break (0); /* should call rts before */ | ||
946 | return GNUNET_NO; | ||
947 | } | ||
948 | 1048 | ||
949 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending experiment start request to peer `%s' for experiment `%s'\n"), | 1049 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending %s for experiment request to peer `%s' for experiment `%s'\n"), |
950 | GNUNET_i2s(&n->id), e->name); | 1050 | "START", GNUNET_i2s(&n->id), e->name); |
951 | 1051 | ||
952 | e_ctx = GNUNET_malloc (sizeof (struct ExperimentStartCtx)); | 1052 | e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx)); |
953 | e_ctx->n = n; | 1053 | e_ctx->n = n; |
954 | e_ctx->e = e; | 1054 | e_ctx->e = e; |
955 | n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT, &n->id, | 1055 | e_ctx->size = sizeof (struct GED_start_message) + strlen (e->name) + 1; |
956 | sizeof (struct GED_start_message) + strlen (e->name) + 1, | 1056 | e_ctx->notify = &node_experiment_start_cb; |
957 | &node_experiment_start_cb, e_ctx); | 1057 | e_ctx->notify_cls = e_ctx; |
958 | if (NULL == n->cth) | ||
959 | { | ||
960 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Cannot send experiment start request to peer `%s' for experiment `%s'\n"), | ||
961 | GNUNET_i2s(&n->id), e->name); | ||
962 | GNUNET_free (e_ctx); | ||
963 | } | ||
964 | GNUNET_CONTAINER_DLL_insert (n->e_req_head, n->e_req_tail, e_ctx); | ||
965 | 1058 | ||
1059 | GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx); | ||
1060 | schedule_transmisson (e_ctx); | ||
966 | return GNUNET_OK; | 1061 | return GNUNET_OK; |
967 | } | 1062 | } |
968 | 1063 | ||
@@ -998,16 +1093,10 @@ GED_nodes_start () | |||
998 | void | 1093 | void |
999 | GED_nodes_stop () | 1094 | GED_nodes_stop () |
1000 | { | 1095 | { |
1001 | if (NULL != ch) | ||
1002 | { | ||
1003 | GNUNET_CORE_disconnect (ch); | ||
1004 | ch = NULL; | ||
1005 | } | ||
1006 | |||
1007 | if (NULL != nodes_requested) | 1096 | if (NULL != nodes_requested) |
1008 | { | 1097 | { |
1009 | GNUNET_CONTAINER_multihashmap_iterate (nodes_requested, | 1098 | GNUNET_CONTAINER_multihashmap_iterate (nodes_requested, |
1010 | &cleanup_nodes, | 1099 | &cleanup_node, |
1011 | nodes_requested); | 1100 | nodes_requested); |
1012 | update_stats (nodes_requested); | 1101 | update_stats (nodes_requested); |
1013 | GNUNET_CONTAINER_multihashmap_destroy (nodes_requested); | 1102 | GNUNET_CONTAINER_multihashmap_destroy (nodes_requested); |
@@ -1017,7 +1106,7 @@ GED_nodes_stop () | |||
1017 | if (NULL != nodes_active) | 1106 | if (NULL != nodes_active) |
1018 | { | 1107 | { |
1019 | GNUNET_CONTAINER_multihashmap_iterate (nodes_active, | 1108 | GNUNET_CONTAINER_multihashmap_iterate (nodes_active, |
1020 | &cleanup_nodes, | 1109 | &cleanup_node, |
1021 | nodes_active); | 1110 | nodes_active); |
1022 | update_stats (nodes_active); | 1111 | update_stats (nodes_active); |
1023 | GNUNET_CONTAINER_multihashmap_destroy (nodes_active); | 1112 | GNUNET_CONTAINER_multihashmap_destroy (nodes_active); |
@@ -1027,12 +1116,17 @@ GED_nodes_stop () | |||
1027 | if (NULL != nodes_inactive) | 1116 | if (NULL != nodes_inactive) |
1028 | { | 1117 | { |
1029 | GNUNET_CONTAINER_multihashmap_iterate (nodes_inactive, | 1118 | GNUNET_CONTAINER_multihashmap_iterate (nodes_inactive, |
1030 | &cleanup_nodes, | 1119 | &cleanup_node, |
1031 | nodes_inactive); | 1120 | nodes_inactive); |
1032 | update_stats (nodes_inactive); | 1121 | update_stats (nodes_inactive); |
1033 | GNUNET_CONTAINER_multihashmap_destroy (nodes_inactive); | 1122 | GNUNET_CONTAINER_multihashmap_destroy (nodes_inactive); |
1034 | nodes_inactive = NULL; | 1123 | nodes_inactive = NULL; |
1035 | } | 1124 | } |
1125 | if (NULL != ch) | ||
1126 | { | ||
1127 | GNUNET_CORE_disconnect (ch); | ||
1128 | ch = NULL; | ||
1129 | } | ||
1036 | } | 1130 | } |
1037 | 1131 | ||
1038 | /* end of gnunet-daemon-experimentation_nodes.c */ | 1132 | /* end of gnunet-daemon-experimentation_nodes.c */ |
diff --git a/src/experimentation/gnunet-daemon-experimentation_scheduler.c b/src/experimentation/gnunet-daemon-experimentation_scheduler.c index de5c80614..519caa877 100644 --- a/src/experimentation/gnunet-daemon-experimentation_scheduler.c +++ b/src/experimentation/gnunet-daemon-experimentation_scheduler.c | |||
@@ -122,27 +122,13 @@ static void run_experiment_inbound (void *cls,const struct GNUNET_SCHEDULER_Task | |||
122 | struct ScheduledExperiment *se = cls; | 122 | struct ScheduledExperiment *se = cls; |
123 | struct GNUNET_TIME_Relative start; | 123 | struct GNUNET_TIME_Relative start; |
124 | struct GNUNET_TIME_Relative end; | 124 | struct GNUNET_TIME_Relative end; |
125 | struct GNUNET_TIME_Relative backoff; | ||
126 | 125 | ||
127 | se->task = GNUNET_SCHEDULER_NO_TASK; | 126 | se->task = GNUNET_SCHEDULER_NO_TASK; |
128 | 127 | ||
129 | if (GNUNET_NO == GED_nodes_rts (se->n)) | ||
130 | { | ||
131 | se->state = BUSY; | ||
132 | backoff = GNUNET_TIME_UNIT_SECONDS; | ||
133 | backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000); | ||
134 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n", | ||
135 | GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value); | ||
136 | se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_inbound, se); | ||
137 | return; | ||
138 | } | ||
139 | else if (BUSY == se->state) | ||
140 | se->state = NOT_RUNNING; | ||
141 | |||
142 | switch (se->state) { | 128 | switch (se->state) { |
143 | case NOT_RUNNING: | 129 | case NOT_RUNNING: |
144 | /* Send START_ACK message */ | 130 | /* Send START_ACK message */ |
145 | //GED_nodes_request_start (se->n, se->e); | 131 | GED_nodes_send_start_ack (se->n, se->e); |
146 | se->state = REQUESTED; | 132 | se->state = REQUESTED; |
147 | /* Schedule to run */ | 133 | /* Schedule to run */ |
148 | start = GNUNET_TIME_absolute_get_remaining(se->e->start); | 134 | start = GNUNET_TIME_absolute_get_remaining(se->e->start); |
@@ -152,12 +138,11 @@ static void run_experiment_inbound (void *cls,const struct GNUNET_SCHEDULER_Task | |||
152 | se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se); | 138 | se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se); |
153 | break; | 139 | break; |
154 | case REQUESTED: | 140 | case REQUESTED: |
155 | /* Already requested */ | ||
156 | se->state = STARTED; | ||
157 | case STARTED: | 141 | case STARTED: |
142 | se->state = STARTED; | ||
158 | /* Experiment is running */ | 143 | /* Experiment is running */ |
159 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n", | 144 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n", |
160 | GNUNET_i2s (&se->n->id), se->e->name); | 145 | "inbound", GNUNET_i2s (&se->n->id), se->e->name); |
161 | 146 | ||
162 | /* do work here */ | 147 | /* do work here */ |
163 | 148 | ||
@@ -184,33 +169,15 @@ static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_Tas | |||
184 | { | 169 | { |
185 | struct ScheduledExperiment *se = cls; | 170 | struct ScheduledExperiment *se = cls; |
186 | struct GNUNET_TIME_Relative end; | 171 | struct GNUNET_TIME_Relative end; |
187 | struct GNUNET_TIME_Relative backoff; | ||
188 | 172 | ||
189 | se->task = GNUNET_SCHEDULER_NO_TASK; | 173 | se->task = GNUNET_SCHEDULER_NO_TASK; |
190 | 174 | ||
191 | if (GNUNET_NO == GED_nodes_rts (se->n)) | ||
192 | { | ||
193 | /* Cannot send to peer, core is busy */ | ||
194 | se->state = BUSY; | ||
195 | backoff = GNUNET_TIME_UNIT_SECONDS; | ||
196 | backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000); | ||
197 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n", | ||
198 | GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value); | ||
199 | se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_outbound, se); | ||
200 | return; | ||
201 | } | ||
202 | else if (BUSY == se->state) | ||
203 | se->state = NOT_RUNNING; /* Not busy anymore, can send */ | ||
204 | |||
205 | switch (se->state) { | 175 | switch (se->state) { |
206 | case NOT_RUNNING: | 176 | case NOT_RUNNING: |
207 | /* Send START message */ | 177 | /* Send START message */ |
208 | GED_nodes_request_start (se->n, se->e); | 178 | GED_nodes_request_start (se->n, se->e); |
209 | se->state = REQUESTED; | 179 | se->state = REQUESTED; |
210 | se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se); | 180 | se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se); |
211 | |||
212 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending start request to peer `%s' for `%s'\n", | ||
213 | GNUNET_i2s (&se->n->id), se->e->name); | ||
214 | experiments_requested ++; | 181 | experiments_requested ++; |
215 | GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO); | 182 | GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO); |
216 | break; | 183 | break; |
@@ -220,8 +187,8 @@ static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_Tas | |||
220 | break; | 187 | break; |
221 | case STARTED: | 188 | case STARTED: |
222 | /* Experiment is running */ | 189 | /* Experiment is running */ |
223 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n", | 190 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n", |
224 | GNUNET_i2s (&se->n->id), se->e->name); | 191 | "outbound", GNUNET_i2s (&se->n->id), se->e->name); |
225 | 192 | ||
226 | /* do work here */ | 193 | /* do work here */ |
227 | 194 | ||
@@ -271,7 +238,7 @@ GED_scheduler_handle_start (struct Node *n, struct Experiment *e) | |||
271 | } | 238 | } |
272 | 239 | ||
273 | /** | 240 | /** |
274 | * Handle a START_ACL message from a remote node | 241 | * Handle a START_ACK message from a remote node |
275 | * | 242 | * |
276 | * @param n the node | 243 | * @param n the node |
277 | * @param e the experiment | 244 | * @param e the experiment |
@@ -281,7 +248,7 @@ GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e) | |||
281 | { | 248 | { |
282 | struct ScheduledExperiment *se; | 249 | struct ScheduledExperiment *se; |
283 | 250 | ||
284 | if (NULL == (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO))) | 251 | if (NULL == (se = find_experiment (waiting_out_head, waiting_out_tail, n, e, GNUNET_YES))) |
285 | { | 252 | { |
286 | GNUNET_break (0); | 253 | GNUNET_break (0); |
287 | return; | 254 | return; |
@@ -291,7 +258,14 @@ GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e) | |||
291 | "START_ACK", GNUNET_i2s (&n->id), e->name); | 258 | "START_ACK", GNUNET_i2s (&n->id), e->name); |
292 | 259 | ||
293 | if (GNUNET_SCHEDULER_NO_TASK != se->task) | 260 | if (GNUNET_SCHEDULER_NO_TASK != se->task) |
294 | GNUNET_SCHEDULER_cancel (se->task); | 261 | GNUNET_SCHEDULER_cancel (se->task); /* *Canceling timeout task */ |
262 | |||
263 | /* Remove from waiting list, add to running list */ | ||
264 | GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se); | ||
265 | GNUNET_CONTAINER_DLL_insert (running_out_head, waiting_out_tail, se); | ||
266 | |||
267 | /* Change state and schedule to run */ | ||
268 | se->state = STARTED; | ||
295 | se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se); | 269 | se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se); |
296 | } | 270 | } |
297 | 271 | ||
@@ -427,6 +401,38 @@ GED_scheduler_stop () | |||
427 | experiments_running --; | 401 | experiments_running --; |
428 | GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO); | 402 | GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO); |
429 | } | 403 | } |
404 | |||
405 | next = waiting_out_head; | ||
406 | while (NULL != (cur = next)) | ||
407 | { | ||
408 | next = cur->next; | ||
409 | GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, cur); | ||
410 | if (GNUNET_SCHEDULER_NO_TASK != cur->task) | ||
411 | { | ||
412 | GNUNET_SCHEDULER_cancel (cur->task); | ||
413 | cur->task = GNUNET_SCHEDULER_NO_TASK; | ||
414 | } | ||
415 | GNUNET_free (cur); | ||
416 | GNUNET_assert (experiments_scheduled > 0); | ||
417 | experiments_scheduled --; | ||
418 | GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO); | ||
419 | } | ||
420 | |||
421 | next = running_out_head; | ||
422 | while (NULL != (cur = next)) | ||
423 | { | ||
424 | next = cur->next; | ||
425 | GNUNET_CONTAINER_DLL_remove (running_out_head, running_out_tail, cur); | ||
426 | if (GNUNET_SCHEDULER_NO_TASK != cur->task) | ||
427 | { | ||
428 | GNUNET_SCHEDULER_cancel (cur->task); | ||
429 | cur->task = GNUNET_SCHEDULER_NO_TASK; | ||
430 | } | ||
431 | GNUNET_free (cur); | ||
432 | GNUNET_assert (experiments_running > 0); | ||
433 | experiments_running --; | ||
434 | GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO); | ||
435 | } | ||
430 | } | 436 | } |
431 | 437 | ||
432 | /* end of gnunet-daemon-experimentation_scheduler.c */ | 438 | /* end of gnunet-daemon-experimentation_scheduler.c */ |
diff --git a/src/experimentation/test_experimentation_clique.conf b/src/experimentation/test_experimentation_clique.conf index bc6a34dcc..5b2b86ab2 100644 --- a/src/experimentation/test_experimentation_clique.conf +++ b/src/experimentation/test_experimentation_clique.conf | |||
@@ -32,7 +32,7 @@ NEIGHBOUR_LIMIT = 50 | |||
32 | PORT = 12365 | 32 | PORT = 12365 |
33 | 33 | ||
34 | [experimentation] | 34 | [experimentation] |
35 | #PREFIX = valgrind --leak-check=full | 35 | PREFIX = valgrind --leak-check=full |
36 | ISSUERS = TFRM29O2RQNKLVBQIGODJ6GD58LSQ2NM9TNFBC6N48BRJHQO38Q73N2OM3V4CLKDM6CILQV4CU8PMJDRG0FNB0PDI057DBRANMLPLRG | 36 | ISSUERS = TFRM29O2RQNKLVBQIGODJ6GD58LSQ2NM9TNFBC6N48BRJHQO38Q73N2OM3V4CLKDM6CILQV4CU8PMJDRG0FNB0PDI057DBRANMLPLRG |
37 | EXPERIMENTS = test_experiments.exp | 37 | EXPERIMENTS = test_experiments.exp |
38 | 38 | ||