aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-06-22 14:13:02 +0000
committerChristian Grothoff <christian@grothoff.org>2010-06-22 14:13:02 +0000
commit739ac84730e1d5f1b5affdeaedec11a986d2faa6 (patch)
tree5037c9135b89d54ccb00c735c29899a9a49f98a4
parent7ec59f1b0753f97f02be5a5064af534adea02377 (diff)
downloadgnunet-739ac84730e1d5f1b5affdeaedec11a986d2faa6.tar.gz
gnunet-739ac84730e1d5f1b5affdeaedec11a986d2faa6.zip
mst fixes
-rw-r--r--src/util/server_mst.c131
1 files changed, 96 insertions, 35 deletions
diff --git a/src/util/server_mst.c b/src/util/server_mst.c
index 8df4e4b7d..27c95815a 100644
--- a/src/util/server_mst.c
+++ b/src/util/server_mst.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2009 Christian Grothoff (and other contributing authors) 3 (C) 2010 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -31,6 +31,12 @@
31#include "gnunet_server_lib.h" 31#include "gnunet_server_lib.h"
32#include "gnunet_time_lib.h" 32#include "gnunet_time_lib.h"
33 33
34#if HAVE_UNALIGNED_64_ACCESS
35#define ALIGN_FACTOR 4
36#else
37#define ALIGN_FACTOR 8
38#endif
39
34 40
35/** 41/**
36 * Handle to a message stream tokenizer. 42 * Handle to a message stream tokenizer.
@@ -38,18 +44,38 @@
38struct GNUNET_SERVER_MessageStreamTokenizer 44struct GNUNET_SERVER_MessageStreamTokenizer
39{ 45{
40 46
41 size_t maxbuf; 47 /**
42 48 * Function to call on completed messages.
43 size_t off; 49 */
50 GNUNET_SERVER_MessageTokenizerCallback cb;
51
52 /**
53 * Closure for cb.
54 */
55 void *cb_cls;
44 56
57 /**
58 * Client to pass to cb.
59 */
45 void *client_identity; 60 void *client_identity;
46 61
47 GNUNET_SERVER_MessageTokenizerCallback cb; 62 /**
63 * Size of the buffer (starting at 'hdr').
64 */
65 size_t maxbuf;
48 66
49 void *cb_cls; 67 /**
68 * How many bytes in buffer have we already processed?
69 */
70 size_t off;
71
72 /**
73 * How many bytes in buffer are valid right now?
74 */
75 size_t pos;
50 76
51 /** 77 /**
52 * Beginning of the buffer. 78 * Beginning of the buffer. Typed like this to force alignment.
53 */ 79 */
54 struct GNUNET_MessageHeader hdr; 80 struct GNUNET_MessageHeader hdr;
55 81
@@ -64,6 +90,8 @@ struct GNUNET_SERVER_MessageStreamTokenizer
64 * GNUNET_SERVER_MAX_MESSAGE_SIZE) 90 * GNUNET_SERVER_MAX_MESSAGE_SIZE)
65 * @param client_identity ID of client for which this is a buffer, 91 * @param client_identity ID of client for which this is a buffer,
66 * can be NULL (will be passed back to 'cb') 92 * can be NULL (will be passed back to 'cb')
93 * @param cb function to call on completed messages
94 * @param cb_cls closure for cb
67 * @return handle to tokenizer 95 * @return handle to tokenizer
68 */ 96 */
69struct GNUNET_SERVER_MessageStreamTokenizer * 97struct GNUNET_SERVER_MessageStreamTokenizer *
@@ -106,7 +134,7 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
106{ 134{
107 const struct GNUNET_MessageHeader *hdr; 135 const struct GNUNET_MessageHeader *hdr;
108 size_t delta; 136 size_t delta;
109 size_t want; 137 uint16_t want;
110 char *ibuf; 138 char *ibuf;
111 int need_align; 139 int need_align;
112 unsigned long offset; 140 unsigned long offset;
@@ -114,78 +142,110 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
114 142
115 ret = GNUNET_OK; 143 ret = GNUNET_OK;
116 ibuf = (char*) &mst->hdr; 144 ibuf = (char*) &mst->hdr;
117 if (mst->off > 0) 145 if (mst->pos > 0)
118 { 146 {
119 do_align: 147 do_align:
120 if (mst->off < sizeof (struct GNUNET_MessageHeader)) 148 if ( (mst->maxbuf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
149 (0 != (mst->off % ALIGN_FACTOR)) )
150 {
151 /* need to align or need more space */
152 mst->pos -= mst->off;
153 memmove (ibuf,
154 &ibuf[mst->off],
155 mst->pos);
156 mst->off = 0;
157 }
158 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
121 { 159 {
122 delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - mst->off, 160 delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - (mst->pos - mst->off),
123 size); 161 size);
124 memcpy (&ibuf[mst->off], 162 memcpy (&ibuf[mst->pos],
125 buf, 163 buf,
126 delta); 164 delta);
127 mst->off += delta; 165 mst->pos += delta;
128 buf += delta; 166 buf += delta;
129 size -= delta; 167 size -= delta;
130 } 168 }
131 if (mst->off < sizeof (struct GNUNET_MessageHeader)) 169 if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
132 { 170 {
133 if (purge) 171 if (purge)
134 mst->off = 0; 172 {
173 mst->off = 0;
174 mst->pos = 0;
175 }
135 return GNUNET_OK; 176 return GNUNET_OK;
136 } 177 }
137 want = ntohs (mst->hdr.size); 178 hdr = (const struct GNUNET_MessageHeader*) &ibuf[mst->off];
179 want = ntohs (hdr->size);
138 if (want < sizeof (struct GNUNET_MessageHeader)) 180 if (want < sizeof (struct GNUNET_MessageHeader))
139 { 181 {
140 GNUNET_break_op (0); 182 GNUNET_break_op (0);
141 return GNUNET_SYSERR; 183 return GNUNET_SYSERR;
142 } 184 }
143 if (want < mst->off) 185 if (mst->maxbuf - mst->off < want)
144 { 186 {
145 delta = GNUNET_MIN (want - mst->off, 187 /* need more space */
188 mst->pos -= mst->off;
189 memmove (ibuf,
190 &ibuf[mst->off],
191 mst->pos);
192 mst->off = 0;
193 }
194 if (mst->pos - mst->off < want)
195 {
196 delta = GNUNET_MIN (want - (mst->pos - mst->off),
146 size); 197 size);
147 memcpy (&ibuf[mst->off], 198 memcpy (&ibuf[mst->pos],
148 buf, 199 buf,
149 delta); 200 delta);
150 mst->off += delta; 201 mst->pos += delta;
151 buf += delta; 202 buf += delta;
152 size -= delta; 203 size -= delta;
153 } 204 }
154 if (want < mst->off) 205 if (mst->pos - mst->off < want)
155 { 206 {
156 if (purge) 207 if (purge)
157 mst->off = 0; 208 {
209 mst->off = 0;
210 mst->pos = 0;
211 }
158 return GNUNET_OK; 212 return GNUNET_OK;
159 } 213 }
160 if (one_shot == GNUNET_SYSERR) 214 if (one_shot == GNUNET_SYSERR)
161 { 215 {
216 /* cannot call callback again, but return value saying that
217 we have another full message in the buffer */
162 ret = GNUNET_NO; 218 ret = GNUNET_NO;
163 goto copy; 219 goto copy;
164 } 220 }
165 if (one_shot == GNUNET_YES) 221 if (one_shot == GNUNET_YES)
166 one_shot = GNUNET_SYSERR; 222 one_shot = GNUNET_SYSERR;
167 mst->cb (mst->cb_cls, mst->client_identity, &mst->hdr); 223 mst->cb (mst->cb_cls, mst->client_identity, hdr);
168 mst->off = 0; 224 mst->off += want;
225 if (mst->off == mst->pos)
226 {
227 /* reset to beginning of buffer, it's free right now! */
228 mst->off = 0;
229 mst->pos = 0;
230 }
169 } 231 }
170 while (size > 0) 232 while (size > 0)
171 { 233 {
172 if (size < sizeof (struct GNUNET_MessageHeader)) 234 if (size < sizeof (struct GNUNET_MessageHeader))
173 break; 235 break;
174 offset = (unsigned long) buf; 236 offset = (unsigned long) buf;
175#if HAVE_UNALIGNED_64_ACCESS 237 need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO;
176 need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
177#else
178 need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
179#endif
180 if (GNUNET_NO == need_align) 238 if (GNUNET_NO == need_align)
181 { 239 {
182 /* can try to do zero-copy */ 240 /* can try to do zero-copy and process directly from original buffer */
183 hdr = (const struct GNUNET_MessageHeader *) buf; 241 hdr = (const struct GNUNET_MessageHeader *) buf;
184 want = ntohs (hdr->size); 242 want = ntohs (hdr->size);
185 if (size < want) 243 if (size < want)
186 break; /* or not, buffer incomplete... */ 244 break; /* or not, buffer incomplete, so copy to private buffer... */
187 if (one_shot == GNUNET_SYSERR) 245 if (one_shot == GNUNET_SYSERR)
188 { 246 {
247 /* cannot call callback again, but return value saying that
248 we have another full message in the buffer */
189 ret = GNUNET_NO; 249 ret = GNUNET_NO;
190 goto copy; 250 goto copy;
191 } 251 }
@@ -197,16 +257,17 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
197 } 257 }
198 else 258 else
199 { 259 {
200 /* yes, we go a bit more spagetti than usual here */ 260 /* need to copy to private buffer to align;
261 yes, we go a bit more spagetti than usual here */
201 goto do_align; 262 goto do_align;
202 } 263 }
203 } 264 }
204 copy: 265 copy:
205 if ( (size > 0) && (! purge) ) 266 if ( (size > 0) && (! purge) )
206 { 267 {
207 memcpy (&ibuf[mst->off], buf, size); 268 GNUNET_assert (mst->pos + size <= mst->maxbuf);
208 mst->off += size; 269 memcpy (&ibuf[mst->pos], buf, size);
209 size = 0; 270 mst->pos += size;
210 } 271 }
211 if (purge) 272 if (purge)
212 mst->off = 0; 273 mst->off = 0;