diff options
Diffstat (limited to 'src/util/server_mst.c')
-rw-r--r-- | src/util/server_mst.c | 264 |
1 files changed, 132 insertions, 132 deletions
diff --git a/src/util/server_mst.c b/src/util/server_mst.c index 6fd2647d7..dcb8c4810 100644 --- a/src/util/server_mst.c +++ b/src/util/server_mst.c | |||
@@ -91,7 +91,7 @@ struct GNUNET_SERVER_MessageStreamTokenizer | |||
91 | */ | 91 | */ |
92 | struct GNUNET_SERVER_MessageStreamTokenizer * | 92 | struct GNUNET_SERVER_MessageStreamTokenizer * |
93 | GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, | 93 | GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, |
94 | void *cb_cls) | 94 | void *cb_cls) |
95 | { | 95 | { |
96 | struct GNUNET_SERVER_MessageStreamTokenizer *ret; | 96 | struct GNUNET_SERVER_MessageStreamTokenizer *ret; |
97 | 97 | ||
@@ -121,8 +121,8 @@ GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, | |||
121 | */ | 121 | */ |
122 | int | 122 | int |
123 | GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, | 123 | GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, |
124 | void *client_identity, const char *buf, | 124 | void *client_identity, const char *buf, size_t size, |
125 | size_t size, int purge, int one_shot) | 125 | int purge, int one_shot) |
126 | { | 126 | { |
127 | const struct GNUNET_MessageHeader *hdr; | 127 | const struct GNUNET_MessageHeader *hdr; |
128 | size_t delta; | 128 | size_t delta; |
@@ -140,148 +140,148 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, | |||
140 | ret = GNUNET_OK; | 140 | ret = GNUNET_OK; |
141 | ibuf = (char *) mst->hdr; | 141 | ibuf = (char *) mst->hdr; |
142 | while (mst->pos > 0) | 142 | while (mst->pos > 0) |
143 | { | ||
144 | do_align: | ||
145 | if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || | ||
146 | (0 != (mst->off % ALIGN_FACTOR))) | ||
143 | { | 147 | { |
144 | do_align: | 148 | /* need to align or need more space */ |
145 | if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || | 149 | mst->pos -= mst->off; |
146 | (0 != (mst->off % ALIGN_FACTOR))) | 150 | memmove (ibuf, &ibuf[mst->off], mst->pos); |
147 | { | 151 | mst->off = 0; |
148 | /* need to align or need more space */ | 152 | } |
149 | mst->pos -= mst->off; | 153 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) |
150 | memmove (ibuf, &ibuf[mst->off], mst->pos); | 154 | { |
151 | mst->off = 0; | 155 | delta = |
152 | } | 156 | GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - |
153 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) | 157 | (mst->pos - mst->off), size); |
154 | { | 158 | memcpy (&ibuf[mst->pos], buf, delta); |
155 | delta = | 159 | mst->pos += delta; |
156 | GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - | 160 | buf += delta; |
157 | (mst->pos - mst->off), size); | 161 | size -= delta; |
158 | memcpy (&ibuf[mst->pos], buf, delta); | 162 | } |
159 | mst->pos += delta; | 163 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) |
160 | buf += delta; | 164 | { |
161 | size -= delta; | 165 | if (purge) |
162 | } | 166 | { |
163 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) | 167 | mst->off = 0; |
164 | { | 168 | mst->pos = 0; |
165 | if (purge) | 169 | } |
166 | { | 170 | return GNUNET_OK; |
167 | mst->off = 0; | 171 | } |
168 | mst->pos = 0; | 172 | hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; |
169 | } | 173 | want = ntohs (hdr->size); |
170 | return GNUNET_OK; | 174 | if (want < sizeof (struct GNUNET_MessageHeader)) |
171 | } | 175 | { |
172 | hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; | 176 | GNUNET_break_op (0); |
177 | return GNUNET_SYSERR; | ||
178 | } | ||
179 | if (mst->curr_buf - mst->off < want) | ||
180 | { | ||
181 | /* need more space */ | ||
182 | mst->pos -= mst->off; | ||
183 | memmove (ibuf, &ibuf[mst->off], mst->pos); | ||
184 | mst->off = 0; | ||
185 | } | ||
186 | if (want > mst->curr_buf) | ||
187 | { | ||
188 | mst->hdr = GNUNET_realloc (mst->hdr, want); | ||
189 | ibuf = (char *) mst->hdr; | ||
190 | mst->curr_buf = want; | ||
191 | } | ||
192 | hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; | ||
193 | if (mst->pos - mst->off < want) | ||
194 | { | ||
195 | delta = GNUNET_MIN (want - (mst->pos - mst->off), size); | ||
196 | memcpy (&ibuf[mst->pos], buf, delta); | ||
197 | mst->pos += delta; | ||
198 | buf += delta; | ||
199 | size -= delta; | ||
200 | } | ||
201 | if (mst->pos - mst->off < want) | ||
202 | { | ||
203 | if (purge) | ||
204 | { | ||
205 | mst->off = 0; | ||
206 | mst->pos = 0; | ||
207 | } | ||
208 | return GNUNET_OK; | ||
209 | } | ||
210 | if (one_shot == GNUNET_SYSERR) | ||
211 | { | ||
212 | /* cannot call callback again, but return value saying that | ||
213 | * we have another full message in the buffer */ | ||
214 | ret = GNUNET_NO; | ||
215 | goto copy; | ||
216 | } | ||
217 | if (one_shot == GNUNET_YES) | ||
218 | one_shot = GNUNET_SYSERR; | ||
219 | mst->cb (mst->cb_cls, client_identity, hdr); | ||
220 | mst->off += want; | ||
221 | if (mst->off == mst->pos) | ||
222 | { | ||
223 | /* reset to beginning of buffer, it's free right now! */ | ||
224 | mst->off = 0; | ||
225 | mst->pos = 0; | ||
226 | } | ||
227 | } | ||
228 | while (size > 0) | ||
229 | { | ||
230 | #if DEBUG_SERVER_MST | ||
231 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
232 | "Server-mst has %u bytes left in inbound buffer\n", | ||
233 | (unsigned int) size); | ||
234 | #endif | ||
235 | if (size < sizeof (struct GNUNET_MessageHeader)) | ||
236 | break; | ||
237 | offset = (unsigned long) buf; | ||
238 | need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO; | ||
239 | if (GNUNET_NO == need_align) | ||
240 | { | ||
241 | /* can try to do zero-copy and process directly from original buffer */ | ||
242 | hdr = (const struct GNUNET_MessageHeader *) buf; | ||
173 | want = ntohs (hdr->size); | 243 | want = ntohs (hdr->size); |
174 | if (want < sizeof (struct GNUNET_MessageHeader)) | 244 | if (want < sizeof (struct GNUNET_MessageHeader)) |
175 | { | 245 | { |
176 | GNUNET_break_op (0); | 246 | GNUNET_break_op (0); |
177 | return GNUNET_SYSERR; | 247 | mst->off = 0; |
178 | } | 248 | return GNUNET_SYSERR; |
179 | if (mst->curr_buf - mst->off < want) | 249 | } |
180 | { | 250 | if (size < want) |
181 | /* need more space */ | 251 | break; /* or not, buffer incomplete, so copy to private buffer... */ |
182 | mst->pos -= mst->off; | ||
183 | memmove (ibuf, &ibuf[mst->off], mst->pos); | ||
184 | mst->off = 0; | ||
185 | } | ||
186 | if (want > mst->curr_buf) | ||
187 | { | ||
188 | mst->hdr = GNUNET_realloc (mst->hdr, want); | ||
189 | ibuf = (char *) mst->hdr; | ||
190 | mst->curr_buf = want; | ||
191 | } | ||
192 | hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; | ||
193 | if (mst->pos - mst->off < want) | ||
194 | { | ||
195 | delta = GNUNET_MIN (want - (mst->pos - mst->off), size); | ||
196 | memcpy (&ibuf[mst->pos], buf, delta); | ||
197 | mst->pos += delta; | ||
198 | buf += delta; | ||
199 | size -= delta; | ||
200 | } | ||
201 | if (mst->pos - mst->off < want) | ||
202 | { | ||
203 | if (purge) | ||
204 | { | ||
205 | mst->off = 0; | ||
206 | mst->pos = 0; | ||
207 | } | ||
208 | return GNUNET_OK; | ||
209 | } | ||
210 | if (one_shot == GNUNET_SYSERR) | 252 | if (one_shot == GNUNET_SYSERR) |
211 | { | 253 | { |
212 | /* cannot call callback again, but return value saying that | 254 | /* cannot call callback again, but return value saying that |
213 | * we have another full message in the buffer */ | 255 | * we have another full message in the buffer */ |
214 | ret = GNUNET_NO; | 256 | ret = GNUNET_NO; |
215 | goto copy; | 257 | goto copy; |
216 | } | 258 | } |
217 | if (one_shot == GNUNET_YES) | 259 | if (one_shot == GNUNET_YES) |
218 | one_shot = GNUNET_SYSERR; | 260 | one_shot = GNUNET_SYSERR; |
219 | mst->cb (mst->cb_cls, client_identity, hdr); | 261 | mst->cb (mst->cb_cls, client_identity, hdr); |
220 | mst->off += want; | 262 | buf += want; |
221 | if (mst->off == mst->pos) | 263 | size -= want; |
222 | { | ||
223 | /* reset to beginning of buffer, it's free right now! */ | ||
224 | mst->off = 0; | ||
225 | mst->pos = 0; | ||
226 | } | ||
227 | } | 264 | } |
228 | while (size > 0) | 265 | else |
229 | { | 266 | { |
230 | #if DEBUG_SERVER_MST | 267 | /* need to copy to private buffer to align; |
231 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 268 | * yes, we go a bit more spagetti than usual here */ |
232 | "Server-mst has %u bytes left in inbound buffer\n", | 269 | goto do_align; |
233 | (unsigned int) size); | ||
234 | #endif | ||
235 | if (size < sizeof (struct GNUNET_MessageHeader)) | ||
236 | break; | ||
237 | offset = (unsigned long) buf; | ||
238 | need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO; | ||
239 | if (GNUNET_NO == need_align) | ||
240 | { | ||
241 | /* can try to do zero-copy and process directly from original buffer */ | ||
242 | hdr = (const struct GNUNET_MessageHeader *) buf; | ||
243 | want = ntohs (hdr->size); | ||
244 | if (want < sizeof (struct GNUNET_MessageHeader)) | ||
245 | { | ||
246 | GNUNET_break_op (0); | ||
247 | mst->off = 0; | ||
248 | return GNUNET_SYSERR; | ||
249 | } | ||
250 | if (size < want) | ||
251 | break; /* or not, buffer incomplete, so copy to private buffer... */ | ||
252 | if (one_shot == GNUNET_SYSERR) | ||
253 | { | ||
254 | /* cannot call callback again, but return value saying that | ||
255 | * we have another full message in the buffer */ | ||
256 | ret = GNUNET_NO; | ||
257 | goto copy; | ||
258 | } | ||
259 | if (one_shot == GNUNET_YES) | ||
260 | one_shot = GNUNET_SYSERR; | ||
261 | mst->cb (mst->cb_cls, client_identity, hdr); | ||
262 | buf += want; | ||
263 | size -= want; | ||
264 | } | ||
265 | else | ||
266 | { | ||
267 | /* need to copy to private buffer to align; | ||
268 | * yes, we go a bit more spagetti than usual here */ | ||
269 | goto do_align; | ||
270 | } | ||
271 | } | 270 | } |
271 | } | ||
272 | copy: | 272 | copy: |
273 | if ((size > 0) && (!purge)) | 273 | if ((size > 0) && (!purge)) |
274 | { | ||
275 | if (size + mst->pos > mst->curr_buf) | ||
274 | { | 276 | { |
275 | if (size + mst->pos > mst->curr_buf) | 277 | mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); |
276 | { | 278 | ibuf = (char *) mst->hdr; |
277 | mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); | 279 | mst->curr_buf = size + mst->pos; |
278 | ibuf = (char *) mst->hdr; | ||
279 | mst->curr_buf = size + mst->pos; | ||
280 | } | ||
281 | GNUNET_assert (mst->pos + size <= mst->curr_buf); | ||
282 | memcpy (&ibuf[mst->pos], buf, size); | ||
283 | mst->pos += size; | ||
284 | } | 280 | } |
281 | GNUNET_assert (mst->pos + size <= mst->curr_buf); | ||
282 | memcpy (&ibuf[mst->pos], buf, size); | ||
283 | mst->pos += size; | ||
284 | } | ||
285 | if (purge) | 285 | if (purge) |
286 | mst->off = 0; | 286 | mst->off = 0; |
287 | #if DEBUG_SERVER_MST | 287 | #if DEBUG_SERVER_MST |