aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-02-02 15:34:14 +0000
committerChristian Grothoff <christian@grothoff.org>2011-02-02 15:34:14 +0000
commit07b5d5dad5ba589cde1c97e574de84e5c7d5d696 (patch)
tree70d5879e2632d98881c22c38ef3e7bdeeafbf71f /src/fs/gnunet-service-fs_cp.c
parent1240962f4e349aee4e3a57afe293a399d7d316fd (diff)
downloadgnunet-07b5d5dad5ba589cde1c97e574de84e5c7d5d696.tar.gz
gnunet-07b5d5dad5ba589cde1c97e574de84e5c7d5d696.zip
cleaning
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c386
1 files changed, 307 insertions, 79 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 48e850cab..a3994adf5 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -121,13 +121,14 @@ struct GSF_ConnectedPeer
121 121
122 /** 122 /**
123 * Context of our GNUNET_CORE_peer_change_preference call (or NULL). 123 * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
124 * NULL if we have successfully reserved 32k, otherwise non-NULL.
124 */ 125 */
125 struct GNUNET_CORE_InformationRequestContext *irc; 126 struct GNUNET_CORE_InformationRequestContext *irc;
126 127
127 /** 128 /**
128 * ID of delay task for scheduling transmission. 129 * ID of delay task for scheduling transmission.
129 */ 130 */
130 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; 131 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused!
131 132
132 /** 133 /**
133 * Increase in traffic preference still to be submitted 134 * Increase in traffic preference still to be submitted
@@ -234,7 +235,111 @@ static void
234update_atsi (struct GSF_ConnectedPeer *cp, 235update_atsi (struct GSF_ConnectedPeer *cp,
235 const struct GNUNET_TRANSPORT_ATS_Information *atsi) 236 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
236{ 237{
237 // FIXME: merge atsi into cp's performance data! 238 struct GNUNET_TIME_Relative latency;
239
240 latency = get_latency (atsi);
241 GNUNET_LOAD_value_set_decline (cp->transmission_delay,
242 latency);
243 /* LATER: merge atsi into cp's performance data (if we ever care...) */
244}
245
246
247/**
248 * Core is ready to transmit to a peer, get the message.
249 *
250 * @param cls the 'struct GSF_PeerTransmitHandle' of the message
251 * @param size number of bytes core is willing to take
252 * @param buf where to copy the message
253 * @return number of bytes copied to buf
254 */
255static size_t
256peer_transmit_ready_cb (void *cls,
257 size_t size,
258 void *buf)
259{
260 struct GSF_PeerTransmitHandle *pth = cls;
261 struct GSF_ConnectedPeer *cp;
262 size_t ret;
263
264 if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
265 {
266 GNUNET_SCHEDULER_cancel (pth->timeout_task);
267 pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
268 }
269 cp = pth->cp;
270 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
271 cp->pth_tail,
272 pth);
273 if (pth->is_query)
274 {
275 cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
276 GNUNET_assert (0 < cp->ppd.pending_queries--);
277 }
278 else
279 {
280 GNUNET_assert (0 < cp->ppd.pending_replies--);
281 }
282 GNUNET_LOAD_update (cp->ppd.transmission_delay,
283 GNUNET_TIME_absolute_get_duration (pth->request_start_time).rel_value);
284 ret = pth->gmc (pth->gmc_cls,
285 0, NULL);
286 GNUNET_free (pth);
287 return ret;
288}
289
290
291/**
292 * Function called by core upon success or failure of our bandwidth reservation request.
293 *
294 * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
295 * @param peer identifies the peer
296 * @param bandwidth_out available amount of outbound bandwidth
297 * @param amount set to the amount that was actually reserved or unreserved;
298 * either the full requested amount or zero (no partial reservations)
299 * @param preference current traffic preference for the given peer
300 */
301static void
302core_reserve_callback (void *cls,
303 const struct GNUNET_PeerIdentity * peer,
304 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
305 int amount,
306 uint64_t preference)
307{
308 struct GSF_ConnectedPeer *cp = cls;
309 uint64_t ip;
310
311 cp->irc = NULL;
312 if (0 == amount)
313 {
314 /* failed; retry! (how did we get here!?) */
315 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
316 _("Failed to reserve bandwidth to peer `%s'\n"),
317 GNUNET_i2s (peer));
318 ip = cp->inc_preference;
319 cp->inc_preference = 0;
320 cp->irc = GNUNET_CORE_peer_change_preference (core,
321 peer,
322 GNUNET_TIME_UNIT_FOREVER_REL,
323 GNUNET_BANDWIDTH_VALUE_MAX,
324 GNUNET_FS_DBLOCK_SIZE,
325 ip,
326 &core_reserve_callback,
327 cp);
328 return;
329 }
330 pth = cp->pth_head;
331 if ( (NULL != pth) &&
332 (NULL == pth->cth) )
333 {
334 /* reservation success, try transmission now! */
335 pth->cth = GNUNET_CORE_notify_transmit_ready (core,
336 priority,
337 GNUNET_TIME_absolute_get_remaining (pth->timeout),
338 &target,
339 size,
340 &peer_transmit_ready_cb,
341 pth);
342 }
238} 343}
239 344
240 345
@@ -258,6 +363,15 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
258 cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer)); 363 cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
259 cp->transmission_delay = GNUNET_LOAD_value_init (latency); 364 cp->transmission_delay = GNUNET_LOAD_value_init (latency);
260 cp->pid = GNUNET_PEER_intern (peer); 365 cp->pid = GNUNET_PEER_intern (peer);
366 cp->transmission_delay = GNUNET_LOAD_value_init (0);
367 cp->irc = GNUNET_CORE_peer_change_preference (core,
368 peer,
369 GNUNET_TIME_UNIT_FOREVER_REL,
370 GNUNET_BANDWIDTH_VALUE_MAX,
371 GNUNET_FS_DBLOCK_SIZE,
372 0,
373 &core_reserve_callback,
374 cp);
261 fn = get_trust_filename (peer); 375 fn = get_trust_filename (peer);
262 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) && 376 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
263 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) 377 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
@@ -269,40 +383,42 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
269 cp, 383 cp,
270 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 384 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
271 update_atsi (cp, atsi); 385 update_atsi (cp, atsi);
272 386 GSF_plan_notify_new_peer_ (cp);
273
274 // FIXME: notify plan & migration about new peer!
275
276 return cp; 387 return cp;
277} 388}
278 389
279 390
280/** 391/**
281 * Core is ready to transmit to a peer, get the message. 392 * Handle P2P "MIGRATION_STOP" message.
282 * 393 *
283 * @param cls the 'struct GSF_PeerTransmitHandle' of the message 394 * @param cls closure, always NULL
284 * @param size number of bytes core is willing to take 395 * @param other the other peer involved (sender or receiver, NULL
285 * @param buf where to copy the message 396 * for loopback messages where we are both sender and receiver)
286 * @return number of bytes copied to buf 397 * @param message the actual message
398 * @param atsi performance information
399 * @return GNUNET_OK to keep the connection open,
400 * GNUNET_SYSERR to close it (signal serious error)
287 */ 401 */
288static size_t 402int
289peer_transmit_ready_cb (void *cls, 403GSF_handle_p2p_migration_stop_ (void *cls,
290 size_t size, 404 const struct GNUNET_PeerIdentity *other,
291 void *buf) 405 const struct GNUNET_MessageHeader *message,
406 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
292{ 407{
293 struct GSF_PeerTransmitHandle *pth = cls; 408 struct GSF_ConnectedPeer *cp;
294 struct GSF_ConnectedPeer *cp; 409 const struct MigrationStopMessage *msm;
295 size_t ret;
296 410
297 cp = pth->cp; 411 msm = (const struct MigrationStopMessage*) message;
298 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 412 cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
299 cp->pth_tail, 413 &other->hashPubKey);
300 pth); 414 if (cp == NULL)
301 // FIXME: update 'cp' counters! 415 {
302 ret = pth->gmc (pth->gmc_cls, 416 GNUNET_break (0);
303 0, NULL); 417 return GNUNET_OK;
304 GNUNET_free (pth); 418 }
305 return ret; 419 cp->ppd.migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
420 update_atsi (cp, atsi);
421 return GNUNET_OK;
306} 422}
307 423
308 424
@@ -325,7 +441,12 @@ peer_transmit_timeout (void *cls,
325 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 441 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
326 cp->pth_tail, 442 cp->pth_tail,
327 pth); 443 pth);
328 // FIXME: update 'cp' counters! 444 if (pth->is_query)
445 GNUNET_assert (0 < cp->ppd.pending_queries--);
446 else
447 GNUNET_assert (0 < cp->ppd.pending_replies--);
448 GNUNET_LOAD_update (cp->ppd.transmission_delay,
449 UINT64_MAX);
329 pth->gmc (pth->gmc_cls, 450 pth->gmc (pth->gmc_cls,
330 0, NULL); 451 0, NULL);
331 GNUNET_free (pth); 452 GNUNET_free (pth);
@@ -360,6 +481,8 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
360 struct GSF_PeerTransmitHandle *pos; 481 struct GSF_PeerTransmitHandle *pos;
361 struct GSF_PeerTransmitHandle *prev; 482 struct GSF_PeerTransmitHandle *prev;
362 struct GNUNET_PeerIdentity target; 483 struct GNUNET_PeerIdentity target;
484 uint64_t ip;
485 int is_ready;
363 486
364 cp = GNUNET_CONTAINER_multihashmap_get (cp_map, 487 cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
365 &peer->hashPubKey); 488 &peer->hashPubKey);
@@ -393,19 +516,55 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
393 pth); 516 pth);
394 GNUNET_PEER_resolve (cp->pid, 517 GNUNET_PEER_resolve (cp->pid,
395 &target); 518 &target);
396 pth->cth = GNUNET_CORE_notify_transmit_ready (core, 519 if (is_query)
397 priority, 520 {
398 timeout, 521 /* query, need reservation */
399 &target, 522 if (NULL == cp->irc)
400 size, 523 {
401 &peer_transmit_ready_cb, 524 /* reservation already done! */
402 pth); 525 is_ready = GNUNET_YES;
403 /* pth->cth could be NULL here, that's OK, we'll try again 526 ip = cp->inc_preference;
404 later... */ 527 cp->inc_preference = 0;
528 cp->irc = GNUNET_CORE_peer_change_preference (core,
529 peer,
530 GNUNET_TIME_UNIT_FOREVER_REL,
531 GNUNET_BANDWIDTH_VALUE_MAX,
532 GNUNET_FS_DBLOCK_SIZE,
533 ip,
534 &core_reserve_callback,
535 cp);
536 }
537 else
538 {
539 /* still waiting for reservation */
540 is_ready = GNUNET_NO;
541 }
542 }
543 else
544 {
545 /* no reservation needed for content */
546 is_ready = GNUNET_YES;
547 }
548 if (is_ready)
549 {
550 pth->cth = GNUNET_CORE_notify_transmit_ready (core,
551 priority,
552 timeout,
553 &target,
554 size,
555 &peer_transmit_ready_cb,
556 pth);
557 /* pth->cth could be NULL here, that's OK, we'll try again
558 later... */
559 }
405 if (pth->cth == NULL) 560 if (pth->cth == NULL)
406 pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, 561 {
407 &peer_transmit_timeout, 562 /* if we're waiting for reservation OR if we could not do notify_transmit_ready,
408 pth); 563 install a timeout task to be on the safe side */
564 pth->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
565 &peer_transmit_timeout,
566 pth);
567 }
409 return pth; 568 return pth;
410} 569}
411 570
@@ -426,11 +585,19 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
426 GNUNET_SCHEDULER_cancel (pth->timeout_task); 585 GNUNET_SCHEDULER_cancel (pth->timeout_task);
427 pth->timeout_task = GNUNET_SCHEDULER_NO_TASK; 586 pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
428 } 587 }
588 if (NULL != pth->cth)
589 {
590 GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
591 pth->cth = NULL;
592 }
429 cp = pth->cp; 593 cp = pth->cp;
430 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 594 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
431 cp->pth_tail, 595 cp->pth_tail,
432 pth); 596 pth);
433 // FIXME: update 'cp' counters! 597 if (pth->is_query)
598 GNUNET_assert (0 < cp->ppd.pending_queries--);
599 else
600 GNUNET_assert (0 < cp->ppd.pending_replies--);
434 GNUNET_free (pth); 601 GNUNET_free (pth);
435} 602}
436 603
@@ -438,20 +605,37 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
438/** 605/**
439 * Report on receiving a reply; update the performance record of the given peer. 606 * Report on receiving a reply; update the performance record of the given peer.
440 * 607 *
441 * @param peer responding peer (will be updated) 608 * @param cp responding peer (will be updated)
442 * @param request_time time at which the original query was transmitted 609 * @param request_time time at which the original query was transmitted
443 * @param request_priority priority of the original request 610 * @param request_priority priority of the original request
444 * @param initiator_client local client on responsible for query (or NULL) 611 * @param initiator_client local client on responsible for query (or NULL)
445 * @param initiator_peer other peer responsible for query (or NULL) 612 * @param initiator_peer other peer responsible for query (or NULL)
446 */ 613 */
447void 614void
448GSF_peer_update_performance_ (struct GSF_ConnectedPeer *peer, 615GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
449 GNUNET_TIME_Absolute request_time, 616 struct GNUNET_TIME_Absolute request_time,
450 uint32_t request_priority, 617 uint32_t request_priority,
451 const struct GSF_LocalClient *initiator_client, 618 const struct GSF_LocalClient *initiator_client,
452 const struct GSF_ConnectedPeer *initiator_peer) 619 const struct GSF_ConnectedPeer *initiator_peer)
453{ 620{
454 // FIXME... 621 struct GNUNET_TIME_Relative delay;
622 unsigned int i;
623
624 delay = GNUNET_TIME_absolute_get_duration (request_time);
625 cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
626 cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
627 if (NULL != initiator_client)
628 {
629 cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
630 }
631 else if (NULL != initiator_peer)
632 {
633 GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
634 cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid;
635 GNUNET_PEER_change_rc (initiator_peer->pid, 1);
636 }
637 else
638 GNUNET_break (0);
455} 639}
456 640
457 641
@@ -495,6 +679,7 @@ GSF_peer_disconnect_handler_ (void *cls,
495 const struct GNUNET_PeerIdentity *peer) 679 const struct GNUNET_PeerIdentity *peer)
496{ 680{
497 struct GSF_ConnectedPeer *cp; 681 struct GSF_ConnectedPeer *cp;
682 struct GSF_PeerTransmitHandle *pth;
498 683
499 cp = GNUNET_CONTAINER_multihashmap_get (cp_map, 684 cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
500 &peer->hashPubKey); 685 &peer->hashPubKey);
@@ -502,7 +687,27 @@ GSF_peer_disconnect_handler_ (void *cls,
502 GNUNET_CONTAINER_multihashmap_remove (cp_map, 687 GNUNET_CONTAINER_multihashmap_remove (cp_map,
503 &peer->hashPubKey, 688 &peer->hashPubKey,
504 cp); 689 cp);
505 // FIXME: more cleanup 690 if (NULL != cp->irc)
691 {
692 GNUNET_CORE_peer_change_preference_cancel (cp->irc);
693 cp->irc = NULL;
694 }
695 GSF_plan_notify_peer_disconnect_ (cp);
696 GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
697 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
698 while (NULL != (pth = cp->pth_head))
699 {
700 if (NULL != pth->th)
701 {
702 GNUNET_CORE_notify_transmit_ready_cancel (pth->th);
703 pth->th = NULL;
704 }
705 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
706 cp->pth_tail,
707 pth);
708 GNUNET_free (pth);
709 }
710 GNUNET_PEER_change_rc (cp->pid, -1);
506 GNUNET_free (cp); 711 GNUNET_free (cp);
507} 712}
508 713
@@ -569,38 +774,6 @@ GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
569 774
570 775
571/** 776/**
572 * Try to reserve bandwidth (to receive data FROM the given peer).
573 * This function must only be called ONCE per connected peer at a
574 * time; it can be called again after the 'rc' callback was invoked.
575 * If the peer disconnects, the request is (silently!) ignored (and
576 * the requester is responsible to register for notification about the
577 * peer disconnect if any special action needs to be taken in this
578 * case).
579 *
580 * @param cp peer to reserve bandwidth from
581 * @param size number of bytes to reserve
582 * @param rc function to call upon reservation success or failure
583 * @param rc_cls closure for rc
584 */
585void
586GSF_connected_peer_reserve_ (struct GSF_ConnectedPeer *cp,
587 size_t size,
588 GSF_PeerReserveCallback rc,
589 void *rc_cls)
590{
591 // FIXME: should we allow queueing multiple reservation requests?
592 // FIXME: what about cancellation?
593 // FIXME: change docu on peer disconnect handling?
594 if (NULL != cp->irc)
595 {
596 rc (rc_cls, cp, GNUNET_NO);
597 return;
598 }
599 // FIXME...
600}
601
602
603/**
604 * Write host-trust information to a file - flush the buffer entry! 777 * Write host-trust information to a file - flush the buffer entry!
605 * 778 *
606 * @param cls closure, not used 779 * @param cls closure, not used
@@ -644,6 +817,23 @@ flush_trust (void *cls,
644 817
645 818
646/** 819/**
820 * Notify core about a preference we have for the given peer
821 * (to allocate more resources towards it). The change will
822 * be communicated the next time we reserve bandwidth with
823 * core (not instantly).
824 *
825 * @param cp peer to reserve bandwidth from
826 * @param pref preference change
827 */
828void
829GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
830 uint64_t pref)
831{
832 cp->inc_preference += pref;
833}
834
835
836/**
647 * Call this method periodically to flush trust information to disk. 837 * Call this method periodically to flush trust information to disk.
648 * 838 *
649 * @param cls closure, not used 839 * @param cls closure, not used
@@ -715,7 +905,7 @@ void
715GSF_connected_peer_done_ () 905GSF_connected_peer_done_ ()
716{ 906{
717 cron_flush_trust (NULL, NULL); 907 cron_flush_trust (NULL, NULL);
718 GNUNET_CONTAINER_multihashmap_iterate (cp_peers, 908 GNUNET_CONTAINER_multihashmap_iterate (cp_map,
719 &clean_peer, 909 &clean_peer,
720 NULL); 910 NULL);
721 GNUNET_CONTAINER_multihashmap_destroy (cp_map); 911 GNUNET_CONTAINER_multihashmap_destroy (cp_map);
@@ -725,6 +915,44 @@ GSF_connected_peer_done_ ()
725} 915}
726 916
727 917
918/**
919 * Iterator to remove references to LC entry.
920 *
921 * @param the 'struct GSF_LocalClient*' to look for
922 * @param key current key code
923 * @param value value in the hash map (peer entry)
924 * @return GNUNET_YES (we should continue to iterate)
925 */
926static int
927clean_peer (void *cls,
928 const GNUNET_HashCode * key,
929 void *value)
930{
931 const struct GSF_LocalClient *lc = cls;
932 struct GSF_ConnectedPeer *cp = value;
933 unsigned int i;
934
935 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
936 if (cp->ppd.last_client_replies[i] == lc)
937 cp->ppd.last_client_replies[i] = NULL;
938 return GNUNET_YES;
939}
940
941
942/**
943 * Notification that a local client disconnected. Clean up all of our
944 * references to the given handle.
945 *
946 * @param lc handle to the local client (henceforth invalid)
947 */
948void
949GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
950{
951 GNUNET_CONTAINER_multihashmap_iterate (cp_map,
952 &clean_local_client,
953 (void*) lc);
954}
955
728 956
729#endif 957#endif
730/* end of gnunet-service-fs_cp.h */ 958/* end of gnunet-service-fs_cp.h */