diff options
Diffstat (limited to 'src/set/mq.c')
-rw-r--r-- | src/set/mq.c | 56 |
1 files changed, 34 insertions, 22 deletions
diff --git a/src/set/mq.c b/src/set/mq.c index 236a692d4..92120a607 100644 --- a/src/set/mq.c +++ b/src/set/mq.c | |||
@@ -215,21 +215,25 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) | |||
215 | } | 215 | } |
216 | 216 | ||
217 | 217 | ||
218 | struct GNUNET_MQ_Message * | 218 | int |
219 | GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type) | 219 | GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, |
220 | const struct GNUNET_MessageHeader *m) | ||
220 | { | 221 | { |
221 | struct GNUNET_MQ_Message *mq; | 222 | size_t new_size; |
223 | size_t old_size; | ||
222 | 224 | ||
223 | GNUNET_assert (NULL != mhp); | ||
224 | if (NULL == m) | 225 | if (NULL == m) |
225 | return GNUNET_MQ_msg_ (mhp, base_size, type); | 226 | return GNUNET_OK; |
226 | GNUNET_assert (ntohs (m->size >= sizeof (struct GNUNET_MessageHeader))); | 227 | GNUNET_assert (NULL != mqmp); |
227 | /* check for overflow */ | 228 | old_size = ntohs ((*mqmp)->mh->size); |
228 | if (base_size + ntohs (m->size) <= base_size) | 229 | /* message too large to concatenate? */ |
229 | return NULL; | 230 | if (ntohs ((*mqmp)->mh->size) + ntohs (m->size) < ntohs (m->size)) |
230 | mq = GNUNET_MQ_msg_ (mhp, base_size + ntohs (m->size), type); | 231 | return GNUNET_SYSERR; |
231 | memcpy (((void *) *mhp) + base_size, m, ntohs (m->size)); | 232 | new_size = old_size + ntohs (m->size); |
232 | return mq; | 233 | *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); |
234 | memcpy ((*mqmp)->mh + old_size, m, new_size - old_size); | ||
235 | (*mqmp)->mh->size = htons (new_size); | ||
236 | return GNUNET_OK; | ||
233 | } | 237 | } |
234 | 238 | ||
235 | 239 | ||
@@ -274,20 +278,26 @@ stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
274 | return; | 278 | return; |
275 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); | 279 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); |
276 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | 280 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), |
277 | GNUNET_TIME_UNIT_FOREVER_REL, stream_write_queued, cls); | 281 | GNUNET_TIME_UNIT_FOREVER_REL, |
282 | stream_write_queued, mq); | ||
278 | GNUNET_assert (NULL != mss->wh); | 283 | GNUNET_assert (NULL != mss->wh); |
279 | } | 284 | } |
280 | 285 | ||
281 | 286 | ||
282 | static void | 287 | static void |
283 | stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | 288 | stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, |
289 | struct GNUNET_MQ_Message *mqm) | ||
284 | { | 290 | { |
291 | struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state; | ||
285 | if (NULL != mq->current_msg) | 292 | if (NULL != mq->current_msg) |
286 | { | 293 | { |
287 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); | 294 | GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); |
288 | return; | 295 | return; |
289 | } | 296 | } |
290 | stream_write_queued (mq, GNUNET_STREAM_OK, 0); | 297 | mq->current_msg = mqm; |
298 | mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), | ||
299 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
300 | stream_write_queued, mq); | ||
291 | } | 301 | } |
292 | 302 | ||
293 | 303 | ||
@@ -304,7 +314,8 @@ stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Mes | |||
304 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | 314 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing |
305 | */ | 315 | */ |
306 | static int | 316 | static int |
307 | stream_mst_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | 317 | stream_mst_callback (void *cls, void *client, |
318 | const struct GNUNET_MessageHeader *message) | ||
308 | { | 319 | { |
309 | struct GNUNET_MQ_MessageQueue *mq = cls; | 320 | struct GNUNET_MQ_MessageQueue *mq = cls; |
310 | 321 | ||
@@ -334,12 +345,14 @@ stream_data_processor (void *cls, | |||
334 | struct GNUNET_MQ_MessageQueue *mq = cls; | 345 | struct GNUNET_MQ_MessageQueue *mq = cls; |
335 | struct MessageStreamState *mss; | 346 | struct MessageStreamState *mss; |
336 | int ret; | 347 | int ret; |
348 | |||
337 | mss = (struct MessageStreamState *) mq->impl_state; | 349 | mss = (struct MessageStreamState *) mq->impl_state; |
338 | |||
339 | GNUNET_assert (GNUNET_STREAM_OK == status); | 350 | GNUNET_assert (GNUNET_STREAM_OK == status); |
340 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); | 351 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); |
341 | GNUNET_assert (GNUNET_OK == ret); | 352 | GNUNET_assert (GNUNET_OK == ret); |
342 | /* we always read all data */ | 353 | /* we always read all data */ |
354 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
355 | stream_data_processor, mq); | ||
343 | return size; | 356 | return size; |
344 | } | 357 | } |
345 | 358 | ||
@@ -369,8 +382,7 @@ GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket, | |||
369 | } | 382 | } |
370 | 383 | ||
371 | 384 | ||
372 | /** | 385 | /*** Transmit a queued message to the session's client. |
373 | * Transmit a queued message to the session's client. | ||
374 | * | 386 | * |
375 | * @param cls consensus session | 387 | * @param cls consensus session |
376 | * @param size number of bytes available in buf | 388 | * @param size number of bytes available in buf |
@@ -474,7 +486,7 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
474 | mq->current_msg = mq->msg_head; | 486 | mq->current_msg = mq->msg_head; |
475 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); | 487 | GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); |
476 | state->th = | 488 | state->th = |
477 | GNUNET_CLIENT_notify_transmit_ready (state->connection, msg_size, | 489 | GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size), |
478 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, | 490 | GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, |
479 | &connection_client_transmit_queued, mq); | 491 | &connection_client_transmit_queued, mq); |
480 | } | 492 | } |
@@ -483,7 +495,8 @@ connection_client_transmit_queued (void *cls, size_t size, | |||
483 | 495 | ||
484 | 496 | ||
485 | static void | 497 | static void |
486 | connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) | 498 | connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, |
499 | struct GNUNET_MQ_Message *mqm) | ||
487 | { | 500 | { |
488 | struct ClientConnectionState *state = mq->impl_state; | 501 | struct ClientConnectionState *state = mq->impl_state; |
489 | int msize; | 502 | int msize; |
@@ -519,7 +532,6 @@ handle_client_message (void *cls, | |||
519 | struct GNUNET_MQ_MessageQueue *mq = cls; | 532 | struct GNUNET_MQ_MessageQueue *mq = cls; |
520 | 533 | ||
521 | GNUNET_assert (NULL != msg); | 534 | GNUNET_assert (NULL != msg); |
522 | |||
523 | dispatch_message (mq, msg); | 535 | dispatch_message (mq, msg); |
524 | } | 536 | } |
525 | 537 | ||