diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-02 19:21:17 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-02 19:21:17 +0000 |
commit | 8d7168b4fead012aaff73313248415ba2d395082 (patch) | |
tree | d52e829d28ed0577e93bc1f33b4b0b1377294091 /src/scalarproduct | |
parent | 41356c04c3dcedb29de159a48393523fa07bd5d1 (diff) | |
download | gnunet-8d7168b4fead012aaff73313248415ba2d395082.tar.gz gnunet-8d7168b4fead012aaff73313248415ba2d395082.zip |
rewrite scalarproduct_api to use MQ
Diffstat (limited to 'src/scalarproduct')
-rw-r--r-- | src/scalarproduct/scalarproduct_api.c | 397 |
1 files changed, 164 insertions, 233 deletions
diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c index 71db3cd86..8639b4e5a 100644 --- a/src/scalarproduct/scalarproduct_api.c +++ b/src/scalarproduct/scalarproduct_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2013, 2014 GNUnet e.V. | 3 | Copyright (C) 2013, 2014, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -23,8 +23,6 @@ | |||
23 | * @author Christian Fuchs | 23 | * @author Christian Fuchs |
24 | * @author Gaurav Kukreja | 24 | * @author Gaurav Kukreja |
25 | * @author Christian Grothoff | 25 | * @author Christian Grothoff |
26 | * | ||
27 | * TODO: use MQ | ||
28 | */ | 26 | */ |
29 | #include "platform.h" | 27 | #include "platform.h" |
30 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
@@ -62,22 +60,7 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle | |||
62 | /** | 60 | /** |
63 | * Current connection to the scalarproduct service. | 61 | * Current connection to the scalarproduct service. |
64 | */ | 62 | */ |
65 | struct GNUNET_CLIENT_Connection *client; | 63 | struct GNUNET_MQ_Handle *mq; |
66 | |||
67 | /** | ||
68 | * Current transmit handle. | ||
69 | */ | ||
70 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
71 | |||
72 | /** | ||
73 | * the client's elements which | ||
74 | */ | ||
75 | struct GNUNET_SCALARPRODUCT_Element *elements; | ||
76 | |||
77 | /** | ||
78 | * Message to be sent to the scalarproduct service | ||
79 | */ | ||
80 | struct GNUNET_MessageHeader *msg; | ||
81 | 64 | ||
82 | /** | 65 | /** |
83 | * Function to call after transmission of the request (Bob). | 66 | * Function to call after transmission of the request (Bob). |
@@ -105,144 +88,28 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle | |||
105 | */ | 88 | */ |
106 | struct GNUNET_HashCode key; | 89 | struct GNUNET_HashCode key; |
107 | 90 | ||
108 | /** | ||
109 | * count of all @e elements we offer for computation | ||
110 | */ | ||
111 | uint32_t element_count_total; | ||
112 | |||
113 | /** | ||
114 | * count of the transfered @e elements we offer for computation | ||
115 | */ | ||
116 | uint32_t element_count_transfered; | ||
117 | |||
118 | /** | ||
119 | * Type to use for the multipart messages. | ||
120 | */ | ||
121 | uint16_t mp_type; | ||
122 | |||
123 | }; | 91 | }; |
124 | 92 | ||
125 | 93 | ||
126 | /** | 94 | /** |
127 | * Called when a response is received from the service. After basic | 95 | * Called when a response is received from the service. Perform basic |
128 | * check, the handler in `h->response_proc` is called. This functions | 96 | * check that the message is well-formed. |
129 | * handles the response to the client which used the API. | ||
130 | * | 97 | * |
131 | * @param cls Pointer to the Master Context | 98 | * @param cls Pointer to the Master Context |
132 | * @param msg Pointer to the data received in response | 99 | * @param message Pointer to the data received in response |
100 | * @return #GNUNET_OK if @a message is well-formed | ||
133 | */ | 101 | */ |
134 | static void | 102 | static int |
135 | receive_cb (void *cls, | 103 | check_response (void *cls, |
136 | const struct GNUNET_MessageHeader *msg) | 104 | const struct ClientResponseMessage *message) |
137 | { | 105 | { |
138 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; | 106 | if (ntohs (message->header.size) != |
139 | const struct ClientResponseMessage *message; | ||
140 | enum GNUNET_SCALARPRODUCT_ResponseStatus status; | ||
141 | |||
142 | if (NULL == msg) | ||
143 | { | ||
144 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
145 | "Disconnected from SCALARPRODUCT service.\n"); | ||
146 | h->response_proc (h, | ||
147 | NULL, | ||
148 | GNUNET_SCALARPRODUCT_STATUS_DISCONNECTED); | ||
149 | return; | ||
150 | } | ||
151 | if (ntohs (msg->size) < sizeof (struct ClientResponseMessage)) | ||
152 | { | ||
153 | GNUNET_break (0); | ||
154 | h->response_proc (h, | ||
155 | NULL, | ||
156 | GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE); | ||
157 | return; | ||
158 | } | ||
159 | message = (const struct ClientResponseMessage *) msg; | ||
160 | if (ntohs (msg->size) != | ||
161 | ntohl (message->product_length) + sizeof (struct ClientResponseMessage)) | 107 | ntohl (message->product_length) + sizeof (struct ClientResponseMessage)) |
162 | { | 108 | { |
163 | GNUNET_break (0); | 109 | GNUNET_break (0); |
164 | h->response_proc (h, | 110 | return GNUNET_SYSERR; |
165 | NULL, | ||
166 | GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE); | ||
167 | return; | ||
168 | } | ||
169 | status = (enum GNUNET_SCALARPRODUCT_ResponseStatus) ntohl (message->status); | ||
170 | h->response_proc (h, | ||
171 | message, | ||
172 | status); | ||
173 | } | ||
174 | |||
175 | |||
176 | /** | ||
177 | * Transmits the request to the SCALARPRODUCT service | ||
178 | * | ||
179 | * @param cls Closure with the `struct GNUNET_SCALARPRODUCT_ComputationHandle` | ||
180 | * @param size Size of the buffer @a buf | ||
181 | * @param buf Pointer to the buffer | ||
182 | * @return Size of the message sent | ||
183 | */ | ||
184 | static size_t | ||
185 | do_send_message (void *cls, | ||
186 | size_t size, | ||
187 | void *buf) | ||
188 | { | ||
189 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; | ||
190 | struct ComputationBobCryptodataMultipartMessage *msg; | ||
191 | size_t ret; | ||
192 | uint32_t nsize; | ||
193 | uint32_t todo; | ||
194 | |||
195 | h->th = NULL; | ||
196 | if (NULL == buf) | ||
197 | { | ||
198 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
199 | "Failed to transmit request to SCALARPRODUCT.\n"); | ||
200 | /* notify caller about the error, done here */ | ||
201 | h->response_proc (h, NULL, | ||
202 | GNUNET_SCALARPRODUCT_STATUS_FAILURE); | ||
203 | return 0; | ||
204 | } | ||
205 | ret = ntohs (h->msg->size); | ||
206 | memcpy (buf, h->msg, ret); | ||
207 | GNUNET_free (h->msg); | ||
208 | h->msg = NULL; | ||
209 | |||
210 | /* done sending? */ | ||
211 | if (h->element_count_total == h->element_count_transfered) | ||
212 | { | ||
213 | GNUNET_CLIENT_receive (h->client, | ||
214 | &receive_cb, h, | ||
215 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
216 | return ret; | ||
217 | } | 111 | } |
218 | 112 | return GNUNET_OK; | |
219 | todo = h->element_count_total - h->element_count_transfered; | ||
220 | nsize = sizeof (struct ComputationBobCryptodataMultipartMessage) | ||
221 | + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
222 | if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size) | ||
223 | { | ||
224 | /* cannot do all of them, limit to what is possible in one message */ | ||
225 | todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationBobCryptodataMultipartMessage)) | ||
226 | / sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
227 | nsize = sizeof (struct ComputationBobCryptodataMultipartMessage) | ||
228 | + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
229 | } | ||
230 | |||
231 | msg = GNUNET_malloc (nsize); | ||
232 | h->msg = &msg->header; | ||
233 | msg->header.size = htons (nsize); | ||
234 | msg->header.type = htons (h->mp_type); | ||
235 | msg->element_count_contained = htonl (todo); | ||
236 | memcpy (&msg[1], | ||
237 | &h->elements[h->element_count_transfered], | ||
238 | todo * sizeof (struct GNUNET_SCALARPRODUCT_Element)); | ||
239 | h->element_count_transfered += todo; | ||
240 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, nsize, | ||
241 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
242 | GNUNET_NO, | ||
243 | &do_send_message, h); | ||
244 | GNUNET_assert (NULL != h->th); | ||
245 | return ret; | ||
246 | } | 113 | } |
247 | 114 | ||
248 | 115 | ||
@@ -268,6 +135,28 @@ process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h, | |||
268 | 135 | ||
269 | 136 | ||
270 | /** | 137 | /** |
138 | * Called when a response is received from the service. After basic | ||
139 | * check, the handler in `h->response_proc` is called. This functions | ||
140 | * handles the response to the client which used the API. | ||
141 | * | ||
142 | * @param cls Pointer to the Master Context | ||
143 | * @param msg Pointer to the data received in response | ||
144 | */ | ||
145 | static void | ||
146 | handle_response (void *cls, | ||
147 | const struct ClientResponseMessage *message) | ||
148 | { | ||
149 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; | ||
150 | enum GNUNET_SCALARPRODUCT_ResponseStatus status; | ||
151 | |||
152 | status = (enum GNUNET_SCALARPRODUCT_ResponseStatus) ntohl (message->status); | ||
153 | h->response_proc (h, | ||
154 | message, | ||
155 | status); | ||
156 | } | ||
157 | |||
158 | |||
159 | /** | ||
271 | * Check if the keys for all given elements are unique. | 160 | * Check if the keys for all given elements are unique. |
272 | * | 161 | * |
273 | * @param elements elements to check | 162 | * @param elements elements to check |
@@ -302,6 +191,27 @@ check_unique (const struct GNUNET_SCALARPRODUCT_Element *elements, | |||
302 | 191 | ||
303 | 192 | ||
304 | /** | 193 | /** |
194 | * We encountered an error communicating with the set service while | ||
195 | * performing a set operation. Report to the application. | ||
196 | * | ||
197 | * @param cls the `struct GNUNET_SCALARPRODUCT_ComputationHandle` | ||
198 | * @param error error code | ||
199 | */ | ||
200 | static void | ||
201 | mq_error_handler (void *cls, | ||
202 | enum GNUNET_MQ_Error error) | ||
203 | { | ||
204 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; | ||
205 | |||
206 | LOG (GNUNET_ERROR_TYPE_INFO, | ||
207 | "Disconnected from SCALARPRODUCT service.\n"); | ||
208 | h->response_proc (h, | ||
209 | NULL, | ||
210 | GNUNET_SCALARPRODUCT_STATUS_DISCONNECTED); | ||
211 | } | ||
212 | |||
213 | |||
214 | /** | ||
305 | * Used by Bob's client to cooperate with Alice, | 215 | * Used by Bob's client to cooperate with Alice, |
306 | * | 216 | * |
307 | * @param cfg the gnunet configuration handle | 217 | * @param cfg the gnunet configuration handle |
@@ -320,65 +230,79 @@ GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handl | |||
320 | GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, | 230 | GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, |
321 | void *cont_cls) | 231 | void *cont_cls) |
322 | { | 232 | { |
323 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h; | 233 | GNUNET_MQ_hd_var_size (response, |
234 | GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT, | ||
235 | struct ClientResponseMessage); | ||
236 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h | ||
237 | = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); | ||
238 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
239 | make_response_handler (h), | ||
240 | GNUNET_MQ_handler_end () | ||
241 | }; | ||
242 | struct GNUNET_MQ_Envelope *env; | ||
324 | struct BobComputationMessage *msg; | 243 | struct BobComputationMessage *msg; |
244 | struct ComputationBobCryptodataMultipartMessage *mmsg; | ||
325 | uint32_t size; | 245 | uint32_t size; |
326 | uint16_t possible; | 246 | uint16_t possible; |
247 | uint16_t todo; | ||
248 | uint32_t element_count_transfered; | ||
249 | |||
327 | 250 | ||
328 | if (GNUNET_SYSERR == check_unique (elements, element_count)) | 251 | if (GNUNET_SYSERR == check_unique (elements, |
252 | element_count)) | ||
329 | return NULL; | 253 | return NULL; |
330 | h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); | ||
331 | h->cont_status = cont; | 254 | h->cont_status = cont; |
332 | h->cont_cls = cont_cls; | 255 | h->cont_cls = cont_cls; |
333 | h->response_proc = &process_status_message; | 256 | h->response_proc = &process_status_message; |
334 | h->cfg = cfg; | 257 | h->cfg = cfg; |
335 | h->key = *session_key; | 258 | h->key = *session_key; |
336 | h->client = GNUNET_CLIENT_connect ("scalarproduct-bob", cfg); | 259 | h->mq = GNUNET_CLIENT_connecT (cfg, |
337 | h->element_count_total = element_count; | 260 | "scalarproduct-bob", |
338 | h->mp_type = GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_BOB; | 261 | handlers, |
339 | if (NULL == h->client) | 262 | &mq_error_handler, |
263 | h); | ||
264 | if (NULL == h->mq) | ||
340 | { | 265 | { |
341 | /* scalarproduct configuration error */ | 266 | /* scalarproduct configuration error */ |
342 | GNUNET_break (0); | 267 | GNUNET_break (0); |
343 | GNUNET_free (h); | 268 | GNUNET_free (h); |
344 | return NULL; | 269 | return NULL; |
345 | } | 270 | } |
346 | size = sizeof (struct BobComputationMessage) | 271 | possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BobComputationMessage)) |
347 | + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element); | 272 | / sizeof (struct GNUNET_SCALARPRODUCT_Element); |
348 | if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) | 273 | todo = GNUNET_MIN (possible, |
349 | { | 274 | element_count); |
350 | possible = element_count; | 275 | size = todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); |
351 | h->element_count_transfered = element_count; | 276 | env = GNUNET_MQ_msg_extra (msg, |
352 | } | 277 | size, |
353 | else | 278 | GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); |
354 | { | ||
355 | /* create a multipart msg, first we calculate a new msg size for the head msg */ | ||
356 | possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BobComputationMessage)) | ||
357 | / sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
358 | h->element_count_transfered = possible; | ||
359 | size = sizeof (struct BobComputationMessage) | ||
360 | + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
361 | h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count); | ||
362 | memcpy (h->elements, | ||
363 | elements, | ||
364 | sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count); | ||
365 | } | ||
366 | |||
367 | msg = GNUNET_malloc (size); | ||
368 | h->msg = &msg->header; | ||
369 | msg->header.size = htons (size); | ||
370 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); | ||
371 | msg->element_count_total = htonl (element_count); | 279 | msg->element_count_total = htonl (element_count); |
372 | msg->element_count_contained = htonl (possible); | 280 | msg->element_count_contained = htonl (todo); |
373 | msg->session_key = *session_key; | 281 | msg->session_key = *session_key; |
374 | memcpy (&msg[1], | 282 | memcpy (&msg[1], |
375 | elements, | 283 | elements, |
376 | possible * sizeof (struct GNUNET_SCALARPRODUCT_Element)); | 284 | size); |
377 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, | 285 | element_count_transfered = todo; |
378 | GNUNET_TIME_UNIT_FOREVER_REL, | 286 | GNUNET_MQ_send (h->mq, |
379 | GNUNET_YES, /* retry is OK in the initial stage */ | 287 | env); |
380 | &do_send_message, h); | 288 | possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (*mmsg)) |
381 | GNUNET_assert (NULL != h->th); | 289 | / sizeof (struct GNUNET_SCALARPRODUCT_Element); |
290 | while (element_count_transfered < element_count) | ||
291 | { | ||
292 | todo = GNUNET_MIN (possible, | ||
293 | element_count - element_count_transfered); | ||
294 | size = todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
295 | env = GNUNET_MQ_msg_extra (mmsg, | ||
296 | size, | ||
297 | GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_BOB); | ||
298 | mmsg->element_count_contained = htonl (todo); | ||
299 | memcpy (&mmsg[1], | ||
300 | &elements[element_count_transfered], | ||
301 | size); | ||
302 | element_count_transfered += todo; | ||
303 | GNUNET_MQ_send (h->mq, | ||
304 | env); | ||
305 | } | ||
382 | return h; | 306 | return h; |
383 | } | 307 | } |
384 | 308 | ||
@@ -464,67 +388,81 @@ GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle | |||
464 | GNUNET_SCALARPRODUCT_DatumProcessor cont, | 388 | GNUNET_SCALARPRODUCT_DatumProcessor cont, |
465 | void *cont_cls) | 389 | void *cont_cls) |
466 | { | 390 | { |
467 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h; | 391 | GNUNET_MQ_hd_var_size (response, |
392 | GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT, | ||
393 | struct ClientResponseMessage); | ||
394 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h | ||
395 | = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); | ||
396 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
397 | make_response_handler (h), | ||
398 | GNUNET_MQ_handler_end () | ||
399 | }; | ||
400 | struct GNUNET_MQ_Envelope *env; | ||
468 | struct AliceComputationMessage *msg; | 401 | struct AliceComputationMessage *msg; |
402 | struct ComputationBobCryptodataMultipartMessage *mmsg; | ||
469 | uint32_t size; | 403 | uint32_t size; |
470 | uint32_t possible; | 404 | uint16_t possible; |
405 | uint16_t todo; | ||
406 | uint32_t element_count_transfered; | ||
471 | 407 | ||
472 | if (GNUNET_SYSERR == check_unique (elements, element_count)) | 408 | if (GNUNET_SYSERR == check_unique (elements, |
409 | element_count)) | ||
473 | return NULL; | 410 | return NULL; |
474 | h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); | 411 | h->mq = GNUNET_CLIENT_connecT (cfg, |
475 | h->client = GNUNET_CLIENT_connect ("scalarproduct-alice", cfg); | 412 | "scalarproduct-alice", |
476 | if (NULL == h->client) | 413 | handlers, |
414 | &mq_error_handler, | ||
415 | h); | ||
416 | if (NULL == h->mq) | ||
477 | { | 417 | { |
478 | /* missconfigured scalarproduct service */ | 418 | /* missconfigured scalarproduct service */ |
479 | GNUNET_break (0); | 419 | GNUNET_break (0); |
480 | GNUNET_free (h); | 420 | GNUNET_free (h); |
481 | return NULL; | 421 | return NULL; |
482 | } | 422 | } |
483 | h->element_count_total = element_count; | ||
484 | h->cont_datum = cont; | 423 | h->cont_datum = cont; |
485 | h->cont_cls = cont_cls; | 424 | h->cont_cls = cont_cls; |
486 | h->response_proc = &process_result_message; | 425 | h->response_proc = &process_result_message; |
487 | h->cfg = cfg; | 426 | h->cfg = cfg; |
488 | h->key = *session_key; | 427 | h->key = *session_key; |
489 | h->mp_type = GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_ALICE; | ||
490 | size = sizeof (struct AliceComputationMessage) | ||
491 | + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
492 | if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) | ||
493 | { | ||
494 | possible = element_count; | ||
495 | h->element_count_transfered = element_count; | ||
496 | } | ||
497 | else | ||
498 | { | ||
499 | /* create a multipart msg, first we calculate a new msg size for the head msg */ | ||
500 | possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct AliceComputationMessage)) | ||
501 | / sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
502 | h->element_count_transfered = possible; | ||
503 | size = sizeof (struct AliceComputationMessage) | ||
504 | + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
505 | h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count); | ||
506 | memcpy (h->elements, | ||
507 | elements, | ||
508 | sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count); | ||
509 | } | ||
510 | 428 | ||
511 | msg = GNUNET_malloc (size); | 429 | possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct AliceComputationMessage)) |
512 | h->msg = &msg->header; | 430 | / sizeof (struct GNUNET_SCALARPRODUCT_Element); |
513 | msg->header.size = htons (size); | 431 | todo = GNUNET_MIN (possible, |
514 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); | 432 | element_count); |
433 | size = todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
434 | env = GNUNET_MQ_msg_extra (msg, | ||
435 | size, | ||
436 | GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); | ||
515 | msg->element_count_total = htonl (element_count); | 437 | msg->element_count_total = htonl (element_count); |
516 | msg->element_count_contained = htonl (possible); | 438 | msg->element_count_contained = htonl (todo); |
517 | msg->reserved = htonl (0); | 439 | msg->reserved = htonl (0); |
518 | msg->peer = *peer; | 440 | msg->peer = *peer; |
519 | msg->session_key = *session_key; | 441 | msg->session_key = *session_key; |
520 | memcpy (&msg[1], | 442 | memcpy (&msg[1], |
521 | elements, | 443 | elements, |
522 | sizeof (struct GNUNET_SCALARPRODUCT_Element) * possible); | 444 | size); |
523 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, | 445 | GNUNET_MQ_send (h->mq, |
524 | GNUNET_TIME_UNIT_FOREVER_REL, | 446 | env); |
525 | GNUNET_YES, /* retry is OK in the initial stage */ | 447 | element_count_transfered = todo; |
526 | &do_send_message, h); | 448 | possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (*mmsg)) |
527 | GNUNET_assert (NULL != h->th); | 449 | / sizeof (struct GNUNET_SCALARPRODUCT_Element); |
450 | while (element_count_transfered < element_count) | ||
451 | { | ||
452 | todo = GNUNET_MIN (possible, | ||
453 | element_count - element_count_transfered); | ||
454 | size = todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); | ||
455 | env = GNUNET_MQ_msg_extra (mmsg, | ||
456 | size, | ||
457 | GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_ALICE); | ||
458 | mmsg->element_count_contained = htonl (todo); | ||
459 | memcpy (&mmsg[1], | ||
460 | &elements[element_count_transfered], | ||
461 | size); | ||
462 | element_count_transfered += todo; | ||
463 | GNUNET_MQ_send (h->mq, | ||
464 | env); | ||
465 | } | ||
528 | return h; | 466 | return h; |
529 | } | 467 | } |
530 | 468 | ||
@@ -538,17 +476,10 @@ GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle | |||
538 | void | 476 | void |
539 | GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h) | 477 | GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h) |
540 | { | 478 | { |
541 | if (NULL != h->th) | 479 | if (NULL != h->mq) |
542 | { | ||
543 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | ||
544 | h->th = NULL; | ||
545 | } | ||
546 | GNUNET_free_non_null (h->elements); | ||
547 | GNUNET_free_non_null (h->msg); | ||
548 | if (NULL != h->client) | ||
549 | { | 480 | { |
550 | GNUNET_CLIENT_disconnect (h->client); | 481 | GNUNET_MQ_destroy (h->mq); |
551 | h->client = NULL; | 482 | h->mq = NULL; |
552 | } | 483 | } |
553 | GNUNET_free (h); | 484 | GNUNET_free (h); |
554 | } | 485 | } |