aboutsummaryrefslogtreecommitdiff
path: root/src/util/server_mst.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/server_mst.c')
-rw-r--r--src/util/server_mst.c264
1 files changed, 132 insertions, 132 deletions
diff --git a/src/util/server_mst.c b/src/util/server_mst.c
index 6fd2647d7..dcb8c4810 100644
--- a/src/util/server_mst.c
+++ b/src/util/server_mst.c
@@ -91,7 +91,7 @@ struct GNUNET_SERVER_MessageStreamTokenizer
91 */ 91 */
92struct GNUNET_SERVER_MessageStreamTokenizer * 92struct GNUNET_SERVER_MessageStreamTokenizer *
93GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, 93GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb,
94 void *cb_cls) 94 void *cb_cls)
95{ 95{
96 struct GNUNET_SERVER_MessageStreamTokenizer *ret; 96 struct GNUNET_SERVER_MessageStreamTokenizer *ret;
97 97
@@ -121,8 +121,8 @@ GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb,
121 */ 121 */
122int 122int
123GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, 123GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
124 void *client_identity, const char *buf, 124 void *client_identity, const char *buf, size_t size,
125 size_t size, int purge, int one_shot) 125 int purge, int one_shot)
126{ 126{
127 const struct GNUNET_MessageHeader *hdr; 127 const struct GNUNET_MessageHeader *hdr;
128 size_t delta; 128 size_t delta;
@@ -140,148 +140,148 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
140 ret = GNUNET_OK; 140 ret = GNUNET_OK;
141 ibuf = (char *) mst->hdr; 141 ibuf = (char *) mst->hdr;
142 while (mst->pos > 0) 142 while (mst->pos > 0)
143 {
144do_align:
145 if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
146 (0 != (mst->off % ALIGN_FACTOR)))
143 { 147 {
144 do_align: 148 /* need to align or need more space */
145 if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || 149 mst->pos -= mst->off;
146 (0 != (mst->off % ALIGN_FACTOR))) 150 memmove (ibuf, &ibuf[mst->off], mst->pos);
147 { 151 mst->off = 0;
148 /* need to align or need more space */ 152 }
149 mst->pos -= mst->off; 153 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
150 memmove (ibuf, &ibuf[mst->off], mst->pos); 154 {
151 mst->off = 0; 155 delta =
152 } 156 GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) -
153 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) 157 (mst->pos - mst->off), size);
154 { 158 memcpy (&ibuf[mst->pos], buf, delta);
155 delta = 159 mst->pos += delta;
156 GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - 160 buf += delta;
157 (mst->pos - mst->off), size); 161 size -= delta;
158 memcpy (&ibuf[mst->pos], buf, delta); 162 }
159 mst->pos += delta; 163 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
160 buf += delta; 164 {
161 size -= delta; 165 if (purge)
162 } 166 {
163 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) 167 mst->off = 0;
164 { 168 mst->pos = 0;
165 if (purge) 169 }
166 { 170 return GNUNET_OK;
167 mst->off = 0; 171 }
168 mst->pos = 0; 172 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
169 } 173 want = ntohs (hdr->size);
170 return GNUNET_OK; 174 if (want < sizeof (struct GNUNET_MessageHeader))
171 } 175 {
172 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; 176 GNUNET_break_op (0);
177 return GNUNET_SYSERR;
178 }
179 if (mst->curr_buf - mst->off < want)
180 {
181 /* need more space */
182 mst->pos -= mst->off;
183 memmove (ibuf, &ibuf[mst->off], mst->pos);
184 mst->off = 0;
185 }
186 if (want > mst->curr_buf)
187 {
188 mst->hdr = GNUNET_realloc (mst->hdr, want);
189 ibuf = (char *) mst->hdr;
190 mst->curr_buf = want;
191 }
192 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
193 if (mst->pos - mst->off < want)
194 {
195 delta = GNUNET_MIN (want - (mst->pos - mst->off), size);
196 memcpy (&ibuf[mst->pos], buf, delta);
197 mst->pos += delta;
198 buf += delta;
199 size -= delta;
200 }
201 if (mst->pos - mst->off < want)
202 {
203 if (purge)
204 {
205 mst->off = 0;
206 mst->pos = 0;
207 }
208 return GNUNET_OK;
209 }
210 if (one_shot == GNUNET_SYSERR)
211 {
212 /* cannot call callback again, but return value saying that
213 * we have another full message in the buffer */
214 ret = GNUNET_NO;
215 goto copy;
216 }
217 if (one_shot == GNUNET_YES)
218 one_shot = GNUNET_SYSERR;
219 mst->cb (mst->cb_cls, client_identity, hdr);
220 mst->off += want;
221 if (mst->off == mst->pos)
222 {
223 /* reset to beginning of buffer, it's free right now! */
224 mst->off = 0;
225 mst->pos = 0;
226 }
227 }
228 while (size > 0)
229 {
230#if DEBUG_SERVER_MST
231 LOG (GNUNET_ERROR_TYPE_DEBUG,
232 "Server-mst has %u bytes left in inbound buffer\n",
233 (unsigned int) size);
234#endif
235 if (size < sizeof (struct GNUNET_MessageHeader))
236 break;
237 offset = (unsigned long) buf;
238 need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO;
239 if (GNUNET_NO == need_align)
240 {
241 /* can try to do zero-copy and process directly from original buffer */
242 hdr = (const struct GNUNET_MessageHeader *) buf;
173 want = ntohs (hdr->size); 243 want = ntohs (hdr->size);
174 if (want < sizeof (struct GNUNET_MessageHeader)) 244 if (want < sizeof (struct GNUNET_MessageHeader))
175 { 245 {
176 GNUNET_break_op (0); 246 GNUNET_break_op (0);
177 return GNUNET_SYSERR; 247 mst->off = 0;
178 } 248 return GNUNET_SYSERR;
179 if (mst->curr_buf - mst->off < want) 249 }
180 { 250 if (size < want)
181 /* need more space */ 251 break; /* or not, buffer incomplete, so copy to private buffer... */
182 mst->pos -= mst->off;
183 memmove (ibuf, &ibuf[mst->off], mst->pos);
184 mst->off = 0;
185 }
186 if (want > mst->curr_buf)
187 {
188 mst->hdr = GNUNET_realloc (mst->hdr, want);
189 ibuf = (char *) mst->hdr;
190 mst->curr_buf = want;
191 }
192 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
193 if (mst->pos - mst->off < want)
194 {
195 delta = GNUNET_MIN (want - (mst->pos - mst->off), size);
196 memcpy (&ibuf[mst->pos], buf, delta);
197 mst->pos += delta;
198 buf += delta;
199 size -= delta;
200 }
201 if (mst->pos - mst->off < want)
202 {
203 if (purge)
204 {
205 mst->off = 0;
206 mst->pos = 0;
207 }
208 return GNUNET_OK;
209 }
210 if (one_shot == GNUNET_SYSERR) 252 if (one_shot == GNUNET_SYSERR)
211 { 253 {
212 /* cannot call callback again, but return value saying that 254 /* cannot call callback again, but return value saying that
213 * we have another full message in the buffer */ 255 * we have another full message in the buffer */
214 ret = GNUNET_NO; 256 ret = GNUNET_NO;
215 goto copy; 257 goto copy;
216 } 258 }
217 if (one_shot == GNUNET_YES) 259 if (one_shot == GNUNET_YES)
218 one_shot = GNUNET_SYSERR; 260 one_shot = GNUNET_SYSERR;
219 mst->cb (mst->cb_cls, client_identity, hdr); 261 mst->cb (mst->cb_cls, client_identity, hdr);
220 mst->off += want; 262 buf += want;
221 if (mst->off == mst->pos) 263 size -= want;
222 {
223 /* reset to beginning of buffer, it's free right now! */
224 mst->off = 0;
225 mst->pos = 0;
226 }
227 } 264 }
228 while (size > 0) 265 else
229 { 266 {
230#if DEBUG_SERVER_MST 267 /* need to copy to private buffer to align;
231 LOG (GNUNET_ERROR_TYPE_DEBUG, 268 * yes, we go a bit more spagetti than usual here */
232 "Server-mst has %u bytes left in inbound buffer\n", 269 goto do_align;
233 (unsigned int) size);
234#endif
235 if (size < sizeof (struct GNUNET_MessageHeader))
236 break;
237 offset = (unsigned long) buf;
238 need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO;
239 if (GNUNET_NO == need_align)
240 {
241 /* can try to do zero-copy and process directly from original buffer */
242 hdr = (const struct GNUNET_MessageHeader *) buf;
243 want = ntohs (hdr->size);
244 if (want < sizeof (struct GNUNET_MessageHeader))
245 {
246 GNUNET_break_op (0);
247 mst->off = 0;
248 return GNUNET_SYSERR;
249 }
250 if (size < want)
251 break; /* or not, buffer incomplete, so copy to private buffer... */
252 if (one_shot == GNUNET_SYSERR)
253 {
254 /* cannot call callback again, but return value saying that
255 * we have another full message in the buffer */
256 ret = GNUNET_NO;
257 goto copy;
258 }
259 if (one_shot == GNUNET_YES)
260 one_shot = GNUNET_SYSERR;
261 mst->cb (mst->cb_cls, client_identity, hdr);
262 buf += want;
263 size -= want;
264 }
265 else
266 {
267 /* need to copy to private buffer to align;
268 * yes, we go a bit more spagetti than usual here */
269 goto do_align;
270 }
271 } 270 }
271 }
272copy: 272copy:
273 if ((size > 0) && (!purge)) 273 if ((size > 0) && (!purge))
274 {
275 if (size + mst->pos > mst->curr_buf)
274 { 276 {
275 if (size + mst->pos > mst->curr_buf) 277 mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos);
276 { 278 ibuf = (char *) mst->hdr;
277 mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); 279 mst->curr_buf = size + mst->pos;
278 ibuf = (char *) mst->hdr;
279 mst->curr_buf = size + mst->pos;
280 }
281 GNUNET_assert (mst->pos + size <= mst->curr_buf);
282 memcpy (&ibuf[mst->pos], buf, size);
283 mst->pos += size;
284 } 280 }
281 GNUNET_assert (mst->pos + size <= mst->curr_buf);
282 memcpy (&ibuf[mst->pos], buf, size);
283 mst->pos += size;
284 }
285 if (purge) 285 if (purge)
286 mst->off = 0; 286 mst->off = 0;
287#if DEBUG_SERVER_MST 287#if DEBUG_SERVER_MST