aboutsummaryrefslogtreecommitdiff
path: root/src/fragmentation/fragmentation.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-08-15 21:46:35 +0000
committerChristian Grothoff <christian@grothoff.org>2011-08-15 21:46:35 +0000
commit502af2167f7c218366666ca4944bd7cc54b5b19a (patch)
treea91fec5cc9769d260640bd91c6633cb9cf395524 /src/fragmentation/fragmentation.c
parent03af5a603b7cc53432249d5854cd412aa90dde0d (diff)
downloadgnunet-502af2167f7c218366666ca4944bd7cc54b5b19a.tar.gz
gnunet-502af2167f7c218366666ca4944bd7cc54b5b19a.zip
indentation
Diffstat (limited to 'src/fragmentation/fragmentation.c')
-rw-r--r--src/fragmentation/fragmentation.c209
1 files changed, 99 insertions, 110 deletions
diff --git a/src/fragmentation/fragmentation.c b/src/fragmentation/fragmentation.c
index 81db1b831..d3483fc8d 100644
--- a/src/fragmentation/fragmentation.c
+++ b/src/fragmentation/fragmentation.c
@@ -113,7 +113,7 @@ struct GNUNET_FRAGMENT_Context
113 * Target fragment size. 113 * Target fragment size.
114 */ 114 */
115 uint16_t mtu; 115 uint16_t mtu;
116 116
117}; 117};
118 118
119 119
@@ -124,8 +124,7 @@ struct GNUNET_FRAGMENT_Context
124 * @param tc scheduler context 124 * @param tc scheduler context
125 */ 125 */
126static void 126static void
127transmit_next (void *cls, 127transmit_next (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
128 const struct GNUNET_SCHEDULER_TaskContext *tc)
129{ 128{
130 struct GNUNET_FRAGMENT_Context *fc = cls; 129 struct GNUNET_FRAGMENT_Context *fc = cls;
131 char msg[fc->mtu]; 130 char msg[fc->mtu];
@@ -140,56 +139,53 @@ transmit_next (void *cls,
140 fc->task = GNUNET_SCHEDULER_NO_TASK; 139 fc->task = GNUNET_SCHEDULER_NO_TASK;
141 GNUNET_assert (GNUNET_NO == fc->proc_busy); 140 GNUNET_assert (GNUNET_NO == fc->proc_busy);
142 if (0 == fc->acks) 141 if (0 == fc->acks)
143 return; /* all done */ 142 return; /* all done */
144 143
145 /* calculate delay */ 144 /* calculate delay */
146 wrap = 0; 145 wrap = 0;
147 while (0 == (fc->acks & (1LL << fc->next_transmission))) 146 while (0 == (fc->acks & (1LL << fc->next_transmission)))
148 { 147 {
149 fc->next_transmission = (fc->next_transmission + 1) % 64; 148 fc->next_transmission = (fc->next_transmission + 1) % 64;
150 wrap |= (fc->next_transmission == 0); 149 wrap |= (fc->next_transmission == 0);
151 } 150 }
152 bit = fc->next_transmission; 151 bit = fc->next_transmission;
153 size = ntohs (fc->msg->size); 152 size = ntohs (fc->msg->size);
154 if (bit == size / (fc->mtu - sizeof (struct FragmentHeader))) 153 if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
155 fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader); 154 fsize =
155 size % (fc->mtu - sizeof (struct FragmentHeader)) +
156 sizeof (struct FragmentHeader);
156 else 157 else
157 fsize = fc->mtu; 158 fsize = fc->mtu;
158 if (fc->tracker != NULL) 159 if (fc->tracker != NULL)
159 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, 160 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
160 fsize);
161 else 161 else
162 delay = GNUNET_TIME_UNIT_ZERO; 162 delay = GNUNET_TIME_UNIT_ZERO;
163 if (delay.rel_value > 0) 163 if (delay.rel_value > 0)
164 { 164 {
165 fc->task = GNUNET_SCHEDULER_add_delayed (delay, 165 fc->task = GNUNET_SCHEDULER_add_delayed (delay, &transmit_next, fc);
166 &transmit_next, 166 return;
167 fc); 167 }
168 return;
169 }
170 fc->next_transmission = (fc->next_transmission + 1) % 64; 168 fc->next_transmission = (fc->next_transmission + 1) % 64;
171 wrap |= (fc->next_transmission == 0); 169 wrap |= (fc->next_transmission == 0);
172 170
173 /* assemble fragmentation message */ 171 /* assemble fragmentation message */
174 mbuf = (const char*) &fc[1]; 172 mbuf = (const char *) &fc[1];
175 fh = (struct FragmentHeader*) msg; 173 fh = (struct FragmentHeader *) msg;
176 fh->header.size = htons (fsize); 174 fh->header.size = htons (fsize);
177 fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT); 175 fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
178 fh->fragment_id = htonl (fc->fragment_id); 176 fh->fragment_id = htonl (fc->fragment_id);
179 fh->total_size = fc->msg->size; /* already in big-endian */ 177 fh->total_size = fc->msg->size; /* already in big-endian */
180 fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit); 178 fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
181 memcpy (&fh[1], 179 memcpy (&fh[1],
182 &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], 180 &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
183 fsize - sizeof (struct FragmentHeader)); 181 fsize - sizeof (struct FragmentHeader));
184 if (NULL != fc->tracker) 182 if (NULL != fc->tracker)
185 GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize); 183 GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
186 GNUNET_STATISTICS_update (fc->stats, 184 GNUNET_STATISTICS_update (fc->stats,
187 _("# fragments transmitted"), 185 _("# fragments transmitted"), 1, GNUNET_NO);
188 1, GNUNET_NO);
189 if (0 != fc->last_round.abs_value) 186 if (0 != fc->last_round.abs_value)
190 GNUNET_STATISTICS_update (fc->stats, 187 GNUNET_STATISTICS_update (fc->stats,
191 _("# fragments retransmitted"), 188 _("# fragments retransmitted"), 1, GNUNET_NO);
192 1, GNUNET_NO);
193 189
194 /* select next message to calculate delay */ 190 /* select next message to calculate delay */
195 bit = fc->next_transmission; 191 bit = fc->next_transmission;
@@ -199,21 +195,19 @@ transmit_next (void *cls,
199 else 195 else
200 fsize = fc->mtu; 196 fsize = fc->mtu;
201 if (NULL != fc->tracker) 197 if (NULL != fc->tracker)
202 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, 198 delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker, fsize);
203 fsize);
204 else 199 else
205 delay = GNUNET_TIME_UNIT_ZERO; 200 delay = GNUNET_TIME_UNIT_ZERO;
206 if (wrap) 201 if (wrap)
207 { 202 {
208 /* full round transmitted wait 2x delay for ACK before going again */ 203 /* full round transmitted wait 2x delay for ACK before going again */
209 delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2), 204 delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
210 fc->delay); 205 fc->delay);
211 /* never use zero, need some time for ACK always */ 206 /* never use zero, need some time for ACK always */
212 delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MILLISECONDS, 207 delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MILLISECONDS, delay);
213 delay); 208 fc->last_round = GNUNET_TIME_absolute_get ();
214 fc->last_round = GNUNET_TIME_absolute_get (); 209 fc->wack = GNUNET_YES;
215 fc->wack = GNUNET_YES; 210 }
216 }
217 fc->proc_busy = GNUNET_YES; 211 fc->proc_busy = GNUNET_YES;
218 fc->delay_until = GNUNET_TIME_relative_to_absolute (delay); 212 fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
219 fc->proc (fc->proc_cls, &fh->header); 213 fc->proc (fc->proc_cls, &fh->header);
@@ -240,46 +234,46 @@ transmit_next (void *cls,
240 */ 234 */
241struct GNUNET_FRAGMENT_Context * 235struct GNUNET_FRAGMENT_Context *
242GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, 236GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
243 uint16_t mtu, 237 uint16_t mtu,
244 struct GNUNET_BANDWIDTH_Tracker *tracker, 238 struct GNUNET_BANDWIDTH_Tracker *tracker,
245 struct GNUNET_TIME_Relative delay, 239 struct GNUNET_TIME_Relative delay,
246 const struct GNUNET_MessageHeader *msg, 240 const struct GNUNET_MessageHeader *msg,
247 GNUNET_FRAGMENT_MessageProcessor proc, 241 GNUNET_FRAGMENT_MessageProcessor proc,
248 void *proc_cls) 242 void *proc_cls)
249{ 243{
250 struct GNUNET_FRAGMENT_Context *fc; 244 struct GNUNET_FRAGMENT_Context *fc;
251 size_t size; 245 size_t size;
252 uint64_t bits; 246 uint64_t bits;
253 247
254 GNUNET_STATISTICS_update (stats, 248 GNUNET_STATISTICS_update (stats, _("# messages fragmented"), 1, GNUNET_NO);
255 _("# messages fragmented"),
256 1, GNUNET_NO);
257 GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader)); 249 GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
258 size = ntohs (msg->size); 250 size = ntohs (msg->size);
259 GNUNET_STATISTICS_update (stats, 251 GNUNET_STATISTICS_update (stats,
260 _("# total size of fragmented messages"), 252 _("# total size of fragmented messages"),
261 size, GNUNET_NO); 253 size, GNUNET_NO);
262 GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); 254 GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
263 fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size); 255 fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
264 fc->stats = stats; 256 fc->stats = stats;
265 fc->mtu = mtu; 257 fc->mtu = mtu;
266 fc->tracker = tracker; 258 fc->tracker = tracker;
267 fc->delay = delay; 259 fc->delay = delay;
268 fc->msg = (const struct GNUNET_MessageHeader*)&fc[1]; 260 fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
269 fc->proc = proc; 261 fc->proc = proc;
270 fc->proc_cls = proc_cls; 262 fc->proc_cls = proc_cls;
271 fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 263 fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
272 UINT32_MAX); 264 UINT32_MAX);
273 memcpy (&fc[1], msg, size); 265 memcpy (&fc[1], msg, size);
274 bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct FragmentHeader)); 266 bits =
267 (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
268 sizeof (struct
269 FragmentHeader));
275 GNUNET_assert (bits <= 64); 270 GNUNET_assert (bits <= 64);
276 if (bits == 64) 271 if (bits == 64)
277 fc->acks_mask = UINT64_MAX; /* set all 64 bit */ 272 fc->acks_mask = UINT64_MAX; /* set all 64 bit */
278 else 273 else
279 fc->acks_mask = (1LL << bits) - 1; /* set lowest 'bits' bit */ 274 fc->acks_mask = (1LL << bits) - 1; /* set lowest 'bits' bit */
280 fc->acks = fc->acks_mask; 275 fc->acks = fc->acks_mask;
281 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, 276 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
282 fc);
283 return fc; 277 return fc;
284} 278}
285 279
@@ -297,9 +291,9 @@ GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
297 GNUNET_assert (fc->proc_busy == GNUNET_YES); 291 GNUNET_assert (fc->proc_busy == GNUNET_YES);
298 fc->proc_busy = GNUNET_NO; 292 fc->proc_busy = GNUNET_NO;
299 GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK); 293 GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
300 fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (fc->delay_until), 294 fc->task =
301 &transmit_next, 295 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
302 fc); 296 (fc->delay_until), &transmit_next, fc);
303} 297}
304 298
305 299
@@ -314,72 +308,68 @@ GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
314 * GNUNET_NO if more messages are pending 308 * GNUNET_NO if more messages are pending
315 * GNUNET_SYSERR if this ack is not valid for this fc 309 * GNUNET_SYSERR if this ack is not valid for this fc
316 */ 310 */
317int 311int
318GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc, 312GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
319 const struct GNUNET_MessageHeader *msg) 313 const struct GNUNET_MessageHeader *msg)
320{ 314{
321 const struct FragmentAcknowledgement *fa; 315 const struct FragmentAcknowledgement *fa;
322 uint64_t abits; 316 uint64_t abits;
323 struct GNUNET_TIME_Relative ndelay; 317 struct GNUNET_TIME_Relative ndelay;
324 318
325 if (sizeof (struct FragmentAcknowledgement) != 319 if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
326 ntohs (msg->size)) 320 {
327 { 321 GNUNET_break_op (0);
328 GNUNET_break_op (0); 322 return GNUNET_SYSERR;
329 return GNUNET_SYSERR; 323 }
330 }
331 fa = (const struct FragmentAcknowledgement *) msg; 324 fa = (const struct FragmentAcknowledgement *) msg;
332 if (ntohl (fa->fragment_id) != fc->fragment_id) 325 if (ntohl (fa->fragment_id) != fc->fragment_id)
333 return GNUNET_SYSERR; /* not our ACK */ 326 return GNUNET_SYSERR; /* not our ACK */
334 abits = GNUNET_ntohll (fa->bits); 327 abits = GNUNET_ntohll (fa->bits);
335 if (GNUNET_YES == fc->wack) 328 if (GNUNET_YES == fc->wack)
336 { 329 {
337 /* normal ACK, can update running average of delay... */ 330 /* normal ACK, can update running average of delay... */
338 fc->wack = GNUNET_NO; 331 fc->wack = GNUNET_NO;
339 ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round); 332 ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
340 fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4; 333 fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4;
341 } 334 }
342 GNUNET_STATISTICS_update (fc->stats, 335 GNUNET_STATISTICS_update (fc->stats,
343 _("# fragment acknowledgements received"), 336 _("# fragment acknowledgements received"),
344 1, 337 1, GNUNET_NO);
345 GNUNET_NO);
346 if (abits != (fc->acks & abits)) 338 if (abits != (fc->acks & abits))
347 { 339 {
348 /* ID collission or message reordering, count! This should be rare! */ 340 /* ID collission or message reordering, count! This should be rare! */
349 GNUNET_STATISTICS_update (fc->stats, 341 GNUNET_STATISTICS_update (fc->stats,
350 _("# bits removed from fragmentation ACKs"), 342 _("# bits removed from fragmentation ACKs"),
351 1, GNUNET_NO); 343 1, GNUNET_NO);
352 } 344 }
353 fc->acks = abits & fc->acks_mask; 345 fc->acks = abits & fc->acks_mask;
354 if (0 != fc->acks) 346 if (0 != fc->acks)
347 {
348 /* more to transmit, do so right now (if tracker permits...) */
349 if (fc->task != GNUNET_SCHEDULER_NO_TASK)
350 {
351 /* schedule next transmission now, no point in waiting... */
352 GNUNET_SCHEDULER_cancel (fc->task);
353 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
354 }
355 else
355 { 356 {
356 /* more to transmit, do so right now (if tracker permits...) */ 357 /* only case where there is no task should be if we're waiting
357 if (fc->task != GNUNET_SCHEDULER_NO_TASK) 358 * for the right to transmit again (proc_busy set to YES) */
358 { 359 GNUNET_assert (GNUNET_YES == fc->proc_busy);
359 /* schedule next transmission now, no point in waiting... */
360 GNUNET_SCHEDULER_cancel (fc->task);
361 fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
362 fc);
363 }
364 else
365 {
366 /* only case where there is no task should be if we're waiting
367 for the right to transmit again (proc_busy set to YES) */
368 GNUNET_assert (GNUNET_YES == fc->proc_busy);
369 }
370 return GNUNET_NO;
371 } 360 }
361 return GNUNET_NO;
362 }
372 363
373 /* all done */ 364 /* all done */
374 GNUNET_STATISTICS_update (fc->stats, 365 GNUNET_STATISTICS_update (fc->stats,
375 _("# fragmentation transmissions completed"), 366 _("# fragmentation transmissions completed"),
376 1, 367 1, GNUNET_NO);
377 GNUNET_NO);
378 if (fc->task != GNUNET_SCHEDULER_NO_TASK) 368 if (fc->task != GNUNET_SCHEDULER_NO_TASK)
379 { 369 {
380 GNUNET_SCHEDULER_cancel (fc->task); 370 GNUNET_SCHEDULER_cancel (fc->task);
381 fc->task = GNUNET_SCHEDULER_NO_TASK; 371 fc->task = GNUNET_SCHEDULER_NO_TASK;
382 } 372 }
383 return GNUNET_OK; 373 return GNUNET_OK;
384} 374}
385 375
@@ -406,4 +396,3 @@ GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
406 396
407 397
408/* end of fragmentation.c */ 398/* end of fragmentation.c */
409