diff options
Diffstat (limited to 'src/util/server_mst.c')
-rw-r--r-- | src/util/server_mst.c | 278 |
1 files changed, 140 insertions, 138 deletions
diff --git a/src/util/server_mst.c b/src/util/server_mst.c index adb0a0818..6fd2647d7 100644 --- a/src/util/server_mst.c +++ b/src/util/server_mst.c | |||
@@ -39,6 +39,8 @@ | |||
39 | #define ALIGN_FACTOR 8 | 39 | #define ALIGN_FACTOR 8 |
40 | #endif | 40 | #endif |
41 | 41 | ||
42 | #define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__) | ||
43 | |||
42 | 44 | ||
43 | /** | 45 | /** |
44 | * Handle to a message stream tokenizer. | 46 | * Handle to a message stream tokenizer. |
@@ -89,7 +91,7 @@ struct GNUNET_SERVER_MessageStreamTokenizer | |||
89 | */ | 91 | */ |
90 | struct GNUNET_SERVER_MessageStreamTokenizer * | 92 | struct GNUNET_SERVER_MessageStreamTokenizer * |
91 | GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, | 93 | GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, |
92 | void *cb_cls) | 94 | void *cb_cls) |
93 | { | 95 | { |
94 | struct GNUNET_SERVER_MessageStreamTokenizer *ret; | 96 | struct GNUNET_SERVER_MessageStreamTokenizer *ret; |
95 | 97 | ||
@@ -119,8 +121,8 @@ GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, | |||
119 | */ | 121 | */ |
120 | int | 122 | int |
121 | GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, | 123 | GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, |
122 | void *client_identity, const char *buf, size_t size, | 124 | void *client_identity, const char *buf, |
123 | int purge, int one_shot) | 125 | size_t size, int purge, int one_shot) |
124 | { | 126 | { |
125 | const struct GNUNET_MessageHeader *hdr; | 127 | const struct GNUNET_MessageHeader *hdr; |
126 | size_t delta; | 128 | size_t delta; |
@@ -131,161 +133,161 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, | |||
131 | int ret; | 133 | int ret; |
132 | 134 | ||
133 | #if DEBUG_SERVER_MST | 135 | #if DEBUG_SERVER_MST |
134 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 136 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
135 | "Server-mst receives %u bytes with %u bytes already in private buffer\n", | 137 | "Server-mst receives %u bytes with %u bytes already in private buffer\n", |
136 | (unsigned int) size, (unsigned int) (mst->pos - mst->off)); | 138 | (unsigned int) size, (unsigned int) (mst->pos - mst->off)); |
137 | #endif | 139 | #endif |
138 | ret = GNUNET_OK; | 140 | ret = GNUNET_OK; |
139 | ibuf = (char *) mst->hdr; | 141 | ibuf = (char *) mst->hdr; |
140 | while (mst->pos > 0) | 142 | while (mst->pos > 0) |
141 | { | ||
142 | do_align: | ||
143 | if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || | ||
144 | (0 != (mst->off % ALIGN_FACTOR))) | ||
145 | { | ||
146 | /* need to align or need more space */ | ||
147 | mst->pos -= mst->off; | ||
148 | memmove (ibuf, &ibuf[mst->off], mst->pos); | ||
149 | mst->off = 0; | ||
150 | } | ||
151 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) | ||
152 | { | ||
153 | delta = | ||
154 | GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - | ||
155 | (mst->pos - mst->off), size); | ||
156 | memcpy (&ibuf[mst->pos], buf, delta); | ||
157 | mst->pos += delta; | ||
158 | buf += delta; | ||
159 | size -= delta; | ||
160 | } | ||
161 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) | ||
162 | { | ||
163 | if (purge) | ||
164 | { | ||
165 | mst->off = 0; | ||
166 | mst->pos = 0; | ||
167 | } | ||
168 | return GNUNET_OK; | ||
169 | } | ||
170 | hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; | ||
171 | want = ntohs (hdr->size); | ||
172 | if (want < sizeof (struct GNUNET_MessageHeader)) | ||
173 | { | 143 | { |
174 | GNUNET_break_op (0); | 144 | do_align: |
175 | return GNUNET_SYSERR; | 145 | if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || |
176 | } | 146 | (0 != (mst->off % ALIGN_FACTOR))) |
177 | if (mst->curr_buf - mst->off < want) | 147 | { |
178 | { | 148 | /* need to align or need more space */ |
179 | /* need more space */ | 149 | mst->pos -= mst->off; |
180 | mst->pos -= mst->off; | 150 | memmove (ibuf, &ibuf[mst->off], mst->pos); |
181 | memmove (ibuf, &ibuf[mst->off], mst->pos); | 151 | mst->off = 0; |
182 | mst->off = 0; | 152 | } |
183 | } | 153 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) |
184 | if (want > mst->curr_buf) | 154 | { |
185 | { | 155 | delta = |
186 | mst->hdr = GNUNET_realloc (mst->hdr, want); | 156 | GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - |
187 | ibuf = (char *) mst->hdr; | 157 | (mst->pos - mst->off), size); |
188 | mst->curr_buf = want; | 158 | memcpy (&ibuf[mst->pos], buf, delta); |
189 | } | 159 | mst->pos += delta; |
190 | hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; | 160 | buf += delta; |
191 | if (mst->pos - mst->off < want) | 161 | size -= delta; |
192 | { | 162 | } |
193 | delta = GNUNET_MIN (want - (mst->pos - mst->off), size); | 163 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) |
194 | memcpy (&ibuf[mst->pos], buf, delta); | 164 | { |
195 | mst->pos += delta; | 165 | if (purge) |
196 | buf += delta; | 166 | { |
197 | size -= delta; | 167 | mst->off = 0; |
198 | } | 168 | mst->pos = 0; |
199 | if (mst->pos - mst->off < want) | 169 | } |
200 | { | 170 | return GNUNET_OK; |
201 | if (purge) | 171 | } |
202 | { | 172 | hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; |
203 | mst->off = 0; | ||
204 | mst->pos = 0; | ||
205 | } | ||
206 | return GNUNET_OK; | ||
207 | } | ||
208 | if (one_shot == GNUNET_SYSERR) | ||
209 | { | ||
210 | /* cannot call callback again, but return value saying that | ||
211 | * we have another full message in the buffer */ | ||
212 | ret = GNUNET_NO; | ||
213 | goto copy; | ||
214 | } | ||
215 | if (one_shot == GNUNET_YES) | ||
216 | one_shot = GNUNET_SYSERR; | ||
217 | mst->cb (mst->cb_cls, client_identity, hdr); | ||
218 | mst->off += want; | ||
219 | if (mst->off == mst->pos) | ||
220 | { | ||
221 | /* reset to beginning of buffer, it's free right now! */ | ||
222 | mst->off = 0; | ||
223 | mst->pos = 0; | ||
224 | } | ||
225 | } | ||
226 | while (size > 0) | ||
227 | { | ||
228 | #if DEBUG_SERVER_MST | ||
229 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
230 | "Server-mst has %u bytes left in inbound buffer\n", | ||
231 | (unsigned int) size); | ||
232 | #endif | ||
233 | if (size < sizeof (struct GNUNET_MessageHeader)) | ||
234 | break; | ||
235 | offset = (unsigned long) buf; | ||
236 | need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO; | ||
237 | if (GNUNET_NO == need_align) | ||
238 | { | ||
239 | /* can try to do zero-copy and process directly from original buffer */ | ||
240 | hdr = (const struct GNUNET_MessageHeader *) buf; | ||
241 | want = ntohs (hdr->size); | 173 | want = ntohs (hdr->size); |
242 | if (want < sizeof (struct GNUNET_MessageHeader)) | 174 | if (want < sizeof (struct GNUNET_MessageHeader)) |
243 | { | 175 | { |
244 | GNUNET_break_op (0); | 176 | GNUNET_break_op (0); |
245 | mst->off = 0; | 177 | return GNUNET_SYSERR; |
246 | return GNUNET_SYSERR; | 178 | } |
247 | } | 179 | if (mst->curr_buf - mst->off < want) |
248 | if (size < want) | 180 | { |
249 | break; /* or not, buffer incomplete, so copy to private buffer... */ | 181 | /* need more space */ |
182 | mst->pos -= mst->off; | ||
183 | memmove (ibuf, &ibuf[mst->off], mst->pos); | ||
184 | mst->off = 0; | ||
185 | } | ||
186 | if (want > mst->curr_buf) | ||
187 | { | ||
188 | mst->hdr = GNUNET_realloc (mst->hdr, want); | ||
189 | ibuf = (char *) mst->hdr; | ||
190 | mst->curr_buf = want; | ||
191 | } | ||
192 | hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; | ||
193 | if (mst->pos - mst->off < want) | ||
194 | { | ||
195 | delta = GNUNET_MIN (want - (mst->pos - mst->off), size); | ||
196 | memcpy (&ibuf[mst->pos], buf, delta); | ||
197 | mst->pos += delta; | ||
198 | buf += delta; | ||
199 | size -= delta; | ||
200 | } | ||
201 | if (mst->pos - mst->off < want) | ||
202 | { | ||
203 | if (purge) | ||
204 | { | ||
205 | mst->off = 0; | ||
206 | mst->pos = 0; | ||
207 | } | ||
208 | return GNUNET_OK; | ||
209 | } | ||
250 | if (one_shot == GNUNET_SYSERR) | 210 | if (one_shot == GNUNET_SYSERR) |
251 | { | 211 | { |
252 | /* cannot call callback again, but return value saying that | 212 | /* cannot call callback again, but return value saying that |
253 | * we have another full message in the buffer */ | 213 | * we have another full message in the buffer */ |
254 | ret = GNUNET_NO; | 214 | ret = GNUNET_NO; |
255 | goto copy; | 215 | goto copy; |
256 | } | 216 | } |
257 | if (one_shot == GNUNET_YES) | 217 | if (one_shot == GNUNET_YES) |
258 | one_shot = GNUNET_SYSERR; | 218 | one_shot = GNUNET_SYSERR; |
259 | mst->cb (mst->cb_cls, client_identity, hdr); | 219 | mst->cb (mst->cb_cls, client_identity, hdr); |
260 | buf += want; | 220 | mst->off += want; |
261 | size -= want; | 221 | if (mst->off == mst->pos) |
222 | { | ||
223 | /* reset to beginning of buffer, it's free right now! */ | ||
224 | mst->off = 0; | ||
225 | mst->pos = 0; | ||
226 | } | ||
262 | } | 227 | } |
263 | else | 228 | while (size > 0) |
264 | { | 229 | { |
265 | /* need to copy to private buffer to align; | 230 | #if DEBUG_SERVER_MST |
266 | * yes, we go a bit more spagetti than usual here */ | 231 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
267 | goto do_align; | 232 | "Server-mst has %u bytes left in inbound buffer\n", |
233 | (unsigned int) size); | ||
234 | #endif | ||
235 | if (size < sizeof (struct GNUNET_MessageHeader)) | ||
236 | break; | ||
237 | offset = (unsigned long) buf; | ||
238 | need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO; | ||
239 | if (GNUNET_NO == need_align) | ||
240 | { | ||
241 | /* can try to do zero-copy and process directly from original buffer */ | ||
242 | hdr = (const struct GNUNET_MessageHeader *) buf; | ||
243 | want = ntohs (hdr->size); | ||
244 | if (want < sizeof (struct GNUNET_MessageHeader)) | ||
245 | { | ||
246 | GNUNET_break_op (0); | ||
247 | mst->off = 0; | ||
248 | return GNUNET_SYSERR; | ||
249 | } | ||
250 | if (size < want) | ||
251 | break; /* or not, buffer incomplete, so copy to private buffer... */ | ||
252 | if (one_shot == GNUNET_SYSERR) | ||
253 | { | ||
254 | /* cannot call callback again, but return value saying that | ||
255 | * we have another full message in the buffer */ | ||
256 | ret = GNUNET_NO; | ||
257 | goto copy; | ||
258 | } | ||
259 | if (one_shot == GNUNET_YES) | ||
260 | one_shot = GNUNET_SYSERR; | ||
261 | mst->cb (mst->cb_cls, client_identity, hdr); | ||
262 | buf += want; | ||
263 | size -= want; | ||
264 | } | ||
265 | else | ||
266 | { | ||
267 | /* need to copy to private buffer to align; | ||
268 | * yes, we go a bit more spagetti than usual here */ | ||
269 | goto do_align; | ||
270 | } | ||
268 | } | 271 | } |
269 | } | ||
270 | copy: | 272 | copy: |
271 | if ((size > 0) && (!purge)) | 273 | if ((size > 0) && (!purge)) |
272 | { | ||
273 | if (size + mst->pos > mst->curr_buf) | ||
274 | { | 274 | { |
275 | mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); | 275 | if (size + mst->pos > mst->curr_buf) |
276 | ibuf = (char *) mst->hdr; | 276 | { |
277 | mst->curr_buf = size + mst->pos; | 277 | mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); |
278 | ibuf = (char *) mst->hdr; | ||
279 | mst->curr_buf = size + mst->pos; | ||
280 | } | ||
281 | GNUNET_assert (mst->pos + size <= mst->curr_buf); | ||
282 | memcpy (&ibuf[mst->pos], buf, size); | ||
283 | mst->pos += size; | ||
278 | } | 284 | } |
279 | GNUNET_assert (mst->pos + size <= mst->curr_buf); | ||
280 | memcpy (&ibuf[mst->pos], buf, size); | ||
281 | mst->pos += size; | ||
282 | } | ||
283 | if (purge) | 285 | if (purge) |
284 | mst->off = 0; | 286 | mst->off = 0; |
285 | #if DEBUG_SERVER_MST | 287 | #if DEBUG_SERVER_MST |
286 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 288 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
287 | "Server-mst leaves %u bytes in private buffer\n", | 289 | "Server-mst leaves %u bytes in private buffer\n", |
288 | (unsigned int) (mst->pos - mst->off)); | 290 | (unsigned int) (mst->pos - mst->off)); |
289 | #endif | 291 | #endif |
290 | return ret; | 292 | return ret; |
291 | } | 293 | } |