diff options
author | Christian Grothoff <christian@grothoff.org> | 2010-06-22 14:13:02 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2010-06-22 14:13:02 +0000 |
commit | 739ac84730e1d5f1b5affdeaedec11a986d2faa6 (patch) | |
tree | 5037c9135b89d54ccb00c735c29899a9a49f98a4 | |
parent | 7ec59f1b0753f97f02be5a5064af534adea02377 (diff) | |
download | gnunet-739ac84730e1d5f1b5affdeaedec11a986d2faa6.tar.gz gnunet-739ac84730e1d5f1b5affdeaedec11a986d2faa6.zip |
mst fixes
-rw-r--r-- | src/util/server_mst.c | 131 |
1 files changed, 96 insertions, 35 deletions
diff --git a/src/util/server_mst.c b/src/util/server_mst.c index 8df4e4b7d..27c95815a 100644 --- a/src/util/server_mst.c +++ b/src/util/server_mst.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | (C) 2009 Christian Grothoff (and other contributing authors) | 3 | (C) 2010 Christian Grothoff (and other contributing authors) |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -31,6 +31,12 @@ | |||
31 | #include "gnunet_server_lib.h" | 31 | #include "gnunet_server_lib.h" |
32 | #include "gnunet_time_lib.h" | 32 | #include "gnunet_time_lib.h" |
33 | 33 | ||
34 | #if HAVE_UNALIGNED_64_ACCESS | ||
35 | #define ALIGN_FACTOR 4 | ||
36 | #else | ||
37 | #define ALIGN_FACTOR 8 | ||
38 | #endif | ||
39 | |||
34 | 40 | ||
35 | /** | 41 | /** |
36 | * Handle to a message stream tokenizer. | 42 | * Handle to a message stream tokenizer. |
@@ -38,18 +44,38 @@ | |||
38 | struct GNUNET_SERVER_MessageStreamTokenizer | 44 | struct GNUNET_SERVER_MessageStreamTokenizer |
39 | { | 45 | { |
40 | 46 | ||
41 | size_t maxbuf; | 47 | /** |
42 | 48 | * Function to call on completed messages. | |
43 | size_t off; | 49 | */ |
50 | GNUNET_SERVER_MessageTokenizerCallback cb; | ||
51 | |||
52 | /** | ||
53 | * Closure for cb. | ||
54 | */ | ||
55 | void *cb_cls; | ||
44 | 56 | ||
57 | /** | ||
58 | * Client to pass to cb. | ||
59 | */ | ||
45 | void *client_identity; | 60 | void *client_identity; |
46 | 61 | ||
47 | GNUNET_SERVER_MessageTokenizerCallback cb; | 62 | /** |
63 | * Size of the buffer (starting at 'hdr'). | ||
64 | */ | ||
65 | size_t maxbuf; | ||
48 | 66 | ||
49 | void *cb_cls; | 67 | /** |
68 | * How many bytes in buffer have we already processed? | ||
69 | */ | ||
70 | size_t off; | ||
71 | |||
72 | /** | ||
73 | * How many bytes in buffer are valid right now? | ||
74 | */ | ||
75 | size_t pos; | ||
50 | 76 | ||
51 | /** | 77 | /** |
52 | * Beginning of the buffer. | 78 | * Beginning of the buffer. Typed like this to force alignment. |
53 | */ | 79 | */ |
54 | struct GNUNET_MessageHeader hdr; | 80 | struct GNUNET_MessageHeader hdr; |
55 | 81 | ||
@@ -64,6 +90,8 @@ struct GNUNET_SERVER_MessageStreamTokenizer | |||
64 | * GNUNET_SERVER_MAX_MESSAGE_SIZE) | 90 | * GNUNET_SERVER_MAX_MESSAGE_SIZE) |
65 | * @param client_identity ID of client for which this is a buffer, | 91 | * @param client_identity ID of client for which this is a buffer, |
66 | * can be NULL (will be passed back to 'cb') | 92 | * can be NULL (will be passed back to 'cb') |
93 | * @param cb function to call on completed messages | ||
94 | * @param cb_cls closure for cb | ||
67 | * @return handle to tokenizer | 95 | * @return handle to tokenizer |
68 | */ | 96 | */ |
69 | struct GNUNET_SERVER_MessageStreamTokenizer * | 97 | struct GNUNET_SERVER_MessageStreamTokenizer * |
@@ -106,7 +134,7 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, | |||
106 | { | 134 | { |
107 | const struct GNUNET_MessageHeader *hdr; | 135 | const struct GNUNET_MessageHeader *hdr; |
108 | size_t delta; | 136 | size_t delta; |
109 | size_t want; | 137 | uint16_t want; |
110 | char *ibuf; | 138 | char *ibuf; |
111 | int need_align; | 139 | int need_align; |
112 | unsigned long offset; | 140 | unsigned long offset; |
@@ -114,78 +142,110 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, | |||
114 | 142 | ||
115 | ret = GNUNET_OK; | 143 | ret = GNUNET_OK; |
116 | ibuf = (char*) &mst->hdr; | 144 | ibuf = (char*) &mst->hdr; |
117 | if (mst->off > 0) | 145 | if (mst->pos > 0) |
118 | { | 146 | { |
119 | do_align: | 147 | do_align: |
120 | if (mst->off < sizeof (struct GNUNET_MessageHeader)) | 148 | if ( (mst->maxbuf - mst->off < sizeof (struct GNUNET_MessageHeader)) || |
149 | (0 != (mst->off % ALIGN_FACTOR)) ) | ||
150 | { | ||
151 | /* need to align or need more space */ | ||
152 | mst->pos -= mst->off; | ||
153 | memmove (ibuf, | ||
154 | &ibuf[mst->off], | ||
155 | mst->pos); | ||
156 | mst->off = 0; | ||
157 | } | ||
158 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) | ||
121 | { | 159 | { |
122 | delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - mst->off, | 160 | delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - (mst->pos - mst->off), |
123 | size); | 161 | size); |
124 | memcpy (&ibuf[mst->off], | 162 | memcpy (&ibuf[mst->pos], |
125 | buf, | 163 | buf, |
126 | delta); | 164 | delta); |
127 | mst->off += delta; | 165 | mst->pos += delta; |
128 | buf += delta; | 166 | buf += delta; |
129 | size -= delta; | 167 | size -= delta; |
130 | } | 168 | } |
131 | if (mst->off < sizeof (struct GNUNET_MessageHeader)) | 169 | if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) |
132 | { | 170 | { |
133 | if (purge) | 171 | if (purge) |
134 | mst->off = 0; | 172 | { |
173 | mst->off = 0; | ||
174 | mst->pos = 0; | ||
175 | } | ||
135 | return GNUNET_OK; | 176 | return GNUNET_OK; |
136 | } | 177 | } |
137 | want = ntohs (mst->hdr.size); | 178 | hdr = (const struct GNUNET_MessageHeader*) &ibuf[mst->off]; |
179 | want = ntohs (hdr->size); | ||
138 | if (want < sizeof (struct GNUNET_MessageHeader)) | 180 | if (want < sizeof (struct GNUNET_MessageHeader)) |
139 | { | 181 | { |
140 | GNUNET_break_op (0); | 182 | GNUNET_break_op (0); |
141 | return GNUNET_SYSERR; | 183 | return GNUNET_SYSERR; |
142 | } | 184 | } |
143 | if (want < mst->off) | 185 | if (mst->maxbuf - mst->off < want) |
144 | { | 186 | { |
145 | delta = GNUNET_MIN (want - mst->off, | 187 | /* need more space */ |
188 | mst->pos -= mst->off; | ||
189 | memmove (ibuf, | ||
190 | &ibuf[mst->off], | ||
191 | mst->pos); | ||
192 | mst->off = 0; | ||
193 | } | ||
194 | if (mst->pos - mst->off < want) | ||
195 | { | ||
196 | delta = GNUNET_MIN (want - (mst->pos - mst->off), | ||
146 | size); | 197 | size); |
147 | memcpy (&ibuf[mst->off], | 198 | memcpy (&ibuf[mst->pos], |
148 | buf, | 199 | buf, |
149 | delta); | 200 | delta); |
150 | mst->off += delta; | 201 | mst->pos += delta; |
151 | buf += delta; | 202 | buf += delta; |
152 | size -= delta; | 203 | size -= delta; |
153 | } | 204 | } |
154 | if (want < mst->off) | 205 | if (mst->pos - mst->off < want) |
155 | { | 206 | { |
156 | if (purge) | 207 | if (purge) |
157 | mst->off = 0; | 208 | { |
209 | mst->off = 0; | ||
210 | mst->pos = 0; | ||
211 | } | ||
158 | return GNUNET_OK; | 212 | return GNUNET_OK; |
159 | } | 213 | } |
160 | if (one_shot == GNUNET_SYSERR) | 214 | if (one_shot == GNUNET_SYSERR) |
161 | { | 215 | { |
216 | /* cannot call callback again, but return value saying that | ||
217 | we have another full message in the buffer */ | ||
162 | ret = GNUNET_NO; | 218 | ret = GNUNET_NO; |
163 | goto copy; | 219 | goto copy; |
164 | } | 220 | } |
165 | if (one_shot == GNUNET_YES) | 221 | if (one_shot == GNUNET_YES) |
166 | one_shot = GNUNET_SYSERR; | 222 | one_shot = GNUNET_SYSERR; |
167 | mst->cb (mst->cb_cls, mst->client_identity, &mst->hdr); | 223 | mst->cb (mst->cb_cls, mst->client_identity, hdr); |
168 | mst->off = 0; | 224 | mst->off += want; |
225 | if (mst->off == mst->pos) | ||
226 | { | ||
227 | /* reset to beginning of buffer, it's free right now! */ | ||
228 | mst->off = 0; | ||
229 | mst->pos = 0; | ||
230 | } | ||
169 | } | 231 | } |
170 | while (size > 0) | 232 | while (size > 0) |
171 | { | 233 | { |
172 | if (size < sizeof (struct GNUNET_MessageHeader)) | 234 | if (size < sizeof (struct GNUNET_MessageHeader)) |
173 | break; | 235 | break; |
174 | offset = (unsigned long) buf; | 236 | offset = (unsigned long) buf; |
175 | #if HAVE_UNALIGNED_64_ACCESS | 237 | need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO; |
176 | need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO; | ||
177 | #else | ||
178 | need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO; | ||
179 | #endif | ||
180 | if (GNUNET_NO == need_align) | 238 | if (GNUNET_NO == need_align) |
181 | { | 239 | { |
182 | /* can try to do zero-copy */ | 240 | /* can try to do zero-copy and process directly from original buffer */ |
183 | hdr = (const struct GNUNET_MessageHeader *) buf; | 241 | hdr = (const struct GNUNET_MessageHeader *) buf; |
184 | want = ntohs (hdr->size); | 242 | want = ntohs (hdr->size); |
185 | if (size < want) | 243 | if (size < want) |
186 | break; /* or not, buffer incomplete... */ | 244 | break; /* or not, buffer incomplete, so copy to private buffer... */ |
187 | if (one_shot == GNUNET_SYSERR) | 245 | if (one_shot == GNUNET_SYSERR) |
188 | { | 246 | { |
247 | /* cannot call callback again, but return value saying that | ||
248 | we have another full message in the buffer */ | ||
189 | ret = GNUNET_NO; | 249 | ret = GNUNET_NO; |
190 | goto copy; | 250 | goto copy; |
191 | } | 251 | } |
@@ -197,16 +257,17 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, | |||
197 | } | 257 | } |
198 | else | 258 | else |
199 | { | 259 | { |
200 | /* yes, we go a bit more spagetti than usual here */ | 260 | /* need to copy to private buffer to align; |
261 | yes, we go a bit more spagetti than usual here */ | ||
201 | goto do_align; | 262 | goto do_align; |
202 | } | 263 | } |
203 | } | 264 | } |
204 | copy: | 265 | copy: |
205 | if ( (size > 0) && (! purge) ) | 266 | if ( (size > 0) && (! purge) ) |
206 | { | 267 | { |
207 | memcpy (&ibuf[mst->off], buf, size); | 268 | GNUNET_assert (mst->pos + size <= mst->maxbuf); |
208 | mst->off += size; | 269 | memcpy (&ibuf[mst->pos], buf, size); |
209 | size = 0; | 270 | mst->pos += size; |
210 | } | 271 | } |
211 | if (purge) | 272 | if (purge) |
212 | mst->off = 0; | 273 | mst->off = 0; |