aboutsummaryrefslogtreecommitdiff
path: root/src/fragmentation
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-07-09 17:25:43 +0000
committerChristian Grothoff <christian@grothoff.org>2011-07-09 17:25:43 +0000
commit1e463296e3635a4a82584a3c4e205e096ce53ad1 (patch)
tree332c3d5480c43e75ce0a55b0508804e5df2fb8d0 /src/fragmentation
parent42b758ecd70b4057d2b9a11d16284c9b5fc15d36 (diff)
downloadgnunet-1e463296e3635a4a82584a3c4e205e096ce53ad1.tar.gz
gnunet-1e463296e3635a4a82584a3c4e205e096ce53ad1.zip
defrag
Diffstat (limited to 'src/fragmentation')
-rw-r--r--src/fragmentation/defragmentation_new.c192
1 files changed, 183 insertions, 9 deletions
diff --git a/src/fragmentation/defragmentation_new.c b/src/fragmentation/defragmentation_new.c
index 523a2cede..775885158 100644
--- a/src/fragmentation/defragmentation_new.c
+++ b/src/fragmentation/defragmentation_new.c
@@ -35,7 +35,7 @@ struct FragTimes
35 /** 35 /**
36 * The time the fragment was received. 36 * The time the fragment was received.
37 */ 37 */
38 GNUNET_TIME_Absolute time; 38 struct GNUNET_TIME_Absolute time;
39 39
40 /** 40 /**
41 * Number of the bit for the fragment (in [0,..,63]). 41 * Number of the bit for the fragment (in [0,..,63]).
@@ -45,7 +45,10 @@ struct FragTimes
45 45
46 46
47/** 47/**
48 * Information we keep for one message that is being assembled. 48 * Information we keep for one message that is being assembled. Note
49 * that we keep the context around even after the assembly is done to
50 * handle 'stray' messages that are received 'late'. A message
51 * context is ONLY discarded when the queue gets too big.
49 */ 52 */
50struct MessageContext 53struct MessageContext
51{ 54{
@@ -60,6 +63,11 @@ struct MessageContext
60 struct MessageContext *prev; 63 struct MessageContext *prev;
61 64
62 /** 65 /**
66 * Associated defragmentation context.
67 */
68 struct GNUNET_DEFRAGMENT_Context *dc;
69
70 /**
63 * Pointer to the assembled message, allocated at the 71 * Pointer to the assembled message, allocated at the
64 * end of this struct. 72 * end of this struct.
65 */ 73 */
@@ -76,7 +84,7 @@ struct MessageContext
76 * Task scheduled for transmitting the next ACK to the 84 * Task scheduled for transmitting the next ACK to the
77 * other peer. 85 * other peer.
78 */ 86 */
79 struct GNUNET_SCHEDULER_TaskIdentifier ack_task; 87 GNUNET_SCHEDULER_TaskIdentifier ack_task;
80 88
81 /** 89 /**
82 * When did we receive which fragment? Used to calculate 90 * When did we receive which fragment? Used to calculate
@@ -127,11 +135,6 @@ struct GNUNET_DEFRAGMENT_Context
127 struct GNUNET_STATISTICS_Handle *stats; 135 struct GNUNET_STATISTICS_Handle *stats;
128 136
129 /** 137 /**
130 * Closure for 'proc' and 'ackp'.
131 */
132 void *cls;
133
134 /**
135 * Head of list of messages we're defragmenting. 138 * Head of list of messages we're defragmenting.
136 */ 139 */
137 struct MessageContext *head; 140 struct MessageContext *head;
@@ -142,6 +145,11 @@ struct GNUNET_DEFRAGMENT_Context
142 struct MessageContext *tail; 145 struct MessageContext *tail;
143 146
144 /** 147 /**
148 * Closure for 'proc' and 'ackp'.
149 */
150 void *cls;
151
152 /**
145 * Function to call with defragmented messages. 153 * Function to call with defragmented messages.
146 */ 154 */
147 GNUNET_FRAGMENT_MessageProcessor proc; 155 GNUNET_FRAGMENT_MessageProcessor proc;
@@ -169,7 +177,10 @@ struct GNUNET_DEFRAGMENT_Context
169 */ 177 */
170 unsigned int list_size; 178 unsigned int list_size;
171 179
172 180 /**
181 * Maximum message size for each fragment.
182 */
183 uint16_t mtu;
173}; 184};
174 185
175 186
@@ -177,6 +188,7 @@ struct GNUNET_DEFRAGMENT_Context
177 * Create a defragmentation context. 188 * Create a defragmentation context.
178 * 189 *
179 * @param stats statistics context 190 * @param stats statistics context
191 * @param mtu the maximum message size for each fragment
180 * @param num_msgs how many fragmented messages 192 * @param num_msgs how many fragmented messages
181 * to we defragment at most at the same time? 193 * to we defragment at most at the same time?
182 * @param cls closure for proc and ackp 194 * @param cls closure for proc and ackp
@@ -187,6 +199,7 @@ struct GNUNET_DEFRAGMENT_Context
187 */ 199 */
188struct GNUNET_DEFRAGMENT_Context * 200struct GNUNET_DEFRAGMENT_Context *
189GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, 201GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
202 uint16_t mtu,
190 unsigned int num_msgs, 203 unsigned int num_msgs,
191 void *cls, 204 void *cls,
192 GNUNET_FRAGMENT_MessageProcessor proc, 205 GNUNET_FRAGMENT_MessageProcessor proc,
@@ -200,6 +213,7 @@ GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
200 dc->proc = proc; 213 dc->proc = proc;
201 dc->ackp = ackp; 214 dc->ackp = ackp;
202 dc->num_msgs = num_msgs; 215 dc->num_msgs = num_msgs;
216 dc->mtu = mtu;
203 dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */ 217 dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
204 return dc; 218 return dc;
205} 219}
@@ -213,11 +227,50 @@ GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
213void 227void
214GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc) 228GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
215{ 229{
230 struct MessageContext *mc;
231
232 while (NULL != (mc = dc->head))
233 {
234 GNUNET_CONTAINER_DLL_remove (dc->head,
235 dc->tail,
236 mc);
237 dc->list_size--;
238 if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
239 {
240 GNUNET_SCHEDULER_cancel (mc->ack_task);
241 mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
242 }
243 GNUNET_free (mc);
244 }
245 GNUNET_assert (0 == dc->list_size);
216 GNUNET_free (dc); 246 GNUNET_free (dc);
217} 247}
218 248
219 249
220/** 250/**
251 * Send acknowledgement to the other peer now.
252 *
253 * @param cls the message context
254 * @param tc the scheduler context
255 */
256static void
257send_ack (void *cls,
258 const struct GNUNET_SCHEDULER_TaskContext *tc)
259{
260 struct MessageContext *mc = cls;
261 struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
262 struct FragmentAcknowledgement fa;
263
264 mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
265 fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
266 fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
267 fa.fragment_id = htonl (mc->fragment_id);
268 fa.bits = GNUNET_htonll (mc->bits);
269 dc->ackp (dc->cls, &fa.header);
270}
271
272
273/**
221 * We have received a fragment. Process it. 274 * We have received a fragment. Process it.
222 * 275 *
223 * @param dc the context 276 * @param dc the context
@@ -227,6 +280,127 @@ void
227GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, 280GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
228 const struct GNUNET_MessageHeader *msg) 281 const struct GNUNET_MessageHeader *msg)
229{ 282{
283 struct MessageContext *mc;
284 const struct FragmentHeader *fh;
285 uint16_t msize;
286 uint16_t foff;
287 uint32_t fid;
288 char *mbuf;
289 unsigned int bit;
290 struct GNUNET_TIME_Absolute now;
291 struct GNUNET_TIME_Relative delay;
292
293 if (ntohs(msg->size) < sizeof (struct FragmentHeader))
294 {
295 GNUNET_break_op (0);
296 return;
297 }
298 if (ntohs (msg->size) > dc->mtu)
299 {
300 GNUNET_break_op (0);
301 return;
302 }
303 fh = (const struct FragmentHeader*) msg;
304 msize = ntohs (fh->total_size);
305 fid = ntohl (fh->fragment_id);
306 foff = ntohl (fh->offset);
307 if (foff >= msize)
308 {
309 GNUNET_break_op (0);
310 return;
311 }
312 GNUNET_STATISTICS_update (dc->stats,
313 _("Fragments received"),
314 1,
315 GNUNET_NO);
316 mc = dc->head;
317 while ( (NULL != mc) &&
318 (fid != mc->fragment_id) )
319 mc = mc->next;
320 bit = foff / dc->mtu;
321 if (bit * dc->mtu + ntohs (msg->size)
322 - sizeof (struct FragmentHeader) > msize)
323 {
324 /* payload extends past total message size */
325 GNUNET_break_op (0);
326 return;
327 }
328 if ( (NULL != mc) && (msize != mc->total_size) )
329 {
330 /* inconsistent message size */
331 GNUNET_break_op (0);
332 return;
333 }
334 now = GNUNET_TIME_absolute_get ();
335 if (NULL == mc)
336 {
337 mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
338 mc->msg = (const struct GNUNET_MessageHeader*) &mc[1];
339 mc->dc = dc;
340 mc->total_size = msize;
341 mc->fragment_id = fid;
342 mc->last_update = now;
343 mc->bits = (msize + dc->mtu - 1) / (dc->mtu - sizeof (struct FragmentHeader));
344 GNUNET_CONTAINER_DLL_insert (dc->head,
345 dc->tail,
346 mc);
347 dc->list_size++;
348 if (dc->list_size > dc->num_msgs)
349 {
350 /* FIXME: discard oldest entry... */
351 }
352 }
353
354 /* copy data to 'mc' */
355 if (0 != (mc->bits & (1 << bit)))
356 {
357 mc->bits -= 1 << bit;
358 mbuf = (char* )&mc[1];
359 memcpy (&mbuf[bit * dc->mtu],
360 &fh[1],
361 ntohs (msg->size) - sizeof (struct FragmentHeader));
362 mc->last_update = now;
363 mc->frag_times[mc->frag_times_write_offset].time = now;
364 mc->frag_times[mc->frag_times_write_offset].bit = bit;
365 mc->frag_times_write_offset++;
366 if (0 == mc->bits)
367 {
368 /* message complete, notify! */
369 dc->proc (dc->cls,
370 mc->msg);
371 GNUNET_STATISTICS_update (dc->stats,
372 _("Messages defragmented"),
373 1,
374 GNUNET_NO);
375 }
376 }
377 else
378 {
379 GNUNET_STATISTICS_update (dc->stats,
380 _("Duplicate fragments received"),
381 1,
382 GNUNET_NO);
383 }
384
385 /* FIXME: update ACK timer (if 0==mc->bits, always ACK now!) */
386 delay = GNUNET_TIME_UNIT_SECONDS; /* FIXME: bad! */
387 if (mc->frag_times_write_offset == 1)
388 {
389 /* FIXME: use number-of-fragments * dc->delay */
390 }
391 else
392 {
393 /* FIXME: use best-fit regression */
394 }
395 /* FIXME: update dc->latency! */
396
397 if (0 == mc->bits)
398 delay = GNUNET_TIME_UNIT_ZERO;
399 if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
400 GNUNET_SCHEDULER_cancel (mc->ack_task);
401 mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
402 &send_ack,
403 mc);
230} 404}
231 405
232/* end of defragmentation_new.c */ 406/* end of defragmentation_new.c */