aboutsummaryrefslogtreecommitdiff
path: root/src/scalarproduct
diff options
context:
space:
mode:
authorChristian Fuchs <christian.fuchs@cfuchs.net>2013-09-02 14:46:52 +0000
committerChristian Fuchs <christian.fuchs@cfuchs.net>2013-09-02 14:46:52 +0000
commit9f064f70f7aff38119ebf1b4345118cb61302f2d (patch)
tree376024ba781c11965bb5911b217657dd60c2dd3e /src/scalarproduct
parent6ea7ba4bfcd11cbfc0b828f12d881f983d277bc7 (diff)
downloadgnunet-9f064f70f7aff38119ebf1b4345118cb61302f2d.tar.gz
gnunet-9f064f70f7aff38119ebf1b4345118cb61302f2d.zip
removed much of the excell logics in the scalar product API
finished the the alice/bob API initiation functions in the SP API merged structes in SP API reorganized SP bookkeeping of computations
Diffstat (limited to 'src/scalarproduct')
-rw-r--r--src/scalarproduct/scalarproduct_api.c341
1 files changed, 78 insertions, 263 deletions
diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c
index b77c30925..7c1f5394e 100644
--- a/src/scalarproduct/scalarproduct_api.c
+++ b/src/scalarproduct/scalarproduct_api.c
@@ -41,22 +41,42 @@
41/** 41/**
42 * Entry in the request queue per client 42 * Entry in the request queue per client
43 */ 43 */
44struct GNUNET_SCALARPRODUCT_QueueEntry 44struct GNUNET_SCALARPRODUCT_ComputationHandle
45{ 45{
46 /** 46 /**
47 * This is a linked list. 47 * This is a linked list.
48 */ 48 */
49 struct GNUNET_SCALARPRODUCT_QueueEntry *next; 49 struct GNUNET_SCALARPRODUCT_ComputationHandle *next;
50 50
51 /** 51 /**
52 * This is a linked list. 52 * This is a linked list.
53 */ 53 */
54 struct GNUNET_SCALARPRODUCT_QueueEntry *prev; 54 struct GNUNET_SCALARPRODUCT_ComputationHandle *prev;
55
56 /**
57 * Our configuration.
58 */
59 const struct GNUNET_CONFIGURATION_Handle *cfg;
60
61 /**
62 * Current connection to the scalarproduct service.
63 */
64 struct GNUNET_CLIENT_Connection *client;
65
66 /**
67 * Handle for statistics.
68 */
69 struct GNUNET_STATISTICS_Handle *stats;
55 70
56 /** 71 /**
57 * Handle to the master context. 72 * The shared session key identifying this computation
58 */ 73 */
59 struct GNUNET_SCALARPRODUCT_Handle *h; 74 struct GNUNET_HashCode * key;
75
76 /**
77 * Current transmit handle.
78 */
79 struct GNUNET_CLIENT_TransmitHandle *th;
60 80
61 /** 81 /**
62 * Size of the message 82 * Size of the message
@@ -66,7 +86,7 @@ struct GNUNET_SCALARPRODUCT_QueueEntry
66 /** 86 /**
67 * Message to be sent to the scalarproduct service 87 * Message to be sent to the scalarproduct service
68 */ 88 */
69 struct GNUNET_SCALARPRODUCT_client_request* msg; 89 struct GNUNET_SCALARPRODUCT_client_request * msg;
70 90
71 union 91 union
72 { 92 {
@@ -87,14 +107,6 @@ struct GNUNET_SCALARPRODUCT_QueueEntry
87 void *cont_cls; 107 void *cont_cls;
88 108
89 /** 109 /**
90 * Has this message been transmitted to the service?
91 * Only ever GNUNET_YES for the head of the queue.
92 * Note that the overall struct should end at a
93 * multiple of 64 bits.
94 */
95 int16_t was_transmitted;
96
97 /**
98 * Response Processor for response from the service. This function calls the 110 * Response Processor for response from the service. This function calls the
99 * continuation function provided by the client. 111 * continuation function provided by the client.
100 */ 112 */
@@ -102,35 +114,20 @@ struct GNUNET_SCALARPRODUCT_QueueEntry
102}; 114};
103 115
104/************************************************************** 116/**************************************************************
105 *** Function Declarations ********** 117 *** Global Variables **********
106 **************************************************************/ 118 **************************************************************/
107
108/** 119/**
109 * Creates a new entry at the tail of the DLL 120 * Head of the active sessions queue
110 *
111 * @param h handle to the master context
112 *
113 * @return pointer to the entry
114 */ 121 */
115static struct GNUNET_SCALARPRODUCT_QueueEntry * 122struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
116make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h);
117
118/** 123/**
119 * Removes the head entry from the queue 124 * Tail of the active sessions queue
120 *
121 * @param h Handle to the master context
122 */ 125 */
123static struct GNUNET_SCALARPRODUCT_QueueEntry * 126struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
124free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h);
125 127
126/** 128/**************************************************************
127 * Triggered when timeout occurs for a request in queue 129 *** Function Declarations **********
128 * 130 **************************************************************/
129 * @param cls The pointer to the QueueEntry
130 * @param tc Task Context
131 */
132static void
133timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
134 131
135/** 132/**
136 * Called when a response is received from the service. After basic check 133 * Called when a response is received from the service. After basic check
@@ -155,120 +152,10 @@ receive_cb (void *cls, const struct GNUNET_MessageHeader *msg);
155static size_t transmit_request (void *cls, size_t size, 152static size_t transmit_request (void *cls, size_t size,
156 void *buf); 153 void *buf);
157 154
158/**
159 * Issues transmit request for the new entries in the queue
160 *
161 * @param h handle to the master context
162 */
163static void
164process_queue (struct GNUNET_SCALARPRODUCT_Handle *h);
165
166/************************************************************** 155/**************************************************************
167 *** Static Function Declarations ********** 156 *** Static Function Declarations **********
168 **************************************************************/ 157 **************************************************************/
169 158
170
171/**
172 * Creates a new entry at the tail of the DLL
173 *
174 * @param h handle to the master context
175 *
176 * @return pointer to the entry
177 */
178static struct GNUNET_SCALARPRODUCT_QueueEntry *
179make_queue_entry (struct GNUNET_SCALARPRODUCT_Handle *h)
180{
181 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
182
183 qe = GNUNET_new (struct GNUNET_SCALARPRODUCT_QueueEntry);
184
185 // if queue empty
186 if (NULL == h->queue_head && NULL == h->queue_tail)
187 {
188 qe->next = NULL;
189 qe->prev = NULL;
190 h->queue_head = qe;
191 h->queue_tail = qe;
192 }
193 else
194 {
195 qe->prev = h->queue_tail;
196 h->queue_tail->next = qe;
197 h->queue_tail = qe;
198 }
199
200 return qe;
201}
202
203
204/**
205 * Removes the head entry from the queue
206 *
207 * @param h Handle to the master context
208 */
209static struct GNUNET_SCALARPRODUCT_QueueEntry *
210free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h)
211{
212 struct GNUNET_SCALARPRODUCT_QueueEntry * qe = NULL;
213
214 GNUNET_assert (NULL != h);
215 if (NULL == h->queue_head && NULL == h->queue_tail)
216 {
217 // The queue is empty. Just return.
218 qe = NULL;
219 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n");
220 }
221 else if (h->queue_head == h->queue_tail) //only one entry
222 {
223 qe = h->queue_head;
224 qe->next = NULL;
225 qe->prev = NULL;
226 h->queue_head = NULL;
227 h->queue_tail = NULL;
228 }
229 else
230 {
231 qe = h->queue_head;
232 h->queue_head = h->queue_head->next;
233 h->queue_head->prev = NULL;
234 qe->next = NULL;
235 qe->prev = NULL;
236 }
237 return qe;
238}
239
240
241/**
242 * Triggered when timeout occurs for a request in queue
243 *
244 * @param cls The pointer to the QueueEntry
245 * @param tc Task Context
246 */
247static void
248timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
249{
250 struct GNUNET_SCALARPRODUCT_QueueEntry * qe = cls;
251
252 // Update Statistics
253 GNUNET_STATISTICS_update (qe->h->stats,
254 gettext_noop ("# queue entry timeouts"), 1,
255 GNUNET_NO);
256
257 // Clear the timeout_task
258 qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
259
260 // transmit_request is supposed to cancel timeout task.
261 // If message was not transmitted, there is definitely an error.
262 GNUNET_assert (GNUNET_NO == qe->was_transmitted);
263
264 LOG (GNUNET_ERROR_TYPE_INFO, "Timeout of request in datastore queue\n");
265
266 // remove the queue_entry for the queue
267 GNUNET_CONTAINER_DLL_remove (qe->h->queue_head, qe->h->queue_tail, qe);
268 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Timeout);
269}
270
271
272/** 159/**
273 * Handles the RESULT received in reply of prepare_response from the 160 * Handles the RESULT received in reply of prepare_response from the
274 * service 161 * service
@@ -281,7 +168,7 @@ process_status_message (void *cls,
281 const struct GNUNET_MessageHeader *msg, 168 const struct GNUNET_MessageHeader *msg,
282 enum GNUNET_SCALARPRODUCT_ResponseStatus status) 169 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
283{ 170{
284 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls; 171 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
285 172
286 GNUNET_assert (qe != NULL); 173 GNUNET_assert (qe != NULL);
287 174
@@ -302,7 +189,7 @@ process_result_message (void *cls,
302 const struct GNUNET_MessageHeader *msg, 189 const struct GNUNET_MessageHeader *msg,
303 enum GNUNET_SCALARPRODUCT_ResponseStatus status) 190 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
304{ 191{
305 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = cls; 192 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
306 193
307 GNUNET_assert (qe != NULL); 194 GNUNET_assert (qe != NULL);
308 195
@@ -328,8 +215,8 @@ process_result_message (void *cls,
328static void 215static void
329receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) 216receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
330{ 217{
331 struct GNUNET_SCALARPRODUCT_Handle *h = cls; 218 struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
332 struct GNUNET_SCALARPRODUCT_QueueEntry *qe; 219 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe;
333 int16_t was_transmitted; 220 int16_t was_transmitted;
334 struct GNUNET_SCALARPRODUCT_client_response *message = 221 struct GNUNET_SCALARPRODUCT_client_response *message =
335 (struct GNUNET_SCALARPRODUCT_client_response *) msg; 222 (struct GNUNET_SCALARPRODUCT_client_response *) msg;
@@ -414,34 +301,16 @@ static size_t
414transmit_request (void *cls, size_t size, 301transmit_request (void *cls, size_t size,
415 void *buf) 302 void *buf)
416{ 303{
417 struct GNUNET_SCALARPRODUCT_Handle *h = cls; 304 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
418 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
419 size_t msize; 305 size_t msize;
420 306
421 if (NULL == (qe = h->queue_head))
422 {
423 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue head is NULL!\n\n");
424 return 0;
425 }
426
427 GNUNET_SCHEDULER_cancel (qe->timeout_task);
428 qe->timeout_task = GNUNET_SCHEDULER_NO_TASK;
429
430 h->th = NULL;
431 if (NULL == (qe = h->queue_head))
432 return 0; /* no entry in queue */
433 if (buf == NULL) 307 if (buf == NULL)
434 { 308 {
435 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n"); 309 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
436 GNUNET_STATISTICS_update (h->stats, 310 GNUNET_STATISTICS_update (qe->stats,
437 gettext_noop ("# transmission request failures"), 311 gettext_noop ("# transmission request failures"),
438 1, GNUNET_NO); 312 1, GNUNET_NO);
439 GNUNET_SCALARPRODUCT_disconnect (h); 313 GNUNET_SCALARPRODUCT_disconnect (qe);
440 return 0;
441 }
442 if (size < (msize = qe->message_size))
443 {
444 process_queue (h);
445 return 0; 314 return 0;
446 } 315 }
447 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n", 316 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n",
@@ -451,8 +320,7 @@ transmit_request (void *cls, size_t size,
451 GNUNET_free (qe->msg); 320 GNUNET_free (qe->msg);
452 qe->was_transmitted = GNUNET_YES; 321 qe->was_transmitted = GNUNET_YES;
453 322
454 GNUNET_assert (GNUNET_NO == h->in_receive); 323 qe->th = NULL;
455 h->in_receive = GNUNET_YES;
456 324
457 GNUNET_CLIENT_receive (h->client, &receive_cb, h, 325 GNUNET_CLIENT_receive (h->client, &receive_cb, h,
458 GNUNET_TIME_UNIT_FOREVER_REL); 326 GNUNET_TIME_UNIT_FOREVER_REL);
@@ -466,61 +334,6 @@ transmit_request (void *cls, size_t size,
466} 334}
467 335
468 336
469/**
470 * Issues transmit request for the new entries in the queue
471 *
472 * @param h handle to the master context
473 */
474static void
475process_queue (struct GNUNET_SCALARPRODUCT_Handle *h)
476{
477 struct GNUNET_SCALARPRODUCT_QueueEntry *qe;
478
479 if (NULL == (qe = h->queue_head))
480 {
481 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
482 return; /* no entry in queue */
483 }
484 if (qe->was_transmitted == GNUNET_YES)
485 {
486 LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
487 return; /* waiting for replies */
488 }
489 if (h->th != NULL)
490 {
491 LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
492 return; /* request pending */
493 }
494 if (h->client == NULL)
495 {
496 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
497 return; /* waiting for reconnect */
498 }
499 if (GNUNET_YES == h->in_receive)
500 {
501 /* wait for response to previous query */
502 return;
503 }
504
505 h->th =
506 GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
507 GNUNET_TIME_UNIT_FOREVER_REL,
508 GNUNET_YES,
509 &transmit_request, h);
510
511 if (h->th == NULL)
512 {
513 LOG (GNUNET_ERROR_TYPE_ERROR,
514 _ ("Failed to send a message to the scalarproduct service\n"));
515 return;
516 }
517
518 GNUNET_assert (GNUNET_NO == h->in_receive);
519 GNUNET_break (NULL != h->th);
520}
521
522
523
524/************************************************************** 337/**************************************************************
525 *** API ********** 338 *** API **********
526 **************************************************************/ 339 **************************************************************/
@@ -536,7 +349,7 @@ process_queue (struct GNUNET_SCALARPRODUCT_Handle *h)
536 * @param cont Callback function 349 * @param cont Callback function
537 * @param cont_cls Closure for the callback function 350 * @param cont_cls Closure for the callback function
538 */ 351 */
539struct GNUNET_SCALARPRODUCT_Handle * 352struct GNUNET_SCALARPRODUCT_ComputationHandle *
540GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, 353GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
541 const struct GNUNET_HashCode * key, 354 const struct GNUNET_HashCode * key,
542 const int32_t * elements, 355 const int32_t * elements,
@@ -544,7 +357,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
544 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, 357 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
545 void *cont_cls) 358 void *cont_cls)
546{ 359{
547 struct GNUNET_SCALARPRODUCT_Handle *h; 360 struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
548 struct GNUNET_SCALARPRODUCT_client_request *msg; 361 struct GNUNET_SCALARPRODUCT_client_request *msg;
549 int32_t * vector; 362 int32_t * vector;
550 uint16_t size; 363 uint16_t size;
@@ -556,7 +369,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
556 GNUNET_assert(element_count > 1); 369 GNUNET_assert(element_count > 1);
557 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) 370 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
558 + element_count * sizeof (int32_t)); 371 + element_count * sizeof (int32_t));
559 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); 372 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
560 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); 373 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
561 if (!h->client) 374 if (!h->client)
562 { 375 {
@@ -595,7 +408,6 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
595 408
596 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); 409 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
597 410
598
599 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, 411 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
600 GNUNET_TIME_UNIT_FOREVER_REL, 412 GNUNET_TIME_UNIT_FOREVER_REL,
601 GNUNET_YES, // retry is OK in the initial stage 413 GNUNET_YES, // retry is OK in the initial stage
@@ -610,6 +422,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
610 GNUNET_free(h); 422 GNUNET_free(h);
611 return NULL; 423 return NULL;
612 } 424 }
425 GNUNET_CONTAINER_DLL_insert (head, tail, h);
613 return h; 426 return h;
614} 427}
615 428
@@ -627,7 +440,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
627 * @param cont Callback function 440 * @param cont Callback function
628 * @param cont_cls Closure for the callback function 441 * @param cont_cls Closure for the callback function
629 */ 442 */
630struct GNUNET_SCALARPRODUCT_Handle * 443struct GNUNET_SCALARPRODUCT_ComputationHandle *
631GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, 444GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
632 const struct GNUNET_HashCode * key, 445 const struct GNUNET_HashCode * key,
633 const struct GNUNET_PeerIdentity *peer, 446 const struct GNUNET_PeerIdentity *peer,
@@ -638,38 +451,39 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
638 GNUNET_SCALARPRODUCT_DatumProcessor cont, 451 GNUNET_SCALARPRODUCT_DatumProcessor cont,
639 void *cont_cls) 452 void *cont_cls)
640{ 453{
641 struct GNUNET_CLIENT_Connection *client; 454 struct GNUNET_SCALARPRODUCT_ComputationHandle *h;
642 struct GNUNET_SCALARPRODUCT_Handle *h;
643 struct GNUNET_SCALARPRODUCT_client_request *msg; 455 struct GNUNET_SCALARPRODUCT_client_request *msg;
644 int32_t * vector; 456 int32_t * vector;
645 uint16_t size; 457 uint16_t size;
646 uint64_t i; 458 uint64_t i;
647 459
648 GNUNET_assert(key);
649 GNUNET_assert(peer);
650 GNUNET_assert(elements);
651 GNUNET_assert(mask);
652 GNUNET_assert(cont);
653 GNUNET_assert(element_count > 1);
654 GNUNET_assert(mask_bytes != 0);
655 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) 460 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
656 + element_count * sizeof (int32_t) 461 + element_count * sizeof (int32_t)
657 + mask_length); 462 + mask_length);
658 client = GNUNET_CLIENT_connect ("scalarproduct", cfg); 463
659 464 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
660 if (!client) 465 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
466 if (!h->client)
661 { 467 {
662 LOG (GNUNET_ERROR_TYPE_ERROR, 468 LOG (GNUNET_ERROR_TYPE_ERROR,
663 _ ("Failed to connect to the scalarproduct service\n")); 469 _ ("Failed to connect to the scalarproduct service\n"));
470 GNUNET_free(h);
664 return NULL; 471 return NULL;
665 } 472 }
473 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
474 if (!h->th){
475 LOG (GNUNET_ERROR_TYPE_ERROR,
476 _("Failed to send a message to the statistics service\n"));
477 GNUNET_CLIENT_disconnect(h->client);
478 GNUNET_free(h);
479 return NULL;
480 }
481
666 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length; 482 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length;
667 483
668 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
669 h->cont_datum = cont; 484 h->cont_datum = cont;
670 h->cont_cls = cont_cls; 485 h->cont_cls = cont_cls;
671 h->response_proc = &process_status_message; 486 h->response_proc = &process_status_message;
672 h->client = client;
673 h->cfg = cfg; 487 h->cfg = cfg;
674 h->msg = GNUNET_malloc (size); 488 h->msg = GNUNET_malloc (size);
675 memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); 489 memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
@@ -689,17 +503,21 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
689 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); 503 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
690 memcpy (&vector[element_count], mask, mask_length); 504 memcpy (&vector[element_count], mask, mask_length);
691 505
692 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
693 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, 506 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
694 GNUNET_TIME_UNIT_FOREVER_REL, 507 GNUNET_TIME_UNIT_FOREVER_REL,
695 GNUNET_YES, // retry is OK in the initial stage 508 GNUNET_YES, // retry is OK in the initial stage
696 &transmit_request, h); 509 &transmit_request, h);
697 if ( !h->th) 510 if (!h->th)
698 { 511 {
699 LOG (GNUNET_ERROR_TYPE_ERROR, 512 LOG (GNUNET_ERROR_TYPE_ERROR,
700 _ ("Failed to send a message to the scalarproduct service\n")); 513 _ ("Failed to send a message to the scalarproduct service\n"));
514 GNUNET_STATISTICS_destroy(h->GNUNET_YES);
515 GNUNET_CLIENT_disconnect(h->client);
516 GNUNET_free(h->msg);
517 GNUNET_free(h);
701 return NULL; 518 return NULL;
702 } 519 }
520 GNUNET_CONTAINER_DLL_insert (head, tail, h);
703 return h; 521 return h;
704} 522}
705 523
@@ -709,27 +527,24 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
709 * @param h handle to the scalarproduct 527 * @param h handle to the scalarproduct
710 */ 528 */
711void 529void
712GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_Handle * h) 530GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_ComputationHandle * h)
713{ 531{
714 struct GNUNET_SCALARPRODUCT_QueueEntry * qe; 532 struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
715 533
716 LOG (GNUNET_ERROR_TYPE_INFO, 534 LOG (GNUNET_ERROR_TYPE_INFO,
717 "Disconnecting from VectorProduct\n"); 535 "Disconnecting from VectorProduct\n");
718 536
719 while (NULL != h->queue_head) 537 for (qe = head; head != NULL; qe = head)
720 {
721 GNUNET_assert (NULL != (qe = free_queue_head_entry (h)));
722 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
723 }
724
725 if (h->client != NULL)
726 { 538 {
539 GNUNET_CONTAINER_DLL_remove (head, tail, qe);
540 if (NULL == qe->th)
541 GNUNET_CLIENT_notify_transmit_ready_cancel(qe->th);
727 GNUNET_CLIENT_disconnect (h->client); 542 GNUNET_CLIENT_disconnect (h->client);
728 h->client = NULL; 543 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
544 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
545 GNUNET_free(qe->msg);
546 GNUNET_free(qe);
729 } 547 }
730
731 GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
732 h->stats = NULL;
733} 548}
734 549
735/* end of ext_api.c */ 550/* end of ext_api.c */