aboutsummaryrefslogtreecommitdiff
path: root/src/set/mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/mq.c')
-rw-r--r--src/set/mq.c56
1 files changed, 34 insertions, 22 deletions
diff --git a/src/set/mq.c b/src/set/mq.c
index 236a692d4..92120a607 100644
--- a/src/set/mq.c
+++ b/src/set/mq.c
@@ -215,21 +215,25 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
215} 215}
216 216
217 217
218struct GNUNET_MQ_Message * 218int
219GNUNET_MQ_msg_concat_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, struct GNUNET_MessageHeader *m, uint16_t type) 219GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
220 const struct GNUNET_MessageHeader *m)
220{ 221{
221 struct GNUNET_MQ_Message *mq; 222 size_t new_size;
223 size_t old_size;
222 224
223 GNUNET_assert (NULL != mhp);
224 if (NULL == m) 225 if (NULL == m)
225 return GNUNET_MQ_msg_ (mhp, base_size, type); 226 return GNUNET_OK;
226 GNUNET_assert (ntohs (m->size >= sizeof (struct GNUNET_MessageHeader))); 227 GNUNET_assert (NULL != mqmp);
227 /* check for overflow */ 228 old_size = ntohs ((*mqmp)->mh->size);
228 if (base_size + ntohs (m->size) <= base_size) 229 /* message too large to concatenate? */
229 return NULL; 230 if (ntohs ((*mqmp)->mh->size) + ntohs (m->size) < ntohs (m->size))
230 mq = GNUNET_MQ_msg_ (mhp, base_size + ntohs (m->size), type); 231 return GNUNET_SYSERR;
231 memcpy (((void *) *mhp) + base_size, m, ntohs (m->size)); 232 new_size = old_size + ntohs (m->size);
232 return mq; 233 *mqmp = GNUNET_realloc (mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
234 memcpy ((*mqmp)->mh + old_size, m, new_size - old_size);
235 (*mqmp)->mh->size = htons (new_size);
236 return GNUNET_OK;
233} 237}
234 238
235 239
@@ -274,20 +278,26 @@ stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
274 return; 278 return;
275 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); 279 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm);
276 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), 280 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
277 GNUNET_TIME_UNIT_FOREVER_REL, stream_write_queued, cls); 281 GNUNET_TIME_UNIT_FOREVER_REL,
282 stream_write_queued, mq);
278 GNUNET_assert (NULL != mss->wh); 283 GNUNET_assert (NULL != mss->wh);
279} 284}
280 285
281 286
282static void 287static void
283stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 288stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq,
289 struct GNUNET_MQ_Message *mqm)
284{ 290{
291 struct MessageStreamState *mss = (struct MessageStreamState *) mq->impl_state;
285 if (NULL != mq->current_msg) 292 if (NULL != mq->current_msg)
286 { 293 {
287 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); 294 GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm);
288 return; 295 return;
289 } 296 }
290 stream_write_queued (mq, GNUNET_STREAM_OK, 0); 297 mq->current_msg = mqm;
298 mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size),
299 GNUNET_TIME_UNIT_FOREVER_REL,
300 stream_write_queued, mq);
291} 301}
292 302
293 303
@@ -304,7 +314,8 @@ stream_socket_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Mes
304 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing 314 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
305 */ 315 */
306static int 316static int
307stream_mst_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) 317stream_mst_callback (void *cls, void *client,
318 const struct GNUNET_MessageHeader *message)
308{ 319{
309 struct GNUNET_MQ_MessageQueue *mq = cls; 320 struct GNUNET_MQ_MessageQueue *mq = cls;
310 321
@@ -334,12 +345,14 @@ stream_data_processor (void *cls,
334 struct GNUNET_MQ_MessageQueue *mq = cls; 345 struct GNUNET_MQ_MessageQueue *mq = cls;
335 struct MessageStreamState *mss; 346 struct MessageStreamState *mss;
336 int ret; 347 int ret;
348
337 mss = (struct MessageStreamState *) mq->impl_state; 349 mss = (struct MessageStreamState *) mq->impl_state;
338
339 GNUNET_assert (GNUNET_STREAM_OK == status); 350 GNUNET_assert (GNUNET_STREAM_OK == status);
340 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); 351 ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
341 GNUNET_assert (GNUNET_OK == ret); 352 GNUNET_assert (GNUNET_OK == ret);
342 /* we always read all data */ 353 /* we always read all data */
354 mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
355 stream_data_processor, mq);
343 return size; 356 return size;
344} 357}
345 358
@@ -369,8 +382,7 @@ GNUNET_MQ_queue_for_stream_socket (struct GNUNET_STREAM_Socket *socket,
369} 382}
370 383
371 384
372/** 385/*** Transmit a queued message to the session's client.
373 * Transmit a queued message to the session's client.
374 * 386 *
375 * @param cls consensus session 387 * @param cls consensus session
376 * @param size number of bytes available in buf 388 * @param size number of bytes available in buf
@@ -474,7 +486,7 @@ connection_client_transmit_queued (void *cls, size_t size,
474 mq->current_msg = mq->msg_head; 486 mq->current_msg = mq->msg_head;
475 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); 487 GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg);
476 state->th = 488 state->th =
477 GNUNET_CLIENT_notify_transmit_ready (state->connection, msg_size, 489 GNUNET_CLIENT_notify_transmit_ready (state->connection, htons (mq->current_msg->mh->size),
478 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, 490 GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO,
479 &connection_client_transmit_queued, mq); 491 &connection_client_transmit_queued, mq);
480 } 492 }
@@ -483,7 +495,8 @@ connection_client_transmit_queued (void *cls, size_t size,
483 495
484 496
485static void 497static void
486connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) 498connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq,
499 struct GNUNET_MQ_Message *mqm)
487{ 500{
488 struct ClientConnectionState *state = mq->impl_state; 501 struct ClientConnectionState *state = mq->impl_state;
489 int msize; 502 int msize;
@@ -519,7 +532,6 @@ handle_client_message (void *cls,
519 struct GNUNET_MQ_MessageQueue *mq = cls; 532 struct GNUNET_MQ_MessageQueue *mq = cls;
520 533
521 GNUNET_assert (NULL != msg); 534 GNUNET_assert (NULL != msg);
522
523 dispatch_message (mq, msg); 535 dispatch_message (mq, msg);
524} 536}
525 537