aboutsummaryrefslogtreecommitdiff
path: root/src/ats
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-23 17:31:13 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-23 17:31:13 +0000
commit9a3d2cb8ea446ae1291286b94a7784676ea134cb (patch)
tree20abd14fb32c4720e4ca6b18513c11d22185be43 /src/ats
parent021e5d47b4ac2fd2088cee65e551fd7e6114e99b (diff)
downloadgnunet-9a3d2cb8ea446ae1291286b94a7784676ea134cb.tar.gz
gnunet-9a3d2cb8ea446ae1291286b94a7784676ea134cb.zip
convert perf API to new MQ API
Diffstat (limited to 'src/ats')
-rw-r--r--src/ats/ats_api_performance.c655
1 files changed, 268 insertions, 387 deletions
diff --git a/src/ats/ats_api_performance.c b/src/ats/ats_api_performance.c
index 48bb2ebf4..60827e174 100644
--- a/src/ats/ats_api_performance.c
+++ b/src/ats/ats_api_performance.c
@@ -1,21 +1,21 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2010,2011 GNUnet e.V. 3 Copyright (C) 2010, 2011, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your 7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version. 8 option) any later version.
9 9
10 GNUnet is distributed in the hope that it will be useful, but 10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details. 13 General Public License for more details.
14 14
15 You should have received a copy of the GNU General Public License 15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the 16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA. 18 Boston, MA 02110-1301, USA.
19 */ 19 */
20/** 20/**
21 * @file ats/ats_api_performance.c 21 * @file ats/ats_api_performance.c
@@ -32,35 +32,6 @@
32 32
33 33
34/** 34/**
35 * Message in linked list we should send to the ATS service. The
36 * actual binary message follows this struct.
37 */
38struct PendingMessage
39{
40
41 /**
42 * Kept in a DLL.
43 */
44 struct PendingMessage *next;
45
46 /**
47 * Kept in a DLL.
48 */
49 struct PendingMessage *prev;
50
51 /**
52 * Size of the message.
53 */
54 size_t size;
55
56 /**
57 * Is this the 'ATS_START' message?
58 */
59 int is_init;
60};
61
62
63/**
64 * Linked list of pending reservations. 35 * Linked list of pending reservations.
65 */ 36 */
66struct GNUNET_ATS_ReservationContext 37struct GNUNET_ATS_ReservationContext
@@ -185,17 +156,7 @@ struct GNUNET_ATS_PerformanceHandle
185 /** 156 /**
186 * Connection to ATS service. 157 * Connection to ATS service.
187 */ 158 */
188 struct GNUNET_CLIENT_Connection *client; 159 struct GNUNET_MQ_Handle *mq;
189
190 /**
191 * Head of list of messages for the ATS service.
192 */
193 struct PendingMessage *pending_head;
194
195 /**
196 * Tail of list of messages for the ATS service
197 */
198 struct PendingMessage *pending_tail;
199 160
200 /** 161 /**
201 * Head of linked list of pending reservation requests. 162 * Head of linked list of pending reservation requests.
@@ -273,182 +234,162 @@ reconnect_task (void *cls)
273 234
274 235
275/** 236/**
276 * Transmit messages from the message queue to the service 237 * Reconnect to the ATS service, something went wrong.
277 * (if there are any, and if we are not already trying).
278 *
279 * @param ph handle to use
280 */
281static void
282do_transmit (struct GNUNET_ATS_PerformanceHandle *ph);
283
284
285/**
286 * Type of a function to call when we receive a message
287 * from the service.
288 * 238 *
289 * @param cls the `struct GNUNET_ATS_SchedulingHandle` 239 * @param ph handle to reconnect
290 * @param msg message received, NULL on timeout or fatal error
291 */ 240 */
292static void 241static void
293process_ats_message (void *cls, 242do_reconnect (struct GNUNET_ATS_PerformanceHandle *ph)
294 const struct GNUNET_MessageHeader *msg);
295
296
297/**
298 * We can now transmit a message to ATS. Do it.
299 *
300 * @param cls the `struct GNUNET_ATS_PerformanceHandle`
301 * @param size number of bytes we can transmit to ATS
302 * @param buf where to copy the messages
303 * @return number of bytes copied into @a buf
304 */
305static size_t
306transmit_message_to_ats (void *cls,
307 size_t size,
308 void *buf)
309{ 243{
310 struct GNUNET_ATS_PerformanceHandle *ph = cls; 244 struct GNUNET_ATS_ReservationContext *rc;
311 struct PendingMessage *p; 245 struct GNUNET_ATS_AddressListHandle *alh;
312 size_t ret; 246 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_zero;
313 char *cbuf; 247
314 248 if (NULL != ph->mq)
315 ph->th = NULL;
316 ret = 0;
317 cbuf = buf;
318 while ((NULL != (p = ph->pending_head)) && (p->size <= size))
319 { 249 {
320 memcpy (&cbuf[ret], &p[1], p->size); 250 GNUNET_MQ_destroy (ph->mq);
321 ret += p->size; 251 ph->mq = NULL;
322 size -= p->size;
323 GNUNET_CONTAINER_DLL_remove (ph->pending_head,
324 ph->pending_tail,
325 p);
326 GNUNET_free(p);
327 } 252 }
328 do_transmit (ph); 253 while (NULL != (rc = ph->reservation_head))
329 if (GNUNET_NO == ph->in_receive)
330 { 254 {
331 ph->in_receive = GNUNET_YES; 255 GNUNET_CONTAINER_DLL_remove (ph->reservation_head,
332 GNUNET_CLIENT_receive (ph->client, 256 ph->reservation_tail,
333 &process_ats_message, 257 rc);
334 ph, 258 if (NULL != rc->rcb)
335 GNUNET_TIME_UNIT_FOREVER_REL); 259 rc->rcb (rc->rcb_cls,
260 NULL,
261 0,
262 GNUNET_TIME_UNIT_FOREVER_REL);
263 GNUNET_free (rc);
336 } 264 }
337 return ret; 265 bandwidth_zero.value__ = htonl (0);
266 while (NULL != (alh = ph->addresslist_head))
267 {
268 GNUNET_CONTAINER_DLL_remove (ph->addresslist_head,
269 ph->addresslist_tail,
270 alh);
271 if (NULL != alh->cb)
272 alh->cb (alh->cb_cls,
273 NULL,
274 GNUNET_NO,
275 bandwidth_zero,
276 bandwidth_zero,
277 NULL);
278 GNUNET_free (alh);
279 }
280 if (NULL != ph->addr_info_cb)
281 {
282 /* Indicate reconnect */
283 ph->addr_info_cb (ph->addr_info_cb_cls,
284 NULL,
285 GNUNET_NO,
286 bandwidth_zero,
287 bandwidth_zero,
288 NULL);
289 }
290 ph->backoff = GNUNET_TIME_STD_BACKOFF (ph->backoff);
291 ph->task = GNUNET_SCHEDULER_add_delayed (ph->backoff,
292 &reconnect_task,
293 ph);
338} 294}
339 295
340 296
341/** 297/**
342 * Transmit messages from the message queue to the service 298 * We received a peer information message. Validate and process it.
343 * (if there are any, and if we are not already trying).
344 * 299 *
345 * @param ph handle to use 300 * @param cls our context with the callback
301 * @param pi the message
302 * @return #GNUNET_OK if the message was well-formed
346 */ 303 */
347static void 304static int
348do_transmit (struct GNUNET_ATS_PerformanceHandle *ph) 305check_peer_information (void *cls,
306 const struct PeerInformationMessage *pi)
349{ 307{
350 struct PendingMessage *p; 308 const char *plugin_address;
309 const char *plugin_name;
310 uint16_t plugin_address_length;
311 uint16_t plugin_name_length;
351 312
352 if (NULL != ph->th) 313 plugin_address_length = ntohs (pi->address_length);
353 return; 314 plugin_name_length = ntohs (pi->plugin_name_length);
354 if (NULL == (p = ph->pending_head)) 315 plugin_address = (const char *) &pi[1];
355 return; 316 plugin_name = &plugin_address[plugin_address_length];
356 if (NULL == ph->client) 317 if ( (plugin_address_length + plugin_name_length
357 return; /* currently reconnecting */ 318 + sizeof(struct PeerInformationMessage) != ntohs (pi->header.size)) ||
358 ph->th = GNUNET_CLIENT_notify_transmit_ready (ph->client, 319 (plugin_name[plugin_name_length - 1] != '\0'))
359 p->size, 320 {
360 GNUNET_TIME_UNIT_FOREVER_REL, 321 GNUNET_break(0);
361 GNUNET_YES, 322 return GNUNET_SYSERR;
362 &transmit_message_to_ats, ph); 323 }
324 return GNUNET_OK;
363} 325}
364 326
365 327
366/** 328/**
367 * We received a peer information message. Validate and process it. 329 * We received a peer information message. Validate and process it.
368 * 330 *
369 * @param ph our context with the callback 331 * @param cls our context with the callback
370 * @param msg the message 332 * @param pi the message
371 * @return #GNUNET_OK if the message was well-formed 333 * @return #GNUNET_OK if the message was well-formed
372 */ 334 */
373static int 335static void
374process_pi_message (struct GNUNET_ATS_PerformanceHandle *ph, 336handle_peer_information (void *cls,
375 const struct GNUNET_MessageHeader *msg) 337 const struct PeerInformationMessage *pi)
376{ 338{
377 const struct PeerInformationMessage *pi; 339 struct GNUNET_ATS_PerformanceHandle *ph = cls;
378 const char *plugin_address; 340 const char *plugin_address;
379 const char *plugin_name; 341 const char *plugin_name;
380 struct GNUNET_HELLO_Address address; 342 struct GNUNET_HELLO_Address address;
381 uint16_t plugin_address_length; 343 uint16_t plugin_address_length;
382 uint16_t plugin_name_length;
383 int addr_active; 344 int addr_active;
384 struct GNUNET_ATS_Properties prop; 345 struct GNUNET_ATS_Properties prop;
385 346
386 if (ntohs (msg->size) < sizeof(struct PeerInformationMessage)) 347 if (NULL == ph->addr_info_cb)
387 { 348 return;
388 GNUNET_break(0);
389 return GNUNET_SYSERR;
390 }
391 pi = (const struct PeerInformationMessage *) msg;
392 plugin_address_length = ntohs (pi->address_length); 349 plugin_address_length = ntohs (pi->address_length);
393 plugin_name_length = ntohs (pi->plugin_name_length);
394 addr_active = (int) ntohl (pi->address_active); 350 addr_active = (int) ntohl (pi->address_active);
395 plugin_address = (const char *) &pi[1]; 351 plugin_address = (const char *) &pi[1];
396 plugin_name = &plugin_address[plugin_address_length]; 352 plugin_name = &plugin_address[plugin_address_length];
397 if ((plugin_address_length + plugin_name_length
398 + sizeof(struct PeerInformationMessage) != ntohs (msg->size))
399 || (plugin_name[plugin_name_length - 1] != '\0'))
400 {
401 GNUNET_break(0);
402 return GNUNET_SYSERR;
403 }
404 353
405 if (NULL != ph->addr_info_cb) 354 GNUNET_ATS_properties_ntoh (&prop,
406 { 355 &pi->properties);
407 GNUNET_ATS_properties_ntoh (&prop, 356 address.peer = pi->peer;
408 &pi->properties); 357 address.local_info = (enum GNUNET_HELLO_AddressInfo) ntohl (pi->address_local_info);
409 address.peer = pi->peer; 358 address.address = plugin_address;
410 address.local_info = (enum GNUNET_HELLO_AddressInfo) ntohl (pi->address_local_info); 359 address.address_length = plugin_address_length;
411 address.address = plugin_address; 360 address.transport_name = plugin_name;
412 address.address_length = plugin_address_length; 361 ph->addr_info_cb (ph->addr_info_cb_cls,
413 address.transport_name = plugin_name; 362 &address,
414 ph->addr_info_cb (ph->addr_info_cb_cls, 363 addr_active,
415 &address, 364 pi->bandwidth_out,
416 addr_active, 365 pi->bandwidth_in,
417 pi->bandwidth_out, 366 &prop);
418 pi->bandwidth_in,
419 &prop);
420 }
421 return GNUNET_OK;
422} 367}
423 368
424 369
425/** 370/**
426 * We received a reservation result message. Validate and process it. 371 * We received a reservation result message. Validate and process it.
427 * 372 *
428 * @param ph our context with the callback 373 * @param cls our context with the callback
429 * @param msg the message 374 * @param rr the message
430 * @return #GNUNET_OK if the message was well-formed
431 */ 375 */
432static int 376static void
433process_rr_message (struct GNUNET_ATS_PerformanceHandle *ph, 377handle_reservation_result (void *cls,
434 const struct GNUNET_MessageHeader *msg) 378 const struct ReservationResultMessage *rr)
435{ 379{
436 const struct ReservationResultMessage *rr; 380 struct GNUNET_ATS_PerformanceHandle *ph = cls;
437 struct GNUNET_ATS_ReservationContext *rc; 381 struct GNUNET_ATS_ReservationContext *rc;
438 int32_t amount; 382 int32_t amount;
439 383
440 if (ntohs (msg->size) < sizeof(struct ReservationResultMessage))
441 {
442 GNUNET_break(0);
443 return GNUNET_SYSERR;
444 }
445 rr = (const struct ReservationResultMessage *) msg;
446 amount = ntohl (rr->amount); 384 amount = ntohl (rr->amount);
447 rc = ph->reservation_head; 385 rc = ph->reservation_head;
448 if (0 != memcmp (&rr->peer, &rc->peer, sizeof(struct GNUNET_PeerIdentity))) 386 if (0 != memcmp (&rr->peer,
387 &rc->peer,
388 sizeof(struct GNUNET_PeerIdentity)))
449 { 389 {
450 GNUNET_break(0); 390 GNUNET_break(0);
451 return GNUNET_SYSERR; 391 reconnect (ph);
392 return;
452 } 393 }
453 GNUNET_CONTAINER_DLL_remove (ph->reservation_head, 394 GNUNET_CONTAINER_DLL_remove (ph->reservation_head,
454 ph->reservation_tail, 395 ph->reservation_tail,
@@ -457,41 +398,71 @@ process_rr_message (struct GNUNET_ATS_PerformanceHandle *ph,
457 (NULL != rc->rcb) ) 398 (NULL != rc->rcb) )
458 { 399 {
459 /* tell client if not cancelled */ 400 /* tell client if not cancelled */
460 if (rc->rcb != NULL ) 401 if (NULL != rc->rcb)
461 rc->rcb (rc->rcb_cls, 402 rc->rcb (rc->rcb_cls,
462 &rr->peer, 403 &rr->peer,
463 amount, 404 amount,
464 GNUNET_TIME_relative_ntoh (rr->res_delay)); 405 GNUNET_TIME_relative_ntoh (rr->res_delay));
465 GNUNET_free(rc); 406 GNUNET_free (rc);
466 return GNUNET_OK; 407 return;
467 } 408 }
468 /* amount non-zero, but client cancelled, consider undo! */ 409 /* amount non-zero, but client cancelled, consider undo! */
469 if (GNUNET_YES != rc->undo) 410 if (GNUNET_YES != rc->undo)
470 { 411 {
471 GNUNET_free(rc); 412 GNUNET_free (rc);
472 return GNUNET_OK; /* do not try to undo failed undos or negative amounts */ 413 return; /* do not try to undo failed undos or negative amounts */
473 } 414 }
474 GNUNET_free(rc); 415 GNUNET_free (rc);
475 (void) GNUNET_ATS_reserve_bandwidth (ph, 416 (void) GNUNET_ATS_reserve_bandwidth (ph,
476 &rr->peer, 417 &rr->peer,
477 -amount, 418 -amount,
478 NULL, NULL); 419 NULL, NULL);
479 return GNUNET_OK;
480} 420}
481 421
482 422
483/** 423/**
484 * We received a PeerInformationMessage. Validate and process it. 424 * We received a PeerInformationMessage. Validate it.
485 * 425 *
486 * @param ph our context with the callback 426 * @param cls our context with the callback
487 * @param msg the message 427 * @param msg the message
488 * @return #GNUNET_OK if the message was well-formed 428 * @return #GNUNET_OK if the message was well-formed
489 */ 429 */
490static int 430static int
491process_ar_message (struct GNUNET_ATS_PerformanceHandle *ph, 431check_address_list (void *cls,
492 const struct GNUNET_MessageHeader *msg) 432 const struct PeerInformationMessage *pi)
433{
434 const char *plugin_address;
435 const char *plugin_name;
436 uint16_t plugin_address_length;
437 uint16_t plugin_name_length;
438
439 plugin_address_length = ntohs (pi->address_length);
440 plugin_name_length = ntohs (pi->plugin_name_length);
441 plugin_address = (const char *) &pi[1];
442 plugin_name = &plugin_address[plugin_address_length];
443 if ( (plugin_address_length + plugin_name_length
444 + sizeof (struct PeerInformationMessage) != ntohs (pi->header.size)) ||
445 (plugin_name[plugin_name_length - 1] != '\0') )
446 {
447 GNUNET_break(0);
448 return GNUNET_SYSERR;
449 }
450 return GNUNET_OK;
451}
452
453
454/**
455 * We received a #GNUNET_MESSAGE_TYPE_ATS_ADDRESSLIST_RESPONSE.
456 * Process it.
457 *
458 * @param cls our context with the callback
459 * @param msg the message
460 */
461static void
462handle_address_list (void *cls,
463 const struct PeerInformationMessage *pi)
493{ 464{
494 const struct PeerInformationMessage *pi; 465 struct GNUNET_ATS_PerformanceHandle *ph = cls;
495 struct GNUNET_ATS_AddressListHandle *alh; 466 struct GNUNET_ATS_AddressListHandle *alh;
496 struct GNUNET_ATS_AddressListHandle *next; 467 struct GNUNET_ATS_AddressListHandle *next;
497 const char *plugin_address; 468 const char *plugin_address;
@@ -505,25 +476,12 @@ process_ar_message (struct GNUNET_ATS_PerformanceHandle *ph,
505 uint32_t active; 476 uint32_t active;
506 uint32_t id; 477 uint32_t id;
507 478
508 if (ntohs (msg->size) < sizeof(struct PeerInformationMessage))
509 {
510 GNUNET_break(0);
511 return GNUNET_SYSERR;
512 }
513 pi = (const struct PeerInformationMessage *) msg;
514 id = ntohl (pi->id); 479 id = ntohl (pi->id);
515 active = ntohl (pi->address_active); 480 active = ntohl (pi->address_active);
516 plugin_address_length = ntohs (pi->address_length); 481 plugin_address_length = ntohs (pi->address_length);
517 plugin_name_length = ntohs (pi->plugin_name_length); 482 plugin_name_length = ntohs (pi->plugin_name_length);
518 plugin_address = (const char *) &pi[1]; 483 plugin_address = (const char *) &pi[1];
519 plugin_name = &plugin_address[plugin_address_length]; 484 plugin_name = &plugin_address[plugin_address_length];
520 if ( (plugin_address_length + plugin_name_length
521 + sizeof (struct PeerInformationMessage) != ntohs (msg->size)) ||
522 (plugin_name[plugin_name_length - 1] != '\0') )
523 {
524 GNUNET_break(0);
525 return GNUNET_SYSERR;
526 }
527 LOG (GNUNET_ERROR_TYPE_DEBUG, 485 LOG (GNUNET_ERROR_TYPE_DEBUG,
528 "Received ATS_ADDRESSLIST_RESPONSE message for peer %s and plugin %s\n", 486 "Received ATS_ADDRESSLIST_RESPONSE message for peer %s and plugin %s\n",
529 GNUNET_i2s (&pi->peer), 487 GNUNET_i2s (&pi->peer),
@@ -537,10 +495,7 @@ process_ar_message (struct GNUNET_ATS_PerformanceHandle *ph,
537 break; 495 break;
538 } 496 }
539 if (NULL == alh) 497 if (NULL == alh)
540 { 498 return; /* was canceled */
541 /* was canceled */
542 return GNUNET_SYSERR;
543 }
544 499
545 memset (&allzeros, '\0', sizeof (allzeros)); 500 memset (&allzeros, '\0', sizeof (allzeros));
546 if ( (0 == memcmp (&allzeros, &pi->peer, sizeof(allzeros))) && 501 if ( (0 == memcmp (&allzeros, &pi->peer, sizeof(allzeros))) &&
@@ -562,7 +517,7 @@ process_ar_message (struct GNUNET_ATS_PerformanceHandle *ph,
562 bandwidth_zero, 517 bandwidth_zero,
563 NULL); 518 NULL);
564 GNUNET_free (alh); 519 GNUNET_free (alh);
565 return GNUNET_OK; 520 return;
566 } 521 }
567 522
568 address.peer = pi->peer; 523 address.peer = pi->peer;
@@ -582,87 +537,24 @@ process_ar_message (struct GNUNET_ATS_PerformanceHandle *ph,
582 pi->bandwidth_in, 537 pi->bandwidth_in,
583 &prop); 538 &prop);
584 } 539 }
585 return GNUNET_OK;
586} 540}
587 541
588 542
589/** 543/**
590 * Type of a function to call when we receive a message 544 * Generic error handler, called with the appropriate error code and
591 * from the service. 545 * the same closure specified at the creation of the message queue.
546 * Not every message queue implementation supports an error handler.
592 * 547 *
593 * @param cls the 'struct GNUNET_ATS_SchedulingHandle' 548 * @param cls closure with the `struct GNUNET_ATS_PerformanceHandle *`
594 * @param msg message received, NULL on timeout or fatal error 549 * @param error error code
595 */ 550 */
596static void 551static void
597process_ats_message (void *cls, 552mq_error_handler (void *cls,
598 const struct GNUNET_MessageHeader *msg) 553 enum GNUNET_MQ_Error error)
599{ 554{
600 struct GNUNET_ATS_PerformanceHandle *ph = cls; 555 struct GNUNET_ATS_PerformanceHandle *ph = cls;
601 556
602 if (NULL == msg) 557 do_reconnect (ph);
603 goto reconnect;
604 switch (ntohs (msg->type))
605 {
606 case GNUNET_MESSAGE_TYPE_ATS_PEER_INFORMATION:
607 if (GNUNET_OK != process_pi_message (ph, msg))
608 {
609 GNUNET_break (0);
610 goto reconnect;
611 }
612 break;
613 case GNUNET_MESSAGE_TYPE_ATS_RESERVATION_RESULT:
614 if (GNUNET_OK != process_rr_message (ph, msg))
615 {
616 GNUNET_break (0);
617 goto reconnect;
618 }
619 break;
620 case GNUNET_MESSAGE_TYPE_ATS_ADDRESSLIST_RESPONSE:
621 if (GNUNET_OK != process_ar_message (ph, msg))
622 {
623 GNUNET_break (0);
624 goto reconnect;
625 }
626 break;
627 default:
628 GNUNET_break (0);
629 goto reconnect;
630 }
631 ph->backoff = GNUNET_TIME_UNIT_ZERO;
632 GNUNET_CLIENT_receive (ph->client,
633 &process_ats_message,
634 ph,
635 GNUNET_TIME_UNIT_FOREVER_REL);
636 return;
637
638 reconnect:
639 LOG (GNUNET_ERROR_TYPE_DEBUG,
640 "Reconnecting!\n");
641 if (NULL != ph->th)
642 {
643 GNUNET_CLIENT_notify_transmit_ready_cancel (ph->th);
644 ph->th = NULL;
645 }
646 if (NULL != ph->client)
647 {
648 GNUNET_CLIENT_disconnect (ph->client);
649 ph->client = NULL;
650 ph->in_receive = GNUNET_NO;
651 if (NULL != ph->addr_info_cb)
652 {
653 /* Indicate reconnect */
654 ph->addr_info_cb (ph->addr_info_cb_cls,
655 NULL,
656 GNUNET_NO,
657 GNUNET_BANDWIDTH_value_init (0),
658 GNUNET_BANDWIDTH_value_init (0),
659 NULL);
660 }
661 }
662 ph->backoff = GNUNET_TIME_STD_BACKOFF (ph->backoff);
663 ph->task = GNUNET_SCHEDULER_add_delayed (ph->backoff,
664 &reconnect_task,
665 ph);
666} 558}
667 559
668 560
@@ -674,30 +566,39 @@ process_ats_message (void *cls,
674static void 566static void
675reconnect (struct GNUNET_ATS_PerformanceHandle *ph) 567reconnect (struct GNUNET_ATS_PerformanceHandle *ph)
676{ 568{
677 struct PendingMessage *p; 569 GNUNET_MQ_hd_var_size (peer_information,
570 GNUNET_MESSAGE_TYPE_ATS_PEER_INFORMATION,
571 struct PeerInformationMessage);
572 GNUNET_MQ_hd_fixed_size (reservation_result,
573 GNUNET_MESSAGE_TYPE_ATS_RESERVATION_RESULT,
574 struct ReservationResultMessage);
575 GNUNET_MQ_hd_var_size (address_list,
576 GNUNET_MESSAGE_TYPE_ATS_ADDRESSLIST_RESPONSE,
577 struct PeerInformationMessage);
578 struct GNUNET_MQ_MessageHandler handlers[] = {
579 make_peer_information_handler (ph),
580 make_reservation_result_handler (ph),
581 make_address_list_handler (ph),
582 GNUNET_MQ_handler_end ()
583 };
584 struct GNUNET_MQ_Envelope *env;
678 struct ClientStartMessage *init; 585 struct ClientStartMessage *init;
679 586
680 GNUNET_assert (NULL == ph->client); 587 GNUNET_assert (NULL == ph->mq);
681 ph->client = GNUNET_CLIENT_connect ("ats", 588 ph->mq = GNUNET_CLIENT_connecT (ph->cfg,
682 ph->cfg); 589 "ats",
683 GNUNET_assert (NULL != ph->client); 590 handlers,
684 if ((NULL == (p = ph->pending_head)) || (GNUNET_YES != p->is_init)) 591 &mq_error_handler,
685 { 592 ph);
686 p = GNUNET_malloc (sizeof (struct PendingMessage) + 593 if (NULL == ph->mq)
687 sizeof (struct ClientStartMessage)); 594 return;
688 p->size = sizeof(struct ClientStartMessage); 595 env = GNUNET_MQ_msg (init,
689 p->is_init = GNUNET_YES; 596 GNUNET_MESSAGE_TYPE_ATS_START);
690 init = (struct ClientStartMessage *) &p[1]; 597 init->start_flag = htonl ( (NULL == ph->addr_info_cb)
691 init->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_START); 598 ? START_FLAG_PERFORMANCE_NO_PIC
692 init->header.size = htons (sizeof(struct ClientStartMessage)); 599 : START_FLAG_PERFORMANCE_WITH_PIC);
693 init->start_flag = htonl ( (NULL == ph->addr_info_cb) 600 GNUNET_MQ_send (ph->mq,
694 ? START_FLAG_PERFORMANCE_NO_PIC 601 env);
695 : START_FLAG_PERFORMANCE_WITH_PIC);
696 GNUNET_CONTAINER_DLL_insert (ph->pending_head,
697 ph->pending_tail,
698 p);
699 }
700 do_transmit (ph);
701} 602}
702 603
703 604
@@ -721,8 +622,12 @@ GNUNET_ATS_performance_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
721 ph->cfg = cfg; 622 ph->cfg = cfg;
722 ph->addr_info_cb = addr_info_cb; 623 ph->addr_info_cb = addr_info_cb;
723 ph->addr_info_cb_cls = addr_info_cb_cls; 624 ph->addr_info_cb_cls = addr_info_cb_cls;
724 ph->id = 0;
725 reconnect (ph); 625 reconnect (ph);
626 if (NULL == ph->mq)
627 {
628 GNUNET_free (ph);
629 return NULL;
630 }
726 return ph; 631 return ph;
727} 632}
728 633
@@ -735,17 +640,9 @@ GNUNET_ATS_performance_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
735void 640void
736GNUNET_ATS_performance_done (struct GNUNET_ATS_PerformanceHandle *ph) 641GNUNET_ATS_performance_done (struct GNUNET_ATS_PerformanceHandle *ph)
737{ 642{
738 struct PendingMessage *p;
739 struct GNUNET_ATS_ReservationContext *rc; 643 struct GNUNET_ATS_ReservationContext *rc;
740 struct GNUNET_ATS_AddressListHandle *alh; 644 struct GNUNET_ATS_AddressListHandle *alh;
741 645
742 while (NULL != (p = ph->pending_head))
743 {
744 GNUNET_CONTAINER_DLL_remove (ph->pending_head,
745 ph->pending_tail,
746 p);
747 GNUNET_free (p);
748 }
749 while (NULL != (alh = ph->addresslist_head)) 646 while (NULL != (alh = ph->addresslist_head))
750 { 647 {
751 GNUNET_CONTAINER_DLL_remove (ph->addresslist_head, 648 GNUNET_CONTAINER_DLL_remove (ph->addresslist_head,
@@ -761,16 +658,15 @@ GNUNET_ATS_performance_done (struct GNUNET_ATS_PerformanceHandle *ph)
761 GNUNET_break (NULL == rc->rcb); 658 GNUNET_break (NULL == rc->rcb);
762 GNUNET_free (rc); 659 GNUNET_free (rc);
763 } 660 }
764
765 if (NULL != ph->task) 661 if (NULL != ph->task)
766 { 662 {
767 GNUNET_SCHEDULER_cancel (ph->task); 663 GNUNET_SCHEDULER_cancel (ph->task);
768 ph->task = NULL; 664 ph->task = NULL;
769 } 665 }
770 if (NULL != ph->client) 666 if (NULL != ph->mq)
771 { 667 {
772 GNUNET_CLIENT_disconnect (ph->client); 668 GNUNET_MQ_destroy (ph->mq);
773 ph->client = NULL; 669 ph->mq = NULL;
774 } 670 }
775 GNUNET_free (ph); 671 GNUNET_free (ph);
776} 672}
@@ -779,7 +675,7 @@ GNUNET_ATS_performance_done (struct GNUNET_ATS_PerformanceHandle *ph)
779/** 675/**
780 * Reserve inbound bandwidth from the given peer. ATS will look at 676 * Reserve inbound bandwidth from the given peer. ATS will look at
781 * the current amount of traffic we receive from the peer and ensure 677 * the current amount of traffic we receive from the peer and ensure
782 * that the peer could add 'amount' of data to its stream. 678 * that the peer could add @a amount of data to its stream.
783 * 679 *
784 * @param ph performance handle 680 * @param ph performance handle
785 * @param peer identifies the peer 681 * @param peer identifies the peer
@@ -794,12 +690,15 @@ struct GNUNET_ATS_ReservationContext *
794GNUNET_ATS_reserve_bandwidth (struct GNUNET_ATS_PerformanceHandle *ph, 690GNUNET_ATS_reserve_bandwidth (struct GNUNET_ATS_PerformanceHandle *ph,
795 const struct GNUNET_PeerIdentity *peer, 691 const struct GNUNET_PeerIdentity *peer,
796 int32_t amount, 692 int32_t amount,
797 GNUNET_ATS_ReservationCallback rcb, void *rcb_cls) 693 GNUNET_ATS_ReservationCallback rcb,
694 void *rcb_cls)
798{ 695{
799 struct GNUNET_ATS_ReservationContext *rc; 696 struct GNUNET_ATS_ReservationContext *rc;
800 struct PendingMessage *p; 697 struct GNUNET_MQ_Envelope *env;
801 struct ReservationRequestMessage *m; 698 struct ReservationRequestMessage *m;
802 699
700 if (NULL == ph->mq)
701 return NULL;
803 rc = GNUNET_new (struct GNUNET_ATS_ReservationContext); 702 rc = GNUNET_new (struct GNUNET_ATS_ReservationContext);
804 rc->size = amount; 703 rc->size = amount;
805 rc->peer = *peer; 704 rc->peer = *peer;
@@ -811,20 +710,12 @@ GNUNET_ATS_reserve_bandwidth (struct GNUNET_ATS_PerformanceHandle *ph,
811 GNUNET_CONTAINER_DLL_insert_tail (ph->reservation_head, 710 GNUNET_CONTAINER_DLL_insert_tail (ph->reservation_head,
812 ph->reservation_tail, 711 ph->reservation_tail,
813 rc); 712 rc);
814 713 env = GNUNET_MQ_msg (m,
815 p = GNUNET_malloc (sizeof (struct PendingMessage) + 714 GNUNET_MESSAGE_TYPE_ATS_RESERVATION_REQUEST);
816 sizeof (struct ReservationRequestMessage));
817 p->size = sizeof(struct ReservationRequestMessage);
818 p->is_init = GNUNET_NO;
819 m = (struct ReservationRequestMessage *) &p[1];
820 m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_RESERVATION_REQUEST);
821 m->header.size = htons (sizeof(struct ReservationRequestMessage));
822 m->amount = htonl (amount); 715 m->amount = htonl (amount);
823 m->peer = *peer; 716 m->peer = *peer;
824 GNUNET_CONTAINER_DLL_insert_tail (ph->pending_head, 717 GNUNET_MQ_send (ph->mq,
825 ph->pending_tail, 718 env);
826 p);
827 do_transmit (ph);
828 return rc; 719 return rc;
829} 720}
830 721
@@ -832,7 +723,7 @@ GNUNET_ATS_reserve_bandwidth (struct GNUNET_ATS_PerformanceHandle *ph,
832/** 723/**
833 * Cancel request for reserving bandwidth. 724 * Cancel request for reserving bandwidth.
834 * 725 *
835 * @param rc context returned by the original GNUNET_ATS_reserve_bandwidth call 726 * @param rc context returned by the original #GNUNET_ATS_reserve_bandwidth() call
836 */ 727 */
837void 728void
838GNUNET_ATS_reserve_bandwidth_cancel (struct GNUNET_ATS_ReservationContext *rc) 729GNUNET_ATS_reserve_bandwidth_cancel (struct GNUNET_ATS_ReservationContext *rc)
@@ -844,7 +735,7 @@ GNUNET_ATS_reserve_bandwidth_cancel (struct GNUNET_ATS_ReservationContext *rc)
844/** 735/**
845 * Get information about addresses known to the ATS subsystem. 736 * Get information about addresses known to the ATS subsystem.
846 * 737 *
847 * @param handle the performance handle to use 738 * @param ph the performance handle to use
848 * @param peer peer idm can be NULL for all peers 739 * @param peer peer idm can be NULL for all peers
849 * @param all #GNUNET_YES to get information about all addresses or #GNUNET_NO to 740 * @param all #GNUNET_YES to get information about all addresses or #GNUNET_NO to
850 * get only address currently used 741 * get only address currently used
@@ -854,24 +745,28 @@ GNUNET_ATS_reserve_bandwidth_cancel (struct GNUNET_ATS_ReservationContext *rc)
854 * @return ats performance context 745 * @return ats performance context
855 */ 746 */
856struct GNUNET_ATS_AddressListHandle* 747struct GNUNET_ATS_AddressListHandle*
857GNUNET_ATS_performance_list_addresses (struct GNUNET_ATS_PerformanceHandle *handle, 748GNUNET_ATS_performance_list_addresses (struct GNUNET_ATS_PerformanceHandle *ph,
858 const struct GNUNET_PeerIdentity *peer, 749 const struct GNUNET_PeerIdentity *peer,
859 int all, 750 int all,
860 GNUNET_ATS_AddressInformationCallback infocb, 751 GNUNET_ATS_AddressInformationCallback infocb,
861 void *infocb_cls) 752 void *infocb_cls)
862{ 753{
863 struct GNUNET_ATS_AddressListHandle *alh; 754 struct GNUNET_ATS_AddressListHandle *alh;
864 struct PendingMessage *p; 755 struct GNUNET_MQ_Envelope *env;
865 struct AddressListRequestMessage *m; 756 struct AddressListRequestMessage *m;
866 757
758 if (NULL == ph->mq)
759 return NULL;
867 if (NULL == infocb) 760 if (NULL == infocb)
761 {
762 GNUNET_break (0);
868 return NULL; 763 return NULL;
764 }
869 alh = GNUNET_new (struct GNUNET_ATS_AddressListHandle); 765 alh = GNUNET_new (struct GNUNET_ATS_AddressListHandle);
870 alh->id = handle->id; 766 alh->id = ph->id++;
871 handle->id++;
872 alh->cb = infocb; 767 alh->cb = infocb;
873 alh->cb_cls = infocb_cls; 768 alh->cb_cls = infocb_cls;
874 alh->ph = handle; 769 alh->ph = ph;
875 alh->all_addresses = all; 770 alh->all_addresses = all;
876 if (NULL == peer) 771 if (NULL == peer)
877 { 772 {
@@ -882,25 +777,17 @@ GNUNET_ATS_performance_list_addresses (struct GNUNET_ATS_PerformanceHandle *hand
882 alh->all_peers = GNUNET_NO; 777 alh->all_peers = GNUNET_NO;
883 alh->peer = *peer; 778 alh->peer = *peer;
884 } 779 }
885 GNUNET_CONTAINER_DLL_insert (handle->addresslist_head, 780 GNUNET_CONTAINER_DLL_insert (ph->addresslist_head,
886 handle->addresslist_tail, 781 ph->addresslist_tail,
887 alh); 782 alh);
888 783 env = GNUNET_MQ_msg (m,
889 p = GNUNET_malloc (sizeof (struct PendingMessage) + 784 GNUNET_MESSAGE_TYPE_ATS_ADDRESSLIST_REQUEST);
890 sizeof (struct AddressListRequestMessage));
891 p->size = sizeof (struct AddressListRequestMessage);
892 m = (struct AddressListRequestMessage *) &p[1];
893 m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESSLIST_REQUEST);
894 m->header.size = htons (sizeof(struct AddressListRequestMessage));
895 m->all = htonl (all); 785 m->all = htonl (all);
896 m->id = htonl (alh->id); 786 m->id = htonl (alh->id);
897 if (NULL != peer) 787 if (NULL != peer)
898 m->peer = *peer; 788 m->peer = *peer;
899 GNUNET_CONTAINER_DLL_insert_tail (handle->pending_head, 789 GNUNET_MQ_send (ph->mq,
900 handle->pending_tail, 790 env);
901 p);
902 do_transmit (handle);
903
904 return alh; 791 return alh;
905} 792}
906 793
@@ -908,15 +795,17 @@ GNUNET_ATS_performance_list_addresses (struct GNUNET_ATS_PerformanceHandle *hand
908/** 795/**
909 * Cancel a pending address listing operation 796 * Cancel a pending address listing operation
910 * 797 *
911 * @param handle the handle of the request to cancel 798 * @param alh the handle of the request to cancel
912 */ 799 */
913void 800void
914GNUNET_ATS_performance_list_addresses_cancel (struct GNUNET_ATS_AddressListHandle *handle) 801GNUNET_ATS_performance_list_addresses_cancel (struct GNUNET_ATS_AddressListHandle *alh)
915{ 802{
916 GNUNET_CONTAINER_DLL_remove (handle->ph->addresslist_head, 803 struct GNUNET_ATS_PerformanceHandle *ph = alh->ph;
917 handle->ph->addresslist_tail, 804
918 handle); 805 GNUNET_CONTAINER_DLL_remove (ph->addresslist_head,
919 GNUNET_free (handle); 806 ph->addresslist_tail,
807 alh);
808 GNUNET_free (alh);
920} 809}
921 810
922 811
@@ -947,16 +836,18 @@ GNUNET_ATS_print_preference_type (uint32_t type)
947 */ 836 */
948void 837void
949GNUNET_ATS_performance_change_preference (struct GNUNET_ATS_PerformanceHandle *ph, 838GNUNET_ATS_performance_change_preference (struct GNUNET_ATS_PerformanceHandle *ph,
950 const struct GNUNET_PeerIdentity *peer, ...) 839 const struct GNUNET_PeerIdentity *peer,
840 ...)
951{ 841{
952 struct PendingMessage *p; 842 struct GNUNET_MQ_Envelope *env;
953 struct ChangePreferenceMessage *m; 843 struct ChangePreferenceMessage *m;
954 size_t msize;
955 uint32_t count; 844 uint32_t count;
956 struct PreferenceInformation *pi; 845 struct PreferenceInformation *pi;
957 va_list ap; 846 va_list ap;
958 enum GNUNET_ATS_PreferenceKind kind; 847 enum GNUNET_ATS_PreferenceKind kind;
959 848
849 if (NULL == ph->mq)
850 return;
960 count = 0; 851 count = 0;
961 va_start(ap, peer); 852 va_start(ap, peer);
962 while (GNUNET_ATS_PREFERENCE_END != 853 while (GNUNET_ATS_PREFERENCE_END !=
@@ -977,14 +868,9 @@ GNUNET_ATS_performance_change_preference (struct GNUNET_ATS_PerformanceHandle *p
977 } 868 }
978 } 869 }
979 va_end(ap); 870 va_end(ap);
980 msize = count * sizeof(struct PreferenceInformation) 871 env = GNUNET_MQ_msg_extra (m,
981 + sizeof(struct ChangePreferenceMessage); 872 count * sizeof(struct PreferenceInformation),
982 p = GNUNET_malloc (sizeof (struct PendingMessage) + msize); 873 GNUNET_MESSAGE_TYPE_ATS_PREFERENCE_CHANGE);
983 p->size = msize;
984 p->is_init = GNUNET_NO;
985 m = (struct ChangePreferenceMessage *) &p[1];
986 m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_PREFERENCE_CHANGE);
987 m->header.size = htons (msize);
988 m->num_preferences = htonl (count); 874 m->num_preferences = htonl (count);
989 m->peer = *peer; 875 m->peer = *peer;
990 pi = (struct PreferenceInformation *) &m[1]; 876 pi = (struct PreferenceInformation *) &m[1];
@@ -1011,8 +897,8 @@ GNUNET_ATS_performance_change_preference (struct GNUNET_ATS_PerformanceHandle *p
1011 } 897 }
1012 } 898 }
1013 va_end(ap); 899 va_end(ap);
1014 GNUNET_CONTAINER_DLL_insert_tail(ph->pending_head, ph->pending_tail, p); 900 GNUNET_MQ_send (ph->mq,
1015 do_transmit (ph); 901 env);
1016} 902}
1017 903
1018 904
@@ -1028,16 +914,18 @@ GNUNET_ATS_performance_change_preference (struct GNUNET_ATS_PerformanceHandle *p
1028void 914void
1029GNUNET_ATS_performance_give_feedback (struct GNUNET_ATS_PerformanceHandle *ph, 915GNUNET_ATS_performance_give_feedback (struct GNUNET_ATS_PerformanceHandle *ph,
1030 const struct GNUNET_PeerIdentity *peer, 916 const struct GNUNET_PeerIdentity *peer,
1031 const struct GNUNET_TIME_Relative scope, ...) 917 const struct GNUNET_TIME_Relative scope,
918 ...)
1032{ 919{
1033 struct PendingMessage *p; 920 struct GNUNET_MQ_Envelope *env;
1034 struct FeedbackPreferenceMessage *m; 921 struct FeedbackPreferenceMessage *m;
1035 size_t msize;
1036 uint32_t count; 922 uint32_t count;
1037 struct PreferenceInformation *pi; 923 struct PreferenceInformation *pi;
1038 va_list ap; 924 va_list ap;
1039 enum GNUNET_ATS_PreferenceKind kind; 925 enum GNUNET_ATS_PreferenceKind kind;
1040 926
927 if (NULL == ph->mq)
928 return;
1041 count = 0; 929 count = 0;
1042 va_start(ap, scope); 930 va_start(ap, scope);
1043 while (GNUNET_ATS_PREFERENCE_END != 931 while (GNUNET_ATS_PREFERENCE_END !=
@@ -1058,14 +946,9 @@ GNUNET_ATS_performance_give_feedback (struct GNUNET_ATS_PerformanceHandle *ph,
1058 } 946 }
1059 } 947 }
1060 va_end(ap); 948 va_end(ap);
1061 msize = count * sizeof(struct PreferenceInformation) 949 env = GNUNET_MQ_msg_extra (m,
1062 + sizeof(struct FeedbackPreferenceMessage); 950 count * sizeof(struct PreferenceInformation),
1063 p = GNUNET_malloc (sizeof (struct PendingMessage) + msize); 951 GNUNET_MESSAGE_TYPE_ATS_PREFERENCE_FEEDBACK);
1064 p->size = msize;
1065 p->is_init = GNUNET_NO;
1066 m = (struct FeedbackPreferenceMessage *) &p[1];
1067 m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_PREFERENCE_FEEDBACK);
1068 m->header.size = htons (msize);
1069 m->scope = GNUNET_TIME_relative_hton (scope); 952 m->scope = GNUNET_TIME_relative_hton (scope);
1070 m->num_feedback = htonl (count); 953 m->num_feedback = htonl (count);
1071 m->peer = *peer; 954 m->peer = *peer;
@@ -1093,10 +976,8 @@ GNUNET_ATS_performance_give_feedback (struct GNUNET_ATS_PerformanceHandle *ph,
1093 } 976 }
1094 } 977 }
1095 va_end(ap); 978 va_end(ap);
1096 GNUNET_CONTAINER_DLL_insert_tail (ph->pending_head, 979 GNUNET_MQ_send (ph->mq,
1097 ph->pending_tail, 980 env);
1098 p);
1099 do_transmit (ph);
1100} 981}
1101 982
1102/* end of ats_api_performance.c */ 983/* end of ats_api_performance.c */