aboutsummaryrefslogtreecommitdiff
path: root/src/util/server_mst.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/server_mst.c')
-rw-r--r--src/util/server_mst.c278
1 files changed, 140 insertions, 138 deletions
diff --git a/src/util/server_mst.c b/src/util/server_mst.c
index adb0a0818..6fd2647d7 100644
--- a/src/util/server_mst.c
+++ b/src/util/server_mst.c
@@ -39,6 +39,8 @@
39#define ALIGN_FACTOR 8 39#define ALIGN_FACTOR 8
40#endif 40#endif
41 41
42#define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__)
43
42 44
43/** 45/**
44 * Handle to a message stream tokenizer. 46 * Handle to a message stream tokenizer.
@@ -89,7 +91,7 @@ struct GNUNET_SERVER_MessageStreamTokenizer
89 */ 91 */
90struct GNUNET_SERVER_MessageStreamTokenizer * 92struct GNUNET_SERVER_MessageStreamTokenizer *
91GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, 93GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb,
92 void *cb_cls) 94 void *cb_cls)
93{ 95{
94 struct GNUNET_SERVER_MessageStreamTokenizer *ret; 96 struct GNUNET_SERVER_MessageStreamTokenizer *ret;
95 97
@@ -119,8 +121,8 @@ GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb,
119 */ 121 */
120int 122int
121GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, 123GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
122 void *client_identity, const char *buf, size_t size, 124 void *client_identity, const char *buf,
123 int purge, int one_shot) 125 size_t size, int purge, int one_shot)
124{ 126{
125 const struct GNUNET_MessageHeader *hdr; 127 const struct GNUNET_MessageHeader *hdr;
126 size_t delta; 128 size_t delta;
@@ -131,161 +133,161 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
131 int ret; 133 int ret;
132 134
133#if DEBUG_SERVER_MST 135#if DEBUG_SERVER_MST
134 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 136 LOG (GNUNET_ERROR_TYPE_DEBUG,
135 "Server-mst receives %u bytes with %u bytes already in private buffer\n", 137 "Server-mst receives %u bytes with %u bytes already in private buffer\n",
136 (unsigned int) size, (unsigned int) (mst->pos - mst->off)); 138 (unsigned int) size, (unsigned int) (mst->pos - mst->off));
137#endif 139#endif
138 ret = GNUNET_OK; 140 ret = GNUNET_OK;
139 ibuf = (char *) mst->hdr; 141 ibuf = (char *) mst->hdr;
140 while (mst->pos > 0) 142 while (mst->pos > 0)
141 {
142do_align:
143 if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
144 (0 != (mst->off % ALIGN_FACTOR)))
145 {
146 /* need to align or need more space */
147 mst->pos -= mst->off;
148 memmove (ibuf, &ibuf[mst->off], mst->pos);
149 mst->off = 0;
150 }
151 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
152 {
153 delta =
154 GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) -
155 (mst->pos - mst->off), size);
156 memcpy (&ibuf[mst->pos], buf, delta);
157 mst->pos += delta;
158 buf += delta;
159 size -= delta;
160 }
161 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
162 {
163 if (purge)
164 {
165 mst->off = 0;
166 mst->pos = 0;
167 }
168 return GNUNET_OK;
169 }
170 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
171 want = ntohs (hdr->size);
172 if (want < sizeof (struct GNUNET_MessageHeader))
173 { 143 {
174 GNUNET_break_op (0); 144 do_align:
175 return GNUNET_SYSERR; 145 if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
176 } 146 (0 != (mst->off % ALIGN_FACTOR)))
177 if (mst->curr_buf - mst->off < want) 147 {
178 { 148 /* need to align or need more space */
179 /* need more space */ 149 mst->pos -= mst->off;
180 mst->pos -= mst->off; 150 memmove (ibuf, &ibuf[mst->off], mst->pos);
181 memmove (ibuf, &ibuf[mst->off], mst->pos); 151 mst->off = 0;
182 mst->off = 0; 152 }
183 } 153 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
184 if (want > mst->curr_buf) 154 {
185 { 155 delta =
186 mst->hdr = GNUNET_realloc (mst->hdr, want); 156 GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) -
187 ibuf = (char *) mst->hdr; 157 (mst->pos - mst->off), size);
188 mst->curr_buf = want; 158 memcpy (&ibuf[mst->pos], buf, delta);
189 } 159 mst->pos += delta;
190 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; 160 buf += delta;
191 if (mst->pos - mst->off < want) 161 size -= delta;
192 { 162 }
193 delta = GNUNET_MIN (want - (mst->pos - mst->off), size); 163 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
194 memcpy (&ibuf[mst->pos], buf, delta); 164 {
195 mst->pos += delta; 165 if (purge)
196 buf += delta; 166 {
197 size -= delta; 167 mst->off = 0;
198 } 168 mst->pos = 0;
199 if (mst->pos - mst->off < want) 169 }
200 { 170 return GNUNET_OK;
201 if (purge) 171 }
202 { 172 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
203 mst->off = 0;
204 mst->pos = 0;
205 }
206 return GNUNET_OK;
207 }
208 if (one_shot == GNUNET_SYSERR)
209 {
210 /* cannot call callback again, but return value saying that
211 * we have another full message in the buffer */
212 ret = GNUNET_NO;
213 goto copy;
214 }
215 if (one_shot == GNUNET_YES)
216 one_shot = GNUNET_SYSERR;
217 mst->cb (mst->cb_cls, client_identity, hdr);
218 mst->off += want;
219 if (mst->off == mst->pos)
220 {
221 /* reset to beginning of buffer, it's free right now! */
222 mst->off = 0;
223 mst->pos = 0;
224 }
225 }
226 while (size > 0)
227 {
228#if DEBUG_SERVER_MST
229 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
230 "Server-mst has %u bytes left in inbound buffer\n",
231 (unsigned int) size);
232#endif
233 if (size < sizeof (struct GNUNET_MessageHeader))
234 break;
235 offset = (unsigned long) buf;
236 need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO;
237 if (GNUNET_NO == need_align)
238 {
239 /* can try to do zero-copy and process directly from original buffer */
240 hdr = (const struct GNUNET_MessageHeader *) buf;
241 want = ntohs (hdr->size); 173 want = ntohs (hdr->size);
242 if (want < sizeof (struct GNUNET_MessageHeader)) 174 if (want < sizeof (struct GNUNET_MessageHeader))
243 { 175 {
244 GNUNET_break_op (0); 176 GNUNET_break_op (0);
245 mst->off = 0; 177 return GNUNET_SYSERR;
246 return GNUNET_SYSERR; 178 }
247 } 179 if (mst->curr_buf - mst->off < want)
248 if (size < want) 180 {
249 break; /* or not, buffer incomplete, so copy to private buffer... */ 181 /* need more space */
182 mst->pos -= mst->off;
183 memmove (ibuf, &ibuf[mst->off], mst->pos);
184 mst->off = 0;
185 }
186 if (want > mst->curr_buf)
187 {
188 mst->hdr = GNUNET_realloc (mst->hdr, want);
189 ibuf = (char *) mst->hdr;
190 mst->curr_buf = want;
191 }
192 hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
193 if (mst->pos - mst->off < want)
194 {
195 delta = GNUNET_MIN (want - (mst->pos - mst->off), size);
196 memcpy (&ibuf[mst->pos], buf, delta);
197 mst->pos += delta;
198 buf += delta;
199 size -= delta;
200 }
201 if (mst->pos - mst->off < want)
202 {
203 if (purge)
204 {
205 mst->off = 0;
206 mst->pos = 0;
207 }
208 return GNUNET_OK;
209 }
250 if (one_shot == GNUNET_SYSERR) 210 if (one_shot == GNUNET_SYSERR)
251 { 211 {
252 /* cannot call callback again, but return value saying that 212 /* cannot call callback again, but return value saying that
253 * we have another full message in the buffer */ 213 * we have another full message in the buffer */
254 ret = GNUNET_NO; 214 ret = GNUNET_NO;
255 goto copy; 215 goto copy;
256 } 216 }
257 if (one_shot == GNUNET_YES) 217 if (one_shot == GNUNET_YES)
258 one_shot = GNUNET_SYSERR; 218 one_shot = GNUNET_SYSERR;
259 mst->cb (mst->cb_cls, client_identity, hdr); 219 mst->cb (mst->cb_cls, client_identity, hdr);
260 buf += want; 220 mst->off += want;
261 size -= want; 221 if (mst->off == mst->pos)
222 {
223 /* reset to beginning of buffer, it's free right now! */
224 mst->off = 0;
225 mst->pos = 0;
226 }
262 } 227 }
263 else 228 while (size > 0)
264 { 229 {
265 /* need to copy to private buffer to align; 230#if DEBUG_SERVER_MST
266 * yes, we go a bit more spagetti than usual here */ 231 LOG (GNUNET_ERROR_TYPE_DEBUG,
267 goto do_align; 232 "Server-mst has %u bytes left in inbound buffer\n",
233 (unsigned int) size);
234#endif
235 if (size < sizeof (struct GNUNET_MessageHeader))
236 break;
237 offset = (unsigned long) buf;
238 need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO;
239 if (GNUNET_NO == need_align)
240 {
241 /* can try to do zero-copy and process directly from original buffer */
242 hdr = (const struct GNUNET_MessageHeader *) buf;
243 want = ntohs (hdr->size);
244 if (want < sizeof (struct GNUNET_MessageHeader))
245 {
246 GNUNET_break_op (0);
247 mst->off = 0;
248 return GNUNET_SYSERR;
249 }
250 if (size < want)
251 break; /* or not, buffer incomplete, so copy to private buffer... */
252 if (one_shot == GNUNET_SYSERR)
253 {
254 /* cannot call callback again, but return value saying that
255 * we have another full message in the buffer */
256 ret = GNUNET_NO;
257 goto copy;
258 }
259 if (one_shot == GNUNET_YES)
260 one_shot = GNUNET_SYSERR;
261 mst->cb (mst->cb_cls, client_identity, hdr);
262 buf += want;
263 size -= want;
264 }
265 else
266 {
267 /* need to copy to private buffer to align;
268 * yes, we go a bit more spagetti than usual here */
269 goto do_align;
270 }
268 } 271 }
269 }
270copy: 272copy:
271 if ((size > 0) && (!purge)) 273 if ((size > 0) && (!purge))
272 {
273 if (size + mst->pos > mst->curr_buf)
274 { 274 {
275 mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); 275 if (size + mst->pos > mst->curr_buf)
276 ibuf = (char *) mst->hdr; 276 {
277 mst->curr_buf = size + mst->pos; 277 mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos);
278 ibuf = (char *) mst->hdr;
279 mst->curr_buf = size + mst->pos;
280 }
281 GNUNET_assert (mst->pos + size <= mst->curr_buf);
282 memcpy (&ibuf[mst->pos], buf, size);
283 mst->pos += size;
278 } 284 }
279 GNUNET_assert (mst->pos + size <= mst->curr_buf);
280 memcpy (&ibuf[mst->pos], buf, size);
281 mst->pos += size;
282 }
283 if (purge) 285 if (purge)
284 mst->off = 0; 286 mst->off = 0;
285#if DEBUG_SERVER_MST 287#if DEBUG_SERVER_MST
286 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 288 LOG (GNUNET_ERROR_TYPE_DEBUG,
287 "Server-mst leaves %u bytes in private buffer\n", 289 "Server-mst leaves %u bytes in private buffer\n",
288 (unsigned int) (mst->pos - mst->off)); 290 (unsigned int) (mst->pos - mst->off));
289#endif 291#endif
290 return ret; 292 return ret;
291} 293}