diff options
author | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-09-02 14:46:52 +0000 |
---|---|---|
committer | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-09-02 14:46:52 +0000 |
commit | 9f064f70f7aff38119ebf1b4345118cb61302f2d (patch) | |
tree | 376024ba781c11965bb5911b217657dd60c2dd3e /src/scalarproduct | |
parent | 6ea7ba4bfcd11cbfc0b828f12d881f983d277bc7 (diff) | |
download | gnunet-9f064f70f7aff38119ebf1b4345118cb61302f2d.tar.gz gnunet-9f064f70f7aff38119ebf1b4345118cb61302f2d.zip |
removed much of the excell logics in the scalar product API
finished the the alice/bob API initiation functions in the SP API
merged structes in SP API
reorganized SP bookkeeping of computations
Diffstat (limited to 'src/scalarproduct')
-rw-r--r-- | src/scalarproduct/scalarproduct_api.c | 341 |
1 files changed, 78 insertions, 263 deletions
diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c index b77c30925..7c1f5394e 100644 --- a/src/scalarproduct/scalarproduct_api.c +++ b/src/scalarproduct/scalarproduct_api.c | |||
@@ -41,22 +41,42 @@ | |||
41 | /** | 41 | /** |
42 | * Entry in the request queue per client | 42 | * Entry in the request queue per client |
43 | */ | 43 | */ |
44 | struct GNUNET_SCALARPRODUCT_QueueEntry | 44 | struct GNUNET_SCALARPRODUCT_ComputationHandle |
45 | { | 45 | { |
46 | /** | 46 | /** |
47 | * This is a linked list. | 47 | * This is a linked list. |
48 | */ | 48 | */ |
49 | struct GNUNET_SCALARPRODUCT_QueueEntry *next; | 49 | struct GNUNET_SCALARPRODUCT_ComputationHandle *next; |
50 | 50 | ||
51 | /** | 51 | /** |
52 | * This is a linked list. | 52 | * This is a linked list. |
53 | */ | 53 | */ |
54 | struct GNUNET_SCALARPRODUCT_QueueEntry *prev; | 54 | struct GNUNET_SCALARPRODUCT_ComputationHandle *prev; |
55 | |||
56 | /** | ||
57 | * Our configuration. | ||
58 | */ | ||
59 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
60 | |||
61 | /** | ||
62 | * Current connection to the scalarproduct service. | ||
63 | */ | ||
64 | struct GNUNET_CLIENT_Connection *client; | ||
65 | |||
66 | /** | ||
67 | * Handle for statistics. | ||
68 | */ | ||
69 | struct GNUNET_STATISTICS_Handle *stats; | ||
55 | 70 | ||
56 | /** | 71 | /** |
57 | * Handle to the master context. | 72 | * The shared session key identifying this computation |
58 | */ | 73 | */ |
59 | struct GNUNET_SCALARPRODUCT_Handle *h; | 74 | struct GNUNET_HashCode * key; |
75 | |||
76 | /** | ||
77 | * Current transmit handle. | ||
78 | */ | ||
79 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
60 | 80 | ||
61 | /** | 81 | /** |
62 | * Size of the message | 82 | * Size of the message |
@@ -66,7 +86,7 @@ struct GNUNET_SCALARPRODUCT_QueueEntry | |||
66 | /** | 86 | /** |
67 | * Message to be sent to the scalarproduct service | 87 | * Message to be sent to the scalarproduct service |
68 | */ | 88 | */ |
69 | struct GNUNET_SCALARPRODUCT_client_request* msg; | 89 | struct GNUNET_SCALARPRODUCT_client_request * msg; |
70 | 90 | ||
71 | union | 91 | union |
72 | { | 92 | { |
@@ -87,14 +107,6 @@ struct GNUNET_SCALARPRODUCT_QueueEntry | |||
87 | void *cont_cls; | 107 | void *cont_cls; |
88 | 108 | ||
89 | /** | 109 | /** |
90 | * Has this message been transmitted to the service? | ||
91 | * Only ever GNUNET_YES for the head of the queue. | ||
92 | * Note that the overall struct should end at a | ||
93 | * multiple of 64 bits. | ||
94 | */ | ||
95 | int16_t was_transmitted; | ||
96 | |||
97 | /** | ||
98 | * Response Processor for response from the service. This function calls the | 110 | * Response Processor for response from the service. This function calls the |
99 | * continuation function provided by the client. | 111 | * continuation function provided by the client. |
100 | */ | 112 | */ |
@@ -102,35 +114,20 @@ struct GNUNET_SCALARPRODUCT_QueueEntry | |||
102 | }; | 114 | }; |
103 | 115 | ||
104 | /************************************************************** | 116 | /************************************************************** |
105 | *** Function Declarations ********** | 117 | *** Global Variables ********** |
106 | **************************************************************/ | 118 | **************************************************************/ |
107 | |||
108 | /** | 119 | /** |
109 | * Creates a new entry at the tail of the DLL | 120 | * Head of the active sessions queue |
110 | * | ||
111 | * @param h handle to the master context | ||
112 | * | ||
113 | * @return pointer to the entry | ||
114 | */ | 121 | */ |
115 | static struct GNUNET_SCALARPRODUCT_QueueEntry * | 122 | struct GNUNET_SCALARPRODUCT_ComputationHandle *head; |
116 | make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h); | ||
117 | |||
118 | /** | 123 | /** |
119 | * Removes the head entry from the queue | 124 | * Tail of the active sessions queue |
120 | * | ||
121 | * @param h Handle to the master context | ||
122 | */ | 125 | */ |
123 | static struct GNUNET_SCALARPRODUCT_QueueEntry * | 126 | struct GNUNET_SCALARPRODUCT_ComputationHandle *tail; |
124 | free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h); | ||
125 | 127 | ||
126 | /** | 128 | /************************************************************** |
127 | * Triggered when timeout occurs for a request in queue | 129 | *** Function Declarations ********** |
128 | * | 130 | **************************************************************/ |
129 | * @param cls The pointer to the QueueEntry | ||
130 | * @param tc Task Context | ||
131 | */ | ||
132 | static void | ||
133 | timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
134 | 131 | ||
135 | /** | 132 | /** |
136 | * Called when a response is received from the service. After basic check | 133 | * Called when a response is received from the service. After basic check |
@@ -155,120 +152,10 @@ receive_cb (void *cls, const struct GNUNET_MessageHeader *msg); | |||
155 | static size_t transmit_request (void *cls, size_t size, | 152 | static size_t transmit_request (void *cls, size_t size, |
156 | void *buf); | 153 | void *buf); |
157 | 154 | ||
158 | /** | ||
159 | * Issues transmit request for the new entries in the queue | ||
160 | * | ||
161 | * @param h handle to the master context | ||
162 | */ | ||
163 | static void | ||
164 | process_queue (struct GNUNET_SCALARPRODUCT_Handle *h); | ||
165 | |||
166 | /************************************************************** | 155 | /************************************************************** |
167 | *** Static Function Declarations ********** | 156 | *** Static Function Declarations ********** |
168 | **************************************************************/ | 157 | **************************************************************/ |
169 | 158 | ||
170 | |||
171 | /** | ||
172 | * Creates a new entry at the tail of the DLL | ||
173 | * | ||
174 | * @param h handle to the master context | ||
175 | * | ||
176 | * @return pointer to the entry | ||
177 | */ | ||
178 | static struct GNUNET_SCALARPRODUCT_QueueEntry * | ||
179 | make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h) | ||
180 | { | ||
181 | struct GNUNET_SCALARPRODUCT_QueueEntry *qe; | ||
182 | |||
183 | qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry); | ||
184 | |||
185 | // if queue empty | ||
186 | if (NULL == h->queue_head && NULL == h->queue_tail) | ||
187 | { | ||
188 | qe->next = NULL; | ||
189 | qe->prev = NULL; | ||
190 | h->queue_head = qe; | ||
191 | h->queue_tail = qe; | ||
192 | } | ||
193 | else | ||
194 | { | ||
195 | qe->prev = h->queue_tail; | ||
196 | h->queue_tail->next = qe; | ||
197 | h->queue_tail = qe; | ||
198 | } | ||
199 | |||
200 | return qe; | ||
201 | } | ||
202 | |||
203 | |||
204 | /** | ||
205 | * Removes the head entry from the queue | ||
206 | * | ||
207 | * @param h Handle to the master context | ||
208 | */ | ||
209 | static struct GNUNET_SCALARPRODUCT_QueueEntry * | ||
210 | free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h) | ||
211 | { | ||
212 | struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL; | ||
213 | |||
214 | GNUNET_assert (NULL != h); | ||
215 | if (NULL == h->queue_head && NULL == h->queue_tail) | ||
216 | { | ||
217 | // The queue is empty. Just return. | ||
218 | qe = NULL; | ||
219 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n"); | ||
220 | } | ||
221 | else if (h->queue_head == h->queue_tail) //only one entry | ||
222 | { | ||
223 | qe = h->queue_head; | ||
224 | qe->next = NULL; | ||
225 | qe->prev = NULL; | ||
226 | h->queue_head = NULL; | ||
227 | h->queue_tail = NULL; | ||
228 | } | ||
229 | else | ||
230 | { | ||
231 | qe = h->queue_head; | ||
232 | h->queue_head = h->queue_head->next; | ||
233 | h->queue_head->prev = NULL; | ||
234 | qe->next = NULL; | ||
235 | qe->prev = NULL; | ||
236 | } | ||
237 | return qe; | ||
238 | } | ||
239 | |||
240 | |||
241 | /** | ||
242 | * Triggered when timeout occurs for a request in queue | ||
243 | * | ||
244 | * @param cls The pointer to the QueueEntry | ||
245 | * @param tc Task Context | ||
246 | */ | ||
247 | static void | ||
248 | timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
249 | { | ||
250 | struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls; | ||
251 | |||
252 | // Update Statistics | ||
253 | GNUNET_STATISTICS_update (qe->h->stats, | ||
254 | gettext_noop ("# queue entry timeouts"), 1, | ||
255 | GNUNET_NO); | ||
256 | |||
257 | // Clear the timeout_task | ||
258 | qe->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
259 | |||
260 | // transmit_request is supposed to cancel timeout task. | ||
261 | // If message was not transmitted, there is definitely an error. | ||
262 | GNUNET_assert (GNUNET_NO == qe->was_transmitted); | ||
263 | |||
264 | LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n"); | ||
265 | |||
266 | // remove the queue_entry for the queue | ||
267 | GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe); | ||
268 | qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout); | ||
269 | } | ||
270 | |||
271 | |||
272 | /** | 159 | /** |
273 | * Handles the RESULT received in reply of prepare_response from the | 160 | * Handles the RESULT received in reply of prepare_response from the |
274 | * service | 161 | * service |
@@ -281,7 +168,7 @@ process_status_message (void *cls, | |||
281 | const struct GNUNET_MessageHeader *msg, | 168 | const struct GNUNET_MessageHeader *msg, |
282 | enum GNUNET_SCALARPRODUCT_ResponseStatus status) | 169 | enum GNUNET_SCALARPRODUCT_ResponseStatus status) |
283 | { | 170 | { |
284 | struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls; | 171 | struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; |
285 | 172 | ||
286 | GNUNET_assert (qe != NULL); | 173 | GNUNET_assert (qe != NULL); |
287 | 174 | ||
@@ -302,7 +189,7 @@ process_result_message (void *cls, | |||
302 | const struct GNUNET_MessageHeader *msg, | 189 | const struct GNUNET_MessageHeader *msg, |
303 | enum GNUNET_SCALARPRODUCT_ResponseStatus status) | 190 | enum GNUNET_SCALARPRODUCT_ResponseStatus status) |
304 | { | 191 | { |
305 | struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls; | 192 | struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; |
306 | 193 | ||
307 | GNUNET_assert (qe != NULL); | 194 | GNUNET_assert (qe != NULL); |
308 | 195 | ||
@@ -328,8 +215,8 @@ process_result_message (void *cls, | |||
328 | static void | 215 | static void |
329 | receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) | 216 | receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) |
330 | { | 217 | { |
331 | struct GNUNET_SCALARPRODUCT_Handle *h = cls; | 218 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; |
332 | struct GNUNET_SCALARPRODUCT_QueueEntry *qe; | 219 | struct GNUNET_SCALARPRODUCT_ComputationHandle *qe; |
333 | int16_t was_transmitted; | 220 | int16_t was_transmitted; |
334 | struct GNUNET_SCALARPRODUCT_client_response *message = | 221 | struct GNUNET_SCALARPRODUCT_client_response *message = |
335 | (struct GNUNET_SCALARPRODUCT_client_response *) msg; | 222 | (struct GNUNET_SCALARPRODUCT_client_response *) msg; |
@@ -414,34 +301,16 @@ static size_t | |||
414 | transmit_request (void *cls, size_t size, | 301 | transmit_request (void *cls, size_t size, |
415 | void *buf) | 302 | void *buf) |
416 | { | 303 | { |
417 | struct GNUNET_SCALARPRODUCT_Handle *h = cls; | 304 | struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; |
418 | struct GNUNET_SCALARPRODUCT_QueueEntry *qe; | ||
419 | size_t msize; | 305 | size_t msize; |
420 | 306 | ||
421 | if (NULL == (qe = h->queue_head)) | ||
422 | { | ||
423 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n"); | ||
424 | return 0; | ||
425 | } | ||
426 | |||
427 | GNUNET_SCHEDULER_cancel (qe->timeout_task); | ||
428 | qe->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
429 | |||
430 | h->th = NULL; | ||
431 | if (NULL == (qe = h->queue_head)) | ||
432 | return 0; /* no entry in queue */ | ||
433 | if (buf == NULL) | 307 | if (buf == NULL) |
434 | { | 308 | { |
435 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n"); | 309 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n"); |
436 | GNUNET_STATISTICS_update (h->stats, | 310 | GNUNET_STATISTICS_update (qe->stats, |
437 | gettext_noop ("# transmission request failures"), | 311 | gettext_noop ("# transmission request failures"), |
438 | 1, GNUNET_NO); | 312 | 1, GNUNET_NO); |
439 | GNUNET_SCALARPRODUCT_disconnect (h); | 313 | GNUNET_SCALARPRODUCT_disconnect (qe); |
440 | return 0; | ||
441 | } | ||
442 | if (size < (msize = qe->message_size)) | ||
443 | { | ||
444 | process_queue (h); | ||
445 | return 0; | 314 | return 0; |
446 | } | 315 | } |
447 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n", | 316 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n", |
@@ -451,8 +320,7 @@ transmit_request (void *cls, size_t size, | |||
451 | GNUNET_free (qe->msg); | 320 | GNUNET_free (qe->msg); |
452 | qe->was_transmitted = GNUNET_YES; | 321 | qe->was_transmitted = GNUNET_YES; |
453 | 322 | ||
454 | GNUNET_assert (GNUNET_NO == h->in_receive); | 323 | qe->th = NULL; |
455 | h->in_receive = GNUNET_YES; | ||
456 | 324 | ||
457 | GNUNET_CLIENT_receive (h->client, &receive_cb, h, | 325 | GNUNET_CLIENT_receive (h->client, &receive_cb, h, |
458 | GNUNET_TIME_UNIT_FOREVER_REL); | 326 | GNUNET_TIME_UNIT_FOREVER_REL); |
@@ -466,61 +334,6 @@ transmit_request (void *cls, size_t size, | |||
466 | } | 334 | } |
467 | 335 | ||
468 | 336 | ||
469 | /** | ||
470 | * Issues transmit request for the new entries in the queue | ||
471 | * | ||
472 | * @param h handle to the master context | ||
473 | */ | ||
474 | static void | ||
475 | process_queue (struct GNUNET_SCALARPRODUCT_Handle *h) | ||
476 | { | ||
477 | struct GNUNET_SCALARPRODUCT_QueueEntry *qe; | ||
478 | |||
479 | if (NULL == (qe = h->queue_head)) | ||
480 | { | ||
481 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n"); | ||
482 | return; /* no entry in queue */ | ||
483 | } | ||
484 | if (qe->was_transmitted == GNUNET_YES) | ||
485 | { | ||
486 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n"); | ||
487 | return; /* waiting for replies */ | ||
488 | } | ||
489 | if (h->th != NULL) | ||
490 | { | ||
491 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n"); | ||
492 | return; /* request pending */ | ||
493 | } | ||
494 | if (h->client == NULL) | ||
495 | { | ||
496 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n"); | ||
497 | return; /* waiting for reconnect */ | ||
498 | } | ||
499 | if (GNUNET_YES == h->in_receive) | ||
500 | { | ||
501 | /* wait for response to previous query */ | ||
502 | return; | ||
503 | } | ||
504 | |||
505 | h->th = | ||
506 | GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, | ||
507 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
508 | GNUNET_YES, | ||
509 | &transmit_request, h); | ||
510 | |||
511 | if (h->th == NULL) | ||
512 | { | ||
513 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
514 | _ ("Failed to send a message to the scalarproduct service\n")); | ||
515 | return; | ||
516 | } | ||
517 | |||
518 | GNUNET_assert (GNUNET_NO == h->in_receive); | ||
519 | GNUNET_break (NULL != h->th); | ||
520 | } | ||
521 | |||
522 | |||
523 | |||
524 | /************************************************************** | 337 | /************************************************************** |
525 | *** API ********** | 338 | *** API ********** |
526 | **************************************************************/ | 339 | **************************************************************/ |
@@ -536,7 +349,7 @@ process_queue (struct GNUNET_SCALARPRODUCT_Handle *h) | |||
536 | * @param cont Callback function | 349 | * @param cont Callback function |
537 | * @param cont_cls Closure for the callback function | 350 | * @param cont_cls Closure for the callback function |
538 | */ | 351 | */ |
539 | struct GNUNET_SCALARPRODUCT_Handle * | 352 | struct GNUNET_SCALARPRODUCT_ComputationHandle * |
540 | GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, | 353 | GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, |
541 | const struct GNUNET_HashCode * key, | 354 | const struct GNUNET_HashCode * key, |
542 | const int32_t * elements, | 355 | const int32_t * elements, |
@@ -544,7 +357,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
544 | GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, | 357 | GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, |
545 | void *cont_cls) | 358 | void *cont_cls) |
546 | { | 359 | { |
547 | struct GNUNET_SCALARPRODUCT_Handle *h; | 360 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h; |
548 | struct GNUNET_SCALARPRODUCT_client_request *msg; | 361 | struct GNUNET_SCALARPRODUCT_client_request *msg; |
549 | int32_t * vector; | 362 | int32_t * vector; |
550 | uint16_t size; | 363 | uint16_t size; |
@@ -556,7 +369,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
556 | GNUNET_assert(element_count > 1); | 369 | GNUNET_assert(element_count > 1); |
557 | GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) | 370 | GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) |
558 | + element_count * sizeof (int32_t)); | 371 | + element_count * sizeof (int32_t)); |
559 | h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); | 372 | h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); |
560 | h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); | 373 | h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); |
561 | if (!h->client) | 374 | if (!h->client) |
562 | { | 375 | { |
@@ -595,7 +408,6 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
595 | 408 | ||
596 | memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); | 409 | memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); |
597 | 410 | ||
598 | |||
599 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, | 411 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, |
600 | GNUNET_TIME_UNIT_FOREVER_REL, | 412 | GNUNET_TIME_UNIT_FOREVER_REL, |
601 | GNUNET_YES, // retry is OK in the initial stage | 413 | GNUNET_YES, // retry is OK in the initial stage |
@@ -610,6 +422,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
610 | GNUNET_free(h); | 422 | GNUNET_free(h); |
611 | return NULL; | 423 | return NULL; |
612 | } | 424 | } |
425 | GNUNET_CONTAINER_DLL_insert (head, tail, h); | ||
613 | return h; | 426 | return h; |
614 | } | 427 | } |
615 | 428 | ||
@@ -627,7 +440,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
627 | * @param cont Callback function | 440 | * @param cont Callback function |
628 | * @param cont_cls Closure for the callback function | 441 | * @param cont_cls Closure for the callback function |
629 | */ | 442 | */ |
630 | struct GNUNET_SCALARPRODUCT_Handle * | 443 | struct GNUNET_SCALARPRODUCT_ComputationHandle * |
631 | GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, | 444 | GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, |
632 | const struct GNUNET_HashCode * key, | 445 | const struct GNUNET_HashCode * key, |
633 | const struct GNUNET_PeerIdentity *peer, | 446 | const struct GNUNET_PeerIdentity *peer, |
@@ -638,38 +451,39 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
638 | GNUNET_SCALARPRODUCT_DatumProcessor cont, | 451 | GNUNET_SCALARPRODUCT_DatumProcessor cont, |
639 | void *cont_cls) | 452 | void *cont_cls) |
640 | { | 453 | { |
641 | struct GNUNET_CLIENT_Connection *client; | 454 | struct GNUNET_SCALARPRODUCT_ComputationHandle *h; |
642 | struct GNUNET_SCALARPRODUCT_Handle *h; | ||
643 | struct GNUNET_SCALARPRODUCT_client_request *msg; | 455 | struct GNUNET_SCALARPRODUCT_client_request *msg; |
644 | int32_t * vector; | 456 | int32_t * vector; |
645 | uint16_t size; | 457 | uint16_t size; |
646 | uint64_t i; | 458 | uint64_t i; |
647 | 459 | ||
648 | GNUNET_assert(key); | ||
649 | GNUNET_assert(peer); | ||
650 | GNUNET_assert(elements); | ||
651 | GNUNET_assert(mask); | ||
652 | GNUNET_assert(cont); | ||
653 | GNUNET_assert(element_count > 1); | ||
654 | GNUNET_assert(mask_bytes != 0); | ||
655 | GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) | 460 | GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) |
656 | + element_count * sizeof (int32_t) | 461 | + element_count * sizeof (int32_t) |
657 | + mask_length); | 462 | + mask_length); |
658 | client = GNUNET_CLIENT_connect ("scalarproduct", cfg); | 463 | |
659 | 464 | h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); | |
660 | if (!client) | 465 | h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); |
466 | if (!h->client) | ||
661 | { | 467 | { |
662 | LOG (GNUNET_ERROR_TYPE_ERROR, | 468 | LOG (GNUNET_ERROR_TYPE_ERROR, |
663 | _ ("Failed to connect to the scalarproduct service\n")); | 469 | _ ("Failed to connect to the scalarproduct service\n")); |
470 | GNUNET_free(h); | ||
664 | return NULL; | 471 | return NULL; |
665 | } | 472 | } |
473 | h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); | ||
474 | if (!h->th){ | ||
475 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
476 | _("Failed to send a message to the statistics service\n")); | ||
477 | GNUNET_CLIENT_disconnect(h->client); | ||
478 | GNUNET_free(h); | ||
479 | return NULL; | ||
480 | } | ||
481 | |||
666 | size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length; | 482 | size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length; |
667 | 483 | ||
668 | h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); | ||
669 | h->cont_datum = cont; | 484 | h->cont_datum = cont; |
670 | h->cont_cls = cont_cls; | 485 | h->cont_cls = cont_cls; |
671 | h->response_proc = &process_status_message; | 486 | h->response_proc = &process_status_message; |
672 | h->client = client; | ||
673 | h->cfg = cfg; | 487 | h->cfg = cfg; |
674 | h->msg = GNUNET_malloc (size); | 488 | h->msg = GNUNET_malloc (size); |
675 | memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); | 489 | memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); |
@@ -689,17 +503,21 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
689 | memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); | 503 | memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); |
690 | memcpy (&vector[element_count], mask, mask_length); | 504 | memcpy (&vector[element_count], mask, mask_length); |
691 | 505 | ||
692 | h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); | ||
693 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, | 506 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, |
694 | GNUNET_TIME_UNIT_FOREVER_REL, | 507 | GNUNET_TIME_UNIT_FOREVER_REL, |
695 | GNUNET_YES, // retry is OK in the initial stage | 508 | GNUNET_YES, // retry is OK in the initial stage |
696 | &transmit_request, h); | 509 | &transmit_request, h); |
697 | if ( !h->th) | 510 | if (!h->th) |
698 | { | 511 | { |
699 | LOG (GNUNET_ERROR_TYPE_ERROR, | 512 | LOG (GNUNET_ERROR_TYPE_ERROR, |
700 | _ ("Failed to send a message to the scalarproduct service\n")); | 513 | _ ("Failed to send a message to the scalarproduct service\n")); |
514 | GNUNET_STATISTICS_destroy(h->GNUNET_YES); | ||
515 | GNUNET_CLIENT_disconnect(h->client); | ||
516 | GNUNET_free(h->msg); | ||
517 | GNUNET_free(h); | ||
701 | return NULL; | 518 | return NULL; |
702 | } | 519 | } |
520 | GNUNET_CONTAINER_DLL_insert (head, tail, h); | ||
703 | return h; | 521 | return h; |
704 | } | 522 | } |
705 | 523 | ||
@@ -709,27 +527,24 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
709 | * @param h handle to the scalarproduct | 527 | * @param h handle to the scalarproduct |
710 | */ | 528 | */ |
711 | void | 529 | void |
712 | GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h) | 530 | GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_ComputationHandle * h) |
713 | { | 531 | { |
714 | struct GNUNET_SCALARPRODUCT_QueueEntry * qe; | 532 | struct GNUNET_SCALARPRODUCT_ComputationHandle * qe; |
715 | 533 | ||
716 | LOG (GNUNET_ERROR_TYPE_INFO, | 534 | LOG (GNUNET_ERROR_TYPE_INFO, |
717 | "Disconnecting from VectorProduct\n"); | 535 | "Disconnecting from VectorProduct\n"); |
718 | 536 | ||
719 | while (NULL != h->queue_head) | 537 | for (qe = head; head != NULL; qe = head) |
720 | { | ||
721 | GNUNET_assert (NULL != (qe = free_queue_head_entry (h))); | ||
722 | qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected); | ||
723 | } | ||
724 | |||
725 | if (h->client != NULL) | ||
726 | { | 538 | { |
539 | GNUNET_CONTAINER_DLL_remove (head, tail, qe); | ||
540 | if (NULL == qe->th) | ||
541 | GNUNET_CLIENT_notify_transmit_ready_cancel(qe->th); | ||
727 | GNUNET_CLIENT_disconnect (h->client); | 542 | GNUNET_CLIENT_disconnect (h->client); |
728 | h->client = NULL; | 543 | GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); |
544 | qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected); | ||
545 | GNUNET_free(qe->msg); | ||
546 | GNUNET_free(qe); | ||
729 | } | 547 | } |
730 | |||
731 | GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO); | ||
732 | h->stats = NULL; | ||
733 | } | 548 | } |
734 | 549 | ||
735 | /* end of ext_api.c */ | 550 | /* end of ext_api.c */ |