aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pe.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-31 21:23:23 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-31 21:23:23 +0000
commita78990b412db2c0ead2da8061c4f454f068991d1 (patch)
tree2e87adae62fd1ca3cd5a9f4c69248986f78bb106 /src/fs/gnunet-service-fs_pe.c
parent406c7d2d2d126c994a1fff13470b1f96439c6f9d (diff)
downloadgnunet-a78990b412db2c0ead2da8061c4f454f068991d1.tar.gz
gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.zip
converting FS to new MQ-based core API
Diffstat (limited to 'src/fs/gnunet-service-fs_pe.c')
-rw-r--r--src/fs/gnunet-service-fs_pe.c165
1 files changed, 55 insertions, 110 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index b338c1a13..098c3d180 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -189,11 +189,6 @@ struct PeerPlan
189 struct GNUNET_CONTAINER_MultiHashMap *plan_map; 189 struct GNUNET_CONTAINER_MultiHashMap *plan_map;
190 190
191 /** 191 /**
192 * Current transmission request handle.
193 */
194 struct GSF_PeerTransmitHandle *pth;
195
196 /**
197 * Peer for which this is the plan. 192 * Peer for which this is the plan.
198 */ 193 */
199 struct GSF_ConnectedPeer *cp; 194 struct GSF_ConnectedPeer *cp;
@@ -202,6 +197,12 @@ struct PeerPlan
202 * Current task for executing the plan. 197 * Current task for executing the plan.
203 */ 198 */
204 struct GNUNET_SCHEDULER_Task *task; 199 struct GNUNET_SCHEDULER_Task *task;
200
201 /**
202 * Current message under transmission for the plan.
203 */
204 struct GNUNET_MQ_Envelope *env;
205
205}; 206};
206 207
207 208
@@ -241,15 +242,6 @@ get_rp_key (struct GSF_RequestPlan *rp)
241 242
242 243
243/** 244/**
244 * Figure out when and how to transmit to the given peer.
245 *
246 * @param cls the `struct GSF_ConnectedPeer` for transmission
247 */
248static void
249schedule_peer_transmission (void *cls);
250
251
252/**
253 * Insert the given request plan into the heap with the appropriate weight. 245 * Insert the given request plan into the heap with the appropriate weight.
254 * 246 *
255 * @param pp associated peer's plan 247 * @param pp associated peer's plan
@@ -329,21 +321,22 @@ plan (struct PeerPlan *pp,
329 rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay); 321 rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
330 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
331 "Earliest (re)transmission for `%s' in %us\n", 323 "Earliest (re)transmission for `%s' in %us\n",
332 GNUNET_h2s (&prd->query), rp->transmission_counter); 324 GNUNET_h2s (&prd->query),
325 rp->transmission_counter);
333 GNUNET_assert (rp->hn == NULL); 326 GNUNET_assert (rp->hn == NULL);
334 if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us) 327 if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us)
335 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority); 328 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
329 rp,
330 rp->priority);
336 else 331 else
337 rp->hn = 332 rp->hn =
338 GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp, 333 GNUNET_CONTAINER_heap_insert (pp->delay_heap,
334 rp,
339 rp->earliest_transmission.abs_value_us); 335 rp->earliest_transmission.abs_value_us);
340 GNUNET_assert (GNUNET_YES == 336 GNUNET_assert (GNUNET_YES ==
341 GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map, 337 GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
342 get_rp_key (rp), 338 get_rp_key (rp),
343 rp)); 339 rp));
344 if (NULL != pp->task)
345 GNUNET_SCHEDULER_cancel (pp->task);
346 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
347#undef N 340#undef N
348} 341}
349 342
@@ -383,75 +376,6 @@ get_latest (const struct GSF_RequestPlan *rp)
383 376
384 377
385/** 378/**
386 * Function called to get a message for transmission.
387 *
388 * @param cls closure
389 * @param buf_size number of bytes available in @a buf
390 * @param buf where to copy the message, NULL on error (peer disconnect)
391 * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
392 */
393static size_t
394transmit_message_callback (void *cls,
395 size_t buf_size,
396 void *buf)
397{
398 struct PeerPlan *pp = cls;
399 struct GSF_RequestPlan *rp;
400 size_t msize;
401
402 pp->pth = NULL;
403 if (NULL == buf)
404 {
405 /* failed, try again... */
406 if (NULL != pp->task)
407 GNUNET_SCHEDULER_cancel (pp->task);
408
409 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
410 GNUNET_STATISTICS_update (GSF_stats,
411 gettext_noop
412 ("# transmission failed (core has no bandwidth)"),
413 1, GNUNET_NO);
414 return 0;
415 }
416 rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
417 if (NULL == rp)
418 {
419 if (NULL != pp->task)
420 GNUNET_SCHEDULER_cancel (pp->task);
421 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
422 return 0;
423 }
424 msize = GSF_pending_request_get_message_ (get_latest (rp),
425 buf_size,
426 buf);
427 if (msize > buf_size)
428 {
429 if (NULL != pp->task)
430 GNUNET_SCHEDULER_cancel (pp->task);
431 /* buffer to small (message changed), try again */
432 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
433 return 0;
434 }
435 /* remove from root, add again elsewhere... */
436 GNUNET_assert (rp ==
437 GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
438 rp->hn = NULL;
439 rp->last_transmission = GNUNET_TIME_absolute_get ();
440 rp->transmission_counter++;
441 total_delay++;
442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443 "Executing plan %p executed %u times, planning retransmission\n",
444 rp, rp->transmission_counter);
445 plan (pp, rp);
446 GNUNET_STATISTICS_update (GSF_stats,
447 gettext_noop ("# query messages sent to other peers"),
448 1,
449 GNUNET_NO);
450 return msize;
451}
452
453
454/**
455 * Figure out when and how to transmit to the given peer. 379 * Figure out when and how to transmit to the given peer.
456 * 380 *
457 * @param cls the `struct PeerPlan` 381 * @param cls the `struct PeerPlan`
@@ -461,14 +385,16 @@ schedule_peer_transmission (void *cls)
461{ 385{
462 struct PeerPlan *pp = cls; 386 struct PeerPlan *pp = cls;
463 struct GSF_RequestPlan *rp; 387 struct GSF_RequestPlan *rp;
464 size_t msize;
465 struct GNUNET_TIME_Relative delay; 388 struct GNUNET_TIME_Relative delay;
466 389
467 pp->task = NULL; 390 if (NULL != pp->task)
468 if (NULL != pp->pth) 391 {
392 pp->task = NULL;
393 }
394 else
469 { 395 {
470 GSF_peer_transmit_cancel_ (pp->pth); 396 GNUNET_assert (NULL != pp->env);
471 pp->pth = NULL; 397 pp->env = NULL;
472 } 398 }
473 /* move ready requests to priority queue */ 399 /* move ready requests to priority queue */
474 while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && 400 while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
@@ -508,23 +434,40 @@ schedule_peer_transmission (void *cls)
508 return; 434 return;
509 } 435 }
510#if INSANE_STATISTICS 436#if INSANE_STATISTICS
511 GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"), 437 GNUNET_STATISTICS_update (GSF_stats,
512 1, GNUNET_NO); 438 gettext_noop ("# query plans executed"),
439 1,
440 GNUNET_NO);
513#endif 441#endif
514 /* process from priority heap */ 442 /* process from priority heap */
515 rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); 443 rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap);
516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
517 "Executing query plan %p\n", 445 "Executing query plan %p\n",
518 rp); 446 rp);
519 GNUNET_assert (NULL != rp); 447 GNUNET_assert (NULL != rp);
520 msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL); 448 rp->hn = NULL;
521 pp->pth = 449 rp->last_transmission = GNUNET_TIME_absolute_get ();
522 GSF_peer_transmit_ (pp->cp, GNUNET_YES, 450 rp->transmission_counter++;
523 rp->priority, 451 total_delay++;
524 GNUNET_TIME_UNIT_FOREVER_REL, 452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525 msize, 453 "Executing plan %p executed %u times, planning retransmission\n",
526 &transmit_message_callback, pp); 454 rp,
527 GNUNET_assert (NULL != pp->pth); 455 rp->transmission_counter);
456 GNUNET_assert (NULL == pp->env);
457 pp->env = GSF_pending_request_get_message_ (get_latest (rp));
458 GNUNET_MQ_notify_sent (pp->env,
459 &schedule_peer_transmission,
460 pp);
461 GSF_peer_transmit_ (pp->cp,
462 GNUNET_YES,
463 rp->priority,
464 pp->env);
465 GNUNET_STATISTICS_update (GSF_stats,
466 gettext_noop ("# query messages sent to other peers"),
467 1,
468 GNUNET_NO);
469 plan (pp,
470 rp);
528} 471}
529 472
530 473
@@ -646,6 +589,8 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
646 id, 589 id,
647 pp, 590 pp,
648 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 591 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
592 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
593 pp);
649 } 594 }
650 mpc.merged = GNUNET_NO; 595 mpc.merged = GNUNET_NO;
651 mpc.pr = pr; 596 mpc.pr = pr;
@@ -710,16 +655,16 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
710 GNUNET_assert (GNUNET_YES == 655 GNUNET_assert (GNUNET_YES ==
711 GNUNET_CONTAINER_multipeermap_remove (plans, id, 656 GNUNET_CONTAINER_multipeermap_remove (plans, id,
712 pp)); 657 pp));
713 if (NULL != pp->pth)
714 {
715 GSF_peer_transmit_cancel_ (pp->pth);
716 pp->pth = NULL;
717 }
718 if (NULL != pp->task) 658 if (NULL != pp->task)
719 { 659 {
720 GNUNET_SCHEDULER_cancel (pp->task); 660 GNUNET_SCHEDULER_cancel (pp->task);
721 pp->task = NULL; 661 pp->task = NULL;
722 } 662 }
663 if (NULL != pp->env)
664 {
665 GNUNET_MQ_send_cancel (pp->env);
666 pp->env = NULL;
667 }
723 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) 668 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
724 { 669 {
725 GNUNET_break (GNUNET_YES == 670 GNUNET_break (GNUNET_YES ==