aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Fuchs <christian.fuchs@cfuchs.net>2014-05-07 08:21:49 +0000
committerChristian Fuchs <christian.fuchs@cfuchs.net>2014-05-07 08:21:49 +0000
commit8a2d1e6aedbf1bc95052e63ac67093b89385b0a1 (patch)
tree993d5f09035ce90f8f9a2d50cdcc417631d88227 /src
parent3a658d6f0923662f193a49af8aae52a83aafae1c (diff)
downloadgnunet-8a2d1e6aedbf1bc95052e63ac67093b89385b0a1.tar.gz
gnunet-8a2d1e6aedbf1bc95052e63ac67093b89385b0a1.zip
WIP commit of scalar product 2.0. It is unfinished and does not yet pass tests. This commit happens because of the move from mesh->cadet.
- reverted SP API back to the original design from 2012 (using key-value pairs) - SP now uses set intersection to determine common elements from the set provided by the user - values are sorted based on their keys after intersection - removed state tracking, as simple tracking is now insufficient. Just checking for conditions directly is easier readable and less buggy - modified/renamed SP message types to reflect the changed behavior of SP
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_protocols.h36
-rw-r--r--src/include/gnunet_scalarproduct_service.h52
-rw-r--r--src/scalarproduct/gnunet-scalarproduct.c219
-rw-r--r--src/scalarproduct/gnunet-service-scalarproduct.c1631
-rw-r--r--src/scalarproduct/scalarproduct.h67
-rw-r--r--src/scalarproduct/scalarproduct_api.c335
-rw-r--r--src/set/gnunet-service-set_intersection.c2
7 files changed, 1286 insertions, 1056 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 93965fd9c..3c04939ce 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2056,39 +2056,51 @@ extern "C"
2056 ******************************************************************************/ 2056 ******************************************************************************/
2057 2057
2058/** 2058/**
2059 * Client -> Vector-Product Service request message 2059 * Client -> Alice
2060 */ 2060 */
2061#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE 640 2061#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE 640
2062 2062
2063/** 2063/**
2064 * Client -> Vector-Product Service request message 2064 * Client -> Bob
2065 */ 2065 */
2066#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB 641 2066#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB 641
2067 2067
2068/** 2068/**
2069 * Vector-Product Service request -> remote VP Service 2069 * Client -> Alice/Bob multipart
2070 */ 2070 */
2071#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_TO_BOB 642 2071#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART 642
2072 2072
2073/** 2073/**
2074 * Vector-Product Service request -> remote VP Service Multipart 2074 * Alice -> Bob session initialization
2075 */ 2075 */
2076#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_TO_BOB_MULTIPART 643 2076#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SESSION_INITIALIZATION 643
2077 2077
2078/** 2078/**
2079 * remote Vector-Product Service response -> requesting VP Service 2079 * Alice -> Bob SP crypto-data (after intersection)
2080 */ 2080 */
2081#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_TO_ALICE 644 2081#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA 644
2082 2082
2083/** 2083/**
2084 * remote Vector-Product Service response -> requesting VP Service Multipart 2084 * Alice -> Bob SP crypto-data multipart
2085 */ 2085 */
2086#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_TO_ALICE_MULTIPART 645 2086#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA_MULTIPART 645
2087 2087
2088/** 2088/**
2089 * Vector-Product Service response -> Client 2089 * Bob -> Alice SP crypto-data
2090 */ 2090 */
2091#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT 646 2091#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA 646
2092
2093/**
2094 * Bob -> Alice SP crypto-data multipart
2095 */
2096#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA_MULTIPART 647
2097
2098/**
2099 * Alice/Bob -> Client Result
2100 */
2101#define GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT 648
2102
2103
2092 2104
2093 2105
2094/******************************************************************************* 2106/*******************************************************************************
diff --git a/src/include/gnunet_scalarproduct_service.h b/src/include/gnunet_scalarproduct_service.h
index 7a1eec91a..63bc29f0e 100644
--- a/src/include/gnunet_scalarproduct_service.h
+++ b/src/include/gnunet_scalarproduct_service.h
@@ -39,7 +39,7 @@ extern "C" {
39/** 39/**
40 * Version of the scalarproduct API. 40 * Version of the scalarproduct API.
41 */ 41 */
42#define GNUNET_SCALARPRODUCT_VERSION 0x00000042 42#define GNUNET_SCALARPRODUCT_VERSION 0x00000043
43 43
44enum GNUNET_SCALARPRODUCT_ResponseStatus 44enum GNUNET_SCALARPRODUCT_ResponseStatus
45{ 45{
@@ -55,6 +55,14 @@ enum GNUNET_SCALARPRODUCT_ResponseStatus
55 */ 55 */
56struct GNUNET_SCALARPRODUCT_Handle; 56struct GNUNET_SCALARPRODUCT_Handle;
57 57
58/**
59 * An element key-value pair for scalarproduct
60 */
61struct GNUNET_SCALARPRODUCT_Element {
62 int32_t value;
63 struct GNUNET_HashCode key;
64};
65
58 66
59/** 67/**
60 * Continuation called to notify client about result of the 68 * Continuation called to notify client about result of the
@@ -87,24 +95,21 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle;
87 * Request by Alice's client for computing a scalar product 95 * Request by Alice's client for computing a scalar product
88 * 96 *
89 * @param cfg the gnunet configuration handle 97 * @param cfg the gnunet configuration handle
90 * @param key Session key should be unique to the requesting client 98 * @param session_key Session key should be unique to the requesting client
91 * @param peer PeerID of the other peer 99 * @param peer PeerID of the other peer
92 * @param elements Array of elements of the vector 100 * @param elements Array of elements of the vector
93 * @param element_count Number of elements in the vector 101 * @param element_count Number of elements in the vector
94 * @param mask Array of the mask
95 * @param mask_bytes number of bytes in the mask
96 * @param cont Callback function 102 * @param cont Callback function
97 * @param cont_cls Closure for @a cont 103 * @param cont_cls Closure for the callback function
104 *
98 * @return a new handle for this computation 105 * @return a new handle for this computation
99 */ 106 */
100struct GNUNET_SCALARPRODUCT_ComputationHandle * 107struct GNUNET_SCALARPRODUCT_ComputationHandle *
101GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, 108GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle * cfg,
102 const struct GNUNET_HashCode *key, 109 const struct GNUNET_HashCode * session_key,
103 const struct GNUNET_PeerIdentity *peer, 110 const struct GNUNET_PeerIdentity *peer,
104 const int32_t *elements, 111 const struct GNUNET_SCALARPRODUCT_Element * elements,
105 uint32_t element_count, 112 uint32_t element_count,
106 const unsigned char *mask,
107 uint32_t mask_bytes,
108 GNUNET_SCALARPRODUCT_DatumProcessor cont, 113 GNUNET_SCALARPRODUCT_DatumProcessor cont,
109 void * cont_cls); 114 void * cont_cls);
110 115
@@ -112,20 +117,21 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
112 * Used by Bob's client to cooperate with Alice, 117 * Used by Bob's client to cooperate with Alice,
113 * 118 *
114 * @param cfg the gnunet configuration handle 119 * @param cfg the gnunet configuration handle
115 * @param key Session key unique to the requesting client 120 * @param session_key Session key unique to the requesting client
116 * @param elements Array of elements of the vector 121 * @param elements Array of elements of the vector
117 * @param element_count Number of elements in the vector 122 * @param element_count Number of elements in the vector
118 * @param cont Callback function 123 * @param cont Callback function
119 * @param cont_cls Closure for @a cont 124 * @param cont_cls Closure for the callback function
125 *
120 * @return a new handle for this computation 126 * @return a new handle for this computation
121 */ 127 */
122struct GNUNET_SCALARPRODUCT_ComputationHandle * 128struct GNUNET_SCALARPRODUCT_ComputationHandle *
123GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, 129GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle * cfg,
124 const struct GNUNET_HashCode *key, 130 const struct GNUNET_HashCode * key,
125 const int32_t *elements, 131 const struct GNUNET_SCALARPRODUCT_Element * elements,
126 uint32_t element_count, 132 uint32_t element_count,
127 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, 133 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
128 void *cont_cls); 134 void * cont_cls);
129 135
130 136
131/** 137/**
@@ -137,20 +143,6 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
137void 143void
138GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h); 144GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h);
139 145
140
141/**
142 * Cancel ALL ongoing computation or revoke our collaboration offer.
143 * Closes ALL connections to the service
144 *
145 * FIXME: this should take an argument, and we should
146 * have an explicit 'connect' API which returns an opaque
147 * connection handle. Avoid (globals) in the library!
148 * @deprecated in this form
149 */
150void
151GNUNET_SCALARPRODUCT_disconnect ();
152
153
154#if 0 /* keep Emacsens' auto-indent happy */ 146#if 0 /* keep Emacsens' auto-indent happy */
155{ 147{
156#endif 148#endif
diff --git a/src/scalarproduct/gnunet-scalarproduct.c b/src/scalarproduct/gnunet-scalarproduct.c
index a93902640..2bb708fc9 100644
--- a/src/scalarproduct/gnunet-scalarproduct.c
+++ b/src/scalarproduct/gnunet-scalarproduct.c
@@ -44,12 +44,12 @@ struct ScalarProductCallbackClosure
44 /** 44 /**
45 * the session key identifying this computation 45 * the session key identifying this computation
46 */ 46 */
47 struct GNUNET_HashCode key; 47 struct GNUNET_HashCode session_key;
48 48
49 /** 49 /**
50 * PeerID we want to compute a scalar product with 50 * PeerID we want to compute a scalar product with
51 */ 51 */
52 struct GNUNET_PeerIdentity peer; 52 struct GNUNET_PeerIdentity peer_id;
53}; 53};
54 54
55/** 55/**
@@ -60,7 +60,7 @@ static char *input_peer_id;
60/** 60/**
61 * Option -p: destination peer identity for checking message-ids with 61 * Option -p: destination peer identity for checking message-ids with
62 */ 62 */
63static char *input_key; 63static char *input_session_key;
64 64
65/** 65/**
66 * Option -e: vector to calculate a scalarproduct with 66 * Option -e: vector to calculate a scalarproduct with
@@ -68,15 +68,14 @@ static char *input_key;
68static char *input_elements; 68static char *input_elements;
69 69
70/** 70/**
71 * Option -m: message-ids to calculate a scalarproduct with
72 */
73static char *input_mask;
74
75/**
76 * Global return value 71 * Global return value
77 */ 72 */
78static int ret = -1; 73static int ret = -1;
79 74
75/**
76 * our Scalarproduct Computation handle
77 */
78struct GNUNET_SCALARPRODUCT_ComputationHandle * computation;
80 79
81/** 80/**
82 * Callback called if we are initiating a new computation session 81 * Callback called if we are initiating a new computation session
@@ -96,29 +95,30 @@ responder_callback (void *cls,
96 ret = 0; 95 ret = 0;
97 LOG (GNUNET_ERROR_TYPE_INFO, 96 LOG (GNUNET_ERROR_TYPE_INFO,
98 "Session %s concluded.\n", 97 "Session %s concluded.\n",
99 GNUNET_h2s (&closure->key)); 98 GNUNET_h2s (&closure->session_key));
100 break; 99 break;
101 case GNUNET_SCALARPRODUCT_Status_InvalidResponse: 100 case GNUNET_SCALARPRODUCT_Status_InvalidResponse:
102 LOG (GNUNET_ERROR_TYPE_ERROR, 101 LOG (GNUNET_ERROR_TYPE_ERROR,
103 "Session %s failed: invalid response\n", 102 "Session %s failed: invalid response\n",
104 GNUNET_h2s (&closure->key)); 103 GNUNET_h2s (&closure->session_key));
105 break; 104 break;
106 case GNUNET_SCALARPRODUCT_Status_Failure: 105 case GNUNET_SCALARPRODUCT_Status_Failure:
107 LOG (GNUNET_ERROR_TYPE_ERROR, 106 LOG (GNUNET_ERROR_TYPE_ERROR,
108 "Session %s failed: service failure\n", 107 "Session %s failed: service failure\n",
109 GNUNET_h2s (&closure->key)); 108 GNUNET_h2s (&closure->session_key));
110 break; 109 break;
111 case GNUNET_SCALARPRODUCT_Status_ServiceDisconnected: 110 case GNUNET_SCALARPRODUCT_Status_ServiceDisconnected:
112 LOG (GNUNET_ERROR_TYPE_ERROR, 111 LOG (GNUNET_ERROR_TYPE_ERROR,
113 "Session %s failed: service disconnect!\n", 112 "Session %s failed: service disconnect!\n",
114 GNUNET_h2s (&closure->key)); 113 GNUNET_h2s (&closure->session_key));
115 break; 114 break;
116 default: 115 default:
117 LOG (GNUNET_ERROR_TYPE_ERROR, 116 LOG (GNUNET_ERROR_TYPE_ERROR,
118 "Session %s failed: return code %d\n", 117 "Session %s failed: return code %d\n",
119 GNUNET_h2s (&closure->key), 118 GNUNET_h2s (&closure->session_key),
120 status); 119 status);
121 } 120 }
121 computation = NULL;
122 GNUNET_SCHEDULER_shutdown(); 122 GNUNET_SCHEDULER_shutdown();
123} 123}
124 124
@@ -155,28 +155,29 @@ requester_callback (void *cls,
155 case GNUNET_SCALARPRODUCT_Status_InvalidResponse: 155 case GNUNET_SCALARPRODUCT_Status_InvalidResponse:
156 LOG (GNUNET_ERROR_TYPE_ERROR, 156 LOG (GNUNET_ERROR_TYPE_ERROR,
157 "Session %s with peer %s failed: invalid response received\n", 157 "Session %s with peer %s failed: invalid response received\n",
158 GNUNET_h2s (&closure->key), 158 GNUNET_h2s (&closure->session_key),
159 GNUNET_i2s (&closure->peer)); 159 GNUNET_i2s (&closure->peer_id));
160 break; 160 break;
161 case GNUNET_SCALARPRODUCT_Status_Failure: 161 case GNUNET_SCALARPRODUCT_Status_Failure:
162 LOG (GNUNET_ERROR_TYPE_ERROR, 162 LOG (GNUNET_ERROR_TYPE_ERROR,
163 "Session %s with peer %s failed: API failure\n", 163 "Session %s with peer %s failed: API failure\n",
164 GNUNET_h2s (&closure->key), 164 GNUNET_h2s (&closure->session_key),
165 GNUNET_i2s (&closure->peer)); 165 GNUNET_i2s (&closure->peer_id));
166 break; 166 break;
167 case GNUNET_SCALARPRODUCT_Status_ServiceDisconnected: 167 case GNUNET_SCALARPRODUCT_Status_ServiceDisconnected:
168 LOG (GNUNET_ERROR_TYPE_ERROR, 168 LOG (GNUNET_ERROR_TYPE_ERROR,
169 "Session %s with peer %s was disconnected from service.\n", 169 "Session %s with peer %s was disconnected from service.\n",
170 GNUNET_h2s (&closure->key), 170 GNUNET_h2s (&closure->session_key),
171 GNUNET_i2s (&closure->peer)); 171 GNUNET_i2s (&closure->peer_id));
172 break; 172 break;
173 default: 173 default:
174 LOG (GNUNET_ERROR_TYPE_ERROR, 174 LOG (GNUNET_ERROR_TYPE_ERROR,
175 "Session %s with peer %s failed: return code %d\n", 175 "Session %s with peer %s failed: return code %d\n",
176 GNUNET_h2s (&closure->key), 176 GNUNET_h2s (&closure->session_key),
177 GNUNET_i2s (&closure->peer), 177 GNUNET_i2s (&closure->peer_id),
178 status); 178 status);
179 } 179 }
180 computation = NULL;
180 GNUNET_SCHEDULER_shutdown(); 181 GNUNET_SCHEDULER_shutdown();
181} 182}
182 183
@@ -191,7 +192,8 @@ static void
191shutdown_task (void *cls, 192shutdown_task (void *cls,
192 const struct GNUNET_SCHEDULER_TaskContext *tc) 193 const struct GNUNET_SCHEDULER_TaskContext *tc)
193{ 194{
194 GNUNET_SCALARPRODUCT_disconnect (); 195 if (NULL != computation)
196 GNUNET_SCALARPRODUCT_cancel(computation);
195} 197}
196 198
197 199
@@ -211,11 +213,8 @@ run (void *cls,
211{ 213{
212 char *begin = input_elements; 214 char *begin = input_elements;
213 char *end; 215 char *end;
214 int32_t element;
215 int i; 216 int i;
216 int32_t *elements; 217 struct GNUNET_SCALARPRODUCT_Element * elements;
217 unsigned char *mask;
218 uint32_t mask_bytes;
219 uint32_t element_count = 0; 218 uint32_t element_count = 0;
220 struct ScalarProductCallbackClosure * closure; 219 struct ScalarProductCallbackClosure * closure;
221 220
@@ -226,27 +225,27 @@ run (void *cls,
226 return; 225 return;
227 } 226 }
228 227
229 if (NULL == input_key) 228 if (NULL == input_session_key)
230 { 229 {
231 LOG (GNUNET_ERROR_TYPE_ERROR, 230 LOG (GNUNET_ERROR_TYPE_ERROR,
232 _ ("This program needs a session identifier for comparing vectors.\n")); 231 _ ("This program needs a session identifier for comparing vectors.\n"));
233 return; 232 return;
234 } 233 }
235 234
236 if (1 > strnlen (input_key, sizeof (struct GNUNET_HashCode))) 235 if (1 > strnlen (input_session_key, sizeof (struct GNUNET_HashCode)))
237 { 236 {
238 LOG (GNUNET_ERROR_TYPE_ERROR, 237 LOG (GNUNET_ERROR_TYPE_ERROR,
239 _ ("Please give a session key for --input_key!\n")); 238 _ ("Please give a session key for --input_key!\n"));
240 return; 239 return;
241 } 240 }
242 closure = GNUNET_new (struct ScalarProductCallbackClosure); 241 closure = GNUNET_new (struct ScalarProductCallbackClosure);
243 GNUNET_CRYPTO_hash (input_key, strlen (input_key), &closure->key); 242 GNUNET_CRYPTO_hash (input_session_key, strlen (input_session_key), &closure->session_key);
244 243
245 if (input_peer_id && 244 if (input_peer_id &&
246 (GNUNET_OK != 245 (GNUNET_OK !=
247 GNUNET_CRYPTO_eddsa_public_key_from_string (input_peer_id, 246 GNUNET_CRYPTO_eddsa_public_key_from_string (input_peer_id,
248 strlen (input_peer_id), 247 strlen (input_peer_id),
249 (struct GNUNET_CRYPTO_EddsaPublicKey *) &closure->peer))) { 248 (struct GNUNET_CRYPTO_EddsaPublicKey *) &closure->peer_id))) {
250 LOG (GNUNET_ERROR_TYPE_ERROR, 249 LOG (GNUNET_ERROR_TYPE_ERROR,
251 _ ("Tried to set initiator mode, as peer ID was given. " 250 _ ("Tried to set initiator mode, as peer ID was given. "
252 "However, `%s' is not a valid peer identifier.\n"), 251 "However, `%s' is not a valid peer identifier.\n"),
@@ -254,136 +253,75 @@ run (void *cls,
254 return; 253 return;
255 } 254 }
256 255
257 /* Count input_elements_peer1, and put in elements_peer1 array */ 256 for (end = begin; 0 != *end; end++)
258 do 257 if (*end == ';')
259 {
260 // get the length of the current element
261 for (end = begin; *end && *end != ','; end++);
262
263 if (0 == *begin)
264 {
265 break;
266 }
267 else if (1 == sscanf (begin, "%" SCNd32 ",", &element))
268 {
269 //element in the middle
270 element_count++; 258 element_count++;
271 begin = end; 259 if (0 == element_count) {
272 if (',' == *end)
273 begin += 1;
274 }
275 else
276 {
277 LOG (GNUNET_ERROR_TYPE_ERROR,
278 _ ("Could not convert `%s' to int32_t.\n"), begin);
279 return;
280 }
281 }
282 while (1);
283 if (0 == element_count)
284 {
285 LOG (GNUNET_ERROR_TYPE_ERROR, 260 LOG (GNUNET_ERROR_TYPE_ERROR,
286 _ ("Need elements to compute the vectorproduct, got none.\n")); 261 _ ("Need elements to compute the vectorproduct, got none.\n"));
287 return; 262 return;
288 } 263 }
289 264
290 begin = input_elements; 265 elements = (struct GNUNET_SCALARPRODUCT_Element *)
291 elements = GNUNET_malloc (sizeof (int32_t) * element_count); 266 GNUNET_malloc(sizeof(struct GNUNET_SCALARPRODUCT_Element)*element_count);
292 element_count = 0; 267
293 /* Read input_elements_peer1, and put in elements_peer1 array */ 268 for (i = 0; i < element_count;i++)
294 do
295 { 269 {
296 // get the length of the current element 270 struct GNUNET_SCALARPRODUCT_Element element;
297 for (end = begin; *end && *end != ','; end++); 271 char* separator=NULL;
298 272
299 if (0 == *begin) 273 // get the length of the current key,value; tupel
300 { 274 for (end = begin; *end != ';'; end++)
301 break; 275 if (*end == ',')
276 separator = end;
277
278 // final element
279 if ((NULL == separator)
280 || (begin == separator)
281 || (separator == end - 1 )) {
282 LOG (GNUNET_ERROR_TYPE_ERROR,
283 _ ("Malformed input, could not parse `%s'\n"), begin);
284 GNUNET_free(elements);
285 return;
302 } 286 }
303 else if (1 == sscanf (begin, "%" SCNd32 ",", &elements[element_count])) 287
304 { 288 // read the element's key
305 //element in the middle 289 *separator = 0;
306 element_count++; 290 GNUNET_CRYPTO_hash (begin, strlen (begin), &element.key);
307 begin = end; 291
308 if (',' == *end) 292 // read the element's value
309 begin += 1; 293 if (1 != sscanf (separator+1, "%" SCNd32 ";", &element.value))
310 }
311 else
312 { 294 {
313 LOG (GNUNET_ERROR_TYPE_ERROR, 295 LOG (GNUNET_ERROR_TYPE_ERROR,
314 _ ("Could not convert `%s' to int32_t.\n"), begin); 296 _ ("Could not convert `%s' to int32_t.\n"), begin);
297 GNUNET_free(elements);
315 return; 298 return;
316 } 299 }
300
301 elements[i]=element;
302 begin = end+1;
317 } 303 }
318 while (1);
319
320 mask_bytes = element_count / 8 + ( (element_count % 8) ? 1 : 0);
321 mask = GNUNET_malloc ((element_count / 8) + 1);
322 304
323 /* Read input_mask_peer1 and read in mask_peer1 array */ 305 if (((NULL != input_peer_id) &&
324 if ((NULL != input_peer_id) && (NULL != input_mask)) 306 (NULL == (computation = GNUNET_SCALARPRODUCT_start_computation (cfg,
325 { 307 &closure->session_key,
326 begin = input_mask; 308 &closure->peer_id,
327 unsigned short mask_count = 0;
328
329 do
330 {
331 // get the length of the current element and replace , with null
332 for (end = begin; *end && *end != ','; end++);
333
334 if (1 == sscanf (begin, "%" SCNd32 ",", &element))
335 {
336 //element in the middle
337 begin = end + 1;
338 }
339 else if (*begin == 0)
340 {
341 break;
342 }
343 else
344 {
345 LOG (GNUNET_ERROR_TYPE_ERROR,
346 _ ("Could not convert `%s' to integer.\n"), begin);
347 return;
348 }
349
350 if (element)
351 mask[mask_count / 8] = mask[mask_count / 8] | 1 << (mask_count % 8);
352 mask_count++;
353 }
354 while (mask_count < element_count);
355 }
356 else if (NULL != input_peer_id)
357 for (i = 0; i <= mask_bytes; i++)
358 mask[i] = UCHAR_MAX; // all 1's
359
360 if (input_peer_id &&
361 (NULL == GNUNET_SCALARPRODUCT_request (cfg,
362 &closure->key,
363 &closure->peer,
364 elements, element_count, 309 elements, element_count,
365 mask, mask_bytes,
366 &requester_callback, 310 &requester_callback,
367 (void *) &closure))) 311 (void *) &closure))))
368 { 312 ||
369 GNUNET_free (elements); 313 ((NULL == input_peer_id) &&
370 GNUNET_free (mask); 314 (NULL == (computation = GNUNET_SCALARPRODUCT_accept_computation (cfg,
371 return; 315 &closure->session_key,
372 }
373
374 if ((NULL == input_peer_id) &&
375 (NULL == GNUNET_SCALARPRODUCT_response (cfg,
376 &closure->key,
377 elements, element_count, 316 elements, element_count,
378 &responder_callback, 317 &responder_callback,
379 (void *) &closure))) 318 (void *) &closure)))))
380 { 319 {
381 GNUNET_free (elements); 320 GNUNET_free (elements);
382 GNUNET_free (mask);
383 return; 321 return;
384 } 322 }
323
385 GNUNET_free (elements); 324 GNUNET_free (elements);
386 GNUNET_free (mask);
387 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, 325 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
388 &shutdown_task, 326 &shutdown_task,
389 NULL); 327 NULL);
@@ -403,18 +341,15 @@ int
403main (int argc, char *const *argv) 341main (int argc, char *const *argv)
404{ 342{
405 static const struct GNUNET_GETOPT_CommandLineOption options[] = { 343 static const struct GNUNET_GETOPT_CommandLineOption options[] = {
406 {'e', "elements", "\"val1,val2,...,valn\"", 344 {'e', "elements", "\"key1,val1;key2,val2;...,keyn,valn;\"",
407 gettext_noop ("A comma separated list of elements to compare as vector with our remote peer."), 345 gettext_noop ("A comma separated list of elements to compare as vector with our remote peer."),
408 1, &GNUNET_GETOPT_set_string, &input_elements}, 346 1, &GNUNET_GETOPT_set_string, &input_elements},
409 {'m', "mask", "\"0,1,...,maskn\"",
410 gettext_noop ("A comma separated mask to select which elements should actually be compared."),
411 1, &GNUNET_GETOPT_set_string, &input_mask},
412 {'p', "peer", "PEERID", 347 {'p', "peer", "PEERID",
413 gettext_noop ("[Optional] peer to calculate our scalarproduct with. If this parameter is not given, the service will wait for a remote peer to compute the request."), 348 gettext_noop ("[Optional] peer to calculate our scalarproduct with. If this parameter is not given, the service will wait for a remote peer to compute the request."),
414 1, &GNUNET_GETOPT_set_string, &input_peer_id}, 349 1, &GNUNET_GETOPT_set_string, &input_peer_id},
415 {'k', "key", "TRANSACTION_ID", 350 {'k', "key", "TRANSACTION_ID",
416 gettext_noop ("Transaction ID shared with peer."), 351 gettext_noop ("Transaction ID shared with peer."),
417 1, &GNUNET_GETOPT_set_string, &input_key}, 352 1, &GNUNET_GETOPT_set_string, &input_session_key},
418 GNUNET_GETOPT_OPTION_END 353 GNUNET_GETOPT_OPTION_END
419 }; 354 };
420 355
diff --git a/src/scalarproduct/gnunet-service-scalarproduct.c b/src/scalarproduct/gnunet-service-scalarproduct.c
index d7407e3ae..75881afed 100644
--- a/src/scalarproduct/gnunet-service-scalarproduct.c
+++ b/src/scalarproduct/gnunet-service-scalarproduct.c
@@ -32,6 +32,7 @@
32#include "gnunet_applications.h" 32#include "gnunet_applications.h"
33#include "gnunet_protocols.h" 33#include "gnunet_protocols.h"
34#include "gnunet_scalarproduct_service.h" 34#include "gnunet_scalarproduct_service.h"
35#include "gnunet_set_service.h"
35#include "scalarproduct.h" 36#include "scalarproduct.h"
36 37
37#define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct", __VA_ARGS__) 38#define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct", __VA_ARGS__)
@@ -42,23 +43,6 @@
42 43
43 44
44/** 45/**
45 * state a session can be in
46 */
47enum SessionState
48{
49 CLIENT_REQUEST_RECEIVED,
50 WAITING_FOR_BOBS_CONNECT,
51 CLIENT_RESPONSE_RECEIVED,
52 WAITING_FOR_SERVICE_REQUEST,
53 WAITING_FOR_MULTIPART_TRANSMISSION,
54 WAITING_FOR_SERVICE_RESPONSE,
55 SERVICE_REQUEST_RECEIVED,
56 SERVICE_RESPONSE_RECEIVED,
57 FINALIZED
58};
59
60
61/**
62 * role a peer in a session can assume 46 * role a peer in a session can assume
63 */ 47 */
64enum PeerRole 48enum PeerRole
@@ -68,6 +52,15 @@ enum PeerRole
68}; 52};
69 53
70 54
55struct SortedValue
56{
57 struct SortedValue * next;
58 struct SortedValue * prev;
59 struct GNUNET_SCALARPRODUCT_Element * elem;
60 gcry_mpi_t val;
61};
62
63
71/** 64/**
72 * A scalarproduct session which tracks: 65 * A scalarproduct session which tracks:
73 * 66 *
@@ -95,12 +88,7 @@ struct ServiceSession
95 /** 88 /**
96 * (hopefully) unique transaction ID 89 * (hopefully) unique transaction ID
97 */ 90 */
98 struct GNUNET_HashCode key; 91 struct GNUNET_HashCode session_id;
99
100 /**
101 * state of the session
102 */
103 enum SessionState state;
104 92
105 /** 93 /**
106 * Alice or Bob's peerID 94 * Alice or Bob's peerID
@@ -123,45 +111,61 @@ struct ServiceSession
123 uint32_t total; 111 uint32_t total;
124 112
125 /** 113 /**
126 * how many elements actually are used after applying the mask 114 * how many elements we used for intersection
127 */ 115 */
128 uint32_t used; 116 uint32_t intersected_elements_count;
129 117
130 /** 118 /**
131 * already transferred elements (sent/received) for multipart messages, less or equal than used_element_count for 119 * all non-0-value'd elements transmitted to us
132 */ 120 */
133 uint32_t transferred; 121 struct GNUNET_CONTAINER_MultiHashMap * intersected_elements;
134 122
135 /** 123 /**
136 * index of the last transferred element for multipart messages 124 * how many elements actually are used for the scalar product
137 */ 125 */
138 uint32_t last_processed; 126 uint32_t used_elements_count;
139 127
140 /** 128 /**
141 * how many bytes the mask is long. 129 * already transferred elements (sent/received) for multipart messages, less or equal than used_element_count for
142 * just for convenience so we don't have to re-re-re calculate it each time
143 */ 130 */
144 uint32_t mask_length; 131 uint32_t transferred_element_count;
145 132
146 /** 133 /**
147 * all the vector elements we received 134 * Set of elements for which will conduction an intersection.
135 * the resulting elements are then used for computing the scalar product.
148 */ 136 */
149 int32_t * vector; 137 struct GNUNET_SET_Handle * intersection_set;
150 138
151 /** 139 /**
152 * mask of which elements to check 140 * Set of elements for which will conduction an intersection.
141 * the resulting elements are then used for computing the scalar product.
153 */ 142 */
154 unsigned char * mask; 143 struct GNUNET_SET_OperationHandle * intersection_op;
144
145 /**
146 * Handle to Alice's Intersection operation listening for Bob
147 */
148 struct GNUNET_SET_ListenHandle * intersection_listen;
155 149
156 /** 150 /**
157 * Public key of the remote service, only used by bob 151 * Public key of the remote service, only used by bob
158 */ 152 */
159 struct GNUNET_CRYPTO_PaillierPublicKey * remote_pubkey; 153 struct GNUNET_CRYPTO_PaillierPublicKey remote_pubkey;
160 154
161 /** 155 /**
162 * ai(Alice) after applying the mask 156 * DLL for sorting elements after intersection
163 */ 157 */
164 gcry_mpi_t * a; 158 struct SortedValue * a_head;
159
160 /**
161 * a(Alice)
162 */
163 struct SortedValue * a_tail;
164
165 /**
166 * a(Alice)
167 */
168 gcry_mpi_t * sorted_elements;
165 169
166 /** 170 /**
167 * E(ai)(Bob) after applying the mask 171 * E(ai)(Bob) after applying the mask
@@ -217,11 +221,6 @@ struct ServiceSession
217 * Handle to a task that sends a msg to the our client 221 * Handle to a task that sends a msg to the our client
218 */ 222 */
219 GNUNET_SCHEDULER_TaskIdentifier client_notification_task; 223 GNUNET_SCHEDULER_TaskIdentifier client_notification_task;
220
221 /**
222 * Handle to a task that sends a msg to the our peer
223 */
224 GNUNET_SCHEDULER_TaskIdentifier service_request_task;
225}; 224};
226 225
227/////////////////////////////////////////////////////////////////////////////// 226///////////////////////////////////////////////////////////////////////////////
@@ -238,7 +237,7 @@ struct ServiceSession
238 * @param cls the associated service session 237 * @param cls the associated service session
239 */ 238 */
240static void 239static void
241prepare_service_request_multipart (void *cls); 240prepare_alices_cyrptodata_message_multipart (void *cls);
242 241
243/** 242/**
244 * Send a multi part chunk of a service response from bob to alice. 243 * Send a multi part chunk of a service response from bob to alice.
@@ -247,7 +246,7 @@ prepare_service_request_multipart (void *cls);
247 * @param cls the associated service session 246 * @param cls the associated service session
248 */ 247 */
249static void 248static void
250prepare_service_response_multipart (void *cls); 249prepare_bobs_cryptodata_message_multipart (void *cls);
251 250
252 251
253/////////////////////////////////////////////////////////////////////////////// 252///////////////////////////////////////////////////////////////////////////////
@@ -256,6 +255,11 @@ prepare_service_response_multipart (void *cls);
256 255
257 256
258/** 257/**
258 * Gnunet configuration handle
259 */
260const struct GNUNET_CONFIGURATION_Handle * cfg;
261
262/**
259 * Handle to the core service (NULL until we've connected to it). 263 * Handle to the core service (NULL until we've connected to it).
260 */ 264 */
261static struct GNUNET_MESH_Handle *my_mesh; 265static struct GNUNET_MESH_Handle *my_mesh;
@@ -350,7 +354,7 @@ compute_square_sum (gcry_mpi_t * vector, uint32_t length)
350 * usually are too complex to be handled in the callback itself. 354 * usually are too complex to be handled in the callback itself.
351 * clears a session-callback, if a session was handed over and the transmit handle was stored 355 * clears a session-callback, if a session was handed over and the transmit handle was stored
352 * 356 *
353 * @param cls the message object 357 * @param cls the session containing the message object
354 * @param size the size of the buffer we got 358 * @param size the size of the buffer we got
355 * @param buf the buffer to copy the message to 359 * @param buf the buffer to copy the message to
356 * @return 0 if we couldn't copy, else the size copied over 360 * @return 0 if we couldn't copy, else the size copied over
@@ -358,43 +362,42 @@ compute_square_sum (gcry_mpi_t * vector, uint32_t length)
358static size_t 362static size_t
359do_send_message (void *cls, size_t size, void *buf) 363do_send_message (void *cls, size_t size, void *buf)
360{ 364{
361 struct ServiceSession * session = cls; 365 struct ServiceSession * s = cls;
362 uint16_t type; 366 uint16_t type;
363 367
364 GNUNET_assert (buf); 368 GNUNET_assert (buf);
365 369
366 if (ntohs (session->msg->size) != size) { 370 if (ntohs (s->msg->size) != size) {
367 GNUNET_break (0); 371 GNUNET_break (0);
368 return 0; 372 return 0;
369 } 373 }
370 374
371 type = ntohs (session->msg->type); 375 type = ntohs (s->msg->type);
372 memcpy (buf, session->msg, size); 376 memcpy (buf, s->msg, size);
373 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 377 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
374 "Sent a message of type %hu.\n", 378 "Sent a message of type %hu.\n",
375 type); 379 type);
376 GNUNET_free (session->msg); 380 GNUNET_free (s->msg);
377 session->msg = NULL; 381 s->msg = NULL;
378 382
379 switch (type) 383 switch (type)
380 { 384 {
381 case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT: 385 case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT:
382 session->state = FINALIZED; 386 s->client_transmit_handle = NULL;
383 session->client_transmit_handle = NULL;
384 break; 387 break;
385 388
386 case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_TO_BOB: 389 case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA:
387 case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_TO_BOB_MULTIPART: 390 case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA_MULTIPART:
388 session->service_transmit_handle = NULL; 391 s->service_transmit_handle = NULL;
389 if (session->state == WAITING_FOR_MULTIPART_TRANSMISSION) 392 if (s->used_elements_count != s->transferred_element_count)
390 prepare_service_request_multipart (session); 393 prepare_alices_cyrptodata_message_multipart (s);
391 break; 394 break;
392 395
393 case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_TO_ALICE: 396 case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA:
394 case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_TO_ALICE_MULTIPART: 397 case GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA_MULTIPART:
395 session->service_transmit_handle = NULL; 398 s->service_transmit_handle = NULL;
396 if (session->state == WAITING_FOR_MULTIPART_TRANSMISSION) 399 if (s->used_elements_count != s->transferred_element_count)
397 prepare_service_response_multipart (session); 400 prepare_bobs_cryptodata_message_multipart (s);
398 break; 401 break;
399 402
400 default: 403 default:
@@ -406,31 +409,12 @@ do_send_message (void *cls, size_t size, void *buf)
406 409
407 410
408/** 411/**
409 * initializes a new vector with fresh MPI values (=0) of a given length
410 *
411 * @param length of the vector to create
412 * @return the initialized vector, never NULL
413 */
414static gcry_mpi_t *
415initialize_mpi_vector (uint32_t length)
416{
417 uint32_t i;
418 gcry_mpi_t * output = GNUNET_malloc (sizeof (gcry_mpi_t) * length);
419
420 for (i = 0; i < length; i++)
421 GNUNET_assert (NULL != (output[i] = gcry_mpi_new (0)));
422 return output;
423}
424
425
426/**
427 * Finds a not terminated client/service session in the 412 * Finds a not terminated client/service session in the
428 * given DLL based on session key, element count and state. 413 * given DLL based on session key, element count and state.
429 * 414 *
430 * @param tail - the tail of the DLL 415 * @param tail - the tail of the DLL
431 * @param key - the key we want to search for 416 * @param key - the key we want to search for
432 * @param element_count - the total element count of the dataset (session->total) 417 * @param element_count - the total element count of the dataset (session->total)
433 * @param state - a pointer to the state the session should be in, NULL to ignore
434 * @param peerid - a pointer to the peer ID of the associated peer, NULL to ignore 418 * @param peerid - a pointer to the peer ID of the associated peer, NULL to ignore
435 * @return a pointer to a matching session, or NULL 419 * @return a pointer to a matching session, or NULL
436 */ 420 */
@@ -438,23 +422,19 @@ static struct ServiceSession *
438find_matching_session (struct ServiceSession * tail, 422find_matching_session (struct ServiceSession * tail,
439 const struct GNUNET_HashCode * key, 423 const struct GNUNET_HashCode * key,
440 uint32_t element_count, 424 uint32_t element_count,
441 enum SessionState * state,
442 const struct GNUNET_PeerIdentity * peerid) 425 const struct GNUNET_PeerIdentity * peerid)
443{ 426{
444 struct ServiceSession * curr; 427 struct ServiceSession * curr;
445 428
446 for (curr = tail; NULL != curr; curr = curr->prev) { 429 for (curr = tail; NULL != curr; curr = curr->prev) {
447 // if the key matches, and the element_count is same 430 // if the key matches, and the element_count is same
448 if ((!memcmp (&curr->key, key, sizeof (struct GNUNET_HashCode))) 431 if ((!memcmp (&curr->session_id, key, sizeof (struct GNUNET_HashCode)))
449 && (curr->total == element_count)) { 432 && (curr->total == element_count)) {
450 // if incoming state is NULL OR is same as state of the queued request 433 // if peerid is NULL OR same as the peer Id in the queued request
451 if ((NULL == state) || (curr->state == *state)) { 434 if ((NULL == peerid)
452 // if peerid is NULL OR same as the peer Id in the queued request 435 || (!memcmp (&curr->peer, peerid, sizeof (struct GNUNET_PeerIdentity))))
453 if ((NULL == peerid) 436 // matches and is not an already terminated session
454 || (!memcmp (&curr->peer, peerid, sizeof (struct GNUNET_PeerIdentity)))) 437 return curr;
455 // matches and is not an already terminated session
456 return curr;
457 }
458 } 438 }
459 } 439 }
460 440
@@ -470,21 +450,41 @@ find_matching_session (struct ServiceSession * tail,
470static void 450static void
471free_session_variables (struct ServiceSession * session) 451free_session_variables (struct ServiceSession * session)
472{ 452{
473 unsigned int i; 453 while (NULL != session->a_head) {
474 454 struct SortedValue * e = session->a_head;
475 if (session->a) { 455 GNUNET_free (e->elem);
476 for (i = 0; i < session->used; i++) 456 gcry_mpi_release (e->val);
477 if (session->a[i]) gcry_mpi_release (session->a[i]); 457 GNUNET_CONTAINER_DLL_remove (session->a_head, session->a_tail, e);
478 GNUNET_free (session->a); 458 GNUNET_free (e);
479 session->a = NULL;
480 } 459 }
481 if (session->e_a) { 460 if (session->e_a) {
482 GNUNET_free (session->e_a); 461 GNUNET_free (session->e_a);
483 session->e_a = NULL; 462 session->e_a = NULL;
484 } 463 }
485 if (session->mask) { 464 if (session->sorted_elements) {
486 GNUNET_free (session->mask); 465 GNUNET_free (session->sorted_elements);
487 session->mask = NULL; 466 session->sorted_elements = NULL;
467 }
468 if (session->intersected_elements) {
469 GNUNET_CONTAINER_multihashmap_destroy (session->intersected_elements);
470 //elements are freed independently in session->a_head/tail
471 session->intersected_elements = NULL;
472 }
473 if (session->intersection_listen) {
474 GNUNET_SET_listen_cancel (session->intersection_listen);
475 session->intersection_listen = NULL;
476 }
477 if (session->intersection_op) {
478 GNUNET_SET_operation_cancel (session->intersection_op);
479 session->intersection_op = NULL;
480 }
481 if (session->intersection_set) {
482 GNUNET_SET_destroy (session->intersection_set);
483 session->intersection_set = NULL;
484 }
485 if (session->msg) {
486 GNUNET_free (session->msg);
487 session->msg = NULL;
488 } 488 }
489 if (session->r) { 489 if (session->r) {
490 GNUNET_free (session->r); 490 GNUNET_free (session->r);
@@ -506,14 +506,6 @@ free_session_variables (struct ServiceSession * session)
506 gcry_mpi_release (session->product); 506 gcry_mpi_release (session->product);
507 session->product = NULL; 507 session->product = NULL;
508 } 508 }
509 if (session->remote_pubkey) {
510 GNUNET_free (session->remote_pubkey);
511 session->remote_pubkey = NULL;
512 }
513 if (session->vector) {
514 GNUNET_free_non_null (session->vector);
515 session->s = NULL;
516 }
517} 509}
518/////////////////////////////////////////////////////////////////////////////// 510///////////////////////////////////////////////////////////////////////////////
519// Event and Message Handlers 511// Event and Message Handlers
@@ -547,21 +539,17 @@ handle_client_disconnect (void *cls,
547 return; 539 return;
548 GNUNET_CONTAINER_DLL_remove (from_client_head, from_client_tail, session); 540 GNUNET_CONTAINER_DLL_remove (from_client_head, from_client_tail, session);
549 541
550 if (!(session->role == BOB && session->state == FINALIZED)) { 542 if (!(session->role == BOB && 0/*//TODO: if session concluded*/)) {
551 //we MUST terminate any client message underway 543 //we MUST terminate any client message underway
552 if (session->service_transmit_handle && session->channel) 544 if (session->service_transmit_handle && session->channel)
553 GNUNET_MESH_notify_transmit_ready_cancel (session->service_transmit_handle); 545 GNUNET_MESH_notify_transmit_ready_cancel (session->service_transmit_handle);
554 if (session->channel && session->state == WAITING_FOR_SERVICE_RESPONSE) 546 if (session->channel && 0/* //TODO: waiting for service response */)
555 GNUNET_MESH_channel_destroy (session->channel); 547 GNUNET_MESH_channel_destroy (session->channel);
556 } 548 }
557 if (GNUNET_SCHEDULER_NO_TASK != session->client_notification_task) { 549 if (GNUNET_SCHEDULER_NO_TASK != session->client_notification_task) {
558 GNUNET_SCHEDULER_cancel (session->client_notification_task); 550 GNUNET_SCHEDULER_cancel (session->client_notification_task);
559 session->client_notification_task = GNUNET_SCHEDULER_NO_TASK; 551 session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
560 } 552 }
561 if (GNUNET_SCHEDULER_NO_TASK != session->service_request_task) {
562 GNUNET_SCHEDULER_cancel (session->service_request_task);
563 session->service_request_task = GNUNET_SCHEDULER_NO_TASK;
564 }
565 if (NULL != session->client_transmit_handle) { 553 if (NULL != session->client_transmit_handle) {
566 GNUNET_SERVER_notify_transmit_ready_cancel (session->client_transmit_handle); 554 GNUNET_SERVER_notify_transmit_ready_cancel (session->client_transmit_handle);
567 session->client_transmit_handle = NULL; 555 session->client_transmit_handle = NULL;
@@ -590,13 +578,13 @@ prepare_client_end_notification (void * cls,
590 session->client_notification_task = GNUNET_SCHEDULER_NO_TASK; 578 session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
591 579
592 msg = GNUNET_new (struct GNUNET_SCALARPRODUCT_client_response); 580 msg = GNUNET_new (struct GNUNET_SCALARPRODUCT_client_response);
593 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT); 581 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
594 memcpy (&msg->key, &session->key, sizeof (struct GNUNET_HashCode)); 582 memcpy (&msg->key, &session->session_id, sizeof (struct GNUNET_HashCode));
595 memcpy (&msg->peer, &session->peer, sizeof ( struct GNUNET_PeerIdentity)); 583 memcpy (&msg->peer, &session->peer, sizeof ( struct GNUNET_PeerIdentity));
596 msg->header.size = htons (sizeof (struct GNUNET_SCALARPRODUCT_client_response)); 584 msg->header.size = htons (sizeof (struct GNUNET_SCALARPRODUCT_client_response));
597 // signal error if not signalized, positive result-range field but zero length. 585 // signal error if not signalized, positive result-range field but zero length.
598 msg->product_length = htonl (0); 586 msg->product_length = htonl (0);
599 msg->range = (session->state == FINALIZED) ? 0 : -1; 587 msg->range = (session /* //TODO: if finalized */) ? 0 : -1;
600 588
601 session->msg = &msg->header; 589 session->msg = &msg->header;
602 590
@@ -616,103 +604,77 @@ prepare_client_end_notification (void * cls,
616 GNUNET_free (msg); 604 GNUNET_free (msg);
617 } 605 }
618 else 606 else
619 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Sending session-end notification to client (%p) for session %s\n"), &session->client, GNUNET_h2s (&session->key)); 607 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Sending session-end notification to client (%p) for session %s\n"), &session->client, GNUNET_h2s (&session->session_id));
620 608
621 free_session_variables (session); 609 free_session_variables (session);
622} 610}
623 611
624 612
625/** 613/**
626 * prepare the response we will send to alice or bobs' clients. 614 * Executed by Alice, fills in a service-request message and sends it to the given peer
627 * in Bobs case the product will be NULL.
628 * 615 *
629 * @param cls the session associated with our client. 616 * @param cls the session associated with this request
630 * @param tc the task context handed to us by the scheduler, unused
631 */ 617 */
632static void 618static void
633prepare_client_response (void *cls, 619prepare_alices_cyrptodata_message (void *cls)
634 const struct GNUNET_SCHEDULER_TaskContext *tc)
635{ 620{
636 struct ServiceSession * session = cls; 621 struct ServiceSession * session = cls;
637 struct GNUNET_SCALARPRODUCT_client_response * msg; 622 struct GNUNET_SCALARPRODUCT_alices_cryptodata_message * msg;
638 unsigned char * product_exported = NULL; 623 struct GNUNET_CRYPTO_PaillierCiphertext * payload;
639 size_t product_length = 0; 624 unsigned int i;
640 uint32_t msg_length = 0; 625 uint32_t msg_length;
641 int8_t range = -1; 626 gcry_mpi_t a;
642 gcry_error_t rc;
643 int sign;
644
645 session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
646
647 if (session->product) {
648 gcry_mpi_t value = gcry_mpi_new (0);
649 627
650 sign = gcry_mpi_cmp_ui (session->product, 0); 628 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, _ ("Successfully created new channel to peer (%s)!\n"), GNUNET_i2s (&session->peer));
651 // libgcrypt can not handle a print of a negative number
652 // if (a->sign) return gcry_error (GPG_ERR_INTERNAL); /* Can't handle it yet. */
653 if (0 > sign) {
654 gcry_mpi_sub (value, value, session->product);
655 }
656 else if (0 < sign) {
657 range = 1;
658 gcry_mpi_add (value, value, session->product);
659 }
660 else
661 range = 0;
662 629
663 gcry_mpi_release (session->product); 630 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_alices_cryptodata_message)
664 session->product = NULL; 631 +session->used_elements_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
665 632
666 // get representation as string 633 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > msg_length) {
667 if (range 634 session->transferred_element_count = session->used_elements_count;
668 && (0 != (rc = gcry_mpi_aprint (GCRYMPI_FMT_STD, 635 }
669 &product_exported, 636 else {
670 &product_length, 637 //create a multipart msg, first we calculate a new msg size for the head msg
671 value)))) { 638 session->transferred_element_count = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_alices_cryptodata_message))
672 LOG_GCRY (GNUNET_ERROR_TYPE_ERROR, "gcry_mpi_scan", rc); 639 / sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
673 product_length = 0; 640 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_alices_cryptodata_message)
674 range = -1; // signal error with product-length = 0 and range = -1 641 +session->transferred_element_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
675 }
676 gcry_mpi_release (value);
677 } 642 }
678 643
679 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_client_response) +product_length;
680 msg = GNUNET_malloc (msg_length); 644 msg = GNUNET_malloc (msg_length);
681 msg->key = session->key;
682 msg->peer = session->peer;
683 if (product_exported != NULL) {
684 memcpy (&msg[1], product_exported, product_length);
685 GNUNET_free (product_exported);
686 }
687 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT);
688 msg->header.size = htons (msg_length); 645 msg->header.size = htons (msg_length);
689 msg->range = range; 646 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA);
690 msg->product_length = htonl (product_length); 647 msg->contained_element_count = htonl (session->transferred_element_count);
648
649 // fill in the payload
650 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
651
652 // now copy over the sorted element vector
653 a = gcry_mpi_new (0);
654 for (i = 0; i < session->transferred_element_count; i++) {
655 gcry_mpi_add (a, session->sorted_elements[i], my_offset);
656 GNUNET_CRYPTO_paillier_encrypt (&my_pubkey, a, 3, &payload[i]);
657 }
658 gcry_mpi_release (a);
691 659
692 session->msg = (struct GNUNET_MessageHeader *) msg; 660 session->msg = (struct GNUNET_MessageHeader *) msg;
693 //transmit this message to our client 661 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Transmitting service request.\n"));
694 session->client_transmit_handle = 662
695 GNUNET_SERVER_notify_transmit_ready (session->client, 663 //transmit via mesh messaging
696 msg_length, 664 session->service_transmit_handle = GNUNET_MESH_notify_transmit_ready (session->channel, GNUNET_YES,
697 GNUNET_TIME_UNIT_FOREVER_REL, 665 GNUNET_TIME_UNIT_FOREVER_REL,
698 &do_send_message, 666 msg_length,
699 session); 667 &do_send_message,
700 if (NULL == session->client_transmit_handle) { 668 session);
701 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 669 if (NULL == session->service_transmit_handle) {
702 _ ("Could not send message to client (%p)!\n"), 670 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send message to channel!\n"));
703 session->client);
704 session->client = NULL;
705 // callback was not called!
706 GNUNET_free (msg); 671 GNUNET_free (msg);
707 session->msg = NULL; 672 session->msg = NULL;
673 session->client_notification_task =
674 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
675 session);
676 return;
708 } 677 }
709 else
710 // gracefully sent message, just terminate session structure
711 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
712 _ ("Sent result to client (%p), this session (%s) has ended!\n"),
713 session->client,
714 GNUNET_h2s (&session->key));
715 free_session_variables (session);
716} 678}
717 679
718 680
@@ -723,7 +685,7 @@ prepare_client_response (void *cls,
723 * @param cls the associated service session 685 * @param cls the associated service session
724 */ 686 */
725static void 687static void
726prepare_service_response_multipart (void *cls) 688prepare_bobs_cryptodata_message_multipart (void *cls)
727{ 689{
728 struct ServiceSession * session = cls; 690 struct ServiceSession * session = cls;
729 struct GNUNET_CRYPTO_PaillierCiphertext * payload; 691 struct GNUNET_CRYPTO_PaillierCiphertext * payload;
@@ -734,7 +696,7 @@ prepare_service_response_multipart (void *cls)
734 uint32_t todo_count; 696 uint32_t todo_count;
735 697
736 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message); 698 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message);
737 todo_count = session->used - session->transferred; 699 todo_count = session->used_elements_count - session->transferred_element_count;
738 700
739 if (todo_count > MULTIPART_ELEMENT_CAPACITY / 2) 701 if (todo_count > MULTIPART_ELEMENT_CAPACITY / 2)
740 // send the currently possible maximum chunk, we always transfer both permutations 702 // send the currently possible maximum chunk, we always transfer both permutations
@@ -742,17 +704,17 @@ prepare_service_response_multipart (void *cls)
742 704
743 msg_length += todo_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * 2; 705 msg_length += todo_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * 2;
744 msg = GNUNET_malloc (msg_length); 706 msg = GNUNET_malloc (msg_length);
745 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_TO_BOB_MULTIPART); 707 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA_MULTIPART);
746 msg->header.size = htons (msg_length); 708 msg->header.size = htons (msg_length);
747 msg->multipart_element_count = htonl (todo_count); 709 msg->contained_element_count = htonl (todo_count);
748 710
749 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1]; 711 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
750 for (i = session->transferred, j=0; i < session->transferred + todo_count; i++) { 712 for (i = session->transferred_element_count, j = 0; i < session->transferred_element_count + todo_count; i++) {
751 //r[i][p] and r[i][q] 713 //r[i][p] and r[i][q]
752 memcpy (&payload[j++], &session->r[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 714 memcpy (&payload[j++], &session->r[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
753 memcpy (&payload[j++], &session->r_prime[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 715 memcpy (&payload[j++], &session->r_prime[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
754 } 716 }
755 session->transferred += todo_count; 717 session->transferred_element_count += todo_count;
756 session->msg = (struct GNUNET_MessageHeader *) msg; 718 session->msg = (struct GNUNET_MessageHeader *) msg;
757 session->service_transmit_handle = 719 session->service_transmit_handle =
758 GNUNET_MESH_notify_transmit_ready (session->channel, 720 GNUNET_MESH_notify_transmit_ready (session->channel,
@@ -764,19 +726,17 @@ prepare_service_response_multipart (void *cls)
764 //disconnect our client 726 //disconnect our client
765 if (NULL == session->service_transmit_handle) { 727 if (NULL == session->service_transmit_handle) {
766 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send service-response message via mesh!)\n")); 728 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send service-response message via mesh!)\n"));
767 session->state = FINALIZED;
768 729
769 session->response->client_notification_task = 730 session->response->client_notification_task =
770 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification, 731 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
771 session->response); 732 session->response);
772 return; 733 return;
773 } 734 }
774 if (session->transferred != session->used) 735 if (session->transferred_element_count != session->used_elements_count) {
775 // more multiparts 736 // more multiparts
776 session->state = WAITING_FOR_MULTIPART_TRANSMISSION; 737 }
777 else { 738 else {
778 // final part 739 // final part
779 session->state = FINALIZED;
780 GNUNET_free (session->r_prime); 740 GNUNET_free (session->r_prime);
781 GNUNET_free (session->r); 741 GNUNET_free (session->r);
782 session->r_prime = NULL; 742 session->r_prime = NULL;
@@ -795,12 +755,13 @@ prepare_service_response_multipart (void *cls)
795 * S': $S' := E_A(sum r_i^2)$ 755 * S': $S' := E_A(sum r_i^2)$
796 * 756 *
797 * @param session the associated requesting session with alice 757 * @param session the associated requesting session with alice
798 * @return #GNUNET_NO if we could not send our message
799 * #GNUNET_OK if the operation succeeded
800 */ 758 */
801static int 759static void
802prepare_service_response (struct ServiceSession * session) 760prepare_bobs_cryptodata_message (void *cls,
761 const struct GNUNET_SCHEDULER_TaskContext
762 * tc)
803{ 763{
764 struct ServiceSession * session = (struct ServiceSession *) cls;
804 struct GNUNET_SCALARPRODUCT_service_response * msg; 765 struct GNUNET_SCALARPRODUCT_service_response * msg;
805 uint32_t msg_length = 0; 766 uint32_t msg_length = 0;
806 struct GNUNET_CRYPTO_PaillierCiphertext * payload; 767 struct GNUNET_CRYPTO_PaillierCiphertext * payload;
@@ -810,22 +771,21 @@ prepare_service_response (struct ServiceSession * session)
810 + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext); // s, stick 771 + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext); // s, stick
811 772
812 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > 773 if (GNUNET_SERVER_MAX_MESSAGE_SIZE >
813 msg_length + 2 * session->used * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)) { //r, r' 774 msg_length + 2 * session->used_elements_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)) { //r, r'
814 msg_length += 2 * session->used * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext); 775 msg_length += 2 * session->used_elements_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
815 session->transferred = session->used; 776 session->transferred_element_count = session->used_elements_count;
816 } 777 }
817 else 778 else
818 session->transferred = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - msg_length) / 779 session->transferred_element_count = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - msg_length) /
819 (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * 2); 780 (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * 2);
820 781
821 msg = GNUNET_malloc (msg_length); 782 msg = GNUNET_malloc (msg_length);
822 783 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA);
823 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_TO_ALICE);
824 msg->header.size = htons (msg_length); 784 msg->header.size = htons (msg_length);
825 msg->total_element_count = htonl (session->total); 785 msg->total_element_count = htonl (session->total);
826 msg->used_element_count = htonl (session->used); 786 msg->used_element_count = htonl (session->used_elements_count);
827 msg->contained_element_count = htonl (session->transferred); 787 msg->contained_element_count = htonl (session->transferred_element_count);
828 memcpy (&msg->key, &session->key, sizeof (struct GNUNET_HashCode)); 788 memcpy (&msg->key, &session->session_id, sizeof (struct GNUNET_HashCode));
829 789
830 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1]; 790 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
831 memcpy (&payload[0], session->s, sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 791 memcpy (&payload[0], session->s, sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
@@ -834,12 +794,13 @@ prepare_service_response (struct ServiceSession * session)
834 session->s_prime = NULL; 794 session->s_prime = NULL;
835 GNUNET_free (session->s); 795 GNUNET_free (session->s);
836 session->s = NULL; 796 session->s = NULL;
837 797
798 payload = &payload[2];
838 // convert k[][] 799 // convert k[][]
839 for (i = 0; i < session->transferred; i++) { 800 for (i = 0; i < session->transferred_element_count; i++) {
840 //k[i][p] and k[i][q] 801 //k[i][p] and k[i][q]
841 memcpy (&payload[2 + i*2], &session->r[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 802 memcpy (&payload[i * 2], &session->r[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
842 memcpy (&payload[3 + i*2], &session->r_prime[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 803 memcpy (&payload[i * 2 + 1], &session->r_prime[i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
843 } 804 }
844 805
845 session->msg = (struct GNUNET_MessageHeader *) msg; 806 session->msg = (struct GNUNET_MessageHeader *) msg;
@@ -853,26 +814,21 @@ prepare_service_response (struct ServiceSession * session)
853 //disconnect our client 814 //disconnect our client
854 if (NULL == session->service_transmit_handle) { 815 if (NULL == session->service_transmit_handle) {
855 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send service-response message via mesh!)\n")); 816 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send service-response message via mesh!)\n"));
856 session->state = FINALIZED;
857 817
858 session->response->client_notification_task = 818 session->response->client_notification_task =
859 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification, 819 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
860 session->response); 820 session->response);
861 return GNUNET_NO;
862 } 821 }
863 if (session->transferred != session->used) 822 if (session->transferred_element_count != session->used_elements_count) {
864 // multipart 823 // multipart
865 session->state = WAITING_FOR_MULTIPART_TRANSMISSION; 824 }
866 else { 825 else {
867 //singlepart 826 //singlepart
868 session->state = FINALIZED;
869 GNUNET_free (session->r); 827 GNUNET_free (session->r);
870 session->r = NULL; 828 session->r = NULL;
871 GNUNET_free (session->r_prime); 829 GNUNET_free (session->r_prime);
872 session->r_prime = NULL; 830 session->r_prime = NULL;
873 } 831 }
874
875 return GNUNET_OK;
876} 832}
877 833
878 834
@@ -885,17 +841,11 @@ prepare_service_response (struct ServiceSession * session)
885 * S': $S' := E_A(sum r_i^2)$ 841 * S': $S' := E_A(sum r_i^2)$
886 * 842 *
887 * @param request the requesting session + bob's requesting peer 843 * @param request the requesting session + bob's requesting peer
888 * @param response the responding session + bob's client handle
889 * @return GNUNET_SYSERR if the computation failed
890 * GNUNET_OK if everything went well.
891 */ 844 */
892static int 845static void
893compute_service_response (struct ServiceSession * request, 846compute_service_response (struct ServiceSession * session)
894 struct ServiceSession * response)
895{ 847{
896 int i; 848 int i;
897 int j;
898 int ret = GNUNET_SYSERR;
899 unsigned int * p; 849 unsigned int * p;
900 unsigned int * q; 850 unsigned int * q;
901 uint32_t count; 851 uint32_t count;
@@ -907,34 +857,20 @@ compute_service_response (struct ServiceSession * request,
907 struct GNUNET_CRYPTO_PaillierCiphertext * r_prime; 857 struct GNUNET_CRYPTO_PaillierCiphertext * r_prime;
908 struct GNUNET_CRYPTO_PaillierCiphertext * s; 858 struct GNUNET_CRYPTO_PaillierCiphertext * s;
909 struct GNUNET_CRYPTO_PaillierCiphertext * s_prime; 859 struct GNUNET_CRYPTO_PaillierCiphertext * s_prime;
910 uint32_t value;
911 860
912 count = request->used; 861 count = session->used_elements_count;
913 a = request->e_a; 862 a = session->e_a;
914 b = initialize_mpi_vector (count); 863 b = session->sorted_elements;
915 q = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK, count); 864 q = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK, count);
916 p = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK, count); 865 p = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK, count);
917 rand = initialize_mpi_vector (count); 866
867 for (i = 0; i < count; i++)
868 GNUNET_assert (NULL != (rand[i] = gcry_mpi_new (0)));
918 r = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * count); 869 r = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * count);
919 r_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * count); 870 r_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * count);
920 s = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 871 s = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
921 s_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 872 s_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
922 873
923 // convert responder session to from long to mpi
924 for (i = 0, j = 0; i < response->total && j < count; i++) {
925 if (request->mask[i / 8] & (1 << (i % 8))) {
926 value = response->vector[i] >= 0 ? response->vector[i] : -response->vector[i];
927 // long to gcry_mpi_t
928 if (0 > response->vector[i])
929 gcry_mpi_sub_ui (b[j], b[j], value);
930 else
931 b[j] = gcry_mpi_set_ui (b[j], value);
932 j++;
933 }
934 }
935 GNUNET_free (response->vector);
936 response->vector = NULL;
937
938 for (i = 0; i < count; i++) { 874 for (i = 0; i < count; i++) {
939 int32_t svalue; 875 int32_t svalue;
940 876
@@ -959,13 +895,13 @@ compute_service_response (struct ServiceSession * request,
959 // E(S - r_pi - b_pi) 895 // E(S - r_pi - b_pi)
960 gcry_mpi_sub (tmp, my_offset, rand[p[i]]); 896 gcry_mpi_sub (tmp, my_offset, rand[p[i]]);
961 gcry_mpi_sub (tmp, tmp, b[p[i]]); 897 gcry_mpi_sub (tmp, tmp, b[p[i]]);
962 GNUNET_CRYPTO_paillier_encrypt (request->remote_pubkey, 898 GNUNET_CRYPTO_paillier_encrypt (&session->remote_pubkey,
963 tmp, 899 tmp,
964 2, 900 2,
965 &r[i]); 901 &r[i]);
966 902
967 // E(S - r_pi - b_pi) * E(S + a_pi) == E(2*S + a - r - b) 903 // E(S - r_pi - b_pi) * E(S + a_pi) == E(2*S + a - r - b)
968 GNUNET_CRYPTO_paillier_hom_add (request->remote_pubkey, 904 GNUNET_CRYPTO_paillier_hom_add (&session->remote_pubkey,
969 &r[i], 905 &r[i],
970 &a[p[i]], 906 &a[p[i]],
971 &r[i]); 907 &r[i]);
@@ -975,13 +911,13 @@ compute_service_response (struct ServiceSession * request,
975 for (i = 0; i < count; i++) { 911 for (i = 0; i < count; i++) {
976 // E(S - r_qi) 912 // E(S - r_qi)
977 gcry_mpi_sub (tmp, my_offset, rand[q[i]]); 913 gcry_mpi_sub (tmp, my_offset, rand[q[i]]);
978 GNUNET_assert (2 == GNUNET_CRYPTO_paillier_encrypt (request->remote_pubkey, 914 GNUNET_assert (2 == GNUNET_CRYPTO_paillier_encrypt (&session->remote_pubkey,
979 tmp, 915 tmp,
980 2, 916 2,
981 &r_prime[i])); 917 &r_prime[i]));
982 918
983 // E(S - r_qi) * E(S + a_qi) == E(2*S + a_qi - r_qi) 919 // E(S - r_qi) * E(S + a_qi) == E(2*S + a_qi - r_qi)
984 GNUNET_assert (1 == GNUNET_CRYPTO_paillier_hom_add (request->remote_pubkey, 920 GNUNET_assert (1 == GNUNET_CRYPTO_paillier_hom_add (&session->remote_pubkey,
985 &r_prime[i], 921 &r_prime[i],
986 &a[q[i]], 922 &a[q[i]],
987 &r_prime[i])); 923 &r_prime[i]));
@@ -989,7 +925,7 @@ compute_service_response (struct ServiceSession * request,
989 925
990 // Calculate S' = E(SUM( r_i^2 )) 926 // Calculate S' = E(SUM( r_i^2 ))
991 tmp = compute_square_sum (rand, count); 927 tmp = compute_square_sum (rand, count);
992 GNUNET_CRYPTO_paillier_encrypt (request->remote_pubkey, 928 GNUNET_CRYPTO_paillier_encrypt (&session->remote_pubkey,
993 tmp, 929 tmp,
994 1, 930 1,
995 s_prime); 931 s_prime);
@@ -998,102 +934,333 @@ compute_service_response (struct ServiceSession * request,
998 for (i = 0; i < count; i++) 934 for (i = 0; i < count; i++)
999 gcry_mpi_add (rand[i], rand[i], b[i]); 935 gcry_mpi_add (rand[i], rand[i], b[i]);
1000 tmp = compute_square_sum (rand, count); 936 tmp = compute_square_sum (rand, count);
1001 GNUNET_CRYPTO_paillier_encrypt (request->remote_pubkey, 937 GNUNET_CRYPTO_paillier_encrypt (&session->remote_pubkey,
1002 tmp, 938 tmp,
1003 1, 939 1,
1004 s); 940 s);
1005 941
1006 request->r = r; 942 session->r = r;
1007 request->r_prime = r_prime; 943 session->r_prime = r_prime;
1008 request->s = s; 944 session->s = s;
1009 request->s_prime = s_prime; 945 session->s_prime = s_prime;
1010 request->response = response;
1011 946
1012 // release rand, b and a 947 // release rand, b and a
1013 for (i = 0; i < count; i++) { 948 for (i = 0; i < count; i++) {
1014 gcry_mpi_release (rand[i]); 949 gcry_mpi_release (rand[i]);
1015 gcry_mpi_release (b[i]); 950 gcry_mpi_release (b[i]);
1016 gcry_mpi_release (request->a[i]);
1017 } 951 }
1018 gcry_mpi_release (tmp); 952 gcry_mpi_release (tmp);
1019 GNUNET_free (request->a); 953 GNUNET_free (session->e_a);
1020 request->a = NULL; 954 session->e_a = NULL;
1021 GNUNET_free (p); 955 GNUNET_free (p);
1022 GNUNET_free (q); 956 GNUNET_free (q);
1023 GNUNET_free (b); 957 GNUNET_free (b);
1024 GNUNET_free (rand); 958 GNUNET_free (rand);
1025 959
1026 // copy the r[], r_prime[], S and Stick into a new message, prepare_service_response frees these 960 // copy the r[], r_prime[], S and Stick into a new message, prepare_service_response frees these
1027 if (GNUNET_YES != prepare_service_response (request)) 961 GNUNET_SCHEDULER_add_now (&prepare_bobs_cryptodata_message, session);
1028 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Failed to communicate with `%s', scalar product calculation aborted.\n"), 962}
1029 GNUNET_i2s (&request->peer)); 963
964
965/**
966 * Iterator over all hash map entries in session->intersected_elements.
967 *
968 * @param cls closure
969 * @param key current key code
970 * @param value value in the hash map
971 * @return #GNUNET_YES if we should continue to
972 * iterate,
973 * #GNUNET_NO if not.
974 */
975int
976cb_insert_element_sorted (void *cls,
977 const struct GNUNET_HashCode *key,
978 void *value)
979{
980 struct ServiceSession * session = (struct ServiceSession*) cls;
981 struct SortedValue * e = GNUNET_new (struct SortedValue);
982 struct SortedValue * o = session->a_head;
983
984 e->elem = value;
985 e->val = gcry_mpi_new (0);
986 if (0 > e->elem->value)
987 gcry_mpi_sub_ui (e->val, e->val, abs (e->elem->value));
1030 else 988 else
1031 ret = GNUNET_OK; 989 gcry_mpi_add_ui (e->val, e->val, e->elem->value);
990
991 // insert as first element with the lowest key
992 if (NULL == session->a_head
993 || (0 <= GNUNET_CRYPTO_hash_cmp (&session->a_head->elem->key, &e->elem->key))) {
994 GNUNET_CONTAINER_DLL_insert (session->a_head, session->a_tail, e);
995 return GNUNET_YES;
996 }
997 // insert as last element with the highest key
998 if (0 >= GNUNET_CRYPTO_hash_cmp (&session->a_tail->elem->key, &e->elem->key)) {
999 GNUNET_CONTAINER_DLL_insert_tail (session->a_head, session->a_tail, e);
1000 return GNUNET_YES;
1001 }
1002 // insert before the first higher/equal element
1003 do {
1004 if (0 <= GNUNET_CRYPTO_hash_cmp (&o->elem->key, &e->elem->key)) {
1005 GNUNET_CONTAINER_DLL_insert_before (session->a_head, session->a_tail, o, e);
1006 return GNUNET_YES;
1007 }
1008 o = o->next;
1009 }
1010 while (NULL != o);
1011 // broken DLL
1012 GNUNET_assert (0);
1013 return GNUNET_NO;
1014}
1015
1016
1017/**
1018 * Callback for set operation results. Called for each element
1019 * in the result set.
1020 *
1021 * @param cls closure
1022 * @param element a result element, only valid if status is #GNUNET_SET_STATUS_OK
1023 * @param status see `enum GNUNET_SET_Status`
1024 */
1025static void
1026cb_intersection_element_removed (void *cls,
1027 const struct GNUNET_SET_Element *element,
1028 enum GNUNET_SET_Status status)
1029{
1030 struct ServiceSession * session = (struct ServiceSession*) cls;
1031 struct GNUNET_SCALARPRODUCT_Element * se;
1032 int i;
1033
1034 switch (status)
1035 {
1036 case GNUNET_SET_STATUS_OK:
1037 //this element has been removed from the set
1038 se = GNUNET_CONTAINER_multihashmap_get (session->intersected_elements,
1039 element->data);
1040
1041 GNUNET_CONTAINER_multihashmap_remove (session->intersected_elements,
1042 element->data,
1043 se);
1044 session->used_elements_count--;
1045 return;
1046
1047 case GNUNET_SET_STATUS_DONE:
1048 if (2 > session->used_elements_count) {
1049 // failed! do not leak information about our single remaining element!
1050 // continue after the loop
1051 break;
1052 }
1053
1054 GNUNET_CONTAINER_multihashmap_iterate (session->intersected_elements,
1055 &cb_insert_element_sorted,
1056 session);
1057
1058 session->sorted_elements = GNUNET_malloc (session->used_elements_count * sizeof (gcry_mpi_t));
1059 for (i = 0; NULL != session->a_head; i++) {
1060 struct SortedValue* a = session->a_head;
1061 if (i > session->used_elements_count) {
1062 GNUNET_assert (0);
1063 return;
1064 }
1065 session->sorted_elements[i] = a->val;
1066 GNUNET_CONTAINER_DLL_remove (session->a_head, session->a_tail, a);
1067 GNUNET_free (a->elem);
1068 }
1069 if (i != session->used_elements_count)
1070 GNUNET_assert (0);
1071
1072 if (ALICE == session->role) {
1073 prepare_alices_cyrptodata_message (session);
1074 return;
1075 }
1076 else {
1077 if (session->used_elements_count == session->transferred_element_count)
1078 compute_service_response (session);
1079
1080 return;
1081 }
1082 default:
1083 break;
1084 }
1085
1086 //failed if we go here
1087 if (ALICE == session->role) {
1088 session->client_notification_task =
1089 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1090 session);
1091 }
1092 else {
1093 //TODO: Fail service session, exit tunnel
1094
1095
1096 session->response->client_notification_task =
1097 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1098 session->response);
1032 1099
1033 return ret; 1100 }
1034} 1101}
1035 1102
1036 1103
1037/** 1104/**
1038 * Send a multi part chunk of a service request from alice to bob. 1105 * Called when another peer wants to do a set operation with the
1039 * This element only contains a part of the elements-vector (session->a[]), 1106 * local peer. If a listen error occurs, the @a request is NULL.
1040 * mask and public key set have to be contained within the first message
1041 * 1107 *
1042 * This allows a ~32kbit key length while using 32000 elements or 62000 elements per request. 1108 * @param cls closure
1109 * @param other_peer the other peer
1110 * @param context_msg message with application specific information from
1111 * the other peer
1112 * @param request request from the other peer (never NULL), use GNUNET_SET_accept()
1113 * to accept it, otherwise the request will be refused
1114 * Note that we can't just return value from the listen callback,
1115 * as it is also necessary to specify the set we want to do the
1116 * operation with, whith sometimes can be derived from the context
1117 * message. It's necessary to specify the timeout.
1118 */
1119static void
1120cb_intersection_request_alice (void *cls,
1121 const struct GNUNET_PeerIdentity *other_peer,
1122 const struct GNUNET_MessageHeader *context_msg,
1123 struct GNUNET_SET_Request *request)
1124{
1125 struct ServiceSession * session = (struct ServiceSession *) cls;
1126
1127 // check the peer-id, the app-id=session-id is compared by SET
1128 if (0 != memcmp (&session->peer, &other_peer, sizeof (struct GNUNET_PeerIdentity)))
1129 return;
1130
1131 session->intersection_op = GNUNET_SET_accept (request,
1132 GNUNET_SET_RESULT_REMOVED,
1133 cb_intersection_element_removed,
1134 session);
1135
1136 if (NULL == session->intersection_op) {
1137 session->response->client_notification_task =
1138 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1139 session);
1140 return;
1141 }
1142 if (GNUNET_OK != GNUNET_SET_commit (session->intersection_op, session->intersection_set)) {
1143 session->response->client_notification_task =
1144 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1145 session);
1146 return;
1147 }
1148 session->intersection_set = NULL;
1149 session->intersection_listen = NULL;
1150}
1151
1152
1153/**
1154 * prepare the response we will send to alice or bobs' clients.
1155 * in Bobs case the product will be NULL.
1043 * 1156 *
1044 * @param cls the associated service session 1157 * @param cls the session associated with our client.
1158 * @param tc the task context handed to us by the scheduler, unused
1045 */ 1159 */
1046static void 1160static void
1047prepare_service_request_multipart (void *cls) 1161prepare_client_response (void *cls,
1162 const struct GNUNET_SCHEDULER_TaskContext *tc)
1048{ 1163{
1049 struct ServiceSession * session = cls; 1164 struct ServiceSession * session = cls;
1050 struct GNUNET_SCALARPRODUCT_multipart_message * msg; 1165 struct GNUNET_SCALARPRODUCT_client_response * msg;
1051 struct GNUNET_CRYPTO_PaillierCiphertext * payload; 1166 unsigned char * product_exported = NULL;
1052 unsigned int i; 1167 size_t product_length = 0;
1053 unsigned int j; 1168 uint32_t msg_length = 0;
1054 uint32_t msg_length; 1169 int8_t range = -1;
1055 uint32_t todo_count; 1170 gcry_error_t rc;
1056 gcry_mpi_t a; 1171 int sign;
1057 uint32_t value;
1058 1172
1059 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message); 1173 session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
1060 todo_count = session->used - session->transferred;
1061 1174
1062 if (todo_count > MULTIPART_ELEMENT_CAPACITY) 1175 if (session->product) {
1063 // send the currently possible maximum chunk 1176 gcry_mpi_t value = gcry_mpi_new (0);
1064 todo_count = MULTIPART_ELEMENT_CAPACITY; 1177
1178 sign = gcry_mpi_cmp_ui (session->product, 0);
1179 // libgcrypt can not handle a print of a negative number
1180 // if (a->sign) return gcry_error (GPG_ERR_INTERNAL); /* Can't handle it yet. */
1181 if (0 > sign) {
1182 gcry_mpi_sub (value, value, session->product);
1183 }
1184 else if (0 < sign) {
1185 range = 1;
1186 gcry_mpi_add (value, value, session->product);
1187 }
1188 else
1189 range = 0;
1190
1191 gcry_mpi_release (session->product);
1192 session->product = NULL;
1193
1194 // get representation as string
1195 if (range
1196 && (0 != (rc = gcry_mpi_aprint (GCRYMPI_FMT_STD,
1197 &product_exported,
1198 &product_length,
1199 value)))) {
1200 LOG_GCRY (GNUNET_ERROR_TYPE_ERROR, "gcry_mpi_scan", rc);
1201 product_length = 0;
1202 range = -1; // signal error with product-length = 0 and range = -1
1203 }
1204 gcry_mpi_release (value);
1205 }
1065 1206
1066 msg_length += todo_count * sizeof(struct GNUNET_CRYPTO_PaillierCiphertext); 1207 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_client_response) +product_length;
1067 msg = GNUNET_malloc (msg_length); 1208 msg = GNUNET_malloc (msg_length);
1068 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_TO_BOB_MULTIPART); 1209 msg->key = session->session_id;
1210 msg->peer = session->peer;
1211 if (product_exported != NULL) {
1212 memcpy (&msg[1], product_exported, product_length);
1213 GNUNET_free (product_exported);
1214 }
1215 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT);
1069 msg->header.size = htons (msg_length); 1216 msg->header.size = htons (msg_length);
1070 msg->multipart_element_count = htonl (todo_count); 1217 msg->range = range;
1218 msg->product_length = htonl (product_length);
1071 1219
1072 a = gcry_mpi_new (0); 1220 session->msg = (struct GNUNET_MessageHeader *) msg;
1073 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1]; 1221 //transmit this message to our client
1074 // encrypt our vector and generate string representations 1222 session->client_transmit_handle =
1075 for (i = session->last_processed, j = 0; i < session->total; i++) { 1223 GNUNET_SERVER_notify_transmit_ready (session->client,
1076 // is this a used element? 1224 msg_length,
1077 if (session->mask[i / 8] & 1 << (i % 8)) { 1225 GNUNET_TIME_UNIT_FOREVER_REL,
1078 if (todo_count <= j) 1226 &do_send_message,
1079 break; //reached end of this message, can't include more 1227 session);
1080 1228 if (NULL == session->client_transmit_handle) {
1081 value = session->vector[i] >= 0 ? session->vector[i] : -session->vector[i]; 1229 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1082 1230 _ ("Could not send message to client (%p)!\n"),
1083 a = gcry_mpi_set_ui (a, 0); 1231 session->client);
1084 // long to gcry_mpi_t 1232 session->client = NULL;
1085 if (session->vector[i] < 0) 1233 // callback was not called!
1086 gcry_mpi_sub_ui (a, a, value); 1234 GNUNET_free (msg);
1087 else 1235 session->msg = NULL;
1088 gcry_mpi_add_ui (a, a, value);
1089
1090 session->a[session->transferred + j] = gcry_mpi_set (NULL, a);
1091 gcry_mpi_add (a, a, my_offset);
1092 GNUNET_CRYPTO_paillier_encrypt (&my_pubkey, a, 3, &payload[j++]);
1093 }
1094 } 1236 }
1095 gcry_mpi_release (a); 1237 else
1096 session->transferred += todo_count; 1238 // gracefully sent message, just terminate session structure
1239 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1240 _ ("Sent result to client (%p), this session (%s) has ended!\n"),
1241 session->client,
1242 GNUNET_h2s (&session->session_id));
1243 free_session_variables (session);
1244}
1245
1246
1247/**
1248 * Executed by Alice, fills in a service-request message and sends it to the given peer
1249 *
1250 * @param session the session associated with this request
1251 */
1252static void
1253prepare_alices_computation_request (struct ServiceSession * session)
1254{
1255 struct GNUNET_SCALARPRODUCT_service_request * msg;
1256
1257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, _ ("Successfully created new channel to peer (%s)!\n"), GNUNET_i2s (&session->peer));
1258
1259 msg = GNUNET_new (struct GNUNET_SCALARPRODUCT_service_request);
1260 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA);
1261 msg->total_element_count = htonl (session->used_elements_count);
1262 memcpy (&msg->session_id, &session->session_id, sizeof (struct GNUNET_HashCode));
1263 msg->header.size = htons (sizeof (struct GNUNET_SCALARPRODUCT_service_request));
1097 1264
1098 session->msg = (struct GNUNET_MessageHeader *) msg; 1265 session->msg = (struct GNUNET_MessageHeader *) msg;
1099 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Transmitting service request.\n")); 1266 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Transmitting service request.\n"));
@@ -1101,11 +1268,11 @@ prepare_service_request_multipart (void *cls)
1101 //transmit via mesh messaging 1268 //transmit via mesh messaging
1102 session->service_transmit_handle = GNUNET_MESH_notify_transmit_ready (session->channel, GNUNET_YES, 1269 session->service_transmit_handle = GNUNET_MESH_notify_transmit_ready (session->channel, GNUNET_YES,
1103 GNUNET_TIME_UNIT_FOREVER_REL, 1270 GNUNET_TIME_UNIT_FOREVER_REL,
1104 msg_length, 1271 sizeof (struct GNUNET_SCALARPRODUCT_service_request),
1105 &do_send_message, 1272 &do_send_message,
1106 session); 1273 session);
1107 if (!session->service_transmit_handle) { 1274 if (!session->service_transmit_handle) {
1108 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send service-request multipart message to channel!\n")); 1275 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send message to channel!\n"));
1109 GNUNET_free (msg); 1276 GNUNET_free (msg);
1110 session->msg = NULL; 1277 session->msg = NULL;
1111 session->client_notification_task = 1278 session->client_notification_task =
@@ -1113,96 +1280,52 @@ prepare_service_request_multipart (void *cls)
1113 session); 1280 session);
1114 return; 1281 return;
1115 } 1282 }
1116 if (session->transferred != session->used) {
1117 session->last_processed = i;
1118 }
1119 else
1120 //final part
1121 session->state = WAITING_FOR_SERVICE_RESPONSE;
1122} 1283}
1123 1284
1124 1285
1125/** 1286/**
1126 * Executed by Alice, fills in a service-request message and sends it to the given peer 1287 * Send a multi part chunk of a service request from alice to bob.
1288 * This element only contains a part of the elements-vector (session->a[]),
1289 * mask and public key set have to be contained within the first message
1127 * 1290 *
1128 * @param cls the session associated with this request 1291 * This allows a ~32kbit key length while using 32000 elements or 62000 elements per request.
1129 * @param tc task context handed over by scheduler, unsued 1292 *
1293 * @param cls the associated service session
1130 */ 1294 */
1131static void 1295static void
1132prepare_service_request (void *cls, 1296prepare_alices_cyrptodata_message_multipart (void *cls)
1133 const struct GNUNET_SCHEDULER_TaskContext *tc)
1134{ 1297{
1135 struct ServiceSession * session = cls; 1298 struct ServiceSession * session = cls;
1136 unsigned char * current; 1299 struct GNUNET_SCALARPRODUCT_multipart_message * msg;
1137 struct GNUNET_SCALARPRODUCT_service_request * msg;
1138 struct GNUNET_CRYPTO_PaillierCiphertext * payload; 1300 struct GNUNET_CRYPTO_PaillierCiphertext * payload;
1139 unsigned int i; 1301 unsigned int i;
1140 unsigned int j;
1141 uint32_t msg_length; 1302 uint32_t msg_length;
1303 uint32_t todo_count;
1142 gcry_mpi_t a; 1304 gcry_mpi_t a;
1143 uint32_t value;
1144
1145 session->service_request_task = GNUNET_SCHEDULER_NO_TASK;
1146
1147 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, _ ("Successfully created new channel to peer (%s)!\n"), GNUNET_i2s (&session->peer));
1148 1305
1149 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_service_request) 1306 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message);
1150 + session->mask_length 1307 todo_count = session->used_elements_count - session->transferred_element_count;
1151 + sizeof(struct GNUNET_CRYPTO_PaillierPublicKey);
1152 1308
1153 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > msg_length + session->used * sizeof(struct GNUNET_CRYPTO_PaillierCiphertext)) { 1309 if (todo_count > MULTIPART_ELEMENT_CAPACITY)
1154 msg_length += session->used * sizeof(struct GNUNET_CRYPTO_PaillierCiphertext); 1310 // send the currently possible maximum chunk
1155 session->transferred = session->used; 1311 todo_count = MULTIPART_ELEMENT_CAPACITY;
1156 }
1157 else {
1158 //create a multipart msg, first we calculate a new msg size for the head msg
1159 session->transferred = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - msg_length) / sizeof(struct GNUNET_CRYPTO_PaillierCiphertext);
1160 }
1161 1312
1313 msg_length += todo_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
1162 msg = GNUNET_malloc (msg_length); 1314 msg = GNUNET_malloc (msg_length);
1163 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_TO_BOB); 1315 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA_MULTIPART);
1164 msg->total_element_count = htonl (session->used);
1165 msg->contained_element_count = htonl (session->transferred);
1166 memcpy (&msg->key, &session->key, sizeof (struct GNUNET_HashCode));
1167 msg->mask_length = htonl (session->mask_length);
1168 msg->element_count = htonl (session->total);
1169 msg->header.size = htons (msg_length); 1316 msg->header.size = htons (msg_length);
1317 msg->contained_element_count = htonl (todo_count);
1170 1318
1171 // fill in the payload 1319 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
1172 current = (unsigned char *) &msg[1]; 1320
1173 // copy over the mask 1321 // now copy over the sorted element vector
1174 memcpy (current, session->mask, session->mask_length);
1175 // copy over our public key
1176 current += session->mask_length;
1177 memcpy (current, &my_pubkey, sizeof(struct GNUNET_CRYPTO_PaillierPublicKey));
1178 current += sizeof(struct GNUNET_CRYPTO_PaillierPublicKey);
1179 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) current;
1180
1181 // now copy over the element vector
1182 session->a = GNUNET_malloc (sizeof (gcry_mpi_t) * session->used);
1183 a = gcry_mpi_new (0); 1322 a = gcry_mpi_new (0);
1184 // encrypt our vector and generate string representations 1323 for (i = session->transferred_element_count; i < todo_count; i++) {
1185 for (i = 0, j = 0; i < session->total; i++) { 1324 gcry_mpi_add (a, session->sorted_elements[i], my_offset);
1186 // if this is a used element... 1325 GNUNET_CRYPTO_paillier_encrypt (&my_pubkey, a, 3, &payload[i - session->transferred_element_count]);
1187 if (session->mask[i / 8] & 1 << (i % 8)) {
1188 if (session->transferred <= j)
1189 break; //reached end of this message, can't include more
1190
1191 value = session->vector[i] >= 0 ? session->vector[i] : -session->vector[i];
1192
1193 a = gcry_mpi_set_ui (a, 0);
1194 // long to gcry_mpi_t
1195 if (session->vector[i] < 0)
1196 gcry_mpi_sub_ui (a, a, value);
1197 else
1198 gcry_mpi_add_ui (a, a, value);
1199
1200 session->a[j] = gcry_mpi_set (NULL, a);
1201 gcry_mpi_add (a, a, my_offset);
1202 GNUNET_CRYPTO_paillier_encrypt (&my_pubkey, a, 3, &payload[j++]);
1203 }
1204 } 1326 }
1205 gcry_mpi_release (a); 1327 gcry_mpi_release (a);
1328 session->transferred_element_count += todo_count;
1206 1329
1207 session->msg = (struct GNUNET_MessageHeader *) msg; 1330 session->msg = (struct GNUNET_MessageHeader *) msg;
1208 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Transmitting service request.\n")); 1331 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Transmitting service request.\n"));
@@ -1214,7 +1337,7 @@ prepare_service_request (void *cls,
1214 &do_send_message, 1337 &do_send_message,
1215 session); 1338 session);
1216 if (!session->service_transmit_handle) { 1339 if (!session->service_transmit_handle) {
1217 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send message to channel!\n")); 1340 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not send service-request multipart message to channel!\n"));
1218 GNUNET_free (msg); 1341 GNUNET_free (msg);
1219 session->msg = NULL; 1342 session->msg = NULL;
1220 session->client_notification_task = 1343 session->client_notification_task =
@@ -1222,13 +1345,154 @@ prepare_service_request (void *cls,
1222 session); 1345 session);
1223 return; 1346 return;
1224 } 1347 }
1225 if (session->transferred != session->used) { 1348}
1226 session->state = WAITING_FOR_MULTIPART_TRANSMISSION; 1349
1227 session->last_processed = i; 1350
1351/**
1352 * Our client has finished sending us its multipart message.
1353 *
1354 * @param session the service session context
1355 */
1356static void
1357client_request_complete_bob (struct ServiceSession * client_session)
1358{
1359 struct ServiceSession * session;
1360
1361 //check if service queue contains a matching request
1362 session = find_matching_session (from_service_tail,
1363 &client_session->session_id,
1364 client_session->total, NULL);
1365 if (NULL != session) {
1366 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1367 _ ("Got client-responder-session with key %s and a matching service-request-session set, processing.\n"),
1368 GNUNET_h2s (&client_session->session_id));
1369
1370 session->response = client_session;
1371 session->intersected_elements = client_session->intersected_elements;
1372 client_session->intersected_elements = NULL;
1373 session->intersection_set = client_session->intersection_set;
1374 client_session->intersection_set = NULL;
1375
1376 session->intersection_op = GNUNET_SET_prepare (&session->peer,
1377 &session->session_id,
1378 NULL,
1379 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT16_MAX),
1380 GNUNET_SET_RESULT_REMOVED,
1381 cb_intersection_element_removed,
1382 session);
1383
1384 GNUNET_SET_commit (session->intersection_op, session->intersection_set);
1385 }
1386 else {
1387 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1388 _ ("Got client-responder-session with key %s but NO matching service-request-session set, queuing element for later use.\n"),
1389 GNUNET_h2s (&client_session->session_id));
1390 // no matching session exists yet, store the response
1391 // for later processing by handle_service_request()
1392 }
1393}
1394
1395
1396/**
1397 * Our client has finished sending us its multipart message.
1398 *
1399 * @param session the service session context
1400 */
1401static void
1402client_request_complete_alice (struct ServiceSession * session)
1403{
1404 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1405 _ ("Creating new channel for session with key %s.\n"),
1406 GNUNET_h2s (&session->session_id));
1407 session->channel = GNUNET_MESH_channel_create (my_mesh, session,
1408 &session->peer,
1409 GNUNET_APPLICATION_TYPE_SCALARPRODUCT,
1410 GNUNET_MESH_OPTION_RELIABLE);
1411 if (NULL == session->channel) {
1412 session->response->client_notification_task =
1413 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1414 session);
1415 return;
1416 }
1417 session->intersection_listen = GNUNET_SET_listen (cfg,
1418 GNUNET_SET_OPERATION_INTERSECTION,
1419 &session->session_id,
1420 cb_intersection_request_alice,
1421 session);
1422 if (NULL == session->intersection_listen) {
1423 session->response->client_notification_task =
1424 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1425 session);
1426 return;
1228 } 1427 }
1428 prepare_alices_computation_request (session);
1429}
1430
1431
1432static void
1433handle_client_message_multipart (void *cls,
1434 struct GNUNET_SERVER_Client *client,
1435 const struct GNUNET_MessageHeader *message)
1436{
1437 const struct GNUNET_SCALARPRODUCT_computation_message_multipart * msg = (const struct GNUNET_SCALARPRODUCT_computation_message_multipart *) message;
1438 struct ServiceSession * session;
1439 uint32_t contained_count;
1440 struct GNUNET_SCALARPRODUCT_Element * elements;
1441 uint32_t i;
1442
1443 // only one concurrent session per client connection allowed, simplifies logics a lot...
1444 session = GNUNET_SERVER_client_get_user_context (client, struct ServiceSession);
1445 if (NULL == session) {
1446 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1447 return;
1448 }
1449
1450 contained_count = ntohl (msg->element_count_contained);
1451
1452 //sanity check: is the message as long as the message_count fields suggests?
1453 if ((ntohs (msg->header.size) != (sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart) +contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element)))
1454 || (0 == contained_count) || (session->total < session->transferred_element_count + contained_count)) {
1455 GNUNET_break_op (0);
1456 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1457 return;
1458 }
1459 session->transferred_element_count += contained_count;
1460
1461 elements = (struct GNUNET_SCALARPRODUCT_Element *) & msg[1];
1462 for (i = 0; i < contained_count; i++) {
1463 struct GNUNET_SET_Element set_elem;
1464 struct GNUNET_SCALARPRODUCT_Element * elem;
1465
1466 if (0 == ntohl (elements[i].value))
1467 continue;
1468
1469 elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
1470 memcpy (elem, &elements[i], sizeof (struct GNUNET_SCALARPRODUCT_Element));
1471
1472 if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (session->intersected_elements,
1473 &elem->key,
1474 elem,
1475 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) {
1476 GNUNET_free (elem);
1477 continue;
1478 }
1479 set_elem.data = &elements[i].key;
1480 set_elem.size = htons (sizeof (elements[i].key));
1481 set_elem.type = htons (0); /* do we REALLY need this? */
1482 GNUNET_SET_add_element (session->intersection_set, &set_elem, NULL, NULL);
1483 session->used_elements_count++;
1484 }
1485
1486 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1487
1488 if (session->total != session->transferred_element_count)
1489 // multipart msg
1490 return;
1491
1492 if (ALICE == session->role)
1493 client_request_complete_alice (session);
1229 else 1494 else
1230 //singlepart message 1495 client_request_complete_bob (session);
1231 session->state = WAITING_FOR_SERVICE_RESPONSE;
1232} 1496}
1233 1497
1234 1498
@@ -1243,176 +1507,111 @@ prepare_service_request (void *cls,
1243 * @param message the actual message 1507 * @param message the actual message
1244 */ 1508 */
1245static void 1509static void
1246handle_client_request (void *cls, 1510handle_client_message (void *cls,
1247 struct GNUNET_SERVER_Client *client, 1511 struct GNUNET_SERVER_Client *client,
1248 const struct GNUNET_MessageHeader *message) 1512 const struct GNUNET_MessageHeader *message)
1249{ 1513{
1250 const struct GNUNET_SCALARPRODUCT_client_request * msg = (const struct GNUNET_SCALARPRODUCT_client_request *) message; 1514 const struct GNUNET_SCALARPRODUCT_computation_message * msg = (const struct GNUNET_SCALARPRODUCT_computation_message *) message;
1251 struct ServiceSession * session; 1515 struct ServiceSession * session;
1252 uint32_t element_count; 1516 uint32_t contained_count;
1253 uint32_t mask_length; 1517 uint32_t total_count;
1254 uint32_t msg_type; 1518 uint32_t msg_type;
1255 int32_t * vector; 1519 struct GNUNET_SCALARPRODUCT_Element * elements;
1256 uint32_t i; 1520 uint32_t i;
1257 1521
1258 // only one concurrent session per client connection allowed, simplifies logics a lot... 1522 // only one concurrent session per client connection allowed, simplifies logics a lot...
1259 session = GNUNET_SERVER_client_get_user_context (client, struct ServiceSession); 1523 session = GNUNET_SERVER_client_get_user_context (client, struct ServiceSession);
1260 if ((NULL != session) && (session->state != FINALIZED)) { 1524 if (NULL != session) {
1261 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1525 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1262 return; 1526 return;
1263 } 1527 }
1264 else if (NULL != session) {
1265 // old session is already completed, clean it up
1266 GNUNET_CONTAINER_DLL_remove (from_client_head, from_client_tail, session);
1267 free_session_variables (session);
1268 GNUNET_free (session);
1269 }
1270 1528
1271 //we need at least a peer and one message id to compare 1529 msg_type = ntohs (msg->header.type);
1272 if (sizeof (struct GNUNET_SCALARPRODUCT_client_request) > ntohs (msg->header.size)) { 1530 total_count = ntohl (msg->element_count_total);
1273 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1531 contained_count = ntohl (msg->element_count_contained);
1274 _ ("Too short message received from client!\n")); 1532
1533 if ((GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE == msg_type)
1534 && (!memcmp (&msg->peer, &me, sizeof (struct GNUNET_PeerIdentity)))) {
1535 //session with ourself makes no sense!
1536 GNUNET_break_op (0);
1275 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1537 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1276 return; 1538 return;
1277 } 1539 }
1278 1540
1279 msg_type = ntohs (msg->header.type);
1280 element_count = ntohl (msg->element_count);
1281 mask_length = ntohl (msg->mask_length);
1282
1283 //sanity check: is the message as long as the message_count fields suggests? 1541 //sanity check: is the message as long as the message_count fields suggests?
1284 if ((ntohs (msg->header.size) != (sizeof (struct GNUNET_SCALARPRODUCT_client_request) +element_count * sizeof (int32_t) + mask_length)) 1542 if ((ntohs (msg->header.size) != (sizeof (struct GNUNET_SCALARPRODUCT_computation_message) +contained_count * sizeof (struct GNUNET_SCALARPRODUCT_Element)))
1285 || (0 == element_count)) { 1543 || (0 == total_count)) {
1286 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1544 GNUNET_break_op (0);
1287 _ ("Invalid message received from client, session information incorrect!\n"));
1288 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1545 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1289 return; 1546 return;
1290 } 1547 }
1291 1548
1292 // do we have a duplicate session here already? 1549 // do we have a duplicate session here already?
1293 if (NULL != find_matching_session (from_client_tail, 1550 if (NULL != find_matching_session (from_client_tail,
1294 &msg->key, 1551 &msg->session_key,
1295 element_count, 1552 total_count, NULL)) {
1296 NULL, NULL)) {
1297 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1553 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1298 _ ("Duplicate session information received, cannot create new session with key `%s'\n"), 1554 _ ("Duplicate session information received, can not create new session with key `%s'\n"),
1299 GNUNET_h2s (&msg->key)); 1555 GNUNET_h2s (&msg->session_key));
1300 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1556 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1301 return; 1557 return;
1302 } 1558 }
1303 1559
1304 session = GNUNET_new (struct ServiceSession); 1560 session = GNUNET_new (struct ServiceSession);
1305 session->service_request_task = GNUNET_SCHEDULER_NO_TASK;
1306 session->client_notification_task = GNUNET_SCHEDULER_NO_TASK; 1561 session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
1307 session->client = client; 1562 session->client = client;
1308 session->total = element_count; 1563 session->total = total_count;
1309 session->mask_length = mask_length; 1564 session->transferred_element_count = contained_count;
1310 // get our transaction key 1565 // get our transaction key
1311 memcpy (&session->key, &msg->key, sizeof (struct GNUNET_HashCode)); 1566 memcpy (&session->session_id, &msg->session_key, sizeof (struct GNUNET_HashCode));
1312 //allocate memory for vector and encrypted vector 1567
1313 session->vector = GNUNET_malloc (sizeof (int32_t) * element_count); 1568 elements = (struct GNUNET_SCALARPRODUCT_Element *) & msg[1];
1314 vector = (int32_t *) & msg[1]; 1569 session->intersected_elements = GNUNET_CONTAINER_multihashmap_create (session->total, GNUNET_NO);
1570 session->intersection_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_INTERSECTION);
1571 for (i = 0; i < contained_count; i++) {
1572 struct GNUNET_SET_Element set_elem;
1573 struct GNUNET_SCALARPRODUCT_Element * elem;
1574
1575 if (0 == ntohl (elements[i].value))
1576 continue;
1577
1578 elem = GNUNET_new (struct GNUNET_SCALARPRODUCT_Element);
1579 memcpy (elem, &elements[i], sizeof (struct GNUNET_SCALARPRODUCT_Element));
1580
1581 if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (session->intersected_elements,
1582 &elem->key,
1583 elem,
1584 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) {
1585 GNUNET_free (elem);
1586 continue;
1587 }
1588 set_elem.data = &elements[i].key;
1589 set_elem.size = htons (sizeof (elements[i].key));
1590 set_elem.type = htons (0); /* do we REALLY need this? */
1591 GNUNET_SET_add_element (session->intersection_set, &set_elem, NULL, NULL);
1592 session->used_elements_count++;
1593 }
1315 1594
1316 if (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE == msg_type) { 1595 if (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE == msg_type) {
1317 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1318 _ ("Got client-request-session with key %s, preparing channel to remote service.\n"),
1319 GNUNET_h2s (&session->key));
1320
1321 session->role = ALICE; 1596 session->role = ALICE;
1322 // fill in the mask
1323 session->mask = GNUNET_malloc (mask_length);
1324 memcpy (session->mask, &vector[element_count], mask_length);
1325
1326 // copy over the elements
1327 session->used = 0;
1328 for (i = 0; i < element_count; i++) {
1329 session->vector[i] = ntohl (vector[i]);
1330 if (session->vector[i] == 0)
1331 session->mask[i / 8] &= ~(1 << (i % 8));
1332 if (session->mask[i / 8] & (1 << (i % 8)))
1333 session->used++;
1334 }
1335
1336 if (0 == session->used) {
1337 GNUNET_break_op (0);
1338 GNUNET_free (session->vector);
1339 GNUNET_free (session);
1340 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1341 return;
1342 }
1343 //session with ourself makes no sense!
1344 if (!memcmp (&msg->peer, &me, sizeof (struct GNUNET_PeerIdentity))) {
1345 GNUNET_break (0);
1346 GNUNET_free (session->vector);
1347 GNUNET_free (session);
1348 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1349 return;
1350 }
1351 // get our peer ID
1352 memcpy (&session->peer, &msg->peer, sizeof (struct GNUNET_PeerIdentity)); 1597 memcpy (&session->peer, &msg->peer, sizeof (struct GNUNET_PeerIdentity));
1353 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1354 _ ("Creating new channel for session with key %s.\n"),
1355 GNUNET_h2s (&session->key));
1356 session->channel = GNUNET_MESH_channel_create (my_mesh, session,
1357 &session->peer,
1358 GNUNET_APPLICATION_TYPE_SCALARPRODUCT,
1359 GNUNET_MESH_OPTION_RELIABLE);
1360 //prepare_service_request, channel_peer_disconnect_handler,
1361 if (!session->channel) {
1362 GNUNET_break (0);
1363 GNUNET_free (session->vector);
1364 GNUNET_free (session);
1365 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1366 return;
1367 }
1368 GNUNET_SERVER_client_set_user_context (client, session);
1369 GNUNET_CONTAINER_DLL_insert (from_client_head, from_client_tail, session);
1370
1371 session->state = CLIENT_REQUEST_RECEIVED;
1372 session->service_request_task =
1373 GNUNET_SCHEDULER_add_now (&prepare_service_request,
1374 session);
1375
1376 } 1598 }
1377 else { 1599 else {
1378 struct ServiceSession * requesting_session;
1379 enum SessionState needed_state = SERVICE_REQUEST_RECEIVED;
1380
1381 session->role = BOB; 1600 session->role = BOB;
1382 session->mask = NULL;
1383 // copy over the elements
1384 session->used = element_count;
1385 for (i = 0; i < element_count; i++)
1386 session->vector[i] = ntohl (vector[i]);
1387 session->state = CLIENT_RESPONSE_RECEIVED;
1388
1389 GNUNET_SERVER_client_set_user_context (client, session);
1390 GNUNET_CONTAINER_DLL_insert (from_client_head, from_client_tail, session);
1391
1392 //check if service queue contains a matching request
1393 requesting_session = find_matching_session (from_service_tail,
1394 &session->key,
1395 session->total,
1396 &needed_state, NULL);
1397 if (NULL != requesting_session) {
1398 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1399 _ ("Got client-responder-session with key %s and a matching service-request-session set, processing.\n"),
1400 GNUNET_h2s (&session->key));
1401 if (GNUNET_OK != compute_service_response (requesting_session, session))
1402 session->client_notification_task =
1403 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1404 session);
1405
1406 }
1407 else {
1408 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1409 _ ("Got client-responder-session with key %s but NO matching service-request-session set, queuing element for later use.\n"),
1410 GNUNET_h2s (&session->key));
1411 // no matching session exists yet, store the response
1412 // for later processing by handle_service_request()
1413 }
1414 } 1601 }
1602
1603 GNUNET_CONTAINER_DLL_insert (from_client_head, from_client_tail, session);
1604 GNUNET_SERVER_client_set_user_context (client, session);
1415 GNUNET_SERVER_receive_done (client, GNUNET_YES); 1605 GNUNET_SERVER_receive_done (client, GNUNET_YES);
1606
1607 if (session->total != session->transferred_element_count)
1608 // multipart msg
1609 return;
1610
1611 if (ALICE == session->role)
1612 client_request_complete_alice (session);
1613 else
1614 client_request_complete_bob (session);
1416} 1615}
1417 1616
1418 1617
@@ -1428,7 +1627,7 @@ handle_client_request (void *cls,
1428 * @return session associated with the channel 1627 * @return session associated with the channel
1429 */ 1628 */
1430static void * 1629static void *
1431channel_incoming_handler (void *cls, 1630cb_channel_incoming (void *cls,
1432 struct GNUNET_MESH_Channel *channel, 1631 struct GNUNET_MESH_Channel *channel,
1433 const struct GNUNET_PeerIdentity *initiator, 1632 const struct GNUNET_PeerIdentity *initiator,
1434 uint32_t port, enum GNUNET_MESH_ChannelOption options) 1633 uint32_t port, enum GNUNET_MESH_ChannelOption options)
@@ -1442,7 +1641,6 @@ channel_incoming_handler (void *cls,
1442 c->peer = *initiator; 1641 c->peer = *initiator;
1443 c->channel = channel; 1642 c->channel = channel;
1444 c->role = BOB; 1643 c->role = BOB;
1445 c->state = WAITING_FOR_SERVICE_REQUEST;
1446 return c; 1644 return c;
1447} 1645}
1448 1646
@@ -1459,7 +1657,7 @@ channel_incoming_handler (void *cls,
1459 * with the channel is stored 1657 * with the channel is stored
1460 */ 1658 */
1461static void 1659static void
1462channel_destruction_handler (void *cls, 1660cb_channel_destruction (void *cls,
1463 const struct GNUNET_MESH_Channel *channel, 1661 const struct GNUNET_MESH_Channel *channel,
1464 void *channel_ctx) 1662 void *channel_ctx)
1465{ 1663{
@@ -1469,12 +1667,12 @@ channel_destruction_handler (void *cls,
1469 1667
1470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1668 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1471 _ ("Peer disconnected, terminating session %s with peer (%s)\n"), 1669 _ ("Peer disconnected, terminating session %s with peer (%s)\n"),
1472 GNUNET_h2s (&session->key), 1670 GNUNET_h2s (&session->session_id),
1473 GNUNET_i2s (&session->peer)); 1671 GNUNET_i2s (&session->peer));
1474 if (ALICE == session->role) { 1672 if (ALICE == session->role) {
1475 // as we have only one peer connected in each session, just remove the session 1673 // as we have only one peer connected in each session, just remove the session
1476 1674
1477 if ((SERVICE_RESPONSE_RECEIVED > session->state) && (!do_shutdown)) { 1675 if ((0/*//TODO: only for complete session*/) && (!do_shutdown)) {
1478 session->channel = NULL; 1676 session->channel = NULL;
1479 // if this happened before we received the answer, we must terminate the session 1677 // if this happened before we received the answer, we must terminate the session
1480 session->client_notification_task = 1678 session->client_notification_task =
@@ -1494,16 +1692,14 @@ channel_destruction_handler (void *cls,
1494 // there is a client waiting for this service session, terminate it, too! 1692 // there is a client waiting for this service session, terminate it, too!
1495 // i assume the tupel of key and element count is unique. if it was not the rest of the code would not work either. 1693 // i assume the tupel of key and element count is unique. if it was not the rest of the code would not work either.
1496 client_session = find_matching_session (from_client_tail, 1694 client_session = find_matching_session (from_client_tail,
1497 &session->key, 1695 &session->session_id,
1498 session->total, 1696 session->total, NULL);
1499 NULL, NULL);
1500 free_session_variables (session); 1697 free_session_variables (session);
1501 GNUNET_free (session); 1698 GNUNET_free (session);
1502 1699
1503 // the client has to check if it was waiting for a result 1700 // the client has to check if it was waiting for a result
1504 // or if it was a responder, no point in adding more statefulness 1701 // or if it was a responder, no point in adding more statefulness
1505 if (client_session && (!do_shutdown)) { 1702 if (client_session && (!do_shutdown)) {
1506 client_session->state = FINALIZED;
1507 client_session->client_notification_task = 1703 client_session->client_notification_task =
1508 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification, 1704 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1509 client_session); 1705 client_session);
@@ -1528,13 +1724,13 @@ compute_scalar_product (struct ServiceSession * session)
1528 gcry_mpi_t p; 1724 gcry_mpi_t p;
1529 gcry_mpi_t p_prime; 1725 gcry_mpi_t p_prime;
1530 gcry_mpi_t tmp; 1726 gcry_mpi_t tmp;
1531 gcry_mpi_t r[session->used]; 1727 gcry_mpi_t r[session->used_elements_count];
1532 gcry_mpi_t r_prime[session->used]; 1728 gcry_mpi_t r_prime[session->used_elements_count];
1533 gcry_mpi_t s; 1729 gcry_mpi_t s;
1534 gcry_mpi_t s_prime; 1730 gcry_mpi_t s_prime;
1535 unsigned int i; 1731 unsigned int i;
1536 1732
1537 count = session->used; 1733 count = session->used_elements_count;
1538 // due to the introduced static offset S, we now also have to remove this 1734 // due to the introduced static offset S, we now also have to remove this
1539 // from the E(a_pi)(+)E(-b_pi-r_pi) and E(a_qi)(+)E(-r_qi) twice each, 1735 // from the E(a_pi)(+)E(-b_pi-r_pi) and E(a_qi)(+)E(-r_qi) twice each,
1540 // the result is E((S + a_pi) + (S -b_pi-r_pi)) and E(S + a_qi + S - r_qi) 1736 // the result is E((S + a_pi) + (S -b_pi-r_pi)) and E(S + a_qi + S - r_qi)
@@ -1550,7 +1746,7 @@ compute_scalar_product (struct ServiceSession * session)
1550 } 1746 }
1551 1747
1552 // calculate t = sum(ai) 1748 // calculate t = sum(ai)
1553 t = compute_square_sum (session->a, count); 1749 t = compute_square_sum (session->sorted_elements, count);
1554 1750
1555 // calculate U 1751 // calculate U
1556 u = gcry_mpi_new (0); 1752 u = gcry_mpi_new (0);
@@ -1570,9 +1766,9 @@ compute_scalar_product (struct ServiceSession * session)
1570 1766
1571 // compute P 1767 // compute P
1572 GNUNET_CRYPTO_paillier_decrypt (&my_privkey, &my_pubkey, 1768 GNUNET_CRYPTO_paillier_decrypt (&my_privkey, &my_pubkey,
1573 session->s, s); 1769 session->s, s);
1574 GNUNET_CRYPTO_paillier_decrypt (&my_privkey, &my_pubkey, 1770 GNUNET_CRYPTO_paillier_decrypt (&my_privkey, &my_pubkey,
1575 session->s_prime, s_prime); 1771 session->s_prime, s_prime);
1576 1772
1577 // compute P 1773 // compute P
1578 gcry_mpi_add (p, s, t); 1774 gcry_mpi_add (p, s, t);
@@ -1595,13 +1791,13 @@ compute_scalar_product (struct ServiceSession * session)
1595 gcry_mpi_div (p, NULL, p, tmp, 0); 1791 gcry_mpi_div (p, NULL, p, tmp, 0);
1596 1792
1597 gcry_mpi_release (tmp); 1793 gcry_mpi_release (tmp);
1598 for (i = 0; i < count; i++){ 1794 for (i = 0; i < count; i++) {
1599 gcry_mpi_release (session->a[i]); 1795 gcry_mpi_release (session->sorted_elements[i]);
1600 gcry_mpi_release (r[i]); 1796 gcry_mpi_release (r[i]);
1601 gcry_mpi_release (r_prime[i]); 1797 gcry_mpi_release (r_prime[i]);
1602 } 1798 }
1603 GNUNET_free (session->a); 1799 GNUNET_free (session->a_head);
1604 session->a = NULL; 1800 session->a_head = NULL;
1605 GNUNET_free (session->s); 1801 GNUNET_free (session->s);
1606 session->s = NULL; 1802 session->s = NULL;
1607 GNUNET_free (session->s_prime); 1803 GNUNET_free (session->s_prime);
@@ -1626,64 +1822,49 @@ compute_scalar_product (struct ServiceSession * session)
1626 * #GNUNET_SYSERR to close it (signal serious error) 1822 * #GNUNET_SYSERR to close it (signal serious error)
1627 */ 1823 */
1628static int 1824static int
1629handle_service_request_multipart (void *cls, 1825handle_alices_cyrptodata_message_multipart (void *cls,
1630 struct GNUNET_MESH_Channel * channel, 1826 struct GNUNET_MESH_Channel * channel,
1631 void **channel_ctx, 1827 void **channel_ctx,
1632 const struct GNUNET_MessageHeader * message) 1828 const struct GNUNET_MessageHeader * message)
1633{ 1829{
1634 struct ServiceSession * session; 1830 struct ServiceSession * session;
1635 const struct GNUNET_SCALARPRODUCT_multipart_message * msg = (const struct GNUNET_SCALARPRODUCT_multipart_message *) message; 1831 const struct GNUNET_SCALARPRODUCT_multipart_message * msg = (const struct GNUNET_SCALARPRODUCT_multipart_message *) message;
1636 struct GNUNET_CRYPTO_PaillierCiphertext *payload; 1832 struct GNUNET_CRYPTO_PaillierCiphertext *payload;
1637 uint32_t used_elements; 1833 uint32_t contained_elements;
1638 uint32_t contained_elements = 0;
1639 uint32_t msg_length; 1834 uint32_t msg_length;
1640 1835
1641 // are we in the correct state? 1836 // are we in the correct state?
1642 session = (struct ServiceSession *) * channel_ctx; 1837 session = (struct ServiceSession *) * channel_ctx;
1643 if ((BOB != session->role) || (WAITING_FOR_MULTIPART_TRANSMISSION != session->state)) { 1838 //we are not bob
1839 if ((NULL == session->e_a) || //or we did not expect this message yet
1840 (session->used_elements_count == session->transferred_element_count)) { //we not expecting multipart messages
1644 goto except; 1841 goto except;
1645 } 1842 }
1646 // shorter than minimum? 1843 // shorter than minimum?
1647 if (ntohs (msg->header.size) <= sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)) { 1844 if (ntohs (msg->header.size) <= sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)) {
1648 goto except; 1845 goto except;
1649 } 1846 }
1650 used_elements = session->used; 1847 contained_elements = ntohl (msg->contained_element_count);
1651 contained_elements = ntohl (msg->multipart_element_count);
1652 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message) 1848 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)
1653 + contained_elements * sizeof(struct GNUNET_CRYPTO_PaillierCiphertext); 1849 +contained_elements * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
1654 //sanity check 1850 //sanity check
1655 if ((ntohs (msg->header.size) != msg_length) 1851 if ((ntohs (msg->header.size) != msg_length)
1656 || (used_elements < contained_elements + session->transferred)) { 1852 || (session->used_elements_count < contained_elements + session->transferred_element_count)
1853 || (0 == contained_elements)) {
1657 goto except; 1854 goto except;
1658 } 1855 }
1659 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1]; 1856 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
1660 if (contained_elements != 0) { 1857 // Convert each vector element to MPI_value
1661 // Convert each vector element to MPI_value 1858 memcpy (&session->e_a[session->transferred_element_count], payload,
1662 memcpy(&session->e_a[session->transferred], payload, 1859 sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * contained_elements);
1663 sizeof(struct GNUNET_CRYPTO_PaillierCiphertext) * contained_elements); 1860
1664 1861 session->transferred_element_count += contained_elements;
1665 session->transferred += contained_elements; 1862
1666 1863 if (contained_elements == session->used_elements_count) {
1667 if (session->transferred == used_elements) { 1864 // single part finished
1668 // single part finished 1865 if (NULL == session->intersection_op)
1669 session->state = SERVICE_REQUEST_RECEIVED; 1866 // intersection has already finished, so we can proceed
1670 if (session->response) { 1867 compute_service_response (session);
1671 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1672 _ ("Got session with key %s and a matching element set, processing.\n"),
1673 GNUNET_h2s (&session->key));
1674 if (GNUNET_OK != compute_service_response (session, session->response)) {
1675 //something went wrong, remove it again...
1676 goto except;
1677 }
1678 }
1679 else
1680 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1681 _ ("Got session with key %s without a matching element set, queueing.\n"),
1682 GNUNET_h2s (&session->key));
1683 }
1684 else {
1685 // multipart message
1686 }
1687 } 1868 }
1688 1869
1689 return GNUNET_OK; 1870 return GNUNET_OK;
@@ -1712,24 +1893,98 @@ except:
1712 * #GNUNET_SYSERR to close it (signal serious error) 1893 * #GNUNET_SYSERR to close it (signal serious error)
1713 */ 1894 */
1714static int 1895static int
1715handle_service_request (void *cls, 1896handle_alices_cyrptodata_message (void *cls,
1897 struct GNUNET_MESH_Channel * channel,
1898 void **channel_ctx,
1899 const struct GNUNET_MessageHeader * message)
1900{
1901 struct ServiceSession * session;
1902 const struct GNUNET_SCALARPRODUCT_alices_cryptodata_message * msg = (const struct GNUNET_SCALARPRODUCT_alices_cryptodata_message *) message;
1903 struct GNUNET_CRYPTO_PaillierCiphertext *payload;
1904 uint32_t contained_elements = 0;
1905 uint32_t msg_length;
1906
1907 session = (struct ServiceSession *) * channel_ctx;
1908 //we are not bob
1909 if ((BOB != session->role)
1910 //we are expecting multipart messages instead
1911 || (NULL != session->e_a)
1912 //or we did not expect this message yet
1913 || //intersection OP has not yet finished
1914 !((NULL != session->intersection_op)
1915 //intersection OP done
1916 || (session->response->sorted_elements)
1917 )) {
1918 goto invalid_msg;
1919 }
1920
1921 // shorter than minimum?
1922 if (ntohs (msg->header.size) <= sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)) {
1923 goto invalid_msg;
1924 }
1925
1926 contained_elements = ntohl (msg->contained_element_count);
1927 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_alices_cryptodata_message)
1928 +contained_elements * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
1929
1930 //sanity check: is the message as long as the message_count fields suggests?
1931 if ((ntohs (msg->header.size) != msg_length) ||
1932 (session->used_elements_count < session->transferred_element_count + contained_elements) ||
1933 (0 == contained_elements)) {
1934 goto invalid_msg;
1935 }
1936
1937 session->transferred_element_count = contained_elements;
1938 payload = (struct GNUNET_CRYPTO_PaillierCiphertext*) &msg[1];
1939
1940 session->e_a = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * session->used_elements_count);
1941 memcpy (&session->e_a[0], payload, contained_elements * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
1942 if (contained_elements == session->used_elements_count) {
1943 // single part finished
1944 if (NULL == session->intersection_op)
1945 // intersection has already finished, so we can proceed
1946 compute_service_response (session);
1947 }
1948 return GNUNET_OK;
1949invalid_msg:
1950 GNUNET_break_op (0);
1951 if ((NULL != session->next) || (NULL != session->prev) || (from_service_head == session))
1952 GNUNET_CONTAINER_DLL_remove (from_service_head, from_service_tail, session);
1953 // and notify our client-session that we could not complete the session
1954 if (session->response)
1955 // we just found the responder session in this queue
1956 session->response->client_notification_task =
1957 GNUNET_SCHEDULER_add_now (&prepare_client_end_notification,
1958 session->response);
1959 free_session_variables (session);
1960 return GNUNET_SYSERR;
1961}
1962
1963
1964/**
1965 * Handle a request from another service to calculate a scalarproduct with us.
1966 *
1967 * @param cls closure (set from #GNUNET_MESH_connect)
1968 * @param channel connection to the other end
1969 * @param channel_ctx place to store local state associated with the channel
1970 * @param message the actual message
1971 * @return #GNUNET_OK to keep the connection open,
1972 * #GNUNET_SYSERR to close it (signal serious error)
1973 */
1974static int
1975handle_alices_computation_request (void *cls,
1716 struct GNUNET_MESH_Channel * channel, 1976 struct GNUNET_MESH_Channel * channel,
1717 void **channel_ctx, 1977 void **channel_ctx,
1718 const struct GNUNET_MessageHeader * message) 1978 const struct GNUNET_MessageHeader * message)
1719{ 1979{
1720 struct ServiceSession * session; 1980 struct ServiceSession * session;
1981 struct ServiceSession * client_session;
1721 const struct GNUNET_SCALARPRODUCT_service_request * msg = (const struct GNUNET_SCALARPRODUCT_service_request *) message; 1982 const struct GNUNET_SCALARPRODUCT_service_request * msg = (const struct GNUNET_SCALARPRODUCT_service_request *) message;
1722 uint32_t mask_length; 1983 uint32_t total_elements;
1723 struct GNUNET_CRYPTO_PaillierCiphertext *payload;
1724 uint32_t used_elements;
1725 uint32_t contained_elements = 0;
1726 uint32_t element_count;
1727 uint32_t msg_length;
1728 unsigned char * current;
1729 enum SessionState needed_state;
1730 1984
1731 session = (struct ServiceSession *) * channel_ctx; 1985 session = (struct ServiceSession *) * channel_ctx;
1732 if (WAITING_FOR_SERVICE_REQUEST != session->state) { 1986 if (session->total != 0) {
1987 // must be a fresh session
1733 goto invalid_msg; 1988 goto invalid_msg;
1734 } 1989 }
1735 // Check if message was sent by me, which would be bad! 1990 // Check if message was sent by me, which would be bad!
@@ -1739,91 +1994,71 @@ handle_service_request (void *cls,
1739 return GNUNET_SYSERR; 1994 return GNUNET_SYSERR;
1740 } 1995 }
1741 // shorter than expected? 1996 // shorter than expected?
1742 if (ntohs (msg->header.size) < sizeof (struct GNUNET_SCALARPRODUCT_service_request)) { 1997 if (ntohs (msg->header.size) != sizeof (struct GNUNET_SCALARPRODUCT_service_request)) {
1743 GNUNET_free (session); 1998 GNUNET_free (session);
1744 GNUNET_break_op (0); 1999 GNUNET_break_op (0);
1745 return GNUNET_SYSERR; 2000 return GNUNET_SYSERR;
1746 } 2001 }
1747 mask_length = ntohl (msg->mask_length); 2002 total_elements = ntohl (msg->total_element_count);
1748 used_elements = ntohl (msg->total_element_count);
1749 contained_elements = ntohl (msg->contained_element_count);
1750 element_count = ntohl (msg->element_count);
1751 msg_length = sizeof (struct GNUNET_SCALARPRODUCT_service_request)
1752 + mask_length + sizeof(struct GNUNET_CRYPTO_PaillierPublicKey)
1753 + contained_elements * sizeof(struct GNUNET_CRYPTO_PaillierCiphertext);
1754 2003
1755 //sanity check: is the message as long as the message_count fields suggests? 2004 //sanity check: is the message as long as the message_count fields suggests?
1756 if ((ntohs (msg->header.size) != msg_length) || 2005 if (1 > total_elements) {
1757 (element_count < used_elements) ||
1758 (used_elements < contained_elements) ||
1759 (0 == used_elements) ||
1760 (mask_length != (element_count / 8 + ((element_count % 8) ? 1 : 0)))) {
1761 GNUNET_free (session); 2006 GNUNET_free (session);
1762 GNUNET_break_op (0); 2007 GNUNET_break_op (0);
1763 return GNUNET_SYSERR; 2008 return GNUNET_SYSERR;
1764 } 2009 }
1765 if (find_matching_session (from_service_tail, 2010 if (find_matching_session (from_service_tail,
1766 &msg->key, 2011 &msg->session_id,
1767 element_count, 2012 total_elements,
1768 NULL,
1769 NULL)) { 2013 NULL)) {
1770 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2014 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1771 _ ("Got message with duplicate session key (`%s'), ignoring service request.\n"), 2015 _ ("Got message with duplicate session key (`%s'), ignoring service request.\n"),
1772 (const char *) &(msg->key)); 2016 (const char *) &(msg->session_id));
1773 GNUNET_free (session); 2017 GNUNET_free (session);
1774 return GNUNET_SYSERR; 2018 return GNUNET_SYSERR;
1775 } 2019 }
1776 2020
1777 session->total = element_count; 2021 session->total = total_elements;
1778 session->used = used_elements;
1779 session->transferred = contained_elements;
1780 session->channel = channel; 2022 session->channel = channel;
1781 2023
1782 // session key 2024 // session key
1783 memcpy (&session->key, &msg->key, sizeof (struct GNUNET_HashCode)); 2025 memcpy (&session->session_id, &msg->session_id, sizeof (struct GNUNET_HashCode));
1784 current = (unsigned char *) &msg[1]; 2026
1785 //preserve the mask, we will need that later on 2027 // public key
1786 session->mask = GNUNET_malloc (mask_length); 2028 memcpy (&session->remote_pubkey, &msg->public_key, sizeof (struct GNUNET_CRYPTO_PaillierPublicKey));
1787 memcpy (session->mask, current, mask_length); 2029
1788 //the public key
1789 current += mask_length;
1790
1791 //convert the publickey to sexp
1792 session->remote_pubkey = GNUNET_malloc(sizeof(struct GNUNET_CRYPTO_PaillierPublicKey));
1793 memcpy(session->remote_pubkey, current, sizeof(struct GNUNET_CRYPTO_PaillierPublicKey));
1794 current += sizeof(struct GNUNET_CRYPTO_PaillierPublicKey);
1795
1796 payload = (struct GNUNET_CRYPTO_PaillierCiphertext*) current;
1797 //check if service queue contains a matching request 2030 //check if service queue contains a matching request
1798 needed_state = CLIENT_RESPONSE_RECEIVED; 2031 client_session = find_matching_session (from_client_tail,
1799 session->response = find_matching_session (from_client_tail, 2032 &session->session_id,
1800 &session->key, 2033 session->total, NULL);
1801 session->total, 2034
1802 &needed_state, NULL);
1803 session->state = WAITING_FOR_MULTIPART_TRANSMISSION;
1804 GNUNET_CONTAINER_DLL_insert (from_service_head, from_service_tail, session); 2035 GNUNET_CONTAINER_DLL_insert (from_service_head, from_service_tail, session);
1805 2036
1806 session->e_a = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * used_elements); 2037 if ((NULL != client_session)
1807 if (contained_elements != 0) { 2038 && (client_session->transferred_element_count == client_session->total)) {
1808 // Convert each vector element to MPI_value 2039
1809 memcpy(session->e_a, payload, sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * used_elements); 2040 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Got session with key %s and a matching element set, processing.\n"), GNUNET_h2s (&session->session_id));
1810 if (contained_elements == used_elements) { 2041
1811 // single part finished 2042 session->response = client_session;
1812 session->state = SERVICE_REQUEST_RECEIVED; 2043 session->intersected_elements = client_session->intersected_elements;
1813 if (session->response) { 2044 client_session->intersected_elements = NULL;
1814 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Got session with key %s and a matching element set, processing.\n"), GNUNET_h2s (&session->key)); 2045 session->intersection_set = client_session->intersection_set;
1815 if (GNUNET_OK != compute_service_response (session, session->response)) { 2046 client_session->intersection_set = NULL;
1816 //something went wrong, remove it again... 2047
1817 goto invalid_msg; 2048 session->intersection_op = GNUNET_SET_prepare (&session->peer,
1818 } 2049 &session->session_id,
1819 } 2050 NULL,
1820 else 2051 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT16_MAX),
1821 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Got session with key %s without a matching element set, queueing.\n"), GNUNET_h2s (&session->key)); 2052 GNUNET_SET_RESULT_REMOVED,
1822 } 2053 cb_intersection_element_removed,
1823 else { 2054 session);
1824 // multipart message 2055
1825 } 2056 GNUNET_SET_commit (session->intersection_op, session->intersection_set);
1826 } 2057 }
2058 else {
2059 GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Got session with key %s without a matching element set, queueing.\n"), GNUNET_h2s (&session->session_id));
2060 }
2061
1827 return GNUNET_OK; 2062 return GNUNET_OK;
1828invalid_msg: 2063invalid_msg:
1829 GNUNET_break_op (0); 2064 GNUNET_break_op (0);
@@ -1851,7 +2086,7 @@ invalid_msg:
1851 * #GNUNET_SYSERR to close it (signal serious error) 2086 * #GNUNET_SYSERR to close it (signal serious error)
1852 */ 2087 */
1853static int 2088static int
1854handle_service_response_multipart (void *cls, 2089handle_bobs_cryptodata_multipart (void *cls,
1855 struct GNUNET_MESH_Channel * channel, 2090 struct GNUNET_MESH_Channel * channel,
1856 void **channel_ctx, 2091 void **channel_ctx,
1857 const struct GNUNET_MessageHeader * message) 2092 const struct GNUNET_MessageHeader * message)
@@ -1867,33 +2102,32 @@ handle_service_response_multipart (void *cls,
1867 GNUNET_assert (NULL != message); 2102 GNUNET_assert (NULL != message);
1868 // are we in the correct state? 2103 // are we in the correct state?
1869 session = (struct ServiceSession *) * channel_ctx; 2104 session = (struct ServiceSession *) * channel_ctx;
1870 if ((ALICE != session->role) || (WAITING_FOR_MULTIPART_TRANSMISSION != session->state)) { 2105 if ((ALICE != session->role) || (NULL == session->sorted_elements)) {
1871 goto invalid_msg; 2106 goto invalid_msg;
1872 } 2107 }
1873 msg_size = ntohs (msg->header.size); 2108 msg_size = ntohs (msg->header.size);
1874 required_size = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message) 2109 required_size = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)
1875 + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext); 2110 + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
1876 // shorter than minimum? 2111 // shorter than minimum?
1877 if (required_size > msg_size) { 2112 if (required_size > msg_size) {
1878 goto invalid_msg; 2113 goto invalid_msg;
1879 } 2114 }
1880 contained = ntohl (msg->multipart_element_count); 2115 contained = ntohl (msg->contained_element_count);
1881 required_size = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message) 2116 required_size = sizeof (struct GNUNET_SCALARPRODUCT_multipart_message)
1882 + 2 * contained * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext); 2117 + 2 * contained * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
1883 //sanity check: is the message as long as the message_count fields suggests? 2118 //sanity check: is the message as long as the message_count fields suggests?
1884 if ((required_size != msg_size) || (session->used < session->transferred + contained)) { 2119 if ((required_size != msg_size) || (session->used_elements_count < session->transferred_element_count + contained)) {
1885 goto invalid_msg; 2120 goto invalid_msg;
1886 } 2121 }
1887 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1]; 2122 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
1888 // Convert each k[][perm] to its MPI_value 2123 // Convert each k[][perm] to its MPI_value
1889 for (i = 0; i < contained; i++) { 2124 for (i = 0; i < contained; i++) {
1890 memcpy(&session->r[session->transferred+i], &payload[2*i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 2125 memcpy (&session->r[session->transferred_element_count + i], &payload[2 * i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
1891 memcpy(&session->r_prime[session->transferred+i], &payload[2*i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 2126 memcpy (&session->r_prime[session->transferred_element_count + i], &payload[2 * i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
1892 } 2127 }
1893 session->transferred += contained; 2128 session->transferred_element_count += contained;
1894 if (session->transferred != session->used) 2129 if (session->transferred_element_count != session->used_elements_count)
1895 return GNUNET_OK; 2130 return GNUNET_OK;
1896 session->state = SERVICE_RESPONSE_RECEIVED;
1897 session->product = compute_scalar_product (session); //never NULL 2131 session->product = compute_scalar_product (session); //never NULL
1898 2132
1899invalid_msg: 2133invalid_msg:
@@ -1901,7 +2135,6 @@ invalid_msg:
1901 2135
1902 // send message with product to client 2136 // send message with product to client
1903 if (ALICE == session->role) { 2137 if (ALICE == session->role) {
1904 session->state = FINALIZED;
1905 session->channel = NULL; 2138 session->channel = NULL;
1906 session->client_notification_task = 2139 session->client_notification_task =
1907 GNUNET_SCHEDULER_add_now (&prepare_client_response, 2140 GNUNET_SCHEDULER_add_now (&prepare_client_response,
@@ -1925,7 +2158,7 @@ invalid_msg:
1925 * #GNUNET_SYSERR to close it (we are done) 2158 * #GNUNET_SYSERR to close it (we are done)
1926 */ 2159 */
1927static int 2160static int
1928handle_service_response (void *cls, 2161handle_bobs_cryptodata_message (void *cls,
1929 struct GNUNET_MESH_Channel * channel, 2162 struct GNUNET_MESH_Channel * channel,
1930 void **channel_ctx, 2163 void **channel_ctx,
1931 const struct GNUNET_MessageHeader * message) 2164 const struct GNUNET_MessageHeader * message)
@@ -1941,7 +2174,7 @@ handle_service_response (void *cls,
1941 GNUNET_assert (NULL != message); 2174 GNUNET_assert (NULL != message);
1942 session = (struct ServiceSession *) * channel_ctx; 2175 session = (struct ServiceSession *) * channel_ctx;
1943 // are we in the correct state? 2176 // are we in the correct state?
1944 if (WAITING_FOR_SERVICE_RESPONSE != session->state) { 2177 if (0 /*//TODO: correct state*/) {
1945 goto invalid_msg; 2178 goto invalid_msg;
1946 } 2179 }
1947 //we need at least a full message without elements attached 2180 //we need at least a full message without elements attached
@@ -1956,38 +2189,34 @@ handle_service_response (void *cls,
1956 + 2 * contained * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) 2189 + 2 * contained * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)
1957 + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext); 2190 + 2 * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext);
1958 //sanity check: is the message as long as the message_count fields suggests? 2191 //sanity check: is the message as long as the message_count fields suggests?
1959 if ((msg_size != required_size) || (session->used < contained)) { 2192 if ((msg_size != required_size) || (session->used_elements_count < contained)) {
1960 goto invalid_msg; 2193 goto invalid_msg;
1961 } 2194 }
1962 session->state = WAITING_FOR_MULTIPART_TRANSMISSION; 2195 session->transferred_element_count = contained;
1963 session->transferred = contained;
1964 //convert s 2196 //convert s
1965 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1]; 2197 payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1];
1966 2198
1967 session->s = GNUNET_malloc(sizeof(struct GNUNET_CRYPTO_PaillierCiphertext)); 2199 session->s = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
1968 session->s_prime = GNUNET_malloc(sizeof(struct GNUNET_CRYPTO_PaillierCiphertext)); 2200 session->s_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
1969 memcpy(session->s,&payload[0],sizeof(struct GNUNET_CRYPTO_PaillierCiphertext)); 2201 memcpy (session->s, &payload[0], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
1970 memcpy(session->s_prime,&payload[1],sizeof(struct GNUNET_CRYPTO_PaillierCiphertext)); 2202 memcpy (session->s_prime, &payload[1], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
1971 2203
1972 session->r = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * session->used); 2204 session->r = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * session->used_elements_count);
1973 session->r_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * session->used); 2205 session->r_prime = GNUNET_malloc (sizeof (struct GNUNET_CRYPTO_PaillierCiphertext) * session->used_elements_count);
1974 2206
1975 // Convert each k[][perm] to its MPI_value 2207 // Convert each k[][perm] to its MPI_value
1976 for (i = 0; i < contained; i++) { 2208 for (i = 0; i < contained; i++) {
1977 memcpy(&session->r[i], &payload[2 + 2*i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 2209 memcpy (&session->r[i], &payload[2 + 2 * i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
1978 memcpy(&session->r_prime[i], &payload[3 + 2*i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); 2210 memcpy (&session->r_prime[i], &payload[3 + 2 * i], sizeof (struct GNUNET_CRYPTO_PaillierCiphertext));
1979 } 2211 }
1980 if (session->transferred != session->used) 2212 if (session->transferred_element_count != session->used_elements_count)
1981 return GNUNET_OK; //wait for the other multipart chunks 2213 return GNUNET_OK; //wait for the other multipart chunks
1982
1983 session->state = SERVICE_RESPONSE_RECEIVED;
1984 session->product = compute_scalar_product (session); //never NULL 2214 session->product = compute_scalar_product (session); //never NULL
1985 2215
1986invalid_msg: 2216invalid_msg:
1987 GNUNET_break_op (NULL != session->product); 2217 GNUNET_break_op (NULL != session->product);
1988 // send message with product to client 2218 // send message with product to client
1989 if (ALICE == session->role) { 2219 if (ALICE == session->role) {
1990 session->state = FINALIZED;
1991 session->channel = NULL; 2220 session->channel = NULL;
1992 session->client_notification_task = 2221 session->client_notification_task =
1993 GNUNET_SCHEDULER_add_now (&prepare_client_response, 2222 GNUNET_SCHEDULER_add_now (&prepare_client_response,
@@ -2017,7 +2246,7 @@ shutdown_task (void *cls,
2017 2246
2018 // terminate all owned open channels. 2247 // terminate all owned open channels.
2019 for (session = from_client_head; NULL != session; session = session->next) { 2248 for (session = from_client_head; NULL != session; session = session->next) {
2020 if ((FINALIZED != session->state) && (NULL != session->channel)) { 2249 if ((0/*//TODO: not finalized*/) && (NULL != session->channel)) {
2021 GNUNET_MESH_channel_destroy (session->channel); 2250 GNUNET_MESH_channel_destroy (session->channel);
2022 session->channel = NULL; 2251 session->channel = NULL;
2023 } 2252 }
@@ -2025,10 +2254,6 @@ shutdown_task (void *cls,
2025 GNUNET_SCHEDULER_cancel (session->client_notification_task); 2254 GNUNET_SCHEDULER_cancel (session->client_notification_task);
2026 session->client_notification_task = GNUNET_SCHEDULER_NO_TASK; 2255 session->client_notification_task = GNUNET_SCHEDULER_NO_TASK;
2027 } 2256 }
2028 if (GNUNET_SCHEDULER_NO_TASK != session->service_request_task) {
2029 GNUNET_SCHEDULER_cancel (session->service_request_task);
2030 session->service_request_task = GNUNET_SCHEDULER_NO_TASK;
2031 }
2032 if (NULL != session->client) { 2257 if (NULL != session->client) {
2033 GNUNET_SERVER_client_disconnect (session->client); 2258 GNUNET_SERVER_client_disconnect (session->client);
2034 session->client = NULL; 2259 session->client = NULL;
@@ -2060,15 +2285,17 @@ run (void *cls,
2060 const struct GNUNET_CONFIGURATION_Handle *c) 2285 const struct GNUNET_CONFIGURATION_Handle *c)
2061{ 2286{
2062 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { 2287 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
2063 {&handle_client_request, NULL, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE, 0}, 2288 {&handle_client_message, NULL, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE, 0},
2064 {&handle_client_request, NULL, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB, 0}, 2289 {&handle_client_message, NULL, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB, 0},
2290 {&handle_client_message_multipart, NULL, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART, 0},
2065 {NULL, NULL, 0, 0} 2291 {NULL, NULL, 0, 0}
2066 }; 2292 };
2067 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { 2293 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
2068 { &handle_service_request, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_TO_BOB, 0}, 2294 { &handle_alices_computation_request, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA, 0},
2069 { &handle_service_request_multipart, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_TO_BOB_MULTIPART, 0}, 2295 { &handle_alices_cyrptodata_message, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA, 0},
2070 { &handle_service_response, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_TO_ALICE, 0}, 2296 { &handle_alices_cyrptodata_message_multipart, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA_MULTIPART, 0},
2071 { &handle_service_response_multipart, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_TO_ALICE_MULTIPART, 0}, 2297 { &handle_bobs_cryptodata_message, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA, 0},
2298 { &handle_bobs_cryptodata_multipart, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_BOB_CRYPTODATA_MULTIPART, 0},
2072 {NULL, 0, 0} 2299 {NULL, 0, 0}
2073 }; 2300 };
2074 static const uint32_t ports[] = { 2301 static const uint32_t ports[] = {
@@ -2077,13 +2304,13 @@ run (void *cls,
2077 }; 2304 };
2078 //generate private/public key set 2305 //generate private/public key set
2079 GNUNET_CRYPTO_paillier_create (&my_pubkey, &my_privkey); 2306 GNUNET_CRYPTO_paillier_create (&my_pubkey, &my_privkey);
2080 2307
2081 // offset has to be sufficiently small to allow computation of: 2308 // offset has to be sufficiently small to allow computation of:
2082 // m1+m2 mod n == (S + a) + (S + b) mod n, 2309 // m1+m2 mod n == (S + a) + (S + b) mod n,
2083 // if we have more complex operations, this factor needs to be lowered 2310 // if we have more complex operations, this factor needs to be lowered
2084 my_offset = gcry_mpi_new (GNUNET_CRYPTO_PAILLIER_BITS / 3); 2311 my_offset = gcry_mpi_new (GNUNET_CRYPTO_PAILLIER_BITS / 3);
2085 gcry_mpi_set_bit (my_offset, GNUNET_CRYPTO_PAILLIER_BITS / 3); 2312 gcry_mpi_set_bit (my_offset, GNUNET_CRYPTO_PAILLIER_BITS / 3);
2086 2313
2087 // register server callbacks and disconnect handler 2314 // register server callbacks and disconnect handler
2088 GNUNET_SERVER_add_handlers (server, server_handlers); 2315 GNUNET_SERVER_add_handlers (server, server_handlers);
2089 GNUNET_SERVER_disconnect_notify (server, 2316 GNUNET_SERVER_disconnect_notify (server,
@@ -2093,8 +2320,8 @@ run (void *cls,
2093 GNUNET_CRYPTO_get_peer_identity (c, 2320 GNUNET_CRYPTO_get_peer_identity (c,
2094 &me)); 2321 &me));
2095 my_mesh = GNUNET_MESH_connect (c, NULL, 2322 my_mesh = GNUNET_MESH_connect (c, NULL,
2096 &channel_incoming_handler, 2323 &cb_channel_incoming,
2097 &channel_destruction_handler, 2324 &cb_channel_destruction,
2098 mesh_handlers, ports); 2325 mesh_handlers, ports);
2099 if (!my_mesh) { 2326 if (!my_mesh) {
2100 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Connect to MESH failed\n")); 2327 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Connect to MESH failed\n"));
diff --git a/src/scalarproduct/scalarproduct.h b/src/scalarproduct/scalarproduct.h
index 5682362fd..9f6036358 100644
--- a/src/scalarproduct/scalarproduct.h
+++ b/src/scalarproduct/scalarproduct.h
@@ -57,7 +57,7 @@ extern "C"
57 * Message type passed from client to service 57 * Message type passed from client to service
58 * to initiate a request or responder role 58 * to initiate a request or responder role
59 */ 59 */
60struct GNUNET_SCALARPRODUCT_client_request 60struct GNUNET_SCALARPRODUCT_computation_message
61{ 61{
62 /** 62 /**
63 * GNUNET message header 63 * GNUNET message header
@@ -67,17 +67,17 @@ struct GNUNET_SCALARPRODUCT_client_request
67 /** 67 /**
68 * how many elements the vector in payload contains 68 * how many elements the vector in payload contains
69 */ 69 */
70 uint32_t element_count GNUNET_PACKED; 70 uint32_t element_count_total GNUNET_PACKED;
71 71
72 /** 72 /**
73 * how many bytes the mask has 73 * contained elements the vector in payload contains
74 */ 74 */
75 uint32_t mask_length GNUNET_PACKED; 75 uint32_t element_count_contained GNUNET_PACKED;
76 76
77 /** 77 /**
78 * the transaction/session key used to identify a session 78 * the transaction/session key used to identify a session
79 */ 79 */
80 struct GNUNET_HashCode key; 80 struct GNUNET_HashCode session_key;
81 81
82 /** 82 /**
83 * the identity of a remote peer we want to communicate with 83 * the identity of a remote peer we want to communicate with
@@ -85,11 +85,32 @@ struct GNUNET_SCALARPRODUCT_client_request
85 struct GNUNET_PeerIdentity peer; 85 struct GNUNET_PeerIdentity peer;
86 86
87 /** 87 /**
88 * followed by long vector[element_count] | [unsigned char mask[mask_bytes]] 88 * followed by struct GNUNET_SCALARPRODUCT_Element[]
89 */ 89 */
90}; 90};
91 91
92/** 92/**
93 * multipart messages following GNUNET_SCALARPRODUCT_client_request
94 */
95struct GNUNET_SCALARPRODUCT_computation_message_multipart
96{
97 /**
98 * GNUNET message header
99 */
100 struct GNUNET_MessageHeader header;
101
102 /**
103 * contained elements the vector in payload contains
104 */
105 uint32_t element_count_contained GNUNET_PACKED;
106
107 /**
108 * followed by struct GNUNET_SCALARPRODUCT_Element[]
109 */
110};
111
112
113/**
93 * Message type passed from requesting service Alice to responding service Bob 114 * Message type passed from requesting service Alice to responding service Bob
94 * to initiate a request and make bob participate in our protocol 115 * to initiate a request and make bob participate in our protocol
95 */ 116 */
@@ -104,28 +125,36 @@ struct GNUNET_SCALARPRODUCT_service_request {
104 */ 125 */
105 uint32_t total_element_count GNUNET_PACKED; 126 uint32_t total_element_count GNUNET_PACKED;
106 127
107 /** 128 /**
108 * how many elements are actually included after the mask was applied. 129 * the transaction/session key used to identify a session
109 */ 130 */
110 uint32_t contained_element_count GNUNET_PACKED; 131 struct GNUNET_HashCode session_id;
111 132
112 /** 133 /**
113 * how many bytes the mask has 134 * Alice's public key
114 */ 135 */
115 uint32_t mask_length GNUNET_PACKED; 136 struct GNUNET_CRYPTO_PaillierPublicKey public_key;
137
138};
116 139
140
141/**
142 * Message type passed from requesting service Alice to responding service Bob
143 * to initiate a request and make bob participate in our protocol
144 */
145struct GNUNET_SCALARPRODUCT_alices_cryptodata_message {
117 /** 146 /**
118 * the transaction/session key used to identify a session 147 * GNUNET message header
119 */ 148 */
120 struct GNUNET_HashCode key; 149 struct GNUNET_MessageHeader header;
121 150
122 /** 151 /**
123 * how many elements the vector in payload contains 152 * how many elements we appended to this message
124 */ 153 */
125 uint32_t element_count GNUNET_PACKED; 154 uint32_t contained_element_count GNUNET_PACKED;
126 155
127 /** 156 /**
128 * followed by mask | public_key | vector[used_element_count] 157 * struct GNUNET_CRYPTO_PaillierCiphertext[contained_element_count]
129 */ 158 */
130}; 159};
131 160
@@ -141,9 +170,9 @@ struct GNUNET_SCALARPRODUCT_multipart_message {
141 /** 170 /**
142 * how many elements we supply within this message 171 * how many elements we supply within this message
143 */ 172 */
144 uint32_t multipart_element_count GNUNET_PACKED; 173 uint32_t contained_element_count GNUNET_PACKED;
145 174
146 // followed by vector[multipart_element_count] or k[i][perm] 175 // struct GNUNET_CRYPTO_PaillierCiphertext[multipart_element_count]
147}; 176};
148 177
149/** 178/**
diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c
index 13c35a28d..297b3505e 100644
--- a/src/scalarproduct/scalarproduct_api.c
+++ b/src/scalarproduct/scalarproduct_api.c
@@ -46,21 +46,11 @@ typedef void (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (void *cls,
46 enum GNUNET_SCALARPRODUCT_ResponseStatus status); 46 enum GNUNET_SCALARPRODUCT_ResponseStatus status);
47 47
48/** 48/**
49 * Entry in the request queue per client 49 * A handle returned for each computation
50 */ 50 */
51struct GNUNET_SCALARPRODUCT_ComputationHandle 51struct GNUNET_SCALARPRODUCT_ComputationHandle
52{ 52{
53 /** 53 /**
54 * This is a linked list.
55 */
56 struct GNUNET_SCALARPRODUCT_ComputationHandle *next;
57
58 /**
59 * This is a linked list.
60 */
61 struct GNUNET_SCALARPRODUCT_ComputationHandle *prev;
62
63 /**
64 * Our configuration. 54 * Our configuration.
65 */ 55 */
66 const struct GNUNET_CONFIGURATION_Handle *cfg; 56 const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -86,27 +76,37 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle
86 struct GNUNET_CLIENT_TransmitHandle *th; 76 struct GNUNET_CLIENT_TransmitHandle *th;
87 77
88 /** 78 /**
89 * Size of the message 79 * count of all elements we offer for computation
90 */ 80 */
91 uint16_t message_size; 81 uint32_t element_count_total;
92 82
93 /** 83 /**
84 * count of the transfered elements we offer for computation
85 */
86 uint32_t element_count_transfered;
87
88 /**
89 * the client's elements which
90 */
91 struct GNUNET_SCALARPRODUCT_Element * elements;
92
93 /**
94 * Message to be sent to the scalarproduct service 94 * Message to be sent to the scalarproduct service
95 */ 95 */
96 struct GNUNET_SCALARPRODUCT_client_request * msg; 96 void * msg;
97 97
98 /** 98 /**
99 * The msg handler callback 99 * The client's msg handler callback
100 */ 100 */
101 union 101 union
102 { 102 {
103 /** 103 /**
104 * Function to call after transmission of the request. 104 * Function to call after transmission of the request (Bob).
105 */ 105 */
106 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status; 106 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
107 107
108 /** 108 /**
109 * Function to call after transmission of the request. 109 * Function to call after transmission of the request (Alice).
110 */ 110 */
111 GNUNET_SCALARPRODUCT_DatumProcessor cont_datum; 111 GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
112 }; 112 };
@@ -117,31 +117,24 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle
117 void *cont_cls; 117 void *cont_cls;
118 118
119 /** 119 /**
120 * Response Processor for response from the service. This function calls the 120 * API internal callback for results and failures to be forwarded to the client
121 * continuation function provided by the client.
122 */ 121 */
123 GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc; 122 GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
123
124 /**
125 *
126 */
127 GNUNET_SCHEDULER_TaskIdentifier cont_multipart;
124}; 128};
125 129
126/************************************************************** 130/**************************************************************
127 *** Global Variables ********** 131 *** Forward Function Declarations **********
128 **************************************************************/
129/**
130 * Head of the active sessions queue
131 */
132static struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
133/**
134 * Tail of the active sessions queue
135 */
136static struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
137
138/**************************************************************
139 *** Function Declarations **********
140 **************************************************************/ 132 **************************************************************/
141 133
142void 134void
143GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h); 135GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h);
144 136
137static size_t do_send_message (void *cls, size_t size, void *buf);
145/************************************************************** 138/**************************************************************
146 *** Static Function Declarations ********** 139 *** Static Function Declarations **********
147 **************************************************************/ 140 **************************************************************/
@@ -225,7 +218,7 @@ process_result_message (void *cls,
225static void 218static void
226receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) 219receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
227{ 220{
228 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; 221 struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
229 const struct GNUNET_SCALARPRODUCT_client_response *message = 222 const struct GNUNET_SCALARPRODUCT_client_response *message =
230 (const struct GNUNET_SCALARPRODUCT_client_response *) msg; 223 (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
231 enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; 224 enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
@@ -235,28 +228,76 @@ receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
235 LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n"); 228 LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n");
236 status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected; 229 status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected;
237 } 230 }
238 else if (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT != ntohs (msg->type)) 231 else if (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT != ntohs (msg->type))
239 { 232 {
240 LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid message type received\n"); 233 LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid message type received\n");
241 } 234 }
242 else if (0 < ntohl (message->product_length) || (0 == message->range)) 235 else if (0 < ntohl (message->product_length) || (0 == message->range))
243 { 236 {
244 // response for the responder client, successful 237 // response for the responder client, successful
245 GNUNET_STATISTICS_update (qe->stats, 238 GNUNET_STATISTICS_update (h->stats,
246 gettext_noop ("# SUC responder result messages received"), 1, 239 gettext_noop ("# SUC responder result messages received"), 1,
247 GNUNET_NO); 240 GNUNET_NO);
248 241
249 status = GNUNET_SCALARPRODUCT_Status_Success; 242 status = GNUNET_SCALARPRODUCT_Status_Success;
250 } 243 }
251 244
252 if (qe->cont_datum != NULL) 245 if (h->cont_status != NULL)
253 qe->response_proc (qe, msg, status); 246 h->response_proc (h, msg, status);
254 247
255 GNUNET_CONTAINER_DLL_remove (head, tail, qe); 248 GNUNET_free (h);
256 GNUNET_free (qe);
257} 249}
258 250
259 251
252static void
253send_multipart (void * cls, const struct GNUNET_SCHEDULER_TaskContext * tc)
254{
255 struct GNUNET_SCALARPRODUCT_ComputationHandle *h = (struct GNUNET_SCALARPRODUCT_ComputationHandle *) cls;
256 struct GNUNET_SCALARPRODUCT_computation_message_multipart *msg;
257 uint32_t size;
258 uint32_t todo;
259
260 h->cont_multipart = GNUNET_SCHEDULER_NO_TASK;
261
262 todo = h->element_count_total - h->element_count_transfered;
263 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart) +todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
264 if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size) {
265 //create a multipart msg, first we calculate a new msg size for the head msg
266 todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart)) / sizeof (struct GNUNET_SCALARPRODUCT_Element);
267 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart) +todo * sizeof (struct GNUNET_SCALARPRODUCT_Element);
268 }
269
270 msg = (struct GNUNET_SCALARPRODUCT_computation_message_multipart*) GNUNET_malloc (size);
271 h->msg = msg;
272 msg->header.size = htons (size);
273 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART);
274 msg->element_count_contained = htonl (todo);
275
276 memcpy (&msg[1], &h->elements[h->element_count_transfered], todo);
277 h->element_count_transfered += todo;
278
279 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
280 GNUNET_TIME_UNIT_FOREVER_REL,
281 GNUNET_YES, // retry is OK in the initial stage
282 &do_send_message, h);
283
284 if (!h->th) {
285 LOG (GNUNET_ERROR_TYPE_ERROR,
286 _ ("Failed to send a multipart message to the scalarproduct service\n"));
287 GNUNET_STATISTICS_update (h->stats,
288 gettext_noop ("# transmission request failures"),
289 1, GNUNET_NO);
290 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
291 GNUNET_CLIENT_disconnect (h->client);
292 GNUNET_free (h->msg);
293 h->msg = NULL;
294 if (h->cont_status != NULL)
295 h->response_proc (h, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
296
297 GNUNET_SCALARPRODUCT_cancel (cls);
298 }
299}
300
260/** 301/**
261 * Transmits the request to the VectorProduct Service 302 * Transmits the request to the VectorProduct Service
262 * 303 *
@@ -267,39 +308,45 @@ receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
267 * @return Size of the message sent 308 * @return Size of the message sent
268 */ 309 */
269static size_t 310static size_t
270transmit_request (void *cls, size_t size, 311do_send_message (void *cls, size_t size,
271 void *buf) 312 void *buf)
272{ 313{
273 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; 314 struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
274 315
275 if (NULL == buf) 316 if (NULL == buf) {
276 { 317 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
277 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n"); 318 GNUNET_STATISTICS_update (h->stats,
278 GNUNET_STATISTICS_update (qe->stats, 319 gettext_noop ("# transmission request failures"),
279 gettext_noop ("# transmission request failures"), 320 1, GNUNET_NO);
280 1, GNUNET_NO);
281 321
282 // notify caller about the error, done here. 322 // notify caller about the error, done here.
283 if (qe->cont_datum != NULL) 323 if (h->cont_status != NULL)
284 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure); 324 h->response_proc (h, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
285 325
286 GNUNET_SCALARPRODUCT_cancel (cls); 326 GNUNET_SCALARPRODUCT_cancel (cls);
287 return 0; 327 return 0;
288 } 328 }
289 memcpy (buf, qe->msg, size); 329 memcpy (buf, h->msg, size);
290 330
291 GNUNET_free (qe->msg); 331 GNUNET_free (h->msg);
292 qe->msg = NULL; 332 h->msg = NULL;
293 qe->th = NULL; 333 h->th = NULL;
294
295 GNUNET_CLIENT_receive (qe->client, &receive_cb, qe,
296 GNUNET_TIME_UNIT_FOREVER_REL);
297 334
298#if INSANE_STATISTICS 335#if INSANE_STATISTICS
299 GNUNET_STATISTICS_update (qe->stats, 336 GNUNET_STATISTICS_update (h->stats,
300 gettext_noop ("# bytes sent to scalarproduct"), 1, 337 gettext_noop ("# bytes sent to scalarproduct"), 1,
301 GNUNET_NO); 338 GNUNET_NO);
302#endif 339#endif
340
341 /* done sending */
342 if (h->element_count_total == h->element_count_transfered) {
343 GNUNET_CLIENT_receive (h->client, &receive_cb, h,
344 GNUNET_TIME_UNIT_FOREVER_REL);
345 return size;
346 }
347
348 h->cont_multipart = GNUNET_SCHEDULER_add_now (&send_multipart, h);
349
303 return size; 350 return size;
304} 351}
305 352
@@ -322,20 +369,19 @@ transmit_request (void *cls, size_t size,
322 * @return a new handle for this computation 369 * @return a new handle for this computation
323 */ 370 */
324struct GNUNET_SCALARPRODUCT_ComputationHandle * 371struct GNUNET_SCALARPRODUCT_ComputationHandle *
325GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle * cfg, 372GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle * cfg,
326 const struct GNUNET_HashCode * key, 373 const struct GNUNET_HashCode * session_key,
327 const int32_t * elements, 374 const struct GNUNET_SCALARPRODUCT_Element * elements,
328 uint32_t element_count, 375 uint32_t element_count,
329 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, 376 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
330 void * cont_cls) 377 void * cont_cls)
331{ 378{
332 struct GNUNET_SCALARPRODUCT_ComputationHandle *h; 379 struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
333 struct GNUNET_SCALARPRODUCT_client_request *msg; 380 struct GNUNET_SCALARPRODUCT_computation_message *msg;
334 int32_t * vector; 381 uint32_t size;
335 uint16_t size; 382 uint16_t possible;
336 uint64_t i;
337 383
338 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) 384 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_computation_message)
339 + element_count * sizeof (int32_t)); 385 + element_count * sizeof (int32_t));
340 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); 386 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
341 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); 387 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
@@ -356,42 +402,56 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle * cfg,
356 return NULL; 402 return NULL;
357 } 403 }
358 404
359 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t); 405 h->element_count_total = element_count;
406 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
407 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) {
408 possible = element_count;
409 h->element_count_transfered = element_count;
410 }
411 else {
412 //create a multipart msg, first we calculate a new msg size for the head msg
413 possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message)) / sizeof (struct GNUNET_SCALARPRODUCT_Element);
414 h->element_count_transfered = possible;
415 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + possible*sizeof (struct GNUNET_SCALARPRODUCT_Element);
416 h->elements = (struct GNUNET_SCALARPRODUCT_Element*)
417 GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
418 memcpy (h->elements, elements, sizeof (struct GNUNET_SCALARPRODUCT_Element)*element_count);
419 }
360 420
361 h->cont_status = cont; 421 h->cont_status = cont;
362 h->cont_cls = cont_cls; 422 h->cont_cls = cont_cls;
363 h->response_proc = &process_status_message; 423 h->response_proc = &process_status_message;
364 h->cfg = cfg; 424 h->cfg = cfg;
365 memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); 425 memcpy (&h->key, session_key, sizeof (struct GNUNET_HashCode));
366 426
367 msg = (struct GNUNET_SCALARPRODUCT_client_request*) GNUNET_malloc (size); 427 msg = (struct GNUNET_SCALARPRODUCT_computation_message*) GNUNET_malloc (size);
368 h->msg = msg; 428 h->msg = msg;
369 msg->header.size = htons (size); 429 msg->header.size = htons (size);
370 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); 430 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
371 msg->element_count = htonl (element_count); 431 msg->element_count_total = htonl (element_count);
372 432 msg->element_count_contained = htonl (possible);
373 vector = (int32_t*) & msg[1];
374 // copy each element over to the message
375 for (i = 0; i < element_count; i++)
376 vector[i] = htonl (elements[i]);
377 433
378 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); 434 memcpy (&msg->session_key, session_key, sizeof (struct GNUNET_HashCode));
435 memcpy (&msg[1], elements, possible);
379 436
380 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, 437 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
381 GNUNET_TIME_UNIT_FOREVER_REL, 438 GNUNET_TIME_UNIT_FOREVER_REL,
382 GNUNET_YES, // retry is OK in the initial stage 439 GNUNET_YES, // retry is OK in the initial stage
383 &transmit_request, h); 440 &do_send_message, h);
384 if (!h->th) 441 if (!h->th)
385 { 442 {
386 LOG (GNUNET_ERROR_TYPE_ERROR, 443 LOG (GNUNET_ERROR_TYPE_ERROR,
387 _ ("Failed to send a message to the scalarproduct service\n")); 444 _ ("Failed to send a message to the scalarproduct service\n"));
445 GNUNET_STATISTICS_update (h->stats,
446 gettext_noop ("# transmission request failures"),
447 1, GNUNET_NO);
388 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); 448 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
389 GNUNET_CLIENT_disconnect (h->client); 449 GNUNET_CLIENT_disconnect (h->client);
390 GNUNET_free (h->msg); 450 GNUNET_free (h->msg);
451 GNUNET_free_non_null (h->elements);
391 GNUNET_free (h); 452 GNUNET_free (h);
392 return NULL; 453 return NULL;
393 } 454 }
394 GNUNET_CONTAINER_DLL_insert (head, tail, h);
395 return h; 455 return h;
396} 456}
397 457
@@ -400,37 +460,28 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle * cfg,
400 * Request by Alice's client for computing a scalar product 460 * Request by Alice's client for computing a scalar product
401 * 461 *
402 * @param cfg the gnunet configuration handle 462 * @param cfg the gnunet configuration handle
403 * @param key Session key should be unique to the requesting client 463 * @param session_key Session key should be unique to the requesting client
404 * @param peer PeerID of the other peer 464 * @param peer PeerID of the other peer
405 * @param elements Array of elements of the vector 465 * @param elements Array of elements of the vector
406 * @param element_count Number of elements in the vector 466 * @param element_count Number of elements in the vector
407 * @param mask Array of the mask
408 * @param mask_bytes number of bytes in the mask
409 * @param cont Callback function 467 * @param cont Callback function
410 * @param cont_cls Closure for the callback function 468 * @param cont_cls Closure for the callback function
411 * 469 *
412 * @return a new handle for this computation 470 * @return a new handle for this computation
413 */ 471 */
414struct GNUNET_SCALARPRODUCT_ComputationHandle * 472struct GNUNET_SCALARPRODUCT_ComputationHandle *
415GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle * cfg, 473GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle * cfg,
416 const struct GNUNET_HashCode * key, 474 const struct GNUNET_HashCode * session_key,
417 const struct GNUNET_PeerIdentity *peer, 475 const struct GNUNET_PeerIdentity *peer,
418 const int32_t * elements, 476 const struct GNUNET_SCALARPRODUCT_Element * elements,
419 uint32_t element_count, 477 uint32_t element_count,
420 const unsigned char * mask,
421 uint32_t mask_bytes,
422 GNUNET_SCALARPRODUCT_DatumProcessor cont, 478 GNUNET_SCALARPRODUCT_DatumProcessor cont,
423 void * cont_cls) 479 void * cont_cls)
424{ 480{
425 struct GNUNET_SCALARPRODUCT_ComputationHandle *h; 481 struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
426 struct GNUNET_SCALARPRODUCT_client_request *msg; 482 struct GNUNET_SCALARPRODUCT_computation_message *msg;
427 int32_t * vector; 483 uint32_t size;
428 uint16_t size; 484 uint16_t possible;
429 uint64_t i;
430
431 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
432 +element_count * sizeof (int32_t)
433 + mask_bytes);
434 485
435 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); 486 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
436 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); 487 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
@@ -451,49 +502,60 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle * cfg,
451 return NULL; 502 return NULL;
452 } 503 }
453 504
454 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_bytes; 505 h->element_count_total = element_count;
455 506 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element);
507 if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) {
508 possible = element_count;
509 h->element_count_transfered = element_count;
510 }
511 else {
512 //create a multipart msg, first we calculate a new msg size for the head msg
513 possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message)) / sizeof (struct GNUNET_SCALARPRODUCT_Element);
514 h->element_count_transfered = possible;
515 size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + possible*sizeof (struct GNUNET_SCALARPRODUCT_Element);
516 h->elements = (struct GNUNET_SCALARPRODUCT_Element*)
517 GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count);
518 memcpy (h->elements, elements, sizeof (struct GNUNET_SCALARPRODUCT_Element)*element_count);
519 }
520
456 h->cont_datum = cont; 521 h->cont_datum = cont;
457 h->cont_cls = cont_cls; 522 h->cont_cls = cont_cls;
458 h->response_proc = &process_result_message; 523 h->response_proc = &process_result_message;
459 h->cfg = cfg; 524 h->cfg = cfg;
460 memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); 525 memcpy (&h->key, session_key, sizeof (struct GNUNET_HashCode));
461 526
462 msg = (struct GNUNET_SCALARPRODUCT_client_request*) GNUNET_malloc (size); 527 msg = (struct GNUNET_SCALARPRODUCT_computation_message*) GNUNET_malloc (size);
463 h->msg = msg; 528 h->msg = msg;
464 msg->header.size = htons (size); 529 msg->header.size = htons (size);
465 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); 530 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
466 msg->element_count = htonl (element_count); 531 msg->element_count_total = htonl (element_count);
467 msg->mask_length = htonl (mask_bytes); 532 msg->element_count_contained = htonl (possible);
468
469 vector = (int32_t*) & msg[1];
470 // copy each element over to the message
471 for (i = 0; i < element_count; i++)
472 vector[i] = htonl (elements[i]);
473 533
474 memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); 534 memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
475 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); 535 memcpy (&msg->session_key, session_key, sizeof (struct GNUNET_HashCode));
476 memcpy (&vector[element_count], mask, mask_bytes); 536 memcpy (&msg[1], elements, possible);
477 537
478 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, 538 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
479 GNUNET_TIME_UNIT_FOREVER_REL, 539 GNUNET_TIME_UNIT_FOREVER_REL,
480 GNUNET_YES, // retry is OK in the initial stage 540 GNUNET_YES, // retry is OK in the initial stage
481 &transmit_request, h); 541 &do_send_message, h);
482 if (!h->th) 542 if (!h->th)
483 { 543 {
484 LOG (GNUNET_ERROR_TYPE_ERROR, 544 LOG (GNUNET_ERROR_TYPE_ERROR,
485 _ ("Failed to send a message to the scalarproduct service\n")); 545 _ ("Failed to send a message to the scalarproduct service\n"));
546 GNUNET_STATISTICS_update (h->stats,
547 gettext_noop ("# transmission request failures"),
548 1, GNUNET_NO);
486 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); 549 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
487 GNUNET_CLIENT_disconnect (h->client); 550 GNUNET_CLIENT_disconnect (h->client);
488 GNUNET_free (h->msg); 551 GNUNET_free (h->msg);
552 GNUNET_free_non_null (h->elements);
489 GNUNET_free (h); 553 GNUNET_free (h);
490 return NULL; 554 return NULL;
491 } 555 }
492 GNUNET_CONTAINER_DLL_insert (head, tail, h);
493 return h; 556 return h;
494} 557}
495 558
496
497/** 559/**
498 * Cancel an ongoing computation or revoke our collaboration offer. 560 * Cancel an ongoing computation or revoke our collaboration offer.
499 * Closes the connection to the service 561 * Closes the connection to the service
@@ -503,43 +565,16 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle * cfg,
503void 565void
504GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h) 566GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h)
505{ 567{
506 struct GNUNET_SCALARPRODUCT_ComputationHandle * qe; 568 if (NULL != h->th)
507 569 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
508 for (qe = head; head != NULL; qe = head) 570 if (GNUNET_SCHEDULER_NO_TASK != h->cont_multipart)
509 { 571 GNUNET_SCHEDULER_cancel (h->cont_multipart);
510 if (qe == h) 572 GNUNET_free_non_null (h->elements);
511 { 573 GNUNET_free_non_null (h->msg);
512 GNUNET_CONTAINER_DLL_remove (head, tail, qe); 574 GNUNET_CLIENT_disconnect (h->client);
513 if (NULL != qe->th) 575 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
514 GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th); 576 GNUNET_free (h);
515 GNUNET_CLIENT_disconnect (qe->client);
516 GNUNET_STATISTICS_destroy (qe->stats, GNUNET_YES);
517 GNUNET_free_non_null (qe->msg);
518 GNUNET_free (qe);
519 break;
520 }
521 }
522} 577}
523/**
524 * Cancel ALL ongoing computation or revoke our collaboration offer.
525 * Closes ALL connections to the service
526 */
527void
528GNUNET_SCALARPRODUCT_disconnect ()
529{
530 struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
531 578
532 LOG (GNUNET_ERROR_TYPE_INFO, "Disconnecting from VectorProduct\n");
533 for (qe = head; head != NULL; qe = head)
534 {
535 GNUNET_CONTAINER_DLL_remove (head, tail, qe);
536 if (NULL != qe->th)
537 GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th);
538 GNUNET_CLIENT_disconnect (qe->client);
539 GNUNET_STATISTICS_destroy (qe->stats, GNUNET_YES);
540 GNUNET_free_non_null (qe->msg);
541 GNUNET_free (qe);
542 }
543}
544 579
545/* end of scalarproduct_api.c */ 580/* end of scalarproduct_api.c */
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index cc0c721af..5c4bc9dd5 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -766,7 +766,7 @@ handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
766 766
767 767
768/** 768/**
769 * Send our element to the peer, in case our element count is lower than his 769 * Send our element count to the peer, in case our element count is lower than his
770 * 770 *
771 * @param op intersection operation 771 * @param op intersection operation
772 */ 772 */