diff options
Diffstat (limited to 'src/util/server_mst.c')
-rw-r--r-- | src/util/server_mst.c | 297 |
1 files changed, 143 insertions, 154 deletions
diff --git a/src/util/server_mst.c b/src/util/server_mst.c index 835d8eeba..f48ef5475 100644 --- a/src/util/server_mst.c +++ b/src/util/server_mst.c | |||
@@ -50,7 +50,7 @@ struct GNUNET_SERVER_MessageStreamTokenizer | |||
50 | * Function to call on completed messages. | 50 | * Function to call on completed messages. |
51 | */ | 51 | */ |
52 | GNUNET_SERVER_MessageTokenizerCallback cb; | 52 | GNUNET_SERVER_MessageTokenizerCallback cb; |
53 | 53 | ||
54 | /** | 54 | /** |
55 | * Closure for cb. | 55 | * Closure for cb. |
56 | */ | 56 | */ |
@@ -89,12 +89,12 @@ struct GNUNET_SERVER_MessageStreamTokenizer | |||
89 | */ | 89 | */ |
90 | struct GNUNET_SERVER_MessageStreamTokenizer * | 90 | struct GNUNET_SERVER_MessageStreamTokenizer * |
91 | GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, | 91 | GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, |
92 | void *cb_cls) | 92 | void *cb_cls) |
93 | { | 93 | { |
94 | struct GNUNET_SERVER_MessageStreamTokenizer *ret; | 94 | struct GNUNET_SERVER_MessageStreamTokenizer *ret; |
95 | 95 | ||
96 | ret = GNUNET_malloc (sizeof (struct GNUNET_SERVER_MessageStreamTokenizer)); | 96 | ret = GNUNET_malloc (sizeof (struct GNUNET_SERVER_MessageStreamTokenizer)); |
97 | ret->hdr = GNUNET_malloc(GNUNET_SERVER_MIN_BUFFER_SIZE); | 97 | ret->hdr = GNUNET_malloc (GNUNET_SERVER_MIN_BUFFER_SIZE); |
98 | ret->curr_buf = GNUNET_SERVER_MIN_BUFFER_SIZE; | 98 | ret->curr_buf = GNUNET_SERVER_MIN_BUFFER_SIZE; |
99 | ret->cb = cb; | 99 | ret->cb = cb; |
100 | ret->cb_cls = cb_cls; | 100 | ret->cb_cls = cb_cls; |
@@ -119,11 +119,9 @@ GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, | |||
119 | */ | 119 | */ |
120 | int | 120 | int |
121 | GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, | 121 | GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, |
122 | void *client_identity, | 122 | void *client_identity, |
123 | const char *buf, | 123 | const char *buf, |
124 | size_t size, | 124 | size_t size, int purge, int one_shot) |
125 | int purge, | ||
126 | int one_shot) | ||
127 | { | 125 | { |
128 | const struct GNUNET_MessageHeader *hdr; | 126 | const struct GNUNET_MessageHeader *hdr; |
129 | size_t delta; | 127 | size_t delta; |
@@ -135,169 +133,160 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, | |||
135 | 133 | ||
136 | #if DEBUG_SERVER_MST | 134 | #if DEBUG_SERVER_MST |
137 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 135 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
138 | "Server-mst receives %u bytes with %u bytes already in private buffer\n", | 136 | "Server-mst receives %u bytes with %u bytes already in private buffer\n", |
139 | (unsigned int) size, | 137 | (unsigned int) size, (unsigned int) (mst->pos - mst->off)); |
140 | (unsigned int) (mst->pos - mst->off)); | ||
141 | #endif | 138 | #endif |
142 | ret = GNUNET_OK; | 139 | ret = GNUNET_OK; |
143 | ibuf = (char*)mst->hdr; | 140 | ibuf = (char *) mst->hdr; |
144 | while (mst->pos > 0) | 141 | while (mst->pos > 0) |
142 | { | ||
143 | do_align: | ||
144 | if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || | ||
145 | (0 != (mst->off % ALIGN_FACTOR))) | ||
146 | { | ||
147 | /* need to align or need more space */ | ||
148 | mst->pos -= mst->off; | ||
149 | memmove (ibuf, &ibuf[mst->off], mst->pos); | ||
150 | mst->off = 0; | ||
151 | } | ||
152 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) | ||
153 | { | ||
154 | delta = | ||
155 | GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - | ||
156 | (mst->pos - mst->off), size); | ||
157 | memcpy (&ibuf[mst->pos], buf, delta); | ||
158 | mst->pos += delta; | ||
159 | buf += delta; | ||
160 | size -= delta; | ||
161 | } | ||
162 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) | ||
163 | { | ||
164 | if (purge) | ||
165 | { | ||
166 | mst->off = 0; | ||
167 | mst->pos = 0; | ||
168 | } | ||
169 | return GNUNET_OK; | ||
170 | } | ||
171 | hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; | ||
172 | want = ntohs (hdr->size); | ||
173 | if (want < sizeof (struct GNUNET_MessageHeader)) | ||
145 | { | 174 | { |
146 | do_align: | 175 | GNUNET_break_op (0); |
147 | if ( (mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || | 176 | return GNUNET_SYSERR; |
148 | (0 != (mst->off % ALIGN_FACTOR)) ) | 177 | } |
149 | { | 178 | if (mst->curr_buf - mst->off < want) |
150 | /* need to align or need more space */ | 179 | { |
151 | mst->pos -= mst->off; | 180 | /* need more space */ |
152 | memmove (ibuf, | 181 | mst->pos -= mst->off; |
153 | &ibuf[mst->off], | 182 | memmove (ibuf, &ibuf[mst->off], mst->pos); |
154 | mst->pos); | 183 | mst->off = 0; |
155 | mst->off = 0; | 184 | } |
156 | } | 185 | if (want > mst->curr_buf) |
157 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) | 186 | { |
158 | { | 187 | mst->hdr = GNUNET_realloc (mst->hdr, want); |
159 | delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - (mst->pos - mst->off), | 188 | ibuf = (char *) mst->hdr; |
160 | size); | 189 | mst->curr_buf = want; |
161 | memcpy (&ibuf[mst->pos], | 190 | } |
162 | buf, | 191 | hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; |
163 | delta); | 192 | if (mst->pos - mst->off < want) |
164 | mst->pos += delta; | 193 | { |
165 | buf += delta; | 194 | delta = GNUNET_MIN (want - (mst->pos - mst->off), size); |
166 | size -= delta; | 195 | memcpy (&ibuf[mst->pos], buf, delta); |
167 | } | 196 | mst->pos += delta; |
168 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) | 197 | buf += delta; |
169 | { | 198 | size -= delta; |
170 | if (purge) | 199 | } |
171 | { | 200 | if (mst->pos - mst->off < want) |
172 | mst->off = 0; | 201 | { |
173 | mst->pos = 0; | 202 | if (purge) |
174 | } | 203 | { |
175 | return GNUNET_OK; | 204 | mst->off = 0; |
176 | } | 205 | mst->pos = 0; |
177 | hdr = (const struct GNUNET_MessageHeader*) &ibuf[mst->off]; | 206 | } |
207 | return GNUNET_OK; | ||
208 | } | ||
209 | if (one_shot == GNUNET_SYSERR) | ||
210 | { | ||
211 | /* cannot call callback again, but return value saying that | ||
212 | * we have another full message in the buffer */ | ||
213 | ret = GNUNET_NO; | ||
214 | goto copy; | ||
215 | } | ||
216 | if (one_shot == GNUNET_YES) | ||
217 | one_shot = GNUNET_SYSERR; | ||
218 | mst->cb (mst->cb_cls, client_identity, hdr); | ||
219 | mst->off += want; | ||
220 | if (mst->off == mst->pos) | ||
221 | { | ||
222 | /* reset to beginning of buffer, it's free right now! */ | ||
223 | mst->off = 0; | ||
224 | mst->pos = 0; | ||
225 | } | ||
226 | } | ||
227 | while (size > 0) | ||
228 | { | ||
229 | #if DEBUG_SERVER_MST | ||
230 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
231 | "Server-mst has %u bytes left in inbound buffer\n", | ||
232 | (unsigned int) size); | ||
233 | #endif | ||
234 | if (size < sizeof (struct GNUNET_MessageHeader)) | ||
235 | break; | ||
236 | offset = (unsigned long) buf; | ||
237 | need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO; | ||
238 | if (GNUNET_NO == need_align) | ||
239 | { | ||
240 | /* can try to do zero-copy and process directly from original buffer */ | ||
241 | hdr = (const struct GNUNET_MessageHeader *) buf; | ||
178 | want = ntohs (hdr->size); | 242 | want = ntohs (hdr->size); |
179 | if (want < sizeof (struct GNUNET_MessageHeader)) | 243 | if (want < sizeof (struct GNUNET_MessageHeader)) |
180 | { | 244 | { |
181 | GNUNET_break_op (0); | 245 | GNUNET_break_op (0); |
182 | return GNUNET_SYSERR; | 246 | mst->off = 0; |
183 | } | 247 | return GNUNET_SYSERR; |
184 | if (mst->curr_buf - mst->off < want) | 248 | } |
185 | { | 249 | if (size < want) |
186 | /* need more space */ | 250 | break; /* or not, buffer incomplete, so copy to private buffer... */ |
187 | mst->pos -= mst->off; | ||
188 | memmove (ibuf, | ||
189 | &ibuf[mst->off], | ||
190 | mst->pos); | ||
191 | mst->off = 0; | ||
192 | } | ||
193 | if (want > mst->curr_buf) | ||
194 | { | ||
195 | mst->hdr = GNUNET_realloc(mst->hdr, want); | ||
196 | ibuf = (char*)mst->hdr; | ||
197 | mst->curr_buf = want; | ||
198 | } | ||
199 | hdr = (const struct GNUNET_MessageHeader*) &ibuf[mst->off]; | ||
200 | if (mst->pos - mst->off < want) | ||
201 | { | ||
202 | delta = GNUNET_MIN (want - (mst->pos - mst->off), | ||
203 | size); | ||
204 | memcpy (&ibuf[mst->pos], | ||
205 | buf, | ||
206 | delta); | ||
207 | mst->pos += delta; | ||
208 | buf += delta; | ||
209 | size -= delta; | ||
210 | } | ||
211 | if (mst->pos - mst->off < want) | ||
212 | { | ||
213 | if (purge) | ||
214 | { | ||
215 | mst->off = 0; | ||
216 | mst->pos = 0; | ||
217 | } | ||
218 | return GNUNET_OK; | ||
219 | } | ||
220 | if (one_shot == GNUNET_SYSERR) | 251 | if (one_shot == GNUNET_SYSERR) |
221 | { | 252 | { |
222 | /* cannot call callback again, but return value saying that | 253 | /* cannot call callback again, but return value saying that |
223 | we have another full message in the buffer */ | 254 | * we have another full message in the buffer */ |
224 | ret = GNUNET_NO; | 255 | ret = GNUNET_NO; |
225 | goto copy; | 256 | goto copy; |
226 | } | 257 | } |
227 | if (one_shot == GNUNET_YES) | 258 | if (one_shot == GNUNET_YES) |
228 | one_shot = GNUNET_SYSERR; | 259 | one_shot = GNUNET_SYSERR; |
229 | mst->cb (mst->cb_cls, client_identity, hdr); | 260 | mst->cb (mst->cb_cls, client_identity, hdr); |
230 | mst->off += want; | 261 | buf += want; |
231 | if (mst->off == mst->pos) | 262 | size -= want; |
232 | { | ||
233 | /* reset to beginning of buffer, it's free right now! */ | ||
234 | mst->off = 0; | ||
235 | mst->pos = 0; | ||
236 | } | ||
237 | } | 263 | } |
238 | while (size > 0) | 264 | else |
239 | { | 265 | { |
240 | #if DEBUG_SERVER_MST | 266 | /* need to copy to private buffer to align; |
241 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 267 | * yes, we go a bit more spagetti than usual here */ |
242 | "Server-mst has %u bytes left in inbound buffer\n", | 268 | goto do_align; |
243 | (unsigned int) size); | ||
244 | #endif | ||
245 | if (size < sizeof (struct GNUNET_MessageHeader)) | ||
246 | break; | ||
247 | offset = (unsigned long) buf; | ||
248 | need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO; | ||
249 | if (GNUNET_NO == need_align) | ||
250 | { | ||
251 | /* can try to do zero-copy and process directly from original buffer */ | ||
252 | hdr = (const struct GNUNET_MessageHeader *) buf; | ||
253 | want = ntohs (hdr->size); | ||
254 | if (want < sizeof (struct GNUNET_MessageHeader)) | ||
255 | { | ||
256 | GNUNET_break_op (0); | ||
257 | mst->off = 0; | ||
258 | return GNUNET_SYSERR; | ||
259 | } | ||
260 | if (size < want) | ||
261 | break; /* or not, buffer incomplete, so copy to private buffer... */ | ||
262 | if (one_shot == GNUNET_SYSERR) | ||
263 | { | ||
264 | /* cannot call callback again, but return value saying that | ||
265 | we have another full message in the buffer */ | ||
266 | ret = GNUNET_NO; | ||
267 | goto copy; | ||
268 | } | ||
269 | if (one_shot == GNUNET_YES) | ||
270 | one_shot = GNUNET_SYSERR; | ||
271 | mst->cb (mst->cb_cls, client_identity, hdr); | ||
272 | buf += want; | ||
273 | size -= want; | ||
274 | } | ||
275 | else | ||
276 | { | ||
277 | /* need to copy to private buffer to align; | ||
278 | yes, we go a bit more spagetti than usual here */ | ||
279 | goto do_align; | ||
280 | } | ||
281 | } | 269 | } |
282 | copy: | 270 | } |
283 | if ( (size > 0) && (! purge) ) | 271 | copy: |
272 | if ((size > 0) && (!purge)) | ||
273 | { | ||
274 | if (size + mst->pos > mst->curr_buf) | ||
284 | { | 275 | { |
285 | if (size + mst->pos > mst->curr_buf) | 276 | mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); |
286 | { | 277 | ibuf = (char *) mst->hdr; |
287 | mst->hdr = GNUNET_realloc(mst->hdr, size + mst->pos); | 278 | mst->curr_buf = size + mst->pos; |
288 | ibuf = (char*)mst->hdr; | ||
289 | mst->curr_buf = size + mst->pos; | ||
290 | } | ||
291 | GNUNET_assert (mst->pos + size <= mst->curr_buf); | ||
292 | memcpy (&ibuf[mst->pos], buf, size); | ||
293 | mst->pos += size; | ||
294 | } | 279 | } |
280 | GNUNET_assert (mst->pos + size <= mst->curr_buf); | ||
281 | memcpy (&ibuf[mst->pos], buf, size); | ||
282 | mst->pos += size; | ||
283 | } | ||
295 | if (purge) | 284 | if (purge) |
296 | mst->off = 0; | 285 | mst->off = 0; |
297 | #if DEBUG_SERVER_MST | 286 | #if DEBUG_SERVER_MST |
298 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 287 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
299 | "Server-mst leaves %u bytes in private buffer\n", | 288 | "Server-mst leaves %u bytes in private buffer\n", |
300 | (unsigned int) (mst->pos - mst->off)); | 289 | (unsigned int) (mst->pos - mst->off)); |
301 | #endif | 290 | #endif |
302 | return ret; | 291 | return ret; |
303 | } | 292 | } |