aboutsummaryrefslogtreecommitdiff
path: root/src/experimentation
diff options
context:
space:
mode:
authorMatthias Wachs <wachs@net.in.tum.de>2013-08-01 14:36:10 +0000
committerMatthias Wachs <wachs@net.in.tum.de>2013-08-01 14:36:10 +0000
commitc901c3c7eb667979c3ab9862927ab5e4a6b9a451 (patch)
tree8d52f6f3d952fed23936956a1f6e7379a56a005d /src/experimentation
parentb9625263b024a2838fa6e2bcf8b43a6c78a07d0e (diff)
downloadgnunet-c901c3c7eb667979c3ab9862927ab5e4a6b9a451.tar.gz
gnunet-c901c3c7eb667979c3ab9862927ab5e4a6b9a451.zip
new unified communication mechanism
Diffstat (limited to 'src/experimentation')
-rw-r--r--src/experimentation/gnunet-daemon-experimentation.h11
-rw-r--r--src/experimentation/gnunet-daemon-experimentation_nodes.c232
-rw-r--r--src/experimentation/gnunet-daemon-experimentation_scheduler.c90
-rw-r--r--src/experimentation/test_experimentation_clique.conf2
4 files changed, 221 insertions, 114 deletions
diff --git a/src/experimentation/gnunet-daemon-experimentation.h b/src/experimentation/gnunet-daemon-experimentation.h
index 61dfd1b26..73499301a 100644
--- a/src/experimentation/gnunet-daemon-experimentation.h
+++ b/src/experimentation/gnunet-daemon-experimentation.h
@@ -166,8 +166,8 @@ struct Node
166 */ 166 */
167 struct GNUNET_PeerIdentity *issuer_id; 167 struct GNUNET_PeerIdentity *issuer_id;
168 168
169 struct ExperimentStartCtx *e_req_head; 169 struct NodeComCtx *e_req_head;
170 struct ExperimentStartCtx *e_req_tail; 170 struct NodeComCtx *e_req_tail;
171}; 171};
172 172
173struct Experimentation_Issuer 173struct Experimentation_Issuer
@@ -273,6 +273,13 @@ GED_nodes_rts (struct Node *n);
273int 273int
274GED_nodes_request_start (struct Node *n, struct Experiment *e); 274GED_nodes_request_start (struct Node *n, struct Experiment *e);
275 275
276/**
277 * Confirm a experiment START with a node
278 *
279 * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
280 */
281int
282GED_nodes_send_start_ack (struct Node *n, struct Experiment *e);
276 283
277/** 284/**
278 * Start the nodes management 285 * Start the nodes management
diff --git a/src/experimentation/gnunet-daemon-experimentation_nodes.c b/src/experimentation/gnunet-daemon-experimentation_nodes.c
index fca7e001c..8c3063ac9 100644
--- a/src/experimentation/gnunet-daemon-experimentation_nodes.c
+++ b/src/experimentation/gnunet-daemon-experimentation_nodes.c
@@ -63,6 +63,19 @@ struct GNUNET_CONTAINER_MultiHashMap *nodes_active;
63 */ 63 */
64struct GNUNET_CONTAINER_MultiHashMap *nodes_inactive; 64struct GNUNET_CONTAINER_MultiHashMap *nodes_inactive;
65 65
66struct NodeComCtx
67{
68 struct NodeComCtx *prev;
69 struct NodeComCtx *next;
70
71 struct Node *n;
72 struct Experiment *e;
73
74 size_t size;
75 GNUNET_CONNECTION_TransmitReadyNotify notify;
76 void *notify_cls;
77};
78
66 79
67/** 80/**
68 * Update statistics 81 * Update statistics
@@ -96,7 +109,7 @@ static void update_stats (struct GNUNET_CONTAINER_MultiHashMap *m)
96 109
97 110
98/** 111/**
99 * Clean up nodes 112 * Clean up node
100 * 113 *
101 * @param cls the hashmap to clean up 114 * @param cls the hashmap to clean up
102 * @param key key of the current node 115 * @param key key of the current node
@@ -104,11 +117,13 @@ static void update_stats (struct GNUNET_CONTAINER_MultiHashMap *m)
104 * @return always GNUNET_OK 117 * @return always GNUNET_OK
105 */ 118 */
106static int 119static int
107cleanup_nodes (void *cls, 120cleanup_node (void *cls,
108 const struct GNUNET_HashCode * key, 121 const struct GNUNET_HashCode * key,
109 void *value) 122 void *value)
110{ 123{
111 struct Node *n; 124 struct Node *n;
125 struct NodeComCtx *e_cur;
126 struct NodeComCtx *e_next;
112 struct GNUNET_CONTAINER_MultiHashMap *cur = cls; 127 struct GNUNET_CONTAINER_MultiHashMap *cur = cls;
113 128
114 n = value; 129 n = value;
@@ -117,11 +132,20 @@ cleanup_nodes (void *cls,
117 GNUNET_SCHEDULER_cancel (n->timeout_task); 132 GNUNET_SCHEDULER_cancel (n->timeout_task);
118 n->timeout_task = GNUNET_SCHEDULER_NO_TASK; 133 n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
119 } 134 }
135
120 if (NULL != n->cth) 136 if (NULL != n->cth)
121 { 137 {
122 GNUNET_CORE_notify_transmit_ready_cancel (n->cth); 138 GNUNET_CORE_notify_transmit_ready_cancel (n->cth);
123 n->cth = NULL; 139 n->cth = NULL;
124 } 140 }
141 e_next = n->e_req_head;
142 while (NULL != (e_cur = e_next))
143 {
144 e_next = e_cur->next;
145 GNUNET_CONTAINER_DLL_remove (n->e_req_head, n->e_req_tail, e_cur);
146 GNUNET_free (e_cur);
147 }
148
125 GNUNET_free_non_null (n->issuer_id); 149 GNUNET_free_non_null (n->issuer_id);
126 150
127 GNUNET_CONTAINER_multihashmap_remove (cur, key, value); 151 GNUNET_CONTAINER_multihashmap_remove (cur, key, value);
@@ -159,6 +183,47 @@ core_startup_handler (void *cls,
159 me = *my_identity; 183 me = *my_identity;
160} 184}
161 185
186void
187schedule_transmisson (struct NodeComCtx *e_ctx);
188
189size_t
190transmit_read_wrapper (void *cls, size_t bufsize, void *buf)
191{
192 struct NodeComCtx *e_ctx = cls;
193 struct NodeComCtx *next = NULL;
194
195 size_t res = e_ctx->notify (e_ctx->notify_cls, bufsize, buf);
196 e_ctx->n->cth = NULL;
197
198 GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx);
199 next = e_ctx->n->e_req_head;
200 GNUNET_free (e_ctx);
201
202 if (NULL != next)
203 {
204 /* Schedule next message */
205 schedule_transmisson (next);
206 }
207 return res;
208}
209
210void
211schedule_transmisson (struct NodeComCtx *e_ctx)
212{
213 if (NULL != e_ctx->n->cth)
214 return;
215
216 e_ctx->n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT,
217 &e_ctx->n->id, e_ctx->size, transmit_read_wrapper, e_ctx);
218 if (NULL == e_ctx->n->cth)
219 {
220 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Cannot send message to peer `%s' for experiment `%s'\n"),
221 GNUNET_i2s(&e_ctx->n->id), e_ctx->e->name);
222 GNUNET_free (e_ctx);
223 }
224
225}
226
162 227
163/** 228/**
164 * Remove experimentation request due to timeout 229 * Remove experimentation request due to timeout
@@ -242,6 +307,7 @@ size_t send_experimentation_request_cb (void *cls, size_t bufsize, void *buf)
242static void send_experimentation_request (const struct GNUNET_PeerIdentity *peer) 307static void send_experimentation_request (const struct GNUNET_PeerIdentity *peer)
243{ 308{
244 struct Node *n; 309 struct Node *n;
310 struct NodeComCtx *e_ctx;
245 size_t size; 311 size_t size;
246 size_t c_issuers; 312 size_t c_issuers;
247 313
@@ -253,9 +319,15 @@ static void send_experimentation_request (const struct GNUNET_PeerIdentity *peer
253 n->id = *peer; 319 n->id = *peer;
254 n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n); 320 n->timeout_task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &remove_request, n);
255 n->capabilities = NONE; 321 n->capabilities = NONE;
256 n->cth = GNUNET_CORE_notify_transmit_ready(ch, GNUNET_NO, 0, 322
257 GNUNET_TIME_relative_get_forever_(), 323 e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
258 peer, size, send_experimentation_request_cb, n); 324 e_ctx->n = n;
325 e_ctx->e = NULL;
326 e_ctx->size = size;
327 e_ctx->notify = &send_experimentation_request_cb;
328 e_ctx->notify_cls = n;
329 GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
330 schedule_transmisson (e_ctx);
259 331
260 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (nodes_requested, 332 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (nodes_requested,
261 &peer->hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); 333 &peer->hashPubKey, n, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
@@ -362,7 +434,7 @@ static void node_make_active (struct Node *n)
362 update_stats (nodes_active); 434 update_stats (nodes_active);
363 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added peer `%s' as active node\n"), 435 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Added peer `%s' as active node\n"),
364 GNUNET_i2s (&n->id)); 436 GNUNET_i2s (&n->id));
365 437return;
366 /* Request experiments for this node to start them */ 438 /* Request experiments for this node to start them */
367 for (c1 = 0; c1 < n->issuer_count; c1++) 439 for (c1 = 0; c1 < n->issuer_count; c1++)
368 { 440 {
@@ -382,6 +454,7 @@ static void handle_request (const struct GNUNET_PeerIdentity *peer,
382 const struct GNUNET_MessageHeader *message) 454 const struct GNUNET_MessageHeader *message)
383{ 455{
384 struct Node *n; 456 struct Node *n;
457 struct NodeComCtx *e_ctx;
385 struct Experimentation_Request *rm = (struct Experimentation_Request *) message; 458 struct Experimentation_Request *rm = (struct Experimentation_Request *) message;
386 struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1]; 459 struct Experimentation_Issuer *rmi = (struct Experimentation_Issuer *) &rm[1];
387 int c1; 460 int c1;
@@ -467,12 +540,15 @@ static void handle_request (const struct GNUNET_PeerIdentity *peer,
467 node_make_active (n); 540 node_make_active (n);
468 541
469 /* Send response */ 542 /* Send response */
470 n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, 543 e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
471 GNUNET_TIME_relative_get_forever_(), 544 e_ctx->n = n;
472 peer, 545 e_ctx->e = NULL;
473 sizeof (struct Experimentation_Response) + 546 e_ctx->size = sizeof (struct Experimentation_Response) + GSE_my_issuer_count * sizeof (struct Experimentation_Issuer);
474 GSE_my_issuer_count * sizeof (struct Experimentation_Issuer), 547 e_ctx->notify = &send_response_cb;
475 send_response_cb, n); 548 e_ctx->notify_cls = n;
549
550 GNUNET_CONTAINER_DLL_insert_tail(n->e_req_head, n->e_req_tail, e_ctx);
551 schedule_transmisson (e_ctx);
476} 552}
477 553
478 554
@@ -494,7 +570,6 @@ static void handle_response (const struct GNUNET_PeerIdentity *peer,
494 unsigned int c1; 570 unsigned int c1;
495 unsigned int c2; 571 unsigned int c2;
496 572
497
498 if (ntohs (message->size) < sizeof (struct Experimentation_Response)) 573 if (ntohs (message->size) < sizeof (struct Experimentation_Response))
499 { 574 {
500 GNUNET_break (0); 575 GNUNET_break (0);
@@ -642,9 +717,6 @@ static void handle_start (const struct GNUNET_PeerIdentity *peer,
642 return; 717 return;
643 } 718 }
644 719
645 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
646 "START", GNUNET_i2s (peer), name);
647
648 GED_scheduler_handle_start (n, e); 720 GED_scheduler_handle_start (n, e);
649} 721}
650 722
@@ -714,9 +786,6 @@ static void handle_start_ack (const struct GNUNET_PeerIdentity *peer,
714 GNUNET_break (0); 786 GNUNET_break (0);
715 return; 787 return;
716 } 788 }
717
718 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer %s for experiment `%s'\n"),
719 "START_ACK", GNUNET_i2s (peer), name);
720 GED_scheduler_handle_start_ack (n, e); 789 GED_scheduler_handle_start_ack (n, e);
721} 790}
722 791
@@ -814,7 +883,6 @@ void core_connect_handler (void *cls,
814 return; /* This peer is known as inactive */ 883 return; /* This peer is known as inactive */
815 884
816 send_experimentation_request (peer); 885 send_experimentation_request (peer);
817
818} 886}
819 887
820 888
@@ -827,12 +895,21 @@ void core_connect_handler (void *cls,
827void core_disconnect_handler (void *cls, 895void core_disconnect_handler (void *cls,
828 const struct GNUNET_PeerIdentity * peer) 896 const struct GNUNET_PeerIdentity * peer)
829{ 897{
898 struct Node *n;
830 if (GNUNET_YES == is_me(peer)) 899 if (GNUNET_YES == is_me(peer))
831 return; 900 return;
832 901
833 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"), 902 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Disconnected from peer %s\n"),
834 GNUNET_i2s (peer)); 903 GNUNET_i2s (peer));
835 904
905 if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_requested, &peer->hashPubKey)))
906 cleanup_node (nodes_requested, &peer->hashPubKey, n);
907
908 if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_active, &peer->hashPubKey)))
909 cleanup_node (nodes_active, &peer->hashPubKey, n);
910
911 if (NULL != (n = GNUNET_CONTAINER_multihashmap_get (nodes_inactive, &peer->hashPubKey)))
912 cleanup_node (nodes_inactive, &peer->hashPubKey, n);
836} 913}
837 914
838 915
@@ -878,29 +955,16 @@ core_receive_handler (void *cls,
878 return GNUNET_OK; 955 return GNUNET_OK;
879} 956}
880 957
881struct ExperimentStartCtx
882{
883 struct ExperimentStartCtx *prev;
884 struct ExperimentStartCtx *next;
885
886 struct Node *n;
887 struct Experiment *e;
888};
889 958
890size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf) 959size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf)
891{ 960{
892 struct ExperimentStartCtx *e_ctx = cls; 961 struct NodeComCtx *e_ctx = cls;
893 struct GED_start_message *msg; 962 struct GED_start_message *msg;
894 size_t name_len; 963 size_t name_len;
895 size_t size; 964 size_t size;
896 965
897 GNUNET_CONTAINER_DLL_remove (e_ctx->n->e_req_head, e_ctx->n->e_req_tail, e_ctx);
898 e_ctx->n->cth = NULL;
899 if (NULL == buf) 966 if (NULL == buf)
900 {
901 GNUNET_free (e_ctx);
902 return 0; 967 return 0;
903 }
904 968
905 name_len = strlen(e_ctx->e->name) + 1; 969 name_len = strlen(e_ctx->e->name) + 1;
906 size = sizeof (struct GED_start_message) + name_len; 970 size = sizeof (struct GED_start_message) + name_len;
@@ -915,21 +979,63 @@ size_t node_experiment_start_cb (void *cls, size_t bufsize, void *buf)
915 979
916 memcpy (buf, msg, size); 980 memcpy (buf, msg, size);
917 GNUNET_free (msg); 981 GNUNET_free (msg);
918 GNUNET_free (e_ctx);
919 return size; 982 return size;
920} 983}
921 984
985size_t node_experiment_start_ack_cb (void *cls, size_t bufsize, void *buf)
986{
987 struct NodeComCtx *e_ctx = cls;
988 struct GED_start_ack_message *msg;
989 size_t name_len;
990 size_t size;
991 if (NULL == buf)
992 return 0;
922 993
994 name_len = strlen(e_ctx->e->name) + 1;
995 size = sizeof (struct GED_start_ack_message) + name_len;
996
997 msg = GNUNET_malloc (size);
998 msg->header.size = htons (size);
999 msg->header.type = htons (GNUNET_MESSAGE_TYPE_EXPERIMENTATION_START_ACK);
1000 msg->issuer = e_ctx->e->issuer;
1001 msg->version_nbo = GNUNET_TIME_absolute_hton(e_ctx->e->version);
1002 msg->len_name = htonl (name_len);
1003 memcpy (&msg[1], e_ctx->e->name, name_len);
1004
1005 memcpy (buf, msg, size);
1006 GNUNET_free (msg);
1007 return size;
1008}
1009
1010
1011
1012
1013/**
1014 * Confirm a experiment START with a node
1015 *
1016 * @return GNUNET_NO if core was busy with sending, GNUNET_OK otherwise
1017 */
923int 1018int
924GED_nodes_rts (struct Node *n) 1019GED_nodes_send_start_ack (struct Node *n, struct Experiment *e)
925{ 1020{
926 if (NULL == n->cth) 1021 struct NodeComCtx *e_ctx;
927 return GNUNET_YES; 1022
928 else 1023 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending %s for experiment request to peer `%s' for experiment `%s'\n"),
929 return GNUNET_NO; 1024 "START_ACK" ,GNUNET_i2s(&n->id), e->name);
1025
1026 e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
1027 e_ctx->n = n;
1028 e_ctx->e = e;
1029 e_ctx->size = sizeof (struct GED_start_ack_message) + strlen (e->name) + 1;
1030 e_ctx->notify = &node_experiment_start_ack_cb;
1031 e_ctx->notify_cls = e_ctx;
930 1032
1033 GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1034 schedule_transmisson (e_ctx);
1035 return GNUNET_OK;
931} 1036}
932 1037
1038
933/** 1039/**
934 * Request a experiment to start with a node 1040 * Request a experiment to start with a node
935 * 1041 *
@@ -938,31 +1044,20 @@ GED_nodes_rts (struct Node *n)
938int 1044int
939GED_nodes_request_start (struct Node *n, struct Experiment *e) 1045GED_nodes_request_start (struct Node *n, struct Experiment *e)
940{ 1046{
941 struct ExperimentStartCtx *e_ctx; 1047 struct NodeComCtx *e_ctx;
942
943 if (NULL != n->cth)
944 {
945 GNUNET_break (0); /* should call rts before */
946 return GNUNET_NO;
947 }
948 1048
949 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending experiment start request to peer `%s' for experiment `%s'\n"), 1049 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Sending %s for experiment request to peer `%s' for experiment `%s'\n"),
950 GNUNET_i2s(&n->id), e->name); 1050 "START", GNUNET_i2s(&n->id), e->name);
951 1051
952 e_ctx = GNUNET_malloc (sizeof (struct ExperimentStartCtx)); 1052 e_ctx = GNUNET_malloc (sizeof (struct NodeComCtx));
953 e_ctx->n = n; 1053 e_ctx->n = n;
954 e_ctx->e = e; 1054 e_ctx->e = e;
955 n->cth = GNUNET_CORE_notify_transmit_ready (ch, GNUNET_NO, 0, FAST_TIMEOUT, &n->id, 1055 e_ctx->size = sizeof (struct GED_start_message) + strlen (e->name) + 1;
956 sizeof (struct GED_start_message) + strlen (e->name) + 1, 1056 e_ctx->notify = &node_experiment_start_cb;
957 &node_experiment_start_cb, e_ctx); 1057 e_ctx->notify_cls = e_ctx;
958 if (NULL == n->cth)
959 {
960 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Cannot send experiment start request to peer `%s' for experiment `%s'\n"),
961 GNUNET_i2s(&n->id), e->name);
962 GNUNET_free (e_ctx);
963 }
964 GNUNET_CONTAINER_DLL_insert (n->e_req_head, n->e_req_tail, e_ctx);
965 1058
1059 GNUNET_CONTAINER_DLL_insert_tail (n->e_req_head, n->e_req_tail, e_ctx);
1060 schedule_transmisson (e_ctx);
966 return GNUNET_OK; 1061 return GNUNET_OK;
967} 1062}
968 1063
@@ -998,16 +1093,10 @@ GED_nodes_start ()
998void 1093void
999GED_nodes_stop () 1094GED_nodes_stop ()
1000{ 1095{
1001 if (NULL != ch)
1002 {
1003 GNUNET_CORE_disconnect (ch);
1004 ch = NULL;
1005 }
1006
1007 if (NULL != nodes_requested) 1096 if (NULL != nodes_requested)
1008 { 1097 {
1009 GNUNET_CONTAINER_multihashmap_iterate (nodes_requested, 1098 GNUNET_CONTAINER_multihashmap_iterate (nodes_requested,
1010 &cleanup_nodes, 1099 &cleanup_node,
1011 nodes_requested); 1100 nodes_requested);
1012 update_stats (nodes_requested); 1101 update_stats (nodes_requested);
1013 GNUNET_CONTAINER_multihashmap_destroy (nodes_requested); 1102 GNUNET_CONTAINER_multihashmap_destroy (nodes_requested);
@@ -1017,7 +1106,7 @@ GED_nodes_stop ()
1017 if (NULL != nodes_active) 1106 if (NULL != nodes_active)
1018 { 1107 {
1019 GNUNET_CONTAINER_multihashmap_iterate (nodes_active, 1108 GNUNET_CONTAINER_multihashmap_iterate (nodes_active,
1020 &cleanup_nodes, 1109 &cleanup_node,
1021 nodes_active); 1110 nodes_active);
1022 update_stats (nodes_active); 1111 update_stats (nodes_active);
1023 GNUNET_CONTAINER_multihashmap_destroy (nodes_active); 1112 GNUNET_CONTAINER_multihashmap_destroy (nodes_active);
@@ -1027,12 +1116,17 @@ GED_nodes_stop ()
1027 if (NULL != nodes_inactive) 1116 if (NULL != nodes_inactive)
1028 { 1117 {
1029 GNUNET_CONTAINER_multihashmap_iterate (nodes_inactive, 1118 GNUNET_CONTAINER_multihashmap_iterate (nodes_inactive,
1030 &cleanup_nodes, 1119 &cleanup_node,
1031 nodes_inactive); 1120 nodes_inactive);
1032 update_stats (nodes_inactive); 1121 update_stats (nodes_inactive);
1033 GNUNET_CONTAINER_multihashmap_destroy (nodes_inactive); 1122 GNUNET_CONTAINER_multihashmap_destroy (nodes_inactive);
1034 nodes_inactive = NULL; 1123 nodes_inactive = NULL;
1035 } 1124 }
1125 if (NULL != ch)
1126 {
1127 GNUNET_CORE_disconnect (ch);
1128 ch = NULL;
1129 }
1036} 1130}
1037 1131
1038/* end of gnunet-daemon-experimentation_nodes.c */ 1132/* end of gnunet-daemon-experimentation_nodes.c */
diff --git a/src/experimentation/gnunet-daemon-experimentation_scheduler.c b/src/experimentation/gnunet-daemon-experimentation_scheduler.c
index de5c80614..519caa877 100644
--- a/src/experimentation/gnunet-daemon-experimentation_scheduler.c
+++ b/src/experimentation/gnunet-daemon-experimentation_scheduler.c
@@ -122,27 +122,13 @@ static void run_experiment_inbound (void *cls,const struct GNUNET_SCHEDULER_Task
122 struct ScheduledExperiment *se = cls; 122 struct ScheduledExperiment *se = cls;
123 struct GNUNET_TIME_Relative start; 123 struct GNUNET_TIME_Relative start;
124 struct GNUNET_TIME_Relative end; 124 struct GNUNET_TIME_Relative end;
125 struct GNUNET_TIME_Relative backoff;
126 125
127 se->task = GNUNET_SCHEDULER_NO_TASK; 126 se->task = GNUNET_SCHEDULER_NO_TASK;
128 127
129 if (GNUNET_NO == GED_nodes_rts (se->n))
130 {
131 se->state = BUSY;
132 backoff = GNUNET_TIME_UNIT_SECONDS;
133 backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
134 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
135 GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
136 se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_inbound, se);
137 return;
138 }
139 else if (BUSY == se->state)
140 se->state = NOT_RUNNING;
141
142 switch (se->state) { 128 switch (se->state) {
143 case NOT_RUNNING: 129 case NOT_RUNNING:
144 /* Send START_ACK message */ 130 /* Send START_ACK message */
145 //GED_nodes_request_start (se->n, se->e); 131 GED_nodes_send_start_ack (se->n, se->e);
146 se->state = REQUESTED; 132 se->state = REQUESTED;
147 /* Schedule to run */ 133 /* Schedule to run */
148 start = GNUNET_TIME_absolute_get_remaining(se->e->start); 134 start = GNUNET_TIME_absolute_get_remaining(se->e->start);
@@ -152,12 +138,11 @@ static void run_experiment_inbound (void *cls,const struct GNUNET_SCHEDULER_Task
152 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se); 138 se->task = GNUNET_SCHEDULER_add_delayed (start, &run_experiment_inbound, se);
153 break; 139 break;
154 case REQUESTED: 140 case REQUESTED:
155 /* Already requested */
156 se->state = STARTED;
157 case STARTED: 141 case STARTED:
142 se->state = STARTED;
158 /* Experiment is running */ 143 /* Experiment is running */
159 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n", 144 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n",
160 GNUNET_i2s (&se->n->id), se->e->name); 145 "inbound", GNUNET_i2s (&se->n->id), se->e->name);
161 146
162 /* do work here */ 147 /* do work here */
163 148
@@ -184,33 +169,15 @@ static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_Tas
184{ 169{
185 struct ScheduledExperiment *se = cls; 170 struct ScheduledExperiment *se = cls;
186 struct GNUNET_TIME_Relative end; 171 struct GNUNET_TIME_Relative end;
187 struct GNUNET_TIME_Relative backoff;
188 172
189 se->task = GNUNET_SCHEDULER_NO_TASK; 173 se->task = GNUNET_SCHEDULER_NO_TASK;
190 174
191 if (GNUNET_NO == GED_nodes_rts (se->n))
192 {
193 /* Cannot send to peer, core is busy */
194 se->state = BUSY;
195 backoff = GNUNET_TIME_UNIT_SECONDS;
196 backoff.rel_value += GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000);
197 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to peer `%s' for `%s' for %llu ms\n",
198 GNUNET_i2s (&se->n->id), se->e->name, (unsigned long long) backoff.rel_value);
199 se->task = GNUNET_SCHEDULER_add_delayed (backoff, &run_experiment_outbound, se);
200 return;
201 }
202 else if (BUSY == se->state)
203 se->state = NOT_RUNNING; /* Not busy anymore, can send */
204
205 switch (se->state) { 175 switch (se->state) {
206 case NOT_RUNNING: 176 case NOT_RUNNING:
207 /* Send START message */ 177 /* Send START message */
208 GED_nodes_request_start (se->n, se->e); 178 GED_nodes_request_start (se->n, se->e);
209 se->state = REQUESTED; 179 se->state = REQUESTED;
210 se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se); 180 se->task = GNUNET_SCHEDULER_add_delayed (EXP_RESPONSE_TIMEOUT, &request_timeout, se);
211
212 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending start request to peer `%s' for `%s'\n",
213 GNUNET_i2s (&se->n->id), se->e->name);
214 experiments_requested ++; 181 experiments_requested ++;
215 GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO); 182 GNUNET_STATISTICS_set (GED_stats, "# experiments requested", experiments_requested, GNUNET_NO);
216 break; 183 break;
@@ -220,8 +187,8 @@ static void run_experiment_outbound (void *cls,const struct GNUNET_SCHEDULER_Tas
220 break; 187 break;
221 case STARTED: 188 case STARTED:
222 /* Experiment is running */ 189 /* Experiment is running */
223 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment `%s' peer for `%s'\n", 190 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running %s experiment `%s' peer for `%s'\n",
224 GNUNET_i2s (&se->n->id), se->e->name); 191 "outbound", GNUNET_i2s (&se->n->id), se->e->name);
225 192
226 /* do work here */ 193 /* do work here */
227 194
@@ -271,7 +238,7 @@ GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
271} 238}
272 239
273/** 240/**
274 * Handle a START_ACL message from a remote node 241 * Handle a START_ACK message from a remote node
275 * 242 *
276 * @param n the node 243 * @param n the node
277 * @param e the experiment 244 * @param e the experiment
@@ -281,7 +248,7 @@ GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
281{ 248{
282 struct ScheduledExperiment *se; 249 struct ScheduledExperiment *se;
283 250
284 if (NULL == (se = find_experiment (waiting_in_head, waiting_in_tail, n, e, GNUNET_NO))) 251 if (NULL == (se = find_experiment (waiting_out_head, waiting_out_tail, n, e, GNUNET_YES)))
285 { 252 {
286 GNUNET_break (0); 253 GNUNET_break (0);
287 return; 254 return;
@@ -291,7 +258,14 @@ GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
291 "START_ACK", GNUNET_i2s (&n->id), e->name); 258 "START_ACK", GNUNET_i2s (&n->id), e->name);
292 259
293 if (GNUNET_SCHEDULER_NO_TASK != se->task) 260 if (GNUNET_SCHEDULER_NO_TASK != se->task)
294 GNUNET_SCHEDULER_cancel (se->task); 261 GNUNET_SCHEDULER_cancel (se->task); /* *Canceling timeout task */
262
263 /* Remove from waiting list, add to running list */
264 GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
265 GNUNET_CONTAINER_DLL_insert (running_out_head, waiting_out_tail, se);
266
267 /* Change state and schedule to run */
268 se->state = STARTED;
295 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se); 269 se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
296} 270}
297 271
@@ -427,6 +401,38 @@ GED_scheduler_stop ()
427 experiments_running --; 401 experiments_running --;
428 GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO); 402 GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO);
429 } 403 }
404
405 next = waiting_out_head;
406 while (NULL != (cur = next))
407 {
408 next = cur->next;
409 GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, cur);
410 if (GNUNET_SCHEDULER_NO_TASK != cur->task)
411 {
412 GNUNET_SCHEDULER_cancel (cur->task);
413 cur->task = GNUNET_SCHEDULER_NO_TASK;
414 }
415 GNUNET_free (cur);
416 GNUNET_assert (experiments_scheduled > 0);
417 experiments_scheduled --;
418 GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", experiments_scheduled, GNUNET_NO);
419 }
420
421 next = running_out_head;
422 while (NULL != (cur = next))
423 {
424 next = cur->next;
425 GNUNET_CONTAINER_DLL_remove (running_out_head, running_out_tail, cur);
426 if (GNUNET_SCHEDULER_NO_TASK != cur->task)
427 {
428 GNUNET_SCHEDULER_cancel (cur->task);
429 cur->task = GNUNET_SCHEDULER_NO_TASK;
430 }
431 GNUNET_free (cur);
432 GNUNET_assert (experiments_running > 0);
433 experiments_running --;
434 GNUNET_STATISTICS_set (GED_stats, "# experiments running", experiments_running, GNUNET_NO);
435 }
430} 436}
431 437
432/* end of gnunet-daemon-experimentation_scheduler.c */ 438/* end of gnunet-daemon-experimentation_scheduler.c */
diff --git a/src/experimentation/test_experimentation_clique.conf b/src/experimentation/test_experimentation_clique.conf
index bc6a34dcc..5b2b86ab2 100644
--- a/src/experimentation/test_experimentation_clique.conf
+++ b/src/experimentation/test_experimentation_clique.conf
@@ -32,7 +32,7 @@ NEIGHBOUR_LIMIT = 50
32PORT = 12365 32PORT = 12365
33 33
34[experimentation] 34[experimentation]
35#PREFIX = valgrind --leak-check=full 35PREFIX = valgrind --leak-check=full
36ISSUERS = TFRM29O2RQNKLVBQIGODJ6GD58LSQ2NM9TNFBC6N48BRJHQO38Q73N2OM3V4CLKDM6CILQV4CU8PMJDRG0FNB0PDI057DBRANMLPLRG 36ISSUERS = TFRM29O2RQNKLVBQIGODJ6GD58LSQ2NM9TNFBC6N48BRJHQO38Q73N2OM3V4CLKDM6CILQV4CU8PMJDRG0FNB0PDI057DBRANMLPLRG
37EXPERIMENTS = test_experiments.exp 37EXPERIMENTS = test_experiments.exp
38 38