aboutsummaryrefslogtreecommitdiff
path: root/src/psycutil
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2016-08-17 21:26:41 +0000
committerGabor X Toth <*@tg-x.net>2016-08-17 21:26:41 +0000
commit667cc67f8224ccf4ff391b125a614cf90cf5917e (patch)
treeae2048a6525ab2521ad989afa795d7a2f0833af6 /src/psycutil
parent720a38ea7519f5a1a820157f056b150ab3d4abd5 (diff)
downloadgnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.tar.gz
gnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.zip
psyc, social: switch to MQ
Diffstat (limited to 'src/psycutil')
-rw-r--r--src/psycutil/psyc_message.c34
1 files changed, 24 insertions, 10 deletions
diff --git a/src/psycutil/psyc_message.c b/src/psycutil/psyc_message.c
index 303ba8466..bc1896b1f 100644
--- a/src/psycutil/psyc_message.c
+++ b/src/psycutil/psyc_message.c
@@ -39,7 +39,7 @@ struct GNUNET_PSYC_TransmitHandle
39 /** 39 /**
40 * Client connection to service. 40 * Client connection to service.
41 */ 41 */
42 struct GNUNET_CLIENT_MANAGER_Connection *client; 42 struct GNUNET_MQ_Handle *mq;
43 43
44 /** 44 /**
45 * Message currently being received from the client. 45 * Message currently being received from the client.
@@ -47,6 +47,11 @@ struct GNUNET_PSYC_TransmitHandle
47 struct GNUNET_MessageHeader *msg; 47 struct GNUNET_MessageHeader *msg;
48 48
49 /** 49 /**
50 * Envelope for @a msg
51 */
52 struct GNUNET_MQ_Envelope *env;
53
54 /**
50 * Callback to request next modifier from client. 55 * Callback to request next modifier from client.
51 */ 56 */
52 GNUNET_PSYC_TransmitNotifyModifier notify_mod; 57 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
@@ -327,11 +332,11 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
327 * Create a transmission handle. 332 * Create a transmission handle.
328 */ 333 */
329struct GNUNET_PSYC_TransmitHandle * 334struct GNUNET_PSYC_TransmitHandle *
330GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client) 335GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
331{ 336{
332 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle); 337 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
333 338
334 tmit->client = client; 339 tmit->mq = mq;
335 return tmit; 340 return tmit;
336} 341}
337 342
@@ -378,16 +383,15 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
378 { 383 {
379 /* End of message or buffer is full, add it to transmission queue 384 /* End of message or buffer is full, add it to transmission queue
380 * and start with empty buffer */ 385 * and start with empty buffer */
381 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
382 tmit->msg->size = htons (tmit->msg->size); 386 tmit->msg->size = htons (tmit->msg->size);
383 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); 387 GNUNET_MQ_send (tmit->mq, tmit->env);
388 tmit->env = NULL;
384 tmit->msg = NULL; 389 tmit->msg = NULL;
385 tmit->acks_pending++; 390 tmit->acks_pending++;
386 } 391 }
387 else 392 else
388 { 393 {
389 /* Message fits in current buffer, append */ 394 /* Message fits in current buffer, append */
390 tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
391 GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); 395 GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
392 tmit->msg->size += size; 396 tmit->msg->size += size;
393 } 397 }
@@ -396,8 +400,13 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
396 if (NULL == tmit->msg && NULL != msg) 400 if (NULL == tmit->msg && NULL != msg)
397 { 401 {
398 /* Empty buffer, copy over message. */ 402 /* Empty buffer, copy over message. */
399 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size); 403 tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
404 GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
405 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
406 /* store current message size in host byte order
407 * then later switch it to network byte order before sending */
400 tmit->msg->size = sizeof (*tmit->msg) + size; 408 tmit->msg->size = sizeof (*tmit->msg) + size;
409
401 GNUNET_memcpy (&tmit->msg[1], msg, size); 410 GNUNET_memcpy (&tmit->msg[1], msg, size);
402 } 411 }
403 412
@@ -407,9 +416,9 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
407 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) 416 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
408 { 417 {
409 /* End of message or buffer is full, add it to transmission queue. */ 418 /* End of message or buffer is full, add it to transmission queue. */
410 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
411 tmit->msg->size = htons (tmit->msg->size); 419 tmit->msg->size = htons (tmit->msg->size);
412 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); 420 GNUNET_MQ_send (tmit->mq, tmit->env);
421 tmit->env = NULL;
413 tmit->msg = NULL; 422 tmit->msg = NULL;
414 tmit->acks_pending++; 423 tmit->acks_pending++;
415 } 424 }
@@ -722,7 +731,12 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
722 731
723 size_t size = strlen (method_name) + 1; 732 size_t size = strlen (method_name) + 1;
724 struct GNUNET_PSYC_MessageMethod *pmeth; 733 struct GNUNET_PSYC_MessageMethod *pmeth;
725 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size); 734
735 tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
736 GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
737 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
738 /* store current message size in host byte order
739 * then later switch it to network byte order before sending */
726 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size; 740 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
727 741
728 if (NULL != notify_mod) 742 if (NULL != notify_mod)