aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pe.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-03-11 16:23:52 +0000
committerChristian Grothoff <christian@grothoff.org>2011-03-11 16:23:52 +0000
commit64821d4ae43b03b30de3dd136137598c0d5a2ab2 (patch)
tree1ed54d5721882d5f1e6bf225616d202d9d8a7b08 /src/fs/gnunet-service-fs_pe.c
parentd984e2895f96f67fe2c44f27cdacfbb404485ada (diff)
downloadgnunet-64821d4ae43b03b30de3dd136137598c0d5a2ab2.tar.gz
gnunet-64821d4ae43b03b30de3dd136137598c0d5a2ab2.zip
stuff
Diffstat (limited to 'src/fs/gnunet-service-fs_pe.c')
-rw-r--r--src/fs/gnunet-service-fs_pe.c219
1 files changed, 150 insertions, 69 deletions
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index db501b761..e7608653e 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -26,28 +26,130 @@
26#include "platform.h" 26#include "platform.h"
27#include "gnunet-service-fs_cp.h" 27#include "gnunet-service-fs_cp.h"
28#include "gnunet-service-fs_pe.h" 28#include "gnunet-service-fs_pe.h"
29#include "gnunet-service-fs_pr.h"
29 30
30/** 31/**
31 * Hash map from peer identities to GNUNET_CONTAINER_Heap's with 32 * Transmission plan for a peer.
32 * pending requests as entries. 33 */
34struct PeerPlan
35{
36 /**
37 * Heap with pending queries, smaller weights mean higher priority.
38 */
39 struct GNUNET_CONTAINER_Heap *heap;
40
41 /**
42 * Current transmission request handle.
43 */
44 struct GSF_PeerTransmitHandle *pth;
45
46 /**
47 * Peer for which this is the plan.
48 */
49 struct GSF_ConnectedPeer *cp;
50
51 /**
52 * Current task for executing the plan.
53 */
54 GNUNET_SCHEDULER_TaskIdentifier task;
55};
56
57
58/**
59 * Hash map from peer identities to PeerPlans.
33 */ 60 */
34static struct GNUNET_CONTAINER_MultiHashMap *plans; 61static struct GNUNET_CONTAINER_MultiHashMap *plans;
35 62
36 63
37/** 64/**
38 * Get the size of the request queue for the given peer. 65 * Figure out when and how to transmit to the given peer.
66 *
67 * @param cls the 'struct GSF_ConnectedPeer' for transmission
68 * @param tc scheduler context
69 */
70static void
71schedule_peer_transmission (void *cls,
72 const struct GNUNET_SCHEDULER_TaskContext *tc);
73
74
75/**
76 * Function called to get a message for transmission.
39 * 77 *
40 * @param cp connected peer to query 78 * @param cls closure
41 * @return number of entries in this peer's request queue 79 * @param buf_size number of bytes available in buf
80 * @param buf where to copy the message, NULL on error (peer disconnect)
81 * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
42 */ 82 */
43static struct GNUNET_CONTAINER_Heap * 83static size_t
44get_heap (const struct GSF_ConnectedPeer *cp) 84transmit_message_callback (void *cls,
85 size_t buf_size,
86 void *buf)
45{ 87{
46 struct GNUNET_PeerIdentity id; 88 struct PeerPlan *pp = cls;
89 struct GSF_PendingRequest *pr;
90 size_t msize;
47 91
48 GSF_connected_peer_get_identity_ (cp, &id); 92 if (NULL == buf)
49 return GNUNET_CONTAINER_multihashmap_get (plans, 93 {
50 &id.hashPubKey); 94 /* failed, try again... */
95 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
96 return 0;
97 }
98 pr = GNUNET_CONTAINER_heap_peek (pp->heap);
99 msize = GSF_pending_request_get_message_ (pr, buf_size, buf);
100 if (msize > buf_size)
101 {
102 /* buffer to small (message changed), try again */
103 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
104 return 0;
105 }
106 /* remove from root, add again elsewhere... */
107 GNUNET_assert (pr == GNUNET_CONTAINER_heap_remove_root (pp->heap));
108 GSF_plan_add_ (pp->cp, pr);
109 return msize;
110}
111
112
113/**
114 * Figure out when and how to transmit to the given peer.
115 *
116 * @param cls the 'struct PeerPlan'
117 * @param tc scheduler context
118 */
119static void
120schedule_peer_transmission (void *cls,
121 const struct GNUNET_SCHEDULER_TaskContext *tc)
122{
123 struct PeerPlan *pp = cls;
124 struct GSF_PendingRequest *pr;
125 size_t msize;
126 struct GNUNET_TIME_Relative delay;
127
128 pp->task = GNUNET_SCHEDULER_NO_TASK;
129 if (NULL == pp->heap)
130 return;
131 if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
132 return;
133 GNUNET_assert (NULL == pp->pth);
134 pr = GNUNET_CONTAINER_heap_peek (pp->heap);
135 if (0) // FIXME: if (re)transmission should wait, wait...
136 {
137 delay = GNUNET_TIME_UNIT_SECONDS;
138 // FIXME
139 pp->task = GNUNET_SCHEDULER_add_delayed (delay,
140 &schedule_peer_transmission,
141 pp);
142 return;
143 }
144 msize = GSF_pending_request_get_message_ (pr, 0, NULL);
145 pp->pth = GSF_peer_transmit_ (pp->cp,
146 GNUNET_YES,
147 0 /* FIXME: pr->priority? */,
148 GNUNET_TIME_UNIT_FOREVER_REL,
149 msize,
150 &transmit_message_callback,
151 pp);
152 GNUNET_assert (NULL != pp->pth);
51} 153}
52 154
53 155
@@ -56,31 +158,42 @@ get_heap (const struct GSF_ConnectedPeer *cp)
56 * 158 *
57 * @param cp peer with the entry 159 * @param cp peer with the entry
58 * @param pr request with the entry 160 * @param pr request with the entry
59 * @param weight determines position of the entry in the cp queue,
60 * lower weights are earlier in the queue
61 */ 161 */
62void 162void
63GSF_plan_add_ (const struct GSF_ConnectedPeer *cp, 163GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
64 struct GSF_PendingRequest *pr, 164 struct GSF_PendingRequest *pr)
65 GNUNET_CONTAINER_HeapCostType weight)
66{ 165{
67 struct GNUNET_PeerIdentity id; 166 struct GNUNET_PeerIdentity id;
68 struct GNUNET_CONTAINER_Heap *h; 167 struct PeerPlan *pp;
69 168 GNUNET_CONTAINER_HeapCostType weight;
169
70 GSF_connected_peer_get_identity_ (cp, &id); 170 GSF_connected_peer_get_identity_ (cp, &id);
71 h = GNUNET_CONTAINER_multihashmap_get (plans, 171 pp = GNUNET_CONTAINER_multihashmap_get (plans,
72 &id.hashPubKey); 172 &id.hashPubKey);
73 if (NULL == h) 173 if (NULL == pp)
74 { 174 {
75 h = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 175 pp = GNUNET_malloc (sizeof (struct PeerPlan));
176 pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
76 GNUNET_CONTAINER_multihashmap_put (plans, 177 GNUNET_CONTAINER_multihashmap_put (plans,
77 &id.hashPubKey, 178 &id.hashPubKey,
78 h, 179 pp,
79 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 180 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
80 } 181 }
81 GNUNET_CONTAINER_heap_insert (h, 182 weight = 0; // FIXME: calculate real weight!
183 GNUNET_CONTAINER_heap_insert (pp->heap,
82 pr, 184 pr,
83 weight); 185 weight);
186 if (pp->pth != NULL)
187 {
188 if (pr != GNUNET_CONTAINER_heap_peek (pp->heap))
189 return;
190 GSF_peer_transmit_cancel_ (pp->pth);
191 pp->pth = NULL;
192 }
193 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
194 GNUNET_SCHEDULER_cancel (pp->task);
195 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
196 pp);
84} 197}
85 198
86 199
@@ -94,15 +207,20 @@ void
94GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp) 207GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
95{ 208{
96 struct GNUNET_PeerIdentity id; 209 struct GNUNET_PeerIdentity id;
97 struct GNUNET_CONTAINER_Heap *h; 210 struct PeerPlan *pp;
98 211
99 GSF_connected_peer_get_identity_ (cp, &id); 212 GSF_connected_peer_get_identity_ (cp, &id);
100 h = GNUNET_CONTAINER_multihashmap_get (plans, 213 pp = GNUNET_CONTAINER_multihashmap_get (plans,
101 &id.hashPubKey); 214 &id.hashPubKey);
102 GNUNET_CONTAINER_multihashmap_remove (plans, 215 GNUNET_CONTAINER_multihashmap_remove (plans,
103 &id.hashPubKey, 216 &id.hashPubKey,
104 h); 217 pp);
105 GNUNET_CONTAINER_heap_destroy (h); 218 if (NULL != pp->pth)
219 GSF_peer_transmit_cancel_ (pp->pth);
220 if (GNUNET_SCHEDULER_NO_TASK != pp->task)
221 GNUNET_SCHEDULER_cancel (pp->task);
222 GNUNET_CONTAINER_heap_destroy (pp->heap);
223 GNUNET_free (pp);
106} 224}
107 225
108 226
@@ -152,11 +270,11 @@ find_request (void *cls,
152 270
153 271
154/** 272/**
155 * Remove the given request from all heaps. * 273 * Remove the given request from all heaps. * FIXME: O(n) -- inefficient!
156 * 274 *
157 * @param cls 'struct GSF_PendingRequest' to purge 275 * @param cls 'struct GSF_PendingRequest' to purge
158 * @param key identity of the peer we're currently looking at (unused) 276 * @param key identity of the peer we're currently looking at (unused)
159 * @param value request heap for the given peer to search for the 'cls' 277 * @param value PeerPlan for the given peer to search for the 'cls'
160 * @return GNUNET_OK (continue iteration) 278 * @return GNUNET_OK (continue iteration)
161 */ 279 */
162static int 280static int
@@ -165,7 +283,8 @@ remove_request (void *cls,
165 void *value) 283 void *value)
166{ 284{
167 const struct GSF_PendingRequest *pr = cls; 285 const struct GSF_PendingRequest *pr = cls;
168 struct GNUNET_CONTAINER_Heap *h = value; 286 struct PeerPlan *pp = value;
287 struct GNUNET_CONTAINER_Heap *h = pp->heap;
169 struct FindRequestClosure frc; 288 struct FindRequestClosure frc;
170 289
171 frc.pr = pr; 290 frc.pr = pr;
@@ -199,44 +318,6 @@ GSF_plan_notify_request_done_ (const struct GSF_PendingRequest *pr)
199 318
200 319
201/** 320/**
202 * Get the lowest-weight entry for the respective peer
203 * from the plan. Removes the entry from the plan's queue.
204 *
205 * @param cp connected peer to query for the next request
206 * @return NULL if the queue for this peer is empty
207 */
208struct GSF_PendingRequest *
209GSF_plan_get_ (const struct GSF_ConnectedPeer *cp)
210{
211 struct GNUNET_CONTAINER_Heap *h;
212
213 h = get_heap (cp);
214 if (NULL == h)
215 return NULL;
216 return GNUNET_CONTAINER_heap_remove_root (h);
217}
218
219
220/**
221 * Get the size of the request queue for the given peer.
222 *
223 * @param cp connected peer to query
224 * @return number of entries in this peer's request queue
225 */
226unsigned int
227GSF_plan_size_ (const struct GSF_ConnectedPeer *cp)
228{
229 struct GNUNET_CONTAINER_Heap *h;
230
231 h = get_heap (cp);
232 if (NULL == h)
233 return 0;
234 return GNUNET_CONTAINER_heap_get_size (h);
235}
236
237
238
239/**
240 * Initialize plan subsystem. 321 * Initialize plan subsystem.
241 */ 322 */
242void 323void