aboutsummaryrefslogtreecommitdiff
path: root/src/util/mst.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/mst.c')
-rw-r--r--src/util/mst.c463
1 files changed, 230 insertions, 233 deletions
diff --git a/src/util/mst.c b/src/util/mst.c
index bc52f2356..4ccf4988f 100644
--- a/src/util/mst.c
+++ b/src/util/mst.c
@@ -11,12 +11,12 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
18 SPDX-License-Identifier: AGPL3.0-or-later 18 SPDX-License-Identifier: AGPL3.0-or-later
19*/ 19 */
20 20
21/** 21/**
22 * @file util/mst.c 22 * @file util/mst.c
@@ -34,15 +34,13 @@
34#define ALIGN_FACTOR 8 34#define ALIGN_FACTOR 8
35#endif 35#endif
36 36
37#define LOG(kind,...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__) 37#define LOG(kind, ...) GNUNET_log_from(kind, "util-mst", __VA_ARGS__)
38 38
39 39
40/** 40/**
41 * Handle to a message stream tokenizer. 41 * Handle to a message stream tokenizer.
42 */ 42 */
43struct GNUNET_MessageStreamTokenizer 43struct GNUNET_MessageStreamTokenizer {
44{
45
46 /** 44 /**
47 * Function to call on completed messages. 45 * Function to call on completed messages.
48 */ 46 */
@@ -72,7 +70,6 @@ struct GNUNET_MessageStreamTokenizer
72 * Beginning of the buffer. Typed like this to force alignment. 70 * Beginning of the buffer. Typed like this to force alignment.
73 */ 71 */
74 struct GNUNET_MessageHeader *hdr; 72 struct GNUNET_MessageHeader *hdr;
75
76}; 73};
77 74
78 75
@@ -84,13 +81,13 @@ struct GNUNET_MessageStreamTokenizer
84 * @return handle to tokenizer 81 * @return handle to tokenizer
85 */ 82 */
86struct GNUNET_MessageStreamTokenizer * 83struct GNUNET_MessageStreamTokenizer *
87GNUNET_MST_create (GNUNET_MessageTokenizerCallback cb, 84GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb,
88 void *cb_cls) 85 void *cb_cls)
89{ 86{
90 struct GNUNET_MessageStreamTokenizer *ret; 87 struct GNUNET_MessageStreamTokenizer *ret;
91 88
92 ret = GNUNET_new (struct GNUNET_MessageStreamTokenizer); 89 ret = GNUNET_new(struct GNUNET_MessageStreamTokenizer);
93 ret->hdr = GNUNET_malloc (GNUNET_MIN_MESSAGE_SIZE); 90 ret->hdr = GNUNET_malloc(GNUNET_MIN_MESSAGE_SIZE);
94 ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE; 91 ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
95 ret->cb = cb; 92 ret->cb = cb;
96 ret->cb_cls = cb_cls; 93 ret->cb_cls = cb_cls;
@@ -113,11 +110,11 @@ GNUNET_MST_create (GNUNET_MessageTokenizerCallback cb,
113 * #GNUNET_SYSERR if the data stream is corrupt 110 * #GNUNET_SYSERR if the data stream is corrupt
114 */ 111 */
115int 112int
116GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst, 113GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst,
117 const char *buf, 114 const char *buf,
118 size_t size, 115 size_t size,
119 int purge, 116 int purge,
120 int one_shot) 117 int one_shot)
121{ 118{
122 const struct GNUNET_MessageHeader *hdr; 119 const struct GNUNET_MessageHeader *hdr;
123 size_t delta; 120 size_t delta;
@@ -128,203 +125,203 @@ GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst,
128 int ret; 125 int ret;
129 int cbret; 126 int cbret;
130 127
131 GNUNET_assert (mst->off <= mst->pos); 128 GNUNET_assert(mst->off <= mst->pos);
132 GNUNET_assert (mst->pos <= mst->curr_buf); 129 GNUNET_assert(mst->pos <= mst->curr_buf);
133 LOG (GNUNET_ERROR_TYPE_DEBUG, 130 LOG(GNUNET_ERROR_TYPE_DEBUG,
134 "MST receives %u bytes with %u bytes already in private buffer\n", 131 "MST receives %u bytes with %u bytes already in private buffer\n",
135 (unsigned int) size, 132 (unsigned int)size,
136 (unsigned int) (mst->pos - mst->off)); 133 (unsigned int)(mst->pos - mst->off));
137 ret = GNUNET_OK; 134 ret = GNUNET_OK;
138 ibuf = (char *) mst->hdr; 135 ibuf = (char *)mst->hdr;
139 while (mst->pos > 0) 136 while (mst->pos > 0)
140 {
141do_align:
142 GNUNET_assert (mst->pos >= mst->off);
143 if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
144 (0 != (mst->off % ALIGN_FACTOR)))
145 {
146 /* need to align or need more space */
147 mst->pos -= mst->off;
148 memmove (ibuf,
149 &ibuf[mst->off],
150 mst->pos);
151 mst->off = 0;
152 }
153 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
154 {
155 delta
156 = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader)
157 - (mst->pos - mst->off),
158 size);
159 GNUNET_memcpy (&ibuf[mst->pos],
160 buf,
161 delta);
162 mst->pos += delta;
163 buf += delta;
164 size -= delta;
165 }
166 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
167 {
168 if (purge)
169 {
170 mst->off = 0;
171 mst->pos = 0;
172 }
173 return GNUNET_OK;
174 }
175 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
176 want = ntohs (hdr->size);
177 if (want < sizeof (struct GNUNET_MessageHeader))
178 {
179 GNUNET_break_op (0);
180 return GNUNET_SYSERR;
181 }
182 if ( (mst->curr_buf - mst->off < want) &&
183 (mst->off > 0) )
184 {
185 /* can get more space by moving */
186 mst->pos -= mst->off;
187 memmove (ibuf,
188 &ibuf[mst->off],
189 mst->pos);
190 mst->off = 0;
191 }
192 if (mst->curr_buf < want)
193 {
194 /* need to get more space by growing buffer */
195 GNUNET_assert (0 == mst->off);
196 mst->hdr = GNUNET_realloc (mst->hdr,
197 want);
198 ibuf = (char *) mst->hdr;
199 mst->curr_buf = want;
200 }
201 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
202 if (mst->pos - mst->off < want)
203 {
204 delta = GNUNET_MIN (want - (mst->pos - mst->off),
205 size);
206 GNUNET_assert (mst->pos + delta <= mst->curr_buf);
207 GNUNET_memcpy (&ibuf[mst->pos],
208 buf,
209 delta);
210 mst->pos += delta;
211 buf += delta;
212 size -= delta;
213 }
214 if (mst->pos - mst->off < want)
215 {
216 if (purge)
217 {
218 mst->off = 0;
219 mst->pos = 0;
220 }
221 return GNUNET_OK;
222 }
223 if (one_shot == GNUNET_SYSERR)
224 { 137 {
225 /* cannot call callback again, but return value saying that 138do_align:
226 * we have another full message in the buffer */ 139 GNUNET_assert(mst->pos >= mst->off);
227 ret = GNUNET_NO; 140 if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) ||
228 goto copy; 141 (0 != (mst->off % ALIGN_FACTOR)))
229 } 142 {
230 if (one_shot == GNUNET_YES) 143 /* need to align or need more space */
231 one_shot = GNUNET_SYSERR; 144 mst->pos -= mst->off;
232 mst->off += want; 145 memmove(ibuf,
233 if (GNUNET_OK != 146 &ibuf[mst->off],
234 (cbret = mst->cb (mst->cb_cls, 147 mst->pos);
235 hdr))) 148 mst->off = 0;
236 { 149 }
237 if (GNUNET_SYSERR == cbret) 150 if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
238 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 151 {
239 "Failure processing message of type %u and size %u\n", 152 delta
240 ntohs (hdr->type), 153 = GNUNET_MIN(sizeof(struct GNUNET_MessageHeader)
241 ntohs (hdr->size)); 154 - (mst->pos - mst->off),
242 return GNUNET_SYSERR; 155 size);
243 } 156 GNUNET_memcpy(&ibuf[mst->pos],
244 if (mst->off == mst->pos) 157 buf,
245 { 158 delta);
246 /* reset to beginning of buffer, it's free right now! */ 159 mst->pos += delta;
247 mst->off = 0; 160 buf += delta;
248 mst->pos = 0; 161 size -= delta;
249 } 162 }
250 } 163 if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
251 GNUNET_assert (0 == mst->pos); 164 {
252 while (size > 0) 165 if (purge)
253 { 166 {
254 LOG (GNUNET_ERROR_TYPE_DEBUG, 167 mst->off = 0;
255 "Server-mst has %u bytes left in inbound buffer\n", 168 mst->pos = 0;
256 (unsigned int) size); 169 }
257 if (size < sizeof (struct GNUNET_MessageHeader)) 170 return GNUNET_OK;
258 break; 171 }
259 offset = (unsigned long) buf; 172 hdr = (const struct GNUNET_MessageHeader *)&ibuf[mst->off];
260 need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO; 173 want = ntohs(hdr->size);
261 if (GNUNET_NO == need_align) 174 if (want < sizeof(struct GNUNET_MessageHeader))
262 { 175 {
263 /* can try to do zero-copy and process directly from original buffer */ 176 GNUNET_break_op(0);
264 hdr = (const struct GNUNET_MessageHeader *) buf; 177 return GNUNET_SYSERR;
265 want = ntohs (hdr->size); 178 }
266 if (want < sizeof (struct GNUNET_MessageHeader)) 179 if ((mst->curr_buf - mst->off < want) &&
267 { 180 (mst->off > 0))
268 GNUNET_break_op (0); 181 {
269 mst->off = 0; 182 /* can get more space by moving */
270 return GNUNET_SYSERR; 183 mst->pos -= mst->off;
271 } 184 memmove(ibuf,
272 if (size < want) 185 &ibuf[mst->off],
273 break; /* or not: buffer incomplete, so copy to private buffer... */ 186 mst->pos);
187 mst->off = 0;
188 }
189 if (mst->curr_buf < want)
190 {
191 /* need to get more space by growing buffer */
192 GNUNET_assert(0 == mst->off);
193 mst->hdr = GNUNET_realloc(mst->hdr,
194 want);
195 ibuf = (char *)mst->hdr;
196 mst->curr_buf = want;
197 }
198 hdr = (const struct GNUNET_MessageHeader *)&ibuf[mst->off];
199 if (mst->pos - mst->off < want)
200 {
201 delta = GNUNET_MIN(want - (mst->pos - mst->off),
202 size);
203 GNUNET_assert(mst->pos + delta <= mst->curr_buf);
204 GNUNET_memcpy(&ibuf[mst->pos],
205 buf,
206 delta);
207 mst->pos += delta;
208 buf += delta;
209 size -= delta;
210 }
211 if (mst->pos - mst->off < want)
212 {
213 if (purge)
214 {
215 mst->off = 0;
216 mst->pos = 0;
217 }
218 return GNUNET_OK;
219 }
274 if (one_shot == GNUNET_SYSERR) 220 if (one_shot == GNUNET_SYSERR)
275 { 221 {
276 /* cannot call callback again, but return value saying that 222 /* cannot call callback again, but return value saying that
277 * we have another full message in the buffer */ 223 * we have another full message in the buffer */
278 ret = GNUNET_NO; 224 ret = GNUNET_NO;
279 goto copy; 225 goto copy;
280 } 226 }
281 if (one_shot == GNUNET_YES) 227 if (one_shot == GNUNET_YES)
282 one_shot = GNUNET_SYSERR; 228 one_shot = GNUNET_SYSERR;
229 mst->off += want;
283 if (GNUNET_OK != 230 if (GNUNET_OK !=
284 (cbret = mst->cb (mst->cb_cls, 231 (cbret = mst->cb(mst->cb_cls,
285 hdr))) 232 hdr)))
286 { 233 {
287 if (GNUNET_SYSERR == cbret) 234 if (GNUNET_SYSERR == cbret)
288 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 235 GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
289 "Failure processing message of type %u and size %u\n", 236 "Failure processing message of type %u and size %u\n",
290 ntohs (hdr->type), 237 ntohs(hdr->type),
291 ntohs (hdr->size)); 238 ntohs(hdr->size));
292 return GNUNET_SYSERR; 239 return GNUNET_SYSERR;
293 } 240 }
294 buf += want; 241 if (mst->off == mst->pos)
295 size -= want; 242 {
243 /* reset to beginning of buffer, it's free right now! */
244 mst->off = 0;
245 mst->pos = 0;
246 }
296 } 247 }
297 else 248 GNUNET_assert(0 == mst->pos);
249 while (size > 0)
298 { 250 {
299 /* need to copy to private buffer to align; 251 LOG(GNUNET_ERROR_TYPE_DEBUG,
300 * yes, we go a bit more spagetti than usual here */ 252 "Server-mst has %u bytes left in inbound buffer\n",
301 goto do_align; 253 (unsigned int)size);
254 if (size < sizeof(struct GNUNET_MessageHeader))
255 break;
256 offset = (unsigned long)buf;
257 need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
258 if (GNUNET_NO == need_align)
259 {
260 /* can try to do zero-copy and process directly from original buffer */
261 hdr = (const struct GNUNET_MessageHeader *)buf;
262 want = ntohs(hdr->size);
263 if (want < sizeof(struct GNUNET_MessageHeader))
264 {
265 GNUNET_break_op(0);
266 mst->off = 0;
267 return GNUNET_SYSERR;
268 }
269 if (size < want)
270 break; /* or not: buffer incomplete, so copy to private buffer... */
271 if (one_shot == GNUNET_SYSERR)
272 {
273 /* cannot call callback again, but return value saying that
274 * we have another full message in the buffer */
275 ret = GNUNET_NO;
276 goto copy;
277 }
278 if (one_shot == GNUNET_YES)
279 one_shot = GNUNET_SYSERR;
280 if (GNUNET_OK !=
281 (cbret = mst->cb(mst->cb_cls,
282 hdr)))
283 {
284 if (GNUNET_SYSERR == cbret)
285 GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
286 "Failure processing message of type %u and size %u\n",
287 ntohs(hdr->type),
288 ntohs(hdr->size));
289 return GNUNET_SYSERR;
290 }
291 buf += want;
292 size -= want;
293 }
294 else
295 {
296 /* need to copy to private buffer to align;
297 * yes, we go a bit more spagetti than usual here */
298 goto do_align;
299 }
302 } 300 }
303 }
304copy: 301copy:
305 if ((size > 0) && (!purge)) 302 if ((size > 0) && (!purge))
306 {
307 if (size + mst->pos > mst->curr_buf)
308 { 303 {
309 mst->hdr = GNUNET_realloc (mst->hdr, 304 if (size + mst->pos > mst->curr_buf)
310 size + mst->pos); 305 {
311 ibuf = (char *) mst->hdr; 306 mst->hdr = GNUNET_realloc(mst->hdr,
312 mst->curr_buf = size + mst->pos; 307 size + mst->pos);
308 ibuf = (char *)mst->hdr;
309 mst->curr_buf = size + mst->pos;
310 }
311 GNUNET_assert(size + mst->pos <= mst->curr_buf);
312 GNUNET_memcpy(&ibuf[mst->pos],
313 buf,
314 size);
315 mst->pos += size;
313 } 316 }
314 GNUNET_assert (size + mst->pos <= mst->curr_buf);
315 GNUNET_memcpy (&ibuf[mst->pos],
316 buf,
317 size);
318 mst->pos += size;
319 }
320 if (purge) 317 if (purge)
321 { 318 {
322 mst->off = 0; 319 mst->off = 0;
323 mst->pos = 0; 320 mst->pos = 0;
324 } 321 }
325 LOG (GNUNET_ERROR_TYPE_DEBUG, 322 LOG(GNUNET_ERROR_TYPE_DEBUG,
326 "Server-mst leaves %u bytes in private buffer\n", 323 "Server-mst leaves %u bytes in private buffer\n",
327 (unsigned int) (mst->pos - mst->off)); 324 (unsigned int)(mst->pos - mst->off));
328 return ret; 325 return ret;
329} 326}
330 327
@@ -344,40 +341,40 @@ copy:
344 * #GNUNET_SYSERR if the data stream is corrupt 341 * #GNUNET_SYSERR if the data stream is corrupt
345 */ 342 */
346int 343int
347GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst, 344GNUNET_MST_read(struct GNUNET_MessageStreamTokenizer *mst,
348 struct GNUNET_NETWORK_Handle *sock, 345 struct GNUNET_NETWORK_Handle *sock,
349 int purge, 346 int purge,
350 int one_shot) 347 int one_shot)
351{ 348{
352 ssize_t ret; 349 ssize_t ret;
353 size_t left; 350 size_t left;
354 char *buf; 351 char *buf;
355 352
356 left = mst->curr_buf - mst->pos; 353 left = mst->curr_buf - mst->pos;
357 buf = (char *) mst->hdr; 354 buf = (char *)mst->hdr;
358 ret = GNUNET_NETWORK_socket_recv (sock, 355 ret = GNUNET_NETWORK_socket_recv(sock,
359 &buf[mst->pos], 356 &buf[mst->pos],
360 left); 357 left);
361 if (-1 == ret) 358 if (-1 == ret)
362 { 359 {
363 if ( (EAGAIN == errno) || 360 if ((EAGAIN == errno) ||
364 (EINTR == errno) ) 361 (EINTR == errno))
365 return GNUNET_OK; 362 return GNUNET_OK;
366 GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO, 363 GNUNET_log_strerror(GNUNET_ERROR_TYPE_INFO,
367 "recv"); 364 "recv");
368 return GNUNET_SYSERR; 365 return GNUNET_SYSERR;
369 } 366 }
370 if (0 == ret) 367 if (0 == ret)
371 { 368 {
372 /* other side closed connection, treat as error */ 369 /* other side closed connection, treat as error */
373 return GNUNET_SYSERR; 370 return GNUNET_SYSERR;
374 } 371 }
375 mst->pos += ret; 372 mst->pos += ret;
376 return GNUNET_MST_from_buffer (mst, 373 return GNUNET_MST_from_buffer(mst,
377 NULL, 374 NULL,
378 0, 375 0,
379 purge, 376 purge,
380 one_shot); 377 one_shot);
381} 378}
382 379
383 380
@@ -393,14 +390,14 @@ GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst,
393 * #GNUNET_SYSERR if the data stream is corrupt 390 * #GNUNET_SYSERR if the data stream is corrupt
394 */ 391 */
395int 392int
396GNUNET_MST_next (struct GNUNET_MessageStreamTokenizer *mst, 393GNUNET_MST_next(struct GNUNET_MessageStreamTokenizer *mst,
397 int one_shot) 394 int one_shot)
398{ 395{
399 return GNUNET_MST_from_buffer (mst, 396 return GNUNET_MST_from_buffer(mst,
400 NULL, 397 NULL,
401 0, 398 0,
402 GNUNET_NO, 399 GNUNET_NO,
403 one_shot); 400 one_shot);
404} 401}
405 402
406 403
@@ -410,10 +407,10 @@ GNUNET_MST_next (struct GNUNET_MessageStreamTokenizer *mst,
410 * @param mst tokenizer to destroy 407 * @param mst tokenizer to destroy
411 */ 408 */
412void 409void
413GNUNET_MST_destroy (struct GNUNET_MessageStreamTokenizer *mst) 410GNUNET_MST_destroy(struct GNUNET_MessageStreamTokenizer *mst)
414{ 411{
415 GNUNET_free (mst->hdr); 412 GNUNET_free(mst->hdr);
416 GNUNET_free (mst); 413 GNUNET_free(mst);
417} 414}
418 415
419 416