aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2016-08-17 21:26:41 +0000
committerGabor X Toth <*@tg-x.net>2016-08-17 21:26:41 +0000
commit667cc67f8224ccf4ff391b125a614cf90cf5917e (patch)
treeae2048a6525ab2521ad989afa795d7a2f0833af6
parent720a38ea7519f5a1a820157f056b150ab3d4abd5 (diff)
downloadgnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.tar.gz
gnunet-667cc67f8224ccf4ff391b125a614cf90cf5917e.zip
psyc, social: switch to MQ
-rw-r--r--src/include/gnunet_psyc_message.h2
-rw-r--r--src/multicast/multicast_api.c25
-rw-r--r--src/psyc/psyc_api.c662
-rw-r--r--src/psycstore/psycstore_api.c1
-rw-r--r--src/psycutil/psyc_message.c34
-rw-r--r--src/social/social_api.c1080
6 files changed, 1063 insertions, 741 deletions
diff --git a/src/include/gnunet_psyc_message.h b/src/include/gnunet_psyc_message.h
index 42ff0ea86..e6337d093 100644
--- a/src/include/gnunet_psyc_message.h
+++ b/src/include/gnunet_psyc_message.h
@@ -107,7 +107,7 @@ struct GNUNET_PSYC_TransmitHandle;
107 * Create a transmission handle. 107 * Create a transmission handle.
108 */ 108 */
109struct GNUNET_PSYC_TransmitHandle * 109struct GNUNET_PSYC_TransmitHandle *
110GNUNET_PSYC_transmit_create (); 110GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq);
111 111
112 112
113/** 113/**
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index 6fb45d722..89a9bf5e1 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -76,6 +76,11 @@ struct GNUNET_MULTICAST_Group
76 struct GNUNET_MQ_Handle *mq; 76 struct GNUNET_MQ_Handle *mq;
77 77
78 /** 78 /**
79 * Message to send on connect.
80 */
81 struct GNUNET_MQ_Envelope *connect_env;
82
83 /**
79 * Time to wait until we try to reconnect on failure. 84 * Time to wait until we try to reconnect on failure.
80 */ 85 */
81 struct GNUNET_TIME_Relative reconnect_delay; 86 struct GNUNET_TIME_Relative reconnect_delay;
@@ -85,11 +90,6 @@ struct GNUNET_MULTICAST_Group
85 */ 90 */
86 struct GNUNET_SCHEDULER_Task *reconnect_task; 91 struct GNUNET_SCHEDULER_Task *reconnect_task;
87 92
88 /**
89 * Message to send on connect.
90 */
91 struct GNUNET_MQ_Envelope *connect_env;
92
93 GNUNET_MULTICAST_JoinRequestCallback join_req_cb; 93 GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
94 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; 94 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
95 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; 95 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
@@ -522,7 +522,7 @@ handle_member_join_decision (void *cls,
522static void 522static void
523group_cleanup (struct GNUNET_MULTICAST_Group *grp) 523group_cleanup (struct GNUNET_MULTICAST_Group *grp)
524{ 524{
525 GNUNET_free (grp->connect_env); 525 GNUNET_MQ_discard (grp->connect_env);
526 if (NULL != grp->disconnect_cb) 526 if (NULL != grp->disconnect_cb)
527 grp->disconnect_cb (grp->disconnect_cls); 527 grp->disconnect_cb (grp->disconnect_cls);
528} 528}
@@ -724,7 +724,7 @@ origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
724 } 724 }
725 725
726 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, 726 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
727 &origin_reconnect, 727 origin_reconnect,
728 orig); 728 orig);
729 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); 729 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
730} 730}
@@ -829,10 +829,11 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
829 grp->connect_env = GNUNET_MQ_msg (start, 829 grp->connect_env = GNUNET_MQ_msg (start,
830 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); 830 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
831 start->max_fragment_id = max_fragment_id; 831 start->max_fragment_id = max_fragment_id;
832 GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key)); 832 start->group_key = *priv_key;
833 833
834 grp->is_origin = GNUNET_YES;
835 grp->cfg = cfg; 834 grp->cfg = cfg;
835 grp->is_origin = GNUNET_YES;
836 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
836 837
837 grp->cb_cls = cls; 838 grp->cb_cls = cls;
838 grp->join_req_cb = join_request_cb; 839 grp->join_req_cb = join_request_cb;
@@ -1024,7 +1025,7 @@ member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1024 grp->mq = NULL; 1025 grp->mq = NULL;
1025 1026
1026 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, 1027 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
1027 &member_reconnect, 1028 member_reconnect,
1028 mem); 1029 mem);
1029 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); 1030 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
1030} 1031}
@@ -1162,9 +1163,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1162 if (0 < join_msg_size) 1163 if (0 < join_msg_size)
1163 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); 1164 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1164 1165
1165 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1166 grp->is_origin = GNUNET_NO;
1167 grp->cfg = cfg; 1166 grp->cfg = cfg;
1167 grp->is_origin = GNUNET_NO;
1168 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1168 1169
1169 mem->join_dcsn_cb = join_decision_cb; 1170 mem->join_dcsn_cb = join_decision_cb;
1170 grp->join_req_cb = join_request_cb; 1171 grp->join_req_cb = join_request_cb;
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 515a2731a..2f6a15bab 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -55,7 +55,27 @@ struct GNUNET_PSYC_Channel
55 /** 55 /**
56 * Client connection to the service. 56 * Client connection to the service.
57 */ 57 */
58 struct GNUNET_CLIENT_MANAGER_Connection *client; 58 struct GNUNET_MQ_Handle *mq;
59
60 /**
61 * Message to send on connect.
62 */
63 struct GNUNET_MQ_Envelope *connect_env;
64
65 /**
66 * Time to wait until we try to reconnect on failure.
67 */
68 struct GNUNET_TIME_Relative reconnect_delay;
69
70 /**
71 * Task for reconnecting when the listener fails.
72 */
73 struct GNUNET_SCHEDULER_Task *reconnect_task;
74
75 /**
76 * Async operations.
77 */
78 struct GNUNET_OP_Handle *op;
59 79
60 /** 80 /**
61 * Transmission handle; 81 * Transmission handle;
@@ -68,11 +88,6 @@ struct GNUNET_PSYC_Channel
68 struct GNUNET_PSYC_ReceiveHandle *recv; 88 struct GNUNET_PSYC_ReceiveHandle *recv;
69 89
70 /** 90 /**
71 * Message to send on reconnect.
72 */
73 struct GNUNET_MessageHeader *connect_msg;
74
75 /**
76 * Function called after disconnected from the service. 91 * Function called after disconnected from the service.
77 */ 92 */
78 GNUNET_ContinuationCallback disconnect_cb; 93 GNUNET_ContinuationCallback disconnect_cb;
@@ -219,41 +234,21 @@ struct GNUNET_PSYC_StateRequest
219}; 234};
220 235
221 236
222static void 237static int
223channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn) 238check_channel_result (void *cls,
224{ 239 const struct GNUNET_OperationResultMessage *res)
225 uint16_t cmsg_size = ntohs (chn->connect_msg->size);
226 struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size);
227 GNUNET_memcpy (cmsg, chn->connect_msg, cmsg_size);
228 GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg);
229 GNUNET_free (cmsg);
230}
231
232
233static void
234channel_recv_disconnect (void *cls,
235 struct GNUNET_CLIENT_MANAGER_Connection *client,
236 const struct GNUNET_MessageHeader *msg)
237{ 240{
238 struct GNUNET_PSYC_Channel * 241 return GNUNET_OK;
239 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
240 GNUNET_CLIENT_MANAGER_reconnect (client);
241 channel_send_connect_msg (chn);
242} 242}
243 243
244 244
245static void 245static void
246channel_recv_result (void *cls, 246handle_channel_result (void *cls,
247 struct GNUNET_CLIENT_MANAGER_Connection *client, 247 const struct GNUNET_OperationResultMessage *res)
248 const struct GNUNET_MessageHeader *msg)
249{ 248{
250 struct GNUNET_PSYC_Channel * 249 struct GNUNET_PSYC_Channel *chn = cls;
251 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
252 250
253 const struct GNUNET_OperationResultMessage * 251 uint16_t size = ntohs (res->header.size);
254 res = (const struct GNUNET_OperationResultMessage *) msg;
255
256 uint16_t size = ntohs (msg->size);
257 if (size < sizeof (*res)) 252 if (size < sizeof (*res))
258 { /* Error, message too small. */ 253 { /* Error, message too small. */
259 GNUNET_break (0); 254 GNUNET_break (0);
@@ -262,9 +257,9 @@ channel_recv_result (void *cls,
262 257
263 uint16_t data_size = size - sizeof (*res); 258 uint16_t data_size = size - sizeof (*res);
264 const char *data = (0 < data_size) ? (void *) &res[1] : NULL; 259 const char *data = (0 < data_size) ? (void *) &res[1] : NULL;
265 GNUNET_CLIENT_MANAGER_op_result (chn->client, GNUNET_ntohll (res->op_id), 260 GNUNET_OP_result (chn->op, GNUNET_ntohll (res->op_id),
266 GNUNET_ntohll (res->result_code), 261 GNUNET_ntohll (res->result_code),
267 data, data_size); 262 data, data_size, NULL);
268} 263}
269 264
270 265
@@ -301,18 +296,30 @@ op_recv_state_result (void *cls, int64_t result,
301} 296}
302 297
303 298
304static void 299static int
305channel_recv_history_result (void *cls, 300check_channel_history_result (void *cls,
306 struct GNUNET_CLIENT_MANAGER_Connection *client, 301 const struct GNUNET_OperationResultMessage *res)
307 const struct GNUNET_MessageHeader *msg)
308{ 302{
309 struct GNUNET_PSYC_Channel * 303 struct GNUNET_PSYC_MessageHeader *
310 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 304 pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res);
305 uint16_t size = ntohs (res->header.size);
311 306
312 const struct GNUNET_OperationResultMessage * 307 if (NULL == pmsg || size < sizeof (*res) + sizeof (*pmsg))
313 res = (const struct GNUNET_OperationResultMessage *) msg; 308 { /* Error, message too small. */
309 GNUNET_break_op (0);
310 return GNUNET_SYSERR;
311 }
312 return GNUNET_OK;
313}
314
315
316static void
317handle_channel_history_result (void *cls,
318 const struct GNUNET_OperationResultMessage *res)
319{
320 struct GNUNET_PSYC_Channel *chn = cls;
314 struct GNUNET_PSYC_MessageHeader * 321 struct GNUNET_PSYC_MessageHeader *
315 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; 322 pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res);
316 323
317 LOG (GNUNET_ERROR_TYPE_DEBUG, 324 LOG (GNUNET_ERROR_TYPE_DEBUG,
318 "%p Received historic fragment for message #%" PRIu64 ".\n", 325 "%p Received historic fragment for message #%" PRIu64 ".\n",
@@ -321,9 +328,9 @@ channel_recv_history_result (void *cls,
321 GNUNET_ResultCallback result_cb = NULL; 328 GNUNET_ResultCallback result_cb = NULL;
322 struct GNUNET_PSYC_HistoryRequest *hist = NULL; 329 struct GNUNET_PSYC_HistoryRequest *hist = NULL;
323 330
324 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, 331 if (GNUNET_YES != GNUNET_OP_get (chn->op,
325 GNUNET_ntohll (res->op_id), 332 GNUNET_ntohll (res->op_id),
326 &result_cb, (void *) &hist)) 333 &result_cb, (void *) &hist, NULL))
327 { /* Operation not found. */ 334 { /* Operation not found. */
328 LOG (GNUNET_ERROR_TYPE_WARNING, 335 LOG (GNUNET_ERROR_TYPE_WARNING,
329 "%p Replay operation not found for historic fragment of message #%" 336 "%p Replay operation not found for historic fragment of message #%"
@@ -332,47 +339,47 @@ channel_recv_history_result (void *cls,
332 return; 339 return;
333 } 340 }
334 341
335 uint16_t size = ntohs (msg->size);
336 if (size < sizeof (*res) + sizeof (*pmsg))
337 { /* Error, message too small. */
338 GNUNET_break (0);
339 return;
340 }
341
342 GNUNET_PSYC_receive_message (hist->recv, 342 GNUNET_PSYC_receive_message (hist->recv,
343 (const struct GNUNET_PSYC_MessageHeader *) pmsg); 343 (const struct GNUNET_PSYC_MessageHeader *) pmsg);
344} 344}
345 345
346 346
347static void 347static int
348channel_recv_state_result (void *cls, 348check_channel_state_result (void *cls,
349 struct GNUNET_CLIENT_MANAGER_Connection *client, 349 const struct GNUNET_OperationResultMessage *res)
350 const struct GNUNET_MessageHeader *msg)
351{ 350{
352 struct GNUNET_PSYC_Channel * 351 const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res);
353 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 352 uint16_t mod_size = ntohs (mod->size);
353 uint16_t size = ntohs (res->header.size);
354
355 if (NULL == mod || size - sizeof (*res) != mod_size)
356 {
357 GNUNET_break_op (0);
358 return GNUNET_SYSERR;
359 }
360 return GNUNET_OK;
361}
362
354 363
355 const struct GNUNET_OperationResultMessage * 364static void
356 res = (const struct GNUNET_OperationResultMessage *) msg; 365handle_channel_state_result (void *cls,
366 const struct GNUNET_OperationResultMessage *res)
367{
368 struct GNUNET_PSYC_Channel *chn = cls;
357 369
358 GNUNET_ResultCallback result_cb = NULL; 370 GNUNET_ResultCallback result_cb = NULL;
359 struct GNUNET_PSYC_StateRequest *sr = NULL; 371 struct GNUNET_PSYC_StateRequest *sr = NULL;
360 372
361 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client, 373 if (GNUNET_YES != GNUNET_OP_get (chn->op,
362 GNUNET_ntohll (res->op_id), 374 GNUNET_ntohll (res->op_id),
363 &result_cb, (void *) &sr)) 375 &result_cb, (void *) &sr, NULL))
364 { /* Operation not found. */ 376 { /* Operation not found. */
365 return; 377 return;
366 } 378 }
367 379
368 const struct GNUNET_MessageHeader * 380 const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res);
369 mod = (struct GNUNET_MessageHeader *) &res[1];
370 uint16_t mod_size = ntohs (mod->size); 381 uint16_t mod_size = ntohs (mod->size);
371 if (ntohs (msg->size) - sizeof (*res) != mod_size) 382
372 {
373 GNUNET_break (0);
374 return;
375 }
376 switch (ntohs (mod->type)) 383 switch (ntohs (mod->type))
377 { 384 {
378 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: 385 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
@@ -401,40 +408,40 @@ channel_recv_state_result (void *cls,
401} 408}
402 409
403 410
411static int
412check_channel_message (void *cls,
413 const struct GNUNET_PSYC_MessageHeader *pmsg)
414{
415 return GNUNET_OK;
416}
417
418
404static void 419static void
405channel_recv_message (void *cls, 420handle_channel_message (void *cls,
406 struct GNUNET_CLIENT_MANAGER_Connection *client, 421 const struct GNUNET_PSYC_MessageHeader *pmsg)
407 const struct GNUNET_MessageHeader *msg)
408{ 422{
409 struct GNUNET_PSYC_Channel * 423 struct GNUNET_PSYC_Channel *chn = cls;
410 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 424
411 GNUNET_PSYC_receive_message (chn->recv, 425 GNUNET_PSYC_receive_message (chn->recv, pmsg);
412 (const struct GNUNET_PSYC_MessageHeader *) msg);
413} 426}
414 427
415 428
416static void 429static void
417channel_recv_message_ack (void *cls, 430handle_channel_message_ack (void *cls,
418 struct GNUNET_CLIENT_MANAGER_Connection *client, 431 const struct GNUNET_MessageHeader *msg)
419 const struct GNUNET_MessageHeader *msg)
420{ 432{
421 struct GNUNET_PSYC_Channel * 433 struct GNUNET_PSYC_Channel *chn = cls;
422 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 434
423 GNUNET_PSYC_transmit_got_ack (chn->tmit); 435 GNUNET_PSYC_transmit_got_ack (chn->tmit);
424} 436}
425 437
426 438
427static void 439static void
428master_recv_start_ack (void *cls, 440handle_master_start_ack (void *cls,
429 struct GNUNET_CLIENT_MANAGER_Connection *client, 441 const struct GNUNET_PSYC_CountersResultMessage *cres)
430 const struct GNUNET_MessageHeader *msg)
431{ 442{
432 struct GNUNET_PSYC_Master * 443 struct GNUNET_PSYC_Master *mst = cls;
433 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
434 sizeof (struct GNUNET_PSYC_Channel));
435 444
436 struct GNUNET_PSYC_CountersResultMessage *
437 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
438 int32_t result = ntohl (cres->result_code); 445 int32_t result = ntohl (cres->result_code);
439 if (GNUNET_OK != result && GNUNET_NO != result) 446 if (GNUNET_OK != result && GNUNET_NO != result)
440 { 447 {
@@ -447,23 +454,27 @@ master_recv_start_ack (void *cls,
447} 454}
448 455
449 456
457static int
458check_master_join_request (void *cls,
459 const struct GNUNET_PSYC_JoinRequestMessage *req)
460{
461 return GNUNET_OK;
462}
463
464
450static void 465static void
451master_recv_join_request (void *cls, 466handle_master_join_request (void *cls,
452 struct GNUNET_CLIENT_MANAGER_Connection *client, 467 const struct GNUNET_PSYC_JoinRequestMessage *req)
453 const struct GNUNET_MessageHeader *msg)
454{ 468{
455 struct GNUNET_PSYC_Master * 469 struct GNUNET_PSYC_Master *mst = cls;
456 mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, 470
457 sizeof (struct GNUNET_PSYC_Channel));
458 if (NULL == mst->join_req_cb) 471 if (NULL == mst->join_req_cb)
459 return; 472 return;
460 473
461 const struct GNUNET_PSYC_JoinRequestMessage *
462 req = (const struct GNUNET_PSYC_JoinRequestMessage *) msg;
463 const struct GNUNET_PSYC_Message *join_msg = NULL; 474 const struct GNUNET_PSYC_Message *join_msg = NULL;
464 if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) 475 if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size))
465 { 476 {
466 join_msg = (struct GNUNET_PSYC_Message *) &req[1]; 477 join_msg = (struct GNUNET_PSYC_Message *) GNUNET_MQ_extract_nested_mh (req);
467 LOG (GNUNET_ERROR_TYPE_DEBUG, 478 LOG (GNUNET_ERROR_TYPE_DEBUG,
468 "Received join_msg of type %u and size %u.\n", 479 "Received join_msg of type %u and size %u.\n",
469 ntohs (join_msg->header.type), ntohs (join_msg->header.size)); 480 ntohs (join_msg->header.type), ntohs (join_msg->header.size));
@@ -479,15 +490,11 @@ master_recv_join_request (void *cls,
479 490
480 491
481static void 492static void
482slave_recv_join_ack (void *cls, 493handle_slave_join_ack (void *cls,
483 struct GNUNET_CLIENT_MANAGER_Connection *client, 494 const struct GNUNET_PSYC_CountersResultMessage *cres)
484 const struct GNUNET_MessageHeader *msg)
485{ 495{
486 struct GNUNET_PSYC_Slave * 496 struct GNUNET_PSYC_Slave *slv = cls;
487 slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, 497
488 sizeof (struct GNUNET_PSYC_Channel));
489 struct GNUNET_PSYC_CountersResultMessage *
490 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
491 int32_t result = ntohl (cres->result_code); 498 int32_t result = ntohl (cres->result_code);
492 if (GNUNET_YES != result && GNUNET_NO != result) 499 if (GNUNET_YES != result && GNUNET_NO != result)
493 { 500 {
@@ -500,16 +507,19 @@ slave_recv_join_ack (void *cls,
500} 507}
501 508
502 509
510static int
511check_slave_join_decision (void *cls,
512 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
513{
514 return GNUNET_OK;
515}
516
517
503static void 518static void
504slave_recv_join_decision (void *cls, 519handle_slave_join_decision (void *cls,
505 struct GNUNET_CLIENT_MANAGER_Connection *client, 520 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
506 const struct GNUNET_MessageHeader *msg)
507{ 521{
508 struct GNUNET_PSYC_Slave * 522 struct GNUNET_PSYC_Slave *slv = cls;
509 slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
510 sizeof (struct GNUNET_PSYC_Channel));
511 const struct GNUNET_PSYC_JoinDecisionMessage *
512 dcsn = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
513 523
514 struct GNUNET_PSYC_Message *pmsg = NULL; 524 struct GNUNET_PSYC_Message *pmsg = NULL;
515 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) 525 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg))
@@ -520,104 +530,164 @@ slave_recv_join_decision (void *cls,
520} 530}
521 531
522 532
523static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = 533static void
534channel_cleanup (struct GNUNET_PSYC_Channel *chn)
524{ 535{
525 { &channel_recv_message, NULL, 536 if (NULL != chn->tmit)
526 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 537 {
527 sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, 538 GNUNET_PSYC_transmit_destroy (chn->tmit);
528 539 chn->tmit = NULL;
529 { &channel_recv_message_ack, NULL, 540 }
530 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, 541 if (NULL != chn->recv)
531 sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, 542 {
532 543 GNUNET_PSYC_receive_destroy (chn->recv);
533 { &master_recv_start_ack, NULL, 544 chn->recv = NULL;
534 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, 545 }
535 sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO }, 546 if (NULL != chn->connect_env)
536 547 {
537 { &master_recv_join_request, NULL, 548 GNUNET_MQ_discard (chn->connect_env);
538 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, 549 chn->connect_env = NULL;
539 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, 550 }
540 551 if (NULL != chn->disconnect_cb)
541 { &channel_recv_history_result, NULL, 552 {
542 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, 553 chn->disconnect_cb (chn->disconnect_cls);
543 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, 554 chn->disconnect_cb = NULL;
544 555 }
545 { &channel_recv_state_result, NULL, 556}
546 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
547 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
548
549 { &channel_recv_result, NULL,
550 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
551 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
552 557
553 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
554 558
555 { NULL, NULL, 0, 0, GNUNET_NO } 559static void
556}; 560master_cleanup (void *cls)
561{
562 struct GNUNET_PSYC_Master *mst = cls;
563 channel_cleanup (&mst->chn);
564 GNUNET_free (mst);
565}
557 566
558 567
559static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = 568static void
569slave_cleanup (void *cls)
560{ 570{
561 { &channel_recv_message, NULL, 571 struct GNUNET_PSYC_Slave *slv = cls;
562 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 572 channel_cleanup (&slv->chn);
563 sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, 573 GNUNET_free (slv);
564 574}
565 { &channel_recv_message_ack, NULL,
566 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
567 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
568 575
569 { &slave_recv_join_ack, NULL,
570 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
571 sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO },
572 576
573 { &slave_recv_join_decision, NULL, 577static void
574 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 578channel_disconnect (struct GNUNET_PSYC_Channel *chn,
575 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, 579 GNUNET_ContinuationCallback cb,
580 void *cls)
581{
582 chn->is_disconnecting = GNUNET_YES;
583 chn->disconnect_cb = cb;
584 chn->disconnect_cls = cls;
576 585
577 { &channel_recv_history_result, NULL, 586 // FIXME: wait till queued messages are sent
578 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT, 587 if (NULL != chn->mq)
579 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES }, 588 {
589 GNUNET_MQ_destroy (chn->mq);
590 chn->mq = NULL;
591 }
592}
580 593
581 { &channel_recv_state_result, NULL,
582 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
583 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
584 594
585 { &channel_recv_result, NULL, 595/*** MASTER ***/
586 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
587 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
588 596
589 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
590 597
591 { NULL, NULL, 0, 0, GNUNET_NO } 598static void
592}; 599master_connect (struct GNUNET_PSYC_Master *mst);
593 600
594 601
595static void 602static void
596channel_cleanup (struct GNUNET_PSYC_Channel *chn) 603master_reconnect (void *cls)
597{ 604{
598 GNUNET_PSYC_transmit_destroy (chn->tmit); 605 master_connect (cls);
599 GNUNET_PSYC_receive_destroy (chn->recv);
600 GNUNET_free (chn->connect_msg);
601 if (NULL != chn->disconnect_cb)
602 chn->disconnect_cb (chn->disconnect_cls);
603} 606}
604 607
605 608
609/**
610 * Master client disconnected from service.
611 *
612 * Reconnect after backoff period.
613 */
606static void 614static void
607master_cleanup (void *cls) 615master_disconnected (void *cls, enum GNUNET_MQ_Error error)
608{ 616{
609 struct GNUNET_PSYC_Master *mst = cls; 617 struct GNUNET_PSYC_Master *mst = cls;
610 channel_cleanup (&mst->chn); 618 struct GNUNET_PSYC_Channel *chn = &mst->chn;
611 GNUNET_free (mst); 619
620 LOG (GNUNET_ERROR_TYPE_DEBUG,
621 "Master client disconnected (%d), re-connecting\n",
622 (int) error);
623 if (NULL != chn->mq)
624 {
625 GNUNET_MQ_destroy (chn->mq);
626 chn->mq = NULL;
627 }
628 if (NULL != chn->tmit)
629 {
630 GNUNET_PSYC_transmit_destroy (chn->tmit);
631 chn->tmit = NULL;
632 }
633
634 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
635 master_reconnect,
636 mst);
637 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
612} 638}
613 639
614 640
615static void 641static void
616slave_cleanup (void *cls) 642master_connect (struct GNUNET_PSYC_Master *mst)
617{ 643{
618 struct GNUNET_PSYC_Slave *slv = cls; 644 struct GNUNET_PSYC_Channel *chn = &mst->chn;
619 channel_cleanup (&slv->chn); 645
620 GNUNET_free (slv); 646 GNUNET_MQ_hd_fixed_size (master_start_ack,
647 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK,
648 struct GNUNET_PSYC_CountersResultMessage);
649
650 GNUNET_MQ_hd_var_size (master_join_request,
651 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
652 struct GNUNET_PSYC_JoinRequestMessage);
653
654 GNUNET_MQ_hd_var_size (channel_message,
655 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
656 struct GNUNET_PSYC_MessageHeader);
657
658 GNUNET_MQ_hd_fixed_size (channel_message_ack,
659 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
660 struct GNUNET_MessageHeader);
661
662 GNUNET_MQ_hd_var_size (channel_history_result,
663 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
664 struct GNUNET_OperationResultMessage);
665
666 GNUNET_MQ_hd_var_size (channel_state_result,
667 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
668 struct GNUNET_OperationResultMessage);
669
670 GNUNET_MQ_hd_var_size (channel_result,
671 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
672 struct GNUNET_OperationResultMessage);
673
674 struct GNUNET_MQ_MessageHandler handlers[] = {
675 make_master_start_ack_handler (mst),
676 make_master_join_request_handler (mst),
677 make_channel_message_handler (chn),
678 make_channel_message_ack_handler (chn),
679 make_channel_history_result_handler (chn),
680 make_channel_state_result_handler (chn),
681 make_channel_result_handler (chn),
682 GNUNET_MQ_handler_end ()
683 };
684
685 chn->mq = GNUNET_CLIENT_connecT (chn->cfg, "psyc",
686 handlers, master_disconnected, mst);
687 GNUNET_assert (NULL != chn->mq);
688 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq);
689
690 GNUNET_MQ_send_copy (chn->mq, chn->connect_env);
621} 691}
622 692
623 693
@@ -664,26 +734,23 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
664 struct GNUNET_PSYC_Channel *chn = &mst->chn; 734 struct GNUNET_PSYC_Channel *chn = &mst->chn;
665 735
666 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); 736 struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
667 req->header.size = htons (sizeof (*req)); 737 chn->connect_env = GNUNET_MQ_msg (req,
668 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); 738 GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
669 req->channel_key = *channel_key; 739 req->channel_key = *channel_key;
670 req->policy = policy; 740 req->policy = policy;
671 741
672 chn->connect_msg = &req->header;
673 chn->cfg = cfg; 742 chn->cfg = cfg;
674 chn->is_master = GNUNET_YES; 743 chn->is_master = GNUNET_YES;
744 chn->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
745
746 chn->op = GNUNET_OP_create ();
747 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
675 748
676 mst->start_cb = start_cb; 749 mst->start_cb = start_cb;
677 mst->join_req_cb = join_request_cb; 750 mst->join_req_cb = join_request_cb;
678 mst->cb_cls = cls; 751 mst->cb_cls = cls;
679 752
680 chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers); 753 master_connect (mst);
681 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn));
682
683 chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
684 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
685
686 channel_send_connect_msg (chn);
687 return mst; 754 return mst;
688} 755}
689 756
@@ -704,12 +771,8 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst,
704 771
705 /* FIXME: send msg to service */ 772 /* FIXME: send msg to service */
706 773
707 chn->is_disconnecting = GNUNET_YES; 774 channel_disconnect (chn, stop_cb, stop_cls);
708 chn->disconnect_cb = stop_cb; 775 master_cleanup (mst);
709 chn->disconnect_cls = stop_cls;
710
711 GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES,
712 &master_cleanup, mst);
713} 776}
714 777
715 778
@@ -753,17 +816,16 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
753 < sizeof (*dcsn) + relay_size + join_resp_size) 816 < sizeof (*dcsn) + relay_size + join_resp_size)
754 return GNUNET_SYSERR; 817 return GNUNET_SYSERR;
755 818
756 dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size); 819 struct GNUNET_MQ_Envelope *
757 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); 820 env = GNUNET_MQ_msg_extra (dcsn, relay_size + join_resp_size,
758 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); 821 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
759 dcsn->is_admitted = htonl (is_admitted); 822 dcsn->is_admitted = htonl (is_admitted);
760 dcsn->slave_pub_key = jh->slave_pub_key; 823 dcsn->slave_pub_key = jh->slave_pub_key;
761 824
762 if (0 < join_resp_size) 825 if (0 < join_resp_size)
763 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size); 826 GNUNET_memcpy (&dcsn[1], join_resp, join_resp_size);
764 827
765 GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); 828 GNUNET_MQ_send (chn->mq, env);
766 GNUNET_free (dcsn);
767 GNUNET_free (jh); 829 GNUNET_free (jh);
768 return GNUNET_OK; 830 return GNUNET_OK;
769} 831}
@@ -838,6 +900,104 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
838} 900}
839 901
840 902
903/*** SLAVE ***/
904
905
906static void
907slave_connect (struct GNUNET_PSYC_Slave *slv);
908
909
910static void
911slave_reconnect (void *cls)
912{
913 slave_connect (cls);
914}
915
916
917/**
918 * Slave client disconnected from service.
919 *
920 * Reconnect after backoff period.
921 */
922static void
923slave_disconnected (void *cls, enum GNUNET_MQ_Error error)
924{
925 struct GNUNET_PSYC_Slave *slv = cls;
926 struct GNUNET_PSYC_Channel *chn = &slv->chn;
927
928 LOG (GNUNET_ERROR_TYPE_DEBUG,
929 "Slave client disconnected (%d), re-connecting\n",
930 (int) error);
931 if (NULL != chn->mq)
932 {
933 GNUNET_MQ_destroy (chn->mq);
934 chn->mq = NULL;
935 }
936 if (NULL != chn->tmit)
937 {
938 GNUNET_PSYC_transmit_destroy (chn->tmit);
939 chn->tmit = NULL;
940 }
941 chn->reconnect_task = GNUNET_SCHEDULER_add_delayed (chn->reconnect_delay,
942 slave_reconnect,
943 slv);
944 chn->reconnect_delay = GNUNET_TIME_STD_BACKOFF (chn->reconnect_delay);
945}
946
947
948static void
949slave_connect (struct GNUNET_PSYC_Slave *slv)
950{
951 struct GNUNET_PSYC_Channel *chn = &slv->chn;
952
953 GNUNET_MQ_hd_fixed_size (slave_join_ack,
954 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
955 struct GNUNET_PSYC_CountersResultMessage);
956
957 GNUNET_MQ_hd_var_size (slave_join_decision,
958 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
959 struct GNUNET_PSYC_JoinDecisionMessage);
960
961 GNUNET_MQ_hd_var_size (channel_message,
962 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
963 struct GNUNET_PSYC_MessageHeader);
964
965 GNUNET_MQ_hd_fixed_size (channel_message_ack,
966 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
967 struct GNUNET_MessageHeader);
968
969 GNUNET_MQ_hd_var_size (channel_history_result,
970 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
971 struct GNUNET_OperationResultMessage);
972
973 GNUNET_MQ_hd_var_size (channel_state_result,
974 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
975 struct GNUNET_OperationResultMessage);
976
977 GNUNET_MQ_hd_var_size (channel_result,
978 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
979 struct GNUNET_OperationResultMessage);
980
981 struct GNUNET_MQ_MessageHandler handlers[] = {
982 make_slave_join_ack_handler (slv),
983 make_slave_join_decision_handler (slv),
984 make_channel_message_handler (chn),
985 make_channel_message_ack_handler (chn),
986 make_channel_history_result_handler (chn),
987 make_channel_state_result_handler (chn),
988 make_channel_result_handler (chn),
989 GNUNET_MQ_handler_end ()
990 };
991
992 chn->mq = GNUNET_CLIENT_connecT (chn->cfg, "psyc",
993 handlers, slave_disconnected, slv);
994 GNUNET_assert (NULL != chn->mq);
995 chn->tmit = GNUNET_PSYC_transmit_create (chn->mq);
996
997 GNUNET_MQ_send_copy (chn->mq, chn->connect_env);
998}
999
1000
841/** 1001/**
842 * Join a PSYC channel. 1002 * Join a PSYC channel.
843 * 1003 *
@@ -892,15 +1052,14 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
892 struct GNUNET_PSYC_Channel *chn = &slv->chn; 1052 struct GNUNET_PSYC_Channel *chn = &slv->chn;
893 uint16_t relay_size = relay_count * sizeof (*relays); 1053 uint16_t relay_size = relay_count * sizeof (*relays);
894 uint16_t join_msg_size; 1054 uint16_t join_msg_size;
895 struct SlaveJoinRequest *req;
896
897 if (NULL == join_msg) 1055 if (NULL == join_msg)
898 join_msg_size = 0; 1056 join_msg_size = 0;
899 else 1057 else
900 join_msg_size = ntohs (join_msg->header.size); 1058 join_msg_size = ntohs (join_msg->header.size);
901 req = GNUNET_malloc (sizeof (*req) + relay_size + join_msg_size); 1059
902 req->header.size = htons (sizeof (*req) + relay_size + join_msg_size); 1060 struct SlaveJoinRequest *req;
903 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); 1061 chn->connect_env = GNUNET_MQ_msg_extra (req, relay_size + join_msg_size,
1062 GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
904 req->channel_pub_key = *channel_pub_key; 1063 req->channel_pub_key = *channel_pub_key;
905 req->slave_key = *slave_key; 1064 req->slave_key = *slave_key;
906 req->origin = *origin; 1065 req->origin = *origin;
@@ -913,21 +1072,18 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
913 if (NULL != join_msg) 1072 if (NULL != join_msg)
914 GNUNET_memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size); 1073 GNUNET_memcpy ((char *) &req[1] + relay_size, join_msg, join_msg_size);
915 1074
916 chn->connect_msg = &req->header;
917 chn->cfg = cfg; 1075 chn->cfg = cfg;
918 chn->is_master = GNUNET_NO; 1076 chn->is_master = GNUNET_NO;
1077 chn->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1078
1079 chn->op = GNUNET_OP_create ();
1080 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
919 1081
920 slv->connect_cb = connect_cb; 1082 slv->connect_cb = connect_cb;
921 slv->join_dcsn_cb = join_decision_cb; 1083 slv->join_dcsn_cb = join_decision_cb;
922 slv->cb_cls = cls; 1084 slv->cb_cls = cls;
923 1085
924 chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); 1086 slave_connect (slv);
925 GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn));
926
927 chn->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
928 chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
929
930 channel_send_connect_msg (chn);
931 return slv; 1087 return slv;
932} 1088}
933 1089
@@ -950,12 +1106,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv,
950 1106
951 /* FIXME: send msg to service */ 1107 /* FIXME: send msg to service */
952 1108
953 chn->is_disconnecting = GNUNET_YES; 1109 channel_disconnect (chn, part_cb, part_cls);
954 chn->disconnect_cb = part_cb; 1110 slave_cleanup (slv);
955 chn->disconnect_cls = part_cls;
956
957 GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES,
958 &slave_cleanup, slv);
959} 1111}
960 1112
961 1113
@@ -1069,18 +1221,16 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1069 GNUNET_ResultCallback result_cb, 1221 GNUNET_ResultCallback result_cb,
1070 void *cls) 1222 void *cls)
1071{ 1223{
1072 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1224 struct ChannelMembershipStoreRequest *req;
1073 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); 1225 struct GNUNET_MQ_Envelope *
1074 req->header.size = htons (sizeof (*req)); 1226 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE);
1075 req->slave_pub_key = *slave_pub_key; 1227 req->slave_pub_key = *slave_pub_key;
1076 req->announced_at = GNUNET_htonll (announced_at); 1228 req->announced_at = GNUNET_htonll (announced_at);
1077 req->effective_since = GNUNET_htonll (effective_since); 1229 req->effective_since = GNUNET_htonll (effective_since);
1078 req->did_join = GNUNET_YES; 1230 req->did_join = GNUNET_YES;
1079 req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, 1231 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL));
1080 result_cb, cls));
1081 1232
1082 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1233 GNUNET_MQ_send (chn->mq, env);
1083 GNUNET_free (req);
1084} 1234}
1085 1235
1086 1236
@@ -1122,17 +1272,15 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1122 GNUNET_ResultCallback result_cb, 1272 GNUNET_ResultCallback result_cb,
1123 void *cls) 1273 void *cls)
1124{ 1274{
1125 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1275 struct ChannelMembershipStoreRequest *req;
1126 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE); 1276 struct GNUNET_MQ_Envelope *
1127 req->header.size = htons (sizeof (*req)); 1277 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE);
1128 req->slave_pub_key = *slave_pub_key; 1278 req->slave_pub_key = *slave_pub_key;
1129 req->announced_at = GNUNET_htonll (announced_at); 1279 req->announced_at = GNUNET_htonll (announced_at);
1130 req->did_join = GNUNET_NO; 1280 req->did_join = GNUNET_NO;
1131 req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client, 1281 req->op_id = GNUNET_htonll (GNUNET_OP_add (chn->op, result_cb, cls, NULL));
1132 result_cb, cls));
1133 1282
1134 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1283 GNUNET_MQ_send (chn->mq, env);
1135 GNUNET_free (req);
1136} 1284}
1137 1285
1138 1286
@@ -1154,17 +1302,17 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1154 hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls); 1302 hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
1155 hist->result_cb = result_cb; 1303 hist->result_cb = result_cb;
1156 hist->cls = cls; 1304 hist->cls = cls;
1157 hist->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, 1305 hist->op_id = GNUNET_OP_add (chn->op, op_recv_history_result, hist, NULL);
1158 &op_recv_history_result, hist);
1159 1306
1160 GNUNET_assert (NULL != method_prefix); 1307 GNUNET_assert (NULL != method_prefix);
1161 uint16_t method_size = strnlen (method_prefix, 1308 uint16_t method_size = strnlen (method_prefix,
1162 GNUNET_SERVER_MAX_MESSAGE_SIZE 1309 GNUNET_SERVER_MAX_MESSAGE_SIZE
1163 - sizeof (*req)) + 1; 1310 - sizeof (*req)) + 1;
1164 GNUNET_assert ('\0' == method_prefix[method_size - 1]); 1311 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1165 req = GNUNET_malloc (sizeof (*req) + method_size); 1312
1166 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); 1313 struct GNUNET_MQ_Envelope *
1167 req->header.size = htons (sizeof (*req) + method_size); 1314 env = GNUNET_MQ_msg_extra (req, method_size,
1315 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
1168 req->start_message_id = GNUNET_htonll (start_message_id); 1316 req->start_message_id = GNUNET_htonll (start_message_id);
1169 req->end_message_id = GNUNET_htonll (end_message_id); 1317 req->end_message_id = GNUNET_htonll (end_message_id);
1170 req->message_limit = GNUNET_htonll (message_limit); 1318 req->message_limit = GNUNET_htonll (message_limit);
@@ -1172,8 +1320,7 @@ channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1172 req->op_id = GNUNET_htonll (hist->op_id); 1320 req->op_id = GNUNET_htonll (hist->op_id);
1173 GNUNET_memcpy (&req[1], method_prefix, method_size); 1321 GNUNET_memcpy (&req[1], method_prefix, method_size);
1174 1322
1175 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1323 GNUNET_MQ_send (chn->mq, env);
1176 GNUNET_free (req);
1177 return hist; 1324 return hist;
1178} 1325}
1179 1326
@@ -1263,7 +1410,7 @@ GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel,
1263 struct GNUNET_PSYC_HistoryRequest *hist) 1410 struct GNUNET_PSYC_HistoryRequest *hist)
1264{ 1411{
1265 GNUNET_PSYC_receive_destroy (hist->recv); 1412 GNUNET_PSYC_receive_destroy (hist->recv);
1266 GNUNET_CLIENT_MANAGER_op_cancel (hist->chn->client, hist->op_id); 1413 GNUNET_OP_remove (hist->chn->op, hist->op_id);
1267 GNUNET_free (hist); 1414 GNUNET_free (hist);
1268} 1415}
1269 1416
@@ -1301,20 +1448,17 @@ channel_state_get (struct GNUNET_PSYC_Channel *chn,
1301 sr->var_cb = var_cb; 1448 sr->var_cb = var_cb;
1302 sr->result_cb = result_cb; 1449 sr->result_cb = result_cb;
1303 sr->cls = cls; 1450 sr->cls = cls;
1304 sr->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client, 1451 sr->op_id = GNUNET_OP_add (chn->op, op_recv_state_result, sr, NULL);
1305 &op_recv_state_result, sr);
1306 1452
1307 GNUNET_assert (NULL != name); 1453 GNUNET_assert (NULL != name);
1308 size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE 1454 size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE
1309 - sizeof (*req)) + 1; 1455 - sizeof (*req)) + 1;
1310 req = GNUNET_malloc (sizeof (*req) + name_size); 1456 struct GNUNET_MQ_Envelope *
1311 req->header.type = htons (type); 1457 env = GNUNET_MQ_msg_extra (req, name_size, type);
1312 req->header.size = htons (sizeof (*req) + name_size);
1313 req->op_id = GNUNET_htonll (sr->op_id); 1458 req->op_id = GNUNET_htonll (sr->op_id);
1314 GNUNET_memcpy (&req[1], name, name_size); 1459 GNUNET_memcpy (&req[1], name, name_size);
1315 1460
1316 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1461 GNUNET_MQ_send (chn->mq, env);
1317 GNUNET_free (req);
1318 return sr; 1462 return sr;
1319} 1463}
1320 1464
@@ -1397,7 +1541,7 @@ GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn,
1397void 1541void
1398GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr) 1542GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr)
1399{ 1543{
1400 GNUNET_CLIENT_MANAGER_op_cancel (sr->chn->client, sr->op_id); 1544 GNUNET_OP_remove (sr->chn->op, sr->op_id);
1401 GNUNET_free (sr); 1545 GNUNET_free (sr);
1402} 1546}
1403 1547
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c
index 94b7ff9f5..89be19a42 100644
--- a/src/psycstore/psycstore_api.c
+++ b/src/psycstore/psycstore_api.c
@@ -89,7 +89,6 @@ struct GNUNET_PSYCSTORE_Handle
89 */ 89 */
90 struct GNUNET_MQ_Handle *mq; 90 struct GNUNET_MQ_Handle *mq;
91 91
92
93 /** 92 /**
94 * Async operations. 93 * Async operations.
95 */ 94 */
diff --git a/src/psycutil/psyc_message.c b/src/psycutil/psyc_message.c
index 303ba8466..bc1896b1f 100644
--- a/src/psycutil/psyc_message.c
+++ b/src/psycutil/psyc_message.c
@@ -39,7 +39,7 @@ struct GNUNET_PSYC_TransmitHandle
39 /** 39 /**
40 * Client connection to service. 40 * Client connection to service.
41 */ 41 */
42 struct GNUNET_CLIENT_MANAGER_Connection *client; 42 struct GNUNET_MQ_Handle *mq;
43 43
44 /** 44 /**
45 * Message currently being received from the client. 45 * Message currently being received from the client.
@@ -47,6 +47,11 @@ struct GNUNET_PSYC_TransmitHandle
47 struct GNUNET_MessageHeader *msg; 47 struct GNUNET_MessageHeader *msg;
48 48
49 /** 49 /**
50 * Envelope for @a msg
51 */
52 struct GNUNET_MQ_Envelope *env;
53
54 /**
50 * Callback to request next modifier from client. 55 * Callback to request next modifier from client.
51 */ 56 */
52 GNUNET_PSYC_TransmitNotifyModifier notify_mod; 57 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
@@ -327,11 +332,11 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
327 * Create a transmission handle. 332 * Create a transmission handle.
328 */ 333 */
329struct GNUNET_PSYC_TransmitHandle * 334struct GNUNET_PSYC_TransmitHandle *
330GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client) 335GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
331{ 336{
332 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle); 337 struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
333 338
334 tmit->client = client; 339 tmit->mq = mq;
335 return tmit; 340 return tmit;
336} 341}
337 342
@@ -378,16 +383,15 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
378 { 383 {
379 /* End of message or buffer is full, add it to transmission queue 384 /* End of message or buffer is full, add it to transmission queue
380 * and start with empty buffer */ 385 * and start with empty buffer */
381 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
382 tmit->msg->size = htons (tmit->msg->size); 386 tmit->msg->size = htons (tmit->msg->size);
383 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); 387 GNUNET_MQ_send (tmit->mq, tmit->env);
388 tmit->env = NULL;
384 tmit->msg = NULL; 389 tmit->msg = NULL;
385 tmit->acks_pending++; 390 tmit->acks_pending++;
386 } 391 }
387 else 392 else
388 { 393 {
389 /* Message fits in current buffer, append */ 394 /* Message fits in current buffer, append */
390 tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
391 GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); 395 GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
392 tmit->msg->size += size; 396 tmit->msg->size += size;
393 } 397 }
@@ -396,8 +400,13 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
396 if (NULL == tmit->msg && NULL != msg) 400 if (NULL == tmit->msg && NULL != msg)
397 { 401 {
398 /* Empty buffer, copy over message. */ 402 /* Empty buffer, copy over message. */
399 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size); 403 tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
404 GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
405 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
406 /* store current message size in host byte order
407 * then later switch it to network byte order before sending */
400 tmit->msg->size = sizeof (*tmit->msg) + size; 408 tmit->msg->size = sizeof (*tmit->msg) + size;
409
401 GNUNET_memcpy (&tmit->msg[1], msg, size); 410 GNUNET_memcpy (&tmit->msg[1], msg, size);
402 } 411 }
403 412
@@ -407,9 +416,9 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
407 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) 416 < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
408 { 417 {
409 /* End of message or buffer is full, add it to transmission queue. */ 418 /* End of message or buffer is full, add it to transmission queue. */
410 tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
411 tmit->msg->size = htons (tmit->msg->size); 419 tmit->msg->size = htons (tmit->msg->size);
412 GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); 420 GNUNET_MQ_send (tmit->mq, tmit->env);
421 tmit->env = NULL;
413 tmit->msg = NULL; 422 tmit->msg = NULL;
414 tmit->acks_pending++; 423 tmit->acks_pending++;
415 } 424 }
@@ -722,7 +731,12 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
722 731
723 size_t size = strlen (method_name) + 1; 732 size_t size = strlen (method_name) + 1;
724 struct GNUNET_PSYC_MessageMethod *pmeth; 733 struct GNUNET_PSYC_MessageMethod *pmeth;
725 tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size); 734
735 tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
736 GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
737 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
738 /* store current message size in host byte order
739 * then later switch it to network byte order before sending */
726 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size; 740 tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
727 741
728 if (NULL != notify_mod) 742 if (NULL != notify_mod)
diff --git a/src/social/social_api.c b/src/social/social_api.c
index 66af14813..9f15b4146 100644
--- a/src/social/social_api.c
+++ b/src/social/social_api.c
@@ -71,12 +71,27 @@ struct GNUNET_SOCIAL_App
71 /** 71 /**
72 * Client connection to the service. 72 * Client connection to the service.
73 */ 73 */
74 struct GNUNET_CLIENT_MANAGER_Connection *client; 74 struct GNUNET_MQ_Handle *mq;
75 75
76 /** 76 /**
77 * Message to send on reconnect. 77 * Message to send on connect.
78 */ 78 */
79 struct GNUNET_MessageHeader *connect_msg; 79 struct GNUNET_MQ_Envelope *connect_env;
80
81 /**
82 * Time to wait until we try to reconnect on failure.
83 */
84 struct GNUNET_TIME_Relative reconnect_delay;
85
86 /**
87 * Task for reconnecting when the listener fails.
88 */
89 struct GNUNET_SCHEDULER_Task *reconnect_task;
90
91 /**
92 * Async operations.
93 */
94 struct GNUNET_OP_Handle *op;
80 95
81 /** 96 /**
82 * Function called after disconnected from the service. 97 * Function called after disconnected from the service.
@@ -136,7 +151,27 @@ struct GNUNET_SOCIAL_Place
136 /** 151 /**
137 * Client connection to the service. 152 * Client connection to the service.
138 */ 153 */
139 struct GNUNET_CLIENT_MANAGER_Connection *client; 154 struct GNUNET_MQ_Handle *mq;
155
156 /**
157 * Message to send on connect.
158 */
159 struct GNUNET_MQ_Envelope *connect_env;
160
161 /**
162 * Time to wait until we try to reconnect on failure.
163 */
164 struct GNUNET_TIME_Relative reconnect_delay;
165
166 /**
167 * Task for reconnecting when the listener fails.
168 */
169 struct GNUNET_SCHEDULER_Task *reconnect_task;
170
171 /**
172 * Async operations.
173 */
174 struct GNUNET_OP_Handle *op;
140 175
141 /** 176 /**
142 * Transmission handle. 177 * Transmission handle.
@@ -149,11 +184,6 @@ struct GNUNET_SOCIAL_Place
149 struct GNUNET_PSYC_Slicer *slicer; 184 struct GNUNET_PSYC_Slicer *slicer;
150 185
151 /** 186 /**
152 * Message to send on reconnect.
153 */
154 struct GNUNET_MessageHeader *connect_msg;
155
156 /**
157 * Function called after disconnected from the service. 187 * Function called after disconnected from the service.
158 */ 188 */
159 GNUNET_ContinuationCallback disconnect_cb; 189 GNUNET_ContinuationCallback disconnect_cb;
@@ -337,7 +367,6 @@ struct ZoneAddPlaceHandle
337 367
338struct ZoneAddNymHandle 368struct ZoneAddNymHandle
339{ 369{
340 struct ZoneAddNymRequest *req;
341 GNUNET_ResultCallback result_cb; 370 GNUNET_ResultCallback result_cb;
342 void *result_cls; 371 void *result_cls;
343}; 372};
@@ -481,109 +510,66 @@ host_recv_notice_place_leave_eom (void *cls,
481} 510}
482 511
483 512
484/*** CLIENT ***/
485
486
487static void
488app_send_connect_msg (struct GNUNET_SOCIAL_App *app)
489{
490 uint16_t cmsg_size = ntohs (app->connect_msg->size);
491 struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
492 GNUNET_memcpy (cmsg, app->connect_msg, cmsg_size);
493 GNUNET_CLIENT_MANAGER_transmit_now (app->client, cmsg);
494 GNUNET_free (cmsg);
495}
496
497
498static void
499app_recv_disconnect (void *cls,
500 struct GNUNET_CLIENT_MANAGER_Connection *client,
501 const struct GNUNET_MessageHeader *msg)
502{
503 struct GNUNET_SOCIAL_App *
504 app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app));
505
506 GNUNET_CLIENT_MANAGER_reconnect (client);
507 app_send_connect_msg (app);
508}
509
510
511/*** PLACE ***/ 513/*** PLACE ***/
512 514
513 515
514static void 516static int
515place_send_connect_msg (struct GNUNET_SOCIAL_Place *plc) 517check_place_result (void *cls,
518 const struct GNUNET_OperationResultMessage *res)
516{ 519{
517 uint16_t cmsg_size = ntohs (plc->connect_msg->size); 520 uint16_t size = ntohs (res->header.size);
518 struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); 521 if (size < sizeof (*res))
519 GNUNET_memcpy (cmsg, plc->connect_msg, cmsg_size); 522 { /* Error, message too small. */
520 GNUNET_CLIENT_MANAGER_transmit_now (plc->client, cmsg); 523 GNUNET_break (0);
521 GNUNET_free (cmsg); 524 return GNUNET_SYSERR;
525 }
526 return GNUNET_OK;
522} 527}
523 528
524 529
525static void 530static void
526place_recv_disconnect (void *cls, 531handle_place_result (void *cls,
527 struct GNUNET_CLIENT_MANAGER_Connection *client, 532 const struct GNUNET_OperationResultMessage *res)
528 const struct GNUNET_MessageHeader *msg)
529{ 533{
530 struct GNUNET_SOCIAL_Place * 534 struct GNUNET_SOCIAL_Place *plc = cls;
531 plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc));
532 535
533 GNUNET_CLIENT_MANAGER_reconnect (client); 536 uint16_t size = ntohs (res->header.size);
534 place_send_connect_msg (plc); 537 uint16_t data_size = size - sizeof (*res);
538 const char *data = (0 < data_size) ? (const char *) &res[1] : NULL;
539
540 GNUNET_OP_result (plc->op, GNUNET_ntohll (res->op_id),
541 GNUNET_ntohll (res->result_code),
542 data, data_size, NULL);
535} 543}
536 544
537 545
538static void 546static int
539place_recv_result (void *cls, 547check_app_result (void *cls,
540 struct GNUNET_CLIENT_MANAGER_Connection *client, 548 const struct GNUNET_OperationResultMessage *res)
541 const struct GNUNET_MessageHeader *msg)
542{ 549{
543 struct GNUNET_SOCIAL_Place * 550 uint16_t size = ntohs (res->header.size);
544 plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc));
545
546 const struct GNUNET_OperationResultMessage *
547 res = (const struct GNUNET_OperationResultMessage *) msg;
548
549 uint16_t size = ntohs (msg->size);
550 if (size < sizeof (*res)) 551 if (size < sizeof (*res))
551 { /* Error, message too small. */ 552 { /* Error, message too small. */
552 GNUNET_break (0); 553 GNUNET_break (0);
553 return; 554 return GNUNET_SYSERR;
554 } 555 }
555 556 return GNUNET_OK;
556 uint16_t data_size = size - sizeof (*res);
557 const char *data = (0 < data_size) ? (const char *) &res[1] : NULL;
558 GNUNET_CLIENT_MANAGER_op_result (plc->client, GNUNET_ntohll (res->op_id),
559 GNUNET_ntohll (res->result_code),
560 data, data_size);
561} 557}
562 558
563 559
564static void 560static void
565app_recv_result (void *cls, 561handle_app_result (void *cls,
566 struct GNUNET_CLIENT_MANAGER_Connection *client, 562 const struct GNUNET_OperationResultMessage *res)
567 const struct GNUNET_MessageHeader *msg)
568{ 563{
569 struct GNUNET_SOCIAL_App * 564 struct GNUNET_SOCIAL_App *app = cls;
570 app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app));
571
572 const struct GNUNET_OperationResultMessage *
573 res = (const struct GNUNET_OperationResultMessage *) msg;
574
575 uint16_t size = ntohs (msg->size);
576 if (size < sizeof (*res))
577 { /* Error, message too small. */
578 GNUNET_break (0);
579 return;
580 }
581 565
566 uint16_t size = ntohs (res->header.size);
582 uint16_t data_size = size - sizeof (*res); 567 uint16_t data_size = size - sizeof (*res);
583 const char *data = (0 < data_size) ? (const char *) &res[1] : NULL; 568 const char *data = (0 < data_size) ? (const char *) &res[1] : NULL;
584 GNUNET_CLIENT_MANAGER_op_result (app->client, GNUNET_ntohll (res->op_id), 569
585 GNUNET_ntohll (res->result_code), 570 GNUNET_OP_result (app->op, GNUNET_ntohll (res->op_id),
586 data, data_size); 571 GNUNET_ntohll (res->result_code),
572 data, data_size, NULL);
587} 573}
588 574
589 575
@@ -619,18 +605,30 @@ op_recv_state_result (void *cls, int64_t result,
619} 605}
620 606
621 607
622static void 608static int
623place_recv_history_result (void *cls, 609check_place_history_result (void *cls,
624 struct GNUNET_CLIENT_MANAGER_Connection *client, 610 const struct GNUNET_OperationResultMessage *res)
625 const struct GNUNET_MessageHeader *msg)
626{ 611{
627 struct GNUNET_SOCIAL_Place * 612 struct GNUNET_PSYC_MessageHeader *
628 plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc)); 613 pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res);
614 uint16_t size = ntohs (res->header.size);
615
616 if (NULL == pmsg || size < sizeof (*res) + sizeof (*pmsg))
617 { /* Error, message too small. */
618 GNUNET_break (0);
619 return GNUNET_SYSERR;
620 }
621 return GNUNET_OK;
622}
629 623
630 const struct GNUNET_OperationResultMessage * 624
631 res = (const struct GNUNET_OperationResultMessage *) msg; 625static void
626handle_place_history_result (void *cls,
627 const struct GNUNET_OperationResultMessage *res)
628{
629 struct GNUNET_SOCIAL_Place *plc = cls;
632 struct GNUNET_PSYC_MessageHeader * 630 struct GNUNET_PSYC_MessageHeader *
633 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1]; 631 pmsg = (struct GNUNET_PSYC_MessageHeader *) GNUNET_MQ_extract_nested_mh (res);
634 632
635 LOG (GNUNET_ERROR_TYPE_DEBUG, 633 LOG (GNUNET_ERROR_TYPE_DEBUG,
636 "%p Received historic fragment for message #%" PRIu64 ".\n", 634 "%p Received historic fragment for message #%" PRIu64 ".\n",
@@ -639,9 +637,9 @@ place_recv_history_result (void *cls,
639 GNUNET_ResultCallback result_cb = NULL; 637 GNUNET_ResultCallback result_cb = NULL;
640 struct GNUNET_SOCIAL_HistoryRequest *hist = NULL; 638 struct GNUNET_SOCIAL_HistoryRequest *hist = NULL;
641 639
642 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (plc->client, 640 if (GNUNET_YES != GNUNET_OP_get (plc->op,
643 GNUNET_ntohll (res->op_id), 641 GNUNET_ntohll (res->op_id),
644 &result_cb, (void *) &hist)) 642 &result_cb, (void *) &hist, NULL))
645 { /* Operation not found. */ 643 { /* Operation not found. */
646 LOG (GNUNET_ERROR_TYPE_WARNING, 644 LOG (GNUNET_ERROR_TYPE_WARNING,
647 "%p Replay operation not found for historic fragment of message #%" 645 "%p Replay operation not found for historic fragment of message #%"
@@ -650,50 +648,50 @@ place_recv_history_result (void *cls,
650 return; 648 return;
651 } 649 }
652 650
653 uint16_t size = ntohs (msg->size);
654 if (size < sizeof (*res) + sizeof (*pmsg))
655 { /* Error, message too small. */
656 GNUNET_break (0);
657 return;
658 }
659
660 GNUNET_PSYC_slicer_message (hist->slicer, 651 GNUNET_PSYC_slicer_message (hist->slicer,
661 (const struct GNUNET_PSYC_MessageHeader *) pmsg); 652 (const struct GNUNET_PSYC_MessageHeader *) pmsg);
662} 653}
663 654
664 655
665static void 656static int
666place_recv_state_result (void *cls, 657check_place_state_result (void *cls,
667 struct GNUNET_CLIENT_MANAGER_Connection *client, 658 const struct GNUNET_OperationResultMessage *res)
668 const struct GNUNET_MessageHeader *msg)
669{ 659{
670 struct GNUNET_SOCIAL_Place * 660 const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res);
671 plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc)); 661 uint16_t mod_size = ntohs (mod->size);
662 uint16_t size = ntohs (res->header.size);
672 663
673 const struct GNUNET_OperationResultMessage * 664 if (NULL == mod || size - sizeof (*res) != mod_size)
674 res = (const struct GNUNET_OperationResultMessage *) msg; 665 {
666 GNUNET_break_op (0);
667 LOG (GNUNET_ERROR_TYPE_WARNING,
668 "Invalid modifier size in state result: %u - %u != %u\n",
669 ntohs (res->header.size), sizeof (*res), mod_size);
670 return GNUNET_SYSERR;
671 }
672 return GNUNET_OK;
673}
674
675
676static void
677handle_place_state_result (void *cls,
678 const struct GNUNET_OperationResultMessage *res)
679{
680 struct GNUNET_SOCIAL_Place *plc = cls;
675 681
676 GNUNET_ResultCallback result_cb = NULL; 682 GNUNET_ResultCallback result_cb = NULL;
677 struct GNUNET_SOCIAL_LookHandle *look = NULL; 683 struct GNUNET_SOCIAL_LookHandle *look = NULL;
678 684
679 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (plc->client, 685 if (GNUNET_YES != GNUNET_OP_get (plc->op,
680 GNUNET_ntohll (res->op_id), 686 GNUNET_ntohll (res->op_id),
681 &result_cb, (void *) &look)) 687 &result_cb, (void *) &look, NULL))
682 { /* Operation not found. */ 688 { /* Operation not found. */
683 return; 689 return;
684 } 690 }
685 691
686 const struct GNUNET_MessageHeader * 692 const struct GNUNET_MessageHeader *mod = GNUNET_MQ_extract_nested_mh (res);
687 mod = (struct GNUNET_MessageHeader *) &res[1];
688 uint16_t mod_size = ntohs (mod->size); 693 uint16_t mod_size = ntohs (mod->size);
689 if (ntohs (msg->size) - sizeof (*res) != mod_size) 694
690 {
691 GNUNET_break_op (0);
692 LOG (GNUNET_ERROR_TYPE_WARNING,
693 "Invalid modifier size in state result: %u - %u != %u\n",
694 ntohs (msg->size), sizeof (*res), mod_size);
695 return;
696 }
697 switch (ntohs (mod->type)) 695 switch (ntohs (mod->type))
698 { 696 {
699 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: 697 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
@@ -737,52 +735,58 @@ place_recv_state_result (void *cls,
737 735
738 736
739static void 737static void
740place_recv_message_ack (void *cls, 738handle_place_message_ack (void *cls,
741 struct GNUNET_CLIENT_MANAGER_Connection *client, 739 const struct GNUNET_MessageHeader *msg)
742 const struct GNUNET_MessageHeader *msg)
743{ 740{
744 struct GNUNET_SOCIAL_Place * 741 struct GNUNET_SOCIAL_Place *plc = cls;
745 plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc)); 742
746 GNUNET_PSYC_transmit_got_ack (plc->tmit); 743 GNUNET_PSYC_transmit_got_ack (plc->tmit);
747} 744}
748 745
749 746
747static int
748check_place_message (void *cls,
749 const struct GNUNET_PSYC_MessageHeader *pmsg)
750{
751 return GNUNET_OK;
752}
753
754
750static void 755static void
751place_recv_message (void *cls, 756handle_place_message (void *cls,
752 struct GNUNET_CLIENT_MANAGER_Connection *client, 757 const struct GNUNET_PSYC_MessageHeader *pmsg)
753 const struct GNUNET_MessageHeader *msg) 758{
759 struct GNUNET_SOCIAL_Place *plc = cls;
760
761 GNUNET_PSYC_slicer_message (plc->slicer, pmsg);
762}
763
764
765static int
766check_host_message (void *cls,
767 const struct GNUNET_PSYC_MessageHeader *pmsg)
754{ 768{
755 struct GNUNET_SOCIAL_Place * 769 return GNUNET_OK;
756 plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc));
757 GNUNET_PSYC_slicer_message (plc->slicer,
758 (const struct GNUNET_PSYC_MessageHeader *) msg);
759} 770}
760 771
761 772
762static void 773static void
763host_recv_message (void *cls, 774handle_host_message (void *cls,
764 struct GNUNET_CLIENT_MANAGER_Connection *client, 775 const struct GNUNET_PSYC_MessageHeader *pmsg)
765 const struct GNUNET_MessageHeader *msg)
766{ 776{
767 struct GNUNET_SOCIAL_Host * 777 struct GNUNET_SOCIAL_Host *hst = cls;
768 hst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (hst->plc)); 778
769 GNUNET_PSYC_slicer_message (hst->slicer, 779 GNUNET_PSYC_slicer_message (hst->slicer, pmsg);
770 (const struct GNUNET_PSYC_MessageHeader *) msg); 780 GNUNET_PSYC_slicer_message (hst->plc.slicer, pmsg);
771 GNUNET_PSYC_slicer_message (hst->plc.slicer,
772 (const struct GNUNET_PSYC_MessageHeader *) msg);
773} 781}
774 782
775 783
776static void 784static void
777host_recv_enter_ack (void *cls, 785handle_host_enter_ack (void *cls,
778 struct GNUNET_CLIENT_MANAGER_Connection *client, 786 const struct HostEnterAck *hack)
779 const struct GNUNET_MessageHeader *msg)
780{ 787{
781 struct GNUNET_SOCIAL_Host * 788 struct GNUNET_SOCIAL_Host *hst = cls;
782 hst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
783 sizeof (struct GNUNET_SOCIAL_Place));
784 789
785 struct HostEnterAck *hack = (struct HostEnterAck *) msg;
786 hst->plc.pub_key = hack->place_pub_key; 790 hst->plc.pub_key = hack->place_pub_key;
787 791
788 int32_t result = ntohl (hack->result_code); 792 int32_t result = ntohl (hack->result_code);
@@ -792,14 +796,20 @@ host_recv_enter_ack (void *cls,
792} 796}
793 797
794 798
799static int
800check_host_enter_request (void *cls,
801 const struct GNUNET_PSYC_JoinRequestMessage *req)
802{
803 return GNUNET_OK;
804}
805
806
795static void 807static void
796host_recv_enter_request (void *cls, 808handle_host_enter_request (void *cls,
797 struct GNUNET_CLIENT_MANAGER_Connection *client, 809 const struct GNUNET_PSYC_JoinRequestMessage *req)
798 const struct GNUNET_MessageHeader *msg)
799{ 810{
800 struct GNUNET_SOCIAL_Host * 811 struct GNUNET_SOCIAL_Host *hst = cls;
801 hst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, 812
802 sizeof (struct GNUNET_SOCIAL_Place));
803 if (NULL == hst->answer_door_cb) 813 if (NULL == hst->answer_door_cb)
804 return; 814 return;
805 815
@@ -809,15 +819,13 @@ host_recv_enter_request (void *cls,
809 const void *data = NULL; 819 const void *data = NULL;
810 uint16_t data_size = 0; 820 uint16_t data_size = 0;
811 char *str; 821 char *str;
812 const struct GNUNET_PSYC_JoinRequestMessage *
813 req = (const struct GNUNET_PSYC_JoinRequestMessage *) msg;
814 const struct GNUNET_PSYC_Message *join_msg = NULL; 822 const struct GNUNET_PSYC_Message *join_msg = NULL;
815 823
816 do 824 do
817 { 825 {
818 if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) 826 if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size))
819 { 827 {
820 join_msg = (struct GNUNET_PSYC_Message *) &req[1]; 828 join_msg = (struct GNUNET_PSYC_Message *) GNUNET_MQ_extract_nested_mh (req);
821 LOG (GNUNET_ERROR_TYPE_DEBUG, 829 LOG (GNUNET_ERROR_TYPE_DEBUG,
822 "Received join_msg of type %u and size %u.\n", 830 "Received join_msg of type %u and size %u.\n",
823 ntohs (join_msg->header.type), ntohs (join_msg->header.size)); 831 ntohs (join_msg->header.type), ntohs (join_msg->header.size));
@@ -850,16 +858,11 @@ host_recv_enter_request (void *cls,
850 858
851 859
852static void 860static void
853guest_recv_enter_ack (void *cls, 861handle_guest_enter_ack (void *cls,
854 struct GNUNET_CLIENT_MANAGER_Connection *client, 862 const struct GNUNET_PSYC_CountersResultMessage *cres)
855 const struct GNUNET_MessageHeader *msg)
856{ 863{
857 struct GNUNET_SOCIAL_Guest * 864 struct GNUNET_SOCIAL_Guest *gst = cls;
858 gst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
859 sizeof (struct GNUNET_SOCIAL_Place));
860 865
861 struct GNUNET_PSYC_CountersResultMessage *
862 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
863 int32_t result = ntohl (cres->result_code); 866 int32_t result = ntohl (cres->result_code);
864 if (NULL != gst->enter_cb) 867 if (NULL != gst->enter_cb)
865 gst->enter_cb (gst->cb_cls, result, &gst->plc.pub_key, 868 gst->enter_cb (gst->cb_cls, result, &gst->plc.pub_key,
@@ -867,36 +870,42 @@ guest_recv_enter_ack (void *cls,
867} 870}
868 871
869 872
873static int
874check_guest_enter_decision (void *cls,
875 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
876{
877 return GNUNET_OK;
878}
879
880
870static void 881static void
871guest_recv_join_decision (void *cls, 882handle_guest_enter_decision (void *cls,
872 struct GNUNET_CLIENT_MANAGER_Connection *client, 883 const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
873 const struct GNUNET_MessageHeader *msg)
874{ 884{
875 struct GNUNET_SOCIAL_Guest * 885 struct GNUNET_SOCIAL_Guest *gst = cls;
876 gst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
877 sizeof (struct GNUNET_SOCIAL_Place));
878 const struct GNUNET_PSYC_JoinDecisionMessage *
879 dcsn = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
880 886
881 struct GNUNET_PSYC_Message *pmsg = NULL; 887 struct GNUNET_PSYC_Message *pmsg = NULL;
882 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) 888 if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg))
883 pmsg = (struct GNUNET_PSYC_Message *) &dcsn[1]; 889 pmsg = (struct GNUNET_PSYC_Message *) GNUNET_MQ_extract_nested_mh (dcsn);
884 890
885 if (NULL != gst->entry_dcsn_cb) 891 if (NULL != gst->entry_dcsn_cb)
886 gst->entry_dcsn_cb (gst->cb_cls, ntohl (dcsn->is_admitted), pmsg); 892 gst->entry_dcsn_cb (gst->cb_cls, ntohl (dcsn->is_admitted), pmsg);
887} 893}
888 894
889 895
890static void 896static int
891app_recv_ego (void *cls, 897check_app_ego (void *cls,
892 struct GNUNET_CLIENT_MANAGER_Connection *client, 898 const struct AppEgoMessage *emsg)
893 const struct GNUNET_MessageHeader *msg)
894{ 899{
895 struct GNUNET_SOCIAL_App * 900 return GNUNET_OK;
896 app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app)); 901}
897 902
898 struct AppEgoMessage * 903
899 emsg = (struct AppEgoMessage *) msg; 904static void
905handle_app_ego (void *cls,
906 const struct AppEgoMessage *emsg)
907{
908 struct GNUNET_SOCIAL_App *app = cls;
900 909
901 uint16_t name_size = ntohs (emsg->header.size) - sizeof (*emsg); 910 uint16_t name_size = ntohs (emsg->header.size) - sizeof (*emsg);
902 911
@@ -928,25 +937,26 @@ app_recv_ego (void *cls,
928 937
929 938
930static void 939static void
931app_recv_ego_end (void *cls, 940handle_app_ego_end (void *cls,
932 struct GNUNET_CLIENT_MANAGER_Connection *client, 941 const struct GNUNET_MessageHeader *msg)
933 const struct GNUNET_MessageHeader *msg)
934{ 942{
935 struct GNUNET_SOCIAL_App * 943 //struct GNUNET_SOCIAL_App *app = cls;
936 app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app));
937} 944}
938 945
939 946
940static void 947static int
941app_recv_place (void *cls, 948check_app_place (void *cls,
942 struct GNUNET_CLIENT_MANAGER_Connection *client, 949 const struct AppPlaceMessage *pmsg)
943 const struct GNUNET_MessageHeader *msg)
944{ 950{
945 struct GNUNET_SOCIAL_App * 951 return GNUNET_OK;
946 app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app)); 952}
947 953
948 struct AppPlaceMessage * 954
949 pmsg = (struct AppPlaceMessage *) msg; 955static void
956handle_app_place (void *cls,
957 const struct AppPlaceMessage *pmsg)
958{
959 struct GNUNET_SOCIAL_App *app = cls;
950 960
951 if ((GNUNET_YES == pmsg->is_host && NULL == app->host_cb) 961 if ((GNUNET_YES == pmsg->is_host && NULL == app->host_cb)
952 || (GNUNET_NO == pmsg->is_host && NULL == app->guest_cb)) 962 || (GNUNET_NO == pmsg->is_host && NULL == app->guest_cb))
@@ -987,122 +997,16 @@ app_recv_place (void *cls,
987 997
988 998
989static void 999static void
990app_recv_place_end (void *cls, 1000handle_app_place_end (void *cls,
991 struct GNUNET_CLIENT_MANAGER_Connection *client, 1001 const struct GNUNET_MessageHeader *msg)
992 const struct GNUNET_MessageHeader *msg)
993{ 1002{
994 struct GNUNET_SOCIAL_App * 1003 struct GNUNET_SOCIAL_App *app = cls;
995 app = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*app));
996 1004
997 if (NULL != app->connected_cb) 1005 if (NULL != app->connected_cb)
998 app->connected_cb (app->cb_cls); 1006 app->connected_cb (app->cb_cls);
999} 1007}
1000 1008
1001 1009
1002static struct GNUNET_CLIENT_MANAGER_MessageHandler host_handlers[] =
1003{
1004 { host_recv_enter_ack, NULL,
1005 GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK,
1006 sizeof (struct HostEnterAck), GNUNET_NO },
1007
1008 { host_recv_enter_request, NULL,
1009 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
1010 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES },
1011
1012 { host_recv_message, NULL,
1013 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
1014 sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES },
1015
1016 { place_recv_message_ack, NULL,
1017 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
1018 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
1019
1020 { place_recv_history_result, NULL,
1021 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
1022 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
1023
1024 { place_recv_state_result, NULL,
1025 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
1026 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
1027
1028 { place_recv_result, NULL,
1029 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
1030 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
1031
1032 { place_recv_disconnect, NULL, 0, 0, GNUNET_NO },
1033
1034 { NULL, NULL, 0, 0, GNUNET_NO }
1035};
1036
1037
1038static struct GNUNET_CLIENT_MANAGER_MessageHandler guest_handlers[] =
1039{
1040 { guest_recv_enter_ack, NULL,
1041 GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK,
1042 sizeof (struct GNUNET_PSYC_CountersResultMessage), GNUNET_NO },
1043
1044 { host_recv_enter_request, NULL,
1045 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
1046 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES },
1047
1048 { place_recv_message, NULL,
1049 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
1050 sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES },
1051
1052 { place_recv_message_ack, NULL,
1053 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
1054 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
1055
1056 { guest_recv_join_decision, NULL,
1057 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
1058 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES },
1059
1060 { place_recv_history_result, NULL,
1061 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
1062 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
1063
1064 { place_recv_state_result, NULL,
1065 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
1066 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
1067
1068 { place_recv_result, NULL,
1069 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
1070 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
1071
1072 { place_recv_disconnect, NULL, 0, 0, GNUNET_NO },
1073
1074 { NULL, NULL, 0, 0, GNUNET_NO }
1075};
1076
1077
1078static struct GNUNET_CLIENT_MANAGER_MessageHandler app_handlers[] =
1079{
1080 { app_recv_ego, NULL,
1081 GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO,
1082 sizeof (struct AppEgoMessage), GNUNET_YES },
1083
1084 { app_recv_ego_end, NULL,
1085 GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO_END,
1086 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
1087
1088 { app_recv_place, NULL,
1089 GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE,
1090 sizeof (struct AppPlaceMessage), GNUNET_NO },
1091
1092 { app_recv_place_end, NULL,
1093 GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE_END,
1094 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
1095
1096 { app_recv_result, NULL,
1097 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
1098 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
1099
1100 { app_recv_disconnect, NULL, 0, 0, GNUNET_NO },
1101
1102 { NULL, NULL, 0, 0, GNUNET_NO }
1103};
1104
1105
1106static void 1010static void
1107place_cleanup (struct GNUNET_SOCIAL_Place *plc) 1011place_cleanup (struct GNUNET_SOCIAL_Place *plc)
1108{ 1012{
@@ -1114,18 +1018,26 @@ place_cleanup (struct GNUNET_SOCIAL_Place *plc)
1114 GNUNET_h2s (&place_pub_hash)); 1018 GNUNET_h2s (&place_pub_hash));
1115 1019
1116 if (NULL != plc->tmit) 1020 if (NULL != plc->tmit)
1021 {
1117 GNUNET_PSYC_transmit_destroy (plc->tmit); 1022 GNUNET_PSYC_transmit_destroy (plc->tmit);
1118 if (NULL != plc->connect_msg) 1023 plc->tmit = NULL;
1119 GNUNET_free (plc->connect_msg); 1024 }
1025 if (NULL != plc->connect_env)
1026 {
1027 GNUNET_MQ_discard (plc->connect_env);
1028 plc->connect_env = NULL;
1029 }
1120 if (NULL != plc->disconnect_cb) 1030 if (NULL != plc->disconnect_cb)
1031 {
1121 plc->disconnect_cb (plc->disconnect_cls); 1032 plc->disconnect_cb (plc->disconnect_cls);
1033 plc->disconnect_cb = NULL;
1034 }
1122} 1035}
1123 1036
1124 1037
1125static void 1038static void
1126host_cleanup (void *cls) 1039host_cleanup (struct GNUNET_SOCIAL_Host *hst)
1127{ 1040{
1128 struct GNUNET_SOCIAL_Host *hst = cls;
1129 place_cleanup (&hst->plc); 1041 place_cleanup (&hst->plc);
1130 if (NULL != hst->slicer) 1042 if (NULL != hst->slicer)
1131 { 1043 {
@@ -1137,9 +1049,8 @@ host_cleanup (void *cls)
1137 1049
1138 1050
1139static void 1051static void
1140guest_cleanup (void *cls) 1052guest_cleanup (struct GNUNET_SOCIAL_Guest *gst)
1141{ 1053{
1142 struct GNUNET_SOCIAL_Guest *gst = cls;
1143 place_cleanup (&gst->plc); 1054 place_cleanup (&gst->plc);
1144 GNUNET_free (gst); 1055 GNUNET_free (gst);
1145} 1056}
@@ -1147,6 +1058,103 @@ guest_cleanup (void *cls)
1147 1058
1148/*** HOST ***/ 1059/*** HOST ***/
1149 1060
1061
1062static void
1063host_connect (struct GNUNET_SOCIAL_Host *hst);
1064
1065
1066static void
1067host_reconnect (void *cls)
1068{
1069 host_connect (cls);
1070}
1071
1072
1073/**
1074 * Host client disconnected from service.
1075 *
1076 * Reconnect after backoff period.
1077 */
1078static void
1079host_disconnected (void *cls, enum GNUNET_MQ_Error error)
1080{
1081 struct GNUNET_SOCIAL_Host *hst = cls;
1082 struct GNUNET_SOCIAL_Place *plc = &hst->plc;
1083
1084 LOG (GNUNET_ERROR_TYPE_DEBUG,
1085 "Host client disconnected (%d), re-connecting\n",
1086 (int) error);
1087 if (NULL != plc->mq)
1088 {
1089 GNUNET_MQ_destroy (plc->mq);
1090 plc->mq = NULL;
1091 }
1092 if (NULL != plc->tmit)
1093 {
1094 GNUNET_PSYC_transmit_destroy (plc->tmit);
1095 plc->tmit = NULL;
1096 }
1097
1098 plc->reconnect_task = GNUNET_SCHEDULER_add_delayed (plc->reconnect_delay,
1099 host_reconnect,
1100 hst);
1101 plc->reconnect_delay = GNUNET_TIME_STD_BACKOFF (plc->reconnect_delay);
1102}
1103
1104
1105static void
1106host_connect (struct GNUNET_SOCIAL_Host *hst)
1107{
1108 struct GNUNET_SOCIAL_Place *plc = &hst->plc;
1109
1110 GNUNET_MQ_hd_fixed_size (host_enter_ack,
1111 GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER_ACK,
1112 struct HostEnterAck);
1113
1114 GNUNET_MQ_hd_var_size (host_enter_request,
1115 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
1116 struct GNUNET_PSYC_JoinRequestMessage);
1117
1118 GNUNET_MQ_hd_var_size (host_message,
1119 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
1120 struct GNUNET_PSYC_MessageHeader);
1121
1122 GNUNET_MQ_hd_fixed_size (place_message_ack,
1123 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
1124 struct GNUNET_MessageHeader);
1125
1126 GNUNET_MQ_hd_var_size (place_history_result,
1127 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
1128 struct GNUNET_OperationResultMessage);
1129
1130 GNUNET_MQ_hd_var_size (place_state_result,
1131 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
1132 struct GNUNET_OperationResultMessage);
1133
1134 GNUNET_MQ_hd_var_size (place_result,
1135 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
1136 struct GNUNET_OperationResultMessage);
1137
1138 struct GNUNET_MQ_MessageHandler handlers[] = {
1139 make_host_enter_ack_handler (hst),
1140 make_host_enter_request_handler (hst),
1141 make_host_message_handler (plc),
1142 make_place_message_ack_handler (plc),
1143 make_place_history_result_handler (plc),
1144 make_place_state_result_handler (plc),
1145 make_place_result_handler (plc),
1146 GNUNET_MQ_handler_end ()
1147 };
1148
1149 plc->mq = GNUNET_CLIENT_connecT (plc->cfg, "social",
1150 handlers, host_disconnected, hst);
1151 GNUNET_assert (NULL != plc->mq);
1152 plc->tmit = GNUNET_PSYC_transmit_create (plc->mq);
1153
1154 GNUNET_MQ_send_copy (plc->mq, plc->connect_env);
1155}
1156
1157
1150/** 1158/**
1151 * Enter a place as host. 1159 * Enter a place as host.
1152 * 1160 *
@@ -1194,10 +1202,7 @@ GNUNET_SOCIAL_host_enter (const struct GNUNET_SOCIAL_App *app,
1194 hst->farewell_cb = farewell_cb; 1202 hst->farewell_cb = farewell_cb;
1195 hst->cb_cls = cls; 1203 hst->cb_cls = cls;
1196 1204
1197 plc->client = GNUNET_CLIENT_MANAGER_connect (plc->cfg, "social", host_handlers); 1205 plc->op = GNUNET_OP_create ();
1198 GNUNET_CLIENT_MANAGER_set_user_context_ (plc->client, hst, sizeof (*plc));
1199
1200 plc->tmit = GNUNET_PSYC_transmit_create (plc->client);
1201 1206
1202 hst->slicer = GNUNET_PSYC_slicer_create (); 1207 hst->slicer = GNUNET_PSYC_slicer_create ();
1203 GNUNET_PSYC_slicer_method_add (hst->slicer, "_notice_place_leave", NULL, 1208 GNUNET_PSYC_slicer_method_add (hst->slicer, "_notice_place_leave", NULL,
@@ -1206,16 +1211,14 @@ GNUNET_SOCIAL_host_enter (const struct GNUNET_SOCIAL_App *app,
1206 NULL, host_recv_notice_place_leave_eom, hst); 1211 NULL, host_recv_notice_place_leave_eom, hst);
1207 1212
1208 uint16_t app_id_size = strlen (app->id) + 1; 1213 uint16_t app_id_size = strlen (app->id) + 1;
1209 struct HostEnterRequest *hreq = GNUNET_malloc (sizeof (*hreq) + app_id_size); 1214 struct HostEnterRequest *hreq;
1210 hreq->header.size = htons (sizeof (*hreq) + app_id_size); 1215 plc->connect_env = GNUNET_MQ_msg_extra (hreq, app_id_size,
1211 hreq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER); 1216 GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER);
1212 hreq->policy = policy; 1217 hreq->policy = policy;
1213 hreq->ego_pub_key = ego->pub_key; 1218 hreq->ego_pub_key = ego->pub_key;
1214 GNUNET_memcpy (&hreq[1], app->id, app_id_size); 1219 GNUNET_memcpy (&hreq[1], app->id, app_id_size);
1215 1220
1216 plc->connect_msg = &hreq->header; 1221 host_connect (hst);
1217 place_send_connect_msg (plc);
1218
1219 return hst; 1222 return hst;
1220} 1223}
1221 1224
@@ -1250,9 +1253,6 @@ GNUNET_SOCIAL_host_enter_reconnect (struct GNUNET_SOCIAL_HostConnection *hconn,
1250 struct GNUNET_SOCIAL_Host *hst = GNUNET_malloc (sizeof (*hst)); 1253 struct GNUNET_SOCIAL_Host *hst = GNUNET_malloc (sizeof (*hst));
1251 struct GNUNET_SOCIAL_Place *plc = &hst->plc; 1254 struct GNUNET_SOCIAL_Place *plc = &hst->plc;
1252 1255
1253 size_t app_id_size = strlen (hconn->app->id) + 1;
1254 struct HostEnterRequest *hreq = GNUNET_malloc (sizeof (*hreq) + app_id_size);
1255
1256 hst->enter_cb = enter_cb; 1256 hst->enter_cb = enter_cb;
1257 hst->answer_door_cb = answer_door_cb; 1257 hst->answer_door_cb = answer_door_cb;
1258 hst->farewell_cb = farewell_cb; 1258 hst->farewell_cb = farewell_cb;
@@ -1264,10 +1264,7 @@ GNUNET_SOCIAL_host_enter_reconnect (struct GNUNET_SOCIAL_HostConnection *hconn,
1264 plc->pub_key = hconn->plc_msg.place_pub_key; 1264 plc->pub_key = hconn->plc_msg.place_pub_key;
1265 plc->ego_pub_key = hconn->plc_msg.ego_pub_key; 1265 plc->ego_pub_key = hconn->plc_msg.ego_pub_key;
1266 1266
1267 plc->client = GNUNET_CLIENT_MANAGER_connect (plc->cfg, "social", host_handlers); 1267 plc->op = GNUNET_OP_create ();
1268 GNUNET_CLIENT_MANAGER_set_user_context_ (plc->client, hst, sizeof (*plc));
1269
1270 plc->tmit = GNUNET_PSYC_transmit_create (plc->client);
1271 1268
1272 hst->slicer = GNUNET_PSYC_slicer_create (); 1269 hst->slicer = GNUNET_PSYC_slicer_create ();
1273 GNUNET_PSYC_slicer_method_add (hst->slicer, "_notice_place_leave", NULL, 1270 GNUNET_PSYC_slicer_method_add (hst->slicer, "_notice_place_leave", NULL,
@@ -1275,15 +1272,15 @@ GNUNET_SOCIAL_host_enter_reconnect (struct GNUNET_SOCIAL_HostConnection *hconn,
1275 host_recv_notice_place_leave_modifier, 1272 host_recv_notice_place_leave_modifier,
1276 NULL, host_recv_notice_place_leave_eom, hst); 1273 NULL, host_recv_notice_place_leave_eom, hst);
1277 1274
1278 hreq->header.size = htons (sizeof (*hreq) + app_id_size); 1275 size_t app_id_size = strlen (hconn->app->id) + 1;
1279 hreq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER); 1276 struct HostEnterRequest *hreq;
1277 plc->connect_env = GNUNET_MQ_msg_extra (hreq, app_id_size,
1278 GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER);
1280 hreq->place_pub_key = hconn->plc_msg.place_pub_key; 1279 hreq->place_pub_key = hconn->plc_msg.place_pub_key;
1281 hreq->ego_pub_key = hconn->plc_msg.ego_pub_key; 1280 hreq->ego_pub_key = hconn->plc_msg.ego_pub_key;
1282 GNUNET_memcpy (&hreq[1], hconn->app->id, app_id_size); 1281 GNUNET_memcpy (&hreq[1], hconn->app->id, app_id_size);
1283 1282
1284 plc->connect_msg = &hreq->header; 1283 host_connect (hst);
1285 place_send_connect_msg (plc);
1286
1287 return hst; 1284 return hst;
1288} 1285}
1289 1286
@@ -1316,6 +1313,7 @@ GNUNET_SOCIAL_host_entry_decision (struct GNUNET_SOCIAL_Host *hst,
1316 int is_admitted, 1313 int is_admitted,
1317 const struct GNUNET_PSYC_Message *entry_resp) 1314 const struct GNUNET_PSYC_Message *entry_resp)
1318{ 1315{
1316 struct GNUNET_SOCIAL_Place *plc = &hst->plc;
1319 struct GNUNET_PSYC_JoinDecisionMessage *dcsn; 1317 struct GNUNET_PSYC_JoinDecisionMessage *dcsn;
1320 uint16_t entry_resp_size 1318 uint16_t entry_resp_size
1321 = (NULL != entry_resp) ? ntohs (entry_resp->header.size) : 0; 1319 = (NULL != entry_resp) ? ntohs (entry_resp->header.size) : 0;
@@ -1323,17 +1321,16 @@ GNUNET_SOCIAL_host_entry_decision (struct GNUNET_SOCIAL_Host *hst,
1323 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < sizeof (*dcsn) + entry_resp_size) 1321 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < sizeof (*dcsn) + entry_resp_size)
1324 return GNUNET_SYSERR; 1322 return GNUNET_SYSERR;
1325 1323
1326 dcsn = GNUNET_malloc (sizeof (*dcsn) + entry_resp_size); 1324 struct GNUNET_MQ_Envelope *
1327 dcsn->header.size = htons (sizeof (*dcsn) + entry_resp_size); 1325 env = GNUNET_MQ_msg_extra (dcsn, entry_resp_size,
1328 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); 1326 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
1329 dcsn->is_admitted = htonl (is_admitted); 1327 dcsn->is_admitted = htonl (is_admitted);
1330 dcsn->slave_pub_key = nym->pub_key; 1328 dcsn->slave_pub_key = nym->pub_key;
1331 1329
1332 if (0 < entry_resp_size) 1330 if (0 < entry_resp_size)
1333 GNUNET_memcpy (&dcsn[1], entry_resp, entry_resp_size); 1331 GNUNET_memcpy (&dcsn[1], entry_resp, entry_resp_size);
1334 1332
1335 GNUNET_CLIENT_MANAGER_transmit (hst->plc.client, &dcsn->header); 1333 GNUNET_MQ_send (plc->mq, env);
1336 GNUNET_free (dcsn);
1337 return GNUNET_OK; 1334 return GNUNET_OK;
1338} 1335}
1339 1336
@@ -1524,10 +1521,11 @@ GNUNET_SOCIAL_host_get_place (struct GNUNET_SOCIAL_Host *hst)
1524void 1521void
1525place_leave (struct GNUNET_SOCIAL_Place *plc) 1522place_leave (struct GNUNET_SOCIAL_Place *plc)
1526{ 1523{
1527 struct GNUNET_MessageHeader msg; 1524 struct GNUNET_MessageHeader *msg;
1528 msg.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE); 1525 struct GNUNET_MQ_Envelope *
1529 msg.size = htons (sizeof (msg)); 1526 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE);
1530 GNUNET_CLIENT_MANAGER_transmit (plc->client, &msg); 1527
1528 GNUNET_MQ_send (plc->mq, env);
1531} 1529}
1532 1530
1533 1531
@@ -1539,10 +1537,12 @@ place_disconnect (struct GNUNET_SOCIAL_Place *plc,
1539 plc->disconnect_cb = disconnect_cb; 1537 plc->disconnect_cb = disconnect_cb;
1540 plc->disconnect_cls = disconnect_cls; 1538 plc->disconnect_cls = disconnect_cls;
1541 1539
1542 GNUNET_CLIENT_MANAGER_disconnect (plc->client, GNUNET_YES, 1540 // FIXME: wait till queued messages are sent
1543 GNUNET_YES == plc->is_host 1541 if (NULL != plc->mq)
1544 ? host_cleanup : guest_cleanup, 1542 {
1545 plc); 1543 GNUNET_MQ_destroy (plc->mq);
1544 plc->mq = NULL;
1545 }
1546} 1546}
1547 1547
1548 1548
@@ -1560,6 +1560,7 @@ GNUNET_SOCIAL_host_disconnect (struct GNUNET_SOCIAL_Host *hst,
1560 void *cls) 1560 void *cls)
1561{ 1561{
1562 place_disconnect (&hst->plc, disconnect_cb, cls); 1562 place_disconnect (&hst->plc, disconnect_cb, cls);
1563 host_cleanup (hst);
1563} 1564}
1564 1565
1565 1566
@@ -1595,7 +1596,104 @@ GNUNET_SOCIAL_host_leave (struct GNUNET_SOCIAL_Host *hst,
1595 1596
1596/*** GUEST ***/ 1597/*** GUEST ***/
1597 1598
1598static struct GuestEnterRequest * 1599
1600static void
1601guest_connect (struct GNUNET_SOCIAL_Guest *gst);
1602
1603
1604static void
1605guest_reconnect (void *cls)
1606{
1607 guest_connect (cls);
1608}
1609
1610
1611/**
1612 * Guest client disconnected from service.
1613 *
1614 * Reconnect after backoff period.
1615 */
1616static void
1617guest_disconnected (void *cls, enum GNUNET_MQ_Error error)
1618{
1619 struct GNUNET_SOCIAL_Guest *gst = cls;
1620 struct GNUNET_SOCIAL_Place *plc = &gst->plc;
1621
1622 LOG (GNUNET_ERROR_TYPE_DEBUG,
1623 "Guest client disconnected (%d), re-connecting\n",
1624 (int) error);
1625 if (NULL != plc->mq)
1626 {
1627 GNUNET_MQ_destroy (plc->mq);
1628 plc->mq = NULL;
1629 }
1630 if (NULL != plc->tmit)
1631 {
1632 GNUNET_PSYC_transmit_destroy (plc->tmit);
1633 plc->tmit = NULL;
1634 }
1635
1636 plc->reconnect_task = GNUNET_SCHEDULER_add_delayed (plc->reconnect_delay,
1637 guest_reconnect,
1638 gst);
1639 plc->reconnect_delay = GNUNET_TIME_STD_BACKOFF (plc->reconnect_delay);
1640}
1641
1642
1643static void
1644guest_connect (struct GNUNET_SOCIAL_Guest *gst)
1645{
1646 struct GNUNET_SOCIAL_Place *plc = &gst->plc;
1647
1648 GNUNET_MQ_hd_fixed_size (guest_enter_ack,
1649 GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_ACK,
1650 struct GNUNET_PSYC_CountersResultMessage);
1651
1652 GNUNET_MQ_hd_var_size (guest_enter_decision,
1653 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
1654 struct GNUNET_PSYC_JoinDecisionMessage);
1655
1656 GNUNET_MQ_hd_var_size (place_message,
1657 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
1658 struct GNUNET_PSYC_MessageHeader);
1659
1660 GNUNET_MQ_hd_fixed_size (place_message_ack,
1661 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
1662 struct GNUNET_MessageHeader);
1663
1664 GNUNET_MQ_hd_var_size (place_history_result,
1665 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
1666 struct GNUNET_OperationResultMessage);
1667
1668 GNUNET_MQ_hd_var_size (place_state_result,
1669 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
1670 struct GNUNET_OperationResultMessage);
1671
1672 GNUNET_MQ_hd_var_size (place_result,
1673 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
1674 struct GNUNET_OperationResultMessage);
1675
1676 struct GNUNET_MQ_MessageHandler handlers[] = {
1677 make_guest_enter_ack_handler (gst),
1678 make_guest_enter_decision_handler (gst),
1679 make_place_message_handler (plc),
1680 make_place_message_ack_handler (plc),
1681 make_place_history_result_handler (plc),
1682 make_place_state_result_handler (plc),
1683 make_place_result_handler (plc),
1684 GNUNET_MQ_handler_end ()
1685 };
1686
1687 plc->mq = GNUNET_CLIENT_connecT (plc->cfg, "social",
1688 handlers, guest_disconnected, gst);
1689 GNUNET_assert (NULL != plc->mq);
1690 plc->tmit = GNUNET_PSYC_transmit_create (plc->mq);
1691
1692 GNUNET_MQ_send_copy (plc->mq, plc->connect_env);
1693}
1694
1695
1696static struct GNUNET_MQ_Envelope *
1599guest_enter_request_create (const char *app_id, 1697guest_enter_request_create (const char *app_id,
1600 const struct GNUNET_CRYPTO_EcdsaPublicKey *ego_pub_key, 1698 const struct GNUNET_CRYPTO_EcdsaPublicKey *ego_pub_key,
1601 const struct GNUNET_CRYPTO_EddsaPublicKey *place_pub_key, 1699 const struct GNUNET_CRYPTO_EddsaPublicKey *place_pub_key,
@@ -1608,11 +1706,10 @@ guest_enter_request_create (const char *app_id,
1608 uint16_t join_msg_size = ntohs (join_msg->header.size); 1706 uint16_t join_msg_size = ntohs (join_msg->header.size);
1609 uint16_t relay_size = relay_count * sizeof (*relays); 1707 uint16_t relay_size = relay_count * sizeof (*relays);
1610 1708
1611 struct GuestEnterRequest * 1709 struct GuestEnterRequest *greq;
1612 greq = GNUNET_malloc (sizeof (*greq) + app_id_size + relay_size + join_msg_size); 1710 struct GNUNET_MQ_Envelope *
1613 1711 env = GNUNET_MQ_msg_extra (greq, app_id_size + relay_size + join_msg_size,
1614 greq->header.size = htons (sizeof (*greq) + app_id_size + relay_size + join_msg_size); 1712 GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER);
1615 greq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER);
1616 greq->place_pub_key = *place_pub_key; 1713 greq->place_pub_key = *place_pub_key;
1617 greq->ego_pub_key = *ego_pub_key; 1714 greq->ego_pub_key = *ego_pub_key;
1618 greq->origin = *origin; 1715 greq->origin = *origin;
@@ -1629,7 +1726,7 @@ guest_enter_request_create (const char *app_id,
1629 } 1726 }
1630 1727
1631 GNUNET_memcpy (p, join_msg, join_msg_size); 1728 GNUNET_memcpy (p, join_msg, join_msg_size);
1632 return greq; 1729 return env;
1633} 1730}
1634 1731
1635 1732
@@ -1686,20 +1783,17 @@ GNUNET_SOCIAL_guest_enter (const struct GNUNET_SOCIAL_App *app,
1686 plc->is_host = GNUNET_NO; 1783 plc->is_host = GNUNET_NO;
1687 plc->slicer = slicer; 1784 plc->slicer = slicer;
1688 1785
1786 plc->op = GNUNET_OP_create ();
1787
1788 plc->connect_env
1789 = guest_enter_request_create (app->id, &ego->pub_key, &plc->pub_key,
1790 origin, relay_count, relays, entry_msg);
1791
1689 gst->enter_cb = local_enter_cb; 1792 gst->enter_cb = local_enter_cb;
1690 gst->entry_dcsn_cb = entry_dcsn_cb; 1793 gst->entry_dcsn_cb = entry_dcsn_cb;
1691 gst->cb_cls = cls; 1794 gst->cb_cls = cls;
1692 1795
1693 plc->client = GNUNET_CLIENT_MANAGER_connect (plc->cfg, "social", guest_handlers); 1796 guest_connect (gst);
1694 GNUNET_CLIENT_MANAGER_set_user_context_ (plc->client, gst, sizeof (*plc));
1695
1696 plc->tmit = GNUNET_PSYC_transmit_create (plc->client);
1697
1698 struct GuestEnterRequest *
1699 greq = guest_enter_request_create (app->id, &ego->pub_key, &plc->pub_key,
1700 origin, relay_count, relays, entry_msg);
1701 plc->connect_msg = &greq->header;
1702 place_send_connect_msg (plc);
1703 return gst; 1797 return gst;
1704} 1798}
1705 1799
@@ -1755,11 +1849,12 @@ GNUNET_SOCIAL_guest_enter_by_name (const struct GNUNET_SOCIAL_App *app,
1755 if (NULL != join_msg) 1849 if (NULL != join_msg)
1756 join_msg_size = ntohs (join_msg->header.size); 1850 join_msg_size = ntohs (join_msg->header.size);
1757 1851
1758 uint16_t greq_size = sizeof (struct GuestEnterByNameRequest) 1852 struct GuestEnterByNameRequest *greq;
1759 + app_id_size + gns_name_size + password_size + join_msg_size; 1853 plc->connect_env
1760 struct GuestEnterByNameRequest *greq = GNUNET_malloc (greq_size); 1854 = GNUNET_MQ_msg_extra (greq, app_id_size + gns_name_size
1761 greq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_BY_NAME); 1855 + password_size + join_msg_size,
1762 greq->header.size = htons (greq_size); 1856 GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER_BY_NAME);
1857
1763 greq->ego_pub_key = ego->pub_key; 1858 greq->ego_pub_key = ego->pub_key;
1764 1859
1765 char *p = (char *) &greq[1]; 1860 char *p = (char *) &greq[1];
@@ -1772,23 +1867,18 @@ GNUNET_SOCIAL_guest_enter_by_name (const struct GNUNET_SOCIAL_App *app,
1772 if (NULL != join_msg) 1867 if (NULL != join_msg)
1773 GNUNET_memcpy (p, join_msg, join_msg_size); 1868 GNUNET_memcpy (p, join_msg, join_msg_size);
1774 1869
1775 gst->enter_cb = local_enter_cb;
1776 gst->entry_dcsn_cb = entry_decision_cb;
1777 gst->cb_cls = cls;
1778
1779 plc->ego_pub_key = ego->pub_key; 1870 plc->ego_pub_key = ego->pub_key;
1780 plc->cfg = app->cfg; 1871 plc->cfg = app->cfg;
1781 plc->is_host = GNUNET_NO; 1872 plc->is_host = GNUNET_NO;
1782 plc->slicer = slicer; 1873 plc->slicer = slicer;
1783 1874
1784 plc->client = GNUNET_CLIENT_MANAGER_connect (app->cfg, "social", guest_handlers); 1875 plc->op = GNUNET_OP_create ();
1785 GNUNET_CLIENT_MANAGER_set_user_context_ (plc->client, gst, sizeof (*plc));
1786
1787 plc->tmit = GNUNET_PSYC_transmit_create (plc->client);
1788 1876
1789 plc->connect_msg = &greq->header; 1877 gst->enter_cb = local_enter_cb;
1790 place_send_connect_msg (plc); 1878 gst->entry_dcsn_cb = entry_decision_cb;
1879 gst->cb_cls = cls;
1791 1880
1881 guest_connect (gst);
1792 return gst; 1882 return gst;
1793} 1883}
1794 1884
@@ -1821,33 +1911,28 @@ GNUNET_SOCIAL_guest_enter_reconnect (struct GNUNET_SOCIAL_GuestConnection *gconn
1821 struct GNUNET_SOCIAL_Place *plc = &gst->plc; 1911 struct GNUNET_SOCIAL_Place *plc = &gst->plc;
1822 1912
1823 uint16_t app_id_size = strlen (gconn->app->id) + 1; 1913 uint16_t app_id_size = strlen (gconn->app->id) + 1;
1824 uint16_t greq_size = sizeof (struct GuestEnterRequest) + app_id_size; 1914 struct GuestEnterRequest *greq;
1825 struct GuestEnterRequest *greq = GNUNET_malloc (greq_size); 1915 plc->connect_env
1826 greq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER); 1916 = GNUNET_MQ_msg_extra (greq, app_id_size,
1827 greq->header.size = htons (greq_size); 1917 GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER);
1828 greq->ego_pub_key = gconn->plc_msg.ego_pub_key; 1918 greq->ego_pub_key = gconn->plc_msg.ego_pub_key;
1829 greq->place_pub_key = gconn->plc_msg.place_pub_key; 1919 greq->place_pub_key = gconn->plc_msg.place_pub_key;
1830 greq->flags = htonl (flags); 1920 greq->flags = htonl (flags);
1831 1921
1832 GNUNET_memcpy (&greq[1], gconn->app->id, app_id_size); 1922 GNUNET_memcpy (&greq[1], gconn->app->id, app_id_size);
1833 1923
1834 gst->enter_cb = local_enter_cb;
1835 gst->cb_cls = cls;
1836
1837 plc->cfg = gconn->app->cfg; 1924 plc->cfg = gconn->app->cfg;
1838 plc->is_host = GNUNET_NO; 1925 plc->is_host = GNUNET_NO;
1839 plc->slicer = slicer; 1926 plc->slicer = slicer;
1840 plc->pub_key = gconn->plc_msg.place_pub_key; 1927 plc->pub_key = gconn->plc_msg.place_pub_key;
1841 plc->ego_pub_key = gconn->plc_msg.ego_pub_key; 1928 plc->ego_pub_key = gconn->plc_msg.ego_pub_key;
1842 1929
1843 plc->client = GNUNET_CLIENT_MANAGER_connect (plc->cfg, "social", guest_handlers); 1930 plc->op = GNUNET_OP_create ();
1844 GNUNET_CLIENT_MANAGER_set_user_context_ (plc->client, gst, sizeof (*plc));
1845
1846 plc->tmit = GNUNET_PSYC_transmit_create (plc->client);
1847 1931
1848 plc->connect_msg = &greq->header; 1932 gst->enter_cb = local_enter_cb;
1849 place_send_connect_msg (plc); 1933 gst->cb_cls = cls;
1850 1934
1935 guest_connect (gst);
1851 return gst; 1936 return gst;
1852} 1937}
1853 1938
@@ -1931,6 +2016,7 @@ GNUNET_SOCIAL_guest_disconnect (struct GNUNET_SOCIAL_Guest *gst,
1931 void *cls) 2016 void *cls)
1932{ 2017{
1933 place_disconnect (&gst->plc, disconnect_cb, cls); 2018 place_disconnect (&gst->plc, disconnect_cb, cls);
2019 guest_cleanup (gst);
1934} 2020}
1935 2021
1936 2022
@@ -2015,15 +2101,14 @@ GNUNET_SOCIAL_place_msg_proc_set (struct GNUNET_SOCIAL_Place *plc,
2015 GNUNET_SERVER_MAX_MESSAGE_SIZE 2101 GNUNET_SERVER_MAX_MESSAGE_SIZE
2016 - sizeof (*mpreq)) + 1; 2102 - sizeof (*mpreq)) + 1;
2017 GNUNET_assert ('\0' == method_prefix[method_size - 1]); 2103 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
2018 mpreq = GNUNET_malloc (sizeof (*mpreq) + method_size);
2019 2104
2020 mpreq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_SET); 2105 struct GNUNET_MQ_Envelope *
2021 mpreq->header.size = htons (sizeof (*mpreq) + method_size); 2106 env = GNUNET_MQ_msg_extra (mpreq, method_size,
2107 GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_SET);
2022 mpreq->flags = htonl (flags); 2108 mpreq->flags = htonl (flags);
2023 GNUNET_memcpy (&mpreq[1], method_prefix, method_size); 2109 GNUNET_memcpy (&mpreq[1], method_prefix, method_size);
2024 2110
2025 GNUNET_CLIENT_MANAGER_transmit (plc->client, &mpreq->header); 2111 GNUNET_MQ_send (plc->mq, env);
2026 GNUNET_free (mpreq);
2027} 2112}
2028 2113
2029 2114
@@ -2033,10 +2118,11 @@ GNUNET_SOCIAL_place_msg_proc_set (struct GNUNET_SOCIAL_Place *plc,
2033void 2118void
2034GNUNET_SOCIAL_place_msg_proc_clear (struct GNUNET_SOCIAL_Place *plc) 2119GNUNET_SOCIAL_place_msg_proc_clear (struct GNUNET_SOCIAL_Place *plc)
2035{ 2120{
2036 struct GNUNET_MessageHeader req; 2121 struct GNUNET_MessageHeader *req;
2037 req.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_CLEAR); 2122 struct GNUNET_MQ_Envelope *
2038 req.size = htons (sizeof (req)); 2123 env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_SOCIAL_MSG_PROC_CLEAR);
2039 GNUNET_CLIENT_MANAGER_transmit (plc->client, &req); 2124
2125 GNUNET_MQ_send (plc->mq, env);
2040} 2126}
2041 2127
2042 2128
@@ -2057,17 +2143,17 @@ place_history_replay (struct GNUNET_SOCIAL_Place *plc,
2057 hist->slicer = slicer; 2143 hist->slicer = slicer;
2058 hist->result_cb = result_cb; 2144 hist->result_cb = result_cb;
2059 hist->cls = cls; 2145 hist->cls = cls;
2060 hist->op_id = GNUNET_CLIENT_MANAGER_op_add (plc->client, 2146 hist->op_id = GNUNET_OP_add (plc->op, op_recv_history_result, hist, NULL);
2061 &op_recv_history_result, hist);
2062 2147
2063 GNUNET_assert (NULL != method_prefix); 2148 GNUNET_assert (NULL != method_prefix);
2064 uint16_t method_size = strnlen (method_prefix, 2149 uint16_t method_size = strnlen (method_prefix,
2065 GNUNET_SERVER_MAX_MESSAGE_SIZE 2150 GNUNET_SERVER_MAX_MESSAGE_SIZE
2066 - sizeof (*req)) + 1; 2151 - sizeof (*req)) + 1;
2067 GNUNET_assert ('\0' == method_prefix[method_size - 1]); 2152 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
2068 req = GNUNET_malloc (sizeof (*req) + method_size); 2153
2069 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); 2154 struct GNUNET_MQ_Envelope *
2070 req->header.size = htons (sizeof (*req) + method_size); 2155 env = GNUNET_MQ_msg_extra (req, method_size,
2156 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
2071 req->start_message_id = GNUNET_htonll (start_message_id); 2157 req->start_message_id = GNUNET_htonll (start_message_id);
2072 req->end_message_id = GNUNET_htonll (end_message_id); 2158 req->end_message_id = GNUNET_htonll (end_message_id);
2073 req->message_limit = GNUNET_htonll (message_limit); 2159 req->message_limit = GNUNET_htonll (message_limit);
@@ -2075,8 +2161,7 @@ place_history_replay (struct GNUNET_SOCIAL_Place *plc,
2075 req->op_id = GNUNET_htonll (hist->op_id); 2161 req->op_id = GNUNET_htonll (hist->op_id);
2076 GNUNET_memcpy (&req[1], method_prefix, method_size); 2162 GNUNET_memcpy (&req[1], method_prefix, method_size);
2077 2163
2078 GNUNET_CLIENT_MANAGER_transmit (plc->client, &req->header); 2164 GNUNET_MQ_send (plc->mq, env);
2079 GNUNET_free (req);
2080 return hist; 2165 return hist;
2081} 2166}
2082 2167
@@ -2165,7 +2250,7 @@ GNUNET_SOCIAL_place_history_replay_latest (struct GNUNET_SOCIAL_Place *plc,
2165void 2250void
2166GNUNET_SOCIAL_place_history_replay_cancel (struct GNUNET_SOCIAL_HistoryRequest *hist) 2251GNUNET_SOCIAL_place_history_replay_cancel (struct GNUNET_SOCIAL_HistoryRequest *hist)
2167{ 2252{
2168 GNUNET_CLIENT_MANAGER_op_cancel (hist->plc->client, hist->op_id); 2253 GNUNET_OP_remove (hist->plc->op, hist->op_id);
2169 GNUNET_free (hist); 2254 GNUNET_free (hist);
2170} 2255}
2171 2256
@@ -2185,20 +2270,17 @@ place_state_get (struct GNUNET_SOCIAL_Place *plc,
2185 look->var_cb = var_cb; 2270 look->var_cb = var_cb;
2186 look->result_cb = result_cb; 2271 look->result_cb = result_cb;
2187 look->cls = cls; 2272 look->cls = cls;
2188 look->op_id = GNUNET_CLIENT_MANAGER_op_add (plc->client, 2273 look->op_id = GNUNET_OP_add (plc->op, &op_recv_state_result, look, NULL);
2189 &op_recv_state_result, look);
2190 2274
2191 GNUNET_assert (NULL != name); 2275 GNUNET_assert (NULL != name);
2192 size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE 2276 size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE
2193 - sizeof (*req)) + 1; 2277 - sizeof (*req)) + 1;
2194 req = GNUNET_malloc (sizeof (*req) + name_size); 2278 struct GNUNET_MQ_Envelope *
2195 req->header.type = htons (type); 2279 env = GNUNET_MQ_msg_extra (req, name_size, type);
2196 req->header.size = htons (sizeof (*req) + name_size);
2197 req->op_id = GNUNET_htonll (look->op_id); 2280 req->op_id = GNUNET_htonll (look->op_id);
2198 GNUNET_memcpy (&req[1], name, name_size); 2281 GNUNET_memcpy (&req[1], name, name_size);
2199 2282
2200 GNUNET_CLIENT_MANAGER_transmit (plc->client, &req->header); 2283 GNUNET_MQ_send (plc->mq, env);
2201 GNUNET_free (req);
2202 return look; 2284 return look;
2203} 2285}
2204 2286
@@ -2265,7 +2347,7 @@ GNUNET_SOCIAL_place_look_for (struct GNUNET_SOCIAL_Place *plc,
2265void 2347void
2266GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *look) 2348GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *look)
2267{ 2349{
2268 GNUNET_CLIENT_MANAGER_op_cancel (look->plc->client, look->op_id); 2350 GNUNET_OP_remove (look->plc->op, look->op_id);
2269 GNUNET_free (look); 2351 GNUNET_free (look);
2270} 2352}
2271 2353
@@ -2331,14 +2413,14 @@ GNUNET_SOCIAL_zone_add_place (const struct GNUNET_SOCIAL_App *app,
2331 size_t name_size = strlen (name) + 1; 2413 size_t name_size = strlen (name) + 1;
2332 size_t password_size = strlen (password) + 1; 2414 size_t password_size = strlen (password) + 1;
2333 size_t relay_size = relay_count * sizeof (*relays); 2415 size_t relay_size = relay_count * sizeof (*relays);
2334 size_t preq_size = sizeof (*preq) + name_size + password_size + relay_size; 2416 size_t payload_size = name_size + password_size + relay_size;
2335 2417
2336 if (GNUNET_SERVER_MAX_MESSAGE_SIZE < preq_size) 2418 if (GNUNET_SERVER_MAX_MESSAGE_SIZE < sizeof (*preq) + payload_size)
2337 return GNUNET_SYSERR; 2419 return GNUNET_SYSERR;
2338 2420
2339 preq = GNUNET_malloc (preq_size); 2421 struct GNUNET_MQ_Envelope *
2340 preq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_PLACE); 2422 env = GNUNET_MQ_msg_extra (preq, payload_size,
2341 preq->header.size = htons (preq_size); 2423 GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_PLACE);
2342 preq->expiration_time = GNUNET_htonll (expiration_time.abs_value_us); 2424 preq->expiration_time = GNUNET_htonll (expiration_time.abs_value_us);
2343 preq->ego_pub_key = ego->pub_key; 2425 preq->ego_pub_key = ego->pub_key;
2344 preq->place_pub_key = *place_pub_key; 2426 preq->place_pub_key = *place_pub_key;
@@ -2357,10 +2439,11 @@ GNUNET_SOCIAL_zone_add_place (const struct GNUNET_SOCIAL_App *app,
2357 add_plc->result_cb = result_cb; 2439 add_plc->result_cb = result_cb;
2358 add_plc->result_cls = result_cls; 2440 add_plc->result_cls = result_cls;
2359 2441
2360 preq->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (app->client, 2442 preq->op_id = GNUNET_htonll (GNUNET_OP_add (app->op,
2361 op_recv_zone_add_place_result, 2443 op_recv_zone_add_place_result,
2362 add_plc)); 2444 add_plc, NULL));
2363 GNUNET_CLIENT_MANAGER_transmit_now (app->client, &preq->header); 2445
2446 GNUNET_MQ_send (app->mq, env);
2364 return GNUNET_OK; 2447 return GNUNET_OK;
2365} 2448}
2366 2449
@@ -2376,7 +2459,6 @@ op_recv_zone_add_nym_result (void *cls, int64_t result,
2376 if (NULL != add_nym->result_cb) 2459 if (NULL != add_nym->result_cb)
2377 add_nym->result_cb (add_nym->result_cls, result, err_msg, err_msg_size); 2460 add_nym->result_cb (add_nym->result_cls, result, err_msg, err_msg_size);
2378 2461
2379 GNUNET_free (add_nym->req);
2380 GNUNET_free (add_nym); 2462 GNUNET_free (add_nym);
2381} 2463}
2382 2464
@@ -2417,27 +2499,106 @@ GNUNET_SOCIAL_zone_add_nym (const struct GNUNET_SOCIAL_App *app,
2417 if (GNUNET_SERVER_MAX_MESSAGE_SIZE < sizeof (*nreq) + name_size) 2499 if (GNUNET_SERVER_MAX_MESSAGE_SIZE < sizeof (*nreq) + name_size)
2418 return GNUNET_SYSERR; 2500 return GNUNET_SYSERR;
2419 2501
2420 nreq = GNUNET_malloc (sizeof (*nreq) + name_size); 2502 struct GNUNET_MQ_Envelope *
2421 nreq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_NYM); 2503 env = GNUNET_MQ_msg_extra (nreq, name_size,
2422 nreq->header.size = htons (sizeof (*nreq) + name_size); 2504 GNUNET_MESSAGE_TYPE_SOCIAL_ZONE_ADD_NYM);
2423 nreq->expiration_time = GNUNET_htonll (expiration_time.abs_value_us); 2505 nreq->expiration_time = GNUNET_htonll (expiration_time.abs_value_us);
2424 nreq->ego_pub_key = ego->pub_key; 2506 nreq->ego_pub_key = ego->pub_key;
2425 nreq->nym_pub_key = *nym_pub_key; 2507 nreq->nym_pub_key = *nym_pub_key;
2426 GNUNET_memcpy (&nreq[1], name, name_size); 2508 GNUNET_memcpy (&nreq[1], name, name_size);
2427 2509
2428 struct ZoneAddNymHandle * add_nym = GNUNET_malloc (sizeof (*add_nym)); 2510 struct ZoneAddNymHandle *add_nym = GNUNET_malloc (sizeof (*add_nym));
2429 add_nym->req = nreq;
2430 add_nym->result_cb = result_cb; 2511 add_nym->result_cb = result_cb;
2431 add_nym->result_cls = result_cls; 2512 add_nym->result_cls = result_cls;
2432 2513
2433 nreq->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (app->client, 2514 nreq->op_id = GNUNET_htonll (GNUNET_OP_add (app->op,
2434 op_recv_zone_add_nym_result, 2515 op_recv_zone_add_nym_result,
2435 add_nym)); 2516 add_nym, NULL));
2436 GNUNET_CLIENT_MANAGER_transmit_now (app->client, &nreq->header); 2517
2518 GNUNET_MQ_send (app->mq, env);
2437 return GNUNET_OK; 2519 return GNUNET_OK;
2438} 2520}
2439 2521
2440 2522
2523/*** APP ***/
2524
2525
2526static void
2527app_connect (struct GNUNET_SOCIAL_App *app);
2528
2529
2530static void
2531app_reconnect (void *cls)
2532{
2533 app_connect (cls);
2534}
2535
2536
2537/**
2538 * App client disconnected from service.
2539 *
2540 * Reconnect after backoff period.
2541 */
2542static void
2543app_disconnected (void *cls, enum GNUNET_MQ_Error error)
2544{
2545 struct GNUNET_SOCIAL_App *app = cls;
2546
2547 LOG (GNUNET_ERROR_TYPE_DEBUG,
2548 "App client disconnected (%d), re-connecting\n",
2549 (int) error);
2550 if (NULL != app->mq)
2551 {
2552 GNUNET_MQ_destroy (app->mq);
2553 app->mq = NULL;
2554 }
2555
2556 app->reconnect_task = GNUNET_SCHEDULER_add_delayed (app->reconnect_delay,
2557 app_reconnect,
2558 app);
2559 app->reconnect_delay = GNUNET_TIME_STD_BACKOFF (app->reconnect_delay);
2560}
2561
2562
2563static void
2564app_connect (struct GNUNET_SOCIAL_App *app)
2565{
2566 GNUNET_MQ_hd_var_size (app_ego,
2567 GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO,
2568 struct AppEgoMessage);
2569
2570 GNUNET_MQ_hd_fixed_size (app_ego_end,
2571 GNUNET_MESSAGE_TYPE_SOCIAL_APP_EGO_END,
2572 struct GNUNET_MessageHeader);
2573
2574 GNUNET_MQ_hd_var_size (app_place,
2575 GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE,
2576 struct AppPlaceMessage);
2577
2578 GNUNET_MQ_hd_fixed_size (app_place_end,
2579 GNUNET_MESSAGE_TYPE_SOCIAL_APP_PLACE_END,
2580 struct GNUNET_MessageHeader);
2581
2582 GNUNET_MQ_hd_var_size (app_result,
2583 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
2584 struct GNUNET_OperationResultMessage);
2585
2586 struct GNUNET_MQ_MessageHandler handlers[] = {
2587 make_app_ego_handler (app),
2588 make_app_ego_end_handler (app),
2589 make_app_place_handler (app),
2590 make_app_place_end_handler (app),
2591 make_app_result_handler (app),
2592 GNUNET_MQ_handler_end ()
2593 };
2594
2595 app->mq = GNUNET_CLIENT_connecT (app->cfg, "social",
2596 handlers, app_disconnected, app);
2597 GNUNET_assert (NULL != app->mq);
2598 GNUNET_MQ_send_copy (app->mq, app->connect_env);
2599}
2600
2601
2441/** 2602/**
2442 * Connect application to the social service. 2603 * Connect application to the social service.
2443 * 2604 *
@@ -2482,21 +2643,16 @@ GNUNET_SOCIAL_app_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
2482 app->connected_cb = connected_cb; 2643 app->connected_cb = connected_cb;
2483 app->cb_cls = cls; 2644 app->cb_cls = cls;
2484 app->egos = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); 2645 app->egos = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
2485 app->client = GNUNET_CLIENT_MANAGER_connect (cfg, "social", 2646 app->op = GNUNET_OP_create ();
2486 app_handlers);
2487 GNUNET_CLIENT_MANAGER_set_user_context_ (app->client, app, sizeof (*app));
2488
2489 app->id = GNUNET_malloc (app_id_size); 2647 app->id = GNUNET_malloc (app_id_size);
2490 GNUNET_memcpy (app->id, id, app_id_size); 2648 GNUNET_memcpy (app->id, id, app_id_size);
2491 2649
2492 struct AppConnectRequest *creq = GNUNET_malloc (sizeof (*creq) + app_id_size); 2650 struct AppConnectRequest *creq;
2493 creq->header.size = htons (sizeof (*creq) + app_id_size); 2651 app->connect_env = GNUNET_MQ_msg_extra (creq, app_id_size,
2494 creq->header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_APP_CONNECT); 2652 GNUNET_MESSAGE_TYPE_SOCIAL_APP_CONNECT);
2495 GNUNET_memcpy (&creq[1], app->id, app_id_size); 2653 GNUNET_memcpy (&creq[1], app->id, app_id_size);
2496 2654
2497 app->connect_msg = &creq->header; 2655 app_connect (app);
2498 app_send_connect_msg (app);
2499
2500 return app; 2656 return app;
2501} 2657}
2502 2658
@@ -2516,8 +2672,15 @@ GNUNET_SOCIAL_app_disconnect (struct GNUNET_SOCIAL_App *app,
2516 GNUNET_ContinuationCallback disconnect_cb, 2672 GNUNET_ContinuationCallback disconnect_cb,
2517 void *disconnect_cls) 2673 void *disconnect_cls)
2518{ 2674{
2519 GNUNET_CLIENT_MANAGER_disconnect (app->client, GNUNET_NO, 2675 // FIXME: wait till queued messages are sent
2520 disconnect_cb, disconnect_cls); 2676 if (NULL != app->mq)
2677 {
2678 GNUNET_MQ_destroy (app->mq);
2679 app->mq = NULL;
2680 }
2681
2682 if (NULL != disconnect_cb)
2683 disconnect_cb (disconnect_cls);
2521} 2684}
2522 2685
2523 2686
@@ -2538,11 +2701,12 @@ void
2538GNUNET_SOCIAL_app_detach (struct GNUNET_SOCIAL_App *app, 2701GNUNET_SOCIAL_app_detach (struct GNUNET_SOCIAL_App *app,
2539 struct GNUNET_SOCIAL_Place *plc) 2702 struct GNUNET_SOCIAL_Place *plc)
2540{ 2703{
2541 struct AppDetachRequest dreq; 2704 struct AppDetachRequest *dreq;
2542 dreq.header.size = htons (sizeof (dreq)); 2705 struct GNUNET_MQ_Envelope *
2543 dreq.header.type = htons (GNUNET_MESSAGE_TYPE_SOCIAL_APP_DETACH); 2706 env = GNUNET_MQ_msg (dreq, GNUNET_MESSAGE_TYPE_SOCIAL_APP_DETACH);
2544 dreq.place_pub_key = plc->pub_key; 2707 dreq->place_pub_key = plc->pub_key;
2545 GNUNET_CLIENT_MANAGER_transmit_now (plc->client, &dreq.header); 2708
2709 GNUNET_MQ_send (app->mq, env);
2546} 2710}
2547 2711
2548 2712