summaryrefslogtreecommitdiff
path: root/src/util/mst.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/mst.c')
-rw-r--r--src/util/mst.c457
1 files changed, 229 insertions, 228 deletions
diff --git a/src/util/mst.c b/src/util/mst.c
index 4ccf4988f..a3f1368e4 100644
--- a/src/util/mst.c
+++ b/src/util/mst.c
@@ -34,13 +34,14 @@
34#define ALIGN_FACTOR 8 34#define ALIGN_FACTOR 8
35#endif 35#endif
36 36
37#define LOG(kind, ...) GNUNET_log_from(kind, "util-mst", __VA_ARGS__) 37#define LOG(kind, ...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
38 38
39 39
40/** 40/**
41 * Handle to a message stream tokenizer. 41 * Handle to a message stream tokenizer.
42 */ 42 */
43struct GNUNET_MessageStreamTokenizer { 43struct GNUNET_MessageStreamTokenizer
44{
44 /** 45 /**
45 * Function to call on completed messages. 46 * Function to call on completed messages.
46 */ 47 */
@@ -81,13 +82,13 @@ struct GNUNET_MessageStreamTokenizer {
81 * @return handle to tokenizer 82 * @return handle to tokenizer
82 */ 83 */
83struct GNUNET_MessageStreamTokenizer * 84struct GNUNET_MessageStreamTokenizer *
84GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb, 85GNUNET_MST_create (GNUNET_MessageTokenizerCallback cb,
85 void *cb_cls) 86 void *cb_cls)
86{ 87{
87 struct GNUNET_MessageStreamTokenizer *ret; 88 struct GNUNET_MessageStreamTokenizer *ret;
88 89
89 ret = GNUNET_new(struct GNUNET_MessageStreamTokenizer); 90 ret = GNUNET_new (struct GNUNET_MessageStreamTokenizer);
90 ret->hdr = GNUNET_malloc(GNUNET_MIN_MESSAGE_SIZE); 91 ret->hdr = GNUNET_malloc (GNUNET_MIN_MESSAGE_SIZE);
91 ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE; 92 ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
92 ret->cb = cb; 93 ret->cb = cb;
93 ret->cb_cls = cb_cls; 94 ret->cb_cls = cb_cls;
@@ -110,11 +111,11 @@ GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb,
110 * #GNUNET_SYSERR if the data stream is corrupt 111 * #GNUNET_SYSERR if the data stream is corrupt
111 */ 112 */
112int 113int
113GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst, 114GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst,
114 const char *buf, 115 const char *buf,
115 size_t size, 116 size_t size,
116 int purge, 117 int purge,
117 int one_shot) 118 int one_shot)
118{ 119{
119 const struct GNUNET_MessageHeader *hdr; 120 const struct GNUNET_MessageHeader *hdr;
120 size_t delta; 121 size_t delta;
@@ -125,203 +126,203 @@ GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst,
125 int ret; 126 int ret;
126 int cbret; 127 int cbret;
127 128
128 GNUNET_assert(mst->off <= mst->pos); 129 GNUNET_assert (mst->off <= mst->pos);
129 GNUNET_assert(mst->pos <= mst->curr_buf); 130 GNUNET_assert (mst->pos <= mst->curr_buf);
130 LOG(GNUNET_ERROR_TYPE_DEBUG, 131 LOG (GNUNET_ERROR_TYPE_DEBUG,
131 "MST receives %u bytes with %u bytes already in private buffer\n", 132 "MST receives %u bytes with %u bytes already in private buffer\n",
132 (unsigned int)size, 133 (unsigned int) size,
133 (unsigned int)(mst->pos - mst->off)); 134 (unsigned int) (mst->pos - mst->off));
134 ret = GNUNET_OK; 135 ret = GNUNET_OK;
135 ibuf = (char *)mst->hdr; 136 ibuf = (char *) mst->hdr;
136 while (mst->pos > 0) 137 while (mst->pos > 0)
137 { 138 {
138do_align: 139do_align:
139 GNUNET_assert(mst->pos >= mst->off); 140 GNUNET_assert (mst->pos >= mst->off);
140 if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) || 141 if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) ||
141 (0 != (mst->off % ALIGN_FACTOR))) 142 (0 != (mst->off % ALIGN_FACTOR)))
142 { 143 {
143 /* need to align or need more space */ 144 /* need to align or need more space */
144 mst->pos -= mst->off; 145 mst->pos -= mst->off;
145 memmove(ibuf, 146 memmove (ibuf,
146 &ibuf[mst->off], 147 &ibuf[mst->off],
147 mst->pos); 148 mst->pos);
148 mst->off = 0; 149 mst->off = 0;
149 } 150 }
150 if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader)) 151 if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
151 { 152 {
152 delta 153 delta
153 = GNUNET_MIN(sizeof(struct GNUNET_MessageHeader) 154 = GNUNET_MIN (sizeof(struct GNUNET_MessageHeader)
154 - (mst->pos - mst->off), 155 - (mst->pos - mst->off),
155 size); 156 size);
156 GNUNET_memcpy(&ibuf[mst->pos], 157 GNUNET_memcpy (&ibuf[mst->pos],
157 buf, 158 buf,
158 delta); 159 delta);
159 mst->pos += delta; 160 mst->pos += delta;
160 buf += delta; 161 buf += delta;
161 size -= delta; 162 size -= delta;
162 } 163 }
163 if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader)) 164 if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
164 { 165 {
165 if (purge) 166 if (purge)
166 { 167 {
167 mst->off = 0; 168 mst->off = 0;
168 mst->pos = 0; 169 mst->pos = 0;
169 } 170 }
170 return GNUNET_OK; 171 return GNUNET_OK;
171 } 172 }
172 hdr = (const struct GNUNET_MessageHeader *)&ibuf[mst->off]; 173 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
173 want = ntohs(hdr->size); 174 want = ntohs (hdr->size);
175 if (want < sizeof(struct GNUNET_MessageHeader))
176 {
177 GNUNET_break_op (0);
178 return GNUNET_SYSERR;
179 }
180 if ((mst->curr_buf - mst->off < want) &&
181 (mst->off > 0))
182 {
183 /* can get more space by moving */
184 mst->pos -= mst->off;
185 memmove (ibuf,
186 &ibuf[mst->off],
187 mst->pos);
188 mst->off = 0;
189 }
190 if (mst->curr_buf < want)
191 {
192 /* need to get more space by growing buffer */
193 GNUNET_assert (0 == mst->off);
194 mst->hdr = GNUNET_realloc (mst->hdr,
195 want);
196 ibuf = (char *) mst->hdr;
197 mst->curr_buf = want;
198 }
199 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
200 if (mst->pos - mst->off < want)
201 {
202 delta = GNUNET_MIN (want - (mst->pos - mst->off),
203 size);
204 GNUNET_assert (mst->pos + delta <= mst->curr_buf);
205 GNUNET_memcpy (&ibuf[mst->pos],
206 buf,
207 delta);
208 mst->pos += delta;
209 buf += delta;
210 size -= delta;
211 }
212 if (mst->pos - mst->off < want)
213 {
214 if (purge)
215 {
216 mst->off = 0;
217 mst->pos = 0;
218 }
219 return GNUNET_OK;
220 }
221 if (one_shot == GNUNET_SYSERR)
222 {
223 /* cannot call callback again, but return value saying that
224 * we have another full message in the buffer */
225 ret = GNUNET_NO;
226 goto copy;
227 }
228 if (one_shot == GNUNET_YES)
229 one_shot = GNUNET_SYSERR;
230 mst->off += want;
231 if (GNUNET_OK !=
232 (cbret = mst->cb (mst->cb_cls,
233 hdr)))
234 {
235 if (GNUNET_SYSERR == cbret)
236 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
237 "Failure processing message of type %u and size %u\n",
238 ntohs (hdr->type),
239 ntohs (hdr->size));
240 return GNUNET_SYSERR;
241 }
242 if (mst->off == mst->pos)
243 {
244 /* reset to beginning of buffer, it's free right now! */
245 mst->off = 0;
246 mst->pos = 0;
247 }
248 }
249 GNUNET_assert (0 == mst->pos);
250 while (size > 0)
251 {
252 LOG (GNUNET_ERROR_TYPE_DEBUG,
253 "Server-mst has %u bytes left in inbound buffer\n",
254 (unsigned int) size);
255 if (size < sizeof(struct GNUNET_MessageHeader))
256 break;
257 offset = (unsigned long) buf;
258 need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
259 if (GNUNET_NO == need_align)
260 {
261 /* can try to do zero-copy and process directly from original buffer */
262 hdr = (const struct GNUNET_MessageHeader *) buf;
263 want = ntohs (hdr->size);
174 if (want < sizeof(struct GNUNET_MessageHeader)) 264 if (want < sizeof(struct GNUNET_MessageHeader))
175 { 265 {
176 GNUNET_break_op(0); 266 GNUNET_break_op (0);
177 return GNUNET_SYSERR; 267 mst->off = 0;
178 } 268 return GNUNET_SYSERR;
179 if ((mst->curr_buf - mst->off < want) && 269 }
180 (mst->off > 0)) 270 if (size < want)
181 { 271 break; /* or not: buffer incomplete, so copy to private buffer... */
182 /* can get more space by moving */
183 mst->pos -= mst->off;
184 memmove(ibuf,
185 &ibuf[mst->off],
186 mst->pos);
187 mst->off = 0;
188 }
189 if (mst->curr_buf < want)
190 {
191 /* need to get more space by growing buffer */
192 GNUNET_assert(0 == mst->off);
193 mst->hdr = GNUNET_realloc(mst->hdr,
194 want);
195 ibuf = (char *)mst->hdr;
196 mst->curr_buf = want;
197 }
198 hdr = (const struct GNUNET_MessageHeader *)&ibuf[mst->off];
199 if (mst->pos - mst->off < want)
200 {
201 delta = GNUNET_MIN(want - (mst->pos - mst->off),
202 size);
203 GNUNET_assert(mst->pos + delta <= mst->curr_buf);
204 GNUNET_memcpy(&ibuf[mst->pos],
205 buf,
206 delta);
207 mst->pos += delta;
208 buf += delta;
209 size -= delta;
210 }
211 if (mst->pos - mst->off < want)
212 {
213 if (purge)
214 {
215 mst->off = 0;
216 mst->pos = 0;
217 }
218 return GNUNET_OK;
219 }
220 if (one_shot == GNUNET_SYSERR) 272 if (one_shot == GNUNET_SYSERR)
221 { 273 {
222 /* cannot call callback again, but return value saying that 274 /* cannot call callback again, but return value saying that
223 * we have another full message in the buffer */ 275 * we have another full message in the buffer */
224 ret = GNUNET_NO; 276 ret = GNUNET_NO;
225 goto copy; 277 goto copy;
226 } 278 }
227 if (one_shot == GNUNET_YES) 279 if (one_shot == GNUNET_YES)
228 one_shot = GNUNET_SYSERR; 280 one_shot = GNUNET_SYSERR;
229 mst->off += want;
230 if (GNUNET_OK != 281 if (GNUNET_OK !=
231 (cbret = mst->cb(mst->cb_cls, 282 (cbret = mst->cb (mst->cb_cls,
232 hdr))) 283 hdr)))
233 { 284 {
234 if (GNUNET_SYSERR == cbret) 285 if (GNUNET_SYSERR == cbret)
235 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, 286 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
236 "Failure processing message of type %u and size %u\n", 287 "Failure processing message of type %u and size %u\n",
237 ntohs(hdr->type), 288 ntohs (hdr->type),
238 ntohs(hdr->size)); 289 ntohs (hdr->size));
239 return GNUNET_SYSERR; 290 return GNUNET_SYSERR;
240 } 291 }
241 if (mst->off == mst->pos) 292 buf += want;
242 { 293 size -= want;
243 /* reset to beginning of buffer, it's free right now! */
244 mst->off = 0;
245 mst->pos = 0;
246 }
247 } 294 }
248 GNUNET_assert(0 == mst->pos); 295 else
249 while (size > 0)
250 { 296 {
251 LOG(GNUNET_ERROR_TYPE_DEBUG, 297 /* need to copy to private buffer to align;
252 "Server-mst has %u bytes left in inbound buffer\n", 298 * yes, we go a bit more spagetti than usual here */
253 (unsigned int)size); 299 goto do_align;
254 if (size < sizeof(struct GNUNET_MessageHeader))
255 break;
256 offset = (unsigned long)buf;
257 need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
258 if (GNUNET_NO == need_align)
259 {
260 /* can try to do zero-copy and process directly from original buffer */
261 hdr = (const struct GNUNET_MessageHeader *)buf;
262 want = ntohs(hdr->size);
263 if (want < sizeof(struct GNUNET_MessageHeader))
264 {
265 GNUNET_break_op(0);
266 mst->off = 0;
267 return GNUNET_SYSERR;
268 }
269 if (size < want)
270 break; /* or not: buffer incomplete, so copy to private buffer... */
271 if (one_shot == GNUNET_SYSERR)
272 {
273 /* cannot call callback again, but return value saying that
274 * we have another full message in the buffer */
275 ret = GNUNET_NO;
276 goto copy;
277 }
278 if (one_shot == GNUNET_YES)
279 one_shot = GNUNET_SYSERR;
280 if (GNUNET_OK !=
281 (cbret = mst->cb(mst->cb_cls,
282 hdr)))
283 {
284 if (GNUNET_SYSERR == cbret)
285 GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
286 "Failure processing message of type %u and size %u\n",
287 ntohs(hdr->type),
288 ntohs(hdr->size));
289 return GNUNET_SYSERR;
290 }
291 buf += want;
292 size -= want;
293 }
294 else
295 {
296 /* need to copy to private buffer to align;
297 * yes, we go a bit more spagetti than usual here */
298 goto do_align;
299 }
300 } 300 }
301 }
301copy: 302copy:
302 if ((size > 0) && (!purge)) 303 if ((size > 0) && (! purge))
304 {
305 if (size + mst->pos > mst->curr_buf)
303 { 306 {
304 if (size + mst->pos > mst->curr_buf) 307 mst->hdr = GNUNET_realloc (mst->hdr,
305 { 308 size + mst->pos);
306 mst->hdr = GNUNET_realloc(mst->hdr, 309 ibuf = (char *) mst->hdr;
307 size + mst->pos); 310 mst->curr_buf = size + mst->pos;
308 ibuf = (char *)mst->hdr;
309 mst->curr_buf = size + mst->pos;
310 }
311 GNUNET_assert(size + mst->pos <= mst->curr_buf);
312 GNUNET_memcpy(&ibuf[mst->pos],
313 buf,
314 size);
315 mst->pos += size;
316 } 311 }
312 GNUNET_assert (size + mst->pos <= mst->curr_buf);
313 GNUNET_memcpy (&ibuf[mst->pos],
314 buf,
315 size);
316 mst->pos += size;
317 }
317 if (purge) 318 if (purge)
318 { 319 {
319 mst->off = 0; 320 mst->off = 0;
320 mst->pos = 0; 321 mst->pos = 0;
321 } 322 }
322 LOG(GNUNET_ERROR_TYPE_DEBUG, 323 LOG (GNUNET_ERROR_TYPE_DEBUG,
323 "Server-mst leaves %u bytes in private buffer\n", 324 "Server-mst leaves %u bytes in private buffer\n",
324 (unsigned int)(mst->pos - mst->off)); 325 (unsigned int) (mst->pos - mst->off));
325 return ret; 326 return ret;
326} 327}
327 328
@@ -341,40 +342,40 @@ copy:
341 * #GNUNET_SYSERR if the data stream is corrupt 342 * #GNUNET_SYSERR if the data stream is corrupt
342 */ 343 */
343int 344int
344GNUNET_MST_read(struct GNUNET_MessageStreamTokenizer *mst, 345GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst,
345 struct GNUNET_NETWORK_Handle *sock, 346 struct GNUNET_NETWORK_Handle *sock,
346 int purge, 347 int purge,
347 int one_shot) 348 int one_shot)
348{ 349{
349 ssize_t ret; 350 ssize_t ret;
350 size_t left; 351 size_t left;
351 char *buf; 352 char *buf;
352 353
353 left = mst->curr_buf - mst->pos; 354 left = mst->curr_buf - mst->pos;
354 buf = (char *)mst->hdr; 355 buf = (char *) mst->hdr;
355 ret = GNUNET_NETWORK_socket_recv(sock, 356 ret = GNUNET_NETWORK_socket_recv (sock,
356 &buf[mst->pos], 357 &buf[mst->pos],
357 left); 358 left);
358 if (-1 == ret) 359 if (-1 == ret)
359 { 360 {
360 if ((EAGAIN == errno) || 361 if ((EAGAIN == errno) ||
361 (EINTR == errno)) 362 (EINTR == errno))
362 return GNUNET_OK; 363 return GNUNET_OK;
363 GNUNET_log_strerror(GNUNET_ERROR_TYPE_INFO, 364 GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
364 "recv"); 365 "recv");
365 return GNUNET_SYSERR; 366 return GNUNET_SYSERR;
366 } 367 }
367 if (0 == ret) 368 if (0 == ret)
368 { 369 {
369 /* other side closed connection, treat as error */ 370 /* other side closed connection, treat as error */
370 return GNUNET_SYSERR; 371 return GNUNET_SYSERR;
371 } 372 }
372 mst->pos += ret; 373 mst->pos += ret;
373 return GNUNET_MST_from_buffer(mst, 374 return GNUNET_MST_from_buffer (mst,
374 NULL, 375 NULL,
375 0, 376 0,
376 purge, 377 purge,
377 one_shot); 378 one_shot);
378} 379}
379 380
380 381
@@ -390,14 +391,14 @@ GNUNET_MST_read(struct GNUNET_MessageStreamTokenizer *mst,
390 * #GNUNET_SYSERR if the data stream is corrupt 391 * #GNUNET_SYSERR if the data stream is corrupt
391 */ 392 */
392int 393int
393GNUNET_MST_next(struct GNUNET_MessageStreamTokenizer *mst, 394GNUNET_MST_next (struct GNUNET_MessageStreamTokenizer *mst,
394 int one_shot) 395 int one_shot)
395{ 396{
396 return GNUNET_MST_from_buffer(mst, 397 return GNUNET_MST_from_buffer (mst,
397 NULL, 398 NULL,
398 0, 399 0,
399 GNUNET_NO, 400 GNUNET_NO,
400 one_shot); 401 one_shot);
401} 402}
402 403
403 404
@@ -407,10 +408,10 @@ GNUNET_MST_next(struct GNUNET_MessageStreamTokenizer *mst,
407 * @param mst tokenizer to destroy 408 * @param mst tokenizer to destroy
408 */ 409 */
409void 410void
410GNUNET_MST_destroy(struct GNUNET_MessageStreamTokenizer *mst) 411GNUNET_MST_destroy (struct GNUNET_MessageStreamTokenizer *mst)
411{ 412{
412 GNUNET_free(mst->hdr); 413 GNUNET_free (mst->hdr);
413 GNUNET_free(mst); 414 GNUNET_free (mst);
414} 415}
415 416
416 417