aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-03-11 16:23:52 +0000
committerChristian Grothoff <christian@grothoff.org>2011-03-11 16:23:52 +0000
commit64821d4ae43b03b30de3dd136137598c0d5a2ab2 (patch)
tree1ed54d5721882d5f1e6bf225616d202d9d8a7b08 /src/fs
parentd984e2895f96f67fe2c44f27cdacfbb404485ada (diff)
downloadgnunet-64821d4ae43b03b30de3dd136137598c0d5a2ab2.tar.gz
gnunet-64821d4ae43b03b30de3dd136137598c0d5a2ab2.zip
stuff
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/gnunet-service-fs_new.c26
-rw-r--r--src/fs/gnunet-service-fs_pe.c219
-rw-r--r--src/fs/gnunet-service-fs_pe.h24
-rw-r--r--src/fs/gnunet-service-fs_pr.c11
-rw-r--r--src/fs/gnunet-service-fs_pr.h2
5 files changed, 159 insertions, 123 deletions
diff --git a/src/fs/gnunet-service-fs_new.c b/src/fs/gnunet-service-fs_new.c
index 7ad1874f7..5e9fce754 100644
--- a/src/fs/gnunet-service-fs_new.c
+++ b/src/fs/gnunet-service-fs_new.c
@@ -228,9 +228,6 @@ GSF_test_get_load_too_high_ (uint32_t priority)
228} 228}
229 229
230 230
231
232
233
234/** 231/**
235 * Handle P2P "PUT" message. 232 * Handle P2P "PUT" message.
236 * 233 *
@@ -261,25 +258,6 @@ handle_p2p_put (void *cls,
261 258
262 259
263/** 260/**
264 * Decide with what weight we should forward the given
265 * request to the given peer.
266 *
267 * @param cp target peer
268 * @param pr request
269 */
270static void
271plan (struct GSF_ConnectedPeer *cp,
272 struct GSF_PendingRequest *pr)
273{
274 GNUNET_CONTAINER_HeapCostType weight;
275
276 weight = 0;
277 /* FIXME: calculate weight properly... */
278 GSF_plan_add_ (cp, pr, weight);
279}
280
281
282/**
283 * We have a new request, consider forwarding it to the given 261 * We have a new request, consider forwarding it to the given
284 * peer. 262 * peer.
285 * 263 *
@@ -296,7 +274,7 @@ consider_request_for_forwarding (void *cls,
296{ 274{
297 struct GSF_PendingRequest *pr = cls; 275 struct GSF_PendingRequest *pr = cls;
298 276
299 plan (cp, pr); 277 GSF_plan_add_ (cp, pr);
300} 278}
301 279
302 280
@@ -466,7 +444,7 @@ consider_peer_for_forwarding (void *cls,
466{ 444{
467 struct GSF_ConnectedPeer *cp = cls; 445 struct GSF_ConnectedPeer *cp = cls;
468 446
469 plan (cp, pr); 447 GSF_plan_add_ (cp, pr);
470 return GNUNET_YES; 448 return GNUNET_YES;
471} 449}
472 450
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index db501b761..e7608653e 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -26,28 +26,130 @@
26#include "platform.h" 26#include "platform.h"
27#include "gnunet-service-fs_cp.h" 27#include "gnunet-service-fs_cp.h"
28#include "gnunet-service-fs_pe.h" 28#include "gnunet-service-fs_pe.h"
29#include "gnunet-service-fs_pr.h"
29 30
30/** 31/**
31 * Hash map from peer identities to GNUNET_CONTAINER_Heap's with 32 * Transmission plan for a peer.
32 * pending requests as entries. 33 */
34struct PeerPlan
35{
36 /**
37 * Heap with pending queries, smaller weights mean higher priority.
38 */
39 struct GNUNET_CONTAINER_Heap *heap;
40
41 /**
42 * Current transmission request handle.
43 */
44 struct GSF_PeerTransmitHandle *pth;
45
46 /**
47 * Peer for which this is the plan.
48 */
49 struct GSF_ConnectedPeer *cp;
50
51 /**
52 * Current task for executing the plan.
53 */
54 GNUNET_SCHEDULER_TaskIdentifier task;
55};
56
57
58/**
59 * Hash map from peer identities to PeerPlans.
33 */ 60 */
34static struct GNUNET_CONTAINER_MultiHashMap *plans; 61static struct GNUNET_CONTAINER_MultiHashMap *plans;
35 62
36 63
37/** 64/**
38 * Get the size of the request queue for the given peer. 65 * Figure out when and how to transmit to the given peer.
66 *
67 * @param cls the 'struct GSF_ConnectedPeer' for transmission
68 * @param tc scheduler context
69 */
70static void
71schedule_peer_transmission (void *cls,
72 const struct GNUNET_SCHEDULER_TaskContext *tc);
73
74
75/**
76 * Function called to get a message for transmission.
39 * 77 *
40 * @param cp connected peer to query 78 * @param cls closure
41 * @return number of entries in this peer's request queue 79 * @param buf_size number of bytes available in buf
80 * @param buf where to copy the message, NULL on error (peer disconnect)
81 * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
42 */ 82 */
43static struct GNUNET_CONTAINER_Heap * 83static size_t
44get_heap (const struct GSF_ConnectedPeer *cp) 84transmit_message_callback (void *cls,
85 size_t buf_size,
86 void *buf)
45{ 87{
46 struct GNUNET_PeerIdentity id; 88 struct PeerPlan *pp = cls;
89 struct GSF_PendingRequest *pr;
90 size_t msize;
47 91
48 GSF_connected_peer_get_identity_ (cp, &id); 92 if (NULL == buf)
49 return GNUNET_CONTAINER_multihashmap_get (plans, 93 {
50 &id.hashPubKey); 94 /* failed, try again... */
95 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
96 return 0;
97 }
98 pr = GNUNET_CONTAINER_heap_peek (pp->heap);
99 msize = GSF_pending_request_get_message_ (pr, buf_size, buf);
100 if (msize > buf_size)
101 {
102 /* buffer to small (message changed), try again */
103 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
104 return 0;
105 }
106 /* remove from root, add again elsewhere... */
107 GNUNET_assert (pr == GNUNET_CONTAINER_heap_remove_root (pp->heap));
108 GSF_plan_add_ (pp->cp, pr);
109 return msize;
110}
111
112
113/**
114 * Figure out when and how to transmit to the given peer.
115 *
116 * @param cls the 'struct PeerPlan'
117 * @param tc scheduler context
118 */
119static void
120schedule_peer_transmission (void *cls,
121 const struct GNUNET_SCHEDULER_TaskContext *tc)
122{
123 struct PeerPlan *pp = cls;
124 struct GSF_PendingRequest *pr;
125 size_t msize;
126 struct GNUNET_TIME_Relative delay;
127
128 pp->task = GNUNET_SCHEDULER_NO_TASK;
129 if (NULL == pp->heap)
130 return;
131 if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
132 return;
133 GNUNET_assert (NULL == pp->pth);
134 pr = GNUNET_CONTAINER_heap_peek (pp->heap);
135 if (0) // FIXME: if (re)transmission should wait, wait...
136 {
137 delay = GNUNET_TIME_UNIT_SECONDS;
138 // FIXME
139 pp->task = GNUNET_SCHEDULER_add_delayed (delay,
140 &schedule_peer_transmission,
141 pp);
142 return;
143 }
144 msize = GSF_pending_request_get_message_ (pr, 0, NULL);
145 pp->pth = GSF_peer_transmit_ (pp->cp,
146 GNUNET_YES,
147 0 /* FIXME: pr->priority? */,
148 GNUNET_TIME_UNIT_FOREVER_REL,
149 msize,
150 &transmit_message_callback,
151 pp);
152 GNUNET_assert (NULL != pp->pth);
51} 153}
52 154
53 155
@@ -56,31 +158,42 @@ get_heap (const struct GSF_ConnectedPeer *cp)
56 * 158 *
57 * @param cp peer with the entry 159 * @param cp peer with the entry
58 * @param pr request with the entry 160 * @param pr request with the entry
59 * @param weight determines position of the entry in the cp queue,
60 * lower weights are earlier in the queue
61 */ 161 */
62void 162void
63GSF_plan_add_ (const struct GSF_ConnectedPeer *cp, 163GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
64 struct GSF_PendingRequest *pr, 164 struct GSF_PendingRequest *pr)
65 GNUNET_CONTAINER_HeapCostType weight)
66{ 165{
67 struct GNUNET_PeerIdentity id; 166 struct GNUNET_PeerIdentity id;
68 struct GNUNET_CONTAINER_Heap *h; 167 struct PeerPlan *pp;
69 168 GNUNET_CONTAINER_HeapCostType weight;
169
70 GSF_connected_peer_get_identity_ (cp, &id); 170 GSF_connected_peer_get_identity_ (cp, &id);
71 h = GNUNET_CONTAINER_multihashmap_get (plans, 171 pp = GNUNET_CONTAINER_multihashmap_get (plans,
72 &id.hashPubKey); 172 &id.hashPubKey);
73 if (NULL == h) 173 if (NULL == pp)
74 { 174 {
75 h = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 175 pp = GNUNET_malloc (sizeof (struct PeerPlan));
176 pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
76 GNUNET_CONTAINER_multihashmap_put (plans, 177 GNUNET_CONTAINER_multihashmap_put (plans,
77 &id.hashPubKey, 178 &id.hashPubKey,
78 h, 179 pp,
79 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 180 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
80 } 181 }
81 GNUNET_CONTAINER_heap_insert (h, 182 weight = 0; // FIXME: calculate real weight!
183 GNUNET_CONTAINER_heap_insert (pp->heap,
82 pr, 184 pr,
83 weight); 185 weight);
186 if (pp->pth != NULL)
187 {
188 if (pr != GNUNET_CONTAINER_heap_peek (pp->heap))
189 return;
190 GSF_peer_transmit_cancel_ (pp->pth);
191 pp->pth = NULL;
192 }
193 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
194 GNUNET_SCHEDULER_cancel (pp->task);
195 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
196 pp);
84} 197}
85 198
86 199
@@ -94,15 +207,20 @@ void
94GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) 207GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
95{ 208{
96 struct GNUNET_PeerIdentity id; 209 struct GNUNET_PeerIdentity id;
97 struct GNUNET_CONTAINER_Heap *h; 210 struct PeerPlan *pp;
98 211
99 GSF_connected_peer_get_identity_ (cp, &id); 212 GSF_connected_peer_get_identity_ (cp, &id);
100 h = GNUNET_CONTAINER_multihashmap_get (plans, 213 pp = GNUNET_CONTAINER_multihashmap_get (plans,
101 &id.hashPubKey); 214 &id.hashPubKey);
102 GNUNET_CONTAINER_multihashmap_remove (plans, 215 GNUNET_CONTAINER_multihashmap_remove (plans,
103 &id.hashPubKey, 216 &id.hashPubKey,
104 h); 217 pp);
105 GNUNET_CONTAINER_heap_destroy (h); 218 if (NULL != pp->pth)
219 GSF_peer_transmit_cancel_ (pp->pth);
220 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
221 GNUNET_SCHEDULER_cancel (pp->task);
222 GNUNET_CONTAINER_heap_destroy (pp->heap);
223 GNUNET_free (pp);
106} 224}
107 225
108 226
@@ -152,11 +270,11 @@ find_request (void *cls,
152 270
153 271
154/** 272/**
155 * Remove the given request from all heaps. * 273 * Remove the given request from all heaps. * FIXME: O(n) -- inefficient!
156 * 274 *
157 * @param cls 'struct GSF_PendingRequest' to purge 275 * @param cls 'struct GSF_PendingRequest' to purge
158 * @param key identity of the peer we're currently looking at (unused) 276 * @param key identity of the peer we're currently looking at (unused)
159 * @param value request heap for the given peer to search for the 'cls' 277 * @param value PeerPlan for the given peer to search for the 'cls'
160 * @return GNUNET_OK (continue iteration) 278 * @return GNUNET_OK (continue iteration)
161 */ 279 */
162static int 280static int
@@ -165,7 +283,8 @@ remove_request (void *cls,
165 void *value) 283 void *value)
166{ 284{
167 const struct GSF_PendingRequest *pr = cls; 285 const struct GSF_PendingRequest *pr = cls;
168 struct GNUNET_CONTAINER_Heap *h = value; 286 struct PeerPlan *pp = value;
287 struct GNUNET_CONTAINER_Heap *h = pp->heap;
169 struct FindRequestClosure frc; 288 struct FindRequestClosure frc;
170 289
171 frc.pr = pr; 290 frc.pr = pr;
@@ -199,44 +318,6 @@ GSF_plan_notify_request_done_ (const struct GSF_PendingRequest *pr)
199 318
200 319
201/** 320/**
202 * Get the lowest-weight entry for the respective peer
203 * from the plan. Removes the entry from the plan's queue.
204 *
205 * @param cp connected peer to query for the next request
206 * @return NULL if the queue for this peer is empty
207 */
208struct GSF_PendingRequest *
209GSF_plan_get_ (const struct GSF_ConnectedPeer *cp)
210{
211 struct GNUNET_CONTAINER_Heap *h;
212
213 h = get_heap (cp);
214 if (NULL == h)
215 return NULL;
216 return GNUNET_CONTAINER_heap_remove_root (h);
217}
218
219
220/**
221 * Get the size of the request queue for the given peer.
222 *
223 * @param cp connected peer to query
224 * @return number of entries in this peer's request queue
225 */
226unsigned int
227GSF_plan_size_ (const struct GSF_ConnectedPeer *cp)
228{
229 struct GNUNET_CONTAINER_Heap *h;
230
231 h = get_heap (cp);
232 if (NULL == h)
233 return 0;
234 return GNUNET_CONTAINER_heap_get_size (h);
235}
236
237
238
239/**
240 * Initialize plan subsystem. 321 * Initialize plan subsystem.
241 */ 322 */
242void 323void
diff --git a/src/fs/gnunet-service-fs_pe.h b/src/fs/gnunet-service-fs_pe.h
index d70001356..14e9dec2e 100644
--- a/src/fs/gnunet-service-fs_pe.h
+++ b/src/fs/gnunet-service-fs_pe.h
@@ -39,8 +39,7 @@
39 */ 39 */
40void 40void
41GSF_plan_add_ (const struct GSF_ConnectedPeer *cp, 41GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
42 struct GSF_PendingRequest *pr, 42 struct GSF_PendingRequest *pr);
43 GNUNET_CONTAINER_HeapCostType weight);
44 43
45 44
46/** 45/**
@@ -64,27 +63,6 @@ GSF_plan_notify_request_done_ (const struct GSF_PendingRequest *pr);
64 63
65 64
66/** 65/**
67 * Get the lowest-weight entry for the respective peer
68 * from the plan. Removes the entry from the plan's queue.
69 *
70 * @param cp connected peer to query for the next request
71 * @return NULL if the queue for this peer is empty
72 */
73struct GSF_PendingRequest *
74GSF_plan_get_ (const struct GSF_ConnectedPeer *cp);
75
76
77/**
78 * Get the size of the request queue for the given peer.
79 *
80 * @param cp connected peer to query
81 * @return number of entries in this peer's request queue
82 */
83unsigned int
84GSF_plan_size_ (const struct GSF_ConnectedPeer *cp);
85
86
87/**
88 * Initialize plan subsystem. 66 * Initialize plan subsystem.
89 */ 67 */
90void 68void
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 0fdcd0cf1..ff2f7a3a3 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -423,14 +423,12 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
423 * transmission to other peers (or at least determine its size). 423 * transmission to other peers (or at least determine its size).
424 * 424 *
425 * @param pr request to generate the message for 425 * @param pr request to generate the message for
426 * @param do_route are we routing the reply
427 * @param buf_size number of bytes available in buf 426 * @param buf_size number of bytes available in buf
428 * @param buf where to copy the message (can be NULL) 427 * @param buf where to copy the message (can be NULL)
429 * @return number of bytes needed (if > buf_size) or used 428 * @return number of bytes needed (if > buf_size) or used
430 */ 429 */
431size_t 430size_t
432GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, 431GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
433 int do_route,
434 size_t buf_size, 432 size_t buf_size,
435 void *buf) 433 void *buf)
436{ 434{
@@ -444,10 +442,13 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
444 size_t bf_size; 442 size_t bf_size;
445 struct GNUNET_TIME_Absolute now; 443 struct GNUNET_TIME_Absolute now;
446 int64_t ttl; 444 int64_t ttl;
445 int do_route;
446
447 447
448 k = 0; 448 k = 0;
449 bm = 0; 449 bm = 0;
450 if (GNUNET_YES != do_route) 450 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
451 if (! do_route)
451 { 452 {
452 bm |= GET_MESSAGE_BIT_RETURN_TO; 453 bm |= GET_MESSAGE_BIT_RETURN_TO;
453 k++; 454 k++;
@@ -471,7 +472,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
471 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); 472 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
472 gm->header.size = htons (msize); 473 gm->header.size = htons (msize);
473 gm->type = htonl (pr->public_data.type); 474 gm->type = htonl (pr->public_data.type);
474 if (GNUNET_YES == do_route) 475 if (do_route)
475 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 476 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
476 pr->public_data.priority + 1); 477 pr->public_data.priority + 1);
477 else 478 else
@@ -486,7 +487,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
486 gm->query = pr->public_data.query; 487 gm->query = pr->public_data.query;
487 ext = (GNUNET_HashCode*) &gm[1]; 488 ext = (GNUNET_HashCode*) &gm[1];
488 k = 0; 489 k = 0;
489 if (GNUNET_YES != do_route) 490 if (! do_route)
490 GNUNET_PEER_resolve (pr->sender_pid, 491 GNUNET_PEER_resolve (pr->sender_pid,
491 (struct GNUNET_PeerIdentity*) &ext[k++]); 492 (struct GNUNET_PeerIdentity*) &ext[k++]);
492 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) 493 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h
index 885947295..9632df015 100644
--- a/src/fs/gnunet-service-fs_pr.h
+++ b/src/fs/gnunet-service-fs_pr.h
@@ -234,14 +234,12 @@ GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr);
234 * transmission to other peers (or at least determine its size). 234 * transmission to other peers (or at least determine its size).
235 * 235 *
236 * @param pr request to generate the message for 236 * @param pr request to generate the message for
237 * @param do_route are we routing the reply
238 * @param buf_size number of bytes available in buf 237 * @param buf_size number of bytes available in buf
239 * @param buf where to copy the message (can be NULL) 238 * @param buf where to copy the message (can be NULL)
240 * @return number of bytes needed (if buf_size too small) or used 239 * @return number of bytes needed (if buf_size too small) or used
241 */ 240 */
242size_t 241size_t
243GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, 242GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
244 int do_route,
245 size_t buf_size, 243 size_t buf_size,
246 void *buf); 244 void *buf);
247 245