diff options
author | Gabor X Toth <*@tg-x.net> | 2016-08-17 21:26:41 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2016-08-17 21:26:41 +0000 |
commit | 667cc67f8224ccf4ff391b125a614cf90cf5917e (patch) | |
tree | ae2048a6525ab2521ad989afa795d7a2f0833af6 /src | |
parent | 720a38ea7519f5a1a820157f056b150ab3d4abd5 (diff) | |
download | gnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.tar.gz gnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.zip |
psyc, social: switch to MQ
Diffstat (limited to 'src')
-rw-r--r-- | src/include/gnunet_psyc_message.h | 2 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 25 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 662 | ||||
-rw-r--r-- | src/psycstore/psycstore_api.c | 1 | ||||
-rw-r--r-- | src/psycutil/psyc_message.c | 34 | ||||
-rw-r--r-- | src/social/social_api.c | 1080 |
6 files changed, 1063 insertions, 741 deletions
diff --git a/src/include/gnunet_psyc_message.h b/src/include/gnunet_psyc_message.h index 42ff0ea86..e6337d093 100644 --- a/src/include/gnunet_psyc_message.h +++ b/src/include/gnunet_psyc_message.h | |||
@@ -107,7 +107,7 @@ struct GNUNET_PSYC_TransmitHandle; | |||
107 | * Create a transmission handle. | 107 | * Create a transmission handle. |
108 | */ | 108 | */ |
109 | struct GNUNET_PSYC_TransmitHandle * | 109 | struct GNUNET_PSYC_TransmitHandle * |
110 | GNUNET_PSYC_transmit_create (); | 110 | GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq); |
111 | 111 | ||
112 | 112 | ||
113 | /** | 113 | /** |
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 6fb45d722..89a9bf5e1 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c | |||
@@ -76,6 +76,11 @@ struct GNUNET_MULTICAST_Group | |||
76 | struct GNUNET_MQ_Handle *mq; | 76 | struct GNUNET_MQ_Handle *mq; |
77 | 77 | ||
78 | /** | 78 | /** |
79 | * Message to send on connect. | ||
80 | */ | ||
81 | struct GNUNET_MQ_Envelope *connect_env; | ||
82 | |||
83 | /** | ||
79 | * Time to wait until we try to reconnect on failure. | 84 | * Time to wait until we try to reconnect on failure. |
80 | */ | 85 | */ |
81 | struct GNUNET_TIME_Relative reconnect_delay; | 86 | struct GNUNET_TIME_Relative reconnect_delay; |
@@ -85,11 +90,6 @@ struct GNUNET_MULTICAST_Group | |||
85 | */ | 90 | */ |
86 | struct GNUNET_SCHEDULER_Task *reconnect_task; | 91 | struct GNUNET_SCHEDULER_Task *reconnect_task; |
87 | 92 | ||
88 | /** | ||
89 | * Message to send on connect. | ||
90 | */ | ||
91 | struct GNUNET_MQ_Envelope *connect_env; | ||
92 | |||
93 | GNUNET_MULTICAST_JoinRequestCallback join_req_cb; | 93 | GNUNET_MULTICAST_JoinRequestCallback join_req_cb; |
94 | GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; | 94 | GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; |
95 | GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; | 95 | GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; |
@@ -522,7 +522,7 @@ handle_member_join_decision (void *cls, | |||
522 | static void | 522 | static void |
523 | group_cleanup (struct GNUNET_MULTICAST_Group *grp) | 523 | group_cleanup (struct GNUNET_MULTICAST_Group *grp) |
524 | { | 524 | { |
525 | GNUNET_free (grp->connect_env); | 525 | GNUNET_MQ_discard (grp->connect_env); |
526 | if (NULL != grp->disconnect_cb) | 526 | if (NULL != grp->disconnect_cb) |
527 | grp->disconnect_cb (grp->disconnect_cls); | 527 | grp->disconnect_cb (grp->disconnect_cls); |
528 | } | 528 | } |
@@ -724,7 +724,7 @@ origin_disconnected (void *cls, enum GNUNET_MQ_Error error) | |||
724 | } | 724 | } |
725 | 725 | ||
726 | grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, | 726 | grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, |
727 | &origin_reconnect, | 727 | origin_reconnect, |
728 | orig); | 728 | orig); |
729 | grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); | 729 | grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); |
730 | } | 730 | } |
@@ -829,10 +829,11 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
829 | grp->connect_env = GNUNET_MQ_msg (start, | 829 | grp->connect_env = GNUNET_MQ_msg (start, |
830 | GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); | 830 | GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); |
831 | start->max_fragment_id = max_fragment_id; | 831 | start->max_fragment_id = max_fragment_id; |
832 | GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key)); | 832 | start->group_key = *priv_key; |
833 | 833 | ||
834 | grp->is_origin = GNUNET_YES; | ||
835 | grp->cfg = cfg; | 834 | grp->cfg = cfg; |
835 | grp->is_origin = GNUNET_YES; | ||
836 | grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
836 | 837 | ||
837 | grp->cb_cls = cls; | 838 | grp->cb_cls = cls; |
838 | grp->join_req_cb = join_request_cb; | 839 | grp->join_req_cb = join_request_cb; |
@@ -1024,7 +1025,7 @@ member_disconnected (void *cls, enum GNUNET_MQ_Error error) | |||
1024 | grp->mq = NULL; | 1025 | grp->mq = NULL; |
1025 | 1026 | ||
1026 | grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, | 1027 | grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, |
1027 | &member_reconnect, | 1028 | member_reconnect, |
1028 | mem); | 1029 | mem); |
1029 | grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); | 1030 | grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); |
1030 | } | 1031 | } |
@@ -1162,9 +1163,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
1162 | if (0 < join_msg_size) | 1163 | if (0 < join_msg_size) |
1163 | GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); | 1164 | GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); |
1164 | 1165 | ||
1165 | grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
1166 | grp->is_origin = GNUNET_NO; | ||
1167 | grp->cfg = cfg; | 1166 | grp->cfg = cfg; |
1167 | grp->is_origin = GNUNET_NO; | ||
1168 | grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
1168 | 1169 | ||
1169 | mem->join_dcsn_cb = join_decision_cb; | 1170 | mem->join_dcsn_cb = join_decision_cb; |
1170 | grp->join_req_cb = join_request_cb; | 1171 | grp->join_req_cb = join_request_cb; |
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 515a2731a..2f6a15bab 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -55,7 +55,27 @@ struct GNUNET_PSYC_Channel | |||
55 | /** | 55 | /** |
56 | * Client connection to the service. | 56 | * Client connection to the service. |
57 | */ | 57 | */ |
58 | struct GNUNET_CLIENT_MANAGER_Connection *client; | 58 | struct GNUNET_MQ_Handle *mq; |
59 | |||
60 | /** | ||
61 | * Message to send on connect. | ||
62 | */ | ||
63 | struct GNUNET_MQ_Envelope *connect_env; | ||
64 | |||
65 | /** | ||
66 | * Time to wait until we try to reconnect on failure. | ||
67 | */ | ||
68 | struct GNUNET_TIME_Relative reconnect_delay; | ||
69 | |||
70 | /** | ||
71 | * Task for reconnecting when the listener fails. | ||
72 | */ | ||
73 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
74 | |||
75 | /** | ||
76 | * Async operations. | ||
77 | */ | ||
78 | struct GNUNET_OP_Handle *op; | ||
59 | 79 | ||
60 | /** | 80 | /** |
61 | * Transmission handle; | 81 | * Transmission handle; |
@@ -68,11 +88,6 @@ struct GNUNET_PSYC_Channel | |||
68 | struct GNUNET_PSYC_ReceiveHandle *recv; | 88 | struct GNUNET_PSYC_ReceiveHandle *recv; |
69 | 89 | ||
70 | /** | 90 | /** |
71 | * Message to send on reconnect. | ||
72 | */ | ||
73 | struct GNUNET_MessageHeader *connect_msg; | ||
74 | |||
75 | /** | ||
76 | * Function called after disconnected from the service. | 91 | * Function called after disconnected from the service. |
77 | */ | 92 | */ |
78 | GNUNET_ContinuationCallback disconnect_cb; | 93 | GNUNET_ContinuationCallback disconnect_cb; |
@@ -219,41 +234,21 @@ struct GNUNET_PSYC_StateRequest | |||
219 | }; | 234 | }; |
220 | 235 | ||
221 | 236 | ||
222 | static void | 237 | static int |
223 | channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn) | 238 | check_channel_result (void *cls, |
224 | { | 239 | const struct GNUNET_OperationResultMessage *res) |
225 | uint16_t cmsg_size = ntohs (chn->connect_msg->size); | ||
226 | struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size); | ||
227 | GNUNET_memcpy (cmsg, chn->connect_msg, cmsg_size); | ||
228 | GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg); | ||
229 | GNUNET_free (cmsg); | ||
230 | } | ||
231 | |||
232 | |||
233 | static void | ||
234 | channel_recv_disconnect (void *cls, | ||
235 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
236 | const struct GNUNET_MessageHeader *msg) | ||
237 | { | 240 | { |
238 | struct GNUNET_PSYC_Channel * | 241 | return GNUNET_OK; |
239 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | ||
240 | GNUNET_CLIENT_MANAGER_reconnect (client); | ||
241 | channel_send_connect_msg (chn); | ||
242 | } | 242 | } |
243 | 243 | ||
244 | 244 | ||
245 | static void | 245 | static void |
246 | channel_recv_result (void *cls, | 246 | handle_channel_result (void *cls, |
247 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 247 | const struct GNUNET_OperationResultMessage *res) |
248 | const struct GNUNET_MessageHeader *msg) | ||
249 | { | 248 | { |
250 | struct GNUNET_PSYC_Channel * | 249 | struct GNUNET_PSYC_Channel *chn = cls; |
251 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | ||
252 | 250 | ||
253 | const struct GNUNET_OperationResultMessage * | 251 | uint16_t size = ntohs (res->header.size); |
254 | res = (const struct GNUNET_OperationResultMessage *) msg; | ||
255 | |||
256 | uint16_t size = ntohs (msg->size); | ||
257 | if (size < sizeof (*res)) | 252 | if (size < sizeof (*res)) |
258 | { /* Error, message too small. */ | 253 | { /* Error, message too small. */ |
259 | GNUNET_break (0); | 254 | GNUNET_break (0); |
@@ -262,9 +257,9 @@ channel_recv_result (void *cls, | |||
262 | 257 | ||
263 | uint16_t data_size = size - sizeof (*res); | 258 | uint16_t data_size = size - sizeof (*res); |
264 | const char *data = (0 < data_size) ? (void *) &res[1] : NULL; | 259 | const char *data = (0 < data_size) ? (void *) &res[1] : NULL; |
265 | GNUNET_CLIENT_MANAGER_op_result (chn->client, GNUNET_ntohll (res->op_id), | 260 | GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id), |
266 | GNUNET_ntohll (res->result_code), | 261 | GNUNET_ntohll (res->result_code), |
267 | data, data_size); | 262 | data, data_size, NULL); |
268 | } | 263 | } |
269 | 264 | ||
270 | 265 | ||
@@ -301,18 +296,30 @@ op_recv_state_result (void *cls, int64_t result, | |||
301 | } | 296 | } |
302 | 297 | ||
303 | 298 | ||
304 | static void | 299 | static int |
305 | channel_recv_history_result (void *cls, | 300 | check_channel_history_result (void *cls, |
306 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 301 | const struct GNUNET_OperationResultMessage *res) |
307 | const struct GNUNET_MessageHeader *msg) | ||
308 | { | 302 | { |
309 | struct GNUNET_PSYC_Channel * | 303 | struct GNUNET_PSYC_MessageHeader * |
310 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | 304 | pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res); |
305 | uint16_t size = ntohs (res->header.size); | ||
311 | 306 | ||
312 | const struct GNUNET_OperationResultMessage * | 307 | if (NULL == pmsg || size < sizeof (*res) + sizeof (*pmsg)) |
313 | res = (const struct GNUNET_OperationResultMessage *) msg; | 308 | { /* Error, message too small. */ |
309 | GNUNET_break_op (0); | ||
310 | return GNUNET_SYSERR; | ||
311 | } | ||
312 | return GNUNET_OK; | ||
313 | } | ||
314 | |||
315 | |||
316 | static void | ||
317 | handle_channel_history_result (void *cls, | ||
318 | const struct GNUNET_OperationResultMessage *res) | ||
319 | { | ||
320 | struct GNUNET_PSYC_Channel *chn = cls; | ||
314 | struct GNUNET_PSYC_MessageHeader * | 321 | struct GNUNET_PSYC_MessageHeader * |
315 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; | 322 | pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res); |
316 | 323 | ||
317 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 324 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
318 | "%p Received historic fragment for message #%" PRIu64 ".\n", | 325 | "%p Received historic fragment for message #%" PRIu64 ".\n", |
@@ -321,9 +328,9 @@ channel_recv_history_result (void *cls, | |||
321 | GNUNET_ResultCallback result_cb = NULL; | 328 | GNUNET_ResultCallback result_cb = NULL; |
322 | struct GNUNET_PSYC_HistoryRequest *hist = NULL; | 329 | struct GNUNET_PSYC_HistoryRequest *hist = NULL; |
323 | 330 | ||
324 | if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, | 331 | if (GNUNET_YES != GNUNET_OP_get (chn->op, |
325 | GNUNET_ntohll (res->op_id), | 332 | GNUNET_ntohll (res->op_id), |
326 | &result_cb, (void *) &hist)) | 333 | &result_cb, (void *) &hist, NULL)) |
327 | { /* Operation not found. */ | 334 | { /* Operation not found. */ |
328 | LOG (GNUNET_ERROR_TYPE_WARNING, | 335 | LOG (GNUNET_ERROR_TYPE_WARNING, |
329 | "%p Replay operation not found for historic fragment of message #%" | 336 | "%p Replay operation not found for historic fragment of message #%" |
@@ -332,47 +339,47 @@ channel_recv_history_result (void *cls, | |||
332 | return; | 339 | return; |
333 | } | 340 | } |
334 | 341 | ||
335 | uint16_t size = ntohs (msg->size); | ||
336 | if (size < sizeof (*res) + sizeof (*pmsg)) | ||
337 | { /* Error, message too small. */ | ||
338 | GNUNET_break (0); | ||
339 | return; | ||
340 | } | ||
341 | |||
342 | GNUNET_PSYC_receive_message (hist->recv, | 342 | GNUNET_PSYC_receive_message (hist->recv, |
343 | (const struct GNUNET_PSYC_MessageHeader *) pmsg); | 343 | (const struct GNUNET_PSYC_MessageHeader *) pmsg); |
344 | } | 344 | } |
345 | 345 | ||
346 | 346 | ||
347 | static void | 347 | static int |
348 | channel_recv_state_result (void *cls, | 348 | check_channel_state_result (void *cls, |
349 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 349 | const struct GNUNET_OperationResultMessage *res) |
350 | const struct GNUNET_MessageHeader *msg) | ||
351 | { | 350 | { |
352 | struct GNUNET_PSYC_Channel * | 351 | const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res); |
353 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | 352 | uint16_t mod_size = ntohs (mod->size); |
353 | uint16_t size = ntohs (res->header.size); | ||
354 | |||
355 | if (NULL == mod || size - sizeof (*res) != mod_size) | ||
356 | { | ||
357 | GNUNET_break_op (0); | ||
358 | return GNUNET_SYSERR; | ||
359 | } | ||
360 | return GNUNET_OK; | ||
361 | } | ||
362 | |||
354 | 363 | ||
355 | const struct GNUNET_OperationResultMessage * | 364 | static void |
356 | res = (const struct GNUNET_OperationResultMessage *) msg; | 365 | handle_channel_state_result (void *cls, |
366 | const struct GNUNET_OperationResultMessage *res) | ||
367 | { | ||
368 | struct GNUNET_PSYC_Channel *chn = cls; | ||
357 | 369 | ||
358 | GNUNET_ResultCallback result_cb = NULL; | 370 | GNUNET_ResultCallback result_cb = NULL; |
359 | struct GNUNET_PSYC_StateRequest *sr = NULL; | 371 | struct GNUNET_PSYC_StateRequest *sr = NULL; |
360 | 372 | ||
361 | if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, | 373 | if (GNUNET_YES != GNUNET_OP_get (chn->op, |
362 | GNUNET_ntohll (res->op_id), | 374 | GNUNET_ntohll (res->op_id), |
363 | &result_cb, (void *) &sr)) | 375 | &result_cb, (void *) &sr, NULL)) |
364 | { /* Operation not found. */ | 376 | { /* Operation not found. */ |
365 | return; | 377 | return; |
366 | } | 378 | } |
367 | 379 | ||
368 | const struct GNUNET_MessageHeader * | 380 | const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res); |
369 | mod = (struct GNUNET_MessageHeader *) &res[1]; | ||
370 | uint16_t mod_size = ntohs (mod->size); | 381 | uint16_t mod_size = ntohs (mod->size); |
371 | if (ntohs (msg->size) - sizeof (*res) != mod_size) | 382 | |
372 | { | ||
373 | GNUNET_break (0); | ||
374 | return; | ||
375 | } | ||
376 | switch (ntohs (mod->type)) | 383 | switch (ntohs (mod->type)) |
377 | { | 384 | { |
378 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | 385 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: |
@@ -401,40 +408,40 @@ channel_recv_state_result (void *cls, | |||
401 | } | 408 | } |
402 | 409 | ||
403 | 410 | ||
411 | static int | ||
412 | check_channel_message (void *cls, | ||
413 | const struct GNUNET_PSYC_MessageHeader *pmsg) | ||
414 | { | ||
415 | return GNUNET_OK; | ||
416 | } | ||
417 | |||
418 | |||
404 | static void | 419 | static void |
405 | channel_recv_message (void *cls, | 420 | handle_channel_message (void *cls, |
406 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 421 | const struct GNUNET_PSYC_MessageHeader *pmsg) |
407 | const struct GNUNET_MessageHeader *msg) | ||
408 | { | 422 | { |
409 | struct GNUNET_PSYC_Channel * | 423 | struct GNUNET_PSYC_Channel *chn = cls; |
410 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | 424 | |
411 | GNUNET_PSYC_receive_message (chn->recv, | 425 | GNUNET_PSYC_receive_message (chn->recv, pmsg); |
412 | (const struct GNUNET_PSYC_MessageHeader *) msg); | ||
413 | } | 426 | } |
414 | 427 | ||
415 | 428 | ||
416 | static void | 429 | static void |
417 | channel_recv_message_ack (void *cls, | 430 | handle_channel_message_ack (void *cls, |
418 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 431 | const struct GNUNET_MessageHeader *msg) |
419 | const struct GNUNET_MessageHeader *msg) | ||
420 | { | 432 | { |
421 | struct GNUNET_PSYC_Channel * | 433 | struct GNUNET_PSYC_Channel *chn = cls; |
422 | chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); | 434 | |
423 | GNUNET_PSYC_transmit_got_ack (chn->tmit); | 435 | GNUNET_PSYC_transmit_got_ack (chn->tmit); |
424 | } | 436 | } |
425 | 437 | ||
426 | 438 | ||
427 | static void | 439 | static void |
428 | master_recv_start_ack (void *cls, | 440 | handle_master_start_ack (void *cls, |
429 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 441 | const struct GNUNET_PSYC_CountersResultMessage *cres) |
430 | const struct GNUNET_MessageHeader *msg) | ||
431 | { | 442 | { |
432 | struct GNUNET_PSYC_Master * | 443 | struct GNUNET_PSYC_Master *mst = cls; |
433 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | ||
434 | sizeof (struct GNUNET_PSYC_Channel)); | ||
435 | 444 | ||
436 | struct GNUNET_PSYC_CountersResultMessage * | ||
437 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; | ||
438 | int32_t result = ntohl (cres->result_code); | 445 | int32_t result = ntohl (cres->result_code); |
439 | if (GNUNET_OK != result && GNUNET_NO != result) | 446 | if (GNUNET_OK != result && GNUNET_NO != result) |
440 | { | 447 | { |
@@ -447,23 +454,27 @@ master_recv_start_ack (void *cls, | |||
447 | } | 454 | } |
448 | 455 | ||
449 | 456 | ||
457 | static int | ||
458 | check_master_join_request (void *cls, | ||
459 | const struct GNUNET_PSYC_JoinRequestMessage *req) | ||
460 | { | ||
461 | return GNUNET_OK; | ||
462 | } | ||
463 | |||
464 | |||
450 | static void | 465 | static void |
451 | master_recv_join_request (void *cls, | 466 | handle_master_join_request (void *cls, |
452 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 467 | const struct GNUNET_PSYC_JoinRequestMessage *req) |
453 | const struct GNUNET_MessageHeader *msg) | ||
454 | { | 468 | { |
455 | struct GNUNET_PSYC_Master * | 469 | struct GNUNET_PSYC_Master *mst = cls; |
456 | mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | 470 | |
457 | sizeof (struct GNUNET_PSYC_Channel)); | ||
458 | if (NULL == mst->join_req_cb) | 471 | if (NULL == mst->join_req_cb) |
459 | return; | 472 | return; |
460 | 473 | ||
461 | const struct GNUNET_PSYC_JoinRequestMessage * | ||
462 | req = (const struct GNUNET_PSYC_JoinRequestMessage *) msg; | ||
463 | const struct GNUNET_PSYC_Message *join_msg = NULL; | 474 | const struct GNUNET_PSYC_Message *join_msg = NULL; |
464 | if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) | 475 | if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) |
465 | { | 476 | { |
466 | join_msg = (struct GNUNET_PSYC_Message *) &req[1]; | 477 | join_msg = (struct GNUNET_PSYC_Message *) GNUNET_MQ_extract_nested_mh (req); |
467 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 478 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
468 | "Received join_msg of type %u and size %u.\n", | 479 | "Received join_msg of type %u and size %u.\n", |
469 | ntohs (join_msg->header.type), ntohs (join_msg->header.size)); | 480 | ntohs (join_msg->header.type), ntohs (join_msg->header.size)); |
@@ -479,15 +490,11 @@ master_recv_join_request (void *cls, | |||
479 | 490 | ||
480 | 491 | ||
481 | static void | 492 | static void |
482 | slave_recv_join_ack (void *cls, | 493 | handle_slave_join_ack (void *cls, |
483 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 494 | const struct GNUNET_PSYC_CountersResultMessage *cres) |
484 | const struct GNUNET_MessageHeader *msg) | ||
485 | { | 495 | { |
486 | struct GNUNET_PSYC_Slave * | 496 | struct GNUNET_PSYC_Slave *slv = cls; |
487 | slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | 497 | |
488 | sizeof (struct GNUNET_PSYC_Channel)); | ||
489 | struct GNUNET_PSYC_CountersResultMessage * | ||
490 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; | ||
491 | int32_t result = ntohl (cres->result_code); | 498 | int32_t result = ntohl (cres->result_code); |
492 | if (GNUNET_YES != result && GNUNET_NO != result) | 499 | if (GNUNET_YES != result && GNUNET_NO != result) |
493 | { | 500 | { |
@@ -500,16 +507,19 @@ slave_recv_join_ack (void *cls, | |||
500 | } | 507 | } |
501 | 508 | ||
502 | 509 | ||
510 | static int | ||
511 | check_slave_join_decision (void *cls, | ||
512 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn) | ||
513 | { | ||
514 | return GNUNET_OK; | ||
515 | } | ||
516 | |||
517 | |||
503 | static void | 518 | static void |
504 | slave_recv_join_decision (void *cls, | 519 | handle_slave_join_decision (void *cls, |
505 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 520 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn) |
506 | const struct GNUNET_MessageHeader *msg) | ||
507 | { | 521 | { |
508 | struct GNUNET_PSYC_Slave * | 522 | struct GNUNET_PSYC_Slave *slv = cls; |
509 | slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | ||
510 | sizeof (struct GNUNET_PSYC_Channel)); | ||
511 | const struct GNUNET_PSYC_JoinDecisionMessage * | ||
512 | dcsn = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg; | ||
513 | 523 | ||
514 | struct GNUNET_PSYC_Message *pmsg = NULL; | 524 | struct GNUNET_PSYC_Message *pmsg = NULL; |
515 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) | 525 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) |
@@ -520,104 +530,164 @@ slave_recv_join_decision (void *cls, | |||
520 | } | 530 | } |
521 | 531 | ||
522 | 532 | ||
523 | static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = | 533 | static void |
534 | channel_cleanup (struct GNUNET_PSYC_Channel *chn) | ||
524 | { | 535 | { |
525 | { &channel_recv_message, NULL, | 536 | if (NULL != chn->tmit) |
526 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | 537 | { |
527 | sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, | 538 | GNUNET_PSYC_transmit_destroy (chn->tmit); |
528 | 539 | chn->tmit = NULL; | |
529 | { &channel_recv_message_ack, NULL, | 540 | } |
530 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | 541 | if (NULL != chn->recv) |
531 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, | 542 | { |
532 | 543 | GNUNET_PSYC_receive_destroy (chn->recv); | |
533 | { &master_recv_start_ack, NULL, | 544 | chn->recv = NULL; |
534 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, | 545 | } |
535 | sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, | 546 | if (NULL != chn->connect_env) |
536 | 547 | { | |
537 | { &master_recv_join_request, NULL, | 548 | GNUNET_MQ_discard (chn->connect_env); |
538 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | 549 | chn->connect_env = NULL; |
539 | sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, | 550 | } |
540 | 551 | if (NULL != chn->disconnect_cb) | |
541 | { &channel_recv_history_result, NULL, | 552 | { |
542 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | 553 | chn->disconnect_cb (chn->disconnect_cls); |
543 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | 554 | chn->disconnect_cb = NULL; |
544 | 555 | } | |
545 | { &channel_recv_state_result, NULL, | 556 | } |
546 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
547 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
548 | |||
549 | { &channel_recv_result, NULL, | ||
550 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
551 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
552 | 557 | ||
553 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | ||
554 | 558 | ||
555 | { NULL, NULL, 0, 0, GNUNET_NO } | 559 | static void |
556 | }; | 560 | master_cleanup (void *cls) |
561 | { | ||
562 | struct GNUNET_PSYC_Master *mst = cls; | ||
563 | channel_cleanup (&mst->chn); | ||
564 | GNUNET_free (mst); | ||
565 | } | ||
557 | 566 | ||
558 | 567 | ||
559 | static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = | 568 | static void |
569 | slave_cleanup (void *cls) | ||
560 | { | 570 | { |
561 | { &channel_recv_message, NULL, | 571 | struct GNUNET_PSYC_Slave *slv = cls; |
562 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | 572 | channel_cleanup (&slv->chn); |
563 | sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, | 573 | GNUNET_free (slv); |
564 | 574 | } | |
565 | { &channel_recv_message_ack, NULL, | ||
566 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
567 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, | ||
568 | 575 | ||
569 | { &slave_recv_join_ack, NULL, | ||
570 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, | ||
571 | sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, | ||
572 | 576 | ||
573 | { &slave_recv_join_decision, NULL, | 577 | static void |
574 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, | 578 | channel_disconnect (struct GNUNET_PSYC_Channel *chn, |
575 | sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, | 579 | GNUNET_ContinuationCallback cb, |
580 | void *cls) | ||
581 | { | ||
582 | chn->is_disconnecting = GNUNET_YES; | ||
583 | chn->disconnect_cb = cb; | ||
584 | chn->disconnect_cls = cls; | ||
576 | 585 | ||
577 | { &channel_recv_history_result, NULL, | 586 | // FIXME: wait till queued messages are sent |
578 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | 587 | if (NULL != chn->mq) |
579 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | 588 | { |
589 | GNUNET_MQ_destroy (chn->mq); | ||
590 | chn->mq = NULL; | ||
591 | } | ||
592 | } | ||
580 | 593 | ||
581 | { &channel_recv_state_result, NULL, | ||
582 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
583 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
584 | 594 | ||
585 | { &channel_recv_result, NULL, | 595 | /*** MASTER ***/ |
586 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
587 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
588 | 596 | ||
589 | { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | ||
590 | 597 | ||
591 | { NULL, NULL, 0, 0, GNUNET_NO } | 598 | static void |
592 | }; | 599 | master_connect (struct GNUNET_PSYC_Master *mst); |
593 | 600 | ||
594 | 601 | ||
595 | static void | 602 | static void |
596 | channel_cleanup (struct GNUNET_PSYC_Channel *chn) | 603 | master_reconnect (void *cls) |
597 | { | 604 | { |
598 | GNUNET_PSYC_transmit_destroy (chn->tmit); | 605 | master_connect (cls); |
599 | GNUNET_PSYC_receive_destroy (chn->recv); | ||
600 | GNUNET_free (chn->connect_msg); | ||
601 | if (NULL != chn->disconnect_cb) | ||
602 | chn->disconnect_cb (chn->disconnect_cls); | ||
603 | } | 606 | } |
604 | 607 | ||
605 | 608 | ||
609 | /** | ||
610 | * Master client disconnected from service. | ||
611 | * | ||
612 | * Reconnect after backoff period. | ||
613 | */ | ||
606 | static void | 614 | static void |
607 | master_cleanup (void *cls) | 615 | master_disconnected (void *cls, enum GNUNET_MQ_Error error) |
608 | { | 616 | { |
609 | struct GNUNET_PSYC_Master *mst = cls; | 617 | struct GNUNET_PSYC_Master *mst = cls; |
610 | channel_cleanup (&mst->chn); | 618 | struct GNUNET_PSYC_Channel *chn = &mst->chn; |
611 | GNUNET_free (mst); | 619 | |
620 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
621 | "Master client disconnected (%d), re-connecting\n", | ||
622 | (int) error); | ||
623 | if (NULL != chn->mq) | ||
624 | { | ||
625 | GNUNET_MQ_destroy (chn->mq); | ||
626 | chn->mq = NULL; | ||
627 | } | ||
628 | if (NULL != chn->tmit) | ||
629 | { | ||
630 | GNUNET_PSYC_transmit_destroy (chn->tmit); | ||
631 | chn->tmit = NULL; | ||
632 | } | ||
633 | |||
634 | chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, | ||
635 | master_reconnect, | ||
636 | mst); | ||
637 | chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); | ||
612 | } | 638 | } |
613 | 639 | ||
614 | 640 | ||
615 | static void | 641 | static void |
616 | slave_cleanup (void *cls) | 642 | master_connect (struct GNUNET_PSYC_Master *mst) |
617 | { | 643 | { |
618 | struct GNUNET_PSYC_Slave *slv = cls; | 644 | struct GNUNET_PSYC_Channel *chn = &mst->chn; |
619 | channel_cleanup (&slv->chn); | 645 | |
620 | GNUNET_free (slv); | 646 | GNUNET_MQ_hd_fixed_size (master_start_ack, |
647 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, | ||
648 | struct GNUNET_PSYC_CountersResultMessage); | ||
649 | |||
650 | GNUNET_MQ_hd_var_size (master_join_request, | ||
651 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | ||
652 | struct GNUNET_PSYC_JoinRequestMessage); | ||
653 | |||
654 | GNUNET_MQ_hd_var_size (channel_message, | ||
655 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
656 | struct GNUNET_PSYC_MessageHeader); | ||
657 | |||
658 | GNUNET_MQ_hd_fixed_size (channel_message_ack, | ||
659 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
660 | struct GNUNET_MessageHeader); | ||
661 | |||
662 | GNUNET_MQ_hd_var_size (channel_history_result, | ||
663 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | ||
664 | struct GNUNET_OperationResultMessage); | ||
665 | |||
666 | GNUNET_MQ_hd_var_size (channel_state_result, | ||
667 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
668 | struct GNUNET_OperationResultMessage); | ||
669 | |||
670 | GNUNET_MQ_hd_var_size (channel_result, | ||
671 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
672 | struct GNUNET_OperationResultMessage); | ||
673 | |||
674 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
675 | make_master_start_ack_handler (mst), | ||
676 | make_master_join_request_handler (mst), | ||
677 | make_channel_message_handler (chn), | ||
678 | make_channel_message_ack_handler (chn), | ||
679 | make_channel_history_result_handler (chn), | ||
680 | make_channel_state_result_handler (chn), | ||
681 | make_channel_result_handler (chn), | ||
682 | GNUNET_MQ_handler_end () | ||
683 | }; | ||
684 | |||
685 | chn->mq = GNUNET_CLIENT_connecT (chn->cfg, "psyc", | ||
686 | handlers, master_disconnected, mst); | ||
687 | GNUNET_assert (NULL != chn->mq); | ||
688 | chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); | ||
689 | |||
690 | GNUNET_MQ_send_copy (chn->mq, chn->connect_env); | ||
621 | } | 691 | } |
622 | 692 | ||
623 | 693 | ||
@@ -664,26 +734,23 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
664 | struct GNUNET_PSYC_Channel *chn = &mst->chn; | 734 | struct GNUNET_PSYC_Channel *chn = &mst->chn; |
665 | 735 | ||
666 | struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); | 736 | struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); |
667 | req->header.size = htons (sizeof (*req)); | 737 | chn->connect_env = GNUNET_MQ_msg (req, |
668 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); | 738 | GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); |
669 | req->channel_key = *channel_key; | 739 | req->channel_key = *channel_key; |
670 | req->policy = policy; | 740 | req->policy = policy; |
671 | 741 | ||
672 | chn->connect_msg = &req->header; | ||
673 | chn->cfg = cfg; | 742 | chn->cfg = cfg; |
674 | chn->is_master = GNUNET_YES; | 743 | chn->is_master = GNUNET_YES; |
744 | chn->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
745 | |||
746 | chn->op = GNUNET_OP_create (); | ||
747 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | ||
675 | 748 | ||
676 | mst->start_cb = start_cb; | 749 | mst->start_cb = start_cb; |
677 | mst->join_req_cb = join_request_cb; | 750 | mst->join_req_cb = join_request_cb; |
678 | mst->cb_cls = cls; | 751 | mst->cb_cls = cls; |
679 | 752 | ||
680 | chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers); | 753 | master_connect (mst); |
681 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn)); | ||
682 | |||
683 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); | ||
684 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | ||
685 | |||
686 | channel_send_connect_msg (chn); | ||
687 | return mst; | 754 | return mst; |
688 | } | 755 | } |
689 | 756 | ||
@@ -704,12 +771,8 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst, | |||
704 | 771 | ||
705 | /* FIXME: send msg to service */ | 772 | /* FIXME: send msg to service */ |
706 | 773 | ||
707 | chn->is_disconnecting = GNUNET_YES; | 774 | channel_disconnect (chn, stop_cb, stop_cls); |
708 | chn->disconnect_cb = stop_cb; | 775 | master_cleanup (mst); |
709 | chn->disconnect_cls = stop_cls; | ||
710 | |||
711 | GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES, | ||
712 | &master_cleanup, mst); | ||
713 | } | 776 | } |
714 | 777 | ||
715 | 778 | ||
@@ -753,17 +816,16 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, | |||
753 | < sizeof (*dcsn) + relay_size + join_resp_size) | 816 | < sizeof (*dcsn) + relay_size + join_resp_size) |
754 | return GNUNET_SYSERR; | 817 | return GNUNET_SYSERR; |
755 | 818 | ||
756 | dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size); | 819 | struct GNUNET_MQ_Envelope * |
757 | dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); | 820 | env = GNUNET_MQ_msg_extra (dcsn, relay_size + join_resp_size, |
758 | dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); | 821 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); |
759 | dcsn->is_admitted = htonl (is_admitted); | 822 | dcsn->is_admitted = htonl (is_admitted); |
760 | dcsn->slave_pub_key = jh->slave_pub_key; | 823 | dcsn->slave_pub_key = jh->slave_pub_key; |
761 | 824 | ||
762 | if (0 < join_resp_size) | 825 | if (0 < join_resp_size) |
763 | GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size); | 826 | GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size); |
764 | 827 | ||
765 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); | 828 | GNUNET_MQ_send (chn->mq, env); |
766 | GNUNET_free (dcsn); | ||
767 | GNUNET_free (jh); | 829 | GNUNET_free (jh); |
768 | return GNUNET_OK; | 830 | return GNUNET_OK; |
769 | } | 831 | } |
@@ -838,6 +900,104 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) | |||
838 | } | 900 | } |
839 | 901 | ||
840 | 902 | ||
903 | /*** SLAVE ***/ | ||
904 | |||
905 | |||
906 | static void | ||
907 | slave_connect (struct GNUNET_PSYC_Slave *slv); | ||
908 | |||
909 | |||
910 | static void | ||
911 | slave_reconnect (void *cls) | ||
912 | { | ||
913 | slave_connect (cls); | ||
914 | } | ||
915 | |||
916 | |||
917 | /** | ||
918 | * Slave client disconnected from service. | ||
919 | * | ||
920 | * Reconnect after backoff period. | ||
921 | */ | ||
922 | static void | ||
923 | slave_disconnected (void *cls, enum GNUNET_MQ_Error error) | ||
924 | { | ||
925 | struct GNUNET_PSYC_Slave *slv = cls; | ||
926 | struct GNUNET_PSYC_Channel *chn = &slv->chn; | ||
927 | |||
928 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
929 | "Slave client disconnected (%d), re-connecting\n", | ||
930 | (int) error); | ||
931 | if (NULL != chn->mq) | ||
932 | { | ||
933 | GNUNET_MQ_destroy (chn->mq); | ||
934 | chn->mq = NULL; | ||
935 | } | ||
936 | if (NULL != chn->tmit) | ||
937 | { | ||
938 | GNUNET_PSYC_transmit_destroy (chn->tmit); | ||
939 | chn->tmit = NULL; | ||
940 | } | ||
941 | chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay, | ||
942 | slave_reconnect, | ||
943 | slv); | ||
944 | chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay); | ||
945 | } | ||
946 | |||
947 | |||
948 | static void | ||
949 | slave_connect (struct GNUNET_PSYC_Slave *slv) | ||
950 | { | ||
951 | struct GNUNET_PSYC_Channel *chn = &slv->chn; | ||
952 | |||
953 | GNUNET_MQ_hd_fixed_size (slave_join_ack, | ||
954 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, | ||
955 | struct GNUNET_PSYC_CountersResultMessage); | ||
956 | |||
957 | GNUNET_MQ_hd_var_size (slave_join_decision, | ||
958 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, | ||
959 | struct GNUNET_PSYC_JoinDecisionMessage); | ||
960 | |||
961 | GNUNET_MQ_hd_var_size (channel_message, | ||
962 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
963 | struct GNUNET_PSYC_MessageHeader); | ||
964 | |||
965 | GNUNET_MQ_hd_fixed_size (channel_message_ack, | ||
966 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
967 | struct GNUNET_MessageHeader); | ||
968 | |||
969 | GNUNET_MQ_hd_var_size (channel_history_result, | ||
970 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | ||
971 | struct GNUNET_OperationResultMessage); | ||
972 | |||
973 | GNUNET_MQ_hd_var_size (channel_state_result, | ||
974 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
975 | struct GNUNET_OperationResultMessage); | ||
976 | |||
977 | GNUNET_MQ_hd_var_size (channel_result, | ||
978 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
979 | struct GNUNET_OperationResultMessage); | ||
980 | |||
981 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
982 | make_slave_join_ack_handler (slv), | ||
983 | make_slave_join_decision_handler (slv), | ||
984 | make_channel_message_handler (chn), | ||
985 | make_channel_message_ack_handler (chn), | ||
986 | make_channel_history_result_handler (chn), | ||
987 | make_channel_state_result_handler (chn), | ||
988 | make_channel_result_handler (chn), | ||
989 | GNUNET_MQ_handler_end () | ||
990 | }; | ||
991 | |||
992 | chn->mq = GNUNET_CLIENT_connecT (chn->cfg, "psyc", | ||
993 | handlers, slave_disconnected, slv); | ||
994 | GNUNET_assert (NULL != chn->mq); | ||
995 | chn->tmit = GNUNET_PSYC_transmit_create (chn->mq); | ||
996 | |||
997 | GNUNET_MQ_send_copy (chn->mq, chn->connect_env); | ||
998 | } | ||
999 | |||
1000 | |||
841 | /** | 1001 | /** |
842 | * Join a PSYC channel. | 1002 | * Join a PSYC channel. |
843 | * | 1003 | * |
@@ -892,15 +1052,14 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
892 | struct GNUNET_PSYC_Channel *chn = &slv->chn; | 1052 | struct GNUNET_PSYC_Channel *chn = &slv->chn; |
893 | uint16_t relay_size = relay_count * sizeof (*relays); | 1053 | uint16_t relay_size = relay_count * sizeof (*relays); |
894 | uint16_t join_msg_size; | 1054 | uint16_t join_msg_size; |
895 | struct SlaveJoinRequest *req; | ||
896 | |||
897 | if (NULL == join_msg) | 1055 | if (NULL == join_msg) |
898 | join_msg_size = 0; | 1056 | join_msg_size = 0; |
899 | else | 1057 | else |
900 | join_msg_size = ntohs (join_msg->header.size); | 1058 | join_msg_size = ntohs (join_msg->header.size); |
901 | req = GNUNET_malloc (sizeof (*req) + relay_size + join_msg_size); | 1059 | |
902 | req->header.size = htons (sizeof (*req) + relay_size + join_msg_size); | 1060 | struct SlaveJoinRequest *req; |
903 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); | 1061 | chn->connect_env = GNUNET_MQ_msg_extra (req, relay_size + join_msg_size, |
1062 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); | ||
904 | req->channel_pub_key = *channel_pub_key; | 1063 | req->channel_pub_key = *channel_pub_key; |
905 | req->slave_key = *slave_key; | 1064 | req->slave_key = *slave_key; |
906 | req->origin = *origin; | 1065 | req->origin = *origin; |
@@ -913,21 +1072,18 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
913 | if (NULL != join_msg) | 1072 | if (NULL != join_msg) |
914 | GNUNET_memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size); | 1073 | GNUNET_memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size); |
915 | 1074 | ||
916 | chn->connect_msg = &req->header; | ||
917 | chn->cfg = cfg; | 1075 | chn->cfg = cfg; |
918 | chn->is_master = GNUNET_NO; | 1076 | chn->is_master = GNUNET_NO; |
1077 | chn->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; | ||
1078 | |||
1079 | chn->op = GNUNET_OP_create (); | ||
1080 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | ||
919 | 1081 | ||
920 | slv->connect_cb = connect_cb; | 1082 | slv->connect_cb = connect_cb; |
921 | slv->join_dcsn_cb = join_decision_cb; | 1083 | slv->join_dcsn_cb = join_decision_cb; |
922 | slv->cb_cls = cls; | 1084 | slv->cb_cls = cls; |
923 | 1085 | ||
924 | chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); | 1086 | slave_connect (slv); |
925 | GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn)); | ||
926 | |||
927 | chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | ||
928 | chn->tmit = GNUNET_PSYC_transmit_create (chn->client); | ||
929 | |||
930 | channel_send_connect_msg (chn); | ||
931 | return slv; | 1087 | return slv; |
932 | } | 1088 | } |
933 | 1089 | ||
@@ -950,12 +1106,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv, | |||
950 | 1106 | ||
951 | /* FIXME: send msg to service */ | 1107 | /* FIXME: send msg to service */ |
952 | 1108 | ||
953 | chn->is_disconnecting = GNUNET_YES; | 1109 | channel_disconnect (chn, part_cb, part_cls); |
954 | chn->disconnect_cb = part_cb; | 1110 | slave_cleanup (slv); |
955 | chn->disconnect_cls = part_cls; | ||
956 | |||
957 | GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES, | ||
958 | &slave_cleanup, slv); | ||
959 | } | 1111 | } |
960 | 1112 | ||
961 | 1113 | ||
@@ -1069,18 +1221,16 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, | |||
1069 | GNUNET_ResultCallback result_cb, | 1221 | GNUNET_ResultCallback result_cb, |
1070 | void *cls) | 1222 | void *cls) |
1071 | { | 1223 | { |
1072 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); | 1224 | struct ChannelMembershipStoreRequest *req; |
1073 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); | 1225 | struct GNUNET_MQ_Envelope * |
1074 | req->header.size = htons (sizeof (*req)); | 1226 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); |
1075 | req->slave_pub_key = *slave_pub_key; | 1227 | req->slave_pub_key = *slave_pub_key; |
1076 | req->announced_at = GNUNET_htonll (announced_at); | 1228 | req->announced_at = GNUNET_htonll (announced_at); |
1077 | req->effective_since = GNUNET_htonll (effective_since); | 1229 | req->effective_since = GNUNET_htonll (effective_since); |
1078 | req->did_join = GNUNET_YES; | 1230 | req->did_join = GNUNET_YES; |
1079 | req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, | 1231 | req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); |
1080 | result_cb, cls)); | ||
1081 | 1232 | ||
1082 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1233 | GNUNET_MQ_send (chn->mq, env); |
1083 | GNUNET_free (req); | ||
1084 | } | 1234 | } |
1085 | 1235 | ||
1086 | 1236 | ||
@@ -1122,17 +1272,15 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, | |||
1122 | GNUNET_ResultCallback result_cb, | 1272 | GNUNET_ResultCallback result_cb, |
1123 | void *cls) | 1273 | void *cls) |
1124 | { | 1274 | { |
1125 | struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); | 1275 | struct ChannelMembershipStoreRequest *req; |
1126 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); | 1276 | struct GNUNET_MQ_Envelope * |
1127 | req->header.size = htons (sizeof (*req)); | 1277 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); |
1128 | req->slave_pub_key = *slave_pub_key; | 1278 | req->slave_pub_key = *slave_pub_key; |
1129 | req->announced_at = GNUNET_htonll (announced_at); | 1279 | req->announced_at = GNUNET_htonll (announced_at); |
1130 | req->did_join = GNUNET_NO; | 1280 | req->did_join = GNUNET_NO; |
1131 | req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, | 1281 | req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL)); |
1132 | result_cb, cls)); | ||
1133 | 1282 | ||
1134 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1283 | GNUNET_MQ_send (chn->mq, env); |
1135 | GNUNET_free (req); | ||
1136 | } | 1284 | } |
1137 | 1285 | ||
1138 | 1286 | ||
@@ -1154,17 +1302,17 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn, | |||
1154 | hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); | 1302 | hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); |
1155 | hist->result_cb = result_cb; | 1303 | hist->result_cb = result_cb; |
1156 | hist->cls = cls; | 1304 | hist->cls = cls; |
1157 | hist->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, | 1305 | hist->op_id = GNUNET_OP_add (chn->op, op_recv_history_result, hist, NULL); |
1158 | &op_recv_history_result, hist); | ||
1159 | 1306 | ||
1160 | GNUNET_assert (NULL != method_prefix); | 1307 | GNUNET_assert (NULL != method_prefix); |
1161 | uint16_t method_size = strnlen (method_prefix, | 1308 | uint16_t method_size = strnlen (method_prefix, |
1162 | GNUNET_SERVER_MAX_MESSAGE_SIZE | 1309 | GNUNET_SERVER_MAX_MESSAGE_SIZE |
1163 | - sizeof (*req)) + 1; | 1310 | - sizeof (*req)) + 1; |
1164 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); | 1311 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); |
1165 | req = GNUNET_malloc (sizeof (*req) + method_size); | 1312 | |
1166 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | 1313 | struct GNUNET_MQ_Envelope * |
1167 | req->header.size = htons (sizeof (*req) + method_size); | 1314 | env = GNUNET_MQ_msg_extra (req, method_size, |
1315 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | ||
1168 | req->start_message_id = GNUNET_htonll (start_message_id); | 1316 | req->start_message_id = GNUNET_htonll (start_message_id); |
1169 | req->end_message_id = GNUNET_htonll (end_message_id); | 1317 | req->end_message_id = GNUNET_htonll (end_message_id); |
1170 | req->message_limit = GNUNET_htonll (message_limit); | 1318 | req->message_limit = GNUNET_htonll (message_limit); |
@@ -1172,8 +1320,7 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn, | |||
1172 | req->op_id = GNUNET_htonll (hist->op_id); | 1320 | req->op_id = GNUNET_htonll (hist->op_id); |
1173 | GNUNET_memcpy (&req[1], method_prefix, method_size); | 1321 | GNUNET_memcpy (&req[1], method_prefix, method_size); |
1174 | 1322 | ||
1175 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1323 | GNUNET_MQ_send (chn->mq, env); |
1176 | GNUNET_free (req); | ||
1177 | return hist; | 1324 | return hist; |
1178 | } | 1325 | } |
1179 | 1326 | ||
@@ -1263,7 +1410,7 @@ GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel, | |||
1263 | struct GNUNET_PSYC_HistoryRequest *hist) | 1410 | struct GNUNET_PSYC_HistoryRequest *hist) |
1264 | { | 1411 | { |
1265 | GNUNET_PSYC_receive_destroy (hist->recv); | 1412 | GNUNET_PSYC_receive_destroy (hist->recv); |
1266 | GNUNET_CLIENT_MANAGER_op_cancel (hist->chn->client, hist->op_id); | 1413 | GNUNET_OP_remove (hist->chn->op, hist->op_id); |
1267 | GNUNET_free (hist); | 1414 | GNUNET_free (hist); |
1268 | } | 1415 | } |
1269 | 1416 | ||
@@ -1301,20 +1448,17 @@ channel_state_get (struct GNUNET_PSYC_Channel *chn, | |||
1301 | sr->var_cb = var_cb; | 1448 | sr->var_cb = var_cb; |
1302 | sr->result_cb = result_cb; | 1449 | sr->result_cb = result_cb; |
1303 | sr->cls = cls; | 1450 | sr->cls = cls; |
1304 | sr->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, | 1451 | sr->op_id = GNUNET_OP_add (chn->op, op_recv_state_result, sr, NULL); |
1305 | &op_recv_state_result, sr); | ||
1306 | 1452 | ||
1307 | GNUNET_assert (NULL != name); | 1453 | GNUNET_assert (NULL != name); |
1308 | size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE | 1454 | size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE |
1309 | - sizeof (*req)) + 1; | 1455 | - sizeof (*req)) + 1; |
1310 | req = GNUNET_malloc (sizeof (*req) + name_size); | 1456 | struct GNUNET_MQ_Envelope * |
1311 | req->header.type = htons (type); | 1457 | env = GNUNET_MQ_msg_extra (req, name_size, type); |
1312 | req->header.size = htons (sizeof (*req) + name_size); | ||
1313 | req->op_id = GNUNET_htonll (sr->op_id); | 1458 | req->op_id = GNUNET_htonll (sr->op_id); |
1314 | GNUNET_memcpy (&req[1], name, name_size); | 1459 | GNUNET_memcpy (&req[1], name, name_size); |
1315 | 1460 | ||
1316 | GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); | 1461 | GNUNET_MQ_send (chn->mq, env); |
1317 | GNUNET_free (req); | ||
1318 | return sr; | 1462 | return sr; |
1319 | } | 1463 | } |
1320 | 1464 | ||
@@ -1397,7 +1541,7 @@ GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn, | |||
1397 | void | 1541 | void |
1398 | GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr) | 1542 | GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr) |
1399 | { | 1543 | { |
1400 | GNUNET_CLIENT_MANAGER_op_cancel (sr->chn->client, sr->op_id); | 1544 | GNUNET_OP_remove (sr->chn->op, sr->op_id); |
1401 | GNUNET_free (sr); | 1545 | GNUNET_free (sr); |
1402 | } | 1546 | } |
1403 | 1547 | ||
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c index 94b7ff9f5..89be19a42 100644 --- a/src/psycstore/psycstore_api.c +++ b/src/psycstore/psycstore_api.c | |||
@@ -89,7 +89,6 @@ struct GNUNET_PSYCSTORE_Handle | |||
89 | */ | 89 | */ |
90 | struct GNUNET_MQ_Handle *mq; | 90 | struct GNUNET_MQ_Handle *mq; |
91 | 91 | ||
92 | |||
93 | /** | 92 | /** |
94 | * Async operations. | 93 | * Async operations. |
95 | */ | 94 | */ |
diff --git a/src/psycutil/psyc_message.c b/src/psycutil/psyc_message.c index 303ba8466..bc1896b1f 100644 --- a/src/psycutil/psyc_message.c +++ b/src/psycutil/psyc_message.c | |||
@@ -39,7 +39,7 @@ struct GNUNET_PSYC_TransmitHandle | |||
39 | /** | 39 | /** |
40 | * Client connection to service. | 40 | * Client connection to service. |
41 | */ | 41 | */ |
42 | struct GNUNET_CLIENT_MANAGER_Connection *client; | 42 | struct GNUNET_MQ_Handle *mq; |
43 | 43 | ||
44 | /** | 44 | /** |
45 | * Message currently being received from the client. | 45 | * Message currently being received from the client. |
@@ -47,6 +47,11 @@ struct GNUNET_PSYC_TransmitHandle | |||
47 | struct GNUNET_MessageHeader *msg; | 47 | struct GNUNET_MessageHeader *msg; |
48 | 48 | ||
49 | /** | 49 | /** |
50 | * Envelope for @a msg | ||
51 | */ | ||
52 | struct GNUNET_MQ_Envelope *env; | ||
53 | |||
54 | /** | ||
50 | * Callback to request next modifier from client. | 55 | * Callback to request next modifier from client. |
51 | */ | 56 | */ |
52 | GNUNET_PSYC_TransmitNotifyModifier notify_mod; | 57 | GNUNET_PSYC_TransmitNotifyModifier notify_mod; |
@@ -327,11 +332,11 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, | |||
327 | * Create a transmission handle. | 332 | * Create a transmission handle. |
328 | */ | 333 | */ |
329 | struct GNUNET_PSYC_TransmitHandle * | 334 | struct GNUNET_PSYC_TransmitHandle * |
330 | GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client) | 335 | GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq) |
331 | { | 336 | { |
332 | struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle); | 337 | struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle); |
333 | 338 | ||
334 | tmit->client = client; | 339 | tmit->mq = mq; |
335 | return tmit; | 340 | return tmit; |
336 | } | 341 | } |
337 | 342 | ||
@@ -378,16 +383,15 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
378 | { | 383 | { |
379 | /* End of message or buffer is full, add it to transmission queue | 384 | /* End of message or buffer is full, add it to transmission queue |
380 | * and start with empty buffer */ | 385 | * and start with empty buffer */ |
381 | tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
382 | tmit->msg->size = htons (tmit->msg->size); | 386 | tmit->msg->size = htons (tmit->msg->size); |
383 | GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); | 387 | GNUNET_MQ_send (tmit->mq, tmit->env); |
388 | tmit->env = NULL; | ||
384 | tmit->msg = NULL; | 389 | tmit->msg = NULL; |
385 | tmit->acks_pending++; | 390 | tmit->acks_pending++; |
386 | } | 391 | } |
387 | else | 392 | else |
388 | { | 393 | { |
389 | /* Message fits in current buffer, append */ | 394 | /* Message fits in current buffer, append */ |
390 | tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size); | ||
391 | GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); | 395 | GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); |
392 | tmit->msg->size += size; | 396 | tmit->msg->size += size; |
393 | } | 397 | } |
@@ -396,8 +400,13 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
396 | if (NULL == tmit->msg && NULL != msg) | 400 | if (NULL == tmit->msg && NULL != msg) |
397 | { | 401 | { |
398 | /* Empty buffer, copy over message. */ | 402 | /* Empty buffer, copy over message. */ |
399 | tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size); | 403 | tmit->env = GNUNET_MQ_msg_extra (tmit->msg, |
404 | GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, | ||
405 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
406 | /* store current message size in host byte order | ||
407 | * then later switch it to network byte order before sending */ | ||
400 | tmit->msg->size = sizeof (*tmit->msg) + size; | 408 | tmit->msg->size = sizeof (*tmit->msg) + size; |
409 | |||
401 | GNUNET_memcpy (&tmit->msg[1], msg, size); | 410 | GNUNET_memcpy (&tmit->msg[1], msg, size); |
402 | } | 411 | } |
403 | 412 | ||
@@ -407,9 +416,9 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
407 | < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) | 416 | < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) |
408 | { | 417 | { |
409 | /* End of message or buffer is full, add it to transmission queue. */ | 418 | /* End of message or buffer is full, add it to transmission queue. */ |
410 | tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
411 | tmit->msg->size = htons (tmit->msg->size); | 419 | tmit->msg->size = htons (tmit->msg->size); |
412 | GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); | 420 | GNUNET_MQ_send (tmit->mq, tmit->env); |
421 | tmit->env = NULL; | ||
413 | tmit->msg = NULL; | 422 | tmit->msg = NULL; |
414 | tmit->acks_pending++; | 423 | tmit->acks_pending++; |
415 | } | 424 | } |
@@ -722,7 +731,12 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit, | |||
722 | 731 | ||
723 | size_t size = strlen (method_name) + 1; | 732 | size_t size = strlen (method_name) + 1; |
724 | struct GNUNET_PSYC_MessageMethod *pmeth; | 733 | struct GNUNET_PSYC_MessageMethod *pmeth; |
725 | tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size); | 734 | |
735 | tmit->env = GNUNET_MQ_msg_extra (tmit->msg, | ||
736 | GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, | ||
737 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
738 | /* store current message size in host byte order | ||
739 | * then later switch it to network byte order before sending */ | ||
726 | tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size; | 740 | tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size; |
727 | 741 | ||
728 | if (NULL != notify_mod) | 742 | if (NULL != notify_mod) |
diff --git a/src/social/social_api.c b/src/social/social_api.c index 66af14813..9f15b4146 100644 --- a/src/social/social_api.c +++ b/src/social/social_api.c | |||
@@ -71,12 +71,27 @@ struct GNUNET_SOCIAL_App | |||
71 | /** | 71 | /** |
72 | * Client connection to the service. | 72 | * Client connection to the service. |
73 | */ | 73 | */ |
74 | struct GNUNET_CLIENT_MANAGER_Connection *client; | 74 | struct GNUNET_MQ_Handle *mq; |
75 | 75 | ||
76 | /** | 76 | /** |
77 | * Message to send on reconnect. | 77 | * Message to send on connect. |
78 | */ | 78 | */ |
79 | struct GNUNET_MessageHeader *connect_msg; | 79 | struct GNUNET_MQ_Envelope *connect_env; |
80 | |||
81 | /** | ||
82 | * Time to wait until we try to reconnect on failure. | ||
83 | */ | ||
84 | struct GNUNET_TIME_Relative reconnect_delay; | ||
85 | |||
86 | /** | ||
87 | * Task for reconnecting when the listener fails. | ||
88 | */ | ||
89 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
90 | |||
91 | /** | ||
92 | * Async operations. | ||
93 | */ | ||
94 | struct GNUNET_OP_Handle *op; | ||
80 | 95 | ||
81 | /** | 96 | /** |
82 | * Function called after disconnected from the service. | 97 | * Function called after disconnected from the service. |
@@ -136,7 +151,27 @@ struct GNUNET_SOCIAL_Place | |||
136 | /** | 151 | /** |
137 | * Client connection to the service. | 152 | * Client connection to the service. |
138 | */ | 153 | */ |
139 | struct GNUNET_CLIENT_MANAGER_Connection *client; | 154 | struct GNUNET_MQ_Handle *mq; |
155 | |||
156 | /** | ||
157 | * Message to send on connect. | ||
158 | */ | ||
159 | struct GNUNET_MQ_Envelope *connect_env; | ||
160 | |||
161 | /** | ||
162 | * Time to wait until we try to reconnect on failure. | ||
163 | */ | ||
164 | struct GNUNET_TIME_Relative reconnect_delay; | ||
165 | |||
166 | /** | ||
167 | * Task for reconnecting when the listener fails. | ||
168 | */ | ||
169 | struct GNUNET_SCHEDULER_Task *reconnect_task; | ||
170 | |||
171 | /** | ||
172 | * Async operations. | ||
173 | */ | ||
174 | struct GNUNET_OP_Handle *op; | ||
140 | 175 | ||
141 | /** | 176 | /** |
142 | * Transmission handle. | 177 | * Transmission handle. |
@@ -149,11 +184,6 @@ struct GNUNET_SOCIAL_Place | |||
149 | struct GNUNET_PSYC_Slicer *slicer; | 184 | struct GNUNET_PSYC_Slicer *slicer; |
150 | 185 | ||
151 | /** | 186 | /** |
152 | * Message to send on reconnect. | ||
153 | */ | ||
154 | struct GNUNET_MessageHeader *connect_msg; | ||
155 | |||
156 | /** | ||
157 | * Function called after disconnected from the service. | 187 | * Function called after disconnected from the service. |
158 | */ | 188 | */ |
159 | GNUNET_ContinuationCallback disconnect_cb; | 189 | GNUNET_ContinuationCallback disconnect_cb; |
@@ -337,7 +367,6 @@ struct ZoneAddPlaceHandle | |||
337 | 367 | ||
338 | struct ZoneAddNymHandle | 368 | struct ZoneAddNymHandle |
339 | { | 369 | { |
340 | struct ZoneAddNymRequest *req; | ||
341 | GNUNET_ResultCallback result_cb; | 370 | GNUNET_ResultCallback result_cb; |
342 | void *result_cls; | 371 | void *result_cls; |
343 | }; | 372 | }; |
@@ -481,109 +510,66 @@ host_recv_notice_place_leave_eom (void *cls, | |||
481 | } | 510 | } |
482 | 511 | ||
483 | 512 | ||
484 | /*** CLIENT ***/ | ||
485 | |||
486 | |||
487 | static void | ||
488 | app_send_connect_msg (struct GNUNET_SOCIAL_App *app) | ||
489 | { | ||
490 | uint16_t cmsg_size = ntohs (app->connect_msg->size); | ||
491 | struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); | ||
492 | GNUNET_memcpy (cmsg, app->connect_msg, cmsg_size); | ||
493 | GNUNET_CLIENT_MANAGER_transmit_now (app->client, cmsg); | ||
494 | GNUNET_free (cmsg); | ||
495 | } | ||
496 | |||
497 | |||
498 | static void | ||
499 | app_recv_disconnect (void *cls, | ||
500 | struct GNUNET_CLIENT_MANAGER_Connection *client, | ||
501 | const struct GNUNET_MessageHeader *msg) | ||
502 | { | ||
503 | struct GNUNET_SOCIAL_App * | ||
504 | app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app)); | ||
505 | |||
506 | GNUNET_CLIENT_MANAGER_reconnect (client); | ||
507 | app_send_connect_msg (app); | ||
508 | } | ||
509 | |||
510 | |||
511 | /*** PLACE ***/ | 513 | /*** PLACE ***/ |
512 | 514 | ||
513 | 515 | ||
514 | static void | 516 | static int |
515 | place_send_connect_msg (struct GNUNET_SOCIAL_Place *plc) | 517 | check_place_result (void *cls, |
518 | const struct GNUNET_OperationResultMessage *res) | ||
516 | { | 519 | { |
517 | uint16_t cmsg_size = ntohs (plc->connect_msg->size); | 520 | uint16_t size = ntohs (res->header.size); |
518 | struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); | 521 | if (size < sizeof (*res)) |
519 | GNUNET_memcpy (cmsg, plc->connect_msg, cmsg_size); | 522 | { /* Error, message too small. */ |
520 | GNUNET_CLIENT_MANAGER_transmit_now (plc->client, cmsg); | 523 | GNUNET_break (0); |
521 | GNUNET_free (cmsg); | 524 | return GNUNET_SYSERR; |
525 | } | ||
526 | return GNUNET_OK; | ||
522 | } | 527 | } |
523 | 528 | ||
524 | 529 | ||
525 | static void | 530 | static void |
526 | place_recv_disconnect (void *cls, | 531 | handle_place_result (void *cls, |
527 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 532 | const struct GNUNET_OperationResultMessage *res) |
528 | const struct GNUNET_MessageHeader *msg) | ||
529 | { | 533 | { |
530 | struct GNUNET_SOCIAL_Place * | 534 | struct GNUNET_SOCIAL_Place *plc = cls; |
531 | plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc)); | ||
532 | 535 | ||
533 | GNUNET_CLIENT_MANAGER_reconnect (client); | 536 | uint16_t size = ntohs (res->header.size); |
534 | place_send_connect_msg (plc); | 537 | uint16_t data_size = size - sizeof (*res); |
538 | const char *data = (0 < data_size) ? (const char *) &res[1] : NULL; | ||
539 | |||
540 | GNUNET_OP_result (plc->op, GNUNET_ntohll (res->op_id), | ||
541 | GNUNET_ntohll (res->result_code), | ||
542 | data, data_size, NULL); | ||
535 | } | 543 | } |
536 | 544 | ||
537 | 545 | ||
538 | static void | 546 | static int |
539 | place_recv_result (void *cls, | 547 | check_app_result (void *cls, |
540 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 548 | const struct GNUNET_OperationResultMessage *res) |
541 | const struct GNUNET_MessageHeader *msg) | ||
542 | { | 549 | { |
543 | struct GNUNET_SOCIAL_Place * | 550 | uint16_t size = ntohs (res->header.size); |
544 | plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc)); | ||
545 | |||
546 | const struct GNUNET_OperationResultMessage * | ||
547 | res = (const struct GNUNET_OperationResultMessage *) msg; | ||
548 | |||
549 | uint16_t size = ntohs (msg->size); | ||
550 | if (size < sizeof (*res)) | 551 | if (size < sizeof (*res)) |
551 | { /* Error, message too small. */ | 552 | { /* Error, message too small. */ |
552 | GNUNET_break (0); | 553 | GNUNET_break (0); |
553 | return; | 554 | return GNUNET_SYSERR; |
554 | } | 555 | } |
555 | 556 | return GNUNET_OK; | |
556 | uint16_t data_size = size - sizeof (*res); | ||
557 | const char *data = (0 < data_size) ? (const char *) &res[1] : NULL; | ||
558 | GNUNET_CLIENT_MANAGER_op_result (plc->client, GNUNET_ntohll (res->op_id), | ||
559 | GNUNET_ntohll (res->result_code), | ||
560 | data, data_size); | ||
561 | } | 557 | } |
562 | 558 | ||
563 | 559 | ||
564 | static void | 560 | static void |
565 | app_recv_result (void *cls, | 561 | handle_app_result (void *cls, |
566 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 562 | const struct GNUNET_OperationResultMessage *res) |
567 | const struct GNUNET_MessageHeader *msg) | ||
568 | { | 563 | { |
569 | struct GNUNET_SOCIAL_App * | 564 | struct GNUNET_SOCIAL_App *app = cls; |
570 | app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app)); | ||
571 | |||
572 | const struct GNUNET_OperationResultMessage * | ||
573 | res = (const struct GNUNET_OperationResultMessage *) msg; | ||
574 | |||
575 | uint16_t size = ntohs (msg->size); | ||
576 | if (size < sizeof (*res)) | ||
577 | { /* Error, message too small. */ | ||
578 | GNUNET_break (0); | ||
579 | return; | ||
580 | } | ||
581 | 565 | ||
566 | uint16_t size = ntohs (res->header.size); | ||
582 | uint16_t data_size = size - sizeof (*res); | 567 | uint16_t data_size = size - sizeof (*res); |
583 | const char *data = (0 < data_size) ? (const char *) &res[1] : NULL; | 568 | const char *data = (0 < data_size) ? (const char *) &res[1] : NULL; |
584 | GNUNET_CLIENT_MANAGER_op_result (app->client, GNUNET_ntohll (res->op_id), | 569 | |
585 | GNUNET_ntohll (res->result_code), | 570 | GNUNET_OP_result (app->op, GNUNET_ntohll (res->op_id), |
586 | data, data_size); | 571 | GNUNET_ntohll (res->result_code), |
572 | data, data_size, NULL); | ||
587 | } | 573 | } |
588 | 574 | ||
589 | 575 | ||
@@ -619,18 +605,30 @@ op_recv_state_result (void *cls, int64_t result, | |||
619 | } | 605 | } |
620 | 606 | ||
621 | 607 | ||
622 | static void | 608 | static int |
623 | place_recv_history_result (void *cls, | 609 | check_place_history_result (void *cls, |
624 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 610 | const struct GNUNET_OperationResultMessage *res) |
625 | const struct GNUNET_MessageHeader *msg) | ||
626 | { | 611 | { |
627 | struct GNUNET_SOCIAL_Place * | 612 | struct GNUNET_PSYC_MessageHeader * |
628 | plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc)); | 613 | pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res); |
614 | uint16_t size = ntohs (res->header.size); | ||
615 | |||
616 | if (NULL == pmsg || size < sizeof (*res) + sizeof (*pmsg)) | ||
617 | { /* Error, message too small. */ | ||
618 | GNUNET_break (0); | ||
619 | return GNUNET_SYSERR; | ||
620 | } | ||
621 | return GNUNET_OK; | ||
622 | } | ||
629 | 623 | ||
630 | const struct GNUNET_OperationResultMessage * | 624 | |
631 | res = (const struct GNUNET_OperationResultMessage *) msg; | 625 | static void |
626 | handle_place_history_result (void *cls, | ||
627 | const struct GNUNET_OperationResultMessage *res) | ||
628 | { | ||
629 | struct GNUNET_SOCIAL_Place *plc = cls; | ||
632 | struct GNUNET_PSYC_MessageHeader * | 630 | struct GNUNET_PSYC_MessageHeader * |
633 | pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; | 631 | pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res); |
634 | 632 | ||
635 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 633 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
636 | "%p Received historic fragment for message #%" PRIu64 ".\n", | 634 | "%p Received historic fragment for message #%" PRIu64 ".\n", |
@@ -639,9 +637,9 @@ place_recv_history_result (void *cls, | |||
639 | GNUNET_ResultCallback result_cb = NULL; | 637 | GNUNET_ResultCallback result_cb = NULL; |
640 | struct GNUNET_SOCIAL_HistoryRequest *hist = NULL; | 638 | struct GNUNET_SOCIAL_HistoryRequest *hist = NULL; |
641 | 639 | ||
642 | if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (plc->client, | 640 | if (GNUNET_YES != GNUNET_OP_get (plc->op, |
643 | GNUNET_ntohll (res->op_id), | 641 | GNUNET_ntohll (res->op_id), |
644 | &result_cb, (void *) &hist)) | 642 | &result_cb, (void *) &hist, NULL)) |
645 | { /* Operation not found. */ | 643 | { /* Operation not found. */ |
646 | LOG (GNUNET_ERROR_TYPE_WARNING, | 644 | LOG (GNUNET_ERROR_TYPE_WARNING, |
647 | "%p Replay operation not found for historic fragment of message #%" | 645 | "%p Replay operation not found for historic fragment of message #%" |
@@ -650,50 +648,50 @@ place_recv_history_result (void *cls, | |||
650 | return; | 648 | return; |
651 | } | 649 | } |
652 | 650 | ||
653 | uint16_t size = ntohs (msg->size); | ||
654 | if (size < sizeof (*res) + sizeof (*pmsg)) | ||
655 | { /* Error, message too small. */ | ||
656 | GNUNET_break (0); | ||
657 | return; | ||
658 | } | ||
659 | |||
660 | GNUNET_PSYC_slicer_message (hist->slicer, | 651 | GNUNET_PSYC_slicer_message (hist->slicer, |
661 | (const struct GNUNET_PSYC_MessageHeader *) pmsg); | 652 | (const struct GNUNET_PSYC_MessageHeader *) pmsg); |
662 | } | 653 | } |
663 | 654 | ||
664 | 655 | ||
665 | static void | 656 | static int |
666 | place_recv_state_result (void *cls, | 657 | check_place_state_result (void *cls, |
667 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 658 | const struct GNUNET_OperationResultMessage *res) |
668 | const struct GNUNET_MessageHeader *msg) | ||
669 | { | 659 | { |
670 | struct GNUNET_SOCIAL_Place * | 660 | const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res); |
671 | plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc)); | 661 | uint16_t mod_size = ntohs (mod->size); |
662 | uint16_t size = ntohs (res->header.size); | ||
672 | 663 | ||
673 | const struct GNUNET_OperationResultMessage * | 664 | if (NULL == mod || size - sizeof (*res) != mod_size) |
674 | res = (const struct GNUNET_OperationResultMessage *) msg; | 665 | { |
666 | GNUNET_break_op (0); | ||
667 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
668 | "Invalid modifier size in state result: %u - %u != %u\n", | ||
669 | ntohs (res->header.size), sizeof (*res), mod_size); | ||
670 | return GNUNET_SYSERR; | ||
671 | } | ||
672 | return GNUNET_OK; | ||
673 | } | ||
674 | |||
675 | |||
676 | static void | ||
677 | handle_place_state_result (void *cls, | ||
678 | const struct GNUNET_OperationResultMessage *res) | ||
679 | { | ||
680 | struct GNUNET_SOCIAL_Place *plc = cls; | ||
675 | 681 | ||
676 | GNUNET_ResultCallback result_cb = NULL; | 682 | GNUNET_ResultCallback result_cb = NULL; |
677 | struct GNUNET_SOCIAL_LookHandle *look = NULL; | 683 | struct GNUNET_SOCIAL_LookHandle *look = NULL; |
678 | 684 | ||
679 | if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (plc->client, | 685 | if (GNUNET_YES != GNUNET_OP_get (plc->op, |
680 | GNUNET_ntohll (res->op_id), | 686 | GNUNET_ntohll (res->op_id), |
681 | &result_cb, (void *) &look)) | 687 | &result_cb, (void *) &look, NULL)) |
682 | { /* Operation not found. */ | 688 | { /* Operation not found. */ |
683 | return; | 689 | return; |
684 | } | 690 | } |
685 | 691 | ||
686 | const struct GNUNET_MessageHeader * | 692 | const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res); |
687 | mod = (struct GNUNET_MessageHeader *) &res[1]; | ||
688 | uint16_t mod_size = ntohs (mod->size); | 693 | uint16_t mod_size = ntohs (mod->size); |
689 | if (ntohs (msg->size) - sizeof (*res) != mod_size) | 694 | |
690 | { | ||
691 | GNUNET_break_op (0); | ||
692 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
693 | "Invalid modifier size in state result: %u - %u != %u\n", | ||
694 | ntohs (msg->size), sizeof (*res), mod_size); | ||
695 | return; | ||
696 | } | ||
697 | switch (ntohs (mod->type)) | 695 | switch (ntohs (mod->type)) |
698 | { | 696 | { |
699 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | 697 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: |
@@ -737,52 +735,58 @@ place_recv_state_result (void *cls, | |||
737 | 735 | ||
738 | 736 | ||
739 | static void | 737 | static void |
740 | place_recv_message_ack (void *cls, | 738 | handle_place_message_ack (void *cls, |
741 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 739 | const struct GNUNET_MessageHeader *msg) |
742 | const struct GNUNET_MessageHeader *msg) | ||
743 | { | 740 | { |
744 | struct GNUNET_SOCIAL_Place * | 741 | struct GNUNET_SOCIAL_Place *plc = cls; |
745 | plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc)); | 742 | |
746 | GNUNET_PSYC_transmit_got_ack (plc->tmit); | 743 | GNUNET_PSYC_transmit_got_ack (plc->tmit); |
747 | } | 744 | } |
748 | 745 | ||
749 | 746 | ||
747 | static int | ||
748 | check_place_message (void *cls, | ||
749 | const struct GNUNET_PSYC_MessageHeader *pmsg) | ||
750 | { | ||
751 | return GNUNET_OK; | ||
752 | } | ||
753 | |||
754 | |||
750 | static void | 755 | static void |
751 | place_recv_message (void *cls, | 756 | handle_place_message (void *cls, |
752 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 757 | const struct GNUNET_PSYC_MessageHeader *pmsg) |
753 | const struct GNUNET_MessageHeader *msg) | 758 | { |
759 | struct GNUNET_SOCIAL_Place *plc = cls; | ||
760 | |||
761 | GNUNET_PSYC_slicer_message (plc->slicer, pmsg); | ||
762 | } | ||
763 | |||
764 | |||
765 | static int | ||
766 | check_host_message (void *cls, | ||
767 | const struct GNUNET_PSYC_MessageHeader *pmsg) | ||
754 | { | 768 | { |
755 | struct GNUNET_SOCIAL_Place * | 769 | return GNUNET_OK; |
756 | plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc)); | ||
757 | GNUNET_PSYC_slicer_message (plc->slicer, | ||
758 | (const struct GNUNET_PSYC_MessageHeader *) msg); | ||
759 | } | 770 | } |
760 | 771 | ||
761 | 772 | ||
762 | static void | 773 | static void |
763 | host_recv_message (void *cls, | 774 | handle_host_message (void *cls, |
764 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 775 | const struct GNUNET_PSYC_MessageHeader *pmsg) |
765 | const struct GNUNET_MessageHeader *msg) | ||
766 | { | 776 | { |
767 | struct GNUNET_SOCIAL_Host * | 777 | struct GNUNET_SOCIAL_Host *hst = cls; |
768 | hst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (hst->plc)); | 778 | |
769 | GNUNET_PSYC_slicer_message (hst->slicer, | 779 | GNUNET_PSYC_slicer_message (hst->slicer, pmsg); |
770 | (const struct GNUNET_PSYC_MessageHeader *) msg); | 780 | GNUNET_PSYC_slicer_message (hst->plc.slicer, pmsg); |
771 | GNUNET_PSYC_slicer_message (hst->plc.slicer, | ||
772 | (const struct GNUNET_PSYC_MessageHeader *) msg); | ||
773 | } | 781 | } |
774 | 782 | ||
775 | 783 | ||
776 | static void | 784 | static void |
777 | host_recv_enter_ack (void *cls, | 785 | handle_host_enter_ack (void *cls, |
778 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 786 | const struct HostEnterAck *hack) |
779 | const struct GNUNET_MessageHeader *msg) | ||
780 | { | 787 | { |
781 | struct GNUNET_SOCIAL_Host * | 788 | struct GNUNET_SOCIAL_Host *hst = cls; |
782 | hst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | ||
783 | sizeof (struct GNUNET_SOCIAL_Place)); | ||
784 | 789 | ||
785 | struct HostEnterAck *hack = (struct HostEnterAck *) msg; | ||
786 | hst->plc.pub_key = hack->place_pub_key; | 790 | hst->plc.pub_key = hack->place_pub_key; |
787 | 791 | ||
788 | int32_t result = ntohl (hack->result_code); | 792 | int32_t result = ntohl (hack->result_code); |
@@ -792,14 +796,20 @@ host_recv_enter_ack (void *cls, | |||
792 | } | 796 | } |
793 | 797 | ||
794 | 798 | ||
799 | static int | ||
800 | check_host_enter_request (void *cls, | ||
801 | const struct GNUNET_PSYC_JoinRequestMessage *req) | ||
802 | { | ||
803 | return GNUNET_OK; | ||
804 | } | ||
805 | |||
806 | |||
795 | static void | 807 | static void |
796 | host_recv_enter_request (void *cls, | 808 | handle_host_enter_request (void *cls, |
797 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 809 | const struct GNUNET_PSYC_JoinRequestMessage *req) |
798 | const struct GNUNET_MessageHeader *msg) | ||
799 | { | 810 | { |
800 | struct GNUNET_SOCIAL_Host * | 811 | struct GNUNET_SOCIAL_Host *hst = cls; |
801 | hst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | 812 | |
802 | sizeof (struct GNUNET_SOCIAL_Place)); | ||
803 | if (NULL == hst->answer_door_cb) | 813 | if (NULL == hst->answer_door_cb) |
804 | return; | 814 | return; |
805 | 815 | ||
@@ -809,15 +819,13 @@ host_recv_enter_request (void *cls, | |||
809 | const void *data = NULL; | 819 | const void *data = NULL; |
810 | uint16_t data_size = 0; | 820 | uint16_t data_size = 0; |
811 | char *str; | 821 | char *str; |
812 | const struct GNUNET_PSYC_JoinRequestMessage * | ||
813 | req = (const struct GNUNET_PSYC_JoinRequestMessage *) msg; | ||
814 | const struct GNUNET_PSYC_Message *join_msg = NULL; | 822 | const struct GNUNET_PSYC_Message *join_msg = NULL; |
815 | 823 | ||
816 | do | 824 | do |
817 | { | 825 | { |
818 | if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) | 826 | if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) |
819 | { | 827 | { |
820 | join_msg = (struct GNUNET_PSYC_Message *) &req[1]; | 828 | join_msg = (struct GNUNET_PSYC_Message *) GNUNET_MQ_extract_nested_mh (req); |
821 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 829 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
822 | "Received join_msg of type %u and size %u.\n", | 830 | "Received join_msg of type %u and size %u.\n", |
823 | ntohs (join_msg->header.type), ntohs (join_msg->header.size)); | 831 | ntohs (join_msg->header.type), ntohs (join_msg->header.size)); |
@@ -850,16 +858,11 @@ host_recv_enter_request (void *cls, | |||
850 | 858 | ||
851 | 859 | ||
852 | static void | 860 | static void |
853 | guest_recv_enter_ack (void *cls, | 861 | handle_guest_enter_ack (void *cls, |
854 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 862 | const struct GNUNET_PSYC_CountersResultMessage *cres) |
855 | const struct GNUNET_MessageHeader *msg) | ||
856 | { | 863 | { |
857 | struct GNUNET_SOCIAL_Guest * | 864 | struct GNUNET_SOCIAL_Guest *gst = cls; |
858 | gst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | ||
859 | sizeof (struct GNUNET_SOCIAL_Place)); | ||
860 | 865 | ||
861 | struct GNUNET_PSYC_CountersResultMessage * | ||
862 | cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; | ||
863 | int32_t result = ntohl (cres->result_code); | 866 | int32_t result = ntohl (cres->result_code); |
864 | if (NULL != gst->enter_cb) | 867 | if (NULL != gst->enter_cb) |
865 | gst->enter_cb (gst->cb_cls, result, &gst->plc.pub_key, | 868 | gst->enter_cb (gst->cb_cls, result, &gst->plc.pub_key, |
@@ -867,36 +870,42 @@ guest_recv_enter_ack (void *cls, | |||
867 | } | 870 | } |
868 | 871 | ||
869 | 872 | ||
873 | static int | ||
874 | check_guest_enter_decision (void *cls, | ||
875 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn) | ||
876 | { | ||
877 | return GNUNET_OK; | ||
878 | } | ||
879 | |||
880 | |||
870 | static void | 881 | static void |
871 | guest_recv_join_decision (void *cls, | 882 | handle_guest_enter_decision (void *cls, |
872 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 883 | const struct GNUNET_PSYC_JoinDecisionMessage *dcsn) |
873 | const struct GNUNET_MessageHeader *msg) | ||
874 | { | 884 | { |
875 | struct GNUNET_SOCIAL_Guest * | 885 | struct GNUNET_SOCIAL_Guest *gst = cls; |
876 | gst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, | ||
877 | sizeof (struct GNUNET_SOCIAL_Place)); | ||
878 | const struct GNUNET_PSYC_JoinDecisionMessage * | ||
879 | dcsn = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg; | ||
880 | 886 | ||
881 | struct GNUNET_PSYC_Message *pmsg = NULL; | 887 | struct GNUNET_PSYC_Message *pmsg = NULL; |
882 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) | 888 | if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) |
883 | pmsg = (struct GNUNET_PSYC_Message *) &dcsn[1]; | 889 | pmsg = (struct GNUNET_PSYC_Message *) GNUNET_MQ_extract_nested_mh (dcsn); |
884 | 890 | ||
885 | if (NULL != gst->entry_dcsn_cb) | 891 | if (NULL != gst->entry_dcsn_cb) |
886 | gst->entry_dcsn_cb (gst->cb_cls, ntohl (dcsn->is_admitted), pmsg); | 892 | gst->entry_dcsn_cb (gst->cb_cls, ntohl (dcsn->is_admitted), pmsg); |
887 | } | 893 | } |
888 | 894 | ||
889 | 895 | ||
890 | static void | 896 | static int |
891 | app_recv_ego (void *cls, | 897 | check_app_ego (void *cls, |
892 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 898 | const struct AppEgoMessage *emsg) |
893 | const struct GNUNET_MessageHeader *msg) | ||
894 | { | 899 | { |
895 | struct GNUNET_SOCIAL_App * | 900 | return GNUNET_OK; |
896 | app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app)); | 901 | } |
897 | 902 | ||
898 | struct AppEgoMessage * | 903 | |
899 | emsg = (struct AppEgoMessage *) msg; | 904 | static void |
905 | handle_app_ego (void *cls, | ||
906 | const struct AppEgoMessage *emsg) | ||
907 | { | ||
908 | struct GNUNET_SOCIAL_App *app = cls; | ||
900 | 909 | ||
901 | uint16_t name_size = ntohs (emsg->header.size) - sizeof (*emsg); | 910 | uint16_t name_size = ntohs (emsg->header.size) - sizeof (*emsg); |
902 | 911 | ||
@@ -928,25 +937,26 @@ app_recv_ego (void *cls, | |||
928 | 937 | ||
929 | 938 | ||
930 | static void | 939 | static void |
931 | app_recv_ego_end (void *cls, | 940 | handle_app_ego_end (void *cls, |
932 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 941 | const struct GNUNET_MessageHeader *msg) |
933 | const struct GNUNET_MessageHeader *msg) | ||
934 | { | 942 | { |
935 | struct GNUNET_SOCIAL_App * | 943 | //struct GNUNET_SOCIAL_App *app = cls; |
936 | app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app)); | ||
937 | } | 944 | } |
938 | 945 | ||
939 | 946 | ||
940 | static void | 947 | static int |
941 | app_recv_place (void *cls, | 948 | check_app_place (void *cls, |
942 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 949 | const struct AppPlaceMessage *pmsg) |
943 | const struct GNUNET_MessageHeader *msg) | ||
944 | { | 950 | { |
945 | struct GNUNET_SOCIAL_App * | 951 | return GNUNET_OK; |
946 | app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app)); | 952 | } |
947 | 953 | ||
948 | struct AppPlaceMessage * | 954 | |
949 | pmsg = (struct AppPlaceMessage *) msg; | 955 | static void |
956 | handle_app_place (void *cls, | ||
957 | const struct AppPlaceMessage *pmsg) | ||
958 | { | ||
959 | struct GNUNET_SOCIAL_App *app = cls; | ||
950 | 960 | ||
951 | if ((GNUNET_YES == pmsg->is_host && NULL == app->host_cb) | 961 | if ((GNUNET_YES == pmsg->is_host && NULL == app->host_cb) |
952 | || (GNUNET_NO == pmsg->is_host && NULL == app->guest_cb)) | 962 | || (GNUNET_NO == pmsg->is_host && NULL == app->guest_cb)) |
@@ -987,122 +997,16 @@ app_recv_place (void *cls, | |||
987 | 997 | ||
988 | 998 | ||
989 | static void | 999 | static void |
990 | app_recv_place_end (void *cls, | 1000 | handle_app_place_end (void *cls, |
991 | struct GNUNET_CLIENT_MANAGER_Connection *client, | 1001 | const struct GNUNET_MessageHeader *msg) |
992 | const struct GNUNET_MessageHeader *msg) | ||
993 | { | 1002 | { |
994 | struct GNUNET_SOCIAL_App * | 1003 | struct GNUNET_SOCIAL_App *app = cls; |
995 | app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app)); | ||
996 | 1004 | ||
997 | if (NULL != app->connected_cb) | 1005 | if (NULL != app->connected_cb) |
998 | app->connected_cb (app->cb_cls); | 1006 | app->connected_cb (app->cb_cls); |
999 | } | 1007 | } |
1000 | 1008 | ||
1001 | 1009 | ||
1002 | static struct GNUNET_CLIENT_MANAGER_MessageHandler host_handlers[] = | ||
1003 | { | ||
1004 | { host_recv_enter_ack, NULL, | ||
1005 | GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK, | ||
1006 | sizeof (struct HostEnterAck), GNUNET_NO }, | ||
1007 | |||
1008 | { host_recv_enter_request, NULL, | ||
1009 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | ||
1010 | sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, | ||
1011 | |||
1012 | { host_recv_message, NULL, | ||
1013 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
1014 | sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, | ||
1015 | |||
1016 | { place_recv_message_ack, NULL, | ||
1017 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
1018 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, | ||
1019 | |||
1020 | { place_recv_history_result, NULL, | ||
1021 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | ||
1022 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
1023 | |||
1024 | { place_recv_state_result, NULL, | ||
1025 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
1026 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
1027 | |||
1028 | { place_recv_result, NULL, | ||
1029 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
1030 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
1031 | |||
1032 | { place_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | ||
1033 | |||
1034 | { NULL, NULL, 0, 0, GNUNET_NO } | ||
1035 | }; | ||
1036 | |||
1037 | |||
1038 | static struct GNUNET_CLIENT_MANAGER_MessageHandler guest_handlers[] = | ||
1039 | { | ||
1040 | { guest_recv_enter_ack, NULL, | ||
1041 | GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK, | ||
1042 | sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, | ||
1043 | |||
1044 | { host_recv_enter_request, NULL, | ||
1045 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | ||
1046 | sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, | ||
1047 | |||
1048 | { place_recv_message, NULL, | ||
1049 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
1050 | sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, | ||
1051 | |||
1052 | { place_recv_message_ack, NULL, | ||
1053 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
1054 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, | ||
1055 | |||
1056 | { guest_recv_join_decision, NULL, | ||
1057 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, | ||
1058 | sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, | ||
1059 | |||
1060 | { place_recv_history_result, NULL, | ||
1061 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | ||
1062 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
1063 | |||
1064 | { place_recv_state_result, NULL, | ||
1065 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
1066 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
1067 | |||
1068 | { place_recv_result, NULL, | ||
1069 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
1070 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
1071 | |||
1072 | { place_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | ||
1073 | |||
1074 | { NULL, NULL, 0, 0, GNUNET_NO } | ||
1075 | }; | ||
1076 | |||
1077 | |||
1078 | static struct GNUNET_CLIENT_MANAGER_MessageHandler app_handlers[] = | ||
1079 | { | ||
1080 | { app_recv_ego, NULL, | ||
1081 | GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO, | ||
1082 | sizeof (struct AppEgoMessage), GNUNET_YES }, | ||
1083 | |||
1084 | { app_recv_ego_end, NULL, | ||
1085 | GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO_END, | ||
1086 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, | ||
1087 | |||
1088 | { app_recv_place, NULL, | ||
1089 | GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE, | ||
1090 | sizeof (struct AppPlaceMessage), GNUNET_NO }, | ||
1091 | |||
1092 | { app_recv_place_end, NULL, | ||
1093 | GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE_END, | ||
1094 | sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, | ||
1095 | |||
1096 | { app_recv_result, NULL, | ||
1097 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
1098 | sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, | ||
1099 | |||
1100 | { app_recv_disconnect, NULL, 0, 0, GNUNET_NO }, | ||
1101 | |||
1102 | { NULL, NULL, 0, 0, GNUNET_NO } | ||
1103 | }; | ||
1104 | |||
1105 | |||
1106 | static void | 1010 | static void |
1107 | place_cleanup (struct GNUNET_SOCIAL_Place *plc) | 1011 | place_cleanup (struct GNUNET_SOCIAL_Place *plc) |
1108 | { | 1012 | { |
@@ -1114,18 +1018,26 @@ place_cleanup (struct GNUNET_SOCIAL_Place *plc) | |||
1114 | GNUNET_h2s (&place_pub_hash)); | 1018 | GNUNET_h2s (&place_pub_hash)); |
1115 | 1019 | ||
1116 | if (NULL != plc->tmit) | 1020 | if (NULL != plc->tmit) |
1021 | { | ||
1117 | GNUNET_PSYC_transmit_destroy (plc->tmit); | 1022 | GNUNET_PSYC_transmit_destroy (plc->tmit); |
1118 | if (NULL != plc->connect_msg) | 1023 | plc->tmit = NULL; |
1119 | GNUNET_free (plc->connect_msg); | 1024 | } |
1025 | if (NULL != plc->connect_env) | ||
1026 | { | ||
1027 | GNUNET_MQ_discard (plc->connect_env); | ||
1028 | plc->connect_env = NULL; | ||
1029 | } | ||
1120 | if (NULL != plc->disconnect_cb) | 1030 | if (NULL != plc->disconnect_cb) |
1031 | { | ||
1121 | plc->disconnect_cb (plc->disconnect_cls); | 1032 | plc->disconnect_cb (plc->disconnect_cls); |
1033 | plc->disconnect_cb = NULL; | ||
1034 | } | ||
1122 | } | 1035 | } |
1123 | 1036 | ||
1124 | 1037 | ||
1125 | static void | 1038 | static void |
1126 | host_cleanup (void *cls) | 1039 | host_cleanup (struct GNUNET_SOCIAL_Host *hst) |
1127 | { | 1040 | { |
1128 | struct GNUNET_SOCIAL_Host *hst = cls; | ||
1129 | place_cleanup (&hst->plc); | 1041 | place_cleanup (&hst->plc); |
1130 | if (NULL != hst->slicer) | 1042 | if (NULL != hst->slicer) |
1131 | { | 1043 | { |
@@ -1137,9 +1049,8 @@ host_cleanup (void *cls) | |||
1137 | 1049 | ||
1138 | 1050 | ||
1139 | static void | 1051 | static void |
1140 | guest_cleanup (void *cls) | 1052 | guest_cleanup (struct GNUNET_SOCIAL_Guest *gst) |
1141 | { | 1053 | { |
1142 | struct GNUNET_SOCIAL_Guest *gst = cls; | ||
1143 | place_cleanup (&gst->plc); | 1054 | place_cleanup (&gst->plc); |
1144 | GNUNET_free (gst); | 1055 | GNUNET_free (gst); |
1145 | } | 1056 | } |
@@ -1147,6 +1058,103 @@ guest_cleanup (void *cls) | |||
1147 | 1058 | ||
1148 | /*** HOST ***/ | 1059 | /*** HOST ***/ |
1149 | 1060 | ||
1061 | |||
1062 | static void | ||
1063 | host_connect (struct GNUNET_SOCIAL_Host *hst); | ||
1064 | |||
1065 | |||
1066 | static void | ||
1067 | host_reconnect (void *cls) | ||
1068 | { | ||
1069 | host_connect (cls); | ||
1070 | } | ||
1071 | |||
1072 | |||
1073 | /** | ||
1074 | * Host client disconnected from service. | ||
1075 | * | ||
1076 | * Reconnect after backoff period. | ||
1077 | */ | ||
1078 | static void | ||
1079 | host_disconnected (void *cls, enum GNUNET_MQ_Error error) | ||
1080 | { | ||
1081 | struct GNUNET_SOCIAL_Host *hst = cls; | ||
1082 | struct GNUNET_SOCIAL_Place *plc = &hst->plc; | ||
1083 | |||
1084 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1085 | "Host client disconnected (%d), re-connecting\n", | ||
1086 | (int) error); | ||
1087 | if (NULL != plc->mq) | ||
1088 | { | ||
1089 | GNUNET_MQ_destroy (plc->mq); | ||
1090 | plc->mq = NULL; | ||
1091 | } | ||
1092 | if (NULL != plc->tmit) | ||
1093 | { | ||
1094 | GNUNET_PSYC_transmit_destroy (plc->tmit); | ||
1095 | plc->tmit = NULL; | ||
1096 | } | ||
1097 | |||
1098 | plc->reconnect_task = GNUNET_SCHEDULER_add_delayed (plc->reconnect_delay, | ||
1099 | host_reconnect, | ||
1100 | hst); | ||
1101 | plc->reconnect_delay = GNUNET_TIME_STD_BACKOFF (plc->reconnect_delay); | ||
1102 | } | ||
1103 | |||
1104 | |||
1105 | static void | ||
1106 | host_connect (struct GNUNET_SOCIAL_Host *hst) | ||
1107 | { | ||
1108 | struct GNUNET_SOCIAL_Place *plc = &hst->plc; | ||
1109 | |||
1110 | GNUNET_MQ_hd_fixed_size (host_enter_ack, | ||
1111 | GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK, | ||
1112 | struct HostEnterAck); | ||
1113 | |||
1114 | GNUNET_MQ_hd_var_size (host_enter_request, | ||
1115 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, | ||
1116 | struct GNUNET_PSYC_JoinRequestMessage); | ||
1117 | |||
1118 | GNUNET_MQ_hd_var_size (host_message, | ||
1119 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
1120 | struct GNUNET_PSYC_MessageHeader); | ||
1121 | |||
1122 | GNUNET_MQ_hd_fixed_size (place_message_ack, | ||
1123 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
1124 | struct GNUNET_MessageHeader); | ||
1125 | |||
1126 | GNUNET_MQ_hd_var_size (place_history_result, | ||
1127 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | ||
1128 | struct GNUNET_OperationResultMessage); | ||
1129 | |||
1130 | GNUNET_MQ_hd_var_size (place_state_result, | ||
1131 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
1132 | struct GNUNET_OperationResultMessage); | ||
1133 | |||
1134 | GNUNET_MQ_hd_var_size (place_result, | ||
1135 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
1136 | struct GNUNET_OperationResultMessage); | ||
1137 | |||
1138 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
1139 | make_host_enter_ack_handler (hst), | ||
1140 | make_host_enter_request_handler (hst), | ||
1141 | make_host_message_handler (plc), | ||
1142 | make_place_message_ack_handler (plc), | ||
1143 | make_place_history_result_handler (plc), | ||
1144 | make_place_state_result_handler (plc), | ||
1145 | make_place_result_handler (plc), | ||
1146 | GNUNET_MQ_handler_end () | ||
1147 | }; | ||
1148 | |||
1149 | plc->mq = GNUNET_CLIENT_connecT (plc->cfg, "social", | ||
1150 | handlers, host_disconnected, hst); | ||
1151 | GNUNET_assert (NULL != plc->mq); | ||
1152 | plc->tmit = GNUNET_PSYC_transmit_create (plc->mq); | ||
1153 | |||
1154 | GNUNET_MQ_send_copy (plc->mq, plc->connect_env); | ||
1155 | } | ||
1156 | |||
1157 | |||
1150 | /** | 1158 | /** |
1151 | * Enter a place as host. | 1159 | * Enter a place as host. |
1152 | * | 1160 | * |
@@ -1194,10 +1202,7 @@ GNUNET_SOCIAL_host_enter (const struct GNUNET_SOCIAL_App *app, | |||
1194 | hst->farewell_cb = farewell_cb; | 1202 | hst->farewell_cb = farewell_cb; |
1195 | hst->cb_cls = cls; | 1203 | hst->cb_cls = cls; |
1196 | 1204 | ||
1197 | plc->client = GNUNET_CLIENT_MANAGER_connect (plc->cfg, "social", host_handlers); | 1205 | plc->op = GNUNET_OP_create (); |
1198 | GNUNET_CLIENT_MANAGER_set_user_context_ (plc->client, hst, sizeof (*plc)); | ||
1199 | |||
1200 | plc->tmit = GNUNET_PSYC_transmit_create (plc->client); | ||
1201 | 1206 | ||
1202 | hst->slicer = GNUNET_PSYC_slicer_create (); | 1207 | hst->slicer = GNUNET_PSYC_slicer_create (); |
1203 | GNUNET_PSYC_slicer_method_add (hst->slicer, "_notice_place_leave", NULL, | 1208 | GNUNET_PSYC_slicer_method_add (hst->slicer, "_notice_place_leave", NULL, |
@@ -1206,16 +1211,14 @@ GNUNET_SOCIAL_host_enter (const struct GNUNET_SOCIAL_App *app, | |||
1206 | NULL, host_recv_notice_place_leave_eom, hst); | 1211 | NULL, host_recv_notice_place_leave_eom, hst); |
1207 | 1212 | ||
1208 | uint16_t app_id_size = strlen (app->id) + 1; | 1213 | uint16_t app_id_size = strlen (app->id) + 1; |
1209 | struct HostEnterRequest *hreq = GNUNET_malloc (sizeof (*hreq) + app_id_size); | 1214 | struct HostEnterRequest *hreq; |
1210 | hreq->header.size = htons (sizeof (*hreq) + app_id_size); | 1215 | plc->connect_env = GNUNET_MQ_msg_extra (hreq, app_id_size, |
1211 | hreq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER); | 1216 | GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER); |
1212 | hreq->policy = policy; | 1217 | hreq->policy = policy; |
1213 | hreq->ego_pub_key = ego->pub_key; | 1218 | hreq->ego_pub_key = ego->pub_key; |
1214 | GNUNET_memcpy (&hreq[1], app->id, app_id_size); | 1219 | GNUNET_memcpy (&hreq[1], app->id, app_id_size); |
1215 | 1220 | ||
1216 | plc->connect_msg = &hreq->header; | 1221 | host_connect (hst); |
1217 | place_send_connect_msg (plc); | ||
1218 | |||
1219 | return hst; | 1222 | return hst; |
1220 | } | 1223 | } |
1221 | 1224 | ||
@@ -1250,9 +1253,6 @@ GNUNET_SOCIAL_host_enter_reconnect (struct GNUNET_SOCIAL_HostConnection *hconn, | |||
1250 | struct GNUNET_SOCIAL_Host *hst = GNUNET_malloc (sizeof (*hst)); | 1253 | struct GNUNET_SOCIAL_Host *hst = GNUNET_malloc (sizeof (*hst)); |
1251 | struct GNUNET_SOCIAL_Place *plc = &hst->plc; | 1254 | struct GNUNET_SOCIAL_Place *plc = &hst->plc; |
1252 | 1255 | ||
1253 | size_t app_id_size = strlen (hconn->app->id) + 1; | ||
1254 | struct HostEnterRequest *hreq = GNUNET_malloc (sizeof (*hreq) + app_id_size); | ||
1255 | |||
1256 | hst->enter_cb = enter_cb; | 1256 | hst->enter_cb = enter_cb; |
1257 | hst->answer_door_cb = answer_door_cb; | 1257 | hst->answer_door_cb = answer_door_cb; |
1258 | hst->farewell_cb = farewell_cb; | 1258 | hst->farewell_cb = farewell_cb; |
@@ -1264,10 +1264,7 @@ GNUNET_SOCIAL_host_enter_reconnect (struct GNUNET_SOCIAL_HostConnection *hconn, | |||
1264 | plc->pub_key = hconn->plc_msg.place_pub_key; | 1264 | plc->pub_key = hconn->plc_msg.place_pub_key; |
1265 | plc->ego_pub_key = hconn->plc_msg.ego_pub_key; | 1265 | plc->ego_pub_key = hconn->plc_msg.ego_pub_key; |
1266 | 1266 | ||
1267 | plc->client = GNUNET_CLIENT_MANAGER_connect (plc->cfg, "social", host_handlers); | 1267 | plc->op = GNUNET_OP_create (); |
1268 | GNUNET_CLIENT_MANAGER_set_user_context_ (plc->client, hst, sizeof (*plc)); | ||
1269 | |||
1270 | plc->tmit = GNUNET_PSYC_transmit_create (plc->client); | ||
1271 | 1268 | ||
1272 | hst->slicer = GNUNET_PSYC_slicer_create (); | 1269 | hst->slicer = GNUNET_PSYC_slicer_create (); |
1273 | GNUNET_PSYC_slicer_method_add (hst->slicer, "_notice_place_leave", NULL, | 1270 | GNUNET_PSYC_slicer_method_add (hst->slicer, "_notice_place_leave", NULL, |
@@ -1275,15 +1272,15 @@ GNUNET_SOCIAL_host_enter_reconnect (struct GNUNET_SOCIAL_HostConnection *hconn, | |||
1275 | host_recv_notice_place_leave_modifier, | 1272 | host_recv_notice_place_leave_modifier, |
1276 | NULL, host_recv_notice_place_leave_eom, hst); | 1273 | NULL, host_recv_notice_place_leave_eom, hst); |
1277 | 1274 | ||
1278 | hreq->header.size = htons (sizeof (*hreq) + app_id_size); | 1275 | size_t app_id_size = strlen (hconn->app->id) + 1; |
1279 | hreq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER); | 1276 | struct HostEnterRequest *hreq; |
1277 | plc->connect_env = GNUNET_MQ_msg_extra (hreq, app_id_size, | ||
1278 | GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER); | ||
1280 | hreq->place_pub_key = hconn->plc_msg.place_pub_key; | 1279 | hreq->place_pub_key = hconn->plc_msg.place_pub_key; |
1281 | hreq->ego_pub_key = hconn->plc_msg.ego_pub_key; | 1280 | hreq->ego_pub_key = hconn->plc_msg.ego_pub_key; |
1282 | GNUNET_memcpy (&hreq[1], hconn->app->id, app_id_size); | 1281 | GNUNET_memcpy (&hreq[1], hconn->app->id, app_id_size); |
1283 | 1282 | ||
1284 | plc->connect_msg = &hreq->header; | 1283 | host_connect (hst); |
1285 | place_send_connect_msg (plc); | ||
1286 | |||
1287 | return hst; | 1284 | return hst; |
1288 | } | 1285 | } |
1289 | 1286 | ||
@@ -1316,6 +1313,7 @@ GNUNET_SOCIAL_host_entry_decision (struct GNUNET_SOCIAL_Host *hst, | |||
1316 | int is_admitted, | 1313 | int is_admitted, |
1317 | const struct GNUNET_PSYC_Message *entry_resp) | 1314 | const struct GNUNET_PSYC_Message *entry_resp) |
1318 | { | 1315 | { |
1316 | struct GNUNET_SOCIAL_Place *plc = &hst->plc; | ||
1319 | struct GNUNET_PSYC_JoinDecisionMessage *dcsn; | 1317 | struct GNUNET_PSYC_JoinDecisionMessage *dcsn; |
1320 | uint16_t entry_resp_size | 1318 | uint16_t entry_resp_size |
1321 | = (NULL != entry_resp) ? ntohs (entry_resp->header.size) : 0; | 1319 | = (NULL != entry_resp) ? ntohs (entry_resp->header.size) : 0; |
@@ -1323,17 +1321,16 @@ GNUNET_SOCIAL_host_entry_decision (struct GNUNET_SOCIAL_Host *hst, | |||
1323 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < sizeof (*dcsn) + entry_resp_size) | 1321 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < sizeof (*dcsn) + entry_resp_size) |
1324 | return GNUNET_SYSERR; | 1322 | return GNUNET_SYSERR; |
1325 | 1323 | ||
1326 | dcsn = GNUNET_malloc (sizeof (*dcsn) + entry_resp_size); | 1324 | struct GNUNET_MQ_Envelope * |
1327 | dcsn->header.size = htons (sizeof (*dcsn) + entry_resp_size); | 1325 | env = GNUNET_MQ_msg_extra (dcsn, entry_resp_size, |
1328 | dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); | 1326 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); |
1329 | dcsn->is_admitted = htonl (is_admitted); | 1327 | dcsn->is_admitted = htonl (is_admitted); |
1330 | dcsn->slave_pub_key = nym->pub_key; | 1328 | dcsn->slave_pub_key = nym->pub_key; |
1331 | 1329 | ||
1332 | if (0 < entry_resp_size) | 1330 | if (0 < entry_resp_size) |
1333 | GNUNET_memcpy (&dcsn[1], entry_resp, entry_resp_size); | 1331 | GNUNET_memcpy (&dcsn[1], entry_resp, entry_resp_size); |
1334 | 1332 | ||
1335 | GNUNET_CLIENT_MANAGER_transmit (hst->plc.client, &dcsn->header); | 1333 | GNUNET_MQ_send (plc->mq, env); |
1336 | GNUNET_free (dcsn); | ||
1337 | return GNUNET_OK; | 1334 | return GNUNET_OK; |
1338 | } | 1335 | } |
1339 | 1336 | ||
@@ -1524,10 +1521,11 @@ GNUNET_SOCIAL_host_get_place (struct GNUNET_SOCIAL_Host *hst) | |||
1524 | void | 1521 | void |
1525 | place_leave (struct GNUNET_SOCIAL_Place *plc) | 1522 | place_leave (struct GNUNET_SOCIAL_Place *plc) |
1526 | { | 1523 | { |
1527 | struct GNUNET_MessageHeader msg; | 1524 | struct GNUNET_MessageHeader *msg; |
1528 | msg.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE); | 1525 | struct GNUNET_MQ_Envelope * |
1529 | msg.size = htons (sizeof (msg)); | 1526 | env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE); |
1530 | GNUNET_CLIENT_MANAGER_transmit (plc->client, &msg); | 1527 | |
1528 | GNUNET_MQ_send (plc->mq, env); | ||
1531 | } | 1529 | } |
1532 | 1530 | ||
1533 | 1531 | ||
@@ -1539,10 +1537,12 @@ place_disconnect (struct GNUNET_SOCIAL_Place *plc, | |||
1539 | plc->disconnect_cb = disconnect_cb; | 1537 | plc->disconnect_cb = disconnect_cb; |
1540 | plc->disconnect_cls = disconnect_cls; | 1538 | plc->disconnect_cls = disconnect_cls; |
1541 | 1539 | ||
1542 | GNUNET_CLIENT_MANAGER_disconnect (plc->client, GNUNET_YES, | 1540 | // FIXME: wait till queued messages are sent |
1543 | GNUNET_YES == plc->is_host | 1541 | if (NULL != plc->mq) |
1544 | ? host_cleanup : guest_cleanup, | 1542 | { |
1545 | plc); | 1543 | GNUNET_MQ_destroy (plc->mq); |
1544 | plc->mq = NULL; | ||
1545 | } | ||
1546 | } | 1546 | } |
1547 | 1547 | ||
1548 | 1548 | ||
@@ -1560,6 +1560,7 @@ GNUNET_SOCIAL_host_disconnect (struct GNUNET_SOCIAL_Host *hst, | |||
1560 | void *cls) | 1560 | void *cls) |
1561 | { | 1561 | { |
1562 | place_disconnect (&hst->plc, disconnect_cb, cls); | 1562 | place_disconnect (&hst->plc, disconnect_cb, cls); |
1563 | host_cleanup (hst); | ||
1563 | } | 1564 | } |
1564 | 1565 | ||
1565 | 1566 | ||
@@ -1595,7 +1596,104 @@ GNUNET_SOCIAL_host_leave (struct GNUNET_SOCIAL_Host *hst, | |||
1595 | 1596 | ||
1596 | /*** GUEST ***/ | 1597 | /*** GUEST ***/ |
1597 | 1598 | ||
1598 | static struct GuestEnterRequest * | 1599 | |
1600 | static void | ||
1601 | guest_connect (struct GNUNET_SOCIAL_Guest *gst); | ||
1602 | |||
1603 | |||
1604 | static void | ||
1605 | guest_reconnect (void *cls) | ||
1606 | { | ||
1607 | guest_connect (cls); | ||
1608 | } | ||
1609 | |||
1610 | |||
1611 | /** | ||
1612 | * Guest client disconnected from service. | ||
1613 | * | ||
1614 | * Reconnect after backoff period. | ||
1615 | */ | ||
1616 | static void | ||
1617 | guest_disconnected (void *cls, enum GNUNET_MQ_Error error) | ||
1618 | { | ||
1619 | struct GNUNET_SOCIAL_Guest *gst = cls; | ||
1620 | struct GNUNET_SOCIAL_Place *plc = &gst->plc; | ||
1621 | |||
1622 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1623 | "Guest client disconnected (%d), re-connecting\n", | ||
1624 | (int) error); | ||
1625 | if (NULL != plc->mq) | ||
1626 | { | ||
1627 | GNUNET_MQ_destroy (plc->mq); | ||
1628 | plc->mq = NULL; | ||
1629 | } | ||
1630 | if (NULL != plc->tmit) | ||
1631 | { | ||
1632 | GNUNET_PSYC_transmit_destroy (plc->tmit); | ||
1633 | plc->tmit = NULL; | ||
1634 | } | ||
1635 | |||
1636 | plc->reconnect_task = GNUNET_SCHEDULER_add_delayed (plc->reconnect_delay, | ||
1637 | guest_reconnect, | ||
1638 | gst); | ||
1639 | plc->reconnect_delay = GNUNET_TIME_STD_BACKOFF (plc->reconnect_delay); | ||
1640 | } | ||
1641 | |||
1642 | |||
1643 | static void | ||
1644 | guest_connect (struct GNUNET_SOCIAL_Guest *gst) | ||
1645 | { | ||
1646 | struct GNUNET_SOCIAL_Place *plc = &gst->plc; | ||
1647 | |||
1648 | GNUNET_MQ_hd_fixed_size (guest_enter_ack, | ||
1649 | GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK, | ||
1650 | struct GNUNET_PSYC_CountersResultMessage); | ||
1651 | |||
1652 | GNUNET_MQ_hd_var_size (guest_enter_decision, | ||
1653 | GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, | ||
1654 | struct GNUNET_PSYC_JoinDecisionMessage); | ||
1655 | |||
1656 | GNUNET_MQ_hd_var_size (place_message, | ||
1657 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, | ||
1658 | struct GNUNET_PSYC_MessageHeader); | ||
1659 | |||
1660 | GNUNET_MQ_hd_fixed_size (place_message_ack, | ||
1661 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, | ||
1662 | struct GNUNET_MessageHeader); | ||
1663 | |||
1664 | GNUNET_MQ_hd_var_size (place_history_result, | ||
1665 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, | ||
1666 | struct GNUNET_OperationResultMessage); | ||
1667 | |||
1668 | GNUNET_MQ_hd_var_size (place_state_result, | ||
1669 | GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, | ||
1670 | struct GNUNET_OperationResultMessage); | ||
1671 | |||
1672 | GNUNET_MQ_hd_var_size (place_result, | ||
1673 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
1674 | struct GNUNET_OperationResultMessage); | ||
1675 | |||
1676 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
1677 | make_guest_enter_ack_handler (gst), | ||
1678 | make_guest_enter_decision_handler (gst), | ||
1679 | make_place_message_handler (plc), | ||
1680 | make_place_message_ack_handler (plc), | ||
1681 | make_place_history_result_handler (plc), | ||
1682 | make_place_state_result_handler (plc), | ||
1683 | make_place_result_handler (plc), | ||
1684 | GNUNET_MQ_handler_end () | ||
1685 | }; | ||
1686 | |||
1687 | plc->mq = GNUNET_CLIENT_connecT (plc->cfg, "social", | ||
1688 | handlers, guest_disconnected, gst); | ||
1689 | GNUNET_assert (NULL != plc->mq); | ||
1690 | plc->tmit = GNUNET_PSYC_transmit_create (plc->mq); | ||
1691 | |||
1692 | GNUNET_MQ_send_copy (plc->mq, plc->connect_env); | ||
1693 | } | ||
1694 | |||
1695 | |||
1696 | static struct GNUNET_MQ_Envelope * | ||
1599 | guest_enter_request_create (const char *app_id, | 1697 | guest_enter_request_create (const char *app_id, |
1600 | const struct GNUNET_CRYPTO_EcdsaPublicKey *ego_pub_key, | 1698 | const struct GNUNET_CRYPTO_EcdsaPublicKey *ego_pub_key, |
1601 | const struct GNUNET_CRYPTO_EddsaPublicKey *place_pub_key, | 1699 | const struct GNUNET_CRYPTO_EddsaPublicKey *place_pub_key, |
@@ -1608,11 +1706,10 @@ guest_enter_request_create (const char *app_id, | |||
1608 | uint16_t join_msg_size = ntohs (join_msg->header.size); | 1706 | uint16_t join_msg_size = ntohs (join_msg->header.size); |
1609 | uint16_t relay_size = relay_count * sizeof (*relays); | 1707 | uint16_t relay_size = relay_count * sizeof (*relays); |
1610 | 1708 | ||
1611 | struct GuestEnterRequest * | 1709 | struct GuestEnterRequest *greq; |
1612 | greq = GNUNET_malloc (sizeof (*greq) + app_id_size + relay_size + join_msg_size); | 1710 | struct GNUNET_MQ_Envelope * |
1613 | 1711 | env = GNUNET_MQ_msg_extra (greq, app_id_size + relay_size + join_msg_size, | |
1614 | greq->header.size = htons (sizeof (*greq) + app_id_size + relay_size + join_msg_size); | 1712 | GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER); |
1615 | greq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER); | ||
1616 | greq->place_pub_key = *place_pub_key; | 1713 | greq->place_pub_key = *place_pub_key; |
1617 | greq->ego_pub_key = *ego_pub_key; | 1714 | greq->ego_pub_key = *ego_pub_key; |
1618 | greq->origin = *origin; | 1715 | greq->origin = *origin; |
@@ -1629,7 +1726,7 @@ guest_enter_request_create (const char *app_id, | |||
1629 | } | 1726 | } |
1630 | 1727 | ||
1631 | GNUNET_memcpy (p, join_msg, join_msg_size); | 1728 | GNUNET_memcpy (p, join_msg, join_msg_size); |
1632 | return greq; | 1729 | return env; |
1633 | } | 1730 | } |
1634 | 1731 | ||
1635 | 1732 | ||
@@ -1686,20 +1783,17 @@ GNUNET_SOCIAL_guest_enter (const struct GNUNET_SOCIAL_App *app, | |||
1686 | plc->is_host = GNUNET_NO; | 1783 | plc->is_host = GNUNET_NO; |
1687 | plc->slicer = slicer; | 1784 | plc->slicer = slicer; |
1688 | 1785 | ||
1786 | plc->op = GNUNET_OP_create (); | ||
1787 | |||
1788 | plc->connect_env | ||
1789 | = guest_enter_request_create (app->id, &ego->pub_key, &plc->pub_key, | ||
1790 | origin, relay_count, relays, entry_msg); | ||
1791 | |||
1689 | gst->enter_cb = local_enter_cb; | 1792 | gst->enter_cb = local_enter_cb; |
1690 | gst->entry_dcsn_cb = entry_dcsn_cb; | 1793 | gst->entry_dcsn_cb = entry_dcsn_cb; |
1691 | gst->cb_cls = cls; | 1794 | gst->cb_cls = cls; |
1692 | 1795 | ||
1693 | plc->client = GNUNET_CLIENT_MANAGER_connect (plc->cfg, "social", guest_handlers); | 1796 | guest_connect (gst); |
1694 | GNUNET_CLIENT_MANAGER_set_user_context_ (plc->client, gst, sizeof (*plc)); | ||
1695 | |||
1696 | plc->tmit = GNUNET_PSYC_transmit_create (plc->client); | ||
1697 | |||
1698 | struct GuestEnterRequest * | ||
1699 | greq = guest_enter_request_create (app->id, &ego->pub_key, &plc->pub_key, | ||
1700 | origin, relay_count, relays, entry_msg); | ||
1701 | plc->connect_msg = &greq->header; | ||
1702 | place_send_connect_msg (plc); | ||
1703 | return gst; | 1797 | return gst; |
1704 | } | 1798 | } |
1705 | 1799 | ||
@@ -1755,11 +1849,12 @@ GNUNET_SOCIAL_guest_enter_by_name (const struct GNUNET_SOCIAL_App *app, | |||
1755 | if (NULL != join_msg) | 1849 | if (NULL != join_msg) |
1756 | join_msg_size = ntohs (join_msg->header.size); | 1850 | join_msg_size = ntohs (join_msg->header.size); |
1757 | 1851 | ||
1758 | uint16_t greq_size = sizeof (struct GuestEnterByNameRequest) | 1852 | struct GuestEnterByNameRequest *greq; |
1759 | + app_id_size + gns_name_size + password_size + join_msg_size; | 1853 | plc->connect_env |
1760 | struct GuestEnterByNameRequest *greq = GNUNET_malloc (greq_size); | 1854 | = GNUNET_MQ_msg_extra (greq, app_id_size + gns_name_size |
1761 | greq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_BY_NAME); | 1855 | + password_size + join_msg_size, |
1762 | greq->header.size = htons (greq_size); | 1856 | GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_BY_NAME); |
1857 | |||
1763 | greq->ego_pub_key = ego->pub_key; | 1858 | greq->ego_pub_key = ego->pub_key; |
1764 | 1859 | ||
1765 | char *p = (char *) &greq[1]; | 1860 | char *p = (char *) &greq[1]; |
@@ -1772,23 +1867,18 @@ GNUNET_SOCIAL_guest_enter_by_name (const struct GNUNET_SOCIAL_App *app, | |||
1772 | if (NULL != join_msg) | 1867 | if (NULL != join_msg) |
1773 | GNUNET_memcpy (p, join_msg, join_msg_size); | 1868 | GNUNET_memcpy (p, join_msg, join_msg_size); |
1774 | 1869 | ||
1775 | gst->enter_cb = local_enter_cb; | ||
1776 | gst->entry_dcsn_cb = entry_decision_cb; | ||
1777 | gst->cb_cls = cls; | ||
1778 | |||
1779 | plc->ego_pub_key = ego->pub_key; | 1870 | plc->ego_pub_key = ego->pub_key; |
1780 | plc->cfg = app->cfg; | 1871 | plc->cfg = app->cfg; |
1781 | plc->is_host = GNUNET_NO; | 1872 | plc->is_host = GNUNET_NO; |
1782 | plc->slicer = slicer; | 1873 | plc->slicer = slicer; |
1783 | 1874 | ||
1784 | plc->client = GNUNET_CLIENT_MANAGER_connect (app->cfg, "social", guest_handlers); | 1875 | plc->op = GNUNET_OP_create (); |
1785 | GNUNET_CLIENT_MANAGER_set_user_context_ (plc->client, gst, sizeof (*plc)); | ||
1786 | |||
1787 | plc->tmit = GNUNET_PSYC_transmit_create (plc->client); | ||
1788 | 1876 | ||
1789 | plc->connect_msg = &greq->header; | 1877 | gst->enter_cb = local_enter_cb; |
1790 | place_send_connect_msg (plc); | 1878 | gst->entry_dcsn_cb = entry_decision_cb; |
1879 | gst->cb_cls = cls; | ||
1791 | 1880 | ||
1881 | guest_connect (gst); | ||
1792 | return gst; | 1882 | return gst; |
1793 | } | 1883 | } |
1794 | 1884 | ||
@@ -1821,33 +1911,28 @@ GNUNET_SOCIAL_guest_enter_reconnect (struct GNUNET_SOCIAL_GuestConnection *gconn | |||
1821 | struct GNUNET_SOCIAL_Place *plc = &gst->plc; | 1911 | struct GNUNET_SOCIAL_Place *plc = &gst->plc; |
1822 | 1912 | ||
1823 | uint16_t app_id_size = strlen (gconn->app->id) + 1; | 1913 | uint16_t app_id_size = strlen (gconn->app->id) + 1; |
1824 | uint16_t greq_size = sizeof (struct GuestEnterRequest) + app_id_size; | 1914 | struct GuestEnterRequest *greq; |
1825 | struct GuestEnterRequest *greq = GNUNET_malloc (greq_size); | 1915 | plc->connect_env |
1826 | greq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER); | 1916 | = GNUNET_MQ_msg_extra (greq, app_id_size, |
1827 | greq->header.size = htons (greq_size); | 1917 | GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER); |
1828 | greq->ego_pub_key = gconn->plc_msg.ego_pub_key; | 1918 | greq->ego_pub_key = gconn->plc_msg.ego_pub_key; |
1829 | greq->place_pub_key = gconn->plc_msg.place_pub_key; | 1919 | greq->place_pub_key = gconn->plc_msg.place_pub_key; |
1830 | greq->flags = htonl (flags); | 1920 | greq->flags = htonl (flags); |
1831 | 1921 | ||
1832 | GNUNET_memcpy (&greq[1], gconn->app->id, app_id_size); | 1922 | GNUNET_memcpy (&greq[1], gconn->app->id, app_id_size); |
1833 | 1923 | ||
1834 | gst->enter_cb = local_enter_cb; | ||
1835 | gst->cb_cls = cls; | ||
1836 | |||
1837 | plc->cfg = gconn->app->cfg; | 1924 | plc->cfg = gconn->app->cfg; |
1838 | plc->is_host = GNUNET_NO; | 1925 | plc->is_host = GNUNET_NO; |
1839 | plc->slicer = slicer; | 1926 | plc->slicer = slicer; |
1840 | plc->pub_key = gconn->plc_msg.place_pub_key; | 1927 | plc->pub_key = gconn->plc_msg.place_pub_key; |
1841 | plc->ego_pub_key = gconn->plc_msg.ego_pub_key; | 1928 | plc->ego_pub_key = gconn->plc_msg.ego_pub_key; |
1842 | 1929 | ||
1843 | plc->client = GNUNET_CLIENT_MANAGER_connect (plc->cfg, "social", guest_handlers); | 1930 | plc->op = GNUNET_OP_create (); |
1844 | GNUNET_CLIENT_MANAGER_set_user_context_ (plc->client, gst, sizeof (*plc)); | ||
1845 | |||
1846 | plc->tmit = GNUNET_PSYC_transmit_create (plc->client); | ||
1847 | 1931 | ||
1848 | plc->connect_msg = &greq->header; | 1932 | gst->enter_cb = local_enter_cb; |
1849 | place_send_connect_msg (plc); | 1933 | gst->cb_cls = cls; |
1850 | 1934 | ||
1935 | guest_connect (gst); | ||
1851 | return gst; | 1936 | return gst; |
1852 | } | 1937 | } |
1853 | 1938 | ||
@@ -1931,6 +2016,7 @@ GNUNET_SOCIAL_guest_disconnect (struct GNUNET_SOCIAL_Guest *gst, | |||
1931 | void *cls) | 2016 | void *cls) |
1932 | { | 2017 | { |
1933 | place_disconnect (&gst->plc, disconnect_cb, cls); | 2018 | place_disconnect (&gst->plc, disconnect_cb, cls); |
2019 | guest_cleanup (gst); | ||
1934 | } | 2020 | } |
1935 | 2021 | ||
1936 | 2022 | ||
@@ -2015,15 +2101,14 @@ GNUNET_SOCIAL_place_msg_proc_set (struct GNUNET_SOCIAL_Place *plc, | |||
2015 | GNUNET_SERVER_MAX_MESSAGE_SIZE | 2101 | GNUNET_SERVER_MAX_MESSAGE_SIZE |
2016 | - sizeof (*mpreq)) + 1; | 2102 | - sizeof (*mpreq)) + 1; |
2017 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); | 2103 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); |
2018 | mpreq = GNUNET_malloc (sizeof (*mpreq) + method_size); | ||
2019 | 2104 | ||
2020 | mpreq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_SET); | 2105 | struct GNUNET_MQ_Envelope * |
2021 | mpreq->header.size = htons (sizeof (*mpreq) + method_size); | 2106 | env = GNUNET_MQ_msg_extra (mpreq, method_size, |
2107 | GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_SET); | ||
2022 | mpreq->flags = htonl (flags); | 2108 | mpreq->flags = htonl (flags); |
2023 | GNUNET_memcpy (&mpreq[1], method_prefix, method_size); | 2109 | GNUNET_memcpy (&mpreq[1], method_prefix, method_size); |
2024 | 2110 | ||
2025 | GNUNET_CLIENT_MANAGER_transmit (plc->client, &mpreq->header); | 2111 | GNUNET_MQ_send (plc->mq, env); |
2026 | GNUNET_free (mpreq); | ||
2027 | } | 2112 | } |
2028 | 2113 | ||
2029 | 2114 | ||
@@ -2033,10 +2118,11 @@ GNUNET_SOCIAL_place_msg_proc_set (struct GNUNET_SOCIAL_Place *plc, | |||
2033 | void | 2118 | void |
2034 | GNUNET_SOCIAL_place_msg_proc_clear (struct GNUNET_SOCIAL_Place *plc) | 2119 | GNUNET_SOCIAL_place_msg_proc_clear (struct GNUNET_SOCIAL_Place *plc) |
2035 | { | 2120 | { |
2036 | struct GNUNET_MessageHeader req; | 2121 | struct GNUNET_MessageHeader *req; |
2037 | req.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_CLEAR); | 2122 | struct GNUNET_MQ_Envelope * |
2038 | req.size = htons (sizeof (req)); | 2123 | env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_CLEAR); |
2039 | GNUNET_CLIENT_MANAGER_transmit (plc->client, &req); | 2124 | |
2125 | GNUNET_MQ_send (plc->mq, env); | ||
2040 | } | 2126 | } |
2041 | 2127 | ||
2042 | 2128 | ||
@@ -2057,17 +2143,17 @@ place_history_replay (struct GNUNET_SOCIAL_Place *plc, | |||
2057 | hist->slicer = slicer; | 2143 | hist->slicer = slicer; |
2058 | hist->result_cb = result_cb; | 2144 | hist->result_cb = result_cb; |
2059 | hist->cls = cls; | 2145 | hist->cls = cls; |
2060 | hist->op_id = GNUNET_CLIENT_MANAGER_op_add (plc->client, | 2146 | hist->op_id = GNUNET_OP_add (plc->op, op_recv_history_result, hist, NULL); |
2061 | &op_recv_history_result, hist); | ||
2062 | 2147 | ||
2063 | GNUNET_assert (NULL != method_prefix); | 2148 | GNUNET_assert (NULL != method_prefix); |
2064 | uint16_t method_size = strnlen (method_prefix, | 2149 | uint16_t method_size = strnlen (method_prefix, |
2065 | GNUNET_SERVER_MAX_MESSAGE_SIZE | 2150 | GNUNET_SERVER_MAX_MESSAGE_SIZE |
2066 | - sizeof (*req)) + 1; | 2151 | - sizeof (*req)) + 1; |
2067 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); | 2152 | GNUNET_assert ('\0' == method_prefix[method_size - 1]); |
2068 | req = GNUNET_malloc (sizeof (*req) + method_size); | 2153 | |
2069 | req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | 2154 | struct GNUNET_MQ_Envelope * |
2070 | req->header.size = htons (sizeof (*req) + method_size); | 2155 | env = GNUNET_MQ_msg_extra (req, method_size, |
2156 | GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); | ||
2071 | req->start_message_id = GNUNET_htonll (start_message_id); | 2157 | req->start_message_id = GNUNET_htonll (start_message_id); |
2072 | req->end_message_id = GNUNET_htonll (end_message_id); | 2158 | req->end_message_id = GNUNET_htonll (end_message_id); |
2073 | req->message_limit = GNUNET_htonll (message_limit); | 2159 | req->message_limit = GNUNET_htonll (message_limit); |
@@ -2075,8 +2161,7 @@ place_history_replay (struct GNUNET_SOCIAL_Place *plc, | |||
2075 | req->op_id = GNUNET_htonll (hist->op_id); | 2161 | req->op_id = GNUNET_htonll (hist->op_id); |
2076 | GNUNET_memcpy (&req[1], method_prefix, method_size); | 2162 | GNUNET_memcpy (&req[1], method_prefix, method_size); |
2077 | 2163 | ||
2078 | GNUNET_CLIENT_MANAGER_transmit (plc->client, &req->header); | 2164 | GNUNET_MQ_send (plc->mq, env); |
2079 | GNUNET_free (req); | ||
2080 | return hist; | 2165 | return hist; |
2081 | } | 2166 | } |
2082 | 2167 | ||
@@ -2165,7 +2250,7 @@ GNUNET_SOCIAL_place_history_replay_latest (struct GNUNET_SOCIAL_Place *plc, | |||
2165 | void | 2250 | void |
2166 | GNUNET_SOCIAL_place_history_replay_cancel (struct GNUNET_SOCIAL_HistoryRequest *hist) | 2251 | GNUNET_SOCIAL_place_history_replay_cancel (struct GNUNET_SOCIAL_HistoryRequest *hist) |
2167 | { | 2252 | { |
2168 | GNUNET_CLIENT_MANAGER_op_cancel (hist->plc->client, hist->op_id); | 2253 | GNUNET_OP_remove (hist->plc->op, hist->op_id); |
2169 | GNUNET_free (hist); | 2254 | GNUNET_free (hist); |
2170 | } | 2255 | } |
2171 | 2256 | ||
@@ -2185,20 +2270,17 @@ place_state_get (struct GNUNET_SOCIAL_Place *plc, | |||
2185 | look->var_cb = var_cb; | 2270 | look->var_cb = var_cb; |
2186 | look->result_cb = result_cb; | 2271 | look->result_cb = result_cb; |
2187 | look->cls = cls; | 2272 | look->cls = cls; |
2188 | look->op_id = GNUNET_CLIENT_MANAGER_op_add (plc->client, | 2273 | look->op_id = GNUNET_OP_add (plc->op, &op_recv_state_result, look, NULL); |
2189 | &op_recv_state_result, look); | ||
2190 | 2274 | ||
2191 | GNUNET_assert (NULL != name); | 2275 | GNUNET_assert (NULL != name); |
2192 | size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE | 2276 | size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE |
2193 | - sizeof (*req)) + 1; | 2277 | - sizeof (*req)) + 1; |
2194 | req = GNUNET_malloc (sizeof (*req) + name_size); | 2278 | struct GNUNET_MQ_Envelope * |
2195 | req->header.type = htons (type); | 2279 | env = GNUNET_MQ_msg_extra (req, name_size, type); |
2196 | req->header.size = htons (sizeof (*req) + name_size); | ||
2197 | req->op_id = GNUNET_htonll (look->op_id); | 2280 | req->op_id = GNUNET_htonll (look->op_id); |
2198 | GNUNET_memcpy (&req[1], name, name_size); | 2281 | GNUNET_memcpy (&req[1], name, name_size); |
2199 | 2282 | ||
2200 | GNUNET_CLIENT_MANAGER_transmit (plc->client, &req->header); | 2283 | GNUNET_MQ_send (plc->mq, env); |
2201 | GNUNET_free (req); | ||
2202 | return look; | 2284 | return look; |
2203 | } | 2285 | } |
2204 | 2286 | ||
@@ -2265,7 +2347,7 @@ GNUNET_SOCIAL_place_look_for (struct GNUNET_SOCIAL_Place *plc, | |||
2265 | void | 2347 | void |
2266 | GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *look) | 2348 | GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *look) |
2267 | { | 2349 | { |
2268 | GNUNET_CLIENT_MANAGER_op_cancel (look->plc->client, look->op_id); | 2350 | GNUNET_OP_remove (look->plc->op, look->op_id); |
2269 | GNUNET_free (look); | 2351 | GNUNET_free (look); |
2270 | } | 2352 | } |
2271 | 2353 | ||
@@ -2331,14 +2413,14 @@ GNUNET_SOCIAL_zone_add_place (const struct GNUNET_SOCIAL_App *app, | |||
2331 | size_t name_size = strlen (name) + 1; | 2413 | size_t name_size = strlen (name) + 1; |
2332 | size_t password_size = strlen (password) + 1; | 2414 | size_t password_size = strlen (password) + 1; |
2333 | size_t relay_size = relay_count * sizeof (*relays); | 2415 | size_t relay_size = relay_count * sizeof (*relays); |
2334 | size_t preq_size = sizeof (*preq) + name_size + password_size + relay_size; | 2416 | size_t payload_size = name_size + password_size + relay_size; |
2335 | 2417 | ||
2336 | if (GNUNET_SERVER_MAX_MESSAGE_SIZE < preq_size) | 2418 | if (GNUNET_SERVER_MAX_MESSAGE_SIZE < sizeof (*preq) + payload_size) |
2337 | return GNUNET_SYSERR; | 2419 | return GNUNET_SYSERR; |
2338 | 2420 | ||
2339 | preq = GNUNET_malloc (preq_size); | 2421 | struct GNUNET_MQ_Envelope * |
2340 | preq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_PLACE); | 2422 | env = GNUNET_MQ_msg_extra (preq, payload_size, |
2341 | preq->header.size = htons (preq_size); | 2423 | GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_PLACE); |
2342 | preq->expiration_time = GNUNET_htonll (expiration_time.abs_value_us); | 2424 | preq->expiration_time = GNUNET_htonll (expiration_time.abs_value_us); |
2343 | preq->ego_pub_key = ego->pub_key; | 2425 | preq->ego_pub_key = ego->pub_key; |
2344 | preq->place_pub_key = *place_pub_key; | 2426 | preq->place_pub_key = *place_pub_key; |
@@ -2357,10 +2439,11 @@ GNUNET_SOCIAL_zone_add_place (const struct GNUNET_SOCIAL_App *app, | |||
2357 | add_plc->result_cb = result_cb; | 2439 | add_plc->result_cb = result_cb; |
2358 | add_plc->result_cls = result_cls; | 2440 | add_plc->result_cls = result_cls; |
2359 | 2441 | ||
2360 | preq->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (app->client, | 2442 | preq->op_id = GNUNET_htonll (GNUNET_OP_add (app->op, |
2361 | op_recv_zone_add_place_result, | 2443 | op_recv_zone_add_place_result, |
2362 | add_plc)); | 2444 | add_plc, NULL)); |
2363 | GNUNET_CLIENT_MANAGER_transmit_now (app->client, &preq->header); | 2445 | |
2446 | GNUNET_MQ_send (app->mq, env); | ||
2364 | return GNUNET_OK; | 2447 | return GNUNET_OK; |
2365 | } | 2448 | } |
2366 | 2449 | ||
@@ -2376,7 +2459,6 @@ op_recv_zone_add_nym_result (void *cls, int64_t result, | |||
2376 | if (NULL != add_nym->result_cb) | 2459 | if (NULL != add_nym->result_cb) |
2377 | add_nym->result_cb (add_nym->result_cls, result, err_msg, err_msg_size); | 2460 | add_nym->result_cb (add_nym->result_cls, result, err_msg, err_msg_size); |
2378 | 2461 | ||
2379 | GNUNET_free (add_nym->req); | ||
2380 | GNUNET_free (add_nym); | 2462 | GNUNET_free (add_nym); |
2381 | } | 2463 | } |
2382 | 2464 | ||
@@ -2417,27 +2499,106 @@ GNUNET_SOCIAL_zone_add_nym (const struct GNUNET_SOCIAL_App *app, | |||
2417 | if (GNUNET_SERVER_MAX_MESSAGE_SIZE < sizeof (*nreq) + name_size) | 2499 | if (GNUNET_SERVER_MAX_MESSAGE_SIZE < sizeof (*nreq) + name_size) |
2418 | return GNUNET_SYSERR; | 2500 | return GNUNET_SYSERR; |
2419 | 2501 | ||
2420 | nreq = GNUNET_malloc (sizeof (*nreq) + name_size); | 2502 | struct GNUNET_MQ_Envelope * |
2421 | nreq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_NYM); | 2503 | env = GNUNET_MQ_msg_extra (nreq, name_size, |
2422 | nreq->header.size = htons (sizeof (*nreq) + name_size); | 2504 | GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_NYM); |
2423 | nreq->expiration_time = GNUNET_htonll (expiration_time.abs_value_us); | 2505 | nreq->expiration_time = GNUNET_htonll (expiration_time.abs_value_us); |
2424 | nreq->ego_pub_key = ego->pub_key; | 2506 | nreq->ego_pub_key = ego->pub_key; |
2425 | nreq->nym_pub_key = *nym_pub_key; | 2507 | nreq->nym_pub_key = *nym_pub_key; |
2426 | GNUNET_memcpy (&nreq[1], name, name_size); | 2508 | GNUNET_memcpy (&nreq[1], name, name_size); |
2427 | 2509 | ||
2428 | struct ZoneAddNymHandle * add_nym = GNUNET_malloc (sizeof (*add_nym)); | 2510 | struct ZoneAddNymHandle *add_nym = GNUNET_malloc (sizeof (*add_nym)); |
2429 | add_nym->req = nreq; | ||
2430 | add_nym->result_cb = result_cb; | 2511 | add_nym->result_cb = result_cb; |
2431 | add_nym->result_cls = result_cls; | 2512 | add_nym->result_cls = result_cls; |
2432 | 2513 | ||
2433 | nreq->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (app->client, | 2514 | nreq->op_id = GNUNET_htonll (GNUNET_OP_add (app->op, |
2434 | op_recv_zone_add_nym_result, | 2515 | op_recv_zone_add_nym_result, |
2435 | add_nym)); | 2516 | add_nym, NULL)); |
2436 | GNUNET_CLIENT_MANAGER_transmit_now (app->client, &nreq->header); | 2517 | |
2518 | GNUNET_MQ_send (app->mq, env); | ||
2437 | return GNUNET_OK; | 2519 | return GNUNET_OK; |
2438 | } | 2520 | } |
2439 | 2521 | ||
2440 | 2522 | ||
2523 | /*** APP ***/ | ||
2524 | |||
2525 | |||
2526 | static void | ||
2527 | app_connect (struct GNUNET_SOCIAL_App *app); | ||
2528 | |||
2529 | |||
2530 | static void | ||
2531 | app_reconnect (void *cls) | ||
2532 | { | ||
2533 | app_connect (cls); | ||
2534 | } | ||
2535 | |||
2536 | |||
2537 | /** | ||
2538 | * App client disconnected from service. | ||
2539 | * | ||
2540 | * Reconnect after backoff period. | ||
2541 | */ | ||
2542 | static void | ||
2543 | app_disconnected (void *cls, enum GNUNET_MQ_Error error) | ||
2544 | { | ||
2545 | struct GNUNET_SOCIAL_App *app = cls; | ||
2546 | |||
2547 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2548 | "App client disconnected (%d), re-connecting\n", | ||
2549 | (int) error); | ||
2550 | if (NULL != app->mq) | ||
2551 | { | ||
2552 | GNUNET_MQ_destroy (app->mq); | ||
2553 | app->mq = NULL; | ||
2554 | } | ||
2555 | |||
2556 | app->reconnect_task = GNUNET_SCHEDULER_add_delayed (app->reconnect_delay, | ||
2557 | app_reconnect, | ||
2558 | app); | ||
2559 | app->reconnect_delay = GNUNET_TIME_STD_BACKOFF (app->reconnect_delay); | ||
2560 | } | ||
2561 | |||
2562 | |||
2563 | static void | ||
2564 | app_connect (struct GNUNET_SOCIAL_App *app) | ||
2565 | { | ||
2566 | GNUNET_MQ_hd_var_size (app_ego, | ||
2567 | GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO, | ||
2568 | struct AppEgoMessage); | ||
2569 | |||
2570 | GNUNET_MQ_hd_fixed_size (app_ego_end, | ||
2571 | GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO_END, | ||
2572 | struct GNUNET_MessageHeader); | ||
2573 | |||
2574 | GNUNET_MQ_hd_var_size (app_place, | ||
2575 | GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE, | ||
2576 | struct AppPlaceMessage); | ||
2577 | |||
2578 | GNUNET_MQ_hd_fixed_size (app_place_end, | ||
2579 | GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE_END, | ||
2580 | struct GNUNET_MessageHeader); | ||
2581 | |||
2582 | GNUNET_MQ_hd_var_size (app_result, | ||
2583 | GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, | ||
2584 | struct GNUNET_OperationResultMessage); | ||
2585 | |||
2586 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
2587 | make_app_ego_handler (app), | ||
2588 | make_app_ego_end_handler (app), | ||
2589 | make_app_place_handler (app), | ||
2590 | make_app_place_end_handler (app), | ||
2591 | make_app_result_handler (app), | ||
2592 | GNUNET_MQ_handler_end () | ||
2593 | }; | ||
2594 | |||
2595 | app->mq = GNUNET_CLIENT_connecT (app->cfg, "social", | ||
2596 | handlers, app_disconnected, app); | ||
2597 | GNUNET_assert (NULL != app->mq); | ||
2598 | GNUNET_MQ_send_copy (app->mq, app->connect_env); | ||
2599 | } | ||
2600 | |||
2601 | |||
2441 | /** | 2602 | /** |
2442 | * Connect application to the social service. | 2603 | * Connect application to the social service. |
2443 | * | 2604 | * |
@@ -2482,21 +2643,16 @@ GNUNET_SOCIAL_app_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
2482 | app->connected_cb = connected_cb; | 2643 | app->connected_cb = connected_cb; |
2483 | app->cb_cls = cls; | 2644 | app->cb_cls = cls; |
2484 | app->egos = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); | 2645 | app->egos = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); |
2485 | app->client = GNUNET_CLIENT_MANAGER_connect (cfg, "social", | 2646 | app->op = GNUNET_OP_create (); |
2486 | app_handlers); | ||
2487 | GNUNET_CLIENT_MANAGER_set_user_context_ (app->client, app, sizeof (*app)); | ||
2488 | |||
2489 | app->id = GNUNET_malloc (app_id_size); | 2647 | app->id = GNUNET_malloc (app_id_size); |
2490 | GNUNET_memcpy (app->id, id, app_id_size); | 2648 | GNUNET_memcpy (app->id, id, app_id_size); |
2491 | 2649 | ||
2492 | struct AppConnectRequest *creq = GNUNET_malloc (sizeof (*creq) + app_id_size); | 2650 | struct AppConnectRequest *creq; |
2493 | creq->header.size = htons (sizeof (*creq) + app_id_size); | 2651 | app->connect_env = GNUNET_MQ_msg_extra (creq, app_id_size, |
2494 | creq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_APP_CONNECT); | 2652 | GNUNET_MESSAGE_TYPE_SOCIAL_APP_CONNECT); |
2495 | GNUNET_memcpy (&creq[1], app->id, app_id_size); | 2653 | GNUNET_memcpy (&creq[1], app->id, app_id_size); |
2496 | 2654 | ||
2497 | app->connect_msg = &creq->header; | 2655 | app_connect (app); |
2498 | app_send_connect_msg (app); | ||
2499 | |||
2500 | return app; | 2656 | return app; |
2501 | } | 2657 | } |
2502 | 2658 | ||
@@ -2516,8 +2672,15 @@ GNUNET_SOCIAL_app_disconnect (struct GNUNET_SOCIAL_App *app, | |||
2516 | GNUNET_ContinuationCallback disconnect_cb, | 2672 | GNUNET_ContinuationCallback disconnect_cb, |
2517 | void *disconnect_cls) | 2673 | void *disconnect_cls) |
2518 | { | 2674 | { |
2519 | GNUNET_CLIENT_MANAGER_disconnect (app->client, GNUNET_NO, | 2675 | // FIXME: wait till queued messages are sent |
2520 | disconnect_cb, disconnect_cls); | 2676 | if (NULL != app->mq) |
2677 | { | ||
2678 | GNUNET_MQ_destroy (app->mq); | ||
2679 | app->mq = NULL; | ||
2680 | } | ||
2681 | |||
2682 | if (NULL != disconnect_cb) | ||
2683 | disconnect_cb (disconnect_cls); | ||
2521 | } | 2684 | } |
2522 | 2685 | ||
2523 | 2686 | ||
@@ -2538,11 +2701,12 @@ void | |||
2538 | GNUNET_SOCIAL_app_detach (struct GNUNET_SOCIAL_App *app, | 2701 | GNUNET_SOCIAL_app_detach (struct GNUNET_SOCIAL_App *app, |
2539 | struct GNUNET_SOCIAL_Place *plc) | 2702 | struct GNUNET_SOCIAL_Place *plc) |
2540 | { | 2703 | { |
2541 | struct AppDetachRequest dreq; | 2704 | struct AppDetachRequest *dreq; |
2542 | dreq.header.size = htons (sizeof (dreq)); | 2705 | struct GNUNET_MQ_Envelope * |
2543 | dreq.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_APP_DETACH); | 2706 | env = GNUNET_MQ_msg (dreq, GNUNET_MESSAGE_TYPE_SOCIAL_APP_DETACH); |
2544 | dreq.place_pub_key = plc->pub_key; | 2707 | dreq->place_pub_key = plc->pub_key; |
2545 | GNUNET_CLIENT_MANAGER_transmit_now (plc->client, &dreq.header); | 2708 | |
2709 | GNUNET_MQ_send (app->mq, env); | ||
2546 | } | 2710 | } |
2547 | 2711 | ||
2548 | 2712 | ||