aboutsummaryrefslogtreecommitdiff
path: root/src/fragmentation/fragmentation.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-07-13 11:08:01 +0000
committerChristian Grothoff <christian@grothoff.org>2011-07-13 11:08:01 +0000
commitb2c1c35b6344bd03d9a1b07afcd064b7be34094a (patch)
tree420f2529be00eb6ab40a29b568910440f8cc590a /src/fragmentation/fragmentation.c
parent1ad6c770476ec896d2a5c9f8e26237de36eb05e0 (diff)
downloadgnunet-b2c1c35b6344bd03d9a1b07afcd064b7be34094a.tar.gz
gnunet-b2c1c35b6344bd03d9a1b07afcd064b7be34094a.zip
rename
Diffstat (limited to 'src/fragmentation/fragmentation.c')
-rw-r--r--src/fragmentation/fragmentation.c374
1 files changed, 374 insertions, 0 deletions
diff --git a/src/fragmentation/fragmentation.c b/src/fragmentation/fragmentation.c
new file mode 100644
index 000000000..db66f5a5b
--- /dev/null
+++ b/src/fragmentation/fragmentation.c
@@ -0,0 +1,374 @@
1/*
2 This file is part of GNUnet
3 (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20/**
21 * @file src/fragmentation/fragmentation_new.c
22 * @brief library to help fragment messages
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "gnunet_fragmentation_lib.h"
27#include "gnunet_protocols.h"
28#include "fragmentation.h"
29
30
31/**
32 * Fragmentation context.
33 */
34struct GNUNET_FRAGMENT_Context
35{
36 /**
37 * Statistics to use.
38 */
39 struct GNUNET_STATISTICS_Handle *stats;
40
41 /**
42 * Tracker for flow control.
43 */
44 struct GNUNET_BANDWIDTH_Tracker *tracker;
45
46 /**
47 * Current expected delay for ACKs.
48 */
49 struct GNUNET_TIME_Relative delay;
50
51 /**
52 * Time we transmitted the last message of the last round.
53 */
54 struct GNUNET_TIME_Absolute last_round;
55
56 /**
57 * Message to fragment (allocated at the end of this struct).
58 */
59 const struct GNUNET_MessageHeader *msg;
60
61 /**
62 * Function to call for transmissions.
63 */
64 GNUNET_FRAGMENT_MessageProcessor proc;
65
66 /**
67 * Closure for 'proc'.
68 */
69 void *proc_cls;
70
71 /**
72 * Bitfield, set to 1 for each unacknowledged fragment.
73 */
74 uint64_t acks;
75
76 /**
77 * Task performing work for the fragmenter.
78 */
79 GNUNET_SCHEDULER_TaskIdentifier task;
80
81 /**
82 * Our fragmentation ID. (chosen at random)
83 */
84 uint32_t fragment_id;
85
86 /**
87 * Round-robin selector for the next transmission.
88 */
89 unsigned int next_transmission;
90
91 /**
92 * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
93 */
94 int8_t proc_busy;
95
96 /**
97 * GNUNET_YES if we are waiting for an ACK.
98 */
99 int8_t wack;
100
101 /**
102 * Target fragment size.
103 */
104 uint16_t mtu;
105
106};
107
108
109/**
110 * Transmit the next fragment to the other peer.
111 *
112 * @param cls the 'struct GNUNET_FRAGMENT_Context'
113 * @param tc scheduler context
114 */
115static void
116transmit_next (void *cls,
117 const struct GNUNET_SCHEDULER_TaskContext *tc)
118{
119 struct GNUNET_FRAGMENT_Context *fc = cls;
120 char msg[fc->mtu];
121 const char *mbuf;
122 struct FragmentHeader *fh;
123 struct GNUNET_TIME_Relative delay;
124 unsigned int bit;
125 size_t size;
126 size_t fsize;
127 int wrap;
128
129 fc->task = GNUNET_SCHEDULER_NO_TASK;
130 GNUNET_assert (GNUNET_NO == fc->proc_busy);
131 if (0 == fc->acks)
132 return; /* all done */
133
134 /* calculate delay */
135 wrap = 0;
136 while (0 == (fc->acks & (1LL << fc->next_transmission)))
137 {
138 fc->next_transmission = (fc->next_transmission + 1) % 64;
139 wrap |= (fc->next_transmission == 0);
140 }
141 bit = fc->next_transmission;
142 size = ntohs (fc->msg->size);
143 if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
144 fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader);
145 else
146 fsize = fc->mtu;
147 if (fc->tracker != NULL)
148 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
149 fsize);
150 else
151 delay = GNUNET_TIME_UNIT_ZERO;
152 if (delay.rel_value > 0)
153 {
154 fc->task = GNUNET_SCHEDULER_add_delayed (delay,
155 &transmit_next,
156 fc);
157 return;
158 }
159 fc->next_transmission = (fc->next_transmission + 1) % 64;
160 wrap |= (fc->next_transmission == 0);
161
162 /* assemble fragmentation message */
163 mbuf = (const char*) &fc[1];
164 fh = (struct FragmentHeader*) msg;
165 fh->header.size = htons (fsize);
166 fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
167 fh->fragment_id = htonl (fc->fragment_id);
168 fh->total_size = fc->msg->size; /* already in big-endian */
169 fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
170 memcpy (&fh[1],
171 &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
172 fsize - sizeof (struct FragmentHeader));
173 if (NULL != fc->tracker)
174 GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
175 GNUNET_STATISTICS_update (fc->stats,
176 _("Fragments transmitted"),
177 1, GNUNET_NO);
178 if (0 != fc->last_round.abs_value)
179 GNUNET_STATISTICS_update (fc->stats,
180 _("Fragments retransmitted"),
181 1, GNUNET_NO);
182
183 /* select next message to calculate delay */
184 bit = fc->next_transmission;
185 size = ntohs (fc->msg->size);
186 if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
187 fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
188 else
189 fsize = fc->mtu;
190 if (NULL != fc->tracker)
191 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
192 fsize);
193 else
194 delay = GNUNET_TIME_UNIT_ZERO;
195 if (wrap)
196 {
197 /* full round transmitted wait 2x delay for ACK before going again */
198 delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
199 fc->delay);
200 fc->last_round = GNUNET_TIME_absolute_get ();
201 fc->wack = GNUNET_YES;
202 }
203 fc->proc_busy = GNUNET_YES;
204 fc->proc (fc->proc_cls, &fh->header);
205}
206
207
208/**
209 * Create a fragmentation context for the given message.
210 * Fragments the message into fragments of size "mtu" or
211 * less. Calls 'proc' on each un-acknowledged fragment,
212 * using both the expected 'delay' between messages and
213 * acknowledgements and the given 'tracker' to guide the
214 * frequency of calls to 'proc'.
215 *
216 * @param stats statistics context
217 * @param mtu the maximum message size for each fragment
218 * @param tracker bandwidth tracker to use for flow control (can be NULL)
219 * @param delay expected delay between fragment transmission
220 * and ACK based on previous messages
221 * @param msg the message to fragment
222 * @param proc function to call for each fragment to transmit
223 * @param proc_cls closure for proc
224 * @return the fragmentation context
225 */
226struct GNUNET_FRAGMENT_Context *
227GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
228 uint16_t mtu,
229 struct GNUNET_BANDWIDTH_Tracker *tracker,
230 struct GNUNET_TIME_Relative delay,
231 const struct GNUNET_MessageHeader *msg,
232 GNUNET_FRAGMENT_MessageProcessor proc,
233 void *proc_cls)
234{
235 struct GNUNET_FRAGMENT_Context *fc;
236 size_t size;
237 uint64_t bits;
238
239 GNUNET_STATISTICS_update (stats,
240 _("Messages fragmented"),
241 1, GNUNET_NO);
242 GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
243 size = ntohs (msg->size);
244 GNUNET_STATISTICS_update (stats,
245 _("Total size of fragmented messages"),
246 size, GNUNET_NO);
247 GNUNET_assert (size > mtu);
248 fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
249 fc->stats = stats;
250 fc->mtu = mtu;
251 fc->tracker = tracker;
252 fc->delay = delay;
253 fc->msg = (const struct GNUNET_MessageHeader*)&fc[1];
254 fc->proc = proc;
255 fc->proc_cls = proc_cls;
256 fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
257 UINT32_MAX);
258 memcpy (&fc[1], msg, size);
259 bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct FragmentHeader));
260 GNUNET_assert (bits <= 64);
261 if (bits == 64)
262 fc->acks = UINT64_MAX; /* set all 64 bit */
263 else
264 fc->acks = (1LL << bits) - 1; /* set lowest 'bits' bit */
265 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
266 fc);
267 return fc;
268}
269
270
271/**
272 * Continuation to call from the 'proc' function after the fragment
273 * has been transmitted (and hence the next fragment can now be
274 * given to proc).
275 *
276 * @param fc fragmentation context
277 */
278void
279GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
280{
281 GNUNET_assert (fc->proc_busy == GNUNET_YES);
282 fc->proc_busy = GNUNET_NO;
283 GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
284 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
285 fc);
286}
287
288
289/**
290 * Process an acknowledgement message we got from the other
291 * side (to control re-transmits).
292 *
293 * @param fc fragmentation context
294 * @param msg acknowledgement message we received
295 * @return GNUNET_OK if this ack completes the work of the 'fc'
296 * (all fragments have been received);
297 * GNUNET_NO if more messages are pending
298 * GNUNET_SYSERR if this ack is not valid for this fc
299 */
300int
301GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
302 const struct GNUNET_MessageHeader *msg)
303{
304 const struct FragmentAcknowledgement *fa;
305 uint64_t abits;
306 struct GNUNET_TIME_Relative ndelay;
307
308 if (sizeof (struct FragmentAcknowledgement) !=
309 ntohs (msg->size))
310 {
311 GNUNET_break_op (0);
312 return GNUNET_SYSERR;
313 }
314 fa = (const struct FragmentAcknowledgement *) msg;
315 if (ntohl (fa->fragment_id) != fc->fragment_id)
316 return GNUNET_SYSERR; /* not our ACK */
317 abits = GNUNET_ntohll (fa->bits);
318 if (GNUNET_YES == fc->wack)
319 {
320 /* normal ACK, can update running average of delay... */
321 fc->wack = GNUNET_NO;
322 ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
323 fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4;
324 }
325 if (abits != (fc->acks & abits))
326 {
327 /* ID collission or message reordering, count! This should be rare! */
328 GNUNET_STATISTICS_update (fc->stats,
329 _("Bits removed from ACK"),
330 1, GNUNET_NO);
331 }
332 fc->acks = abits;
333 if (0 != fc->acks)
334 {
335 /* more to transmit, do so right now (if tracker permits...) */
336 GNUNET_SCHEDULER_cancel (fc->task);
337 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
338 fc);
339 return GNUNET_NO;
340 }
341
342 /* all done */
343 if (fc->task != GNUNET_SCHEDULER_NO_TASK)
344 {
345 GNUNET_SCHEDULER_cancel (fc->task);
346 fc->task = GNUNET_SCHEDULER_NO_TASK;
347 }
348 return GNUNET_OK;
349}
350
351
352/**
353 * Destroy the given fragmentation context (stop calling 'proc', free
354 * resources).
355 *
356 * @param fc fragmentation context
357 * @return average delay between transmission and ACK for the
358 * last message, FOREVER if the message was not fully transmitted
359 */
360struct GNUNET_TIME_Relative
361GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
362{
363 struct GNUNET_TIME_Relative ret;
364
365 if (fc->task != GNUNET_SCHEDULER_NO_TASK)
366 GNUNET_SCHEDULER_cancel (fc->task);
367 ret = fc->delay;
368 GNUNET_free (fc);
369 return ret;
370}
371
372
373/* end of fragmentation_new.c */
374