diff options
author | Gabor X Toth <*@tg-x.net> | 2016-08-17 21:26:41 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2016-08-17 21:26:41 +0000 |
commit | 667cc67f8224ccf4ff391b125a614cf90cf5917e (patch) | |
tree | ae2048a6525ab2521ad989afa795d7a2f0833af6 /src/psycutil | |
parent | 720a38ea7519f5a1a820157f056b150ab3d4abd5 (diff) | |
download | gnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.tar.gz gnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.zip |
psyc, social: switch to MQ
Diffstat (limited to 'src/psycutil')
-rw-r--r-- | src/psycutil/psyc_message.c | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/src/psycutil/psyc_message.c b/src/psycutil/psyc_message.c index 303ba8466..bc1896b1f 100644 --- a/src/psycutil/psyc_message.c +++ b/src/psycutil/psyc_message.c | |||
@@ -39,7 +39,7 @@ struct GNUNET_PSYC_TransmitHandle | |||
39 | /** | 39 | /** |
40 | * Client connection to service. | 40 | * Client connection to service. |
41 | */ | 41 | */ |
42 | struct GNUNET_CLIENT_MANAGER_Connection *client; | 42 | struct GNUNET_MQ_Handle *mq; |
43 | 43 | ||
44 | /** | 44 | /** |
45 | * Message currently being received from the client. | 45 | * Message currently being received from the client. |
@@ -47,6 +47,11 @@ struct GNUNET_PSYC_TransmitHandle | |||
47 | struct GNUNET_MessageHeader *msg; | 47 | struct GNUNET_MessageHeader *msg; |
48 | 48 | ||
49 | /** | 49 | /** |
50 | * Envelope for @a msg | ||
51 | */ | ||
52 | struct GNUNET_MQ_Envelope *env; | ||
53 | |||
54 | /** | ||
50 | * Callback to request next modifier from client. | 55 | * Callback to request next modifier from client. |
51 | */ | 56 | */ |
52 | GNUNET_PSYC_TransmitNotifyModifier notify_mod; | 57 | GNUNET_PSYC_TransmitNotifyModifier notify_mod; |
@@ -327,11 +332,11 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, | |||
327 | * Create a transmission handle. | 332 | * Create a transmission handle. |
328 | */ | 333 | */ |
329 | struct GNUNET_PSYC_TransmitHandle * | 334 | struct GNUNET_PSYC_TransmitHandle * |
330 | GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client) | 335 | GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq) |
331 | { | 336 | { |
332 | struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle); | 337 | struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle); |
333 | 338 | ||
334 | tmit->client = client; | 339 | tmit->mq = mq; |
335 | return tmit; | 340 | return tmit; |
336 | } | 341 | } |
337 | 342 | ||
@@ -378,16 +383,15 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
378 | { | 383 | { |
379 | /* End of message or buffer is full, add it to transmission queue | 384 | /* End of message or buffer is full, add it to transmission queue |
380 | * and start with empty buffer */ | 385 | * and start with empty buffer */ |
381 | tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
382 | tmit->msg->size = htons (tmit->msg->size); | 386 | tmit->msg->size = htons (tmit->msg->size); |
383 | GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); | 387 | GNUNET_MQ_send (tmit->mq, tmit->env); |
388 | tmit->env = NULL; | ||
384 | tmit->msg = NULL; | 389 | tmit->msg = NULL; |
385 | tmit->acks_pending++; | 390 | tmit->acks_pending++; |
386 | } | 391 | } |
387 | else | 392 | else |
388 | { | 393 | { |
389 | /* Message fits in current buffer, append */ | 394 | /* Message fits in current buffer, append */ |
390 | tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size); | ||
391 | GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); | 395 | GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); |
392 | tmit->msg->size += size; | 396 | tmit->msg->size += size; |
393 | } | 397 | } |
@@ -396,8 +400,13 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
396 | if (NULL == tmit->msg && NULL != msg) | 400 | if (NULL == tmit->msg && NULL != msg) |
397 | { | 401 | { |
398 | /* Empty buffer, copy over message. */ | 402 | /* Empty buffer, copy over message. */ |
399 | tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size); | 403 | tmit->env = GNUNET_MQ_msg_extra (tmit->msg, |
404 | GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, | ||
405 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
406 | /* store current message size in host byte order | ||
407 | * then later switch it to network byte order before sending */ | ||
400 | tmit->msg->size = sizeof (*tmit->msg) + size; | 408 | tmit->msg->size = sizeof (*tmit->msg) + size; |
409 | |||
401 | GNUNET_memcpy (&tmit->msg[1], msg, size); | 410 | GNUNET_memcpy (&tmit->msg[1], msg, size); |
402 | } | 411 | } |
403 | 412 | ||
@@ -407,9 +416,9 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
407 | < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) | 416 | < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) |
408 | { | 417 | { |
409 | /* End of message or buffer is full, add it to transmission queue. */ | 418 | /* End of message or buffer is full, add it to transmission queue. */ |
410 | tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
411 | tmit->msg->size = htons (tmit->msg->size); | 419 | tmit->msg->size = htons (tmit->msg->size); |
412 | GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); | 420 | GNUNET_MQ_send (tmit->mq, tmit->env); |
421 | tmit->env = NULL; | ||
413 | tmit->msg = NULL; | 422 | tmit->msg = NULL; |
414 | tmit->acks_pending++; | 423 | tmit->acks_pending++; |
415 | } | 424 | } |
@@ -722,7 +731,12 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
722 | 731 | ||
723 | size_t size = strlen (method_name) + 1; | 732 | size_t size = strlen (method_name) + 1; |
724 | struct GNUNET_PSYC_MessageMethod *pmeth; | 733 | struct GNUNET_PSYC_MessageMethod *pmeth; |
725 | tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size); | 734 | |
735 | tmit->env = GNUNET_MQ_msg_extra (tmit->msg, | ||
736 | GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, | ||
737 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
738 | /* store current message size in host byte order | ||
739 | * then later switch it to network byte order before sending */ | ||
726 | tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size; | 740 | tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size; |
727 | 741 | ||
728 | if (NULL != notify_mod) | 742 | if (NULL != notify_mod) |