diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-01-03 00:43:57 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-01-03 00:43:57 +0000 |
commit | 845316cb543af7e4c77709acce4df79f3e0dc162 (patch) | |
tree | 629739608b0bd0f645d82fd41d90ccb2cb617d09 /src/consensus/consensus_api.c | |
parent | 3def510831309a5a0a5f50a2250911d6b592a87e (diff) | |
download | gnunet-845316cb543af7e4c77709acce4df79f3e0dc162.tar.gz gnunet-845316cb543af7e4c77709acce4df79f3e0dc162.zip |
implemented the modified consensus api, started implementing p2p protocol for consensus
Diffstat (limited to 'src/consensus/consensus_api.c')
-rw-r--r-- | src/consensus/consensus_api.c | 481 |
1 files changed, 154 insertions, 327 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 2479c019c..25c76b358 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -33,14 +33,43 @@ | |||
33 | 33 | ||
34 | #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) |
35 | 35 | ||
36 | struct ElementAck | 36 | /** |
37 | * Actions that can be queued. | ||
38 | */ | ||
39 | struct QueuedMessage | ||
37 | { | 40 | { |
38 | struct ElementAck *next; | 41 | /** |
39 | struct ElementAck *prev; | 42 | * Queued messages are stored in a doubly linked list. |
40 | int keep; | 43 | */ |
41 | struct GNUNET_CONSENSUS_Element *element; | 44 | struct QueuedMessage *next; |
45 | |||
46 | /** | ||
47 | * Queued messages are stored in a doubly linked list. | ||
48 | */ | ||
49 | struct QueuedMessage *prev; | ||
50 | |||
51 | /** | ||
52 | * The actual queued message. | ||
53 | */ | ||
54 | struct GNUNET_MessageHeader *msg; | ||
55 | |||
56 | /** | ||
57 | * Size of the message in msg. | ||
58 | */ | ||
59 | size_t size; | ||
60 | |||
61 | /** | ||
62 | * Will be called after transmit, if not NULL | ||
63 | */ | ||
64 | GNUNET_CONSENSUS_InsertDoneCallback idc; | ||
65 | |||
66 | /** | ||
67 | * Closure for idc | ||
68 | */ | ||
69 | void *idc_cls; | ||
42 | }; | 70 | }; |
43 | 71 | ||
72 | |||
44 | /** | 73 | /** |
45 | * Handle for the service. | 74 | * Handle for the service. |
46 | */ | 75 | */ |
@@ -52,14 +81,14 @@ struct GNUNET_CONSENSUS_Handle | |||
52 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 81 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
53 | 82 | ||
54 | /** | 83 | /** |
55 | * Socket (if available). | 84 | * Client connected to the consensus service, may be NULL if not connected. |
56 | */ | 85 | */ |
57 | struct GNUNET_CLIENT_Connection *client; | 86 | struct GNUNET_CLIENT_Connection *client; |
58 | 87 | ||
59 | /** | 88 | /** |
60 | * Callback for new elements. Not called for elements added locally. | 89 | * Callback for new elements. Not called for elements added locally. |
61 | */ | 90 | */ |
62 | GNUNET_CONSENSUS_NewElementCallback new_element_cb; | 91 | GNUNET_CONSENSUS_ElementCallback new_element_cb; |
63 | 92 | ||
64 | /** | 93 | /** |
65 | * Closure for new_element_cb | 94 | * Closure for new_element_cb |
@@ -67,7 +96,7 @@ struct GNUNET_CONSENSUS_Handle | |||
67 | void *new_element_cls; | 96 | void *new_element_cls; |
68 | 97 | ||
69 | /** | 98 | /** |
70 | * Session identifier for the consensus session. | 99 | * The (local) session identifier for the consensus session. |
71 | */ | 100 | */ |
72 | struct GNUNET_HashCode session_id; | 101 | struct GNUNET_HashCode session_id; |
73 | 102 | ||
@@ -77,9 +106,9 @@ struct GNUNET_CONSENSUS_Handle | |||
77 | int num_peers; | 106 | int num_peers; |
78 | 107 | ||
79 | /** | 108 | /** |
80 | * Peer identities of peers in the consensus. Optionally includes the local peer. | 109 | * Peer identities of peers participating in the consensus, includes the local peer. |
81 | */ | 110 | */ |
82 | struct GNUNET_PeerIdentity *peers; | 111 | struct GNUNET_PeerIdentity **peers; |
83 | 112 | ||
84 | /** | 113 | /** |
85 | * Currently active transmit request. | 114 | * Currently active transmit request. |
@@ -92,22 +121,11 @@ struct GNUNET_CONSENSUS_Handle | |||
92 | int joined; | 121 | int joined; |
93 | 122 | ||
94 | /** | 123 | /** |
95 | * Called when the current insertion operation finishes. | ||
96 | * NULL if there is no insert operation active. | ||
97 | */ | ||
98 | GNUNET_CONSENSUS_InsertDoneCallback idc; | ||
99 | |||
100 | /** | ||
101 | * Closure for the insert done callback. | 124 | * Closure for the insert done callback. |
102 | */ | 125 | */ |
103 | void *idc_cls; | 126 | void *idc_cls; |
104 | 127 | ||
105 | /** | 128 | /** |
106 | * An element that was requested to be inserted. | ||
107 | */ | ||
108 | struct GNUNET_CONSENSUS_Element *insert_element; | ||
109 | |||
110 | /** | ||
111 | * Called when the conclude operation finishes or fails. | 129 | * Called when the conclude operation finishes or fails. |
112 | */ | 130 | */ |
113 | GNUNET_CONSENSUS_ConcludeCallback conclude_cb; | 131 | GNUNET_CONSENSUS_ConcludeCallback conclude_cb; |
@@ -122,103 +140,92 @@ struct GNUNET_CONSENSUS_Handle | |||
122 | */ | 140 | */ |
123 | struct GNUNET_TIME_Absolute conclude_deadline; | 141 | struct GNUNET_TIME_Absolute conclude_deadline; |
124 | 142 | ||
125 | struct ElementAck *ack_head; | 143 | unsigned int conclude_min_size; |
126 | struct ElementAck *ack_tail; | ||
127 | |||
128 | /** | ||
129 | * Set to GNUNET_YES if the begin message has been transmitted to the service | ||
130 | */ | ||
131 | int begin_sent; | ||
132 | 144 | ||
133 | /** | 145 | struct QueuedMessage *messages_head; |
134 | * Set to GNUNET_YES it the begin message should be transmitted to the service | 146 | struct QueuedMessage *messages_tail; |
135 | */ | ||
136 | int begin_requested; | ||
137 | }; | 147 | }; |
138 | 148 | ||
139 | 149 | ||
140 | static size_t | ||
141 | transmit_ack (void *cls, size_t size, void *buf); | ||
142 | |||
143 | static size_t | ||
144 | transmit_insert (void *cls, size_t size, void *buf); | ||
145 | |||
146 | static size_t | ||
147 | transmit_conclude (void *cls, size_t size, void *buf); | ||
148 | |||
149 | static size_t | ||
150 | transmit_begin (void *cls, size_t size, void *buf); | ||
151 | |||
152 | 150 | ||
153 | /** | 151 | /** |
154 | * Call notify_transmit_ready for ack if necessary and possible. | 152 | * Schedule transmitting the next message. |
153 | * | ||
154 | * @param consensus consensus handle | ||
155 | */ | 155 | */ |
156 | static void | 156 | static void |
157 | ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus) | 157 | schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus); |
158 | { | ||
159 | if ((NULL == consensus->th) && (NULL != consensus->ack_head)) | ||
160 | { | ||
161 | consensus->th = | ||
162 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
163 | sizeof (struct GNUNET_CONSENSUS_AckMessage), | ||
164 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
165 | GNUNET_NO, &transmit_ack, consensus); | ||
166 | } | ||
167 | } | ||
168 | 158 | ||
169 | 159 | ||
170 | /** | 160 | /** |
171 | * Call notify_transmit_ready for ack if necessary and possible. | 161 | * Function called to notify a client about the connection |
162 | * begin ready to queue more data. "buf" will be | ||
163 | * NULL and "size" zero if the connection was closed for | ||
164 | * writing in the meantime. | ||
165 | * | ||
166 | * @param cls closure | ||
167 | * @param size number of bytes available in buf | ||
168 | * @param buf where the callee should write the message | ||
169 | * @return number of bytes written to buf | ||
172 | */ | 170 | */ |
173 | static void | 171 | static size_t transmit_queued (void *cls, size_t size, |
174 | ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus) | 172 | void *buf) |
175 | { | 173 | { |
176 | if ((NULL == consensus->th) && (NULL != consensus->insert_element)) | 174 | struct GNUNET_CONSENSUS_Handle *consensus; |
175 | struct QueuedMessage *qmsg; | ||
176 | size_t ret_size; | ||
177 | |||
178 | printf("transmitting queued\n"); | ||
179 | |||
180 | consensus = (struct GNUNET_CONSENSUS_Handle *) cls; | ||
181 | qmsg = consensus->messages_head; | ||
182 | GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); | ||
183 | GNUNET_assert (qmsg); | ||
184 | |||
185 | if (NULL == buf) | ||
177 | { | 186 | { |
178 | consensus->th = | 187 | if (NULL != qmsg->idc) |
179 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | 188 | { |
180 | sizeof (struct GNUNET_CONSENSUS_ElementMessage) + | 189 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); |
181 | consensus->insert_element->size, | 190 | } |
182 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
183 | GNUNET_NO, &transmit_insert, consensus); | ||
184 | } | 191 | } |
185 | } | ||
186 | |||
187 | 192 | ||
188 | /** | 193 | memcpy (buf, qmsg->msg, qmsg->size); |
189 | * Call notify_transmit_ready for ack if necessary and possible. | 194 | ret_size = qmsg->size; |
190 | */ | 195 | if (NULL != qmsg->idc) |
191 | static void | ||
192 | ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus) | ||
193 | { | ||
194 | if ((NULL == consensus->th) && (NULL != consensus->conclude_cb)) | ||
195 | { | 196 | { |
196 | consensus->th = | 197 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); |
197 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
198 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage), | ||
199 | GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline), | ||
200 | GNUNET_NO, &transmit_conclude, consensus); | ||
201 | } | 198 | } |
199 | GNUNET_free (qmsg->msg); | ||
200 | GNUNET_free (qmsg); | ||
201 | |||
202 | schedule_transmit (consensus); | ||
203 | |||
204 | return ret_size; | ||
202 | } | 205 | } |
203 | 206 | ||
204 | 207 | ||
205 | /** | 208 | /** |
206 | * Call notify_transmit_ready for ack if necessary and possible. | 209 | * Schedule transmitting the next message. |
210 | * | ||
211 | * @param consensus consensus handle | ||
207 | */ | 212 | */ |
208 | static void | 213 | static void |
209 | ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus) | 214 | schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus) |
210 | { | 215 | { |
211 | if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) && | 216 | if (NULL != consensus->th) |
212 | (GNUNET_NO == consensus->begin_sent)) | 217 | return; |
218 | |||
219 | if (NULL != consensus->messages_head) | ||
213 | { | 220 | { |
214 | consensus->th = | 221 | LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); |
215 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | 222 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, consensus->messages_head->size, |
216 | sizeof (struct GNUNET_MessageHeader), | 223 | GNUNET_TIME_UNIT_FOREVER_REL, |
217 | GNUNET_TIME_UNIT_FOREVER_REL, | 224 | GNUNET_NO, &transmit_queued, consensus); |
218 | GNUNET_NO, &transmit_begin, consensus); | ||
219 | } | 225 | } |
220 | } | 226 | } |
221 | 227 | ||
228 | |||
222 | /** | 229 | /** |
223 | * Called when the server has sent is a new element | 230 | * Called when the server has sent is a new element |
224 | * | 231 | * |
@@ -226,11 +233,12 @@ ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus) | |||
226 | * @param msg element message | 233 | * @param msg element message |
227 | */ | 234 | */ |
228 | static void | 235 | static void |
229 | handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, | 236 | handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, |
230 | struct GNUNET_CONSENSUS_ElementMessage *msg) | 237 | struct GNUNET_CONSENSUS_ElementMessage *msg) |
231 | { | 238 | { |
232 | struct GNUNET_CONSENSUS_Element element; | 239 | struct GNUNET_CONSENSUS_Element element; |
233 | struct ElementAck *ack; | 240 | struct GNUNET_CONSENSUS_AckMessage *ack_msg; |
241 | struct QueuedMessage *queued_msg; | ||
234 | int ret; | 242 | int ret; |
235 | 243 | ||
236 | element.type = msg->element_type; | 244 | element.type = msg->element_type; |
@@ -238,11 +246,15 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, | |||
238 | element.data = &msg[1]; | 246 | element.data = &msg[1]; |
239 | 247 | ||
240 | ret = consensus->new_element_cb (consensus->new_element_cls, &element); | 248 | ret = consensus->new_element_cb (consensus->new_element_cls, &element); |
241 | ack = GNUNET_malloc (sizeof (struct ElementAck)); | ||
242 | ack->keep = ret; | ||
243 | GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack); | ||
244 | 249 | ||
245 | ntr_ack (consensus); | 250 | queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct GNUNET_CONSENSUS_AckMessage)); |
251 | queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1]; | ||
252 | |||
253 | ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg; | ||
254 | ack_msg->keep = ret; | ||
255 | |||
256 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, | ||
257 | queued_msg); | ||
246 | } | 258 | } |
247 | 259 | ||
248 | 260 | ||
@@ -254,13 +266,12 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, | |||
254 | * @param msg conclude done message | 266 | * @param msg conclude done message |
255 | */ | 267 | */ |
256 | static void | 268 | static void |
257 | handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, | 269 | handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, |
258 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) | 270 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) |
259 | { | 271 | { |
260 | GNUNET_assert (NULL != consensus->conclude_cb); | 272 | GNUNET_assert (NULL != consensus->conclude_cb); |
261 | consensus->conclude_cb(consensus->conclude_cls, | 273 | consensus->conclude_cb (consensus->conclude_cls, |
262 | msg->num_peers, | 274 | 0, NULL); |
263 | (struct GNUNET_PeerIdentity *) &msg[1]); | ||
264 | consensus->conclude_cb = NULL; | 275 | consensus->conclude_cb = NULL; |
265 | } | 276 | } |
266 | 277 | ||
@@ -287,12 +298,6 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
287 | GNUNET_CLIENT_disconnect (consensus->client); | 298 | GNUNET_CLIENT_disconnect (consensus->client); |
288 | consensus->client = NULL; | 299 | consensus->client = NULL; |
289 | consensus->new_element_cb (NULL, NULL); | 300 | consensus->new_element_cb (NULL, NULL); |
290 | if (NULL != consensus->idc) | ||
291 | { | ||
292 | consensus->idc(consensus->idc_cls, GNUNET_NO); | ||
293 | consensus->idc = NULL; | ||
294 | consensus->idc_cls = NULL; | ||
295 | } | ||
296 | return; | 301 | return; |
297 | } | 302 | } |
298 | 303 | ||
@@ -305,108 +310,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
305 | handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); | 310 | handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); |
306 | break; | 311 | break; |
307 | default: | 312 | default: |
308 | LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring"); | 313 | GNUNET_break (0); |
309 | } | 314 | } |
310 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, | 315 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, |
311 | GNUNET_TIME_UNIT_FOREVER_REL); | 316 | GNUNET_TIME_UNIT_FOREVER_REL); |
312 | } | 317 | } |
313 | 318 | ||
314 | |||
315 | |||
316 | |||
317 | /** | ||
318 | * Function called to notify a client about the connection | ||
319 | * begin ready to queue more data. "buf" will be | ||
320 | * NULL and "size" zero if the connection was closed for | ||
321 | * writing in the meantime. | ||
322 | * | ||
323 | * @param cls closure | ||
324 | * @param size number of bytes available in buf | ||
325 | * @param buf where the callee should write the message | ||
326 | * @return number of bytes written to buf | ||
327 | */ | ||
328 | static size_t | ||
329 | transmit_ack (void *cls, size_t size, void *buf) | ||
330 | { | ||
331 | struct GNUNET_CONSENSUS_AckMessage *msg; | ||
332 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
333 | |||
334 | consensus = (struct GNUNET_CONSENSUS_Handle *) cls; | ||
335 | |||
336 | GNUNET_assert (NULL != consensus->ack_head); | ||
337 | |||
338 | msg = (struct GNUNET_CONSENSUS_AckMessage *) buf; | ||
339 | msg->keep = consensus->ack_head->keep; | ||
340 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); | ||
341 | msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage)); | ||
342 | |||
343 | consensus->ack_head = consensus->ack_head->next; | ||
344 | |||
345 | consensus->th = NULL; | ||
346 | |||
347 | ntr_insert (consensus); | ||
348 | ntr_ack (consensus); | ||
349 | ntr_conclude (consensus); | ||
350 | |||
351 | return sizeof (struct GNUNET_CONSENSUS_AckMessage); | ||
352 | } | ||
353 | |||
354 | /** | ||
355 | * Function called to notify a client about the connection | ||
356 | * begin ready to queue more data. "buf" will be | ||
357 | * NULL and "size" zero if the connection was closed for | ||
358 | * writing in the meantime. | ||
359 | * | ||
360 | * @param cls closure | ||
361 | * @param size number of bytes available in buf | ||
362 | * @param buf where the callee should write the message | ||
363 | * @return number of bytes written to buf | ||
364 | */ | ||
365 | static size_t | ||
366 | transmit_insert (void *cls, size_t size, void *buf) | ||
367 | { | ||
368 | struct GNUNET_CONSENSUS_ElementMessage *msg; | ||
369 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
370 | GNUNET_CONSENSUS_InsertDoneCallback idc; | ||
371 | int msize; | ||
372 | void *idc_cls; | ||
373 | |||
374 | GNUNET_assert (NULL != buf); | ||
375 | |||
376 | consensus = cls; | ||
377 | |||
378 | GNUNET_assert (NULL != consensus->insert_element); | ||
379 | |||
380 | consensus->th = NULL; | ||
381 | |||
382 | msg = buf; | ||
383 | |||
384 | msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) + | ||
385 | consensus->insert_element->size; | ||
386 | |||
387 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); | ||
388 | msg->header.size = htons (msize); | ||
389 | memcpy (&msg[1], | ||
390 | consensus->insert_element->data, | ||
391 | consensus->insert_element->size); | ||
392 | |||
393 | consensus->insert_element = NULL; | ||
394 | |||
395 | idc = consensus->idc; | ||
396 | consensus->idc = NULL; | ||
397 | idc_cls = consensus->idc_cls; | ||
398 | consensus->idc_cls = NULL; | ||
399 | idc (idc_cls, GNUNET_YES); | ||
400 | |||
401 | |||
402 | ntr_ack (consensus); | ||
403 | ntr_insert (consensus); | ||
404 | ntr_conclude (consensus); | ||
405 | |||
406 | return msize; | ||
407 | } | ||
408 | |||
409 | |||
410 | /** | 319 | /** |
411 | * Function called to notify a client about the connection | 320 | * Function called to notify a client about the connection |
412 | * begin ready to queue more data. "buf" will be | 321 | * begin ready to queue more data. "buf" will be |
@@ -427,7 +336,7 @@ transmit_join (void *cls, size_t size, void *buf) | |||
427 | 336 | ||
428 | GNUNET_assert (NULL != buf); | 337 | GNUNET_assert (NULL != buf); |
429 | 338 | ||
430 | LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); | 339 | LOG (GNUNET_ERROR_TYPE_INFO, "transmitting join message\n"); |
431 | 340 | ||
432 | consensus = cls; | 341 | consensus = cls; |
433 | consensus->th = NULL; | 342 | consensus->th = NULL; |
@@ -447,9 +356,7 @@ transmit_join (void *cls, size_t size, void *buf) | |||
447 | consensus->peers, | 356 | consensus->peers, |
448 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); | 357 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); |
449 | 358 | ||
450 | ntr_insert (consensus); | 359 | schedule_transmit (consensus); |
451 | ntr_begin (consensus); | ||
452 | ntr_conclude (consensus); | ||
453 | 360 | ||
454 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, | 361 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, |
455 | GNUNET_TIME_UNIT_FOREVER_REL); | 362 | GNUNET_TIME_UNIT_FOREVER_REL); |
@@ -457,88 +364,11 @@ transmit_join (void *cls, size_t size, void *buf) | |||
457 | return msize; | 364 | return msize; |
458 | } | 365 | } |
459 | 366 | ||
460 | |||
461 | /** | ||
462 | * Function called to notify a client about the connection | ||
463 | * begin ready to queue more data. "buf" will be | ||
464 | * NULL and "size" zero if the connection was closed for | ||
465 | * writing in the meantime. | ||
466 | * | ||
467 | * @param cls closure | ||
468 | * @param size number of bytes available in buf | ||
469 | * @param buf where the callee should write the message | ||
470 | * @return number of bytes written to buf | ||
471 | */ | ||
472 | static size_t | ||
473 | transmit_conclude (void *cls, size_t size, void *buf) | ||
474 | { | ||
475 | struct GNUNET_CONSENSUS_ConcludeMessage *msg; | ||
476 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
477 | int msize; | ||
478 | |||
479 | GNUNET_assert (NULL != buf); | ||
480 | |||
481 | consensus = cls; | ||
482 | consensus->th = NULL; | ||
483 | |||
484 | msg = buf; | ||
485 | |||
486 | msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); | ||
487 | |||
488 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); | ||
489 | msg->header.size = htons (msize); | ||
490 | msg->timeout = | ||
491 | GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline)); | ||
492 | |||
493 | ntr_ack (consensus); | ||
494 | |||
495 | return msize; | ||
496 | } | ||
497 | |||
498 | |||
499 | /** | ||
500 | * Function called to notify a client about the connection | ||
501 | * begin ready to queue more data. "buf" will be | ||
502 | * NULL and "size" zero if the connection was closed for | ||
503 | * writing in the meantime. | ||
504 | * | ||
505 | * @param cls the consensus handle | ||
506 | * @param size number of bytes available in buf | ||
507 | * @param buf where the callee should write the message | ||
508 | * @return number of bytes written to buf | ||
509 | */ | ||
510 | static size_t | ||
511 | transmit_begin (void *cls, size_t size, void *buf) | ||
512 | { | ||
513 | struct GNUNET_MessageHeader *msg; | ||
514 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
515 | int msize; | ||
516 | |||
517 | GNUNET_assert (NULL != buf); | ||
518 | |||
519 | consensus = cls; | ||
520 | consensus->th = NULL; | ||
521 | |||
522 | msg = buf; | ||
523 | |||
524 | msize = sizeof (struct GNUNET_MessageHeader); | ||
525 | |||
526 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN); | ||
527 | msg->size = htons (msize); | ||
528 | |||
529 | ntr_ack (consensus); | ||
530 | ntr_insert (consensus); | ||
531 | ntr_conclude (consensus); | ||
532 | |||
533 | return msize; | ||
534 | } | ||
535 | |||
536 | |||
537 | /** | 367 | /** |
538 | * Create a consensus session. | 368 | * Create a consensus session. |
539 | * | 369 | * |
540 | * @param cfg | 370 | * @param cfg configuration to use for connecting to the consensus service |
541 | * @param num_peers | 371 | * @param num_peers number of peers in the peers array |
542 | * @param peers array of peers participating in this consensus session | 372 | * @param peers array of peers participating in this consensus session |
543 | * Inclusion of the local peer is optional. | 373 | * Inclusion of the local peer is optional. |
544 | * @param session_id session identifier | 374 | * @param session_id session identifier |
@@ -553,7 +383,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
553 | unsigned int num_peers, | 383 | unsigned int num_peers, |
554 | const struct GNUNET_PeerIdentity *peers, | 384 | const struct GNUNET_PeerIdentity *peers, |
555 | const struct GNUNET_HashCode *session_id, | 385 | const struct GNUNET_HashCode *session_id, |
556 | GNUNET_CONSENSUS_NewElementCallback new_element_cb, | 386 | GNUNET_CONSENSUS_ElementCallback new_element_cb, |
557 | void *new_element_cls) | 387 | void *new_element_cls) |
558 | { | 388 | { |
559 | struct GNUNET_CONSENSUS_Handle *consensus; | 389 | struct GNUNET_CONSENSUS_Handle *consensus; |
@@ -567,17 +397,10 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
567 | consensus->session_id = *session_id; | 397 | consensus->session_id = *session_id; |
568 | 398 | ||
569 | if (0 == num_peers) | 399 | if (0 == num_peers) |
570 | { | ||
571 | consensus->peers = NULL; | 400 | consensus->peers = NULL; |
572 | } | ||
573 | else if (num_peers > 0) | 401 | else if (num_peers > 0) |
574 | { | 402 | consensus->peers = |
575 | consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); | 403 | GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); |
576 | } | ||
577 | else | ||
578 | { | ||
579 | GNUNET_break (0); | ||
580 | } | ||
581 | 404 | ||
582 | consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); | 405 | consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); |
583 | 406 | ||
@@ -615,45 +438,37 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | |||
615 | GNUNET_CONSENSUS_InsertDoneCallback idc, | 438 | GNUNET_CONSENSUS_InsertDoneCallback idc, |
616 | void *idc_cls) | 439 | void *idc_cls) |
617 | { | 440 | { |
618 | GNUNET_assert (NULL == consensus->idc); | 441 | struct QueuedMessage *qmsg; |
619 | GNUNET_assert (NULL == consensus->insert_element); | 442 | struct GNUNET_CONSENSUS_ElementMessage *element_msg; |
620 | GNUNET_assert (NULL == consensus->conclude_cb); | 443 | size_t element_msg_size; |
621 | 444 | ||
622 | consensus->idc = idc; | 445 | LOG (GNUNET_ERROR_TYPE_INFO, "inserting, size=%llu\n", element->size); |
623 | consensus->idc_cls = idc_cls; | ||
624 | consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size); | ||
625 | 446 | ||
626 | if (consensus->joined == 0) | 447 | element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + |
627 | { | 448 | element->size); |
628 | return; | ||
629 | } | ||
630 | 449 | ||
631 | ntr_insert (consensus); | 450 | element_msg = GNUNET_malloc (element_msg_size); |
632 | } | 451 | element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); |
452 | element_msg->header.size = htons (element_msg_size); | ||
453 | memcpy (&element_msg[1], element->data, element->size); | ||
633 | 454 | ||
455 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | ||
456 | qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; | ||
457 | qmsg->size = element_msg_size; | ||
458 | qmsg->idc = idc; | ||
459 | qmsg->idc_cls = idc_cls; | ||
634 | 460 | ||
635 | /** | 461 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); |
636 | * Begin reconciling elements with other peers. | ||
637 | * | ||
638 | * @param consensus handle for the consensus session | ||
639 | */ | ||
640 | void | ||
641 | GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus) | ||
642 | { | ||
643 | GNUNET_assert (NULL == consensus->idc); | ||
644 | GNUNET_assert (NULL == consensus->insert_element); | ||
645 | GNUNET_assert (GNUNET_NO == consensus->begin_requested); | ||
646 | GNUNET_assert (GNUNET_NO == consensus->begin_sent); | ||
647 | |||
648 | consensus->begin_requested = GNUNET_YES; | ||
649 | 462 | ||
650 | ntr_begin (consensus); | 463 | schedule_transmit (consensus); |
651 | } | 464 | } |
652 | 465 | ||
653 | 466 | ||
654 | /** | 467 | /** |
655 | * We are finished inserting new elements into the consensus; | 468 | * We are done with inserting new elements into the consensus; |
656 | * try to conclude the consensus within a given time window. | 469 | * try to conclude the consensus within a given time window. |
470 | * After conclude has been called, no further elements may be | ||
471 | * inserted by the client. | ||
657 | * | 472 | * |
658 | * @param consensus consensus session | 473 | * @param consensus consensus session |
659 | * @param timeout timeout after which the conculde callback | 474 | * @param timeout timeout after which the conculde callback |
@@ -664,20 +479,32 @@ GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus) | |||
664 | void | 479 | void |
665 | GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | 480 | GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, |
666 | struct GNUNET_TIME_Relative timeout, | 481 | struct GNUNET_TIME_Relative timeout, |
482 | unsigned int min_group_size_in_consensus, | ||
667 | GNUNET_CONSENSUS_ConcludeCallback conclude, | 483 | GNUNET_CONSENSUS_ConcludeCallback conclude, |
668 | void *conclude_cls) | 484 | void *conclude_cls) |
669 | { | 485 | { |
486 | struct QueuedMessage *qmsg; | ||
487 | struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; | ||
488 | |||
670 | GNUNET_assert (NULL != conclude); | 489 | GNUNET_assert (NULL != conclude); |
671 | GNUNET_assert (NULL == consensus->conclude_cb); | 490 | GNUNET_assert (NULL == consensus->conclude_cb); |
672 | 491 | ||
673 | consensus->conclude_cls = conclude_cls; | 492 | consensus->conclude_cls = conclude_cls; |
674 | consensus->conclude_cb = conclude; | 493 | consensus->conclude_cb = conclude; |
675 | consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout); | ||
676 | 494 | ||
495 | conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); | ||
496 | conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); | ||
497 | conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); | ||
498 | conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); | ||
499 | conclude_msg->min_group_size = min_group_size_in_consensus; | ||
500 | |||
501 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | ||
502 | qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; | ||
503 | qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); | ||
504 | |||
505 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); | ||
677 | 506 | ||
678 | /* if transmitting the conclude message is not possible right now, transmit_join | 507 | schedule_transmit (consensus); |
679 | * or transmit_ack will handle it */ | ||
680 | ntr_conclude (consensus); | ||
681 | } | 508 | } |
682 | 509 | ||
683 | 510 | ||