aboutsummaryrefslogtreecommitdiff
path: root/src/util/server_mst.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/server_mst.c')
-rw-r--r--src/util/server_mst.c297
1 files changed, 143 insertions, 154 deletions
diff --git a/src/util/server_mst.c b/src/util/server_mst.c
index 835d8eeba..f48ef5475 100644
--- a/src/util/server_mst.c
+++ b/src/util/server_mst.c
@@ -50,7 +50,7 @@ struct GNUNET_SERVER_MessageStreamTokenizer
50 * Function to call on completed messages. 50 * Function to call on completed messages.
51 */ 51 */
52 GNUNET_SERVER_MessageTokenizerCallback cb; 52 GNUNET_SERVER_MessageTokenizerCallback cb;
53 53
54 /** 54 /**
55 * Closure for cb. 55 * Closure for cb.
56 */ 56 */
@@ -89,12 +89,12 @@ struct GNUNET_SERVER_MessageStreamTokenizer
89 */ 89 */
90struct GNUNET_SERVER_MessageStreamTokenizer * 90struct GNUNET_SERVER_MessageStreamTokenizer *
91GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, 91GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb,
92 void *cb_cls) 92 void *cb_cls)
93{ 93{
94 struct GNUNET_SERVER_MessageStreamTokenizer *ret; 94 struct GNUNET_SERVER_MessageStreamTokenizer *ret;
95 95
96 ret = GNUNET_malloc (sizeof (struct GNUNET_SERVER_MessageStreamTokenizer)); 96 ret = GNUNET_malloc (sizeof (struct GNUNET_SERVER_MessageStreamTokenizer));
97 ret->hdr = GNUNET_malloc(GNUNET_SERVER_MIN_BUFFER_SIZE); 97 ret->hdr = GNUNET_malloc (GNUNET_SERVER_MIN_BUFFER_SIZE);
98 ret->curr_buf = GNUNET_SERVER_MIN_BUFFER_SIZE; 98 ret->curr_buf = GNUNET_SERVER_MIN_BUFFER_SIZE;
99 ret->cb = cb; 99 ret->cb = cb;
100 ret->cb_cls = cb_cls; 100 ret->cb_cls = cb_cls;
@@ -119,11 +119,9 @@ GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb,
119 */ 119 */
120int 120int
121GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, 121GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
122 void *client_identity, 122 void *client_identity,
123 const char *buf, 123 const char *buf,
124 size_t size, 124 size_t size, int purge, int one_shot)
125 int purge,
126 int one_shot)
127{ 125{
128 const struct GNUNET_MessageHeader *hdr; 126 const struct GNUNET_MessageHeader *hdr;
129 size_t delta; 127 size_t delta;
@@ -135,169 +133,160 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
135 133
136#if DEBUG_SERVER_MST 134#if DEBUG_SERVER_MST
137 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 135 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
138 "Server-mst receives %u bytes with %u bytes already in private buffer\n", 136 "Server-mst receives %u bytes with %u bytes already in private buffer\n",
139 (unsigned int) size, 137 (unsigned int) size, (unsigned int) (mst->pos - mst->off));
140 (unsigned int) (mst->pos - mst->off));
141#endif 138#endif
142 ret = GNUNET_OK; 139 ret = GNUNET_OK;
143 ibuf = (char*)mst->hdr; 140 ibuf = (char *) mst->hdr;
144 while (mst->pos > 0) 141 while (mst->pos > 0)
142 {
143do_align:
144 if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
145 (0 != (mst->off % ALIGN_FACTOR)))
146 {
147 /* need to align or need more space */
148 mst->pos -= mst->off;
149 memmove (ibuf, &ibuf[mst->off], mst->pos);
150 mst->off = 0;
151 }
152 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
153 {
154 delta =
155 GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) -
156 (mst->pos - mst->off), size);
157 memcpy (&ibuf[mst->pos], buf, delta);
158 mst->pos += delta;
159 buf += delta;
160 size -= delta;
161 }
162 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
163 {
164 if (purge)
165 {
166 mst->off = 0;
167 mst->pos = 0;
168 }
169 return GNUNET_OK;
170 }
171 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
172 want = ntohs (hdr->size);
173 if (want < sizeof (struct GNUNET_MessageHeader))
145 { 174 {
146 do_align: 175 GNUNET_break_op (0);
147 if ( (mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || 176 return GNUNET_SYSERR;
148 (0 != (mst->off % ALIGN_FACTOR)) ) 177 }
149 { 178 if (mst->curr_buf - mst->off < want)
150 /* need to align or need more space */ 179 {
151 mst->pos -= mst->off; 180 /* need more space */
152 memmove (ibuf, 181 mst->pos -= mst->off;
153 &ibuf[mst->off], 182 memmove (ibuf, &ibuf[mst->off], mst->pos);
154 mst->pos); 183 mst->off = 0;
155 mst->off = 0; 184 }
156 } 185 if (want > mst->curr_buf)
157 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) 186 {
158 { 187 mst->hdr = GNUNET_realloc (mst->hdr, want);
159 delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - (mst->pos - mst->off), 188 ibuf = (char *) mst->hdr;
160 size); 189 mst->curr_buf = want;
161 memcpy (&ibuf[mst->pos], 190 }
162 buf, 191 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
163 delta); 192 if (mst->pos - mst->off < want)
164 mst->pos += delta; 193 {
165 buf += delta; 194 delta = GNUNET_MIN (want - (mst->pos - mst->off), size);
166 size -= delta; 195 memcpy (&ibuf[mst->pos], buf, delta);
167 } 196 mst->pos += delta;
168 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) 197 buf += delta;
169 { 198 size -= delta;
170 if (purge) 199 }
171 { 200 if (mst->pos - mst->off < want)
172 mst->off = 0; 201 {
173 mst->pos = 0; 202 if (purge)
174 } 203 {
175 return GNUNET_OK; 204 mst->off = 0;
176 } 205 mst->pos = 0;
177 hdr = (const struct GNUNET_MessageHeader*) &ibuf[mst->off]; 206 }
207 return GNUNET_OK;
208 }
209 if (one_shot == GNUNET_SYSERR)
210 {
211 /* cannot call callback again, but return value saying that
212 * we have another full message in the buffer */
213 ret = GNUNET_NO;
214 goto copy;
215 }
216 if (one_shot == GNUNET_YES)
217 one_shot = GNUNET_SYSERR;
218 mst->cb (mst->cb_cls, client_identity, hdr);
219 mst->off += want;
220 if (mst->off == mst->pos)
221 {
222 /* reset to beginning of buffer, it's free right now! */
223 mst->off = 0;
224 mst->pos = 0;
225 }
226 }
227 while (size > 0)
228 {
229#if DEBUG_SERVER_MST
230 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
231 "Server-mst has %u bytes left in inbound buffer\n",
232 (unsigned int) size);
233#endif
234 if (size < sizeof (struct GNUNET_MessageHeader))
235 break;
236 offset = (unsigned long) buf;
237 need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO;
238 if (GNUNET_NO == need_align)
239 {
240 /* can try to do zero-copy and process directly from original buffer */
241 hdr = (const struct GNUNET_MessageHeader *) buf;
178 want = ntohs (hdr->size); 242 want = ntohs (hdr->size);
179 if (want < sizeof (struct GNUNET_MessageHeader)) 243 if (want < sizeof (struct GNUNET_MessageHeader))
180 { 244 {
181 GNUNET_break_op (0); 245 GNUNET_break_op (0);
182 return GNUNET_SYSERR; 246 mst->off = 0;
183 } 247 return GNUNET_SYSERR;
184 if (mst->curr_buf - mst->off < want) 248 }
185 { 249 if (size < want)
186 /* need more space */ 250 break; /* or not, buffer incomplete, so copy to private buffer... */
187 mst->pos -= mst->off;
188 memmove (ibuf,
189 &ibuf[mst->off],
190 mst->pos);
191 mst->off = 0;
192 }
193 if (want > mst->curr_buf)
194 {
195 mst->hdr = GNUNET_realloc(mst->hdr, want);
196 ibuf = (char*)mst->hdr;
197 mst->curr_buf = want;
198 }
199 hdr = (const struct GNUNET_MessageHeader*) &ibuf[mst->off];
200 if (mst->pos - mst->off < want)
201 {
202 delta = GNUNET_MIN (want - (mst->pos - mst->off),
203 size);
204 memcpy (&ibuf[mst->pos],
205 buf,
206 delta);
207 mst->pos += delta;
208 buf += delta;
209 size -= delta;
210 }
211 if (mst->pos - mst->off < want)
212 {
213 if (purge)
214 {
215 mst->off = 0;
216 mst->pos = 0;
217 }
218 return GNUNET_OK;
219 }
220 if (one_shot == GNUNET_SYSERR) 251 if (one_shot == GNUNET_SYSERR)
221 { 252 {
222 /* cannot call callback again, but return value saying that 253 /* cannot call callback again, but return value saying that
223 we have another full message in the buffer */ 254 * we have another full message in the buffer */
224 ret = GNUNET_NO; 255 ret = GNUNET_NO;
225 goto copy; 256 goto copy;
226 } 257 }
227 if (one_shot == GNUNET_YES) 258 if (one_shot == GNUNET_YES)
228 one_shot = GNUNET_SYSERR; 259 one_shot = GNUNET_SYSERR;
229 mst->cb (mst->cb_cls, client_identity, hdr); 260 mst->cb (mst->cb_cls, client_identity, hdr);
230 mst->off += want; 261 buf += want;
231 if (mst->off == mst->pos) 262 size -= want;
232 {
233 /* reset to beginning of buffer, it's free right now! */
234 mst->off = 0;
235 mst->pos = 0;
236 }
237 } 263 }
238 while (size > 0) 264 else
239 { 265 {
240#if DEBUG_SERVER_MST 266 /* need to copy to private buffer to align;
241 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 267 * yes, we go a bit more spagetti than usual here */
242 "Server-mst has %u bytes left in inbound buffer\n", 268 goto do_align;
243 (unsigned int) size);
244#endif
245 if (size < sizeof (struct GNUNET_MessageHeader))
246 break;
247 offset = (unsigned long) buf;
248 need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO;
249 if (GNUNET_NO == need_align)
250 {
251 /* can try to do zero-copy and process directly from original buffer */
252 hdr = (const struct GNUNET_MessageHeader *) buf;
253 want = ntohs (hdr->size);
254 if (want < sizeof (struct GNUNET_MessageHeader))
255 {
256 GNUNET_break_op (0);
257 mst->off = 0;
258 return GNUNET_SYSERR;
259 }
260 if (size < want)
261 break; /* or not, buffer incomplete, so copy to private buffer... */
262 if (one_shot == GNUNET_SYSERR)
263 {
264 /* cannot call callback again, but return value saying that
265 we have another full message in the buffer */
266 ret = GNUNET_NO;
267 goto copy;
268 }
269 if (one_shot == GNUNET_YES)
270 one_shot = GNUNET_SYSERR;
271 mst->cb (mst->cb_cls, client_identity, hdr);
272 buf += want;
273 size -= want;
274 }
275 else
276 {
277 /* need to copy to private buffer to align;
278 yes, we go a bit more spagetti than usual here */
279 goto do_align;
280 }
281 } 269 }
282 copy: 270 }
283 if ( (size > 0) && (! purge) ) 271copy:
272 if ((size > 0) && (!purge))
273 {
274 if (size + mst->pos > mst->curr_buf)
284 { 275 {
285 if (size + mst->pos > mst->curr_buf) 276 mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos);
286 { 277 ibuf = (char *) mst->hdr;
287 mst->hdr = GNUNET_realloc(mst->hdr, size + mst->pos); 278 mst->curr_buf = size + mst->pos;
288 ibuf = (char*)mst->hdr;
289 mst->curr_buf = size + mst->pos;
290 }
291 GNUNET_assert (mst->pos + size <= mst->curr_buf);
292 memcpy (&ibuf[mst->pos], buf, size);
293 mst->pos += size;
294 } 279 }
280 GNUNET_assert (mst->pos + size <= mst->curr_buf);
281 memcpy (&ibuf[mst->pos], buf, size);
282 mst->pos += size;
283 }
295 if (purge) 284 if (purge)
296 mst->off = 0; 285 mst->off = 0;
297#if DEBUG_SERVER_MST 286#if DEBUG_SERVER_MST
298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
299 "Server-mst leaves %u bytes in private buffer\n", 288 "Server-mst leaves %u bytes in private buffer\n",
300 (unsigned int) (mst->pos - mst->off)); 289 (unsigned int) (mst->pos - mst->off));
301#endif 290#endif
302 return ret; 291 return ret;
303} 292}