aboutsummaryrefslogtreecommitdiff
path: root/src/scalarproduct/scalarproduct_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2014-05-24 19:57:15 +0000
committerChristian Grothoff <christian@grothoff.org>2014-05-24 19:57:15 +0000
commit31536a9bb75502f4c090472f188e1eec138515f7 (patch)
tree2c75617baa1ca040ddfb6a59d90831624b3512b1 /src/scalarproduct/scalarproduct_api.c
parent152cd13768915399f9e5137d78a2f75296a1b93e (diff)
downloadgnunet-31536a9bb75502f4c090472f188e1eec138515f7.tar.gz
gnunet-31536a9bb75502f4c090472f188e1eec138515f7.zip
cleaning up scalar product client API
Diffstat (limited to 'src/scalarproduct/scalarproduct_api.c')
-rw-r--r--src/scalarproduct/scalarproduct_api.c589
1 files changed, 258 insertions, 331 deletions
diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c
index 3b92af579..ea62027ab 100644
--- a/src/scalarproduct/scalarproduct_api.c
+++ b/src/scalarproduct/scalarproduct_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2013 Christian Grothoff (and other contributing authors) 3 (C) 2013, 2014 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -23,7 +23,7 @@
23 * @brief API for the scalarproduct 23 * @brief API for the scalarproduct
24 * @author Christian Fuchs 24 * @author Christian Fuchs
25 * @author Gaurav Kukreja 25 * @author Gaurav Kukreja
26 * 26 * @author Christian Grothoff
27 */ 27 */
28#include "platform.h" 28#include "platform.h"
29#include "gnunet_util_lib.h" 29#include "gnunet_util_lib.h"
@@ -34,16 +34,19 @@
34 34
35#define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__) 35#define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__)
36 36
37/**************************************************************
38 *** Datatype Declarations **********
39 **************************************************************/
40 37
41/** 38/**
42 * the abstraction function for our internal callback 39 * The abstraction function for our internal callback
40 *
41 * @param h computation handle
42 * @param msg response we got, NULL on errors
43 * @param status processing status code
43 */ 44 */
44typedef void (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (void *cls, 45typedef void
45 const struct GNUNET_MessageHeader *msg, 46(*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
46 enum GNUNET_SCALARPRODUCT_ResponseStatus status); 47 const struct ClientResponseMessage *msg,
48 enum GNUNET_SCALARPRODUCT_ResponseStatus status);
49
47 50
48/** 51/**
49 * A handle returned for each computation 52 * A handle returned for each computation
@@ -61,11 +64,6 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle
61 struct GNUNET_CLIENT_Connection *client; 64 struct GNUNET_CLIENT_Connection *client;
62 65
63 /** 66 /**
64 * Handle for statistics.
65 */
66 struct GNUNET_STATISTICS_Handle *stats;
67
68 /**
69 * The shared session key identifying this computation 67 * The shared session key identifying this computation
70 */ 68 */
71 struct GNUNET_HashCode key; 69 struct GNUNET_HashCode key;
@@ -76,31 +74,26 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle
76 struct GNUNET_CLIENT_TransmitHandle *th; 74 struct GNUNET_CLIENT_TransmitHandle *th;
77 75
78 /** 76 /**
79 * count of all elements we offer for computation 77 * count of all @e elements we offer for computation
80 */ 78 */
81 uint32_t element_count_total; 79 uint32_t element_count_total;
82 80
83 /** 81 /**
84 * count of the transfered elements we offer for computation 82 * count of the transfered @e elements we offer for computation
85 */ 83 */
86 uint32_t element_count_transfered; 84 uint32_t element_count_transfered;
87 85
88 /** 86 /**
89 * the client's elements which 87 * the client's elements which
90 */ 88 */
91 struct GNUNET_SCALARPRODUCT_Element * elements; 89 struct GNUNET_SCALARPRODUCT_Element *elements;
92 90
93 /** 91 /**
94 * Message to be sent to the scalarproduct service 92 * Message to be sent to the scalarproduct service
95 */ 93 */
96 void * msg; 94 struct GNUNET_MessageHeader *msg;
97 95
98 /** 96 /**
99 * The client's msg handler callback
100 */
101 union
102 {
103 /**
104 * Function to call after transmission of the request (Bob). 97 * Function to call after transmission of the request (Bob).
105 */ 98 */
106 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status; 99 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
@@ -109,101 +102,99 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle
109 * Function to call after transmission of the request (Alice). 102 * Function to call after transmission of the request (Alice).
110 */ 103 */
111 GNUNET_SCALARPRODUCT_DatumProcessor cont_datum; 104 GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
112 };
113 105
114 /** 106 /**
115 * Closure for 'cont'. 107 * Closure for @e cont_status or @e cont_datum.
116 */ 108 */
117 void *cont_cls; 109 void *cont_cls;
118 110
119 /** 111 /**
120 * API internal callback for results and failures to be forwarded to the client 112 * API internal callback for results and failures to be forwarded to
113 * the client.
121 */ 114 */
122 GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc; 115 GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
123
124 /**
125 *
126 */
127 GNUNET_SCHEDULER_TaskIdentifier cont_multipart;
128};
129
130/**************************************************************
131 *** Forward Function Declarations **********
132 **************************************************************/
133
134void
135GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h);
136 116
137static size_t do_send_message (void *cls, size_t size, void *buf); 117};
138/**************************************************************
139 *** Static Function Declarations **********
140 **************************************************************/
141 118
142 119
143/** 120/**
144 * Handles the STATUS received from the service for a response, does not contain a payload 121 * Handles the STATUS received from the service for a response, does
122 * not contain a payload.
145 * 123 *
146 * @param cls our Handle 124 * @param h our Handle
147 * @param msg Pointer to the response received 125 * @param msg Pointer to the response received
148 * @param status the condition the request was terminated with (eg: disconnect) 126 * @param status the condition the request was terminated with (eg: disconnect)
149 */ 127 */
150static void 128static void
151process_status_message (void *cls, 129process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
152 const struct GNUNET_MessageHeader *msg, 130 const struct ClientResponseMessage *msg,
153 enum GNUNET_SCALARPRODUCT_ResponseStatus status) 131 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
154{ 132{
155 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; 133 if (NULL != h->cont_status)
156 134 h->cont_status (h->cont_cls,
157 qe->cont_status (qe->cont_cls, status); 135 status);
136 GNUNET_SCALARPRODUCT_cancel (h);
158} 137}
159 138
160 139
161/** 140/**
162 * Handles the RESULT received from the service for a request, should contain a result MPI value 141 * Handles the RESULT received from the service for a request, should
142 * contain a result MPI value
163 * 143 *
164 * @param cls our Handle 144 * @param h our Handle
165 * @param msg Pointer to the response received 145 * @param msg Pointer to the response received
166 * @param status the condition the request was terminated with (eg: disconnect) 146 * @param status the condition the request was terminated with (eg: disconnect)
167 */ 147 */
168static void 148static void
169process_result_message (void *cls, 149process_result_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
170 const struct GNUNET_MessageHeader *msg, 150 const struct ClientResponseMessage *msg,
171 enum GNUNET_SCALARPRODUCT_ResponseStatus status) 151 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
172{ 152{
173 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; 153 size_t product_len = ntohl (msg->product_length);
174 const struct GNUNET_SCALARPRODUCT_client_response *message =
175 (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
176 gcry_mpi_t result = NULL; 154 gcry_mpi_t result = NULL;
177 gcry_error_t rc; 155 gcry_error_t rc;
156 gcry_mpi_t num;
157 size_t rsize;
178 158
159 if (ntohs (msg->header.size) - sizeof (struct ClientResponseMessage)
160 != product_len)
161 {
162 GNUNET_break (0);
163 status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
164 }
179 if (GNUNET_SCALARPRODUCT_Status_Success == status) 165 if (GNUNET_SCALARPRODUCT_Status_Success == status)
166 {
167 result = gcry_mpi_new (0);
168
169 if (0 < product_len)
180 { 170 {
181 size_t product_len = ntohl (message->product_length); 171 rsize = 0;
182 result = gcry_mpi_new (0); 172 if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD,
183 173 &msg[1],
184 if (0 < product_len) 174 product_len,
185 { 175 &rsize)))
186 gcry_mpi_t num; 176 {
187 size_t read = 0; 177 LOG_GCRY (GNUNET_ERROR_TYPE_ERROR,
188 178 "gcry_mpi_scan",
189 if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD, &message[1], product_len, &read))) 179 rc);
190 { 180 gcry_mpi_release (result);
191 LOG_GCRY(GNUNET_ERROR_TYPE_ERROR, "gcry_mpi_scan", rc); 181 result = NULL;
192 gcry_mpi_release (result); 182 status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
193 result = NULL; 183 }
194 status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; 184 else
195 } 185 {
196 else 186 if (0 < ntohl (msg->range))
197 { 187 gcry_mpi_add (result, result, num);
198 if (0 < message->range) 188 else if (0 > ntohl (msg->range))
199 gcry_mpi_add (result, result, num); 189 gcry_mpi_sub (result, result, num);
200 else if (0 > message->range) 190 gcry_mpi_release (num);
201 gcry_mpi_sub (result, result, num); 191 }
202 gcry_mpi_release (num);
203 }
204 }
205 } 192 }
206 qe->cont_datum (qe->cont_cls, status, result); 193 }
194 h->cont_datum (h->cont_cls, status, result);
195 if (NULL != result)
196 gcry_mpi_release (result);
197 GNUNET_SCALARPRODUCT_cancel (h);
207} 198}
208 199
209 200
@@ -216,144 +207,114 @@ process_result_message (void *cls,
216 * @param msg Pointer to the data received in response 207 * @param msg Pointer to the data received in response
217 */ 208 */
218static void 209static void
219receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) 210receive_cb (void *cls,
211 const struct GNUNET_MessageHeader *msg)
220{ 212{
221 struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; 213 struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
222 const struct GNUNET_SCALARPRODUCT_client_response *message = 214 const struct ClientResponseMessage *message;
223 (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
224 enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
225 215
226 if (NULL == msg) 216 if (NULL == msg)
227 { 217 {
228 LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n"); 218 LOG (GNUNET_ERROR_TYPE_INFO,
229 status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected; 219 "Disconnected from SCALARPRODUCT service.\n");
230 } 220 h->response_proc (h,
231 else if ((GNUNET_SYSERR != message->status) && (0 < message->product_length )) 221 NULL,
232 { 222 GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
233 // response for the responder client, successful 223 return;
234 GNUNET_STATISTICS_update (h->stats,
235 gettext_noop ("# SUC responder result messages received"), 1,
236 GNUNET_NO);
237
238 status = GNUNET_SCALARPRODUCT_Status_Success;
239 }
240 else if (message->status == GNUNET_SYSERR){
241 // service signaled an error
242 status = GNUNET_SCALARPRODUCT_Status_Failure;
243 } 224 }
244 225 if (ntohs (msg->size) != sizeof (struct ClientResponseMessage))
245 if (h->cont_status != NULL) 226 {
246 h->response_proc (h, msg, status); 227 GNUNET_break (0);
247 228 h->response_proc (h,
248 GNUNET_free (h); 229 NULL,
249} 230 GNUNET_SCALARPRODUCT_Status_InvalidResponse);
250 231 return;
251
252static void
253send_multipart (void * cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
254{
255 struct GNUNET_SCALARPRODUCT_ComputationHandle *h = (struct GNUNET_SCALARPRODUCT_ComputationHandle *) cls;
256 struct GNUNET_SCALARPRODUCT_computation_message_multipart *msg;
257 uint32_t size;
258 uint32_t todo;
259
260 h->cont_multipart = GNUNET_SCHEDULER_NO_TASK;
261
262 todo = h->element_count_total - h->element_count_transfered;
263 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart) +todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
264 if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size) {
265 //create a multipart msg, first we calculate a new msg size for the head msg
266 todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart)) / sizeof (struct GNUNET_SCALARPRODUCT_Element);
267 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart) +todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
268 } 232 }
269 233 message = (const struct ClientResponseMessage *) msg;
270 msg = (struct GNUNET_SCALARPRODUCT_computation_message_multipart*) GNUNET_malloc (size); 234 if (GNUNET_SYSERR == ntohl (message->status))
271 h->msg = msg; 235 {
272 msg->header.size = htons (size); 236 h->response_proc (h,
273 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART); 237 NULL,
274 msg->element_count_contained = htonl (todo); 238 GNUNET_SCALARPRODUCT_Status_Failure);
275 239 return;
276 memcpy (&msg[1], &h->elements[h->element_count_transfered], todo * sizeof (struct GNUNET_SCALARPRODUCT_Element));
277 h->element_count_transfered += todo;
278
279 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
280 GNUNET_TIME_UNIT_FOREVER_REL,
281 GNUNET_YES, // retry is OK in the initial stage
282 &do_send_message, h);
283
284 if (!h->th) {
285 LOG (GNUNET_ERROR_TYPE_ERROR,
286 _ ("Failed to send a multipart message to the scalarproduct service\n"));
287 GNUNET_STATISTICS_update (h->stats,
288 gettext_noop ("# transmission request failures"),
289 1, GNUNET_NO);
290 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
291 GNUNET_CLIENT_disconnect (h->client);
292 GNUNET_free (h->msg);
293 h->msg = NULL;
294 if (h->cont_status != NULL)
295 h->response_proc (h, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
296
297 GNUNET_SCALARPRODUCT_cancel (cls);
298 } 240 }
241 h->response_proc (h,
242 message,
243 GNUNET_SCALARPRODUCT_Status_Success);
299} 244}
300 245
246
301/** 247/**
302 * Transmits the request to the VectorProduct Service 248 * Transmits the request to the SCALARPRODUCT service
303 * 249 *
304 * @param cls Closure 250 * @param cls Closure with the `struct GNUNET_SCALARPRODUCT_ComputationHandle`
305 * @param size Size of the buffer 251 * @param size Size of the buffer @a buf
306 * @param buf Pointer to the buffer 252 * @param buf Pointer to the buffer
307 *
308 * @return Size of the message sent 253 * @return Size of the message sent
309 */ 254 */
310static size_t 255static size_t
311do_send_message (void *cls, size_t size, 256do_send_message (void *cls,
257 size_t size,
312 void *buf) 258 void *buf)
313{ 259{
314 struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; 260 struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
261 struct ComputationMultipartMessage *msg;
262 size_t ret;
263 uint32_t nsize;
264 uint32_t todo;
315 265
316 if (NULL == buf) { 266 h->th = NULL;
317 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n"); 267 if (NULL == buf)
318 GNUNET_STATISTICS_update (h->stats, 268 {
319 gettext_noop ("# transmission request failures"), 269 LOG (GNUNET_ERROR_TYPE_DEBUG,
320 1, GNUNET_NO); 270 "Failed to transmit request to SCALARPRODUCT.\n");
321 271 /* notify caller about the error, done here */
322 // notify caller about the error, done here. 272 h->response_proc (h, NULL,
323 if (h->cont_status != NULL) 273 GNUNET_SCALARPRODUCT_Status_Failure);
324 h->response_proc (h, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
325
326 GNUNET_SCALARPRODUCT_cancel (cls);
327 return 0; 274 return 0;
328 } 275 }
329 memcpy (buf, h->msg, size); 276 ret = ntohs (h->msg->size);
330 277 memcpy (buf, h->msg, ret);
331 GNUNET_free (h->msg); 278 GNUNET_free (h->msg);
332 h->msg = NULL; 279 h->msg = NULL;
333 h->th = NULL;
334
335#if INSANE_STATISTICS
336 GNUNET_STATISTICS_update (h->stats,
337 gettext_noop ("# bytes sent to scalarproduct"), 1,
338 GNUNET_NO);
339#endif
340 280
341 /* done sending */ 281 /* done sending? */
342 if (h->element_count_total == h->element_count_transfered) { 282 if (h->element_count_total == h->element_count_transfered)
343 GNUNET_CLIENT_receive (h->client, &receive_cb, h, 283 {
284 GNUNET_CLIENT_receive (h->client,
285 &receive_cb, h,
344 GNUNET_TIME_UNIT_FOREVER_REL); 286 GNUNET_TIME_UNIT_FOREVER_REL);
345 return size; 287 return ret;
346 } 288 }
347
348 h->cont_multipart = GNUNET_SCHEDULER_add_now (&send_multipart, h);
349
350 return size;
351}
352 289
290 todo = h->element_count_total - h->element_count_transfered;
291 nsize = sizeof (struct ComputationMultipartMessage)
292 + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
293 if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size)
294 {
295 /* cannot do all of them, limit to what is possible in one message */
296 todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationMultipartMessage))
297 / sizeof (struct GNUNET_SCALARPRODUCT_Element);
298 nsize = sizeof (struct ComputationMultipartMessage)
299 + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
300 }
353 301
354/************************************************************** 302 msg = GNUNET_malloc (nsize);
355 *** API ********** 303 h->msg = &msg->header;
356 **************************************************************/ 304 msg->header.size = htons (nsize);
305 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART);
306 msg->element_count_contained = htonl (todo);
307 memcpy (&msg[1],
308 &h->elements[h->element_count_transfered],
309 todo * sizeof (struct GNUNET_SCALARPRODUCT_Element));
310 h->element_count_transfered += todo;
311 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, nsize,
312 GNUNET_TIME_UNIT_FOREVER_REL,
313 GNUNET_NO,
314 &do_send_message, h);
315 GNUNET_assert (NULL != h->th);
316 return ret;
317}
357 318
358 319
359/** 320/**
@@ -362,96 +323,75 @@ do_send_message (void *cls, size_t size,
362 * @param cfg the gnunet configuration handle 323 * @param cfg the gnunet configuration handle
363 * @param key Session key unique to the requesting client 324 * @param key Session key unique to the requesting client
364 * @param elements Array of elements of the vector 325 * @param elements Array of elements of the vector
365 * @param element_count Number of elements in the vector 326 * @param element_count Number of elements in the @a elements vector
366 * @param cont Callback function 327 * @param cont Callback function
367 * @param cont_cls Closure for the callback function 328 * @param cont_cls Closure for @a cont
368 *
369 * @return a new handle for this computation 329 * @return a new handle for this computation
370 */ 330 */
371struct GNUNET_SCALARPRODUCT_ComputationHandle * 331struct GNUNET_SCALARPRODUCT_ComputationHandle *
372GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle * cfg, 332GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle *cfg,
373 const struct GNUNET_HashCode * session_key, 333 const struct GNUNET_HashCode *session_key,
374 const struct GNUNET_SCALARPRODUCT_Element * elements, 334 const struct GNUNET_SCALARPRODUCT_Element *elements,
375 uint32_t element_count, 335 uint32_t element_count,
376 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, 336 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
377 void * cont_cls) 337 void *cont_cls)
378{ 338{
379 struct GNUNET_SCALARPRODUCT_ComputationHandle *h; 339 struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
380 struct GNUNET_SCALARPRODUCT_computation_message *msg; 340 struct ComputationMessage *msg;
381 uint32_t size; 341 uint32_t size;
382 uint16_t possible; 342 uint16_t possible;
383 343
384 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_computation_message)
385 + element_count * sizeof (int32_t));
386 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); 344 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
345 h->cont_status = cont;
346 h->cont_cls = cont_cls;
347 h->response_proc = &process_status_message;
348 h->cfg = cfg;
349 h->key = *session_key;
387 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); 350 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
388 if (!h->client)
389 {
390 LOG (GNUNET_ERROR_TYPE_ERROR,
391 _ ("Failed to connect to the scalarproduct service\n"));
392 GNUNET_free (h);
393 return NULL;
394 }
395 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
396 if (!h->stats)
397 {
398 LOG (GNUNET_ERROR_TYPE_ERROR,
399 _ ("Failed to send a message to the statistics service\n"));
400 GNUNET_CLIENT_disconnect (h->client);
401 GNUNET_free (h);
402 return NULL;
403 }
404
405 h->element_count_total = element_count; 351 h->element_count_total = element_count;
406 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element); 352 if (NULL == h->client)
407 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) { 353 {
354 /* scalarproduct configuration error */
355 GNUNET_break (0);
356 GNUNET_free (h);
357 return NULL;
358 }
359 size = sizeof (struct ComputationMessage)
360 + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
361 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size)
362 {
408 possible = element_count; 363 possible = element_count;
409 h->element_count_transfered = element_count; 364 h->element_count_transfered = element_count;
410 } 365 }
411 else { 366 else
412 //create a multipart msg, first we calculate a new msg size for the head msg 367 {
413 possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message)) / sizeof (struct GNUNET_SCALARPRODUCT_Element); 368 /* create a multipart msg, first we calculate a new msg size for the head msg */
369 possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationMessage))
370 / sizeof (struct GNUNET_SCALARPRODUCT_Element);
414 h->element_count_transfered = possible; 371 h->element_count_transfered = possible;
415 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + possible*sizeof (struct GNUNET_SCALARPRODUCT_Element); 372 size = sizeof (struct ComputationMessage)
416 h->elements = (struct GNUNET_SCALARPRODUCT_Element*) 373 + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element);
417 GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count); 374 h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
418 memcpy (h->elements, elements, sizeof (struct GNUNET_SCALARPRODUCT_Element)*element_count); 375 memcpy (h->elements,
376 elements,
377 sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count);
419 } 378 }
420 379
421 h->cont_status = cont; 380 msg = GNUNET_malloc (size);
422 h->cont_cls = cont_cls; 381 h->msg = &msg->header;
423 h->response_proc = &process_status_message;
424 h->cfg = cfg;
425 memcpy (&h->key, session_key, sizeof (struct GNUNET_HashCode));
426
427 msg = (struct GNUNET_SCALARPRODUCT_computation_message*) GNUNET_malloc (size);
428 h->msg = msg;
429 msg->header.size = htons (size); 382 msg->header.size = htons (size);
430 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); 383 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
431 msg->element_count_total = htonl (element_count); 384 msg->element_count_total = htonl (element_count);
432 msg->element_count_contained = htonl (possible); 385 msg->element_count_contained = htonl (possible);
433 386 msg->session_key = *session_key;
434 memcpy (&msg->session_key, session_key, sizeof (struct GNUNET_HashCode)); 387 memcpy (&msg[1],
435 memcpy (&msg[1], elements, possible); 388 elements,
436 389 possible);
437 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, 390 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
438 GNUNET_TIME_UNIT_FOREVER_REL, 391 GNUNET_TIME_UNIT_FOREVER_REL,
439 GNUNET_YES, // retry is OK in the initial stage 392 GNUNET_YES, /* retry is OK in the initial stage */
440 &do_send_message, h); 393 &do_send_message, h);
441 if (!h->th) 394 GNUNET_assert (NULL != h->th);
442 {
443 LOG (GNUNET_ERROR_TYPE_ERROR,
444 _ ("Failed to send a message to the scalarproduct service\n"));
445 GNUNET_STATISTICS_update (h->stats,
446 gettext_noop ("# transmission request failures"),
447 1, GNUNET_NO);
448 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
449 GNUNET_CLIENT_disconnect (h->client);
450 GNUNET_free (h->msg);
451 GNUNET_free_non_null (h->elements);
452 GNUNET_free (h);
453 return NULL;
454 }
455 return h; 395 return h;
456} 396}
457 397
@@ -463,99 +403,82 @@ GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handl
463 * @param session_key Session key should be unique to the requesting client 403 * @param session_key Session key should be unique to the requesting client
464 * @param peer PeerID of the other peer 404 * @param peer PeerID of the other peer
465 * @param elements Array of elements of the vector 405 * @param elements Array of elements of the vector
466 * @param element_count Number of elements in the vector 406 * @param element_count Number of elements in the @a elements vector
467 * @param cont Callback function 407 * @param cont Callback function
468 * @param cont_cls Closure for the callback function 408 * @param cont_cls Closure for @a cont
469 *
470 * @return a new handle for this computation 409 * @return a new handle for this computation
471 */ 410 */
472struct GNUNET_SCALARPRODUCT_ComputationHandle * 411struct GNUNET_SCALARPRODUCT_ComputationHandle *
473GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle * cfg, 412GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle *cfg,
474 const struct GNUNET_HashCode * session_key, 413 const struct GNUNET_HashCode *session_key,
475 const struct GNUNET_PeerIdentity *peer, 414 const struct GNUNET_PeerIdentity *peer,
476 const struct GNUNET_SCALARPRODUCT_Element * elements, 415 const struct GNUNET_SCALARPRODUCT_Element *elements,
477 uint32_t element_count, 416 uint32_t element_count,
478 GNUNET_SCALARPRODUCT_DatumProcessor cont, 417 GNUNET_SCALARPRODUCT_DatumProcessor cont,
479 void * cont_cls) 418 void *cont_cls)
480{ 419{
481 struct GNUNET_SCALARPRODUCT_ComputationHandle *h; 420 struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
482 struct GNUNET_SCALARPRODUCT_computation_message *msg; 421 struct ComputationMessage *msg;
483 uint32_t size; 422 uint32_t size;
484 uint16_t possible; 423 uint32_t possible;
485 424
486 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); 425 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
487 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); 426 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
488 if (!h->client) 427 if (NULL == h->client)
489 { 428 {
490 LOG (GNUNET_ERROR_TYPE_ERROR, 429 /* missconfigured scalarproduct service */
491 _ ("Failed to connect to the scalarproduct service\n")); 430 GNUNET_break (0);
492 GNUNET_free (h); 431 GNUNET_free (h);
493 return NULL; 432 return NULL;
494 } 433 }
495 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
496 if (!h->stats)
497 {
498 LOG (GNUNET_ERROR_TYPE_ERROR,
499 _ ("Failed to send a message to the statistics service\n"));
500 GNUNET_CLIENT_disconnect (h->client);
501 GNUNET_free (h);
502 return NULL;
503 }
504
505 h->element_count_total = element_count; 434 h->element_count_total = element_count;
506 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element); 435 h->cont_datum = cont;
507 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) { 436 h->cont_cls = cont_cls;
437 h->response_proc = &process_result_message;
438 h->cfg = cfg;
439 h->key = *session_key;
440 size = sizeof (struct ComputationMessage)
441 + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
442 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size)
443 {
508 possible = element_count; 444 possible = element_count;
509 h->element_count_transfered = element_count; 445 h->element_count_transfered = element_count;
510 } 446 }
511 else { 447 else
512 //create a multipart msg, first we calculate a new msg size for the head msg 448 {
513 possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message)) / sizeof (struct GNUNET_SCALARPRODUCT_Element); 449 /* create a multipart msg, first we calculate a new msg size for the head msg */
450 possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationMessage))
451 / sizeof (struct GNUNET_SCALARPRODUCT_Element);
514 h->element_count_transfered = possible; 452 h->element_count_transfered = possible;
515 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + possible*sizeof (struct GNUNET_SCALARPRODUCT_Element); 453 size = sizeof (struct ComputationMessage)
516 h->elements = (struct GNUNET_SCALARPRODUCT_Element*) 454 + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element);
517 GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count); 455 h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
518 memcpy (h->elements, elements, sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count); 456 memcpy (h->elements,
457 elements,
458 sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count);
519 } 459 }
520
521 h->cont_datum = cont;
522 h->cont_cls = cont_cls;
523 h->response_proc = &process_result_message;
524 h->cfg = cfg;
525 memcpy (&h->key, session_key, sizeof (struct GNUNET_HashCode));
526 460
527 msg = (struct GNUNET_SCALARPRODUCT_computation_message*) GNUNET_malloc (size); 461 msg = GNUNET_malloc (size);
528 h->msg = msg; 462 h->msg = &msg->header;
529 msg->header.size = htons (size); 463 msg->header.size = htons (size);
530 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); 464 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
531 msg->element_count_total = htonl (element_count); 465 msg->element_count_total = htonl (element_count);
532 msg->element_count_contained = htonl (possible); 466 msg->element_count_contained = htonl (possible);
533 467 msg->reserved = htonl (0);
534 memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); 468 msg->peer = *peer;
535 memcpy (&msg->session_key, session_key, sizeof (struct GNUNET_HashCode)); 469 msg->session_key = *session_key;
536 memcpy (&msg[1], elements, sizeof (struct GNUNET_SCALARPRODUCT_Element) * possible); 470 memcpy (&msg[1],
537 471 elements,
472 sizeof (struct GNUNET_SCALARPRODUCT_Element) * possible);
538 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, 473 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
539 GNUNET_TIME_UNIT_FOREVER_REL, 474 GNUNET_TIME_UNIT_FOREVER_REL,
540 GNUNET_YES, // retry is OK in the initial stage 475 GNUNET_YES, /* retry is OK in the initial stage */
541 &do_send_message, h); 476 &do_send_message, h);
542 if (!h->th) 477 GNUNET_assert (NULL != h->th);
543 {
544 LOG (GNUNET_ERROR_TYPE_ERROR,
545 _ ("Failed to send a message to the scalarproduct service\n"));
546 GNUNET_STATISTICS_update (h->stats,
547 gettext_noop ("# transmission request failures"),
548 1, GNUNET_NO);
549 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
550 GNUNET_CLIENT_disconnect (h->client);
551 GNUNET_free (h->msg);
552 GNUNET_free_non_null (h->elements);
553 GNUNET_free (h);
554 return NULL;
555 }
556 return h; 478 return h;
557} 479}
558 480
481
559/** 482/**
560 * Cancel an ongoing computation or revoke our collaboration offer. 483 * Cancel an ongoing computation or revoke our collaboration offer.
561 * Closes the connection to the service 484 * Closes the connection to the service
@@ -563,16 +486,20 @@ GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle
563 * @param h computation handle to terminate 486 * @param h computation handle to terminate
564 */ 487 */
565void 488void
566GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h) 489GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h)
567{ 490{
568 if (NULL != h->th) 491 if (NULL != h->th)
492 {
569 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); 493 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
570 if (GNUNET_SCHEDULER_NO_TASK != h->cont_multipart) 494 h->th = NULL;
571 GNUNET_SCHEDULER_cancel (h->cont_multipart); 495 }
572 GNUNET_free_non_null (h->elements); 496 GNUNET_free_non_null (h->elements);
573 GNUNET_free_non_null (h->msg); 497 GNUNET_free_non_null (h->msg);
574 GNUNET_CLIENT_disconnect (h->client); 498 if (NULL != h->client)
575 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); 499 {
500 GNUNET_CLIENT_disconnect (h->client);
501 h->client = NULL;
502 }
576 GNUNET_free (h); 503 GNUNET_free (h);
577} 504}
578 505