aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_multicast_service.h62
-rw-r--r--src/include/gnunet_protocols.h32
-rw-r--r--src/include/gnunet_psyc_service.h92
-rw-r--r--src/include/gnunet_social_service.h6
-rw-r--r--src/multicast/gnunet-service-multicast.c84
-rw-r--r--src/multicast/multicast.h46
-rw-r--r--src/multicast/multicast_api.c435
-rw-r--r--src/psyc/Makefile.am6
-rw-r--r--src/psyc/gnunet-service-psyc.c341
-rw-r--r--src/psyc/psyc.h11
-rw-r--r--src/psyc/psyc_api.c370
-rw-r--r--src/psyc/psyc_common.c100
-rw-r--r--src/psyc/test_psyc.c231
-rw-r--r--src/psycstore/plugin_psycstore_sqlite.c2
14 files changed, 1288 insertions, 530 deletions
diff --git a/src/include/gnunet_multicast_service.h b/src/include/gnunet_multicast_service.h
index 0abd803a4..aa5597fd3 100644
--- a/src/include/gnunet_multicast_service.h
+++ b/src/include/gnunet_multicast_service.h
@@ -160,8 +160,62 @@ struct GNUNET_MULTICAST_MessageHeader
160 /* Followed by message body. */ 160 /* Followed by message body. */
161}; 161};
162 162
163
164/**
165 * Header of a request from a member to the origin.
166 */
167struct GNUNET_MULTICAST_RequestHeader
168{
169 /**
170 * Header for all requests from a member to the origin.
171 */
172 struct GNUNET_MessageHeader header;
173
174 /**
175 * Public key of the sending member.
176 */
177 struct GNUNET_CRYPTO_EddsaPublicKey member_key;
178
179 /**
180 * ECC signature of the request fragment.
181 *
182 * Signature must match the public key of the multicast group.
183 */
184 struct GNUNET_CRYPTO_EddsaSignature signature;
185
186 /**
187 * Purpose for the signature and size of the signed data.
188 */
189 struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
190
191 /**
192 * Number of the request fragment, monotonically increasing.
193 */
194 uint64_t fragment_id GNUNET_PACKED;
195
196 /**
197 * Byte offset of this @e fragment of the @e request.
198 */
199 uint64_t fragment_offset GNUNET_PACKED;
200
201 /**
202 * Number of the request this fragment belongs to.
203 *
204 * Set in GNUNET_MULTICAST_origin_to_all().
205 */
206 uint64_t request_id GNUNET_PACKED;
207
208 /**
209 * Flags for this request.
210 */
211 enum GNUNET_MULTICAST_MessageFlags flags GNUNET_PACKED;
212
213 /* Followed by request body. */
214};
215
163GNUNET_NETWORK_STRUCT_END 216GNUNET_NETWORK_STRUCT_END
164 217
218
165/** 219/**
166 * Maximum size of a multicast message fragment. 220 * Maximum size of a multicast message fragment.
167 */ 221 */
@@ -492,7 +546,7 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
492 * @param next_fragment_id Next fragment ID to continue counting fragments from 546 * @param next_fragment_id Next fragment ID to continue counting fragments from
493 * when restarting the origin. 1 for a new group. 547 * when restarting the origin. 1 for a new group.
494 * @param join_cb Function called to approve / disapprove joining of a peer. 548 * @param join_cb Function called to approve / disapprove joining of a peer.
495 * @param mem_test_cb Function multicast can use to test group membership. 549 * @param member_test_cb Function multicast can use to test group membership.
496 * @param replay_frag_cb Function that can be called to replay a message fragment. 550 * @param replay_frag_cb Function that can be called to replay a message fragment.
497 * @param replay_msg_cb Function that can be called to replay a message. 551 * @param replay_msg_cb Function that can be called to replay a message.
498 * @param request_cb Function called with message fragments from group members. 552 * @param request_cb Function called with message fragments from group members.
@@ -507,7 +561,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
507 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key, 561 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
508 uint64_t next_fragment_id, 562 uint64_t next_fragment_id,
509 GNUNET_MULTICAST_JoinCallback join_cb, 563 GNUNET_MULTICAST_JoinCallback join_cb,
510 GNUNET_MULTICAST_MembershipTestCallback mem_test_cb, 564 GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
511 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, 565 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
512 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, 566 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
513 GNUNET_MULTICAST_RequestCallback request_cb, 567 GNUNET_MULTICAST_RequestCallback request_cb,
@@ -756,14 +810,14 @@ struct GNUNET_MULTICAST_MemberRequestHandle;
756 * Send a message to the origin of the multicast group. 810 * Send a message to the origin of the multicast group.
757 * 811 *
758 * @param member Membership handle. 812 * @param member Membership handle.
759 * @param message_id Application layer ID for the message. Opaque to multicast. 813 * @param request_id Application layer ID for the request. Opaque to multicast.
760 * @param notify Callback to call to get the message. 814 * @param notify Callback to call to get the message.
761 * @param notify_cls Closure for @a notify. 815 * @param notify_cls Closure for @a notify.
762 * @return Handle to cancel request, NULL on error (i.e. request already pending). 816 * @return Handle to cancel request, NULL on error (i.e. request already pending).
763 */ 817 */
764struct GNUNET_MULTICAST_MemberRequestHandle * 818struct GNUNET_MULTICAST_MemberRequestHandle *
765GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member, 819GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
766 uint64_t message_id, 820 uint64_t request_id,
767 GNUNET_MULTICAST_MemberTransmitNotify notify, 821 GNUNET_MULTICAST_MemberTransmitNotify notify,
768 void *notify_cls); 822 void *notify_cls);
769 823
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 65ecb22e6..88cf5a024 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2279,14 +2279,34 @@ extern "C"
2279/* WIP: no numbers assigned yet */ 2279/* WIP: no numbers assigned yet */
2280 2280
2281/** 2281/**
2282 * Start an origin.
2283 */
2284#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750
2285
2286/**
2287 * Stop an origin.
2288 */
2289#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP 751
2290
2291/**
2292 * Join a group as a member.
2293 */
2294#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 752
2295
2296/**
2297 * Leave a group.
2298 */
2299#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART 753
2300
2301/**
2282 * Multicast message from the origin to all members. 2302 * Multicast message from the origin to all members.
2283 */ 2303 */
2284#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 750 2304#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 754
2285 2305
2286/** 2306/**
2287 * A unicast message from a group member to the origin. 2307 * A unicast message from a group member to the origin.
2288 */ 2308 */
2289#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 2309#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 755
2290 2310
2291/** 2311/**
2292 * A peer wants to join the group. 2312 * A peer wants to join the group.
@@ -2366,14 +2386,6 @@ extern "C"
2366 2386
2367 2387
2368/******************************************************************************* 2388/*******************************************************************************
2369 * PSYC message types
2370 ******************************************************************************/
2371
2372/*******************************************************************************
2373 * PSYCSTORE message types
2374 ******************************************************************************/
2375
2376/*******************************************************************************
2377 * SOCIAL message types 2389 * SOCIAL message types
2378 ******************************************************************************/ 2390 ******************************************************************************/
2379 2391
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h
index f843fbe1f..8f1707854 100644
--- a/src/include/gnunet_psyc_service.h
+++ b/src/include/gnunet_psyc_service.h
@@ -110,7 +110,7 @@ enum GNUNET_PSYC_ChannelFlags
110 * Past messages are only available to slaves who were admitted at the time 110 * Past messages are only available to slaves who were admitted at the time
111 * they were sent to the channel. 111 * they were sent to the channel.
112 */ 112 */
113 GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY = 1 << 1, 113 GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY = 1 << 1
114}; 114};
115 115
116/** 116/**
@@ -132,7 +132,7 @@ enum GNUNET_PSYC_Policy
132 */ 132 */
133 GNUNET_PSYC_CHANNEL_PRIVATE 133 GNUNET_PSYC_CHANNEL_PRIVATE
134 = GNUNET_PSYC_CHANNEL_ADMISSION_CONTROL 134 = GNUNET_PSYC_CHANNEL_ADMISSION_CONTROL
135 | GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY, 135 | GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY
136 136
137#if IDEAS_FOR_FUTURE 137#if IDEAS_FOR_FUTURE
138 /** 138 /**
@@ -152,9 +152,7 @@ enum GNUNET_PSYC_Policy
152 */ 152 */
153 GNUNET_PSYC_CHANNEL_CLOSED 153 GNUNET_PSYC_CHANNEL_CLOSED
154 = GNUNET_PSYC_CHANNEL_ADMISSION_CONTROL, 154 = GNUNET_PSYC_CHANNEL_ADMISSION_CONTROL,
155,
156#endif 155#endif
157
158}; 156};
159 157
160 158
@@ -163,7 +161,12 @@ enum GNUNET_PSYC_MessageFlags
163 /** 161 /**
164 * Historic message, retrieved from PSYCstore. 162 * Historic message, retrieved from PSYCstore.
165 */ 163 */
166 GNUNET_PSYC_MESSAGE_HISTORIC = 1 164 GNUNET_PSYC_MESSAGE_HISTORIC = 1 << 0,
165
166 /**
167 * Request from slave to master.
168 */
169 GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1
167}; 170};
168 171
169GNUNET_NETWORK_STRUCT_BEGIN 172GNUNET_NETWORK_STRUCT_BEGIN
@@ -406,7 +409,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
406/** 409/**
407 * Function called to provide data for a transmission via PSYC. 410 * Function called to provide data for a transmission via PSYC.
408 * 411 *
409 * Note that returning #GNUNET_OK or #GNUNET_SYSERR (but not #GNUNET_NO) 412 * Note that returning #GNUNET_YES or #GNUNET_SYSERR (but not #GNUNET_NO)
410 * invalidates the respective transmission handle. 413 * invalidates the respective transmission handle.
411 * 414 *
412 * @param cls Closure. 415 * @param cls Closure.
@@ -422,15 +425,43 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
422 * #GNUNET_YES if this completes the transmission (all data supplied) 425 * #GNUNET_YES if this completes the transmission (all data supplied)
423 */ 426 */
424typedef int 427typedef int
425(*GNUNET_PSYC_MasterTransmitNotify) (void *cls, 428(*GNUNET_PSYC_TransmitNotifyData) (void *cls,
426 uint16_t *data_size, 429 uint16_t *data_size,
427 void *data); 430 void *data);
428 431
432/**
433 * Function called to provide a modifier for a transmission via PSYC.
434 *
435 * Note that returning #GNUNET_YES or #GNUNET_SYSERR (but not #GNUNET_NO)
436 * invalidates the respective transmission handle.
437 *
438 * @param cls Closure.
439 * @param[in,out] data_size Initially set to the number of bytes available in
440 * @a data, should be set to the number of bytes written to data.
441 * @param[out] data Where to write the modifier's name and value.
442 * The function must copy at most @a data_size bytes to @a data.
443 * When this callback is first called for a modifier, @a data should
444 * contain: "name\0value". If the whole value does not fit, subsequent
445 * calls to this function should write continuations of the value to
446 * @a data.
447 * @param oper Where to write the operator of the modifier. Only needed during
448 * the first call to this callback at the beginning of the modifier.
449 * In case of subsequent calls asking for value continuations @a oper is
450 * set to #NULL.
451 * @return #GNUNET_SYSERR on error (fatal, aborts transmission)
452 * #GNUNET_NO on success, if more data is to be transmitted later.
453 * Should be used if @a data_size was not big enough to take all the
454 * data for the modifier's value (the name must be always returned
455 * during the first call to this callback).
456 * If 0 is returned in @a data_size the transmission is paused,
457 * and can be resumed with GNUNET_PSYC_master_transmit_resume().
458 * #GNUNET_YES if this completes the modifier (the whole value is supplied).
459 */
429typedef int 460typedef int
430(*GNUNET_PSYC_MasterTransmitNotifyModifier) (void *cls, 461(*GNUNET_PSYC_TransmitNotifyModifier) (void *cls,
431 uint16_t *data_size, 462 uint16_t *data_size,
432 void *data, 463 void *data,
433 uint8_t *oper); 464 uint8_t *oper);
434 465
435/** 466/**
436 * Flags for transmitting messages to a channel by the master. 467 * Flags for transmitting messages to a channel by the master.
@@ -477,8 +508,8 @@ struct GNUNET_PSYC_MasterTransmitHandle;
477struct GNUNET_PSYC_MasterTransmitHandle * 508struct GNUNET_PSYC_MasterTransmitHandle *
478GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 509GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
479 const char *method_name, 510 const char *method_name,
480 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, 511 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
481 GNUNET_PSYC_MasterTransmitNotify notify_data, 512 GNUNET_PSYC_TransmitNotifyData notify_data,
482 void *notify_cls, 513 void *notify_cls,
483 enum GNUNET_PSYC_MasterTransmitFlags flags); 514 enum GNUNET_PSYC_MasterTransmitFlags flags);
484 515
@@ -588,29 +619,6 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave);
588 619
589 620
590/** 621/**
591 * Function called to provide data for a transmission to the channel master
592 * (a.k.a. the @e host of the channel).
593 *
594 * Note that returning #GNUNET_OK or #GNUNET_SYSERR (but not #GNUNET_NO)
595 * invalidates the respective transmission handle.
596 *
597 * @param cls Closure.
598 * @param[in,out] data_size Initially set to the number of bytes available in
599 * @a data, should be set to the number of bytes written to data
600 * (IN/OUT).
601 * @param[out] data Where to write the body of the message to give to the method;
602 * function must copy at most @a *data_size bytes to @a data.
603 * @return #GNUNET_SYSERR on error (fatal, aborts transmission).
604 * #GNUNET_NO on success, if more data is to be transmitted later.
605 * #GNUNET_YES if this completes the transmission (all data supplied).
606 */
607typedef int
608(*GNUNET_PSYC_SlaveTransmitNotify) (void *cls,
609 size_t *data_size,
610 char *data);
611
612
613/**
614 * Flags for transmitting messages to the channel master by a slave. 622 * Flags for transmitting messages to the channel master by a slave.
615 */ 623 */
616enum GNUNET_PSYC_SlaveTransmitFlags 624enum GNUNET_PSYC_SlaveTransmitFlags
@@ -630,8 +638,8 @@ struct GNUNET_PSYC_SlaveTransmitHandle;
630 * 638 *
631 * @param slave Slave handle. 639 * @param slave Slave handle.
632 * @param method_name Which (PSYC) method should be invoked (on host). 640 * @param method_name Which (PSYC) method should be invoked (on host).
633 * @param env Environment containing transient variables for the message, or NULL. 641 * @param notify_mod Function to call to obtain modifiers.
634 * @param notify Function to call when we are allowed to transmit (to get data). 642 * @param notify_data Function to call to obtain fragments of the data.
635 * @param notify_cls Closure for @a notify. 643 * @param notify_cls Closure for @a notify.
636 * @param flags Flags for the message being transmitted. 644 * @param flags Flags for the message being transmitted.
637 * @return Transmission handle, NULL on error (i.e. more than one request queued). 645 * @return Transmission handle, NULL on error (i.e. more than one request queued).
@@ -639,8 +647,8 @@ struct GNUNET_PSYC_SlaveTransmitHandle;
639struct GNUNET_PSYC_SlaveTransmitHandle * 647struct GNUNET_PSYC_SlaveTransmitHandle *
640GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, 648GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
641 const char *method_name, 649 const char *method_name,
642 const struct GNUNET_ENV_Environment *env, 650 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
643 GNUNET_PSYC_SlaveTransmitNotify notify, 651 GNUNET_PSYC_TransmitNotifyData notify_data,
644 void *notify_cls, 652 void *notify_cls,
645 enum GNUNET_PSYC_SlaveTransmitFlags flags); 653 enum GNUNET_PSYC_SlaveTransmitFlags flags);
646 654
diff --git a/src/include/gnunet_social_service.h b/src/include/gnunet_social_service.h
index 8bd1a959f..98ad88346 100644
--- a/src/include/gnunet_social_service.h
+++ b/src/include/gnunet_social_service.h
@@ -375,12 +375,11 @@ GNUNET_SOCIAL_home_announce_cancel (struct GNUNET_SOCIAL_Announcement *a);
375 * Convert our home to a place so we can access it via the place API. 375 * Convert our home to a place so we can access it via the place API.
376 * 376 *
377 * @param home Handle for the home. 377 * @param home Handle for the home.
378 * @param keep_active Keep home active after last application disconnected.
379 * @return Place handle for the same home, valid as long as @a home is valid; 378 * @return Place handle for the same home, valid as long as @a home is valid;
380 * do NOT try to GNUNET_SOCIAL_place_leave() this place, it's your home! 379 * do NOT try to GNUNET_SOCIAL_place_leave() this place, it's your home!
381 */ 380 */
382struct GNUNET_SOCIAL_Place * 381struct GNUNET_SOCIAL_Place *
383GNUNET_SOCIAL_home_get_place (struct GNUNET_SOCIAL_Home *home, int keep_active); 382GNUNET_SOCIAL_home_get_place (struct GNUNET_SOCIAL_Home *home);
384 383
385 384
386/** 385/**
@@ -390,9 +389,10 @@ GNUNET_SOCIAL_home_get_place (struct GNUNET_SOCIAL_Home *home, int keep_active);
390 * Guests will be disconnected until the home is restarted. 389 * Guests will be disconnected until the home is restarted.
391 * 390 *
392 * @param home Home to leave. 391 * @param home Home to leave.
392 * @param keep_active Keep home active after last application disconnected.
393 */ 393 */
394void 394void
395GNUNET_SOCIAL_home_leave (struct GNUNET_SOCIAL_Home *home); 395GNUNET_SOCIAL_home_leave (struct GNUNET_SOCIAL_Home *home, int keep_active);
396 396
397/** 397/**
398 * Request entry to a place (home hosted by someone else). 398 * Request entry to a place (home hosted by someone else).
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index f9aa7fe2d..47c8bce36 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -41,6 +41,71 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
41 41
42 42
43/** 43/**
44 * Handle a connecting client starting an origin.
45 */
46static void
47handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
48 const struct GNUNET_MessageHeader *msg)
49{
50
51}
52
53
54/**
55 * Handle a client stopping an origin.
56 */
57static void
58handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
59 const struct GNUNET_MessageHeader *msg)
60{
61
62}
63
64
65/**
66 * Handle a connecting client joining a group.
67 */
68static void
69handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
70 const struct GNUNET_MessageHeader *msg)
71{
72
73}
74
75
76/**
77 * Handle a client parting a group.
78 */
79static void
80handle_member_part (void *cls, struct GNUNET_SERVER_Client *client,
81 const struct GNUNET_MessageHeader *msg)
82{
83
84}
85
86
87/**
88 * Incoming message from a client.
89 */
90static void
91handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
92 const struct GNUNET_MessageHeader *msg)
93{
94
95}
96
97
98/**
99 * Incoming request from a client.
100 */
101static void
102handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
103 const struct GNUNET_MessageHeader *msg)
104{
105
106}
107
108/**
44 * Process multicast requests. 109 * Process multicast requests.
45 * 110 *
46 * @param cls closure 111 * @param cls closure
@@ -52,7 +117,24 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
52 const struct GNUNET_CONFIGURATION_Handle *cfg) 117 const struct GNUNET_CONFIGURATION_Handle *cfg)
53{ 118{
54 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 119 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
55 /* FIXME: add handlers here! */ 120 { &handle_origin_start, NULL,
121 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
122
123 { &handle_origin_stop, NULL,
124 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP, 0 },
125
126 { &handle_member_join, NULL,
127 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
128
129 { &handle_member_part, NULL,
130 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART, 0 },
131
132 { &handle_multicast_message, NULL,
133 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
134
135 { &handle_multicast_request, NULL,
136 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
137
56 {NULL, NULL, 0, 0} 138 {NULL, NULL, 0, 0}
57 }; 139 };
58 /* FIXME: do setup here */ 140 /* FIXME: do setup here */
diff --git a/src/multicast/multicast.h b/src/multicast/multicast.h
index 88cb2d99e..facf8f54e 100644
--- a/src/multicast/multicast.h
+++ b/src/multicast/multicast.h
@@ -22,6 +22,7 @@
22 * @file multicast/multicast.h 22 * @file multicast/multicast.h
23 * @brief multicast IPC messages 23 * @brief multicast IPC messages
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * @author Gabor X Toth
25 */ 26 */
26#ifndef MULTICAST_H 27#ifndef MULTICAST_H
27#define MULTICAST_H 28#define MULTICAST_H
@@ -30,12 +31,52 @@ GNUNET_NETWORK_STRUCT_BEGIN
30 31
31 32
32/** 33/**
34 * Header of a join request sent to the origin or another member.
35 */
36struct GNUNET_MULTICAST_JoinRequest
37{
38 /**
39 * Header for the join request.
40 */
41 struct GNUNET_MessageHeader header;
42
43 /**
44 * ECC signature of the rest of the fields of the join request.
45 *
46 * Signature must match the public key of the joining member.
47 */
48 struct GNUNET_CRYPTO_EddsaSignature signature;
49
50 /**
51 * Purpose for the signature and size of the signed data.
52 */
53 struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
54
55 /**
56 * Public key of the target group.
57 */
58 struct GNUNET_CRYPTO_EddsaPublicKey group_key;
59
60 /**
61 * Public key of the joining member.
62 */
63 struct GNUNET_CRYPTO_EddsaPublicKey member_key;
64
65 /**
66 * Peer identity of the joining member.
67 */
68 struct GNUNET_PeerIdentity member_peer;
69
70 /* Followed by request body. */
71};
72
73
74/**
33 * Message sent from the client to the service to notify the service 75 * Message sent from the client to the service to notify the service
34 * about a join decision. 76 * about a join decision.
35 */ 77 */
36struct MulticastJoinDecisionMessage 78struct MulticastJoinDecisionMessage
37{ 79{
38
39 /** 80 /**
40 * 81 *
41 */ 82 */
@@ -329,9 +370,6 @@ struct MulticastUnicastToOriginCancelMessage
329}; 370};
330 371
331 372
332
333
334
335GNUNET_NETWORK_STRUCT_END 373GNUNET_NETWORK_STRUCT_END
336 374
337#endif 375#endif
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index bb6a57b58..25af614af 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -34,6 +34,19 @@
34 34
35 35
36/** 36/**
37 * Started origins.
38 * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin
39 */
40static struct GNUNET_CONTAINER_MultiHashMap *origins;
41
42/**
43 * Joined members.
44 * group_key_hash -> struct GNUNET_MULTICAST_Member
45 */
46static struct GNUNET_CONTAINER_MultiHashMap *members;
47
48
49/**
37 * Handle for a request to send a message to all multicast group members 50 * Handle for a request to send a message to all multicast group members
38 * (from the origin). 51 * (from the origin).
39 */ 52 */
@@ -49,13 +62,20 @@ struct GNUNET_MULTICAST_OriginMessageHandle
49}; 62};
50 63
51 64
65struct GNUNET_MULTICAST_Group
66{
67 uint8_t is_origin;
68};
69
52/** 70/**
53 * Handle for the origin of a multicast group. 71 * Handle for the origin of a multicast group.
54 */ 72 */
55struct GNUNET_MULTICAST_Origin 73struct GNUNET_MULTICAST_Origin
56{ 74{
57 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; 75 struct GNUNET_MULTICAST_Group grp;
76
58 struct GNUNET_MULTICAST_OriginMessageHandle msg_handle; 77 struct GNUNET_MULTICAST_OriginMessageHandle msg_handle;
78 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
59 79
60 GNUNET_MULTICAST_JoinCallback join_cb; 80 GNUNET_MULTICAST_JoinCallback join_cb;
61 GNUNET_MULTICAST_MembershipTestCallback mem_test_cb; 81 GNUNET_MULTICAST_MembershipTestCallback mem_test_cb;
@@ -66,6 +86,9 @@ struct GNUNET_MULTICAST_Origin
66 void *cls; 86 void *cls;
67 87
68 uint64_t next_fragment_id; 88 uint64_t next_fragment_id;
89
90 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
91 struct GNUNET_HashCode pub_key_hash;
69}; 92};
70 93
71 94
@@ -74,123 +97,176 @@ struct GNUNET_MULTICAST_Origin
74 */ 97 */
75struct GNUNET_MULTICAST_MemberRequestHandle 98struct GNUNET_MULTICAST_MemberRequestHandle
76{ 99{
100 GNUNET_MULTICAST_MemberTransmitNotify notify;
101 void *notify_cls;
102 struct GNUNET_MULTICAST_Member *member;
103
104 uint64_t request_id;
105 uint64_t fragment_offset;
77}; 106};
78 107
79 108
80/** 109/**
81 * Opaque handle for a multicast group member. 110 * Handle for a multicast group member.
82 */ 111 */
83struct GNUNET_MULTICAST_Member 112struct GNUNET_MULTICAST_Member
84{ 113{
114 struct GNUNET_MULTICAST_Group grp;
115
116 struct GNUNET_MULTICAST_MemberRequestHandle req_handle;
117
118 struct GNUNET_CRYPTO_EddsaPublicKey group_key;
119 struct GNUNET_CRYPTO_EddsaPrivateKey member_key;
120 struct GNUNET_PeerIdentity origin;
121 struct GNUNET_PeerIdentity relays;
122 uint32_t relay_count;
123 struct GNUNET_MessageHeader *join_request;
124 GNUNET_MULTICAST_JoinCallback join_cb;
125 GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
126 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
127 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
128 GNUNET_MULTICAST_MessageCallback message_cb;
129 void *cls;
130
131 uint64_t next_fragment_id;
132 struct GNUNET_HashCode group_key_hash;
85}; 133};
86 134
87 135
88GNUNET_NETWORK_STRUCT_BEGIN 136/**
137 * Handle that identifies a join request.
138 *
139 * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the
140 * corresponding calls to #GNUNET_MULTICAST_join_decision().
141 */
142struct GNUNET_MULTICAST_JoinHandle
143{
144};
145
89 146
90/** 147/**
91 * Header of a request from a member to the origin. 148 * Handle to pass back for the answer of a membership test.
92 */ 149 */
93struct GNUNET_MULTICAST_RequestHeader 150struct GNUNET_MULTICAST_MembershipTestHandle
94{ 151{
95 /** 152};
96 * Header for all requests from a member to the origin.
97 */
98 struct GNUNET_MessageHeader header;
99 153
100 /**
101 * Public key of the sending member.
102 */
103 struct GNUNET_CRYPTO_EddsaPublicKey member_key;
104 154
105 /** 155/**
106 * ECC signature of the request fragment. 156 * Opaque handle to a replay request from the multicast service.
107 * 157 */
108 * Signature must match the public key of the multicast group. 158struct GNUNET_MULTICAST_ReplayHandle
109 */ 159{
110 struct GNUNET_CRYPTO_EddsaSignature signature; 160};
111 161
112 /**
113 * Purpose for the signature and size of the signed data.
114 */
115 struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
116 162
117 /** 163/**
118 * Number of the request fragment, monotonically increasing. 164 * Handle for a replay request.
119 */ 165 */
120 uint64_t fragment_id GNUNET_PACKED; 166struct GNUNET_MULTICAST_MemberReplayHandle
167{
168};
121 169
122 /**
123 * Byte offset of this @e fragment of the @e request.
124 */
125 uint64_t fragment_offset GNUNET_PACKED;
126 170
127 /** 171/**
128 * Number of the request this fragment belongs to. 172 * Iterator callback for calling message callbacks for all groups.
129 * 173 */
130 * Set in GNUNET_MULTICAST_origin_to_all(). 174static int
131 */ 175message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
132 uint64_t request_id GNUNET_PACKED; 176 void *group)
177{
178 const struct GNUNET_MessageHeader *msg = cls;
179 struct GNUNET_MULTICAST_Group *grp = group;
133 180
134 /** 181 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
135 * Flags for this request. 182 "Calling message callback for a message of type %u and size %u.\n",
136 */ 183 ntohs (msg->type), ntohs (msg->size));
137 enum GNUNET_MULTICAST_MessageFlags flags GNUNET_PACKED; 184
185 if (GNUNET_YES == grp->is_origin)
186 {
187 struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp;
188 orig->message_cb (orig->cls, msg);
189 }
190 else
191 {
192 struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp;
193 mem->message_cb (mem->cls, msg);
194 }
195
196 return GNUNET_YES;
197}
138 198
139 /* Followed by request body. */
140};
141 199
142/** 200/**
143 * Header of a join request sent to the origin or another member. 201 * Handle a multicast message from the service.
202 *
203 * Call message callbacks of all origins and members of the destination group.
204 *
205 * @param grp Destination group of the message.
206 * @param msg The message.
144 */ 207 */
145struct GNUNET_MULTICAST_JoinRequest 208static void
209handle_multicast_message (struct GNUNET_MULTICAST_Group *grp,
210 const struct GNUNET_MULTICAST_MessageHeader *msg)
146{ 211{
147 /** 212 struct GNUNET_HashCode *hash;
148 * Header for the join request.
149 */
150 struct GNUNET_MessageHeader header;
151 213
152 /** 214 if (GNUNET_YES == grp->is_origin)
153 * ECC signature of the rest of the fields of the join request. 215 {
154 * 216 struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp;
155 * Signature must match the public key of the joining member. 217 hash = &orig->pub_key_hash;
156 */ 218 }
157 struct GNUNET_CRYPTO_EddsaSignature signature; 219 else
158 220 {
159 /** 221 struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp;
160 * Purpose for the signature and size of the signed data. 222 hash = &mem->group_key_hash;
161 */ 223 }
162 struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
163 224
164 /** 225 if (origins != NULL)
165 * Public key of the target group. 226 GNUNET_CONTAINER_multihashmap_get_multiple (origins, hash, message_callback,
166 */ 227 (void *) msg);
167 struct GNUNET_CRYPTO_EddsaPublicKey group_key; 228 if (members != NULL)
229 GNUNET_CONTAINER_multihashmap_get_multiple (members, hash, message_callback,
230 (void *) msg);
231}
168 232
169 /**
170 * Public key of the joining member.
171 */
172 struct GNUNET_CRYPTO_EddsaPublicKey member_key;
173 233
174 /** 234/**
175 * Peer identity of the joining member. 235 * Iterator callback for calling request callbacks of origins.
176 */ 236 */
177 struct GNUNET_PeerIdentity member_peer; 237static int
238request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
239 void *origin)
240{
241 const struct GNUNET_MULTICAST_RequestHeader *req = cls;
242 struct GNUNET_MULTICAST_Origin *orig = origin;
178 243
179 /* Followed by request body. */ 244 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
180}; 245 "Calling request callback for a request of type %u and size %u.\n",
246 ntohs (req->header.type), ntohs (req->header.size));
181 247
182GNUNET_NETWORK_STRUCT_END 248 orig->request_cb (orig->cls, &req->member_key,
249 (const struct GNUNET_MessageHeader *) req, 0);
250 return GNUNET_YES;
251}
183 252
184 253
185/** 254/**
186 * Handle that identifies a join request. 255 * Handle a multicast request from the service.
187 * 256 *
188 * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the 257 * Call request callbacks of all origins of the destination group.
189 * corresponding calls to #GNUNET_MULTICAST_join_decision(). 258 *
259 * @param grp Destination group of the message.
260 * @param msg The message.
190 */ 261 */
191struct GNUNET_MULTICAST_JoinHandle 262static void
263handle_multicast_request (const struct GNUNET_HashCode *group_key_hash,
264 const struct GNUNET_MULTICAST_RequestHeader *req)
192{ 265{
193}; 266 if (NULL != origins)
267 GNUNET_CONTAINER_multihashmap_get_multiple (origins, group_key_hash,
268 request_callback, (void *) req);
269}
194 270
195 271
196/** 272/**
@@ -227,14 +303,6 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
227 303
228 304
229/** 305/**
230 * Handle to pass back for the answer of a membership test.
231 */
232struct GNUNET_MULTICAST_MembershipTestHandle
233{
234};
235
236
237/**
238 * Call informing multicast about the decision taken for a membership test. 306 * Call informing multicast about the decision taken for a membership test.
239 * 307 *
240 * @param mth Handle that was given for the query. 308 * @param mth Handle that was given for the query.
@@ -249,14 +317,6 @@ GNUNET_MULTICAST_membership_test_result (struct GNUNET_MULTICAST_MembershipTestH
249 317
250 318
251/** 319/**
252 * Opaque handle to a replay request from the multicast service.
253 */
254struct GNUNET_MULTICAST_ReplayHandle
255{
256};
257
258
259/**
260 * Replay a message fragment for the multicast group. 320 * Replay a message fragment for the multicast group.
261 * 321 *
262 * @param rh Replay handle identifying which replay operation was requested. 322 * @param rh Replay handle identifying which replay operation was requested.
@@ -340,6 +400,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
340 void *cls) 400 void *cls)
341{ 401{
342 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig)); 402 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
403 orig->grp.is_origin = GNUNET_YES;
343 orig->priv_key = *priv_key; 404 orig->priv_key = *priv_key;
344 orig->next_fragment_id = next_fragment_id; 405 orig->next_fragment_id = next_fragment_id;
345 orig->join_cb = join_cb; 406 orig->join_cb = join_cb;
@@ -349,11 +410,38 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
349 orig->request_cb = request_cb; 410 orig->request_cb = request_cb;
350 orig->message_cb = message_cb; 411 orig->message_cb = message_cb;
351 orig->cls = cls; 412 orig->cls = cls;
413
414 GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &orig->pub_key);
415 GNUNET_CRYPTO_hash (&orig->pub_key, sizeof (orig->pub_key),
416 &orig->pub_key_hash);
417
418 if (NULL == origins)
419 origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
420
421 GNUNET_CONTAINER_multihashmap_put (origins, &orig->pub_key_hash, orig,
422 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
423
424 /* FIXME: send ORIGIN_START to service */
425
352 return orig; 426 return orig;
353} 427}
354 428
355 429
356/* FIXME: for now just send back to the client what it sent. */ 430/**
431 * Stop a multicast group.
432 *
433 * @param origin Multicast group to stop.
434 */
435void
436GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
437{
438 GNUNET_CONTAINER_multihashmap_remove (origins, &orig->pub_key_hash, orig);
439 GNUNET_free (orig);
440}
441
442
443/* FIXME: for now just call clients' callbacks
444 * without sending anything to multicast. */
357static void 445static void
358schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 446schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
359{ 447{
@@ -371,7 +459,7 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
371 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) 459 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
372 { 460 {
373 LOG (GNUNET_ERROR_TYPE_ERROR, 461 LOG (GNUNET_ERROR_TYPE_ERROR,
374 "MasterTransmitNotify() returned error or invalid message size.\n"); 462 "OriginTransmitNotify() returned error or invalid message size.\n");
375 /* FIXME: handle error */ 463 /* FIXME: handle error */
376 return; 464 return;
377 } 465 }
@@ -401,19 +489,18 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
401 return; 489 return;
402 } 490 }
403 491
404 /* FIXME: send msg to the service and only then call message_cb with the 492 /* FIXME: send msg to the service and only then call handle_multicast_message
405 * returned signed message. 493 * with the returned signed message.
406 * FIXME: Also send to local members in this group.
407 */ 494 */
408 orig->message_cb (orig->cls, (const struct GNUNET_MessageHeader *) msg); 495 handle_multicast_message (&orig->grp, msg);
409 496
410 if (GNUNET_NO == ret) 497 if (GNUNET_NO == ret)
411 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 498 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
412 (GNUNET_TIME_UNIT_SECONDS, 1), 499 (GNUNET_TIME_UNIT_SECONDS, 1),
413 schedule_origin_to_all, orig); 500 schedule_origin_to_all, orig);
414
415} 501}
416 502
503
417/** 504/**
418 * Send a message to the multicast group. 505 * Send a message to the multicast group.
419 * 506 *
@@ -439,6 +526,7 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin,
439 mh->notify = notify; 526 mh->notify = notify;
440 mh->notify_cls = notify_cls; 527 mh->notify_cls = notify_cls;
441 528
529 /* FIXME: remove delay, it's there only for testing */
442 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 530 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
443 (GNUNET_TIME_UNIT_SECONDS, 1), 531 (GNUNET_TIME_UNIT_SECONDS, 1),
444 schedule_origin_to_all, origin); 532 schedule_origin_to_all, origin);
@@ -470,18 +558,6 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginMessageHand
470 558
471 559
472/** 560/**
473 * Stop a multicast group.
474 *
475 * @param origin Multicast group to stop.
476 */
477void
478GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *origin)
479{
480 GNUNET_free (origin);
481}
482
483
484/**
485 * Join a multicast group. 561 * Join a multicast group.
486 * 562 *
487 * The entity joining is always the local peer. Further information about the 563 * The entity joining is always the local peer. Further information about the
@@ -531,24 +607,61 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
531 const struct GNUNET_PeerIdentity *relays, 607 const struct GNUNET_PeerIdentity *relays,
532 const struct GNUNET_MessageHeader *join_request, 608 const struct GNUNET_MessageHeader *join_request,
533 GNUNET_MULTICAST_JoinCallback join_cb, 609 GNUNET_MULTICAST_JoinCallback join_cb,
534 GNUNET_MULTICAST_MembershipTestCallback mem_test_cb, 610 GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
535 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, 611 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
536 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, 612 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
537 GNUNET_MULTICAST_MessageCallback message_cb, 613 GNUNET_MULTICAST_MessageCallback message_cb,
538 void *cls) 614 void *cls)
539{ 615{
540 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem)); 616 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
617 mem->group_key = *group_key;
618 mem->member_key = *member_key;
619 mem->origin = *origin;
620 mem->relay_count = relay_count;
621 mem->relays = *relays;
622 mem->join_cb = join_cb;
623 mem->member_test_cb = member_test_cb;
624 mem->replay_frag_cb = replay_frag_cb;
625 mem->message_cb = message_cb;
626 mem->cls = cls;
627
628 if (NULL != join_request)
629 {
630 uint16_t size = ntohs (join_request->size);
631 mem->join_request = GNUNET_malloc (size);
632 memcpy (mem->join_request, join_request, size);
633 }
634
635 GNUNET_CRYPTO_hash (&mem->group_key, sizeof (mem->group_key), &mem->group_key_hash);
636
637 if (NULL == members)
638 members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
639
640 GNUNET_CONTAINER_multihashmap_put (members, &mem->group_key_hash, mem,
641 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
642
643 /* FIXME: send MEMBER_JOIN to service */
541 644
542 return mem; 645 return mem;
543} 646}
544 647
545 648
546/** 649/**
547 * Handle for a replay request. 650 * Part a multicast group.
651 *
652 * Disconnects from all group members and invalidates the @a member handle.
653 *
654 * An application-dependent part message can be transmitted beforehand using
655 * #GNUNET_MULTICAST_member_to_origin())
656 *
657 * @param member Membership handle.
548 */ 658 */
549struct GNUNET_MULTICAST_MemberReplayHandle 659void
660GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
550{ 661{
551}; 662 GNUNET_CONTAINER_multihashmap_remove (members, &mem->group_key_hash, mem);
663 GNUNET_free (mem);
664}
552 665
553 666
554/** 667/**
@@ -612,20 +725,62 @@ GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandl
612} 725}
613 726
614 727
615/** 728/* FIXME: for now just send back to the client what it sent. */
616 * Part a multicast group. 729static void
617 * 730schedule_member_to_origin (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
618 * Disconnects from all group members and invalidates the @a member handle.
619 *
620 * An application-dependent part message can be transmitted beforehand using
621 * #GNUNET_MULTICAST_member_to_origin())
622 *
623 * @param member Membership handle.
624 */
625void
626GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *member)
627{ 731{
628 GNUNET_free (member); 732 LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_member_to_origin()\n");
733 struct GNUNET_MULTICAST_Member *mem = cls;
734 struct GNUNET_MULTICAST_MemberRequestHandle *rh = &mem->req_handle;
735
736 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
737 char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
738 struct GNUNET_MULTICAST_RequestHeader *req
739 = (struct GNUNET_MULTICAST_RequestHeader *) buf;
740 int ret = rh->notify (rh->notify_cls, &buf_size, &req[1]);
741
742 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
743 || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
744 {
745 LOG (GNUNET_ERROR_TYPE_ERROR,
746 "MemberTransmitNotify() returned error or invalid message size.\n");
747 /* FIXME: handle error */
748 return;
749 }
750
751 if (GNUNET_NO == ret && 0 == buf_size)
752 return; /* Transmission paused. */
753
754 req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
755 req->header.size = htons (sizeof (*req) + buf_size);
756 req->request_id = GNUNET_htonll (rh->request_id);
757
758 /* FIXME: add fragment ID and signature in the service instead of here */
759 req->fragment_id = GNUNET_ntohll (mem->next_fragment_id++);
760 req->fragment_offset = GNUNET_ntohll (rh->fragment_offset);
761 rh->fragment_offset += sizeof (*req) + buf_size;
762 req->purpose.size = htonl (sizeof (*req) + buf_size
763 - sizeof (req->header)
764 - sizeof (req->member_key)
765 - sizeof (req->signature));
766 req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
767
768 if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->member_key, &req->purpose,
769 &req->signature))
770 {
771 /* FIXME: handle error */
772 return;
773 }
774
775 /* FIXME: send req to the service and only then call handle_multicast_request
776 * with the returned request.
777 */
778 handle_multicast_request (&mem->group_key_hash, req);
779
780 if (GNUNET_NO == ret)
781 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
782 (GNUNET_TIME_UNIT_SECONDS, 1),
783 schedule_member_to_origin, mem);
629} 784}
630 785
631 786
@@ -633,18 +788,28 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *member)
633 * Send a message to the origin of the multicast group. 788 * Send a message to the origin of the multicast group.
634 * 789 *
635 * @param member Membership handle. 790 * @param member Membership handle.
636 * @param message_id Application layer ID for the message. Opaque to multicast. 791 * @param request_id Application layer ID for the request. Opaque to multicast.
637 * @param notify Callback to call to get the message. 792 * @param notify Callback to call to get the message.
638 * @param notify_cls Closure for @a notify. 793 * @param notify_cls Closure for @a notify.
639 * @return Handle to cancel request, NULL on error (i.e. request already pending). 794 * @return Handle to cancel request, NULL on error (i.e. request already pending).
640 */ 795 */
641struct GNUNET_MULTICAST_MemberRequestHandle * 796struct GNUNET_MULTICAST_MemberRequestHandle *
642GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member, 797GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
643 uint64_t message_id, 798 uint64_t request_id,
644 GNUNET_MULTICAST_MemberTransmitNotify notify, 799 GNUNET_MULTICAST_MemberTransmitNotify notify,
645 void *notify_cls) 800 void *notify_cls)
646{ 801{
647 return NULL; 802 struct GNUNET_MULTICAST_MemberRequestHandle *rh = &member->req_handle;
803 rh->member = member;
804 rh->request_id = request_id;
805 rh->notify = notify;
806 rh->notify_cls = notify_cls;
807
808 /* FIXME: remove delay, it's there only for testing */
809 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
810 (GNUNET_TIME_UNIT_SECONDS, 1),
811 schedule_member_to_origin, member);
812 return &member->req_handle;
648} 813}
649 814
650 815
diff --git a/src/psyc/Makefile.am b/src/psyc/Makefile.am
index 7860b3995..162b42b2b 100644
--- a/src/psyc/Makefile.am
+++ b/src/psyc/Makefile.am
@@ -21,7 +21,7 @@ lib_LTLIBRARIES = libgnunetpsyc.la
21 21
22libgnunetpsyc_la_SOURCES = \ 22libgnunetpsyc_la_SOURCES = \
23 psyc_api.c \ 23 psyc_api.c \
24 psyc.h 24 psyc_common.c
25libgnunetpsyc_la_LIBADD = \ 25libgnunetpsyc_la_LIBADD = \
26 $(top_builddir)/src/util/libgnunetutil.la \ 26 $(top_builddir)/src/util/libgnunetutil.la \
27 $(top_builddir)/src/env/libgnunetenv.la \ 27 $(top_builddir)/src/env/libgnunetenv.la \
@@ -39,7 +39,8 @@ libexec_PROGRAMS = \
39 gnunet-service-psyc 39 gnunet-service-psyc
40 40
41gnunet_service_psyc_SOURCES = \ 41gnunet_service_psyc_SOURCES = \
42 gnunet-service-psyc.c 42 gnunet-service-psyc.c \
43 psyc_common.c
43gnunet_service_psyc_LDADD = \ 44gnunet_service_psyc_LDADD = \
44 $(top_builddir)/src/statistics/libgnunetstatistics.la \ 45 $(top_builddir)/src/statistics/libgnunetstatistics.la \
45 $(top_builddir)/src/util/libgnunetutil.la \ 46 $(top_builddir)/src/util/libgnunetutil.la \
@@ -51,6 +52,7 @@ gnunet_service_psyc_DEPENDENCIES = \
51 $(top_builddir)/src/util/libgnunetutil.la \ 52 $(top_builddir)/src/util/libgnunetutil.la \
52 $(top_builddir)/src/multicast/libgnunetmulticast.la \ 53 $(top_builddir)/src/multicast/libgnunetmulticast.la \
53 $(top_builddir)/src/psycstore/libgnunetpsycstore.la 54 $(top_builddir)/src/psycstore/libgnunetpsycstore.la
55gnunet_service_psyc_CFLAGS = $(AM_CFLAGS)
54 56
55 57
56if HAVE_TESTING 58if HAVE_TESTING
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index e5de7dcda..dcb5031f1 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -56,6 +56,7 @@ static struct GNUNET_SERVER_NotificationContext *nc;
56static struct GNUNET_PSYCSTORE_Handle *store; 56static struct GNUNET_PSYCSTORE_Handle *store;
57 57
58/** 58/**
59 * All connected masters and slaves.
59 * Channel's pub_key_hash -> struct Channel 60 * Channel's pub_key_hash -> struct Channel
60 */ 61 */
61static struct GNUNET_CONTAINER_MultiHashMap *clients; 62static struct GNUNET_CONTAINER_MultiHashMap *clients;
@@ -105,6 +106,15 @@ struct Channel
105 106
106 uint8_t in_transmit; 107 uint8_t in_transmit;
107 uint8_t is_master; 108 uint8_t is_master;
109
110 /**
111 * Ready to receive messages from client.
112 */
113 uint8_t ready;
114
115 /**
116 * Client disconnected.
117 */
108 uint8_t disconnected; 118 uint8_t disconnected;
109}; 119};
110 120
@@ -116,7 +126,6 @@ struct Master
116 struct Channel channel; 126 struct Channel channel;
117 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; 127 struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
118 struct GNUNET_CRYPTO_EddsaPublicKey pub_key; 128 struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
119 struct GNUNET_HashCode pub_key_hash;
120 129
121 struct GNUNET_MULTICAST_Origin *origin; 130 struct GNUNET_MULTICAST_Origin *origin;
122 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; 131 struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
@@ -144,6 +153,8 @@ struct Master
144 * @see enum GNUNET_PSYC_Policy 153 * @see enum GNUNET_PSYC_Policy
145 */ 154 */
146 uint32_t policy; 155 uint32_t policy;
156
157 struct GNUNET_HashCode pub_key_hash;
147}; 158};
148 159
149 160
@@ -155,24 +166,26 @@ struct Slave
155 struct Channel channel; 166 struct Channel channel;
156 struct GNUNET_CRYPTO_EddsaPrivateKey slave_key; 167 struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
157 struct GNUNET_CRYPTO_EddsaPublicKey chan_key; 168 struct GNUNET_CRYPTO_EddsaPublicKey chan_key;
158 struct GNUNET_HashCode chan_key_hash;
159 169
160 struct GNUNET_MULTICAST_Member *member; 170 struct GNUNET_MULTICAST_Member *member;
161 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; 171 struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
162 172
163 struct GNUNET_PeerIdentity origin; 173 struct GNUNET_PeerIdentity origin;
174
175 uint32_t relay_count;
164 struct GNUNET_PeerIdentity *relays; 176 struct GNUNET_PeerIdentity *relays;
177
165 struct GNUNET_MessageHeader *join_req; 178 struct GNUNET_MessageHeader *join_req;
166 179
167 uint64_t max_message_id; 180 uint64_t max_message_id;
168 uint64_t max_request_id; 181 uint64_t max_request_id;
169 182
170 uint32_t relay_count; 183 struct GNUNET_HashCode chan_key_hash;
171}; 184};
172 185
173 186
174static inline void 187static inline void
175transmit_message (struct Channel *ch); 188transmit_message (struct Channel *ch, uint8_t inc_msg_id);
176 189
177 190
178/** 191/**
@@ -235,14 +248,14 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
235 if (NULL == client) 248 if (NULL == client)
236 return; 249 return;
237 250
238 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
239
240 struct Channel *ch 251 struct Channel *ch
241 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 252 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
254
242 if (NULL == ch) 255 if (NULL == ch)
243 { 256 {
244 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 257 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
245 "User context is NULL in client_disconnect()\n"); 258 "%p User context is NULL in client_disconnect()\n", ch);
246 GNUNET_break (0); 259 GNUNET_break (0);
247 return; 260 return;
248 } 261 }
@@ -252,7 +265,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
252 /* Send pending messages to multicast before cleanup. */ 265 /* Send pending messages to multicast before cleanup. */
253 if (NULL != ch->tmit_head) 266 if (NULL != ch->tmit_head)
254 { 267 {
255 transmit_message (ch); 268 transmit_message (ch, GNUNET_NO);
256 } 269 }
257 else 270 else
258 { 271 {
@@ -311,53 +324,22 @@ fragment_store_result (void *cls, int64_t result, const char *err_msg)
311 324
312 325
313/** 326/**
314 * Iterator callback for sending a message to a client.
315 *
316 * @see message_cb()
317 */
318static int
319message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
320 void *chan)
321{
322 const struct GNUNET_MessageHeader *msg = cls;
323 struct Channel *ch = chan;
324
325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326 "Sending message of type %u and size %u to client 0x%zx.\n",
327 ntohs (msg->type), ntohs (msg->size), ch->client);
328
329 GNUNET_SERVER_notification_context_add (nc, ch->client);
330 GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
331
332 return GNUNET_YES;
333}
334
335
336/**
337 * Incoming message fragment from multicast. 327 * Incoming message fragment from multicast.
338 * 328 *
339 * Store it using PSYCstore and send it to all clients of the channel. 329 * Store it using PSYCstore and send it to the client of the channel.
340 */ 330 */
341static void 331static void
342message_cb (void *cls, const struct GNUNET_MessageHeader *msg) 332message_cb (struct Channel *ch,
333 const struct GNUNET_CRYPTO_EddsaPublicKey *chan_key,
334 const struct GNUNET_HashCode *chan_key_hash,
335 const struct GNUNET_MessageHeader *msg)
343{ 336{
344 uint16_t type = ntohs (msg->type); 337 uint16_t type = ntohs (msg->type);
345 uint16_t size = ntohs (msg->size); 338 uint16_t size = ntohs (msg->size);
346 339
347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
348 "Received message of type %u and size %u from multicast.\n", 341 "%p Received message of type %u and size %u from multicast.\n",
349 type, size); 342 ch, type, size);
350
351 struct Channel *ch = cls;
352 struct Master *mst = cls;
353 struct Slave *slv = cls;
354
355 /* const struct GNUNET_MULTICAST_MessageHeader *mmsg
356 = (const struct GNUNET_MULTICAST_MessageHeader *) msg; */
357 struct GNUNET_CRYPTO_EddsaPublicKey *chan_key
358 = ch->is_master ? &mst->pub_key : &slv->chan_key;
359 struct GNUNET_HashCode *chan_key_hash
360 = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
361 343
362 switch (type) 344 switch (type)
363 { 345 {
@@ -378,29 +360,19 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
378 360
379 const struct GNUNET_MULTICAST_MessageHeader *mmsg 361 const struct GNUNET_MULTICAST_MessageHeader *mmsg
380 = (const struct GNUNET_MULTICAST_MessageHeader *) msg; 362 = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
381 struct GNUNET_PSYC_MessageHeader *pmsg;
382
383 uint16_t size = ntohs (msg->size);
384 uint16_t psize = 0;
385 uint16_t pos = 0;
386 363
387 for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize) 364 if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
365 (const char *) &mmsg[1]))
388 { 366 {
389 const struct GNUNET_MessageHeader *pmsg 367 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
390 = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos); 368 "%p Received message with invalid parts from multicast. "
391 psize = ntohs (pmsg->size); 369 "Dropping message.\n", ch);
392 if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size) 370 GNUNET_break_op (0);
393 { 371 break;
394 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
395 "Received invalid message part of type %u and size %u "
396 "from multicast. Not sending to clients.\n",
397 ntohs (pmsg->type), psize);
398 GNUNET_break_op (0);
399 return;
400 }
401 } 372 }
402 373
403 psize = sizeof (*pmsg) + size - sizeof (*mmsg); 374 struct GNUNET_PSYC_MessageHeader *pmsg;
375 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
404 pmsg = GNUNET_malloc (psize); 376 pmsg = GNUNET_malloc (psize);
405 pmsg->header.size = htons (psize); 377 pmsg->header.size = htons (psize);
406 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 378 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
@@ -408,39 +380,116 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
408 380
409 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); 381 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
410 382
411 GNUNET_CONTAINER_multihashmap_get_multiple (clients, chan_key_hash, 383 GNUNET_SERVER_notification_context_add (nc, ch->client);
412 message_to_client, 384 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
413 (void *) pmsg); 385 (const struct GNUNET_MessageHeader *) pmsg,
386 GNUNET_NO);
414 GNUNET_free (pmsg); 387 GNUNET_free (pmsg);
415 break; 388 break;
416 } 389 }
417 default: 390 default:
418 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 391 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
419 "Discarding unknown message of type %u and size %u.\n", 392 "%p Dropping unknown message of type %u and size %u.\n",
420 type, size); 393 ch, type, size);
421 } 394 }
422} 395}
423 396
424 397
425/** 398/**
426 * Send a request received from multicast to a client. 399 * Incoming message fragment from multicast for a master.
427 */ 400 */
428static int 401static void
429request_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash, 402master_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
430 void *chan) 403{
404 struct Master *mst = cls;
405 GNUNET_assert (NULL != mst);
406
407 struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &mst->pub_key;
408 struct GNUNET_HashCode *chan_key_hash = &mst->pub_key_hash;
409
410 message_cb (&mst->channel, chan_key, chan_key_hash, msg);
411}
412
413
414/**
415 * Incoming message fragment from multicast for a slave.
416 */
417static void
418slave_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
431{ 419{
432 /* TODO */ 420 struct Slave *slv = cls;
421 GNUNET_assert (NULL != slv);
422
423 struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &slv->chan_key;
424 struct GNUNET_HashCode *chan_key_hash = &slv->chan_key_hash;
433 425
434 return GNUNET_YES; 426 message_cb (&slv->channel, chan_key, chan_key_hash, msg);
435} 427}
436 428
437 429
430/**
431 * Incoming request fragment from multicast for a master.
432 *
433 * @param cls Master.
434 * @param member_key Sending member's public key.
435 * @param msg The message.
436 * @param flags Request flags.
437 */
438static void 438static void
439request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, 439request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
440 const struct GNUNET_MessageHeader *req, 440 const struct GNUNET_MessageHeader *msg,
441 enum GNUNET_MULTICAST_MessageFlags flags) 441 enum GNUNET_MULTICAST_MessageFlags flags)
442{ 442{
443 struct Master *mst = cls;
444 struct Channel *ch = &mst->channel;
443 445
446 uint16_t type = ntohs (msg->type);
447 uint16_t size = ntohs (msg->size);
448
449 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
450 "%p Received request of type %u and size %u from multicast.\n",
451 ch, type, size);
452
453 switch (type)
454 {
455 case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
456 {
457 const struct GNUNET_MULTICAST_RequestHeader *req
458 = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
459
460 if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req),
461 (const char *) &req[1]))
462 {
463 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
464 "%p Dropping message with invalid parts "
465 "received from multicast.\n", ch);
466 GNUNET_break_op (0);
467 break;
468 }
469
470 struct GNUNET_PSYC_MessageHeader *pmsg;
471 uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
472 pmsg = GNUNET_malloc (psize);
473 pmsg->header.size = htons (psize);
474 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
475 pmsg->message_id = req->request_id;
476 pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
477
478 memcpy (&pmsg[1], &req[1], size - sizeof (*req));
479
480 GNUNET_SERVER_notification_context_add (nc, ch->client);
481 GNUNET_SERVER_notification_context_unicast (nc, ch->client,
482 (const struct GNUNET_MessageHeader *) pmsg,
483 GNUNET_NO);
484 GNUNET_free (pmsg);
485 break;
486 }
487 default:
488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489 "%p Dropping unknown request of type %u and size %u.\n",
490 ch, type, size);
491 GNUNET_break_op (0);
492 }
444} 493}
445 494
446 495
@@ -470,7 +519,8 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
470 max_fragment_id + 1, 519 max_fragment_id + 1,
471 join_cb, membership_test_cb, 520 join_cb, membership_test_cb,
472 replay_fragment_cb, replay_message_cb, 521 replay_fragment_cb, replay_message_cb,
473 request_cb, message_cb, ch); 522 request_cb, master_message_cb, ch);
523 ch->ready = GNUNET_YES;
474 } 524 }
475 GNUNET_SERVER_notification_context_add (nc, ch->client); 525 GNUNET_SERVER_notification_context_add (nc, ch->client);
476 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, 526 GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
@@ -505,7 +555,8 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
505 slv->join_req, join_cb, 555 slv->join_req, join_cb,
506 membership_test_cb, 556 membership_test_cb,
507 replay_fragment_cb, replay_message_cb, 557 replay_fragment_cb, replay_message_cb,
508 message_cb, ch); 558 slave_message_cb, ch);
559 ch->ready = GNUNET_YES;
509 } 560 }
510 561
511 GNUNET_SERVER_notification_context_add (nc, ch->client); 562 GNUNET_SERVER_notification_context_add (nc, ch->client);
@@ -529,9 +580,11 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
529 mst->channel.is_master = GNUNET_YES; 580 mst->channel.is_master = GNUNET_YES;
530 mst->policy = ntohl (req->policy); 581 mst->policy = ntohl (req->policy);
531 mst->priv_key = req->channel_key; 582 mst->priv_key = req->channel_key;
532 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, 583 GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &mst->pub_key);
533 &mst->pub_key);
534 GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash); 584 GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
585 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586 "%p Master connected to channel %s.\n",
587 mst, GNUNET_h2s (&mst->pub_key_hash));
535 588
536 GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key, 589 GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
537 master_counters_cb, mst); 590 master_counters_cb, mst);
@@ -561,14 +614,20 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
561 &slv->chan_key_hash); 614 &slv->chan_key_hash);
562 slv->origin = req->origin; 615 slv->origin = req->origin;
563 slv->relay_count = ntohl (req->relay_count); 616 slv->relay_count = ntohl (req->relay_count);
617 if (0 < slv->relay_count)
618 {
619 const struct GNUNET_PeerIdentity *relays
620 = (const struct GNUNET_PeerIdentity *) &req[1];
621 slv->relays
622 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
623 uint32_t i;
624 for (i = 0; i < slv->relay_count; i++)
625 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
626 }
564 627
565 const struct GNUNET_PeerIdentity *relays 628 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
566 = (const struct GNUNET_PeerIdentity *) &req[1]; 629 "%p Slave connected to channel %s.\n",
567 slv->relays 630 slv, GNUNET_h2s (&slv->chan_key_hash));
568 = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
569 uint32_t i;
570 for (i = 0; i < slv->relay_count; i++)
571 memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
572 631
573 GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key, 632 GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
574 slave_counters_cb, slv); 633 slave_counters_cb, slv);
@@ -609,13 +668,14 @@ transmit_notify (void *cls, size_t *data_size, void *data)
609 668
610 if (NULL == tmit_msg || *data_size < tmit_msg->size) 669 if (NULL == tmit_msg || *data_size < tmit_msg->size)
611 { 670 {
612 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n"); 671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
672 "%p transmit_notify: nothing to send.\n", ch);
613 *data_size = 0; 673 *data_size = 0;
614 return GNUNET_NO; 674 return GNUNET_NO;
615 } 675 }
616 676
617 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618 "transmit_notify: sending %u bytes.\n", tmit_msg->size); 678 "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
619 679
620 *data_size = tmit_msg->size; 680 *data_size = tmit_msg->size;
621 memcpy (data, tmit_msg->buf, *data_size); 681 memcpy (data, tmit_msg->buf, *data_size);
@@ -630,7 +690,7 @@ transmit_notify (void *cls, size_t *data_size, void *data)
630 { 690 {
631 if (NULL != ch->tmit_head) 691 if (NULL != ch->tmit_head)
632 { 692 {
633 transmit_message (ch); 693 transmit_message (ch, GNUNET_NO);
634 } 694 }
635 else if (ch->disconnected) 695 else if (ch->disconnected)
636 { 696 {
@@ -644,19 +704,55 @@ transmit_notify (void *cls, size_t *data_size, void *data)
644 704
645 705
646/** 706/**
707 * Callback for the transmit functions of multicast.
708 */
709static int
710master_transmit_notify (void *cls, size_t *data_size, void *data)
711{
712 int ret = transmit_notify (cls, data_size, data);
713
714 if (GNUNET_YES == ret)
715 {
716 struct Master *mst = cls;
717 mst->tmit_handle = NULL;
718 }
719 return ret;
720}
721
722
723/**
724 * Callback for the transmit functions of multicast.
725 */
726static int
727slave_transmit_notify (void *cls, size_t *data_size, void *data)
728{
729 int ret = transmit_notify (cls, data_size, data);
730
731 if (GNUNET_YES == ret)
732 {
733 struct Slave *slv = cls;
734 slv->tmit_handle = NULL;
735 }
736 return ret;
737}
738
739
740/**
647 * Transmit a message from a channel master to the multicast group. 741 * Transmit a message from a channel master to the multicast group.
648 */ 742 */
649static void 743static void
650master_transmit_message (struct Master *mst) 744master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
651{ 745{
652 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n"); 746 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
653 mst->channel.tmit_task = 0; 747 mst->channel.tmit_task = 0;
654 if (NULL == mst->tmit_handle) 748 if (NULL == mst->tmit_handle)
655 { 749 {
750 if (GNUNET_YES == inc_msg_id)
751 mst->max_message_id++;
656 mst->tmit_handle 752 mst->tmit_handle
657 = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id, 753 = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
658 mst->max_group_generation, 754 mst->max_group_generation,
659 transmit_notify, mst); 755 master_transmit_notify, mst);
660 } 756 }
661 else 757 else
662 { 758 {
@@ -669,14 +765,16 @@ master_transmit_message (struct Master *mst)
669 * Transmit a message from a channel slave to the multicast group. 765 * Transmit a message from a channel slave to the multicast group.
670 */ 766 */
671static void 767static void
672slave_transmit_message (struct Slave *slv) 768slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
673{ 769{
674 slv->channel.tmit_task = 0; 770 slv->channel.tmit_task = 0;
675 if (NULL == slv->tmit_handle) 771 if (NULL == slv->tmit_handle)
676 { 772 {
773 if (GNUNET_YES == inc_msg_id)
774 slv->max_message_id++;
677 slv->tmit_handle 775 slv->tmit_handle
678 = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id, 776 = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
679 transmit_notify, slv); 777 slave_transmit_notify, slv);
680 } 778 }
681 else 779 else
682 { 780 {
@@ -686,11 +784,11 @@ slave_transmit_message (struct Slave *slv)
686 784
687 785
688static inline void 786static inline void
689transmit_message (struct Channel *ch) 787transmit_message (struct Channel *ch, uint8_t inc_msg_id)
690{ 788{
691 ch->is_master 789 ch->is_master
692 ? master_transmit_message ((struct Master *) ch) 790 ? master_transmit_message ((struct Master *) ch, inc_msg_id)
693 : slave_transmit_message ((struct Slave *) ch); 791 : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
694} 792}
695 793
696 794
@@ -708,10 +806,9 @@ transmit_error (struct Channel *ch)
708 tmit_msg->size = sizeof (*msg); 806 tmit_msg->size = sizeof (*msg);
709 tmit_msg->state = ch->tmit_state; 807 tmit_msg->state = ch->tmit_state;
710 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); 808 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
711 transmit_message (ch); 809 transmit_message (ch, GNUNET_NO);
712 810
713 /* FIXME: cleanup */ 811 /* FIXME: cleanup */
714 GNUNET_SERVER_client_disconnect (ch->client);
715} 812}
716 813
717 814
@@ -720,40 +817,60 @@ transmit_error (struct Channel *ch)
720 */ 817 */
721static void 818static void
722handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, 819handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
723 const struct GNUNET_MessageHeader *msg) 820 const struct GNUNET_MessageHeader *msg)
724{ 821{
725 struct Channel *ch 822 struct Channel *ch
726 = GNUNET_SERVER_client_get_user_context (client, struct Channel); 823 = GNUNET_SERVER_client_get_user_context (client, struct Channel);
727 GNUNET_assert (NULL != ch); 824 GNUNET_assert (NULL != ch);
728 825
826 if (GNUNET_YES != ch->ready)
827 {
828 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
829 "%p Ignoring message from client, channel is not ready yet.\n",
830 ch);
831 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
832 return;
833 }
834
835 uint8_t inc_msg_id = GNUNET_NO;
729 uint16_t size = ntohs (msg->size); 836 uint16_t size = ntohs (msg->size);
730 uint16_t psize = 0, pos = 0; 837 uint16_t psize = 0, ptype = 0, pos = 0;
731 838
732 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) 839 if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
733 { 840 {
734 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n"); 841 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
735 GNUNET_break (0); 842 GNUNET_break (0);
736 transmit_error (ch); 843 transmit_error (ch);
844 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
737 return; 845 return;
738 } 846 }
739 847
848 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849 "%p Received message from client.\n", ch);
850 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
851
740 for (pos = 0; sizeof (*msg) + pos < size; pos += psize) 852 for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
741 { 853 {
742 const struct GNUNET_MessageHeader *pmsg 854 const struct GNUNET_MessageHeader *pmsg
743 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); 855 = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
744 psize = ntohs (pmsg->size); 856 psize = ntohs (pmsg->size);
745 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 857 ptype = ntohs (pmsg->type);
746 "Received message part of type %u and size %u "
747 "from client.\n", ntohs (pmsg->type), psize);
748 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) 858 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
749 { 859 {
750 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 860 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
751 "Received invalid message part of type %u and size %u " 861 "%p Received invalid message part of type %u and size %u "
752 "from client.\n", ntohs (pmsg->type), psize); 862 "from client.\n", ch, ptype, psize);
753 GNUNET_break (0); 863 GNUNET_break (0);
754 transmit_error (ch); 864 transmit_error (ch);
865 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
755 return; 866 return;
756 } 867 }
868 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
869 "%p Received message part from client.\n", ch);
870 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
871
872 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
873 inc_msg_id = GNUNET_YES;
757 } 874 }
758 875
759 size -= sizeof (*msg); 876 size -= sizeof (*msg);
@@ -763,7 +880,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
763 tmit_msg->size = size; 880 tmit_msg->size = size;
764 tmit_msg->state = ch->tmit_state; 881 tmit_msg->state = ch->tmit_state;
765 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); 882 GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
766 transmit_message (ch); 883 transmit_message (ch, inc_msg_id);
767 884
768 GNUNET_SERVER_receive_done (client, GNUNET_OK); 885 GNUNET_SERVER_receive_done (client, GNUNET_OK);
769}; 886};
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 90c07480a..1ffda5d08 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -27,7 +27,16 @@
27#ifndef PSYC_H 27#ifndef PSYC_H
28#define PSYC_H 28#define PSYC_H
29 29
30#include "gnunet_common.h" 30#include "platform.h"
31#include "gnunet_psyc_service.h"
32
33
34int
35GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data);
36
37void
38GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
39 const struct GNUNET_MessageHeader *msg);
31 40
32 41
33enum MessageState 42enum MessageState
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 20394bbce..8a1c9ffaa 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -48,12 +48,30 @@ struct OperationHandle
48 struct GNUNET_MessageHeader *msg; 48 struct GNUNET_MessageHeader *msg;
49}; 49};
50 50
51
52/**
53 * Handle for a pending PSYC transmission operation.
54 */
55struct GNUNET_PSYC_ChannelTransmitHandle
56{
57 struct GNUNET_PSYC_Channel *ch;
58 GNUNET_PSYC_TransmitNotifyModifier notify_mod;
59 GNUNET_PSYC_TransmitNotifyData notify_data;
60 void *notify_cls;
61 enum MessageState state;
62};
63
51/** 64/**
52 * Handle to access PSYC channel operations for both the master and slaves. 65 * Handle to access PSYC channel operations for both the master and slaves.
53 */ 66 */
54struct GNUNET_PSYC_Channel 67struct GNUNET_PSYC_Channel
55{ 68{
56 /** 69 /**
70 * Transmission handle;
71 */
72 struct GNUNET_PSYC_ChannelTransmitHandle tmit;
73
74 /**
57 * Configuration to use. 75 * Configuration to use.
58 */ 76 */
59 const struct GNUNET_CONFIGURATION_Handle *cfg; 77 const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -124,6 +142,11 @@ struct GNUNET_PSYC_Channel
124 uint64_t recv_message_id; 142 uint64_t recv_message_id;
125 143
126 /** 144 /**
145 * Public key of the slave from which a message is being received.
146 */
147 struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
148
149 /**
127 * State of the currently being received message from the PSYC service. 150 * State of the currently being received message from the PSYC service.
128 */ 151 */
129 enum MessageState recv_state; 152 enum MessageState recv_state;
@@ -171,27 +194,12 @@ struct GNUNET_PSYC_Channel
171 194
172 195
173/** 196/**
174 * Handle for a pending PSYC transmission operation.
175 */
176struct GNUNET_PSYC_MasterTransmitHandle
177{
178 struct GNUNET_PSYC_Master *master;
179 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod;
180 GNUNET_PSYC_MasterTransmitNotify notify_data;
181 void *notify_cls;
182 enum MessageState state;
183};
184
185
186/**
187 * Handle for the master of a PSYC channel. 197 * Handle for the master of a PSYC channel.
188 */ 198 */
189struct GNUNET_PSYC_Master 199struct GNUNET_PSYC_Master
190{ 200{
191 struct GNUNET_PSYC_Channel ch; 201 struct GNUNET_PSYC_Channel ch;
192 202
193 struct GNUNET_PSYC_MasterTransmitHandle *tmit;
194
195 GNUNET_PSYC_MasterStartCallback start_cb; 203 GNUNET_PSYC_MasterStartCallback start_cb;
196 204
197 uint64_t max_message_id; 205 uint64_t max_message_id;
@@ -204,6 +212,10 @@ struct GNUNET_PSYC_Master
204struct GNUNET_PSYC_Slave 212struct GNUNET_PSYC_Slave
205{ 213{
206 struct GNUNET_PSYC_Channel ch; 214 struct GNUNET_PSYC_Channel ch;
215
216 GNUNET_PSYC_SlaveJoinCallback join_cb;
217
218 uint64_t max_message_id;
207}; 219};
208 220
209 221
@@ -251,7 +263,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
251 263
252 264
253static void 265static void
254master_transmit_data (struct GNUNET_PSYC_Master *mst); 266channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
255 267
256 268
257/** 269/**
@@ -302,7 +314,8 @@ recv_reset (struct GNUNET_PSYC_Channel *ch)
302 ch->recv_state = MSG_STATE_START; 314 ch->recv_state = MSG_STATE_START;
303 ch->recv_flags = 0; 315 ch->recv_flags = 0;
304 ch->recv_message_id = 0; 316 ch->recv_message_id = 0;
305 ch->recv_mod_value_size =0; 317 //FIXME: ch->recv_slave_key = { 0 };
318 ch->recv_mod_value_size = 0;
306 ch->recv_mod_value_size_expected = 0; 319 ch->recv_mod_value_size_expected = 0;
307} 320}
308 321
@@ -379,8 +392,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
379 } 392 }
380 393
381 if (NULL != op 394 if (NULL != op
382 && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD 395 && (GNUNET_YES == end
383 < op->msg->size + sizeof (struct GNUNET_MessageHeader)))) 396 || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
397 < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
384 { 398 {
385 /* End of message or buffer is full, add it to transmission queue. */ 399 /* End of message or buffer is full, add it to transmission queue. */
386 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); 400 op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
@@ -390,6 +404,9 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
390 ch->tmit_ack_pending++; 404 ch->tmit_ack_pending++;
391 } 405 }
392 406
407 if (GNUNET_YES == end)
408 ch->in_transmit = GNUNET_NO;
409
393 transmit_next (ch); 410 transmit_next (ch);
394} 411}
395 412
@@ -400,15 +417,14 @@ queue_message (struct GNUNET_PSYC_Channel *ch,
400 * @param mst Master handle. 417 * @param mst Master handle.
401 */ 418 */
402static void 419static void
403master_transmit_mod (struct GNUNET_PSYC_Master *mst) 420channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
404{ 421{
405 struct GNUNET_PSYC_Channel *ch = &mst->ch;
406 uint16_t max_data_size, data_size; 422 uint16_t max_data_size, data_size;
407 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 423 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
408 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; 424 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
409 int notify_ret; 425 int notify_ret;
410 426
411 switch (mst->tmit->state) 427 switch (ch->tmit.state)
412 { 428 {
413 case MSG_STATE_MODIFIER: 429 case MSG_STATE_MODIFIER:
414 { 430 {
@@ -417,12 +433,11 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
417 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; 433 max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
418 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); 434 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
419 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); 435 msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
420 notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, 436 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
421 &data_size, &mod[1], &mod->oper); 437 &data_size, &mod[1], &mod->oper);
422 mod->name_size = strnlen ((char *) &mod[1], data_size); 438 mod->name_size = strnlen ((char *) &mod[1], data_size);
423 if (mod->name_size < data_size) 439 if (mod->name_size < data_size)
424 { 440 {
425 mod->oper = htons (mod->oper);
426 mod->value_size = htons (data_size - 1 - mod->name_size); 441 mod->value_size = htons (data_size - 1 - mod->name_size);
427 mod->name_size = htons (mod->name_size); 442 mod->name_size = htons (mod->name_size);
428 } 443 }
@@ -438,8 +453,8 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
438 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; 453 max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
439 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 454 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
440 msg->size = sizeof (struct GNUNET_MessageHeader); 455 msg->size = sizeof (struct GNUNET_MessageHeader);
441 notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls, 456 notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
442 &data_size, &msg[1], NULL); 457 &data_size, &msg[1], NULL);
443 break; 458 break;
444 } 459 }
445 default: 460 default:
@@ -454,27 +469,28 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
454 ch->tmit_paused = GNUNET_YES; 469 ch->tmit_paused = GNUNET_YES;
455 return; 470 return;
456 } 471 }
457 mst->tmit->state = MSG_STATE_MOD_CONT; 472 ch->tmit.state = MSG_STATE_MOD_CONT;
458 break; 473 break;
459 474
460 case GNUNET_YES: 475 case GNUNET_YES:
461 if (0 == data_size) 476 if (0 == data_size)
462 { 477 {
463 /* End of modifiers. */ 478 /* End of modifiers. */
464 mst->tmit->state = MSG_STATE_DATA; 479 ch->tmit.state = MSG_STATE_DATA;
465 if (0 == ch->tmit_ack_pending) 480 if (0 == ch->tmit_ack_pending)
466 master_transmit_data (mst); 481 channel_transmit_data (ch);
467 482
468 return; 483 return;
469 } 484 }
470 mst->tmit->state = MSG_STATE_MODIFIER; 485 ch->tmit.state = MSG_STATE_MODIFIER;
471 break; 486 break;
472 487
473 default: 488 default:
474 LOG (GNUNET_ERROR_TYPE_ERROR, 489 LOG (GNUNET_ERROR_TYPE_ERROR,
475 "MasterTransmitNotify returned error when requesting a modifier.\n"); 490 "MasterTransmitNotifyModifier returned error "
491 "when requesting a modifier.\n");
476 492
477 mst->tmit->state = MSG_STATE_START; 493 ch->tmit.state = MSG_STATE_CANCEL;
478 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 494 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
479 msg->size = htons (sizeof (*msg)); 495 msg->size = htons (sizeof (*msg));
480 496
@@ -489,7 +505,7 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
489 queue_message (ch, msg, GNUNET_NO); 505 queue_message (ch, msg, GNUNET_NO);
490 } 506 }
491 507
492 master_transmit_mod (mst); 508 channel_transmit_mod (ch);
493} 509}
494 510
495 511
@@ -499,17 +515,16 @@ master_transmit_mod (struct GNUNET_PSYC_Master *mst)
499 * @param mst Master handle. 515 * @param mst Master handle.
500 */ 516 */
501static void 517static void
502master_transmit_data (struct GNUNET_PSYC_Master *mst) 518channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
503{ 519{
504 struct GNUNET_PSYC_Channel *ch = &mst->ch;
505 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; 520 uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
506 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; 521 char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
507 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; 522 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
508 523
509 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); 524 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
510 525
511 int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, 526 int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
512 &data_size, &msg[1]); 527 &data_size, &msg[1]);
513 switch (notify_ret) 528 switch (notify_ret)
514 { 529 {
515 case GNUNET_NO: 530 case GNUNET_NO:
@@ -522,14 +537,14 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
522 break; 537 break;
523 538
524 case GNUNET_YES: 539 case GNUNET_YES:
525 mst->tmit->state = MSG_STATE_START; 540 ch->tmit.state = MSG_STATE_END;
526 break; 541 break;
527 542
528 default: 543 default:
529 LOG (GNUNET_ERROR_TYPE_ERROR, 544 LOG (GNUNET_ERROR_TYPE_ERROR,
530 "MasterTransmitNotify returned error when requesting data.\n"); 545 "MasterTransmitNotify returned error when requesting data.\n");
531 546
532 mst->tmit->state = MSG_STATE_START; 547 ch->tmit.state = MSG_STATE_CANCEL;
533 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); 548 msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
534 msg->size = htons (sizeof (*msg)); 549 msg->size = htons (sizeof (*msg));
535 queue_message (ch, msg, GNUNET_YES); 550 queue_message (ch, msg, GNUNET_YES);
@@ -554,6 +569,86 @@ master_transmit_data (struct GNUNET_PSYC_Master *mst)
554 569
555 570
556/** 571/**
572 * Send a message to a channel.
573 *
574 * @param ch Handle to the PSYC channel.
575 * @param method_name Which method should be invoked.
576 * @param notify_mod Function to call to obtain modifiers.
577 * @param notify_data Function to call to obtain fragments of the data.
578 * @param notify_cls Closure for @a notify_mod and @a notify_data.
579 * @param flags Flags for the message being transmitted.
580 * @return Transmission handle, NULL on error (i.e. more than one request queued).
581 */
582static struct GNUNET_PSYC_ChannelTransmitHandle *
583channel_transmit (struct GNUNET_PSYC_Channel *ch,
584 const char *method_name,
585 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
586 GNUNET_PSYC_TransmitNotifyData notify_data,
587 void *notify_cls,
588 uint32_t flags)
589{
590 if (GNUNET_NO != ch->in_transmit)
591 return NULL;
592 ch->in_transmit = GNUNET_YES;
593
594 size_t size = strlen (method_name) + 1;
595 struct GNUNET_PSYC_MessageMethod *pmeth;
596 struct OperationHandle *op;
597
598 ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
599 + sizeof (*pmeth) + size);
600 op->msg = (struct GNUNET_MessageHeader *) &op[1];
601 op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
602
603 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
604 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
605 pmeth->header.size = htons (sizeof (*pmeth) + size);
606 pmeth->flags = htonl (flags);
607 memcpy (&pmeth[1], method_name, size);
608
609 ch->tmit.ch = ch;
610 ch->tmit.notify_mod = notify_mod;
611 ch->tmit.notify_data = notify_data;
612 ch->tmit.notify_cls = notify_cls;
613 ch->tmit.state = MSG_STATE_MODIFIER;
614
615 channel_transmit_mod (ch);
616 return &ch->tmit;
617}
618
619
620/**
621 * Resume transmission to the channel.
622 *
623 * @param th Handle of the request that is being resumed.
624 */
625static void
626channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
627{
628 struct GNUNET_PSYC_Channel *ch = th->ch;
629 if (0 == ch->tmit_ack_pending)
630 {
631 ch->tmit_paused = GNUNET_NO;
632 channel_transmit_data (ch);
633 }
634}
635
636
637/**
638 * Abort transmission request to channel.
639 *
640 * @param th Handle of the request that is being aborted.
641 */
642static void
643channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
644{
645 struct GNUNET_PSYC_Channel *ch = th->ch;
646 if (GNUNET_NO == ch->in_transmit)
647 return;
648}
649
650
651/**
557 * Handle incoming message from the PSYC service. 652 * Handle incoming message from the PSYC service.
558 * 653 *
559 * @param ch The channel the message is sent to. 654 * @param ch The channel the message is sent to.
@@ -564,14 +659,20 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
564 const struct GNUNET_PSYC_MessageHeader *msg) 659 const struct GNUNET_PSYC_MessageHeader *msg)
565{ 660{
566 uint16_t size = ntohs (msg->header.size); 661 uint16_t size = ntohs (msg->header.size);
662 uint32_t flags = ntohl (msg->flags);
663
664 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
665 (struct GNUNET_MessageHeader *) msg);
567 666
568 if (MSG_STATE_START == ch->recv_state) 667 if (MSG_STATE_START == ch->recv_state)
569 { 668 {
570 ch->recv_message_id = GNUNET_ntohll (msg->message_id); 669 ch->recv_message_id = GNUNET_ntohll (msg->message_id);
571 ch->recv_flags = ntohl (msg->flags); 670 ch->recv_flags = flags;
671 ch->recv_slave_key = msg->slave_key;
572 } 672 }
573 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) 673 else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
574 { 674 {
675 // FIXME
575 LOG (GNUNET_ERROR_TYPE_WARNING, 676 LOG (GNUNET_ERROR_TYPE_WARNING,
576 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", 677 "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
577 GNUNET_ntohll (msg->message_id), ch->recv_message_id); 678 GNUNET_ntohll (msg->message_id), ch->recv_message_id);
@@ -579,11 +680,11 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
579 recv_error (ch); 680 recv_error (ch);
580 return; 681 return;
581 } 682 }
582 else if (ntohl (msg->flags) != ch->recv_flags) 683 else if (flags != ch->recv_flags)
583 { 684 {
584 LOG (GNUNET_ERROR_TYPE_WARNING, 685 LOG (GNUNET_ERROR_TYPE_WARNING,
585 "Unexpected message flags. Got: %lu, expected: %lu\n", 686 "Unexpected message flags. Got: %lu, expected: %lu\n",
586 ntohl (msg->flags), ch->recv_flags); 687 flags, ch->recv_flags);
587 GNUNET_break_op (0); 688 GNUNET_break_op (0);
588 recv_error (ch); 689 recv_error (ch);
589 return; 690 return;
@@ -599,10 +700,6 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
599 ptype = ntohs (pmsg->type); 700 ptype = ntohs (pmsg->type);
600 size_eq = size_min = 0; 701 size_eq = size_min = 0;
601 702
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603 "Received message part of type %u and size %u from PSYC.\n",
604 ptype, psize);
605
606 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) 703 if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
607 { 704 {
608 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 705 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -612,6 +709,10 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
612 return; 709 return;
613 } 710 }
614 711
712 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
713 "Received message part from PSYC.\n");
714 GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
715
615 switch (ptype) 716 switch (ptype)
616 { 717 {
617 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: 718 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
@@ -758,6 +859,46 @@ handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
758 859
759 860
760/** 861/**
862 * Handle incoming message acknowledgement from the PSYC service.
863 *
864 * @param ch The channel the acknowledgement is sent to.
865 */
866static void
867handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
868{
869 if (0 == ch->tmit_ack_pending)
870 {
871 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
872 GNUNET_break (0);
873 return;
874 }
875 ch->tmit_ack_pending--;
876
877 switch (ch->tmit.state)
878 {
879 case MSG_STATE_MODIFIER:
880 case MSG_STATE_MOD_CONT:
881 if (GNUNET_NO == ch->tmit_paused)
882 channel_transmit_mod (ch);
883 break;
884
885 case MSG_STATE_DATA:
886 if (GNUNET_NO == ch->tmit_paused)
887 channel_transmit_data (ch);
888 break;
889
890 case MSG_STATE_END:
891 case MSG_STATE_CANCEL:
892 break;
893
894 default:
895 LOG (GNUNET_ERROR_TYPE_DEBUG,
896 "Ignoring message ACK in state %u.\n", ch->tmit.state);
897 }
898}
899
900
901/**
761 * Type of a function to call when we receive a message 902 * Type of a function to call when we receive a message
762 * from the service. 903 * from the service.
763 * 904 *
@@ -775,7 +916,7 @@ message_handler (void *cls,
775 916
776 if (NULL == msg) 917 if (NULL == msg)
777 { 918 {
778 GNUNET_break (0); 919 // timeout / disconnected from server, reconnect
779 reschedule_connect (ch); 920 reschedule_connect (ch);
780 return; 921 return;
781 } 922 }
@@ -824,63 +965,15 @@ message_handler (void *cls,
824 } 965 }
825 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: 966 case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
826 { 967 {
827#if TODO
828 struct CountersResult *cres = (struct CountersResult *) msg; 968 struct CountersResult *cres = (struct CountersResult *) msg;
829 slv->max_message_id = GNUNET_ntohll (cres->max_message_id); 969 slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
830 if (NULL != slv->join_ack_cb) 970 if (NULL != slv->join_cb)
831 mst->join_ack_cb (ch->cb_cls, mst->max_message_id); 971 slv->join_cb (ch->cb_cls, slv->max_message_id);
832#endif
833 break; 972 break;
834 } 973 }
835 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: 974 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
836 { 975 {
837 if (0 == ch->tmit_ack_pending) 976 handle_psyc_message_ack (ch);
838 {
839 LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
840 GNUNET_break (0);
841 break;
842 }
843 ch->tmit_ack_pending--;
844
845 if (ch->is_master)
846 {
847 GNUNET_assert (NULL != mst->tmit);
848 switch (mst->tmit->state)
849 {
850 case MSG_STATE_MODIFIER:
851 case MSG_STATE_MOD_CONT:
852 if (GNUNET_NO == ch->tmit_paused)
853 master_transmit_mod (mst);
854 break;
855
856 case MSG_STATE_DATA:
857 if (GNUNET_NO == ch->tmit_paused)
858 master_transmit_data (mst);
859 break;
860
861 case MSG_STATE_END:
862 case MSG_STATE_CANCEL:
863 if (NULL != mst->tmit)
864 {
865 GNUNET_free (mst->tmit);
866 mst->tmit = NULL;
867 }
868 else
869 {
870 LOG (GNUNET_ERROR_TYPE_WARNING,
871 "Ignoring message ACK, there's no transmission going on.\n");
872 GNUNET_break (0);
873 }
874 break;
875 default:
876 LOG (GNUNET_ERROR_TYPE_DEBUG,
877 "Ignoring message ACK in state %u.\n", mst->tmit->state);
878 }
879 }
880 else
881 {
882 /* TODO: slave */
883 }
884 break; 977 break;
885 } 978 }
886 979
@@ -1106,8 +1199,6 @@ void
1106GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) 1199GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
1107{ 1200{
1108 disconnect (master); 1201 disconnect (master);
1109 if (NULL != master->tmit)
1110 GNUNET_free (master->tmit);
1111 GNUNET_free (master); 1202 GNUNET_free (master);
1112} 1203}
1113 1204
@@ -1162,41 +1253,14 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
1162struct GNUNET_PSYC_MasterTransmitHandle * 1253struct GNUNET_PSYC_MasterTransmitHandle *
1163GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, 1254GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1164 const char *method_name, 1255 const char *method_name,
1165 GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod, 1256 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1166 GNUNET_PSYC_MasterTransmitNotify notify_data, 1257 GNUNET_PSYC_TransmitNotifyData notify_data,
1167 void *notify_cls, 1258 void *notify_cls,
1168 enum GNUNET_PSYC_MasterTransmitFlags flags) 1259 enum GNUNET_PSYC_MasterTransmitFlags flags)
1169{ 1260{
1170 GNUNET_assert (NULL != master); 1261 return (struct GNUNET_PSYC_MasterTransmitHandle *)
1171 struct GNUNET_PSYC_Channel *ch = &master->ch; 1262 channel_transmit (&master->ch, method_name, notify_mod, notify_data,
1172 if (GNUNET_NO != ch->in_transmit) 1263 notify_cls, flags);
1173 return NULL;
1174 ch->in_transmit = GNUNET_YES;
1175
1176 size_t size = strlen (method_name) + 1;
1177 struct GNUNET_PSYC_MessageMethod *pmeth;
1178 struct OperationHandle *op;
1179
1180 ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
1181 + sizeof (*pmeth) + size);
1182 op->msg = (struct GNUNET_MessageHeader *) &op[1];
1183 op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
1184
1185 pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
1186 pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
1187 pmeth->header.size = htons (sizeof (*pmeth) + size);
1188 pmeth->flags = htonl (flags);
1189 memcpy (&pmeth[1], method_name, size);
1190
1191 master->tmit = GNUNET_malloc (sizeof (*master->tmit));
1192 master->tmit->master = master;
1193 master->tmit->notify_mod = notify_mod;
1194 master->tmit->notify_data = notify_data;
1195 master->tmit->notify_cls = notify_cls;
1196 master->tmit->state = MSG_STATE_MODIFIER;
1197
1198 master_transmit_mod (master);
1199 return master->tmit;
1200} 1264}
1201 1265
1202 1266
@@ -1208,12 +1272,7 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
1208void 1272void
1209GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1273GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1210{ 1274{
1211 struct GNUNET_PSYC_Channel *ch = &th->master->ch; 1275 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1212 if (0 == ch->tmit_ack_pending)
1213 {
1214 ch->tmit_paused = GNUNET_NO;
1215 master_transmit_data (th->master);
1216 }
1217} 1276}
1218 1277
1219 1278
@@ -1225,10 +1284,7 @@ GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1225void 1284void
1226GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) 1285GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
1227{ 1286{
1228 struct GNUNET_PSYC_Master *master = th->master; 1287 channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1229 struct GNUNET_PSYC_Channel *ch = &master->ch;
1230 if (GNUNET_NO != ch->in_transmit)
1231 return;
1232} 1288}
1233 1289
1234 1290
@@ -1282,15 +1338,15 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1282{ 1338{
1283 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); 1339 struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
1284 struct GNUNET_PSYC_Channel *ch = &slv->ch; 1340 struct GNUNET_PSYC_Channel *ch = &slv->ch;
1285 struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req) 1341 struct SlaveJoinRequest *req
1286 + relay_count * sizeof (*relays)); 1342 = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
1287 req->header.size = htons (sizeof (*req) 1343 req->header.size = htons (sizeof (*req)
1288 + relay_count * sizeof (*relays)); 1344 + relay_count * sizeof (*relays));
1289 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); 1345 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
1290 req->channel_key = *channel_key; 1346 req->channel_key = *channel_key;
1291 req->slave_key = *slave_key; 1347 req->slave_key = *slave_key;
1292 req->origin = *origin; 1348 req->origin = *origin;
1293 req->relay_count = relay_count; 1349 req->relay_count = htonl (relay_count);
1294 memcpy (&req[1], relays, relay_count * sizeof (*relays)); 1350 memcpy (&req[1], relays, relay_count * sizeof (*relays));
1295 1351
1296 ch->message_cb = message_cb; 1352 ch->message_cb = message_cb;
@@ -1303,6 +1359,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1303 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 1359 ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1304 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); 1360 ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
1305 1361
1362 slv->join_cb = slave_joined_cb;
1306 return slv; 1363 return slv;
1307} 1364}
1308 1365
@@ -1328,9 +1385,8 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1328 * 1385 *
1329 * @param slave Slave handle. 1386 * @param slave Slave handle.
1330 * @param method_name Which (PSYC) method should be invoked (on host). 1387 * @param method_name Which (PSYC) method should be invoked (on host).
1331 * @param env Environment containing transient variables for the message, or 1388 * @param notify_mod Function to call to obtain modifiers.
1332 * NULL. 1389 * @param notify_data Function to call to obtain fragments of the data.
1333 * @param notify Function to call when we are allowed to transmit (to get data).
1334 * @param notify_cls Closure for @a notify. 1390 * @param notify_cls Closure for @a notify.
1335 * @param flags Flags for the message being transmitted. 1391 * @param flags Flags for the message being transmitted.
1336 * @return Transmission handle, NULL on error (i.e. more than one request 1392 * @return Transmission handle, NULL on error (i.e. more than one request
@@ -1339,12 +1395,14 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
1339struct GNUNET_PSYC_SlaveTransmitHandle * 1395struct GNUNET_PSYC_SlaveTransmitHandle *
1340GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, 1396GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1341 const char *method_name, 1397 const char *method_name,
1342 const struct GNUNET_ENV_Environment *env, 1398 GNUNET_PSYC_TransmitNotifyModifier notify_mod,
1343 GNUNET_PSYC_SlaveTransmitNotify notify, 1399 GNUNET_PSYC_TransmitNotifyData notify_data,
1344 void *notify_cls, 1400 void *notify_cls,
1345 enum GNUNET_PSYC_SlaveTransmitFlags flags) 1401 enum GNUNET_PSYC_SlaveTransmitFlags flags)
1346{ 1402{
1347 return NULL; 1403 return (struct GNUNET_PSYC_SlaveTransmitHandle *)
1404 channel_transmit (&slave->ch, method_name,
1405 notify_mod, notify_data, notify_cls, flags);
1348} 1406}
1349 1407
1350 1408
@@ -1356,7 +1414,7 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
1356void 1414void
1357GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) 1415GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1358{ 1416{
1359 1417 channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1360} 1418}
1361 1419
1362 1420
@@ -1368,7 +1426,7 @@ GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
1368void 1426void
1369GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) 1427GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1370{ 1428{
1371 1429 channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
1372} 1430}
1373 1431
1374 1432
@@ -1382,7 +1440,7 @@ GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
1382struct GNUNET_PSYC_Channel * 1440struct GNUNET_PSYC_Channel *
1383GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) 1441GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1384{ 1442{
1385 return (struct GNUNET_PSYC_Channel *) master; 1443 return &master->ch;
1386} 1444}
1387 1445
1388 1446
@@ -1395,7 +1453,7 @@ GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
1395struct GNUNET_PSYC_Channel * 1453struct GNUNET_PSYC_Channel *
1396GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) 1454GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
1397{ 1455{
1398 return (struct GNUNET_PSYC_Channel *) slave; 1456 return &slave->ch;
1399} 1457}
1400 1458
1401 1459
diff --git a/src/psyc/psyc_common.c b/src/psyc/psyc_common.c
new file mode 100644
index 000000000..7368011fc
--- /dev/null
+++ b/src/psyc/psyc_common.c
@@ -0,0 +1,100 @@
1/*
2 * This file is part of GNUnet
3 * (C) 2013 Christian Grothoff (and other contributing authors)
4 *
5 * GNUnet is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published
7 * by the Free Software Foundation; either version 3, or (at your
8 * option) any later version.
9 *
10 * GNUnet is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with GNUnet; see the file COPYING. If not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 * Boston, MA 02111-1307, USA.
19 */
20
21/**
22 * @file psyc/psyc_common.c
23 * @brief Common functions for PSYC
24 * @author Gabor X Toth
25 */
26
27#include <inttypes.h>
28#include "psyc.h"
29
30/**
31 * Check if @a data contains a series of valid message parts.
32 *
33 * @param data_size Size of @a data.
34 * @param data Data.
35 *
36 * @return GNUNET_YES or GNUNET_NO
37 */
38int
39GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data)
40{
41 const struct GNUNET_MessageHeader *pmsg;
42 uint16_t psize = 0;
43 uint16_t pos = 0;
44
45 for (pos = 0; data_size + pos < data_size; pos += psize)
46 {
47 pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
48 psize = ntohs (pmsg->size);
49 if (psize < sizeof (*pmsg) || data_size + pos + psize > data_size)
50 {
51 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
52 "Invalid message part of type %u and size %u.",
53 ntohs (pmsg->type), psize);
54 return GNUNET_NO;
55 }
56 }
57 return GNUNET_YES;
58}
59
60
61void
62GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
63 const struct GNUNET_MessageHeader *msg)
64{
65 uint16_t size = ntohs (msg->size);
66 uint16_t type = ntohs (msg->type);
67 GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
68 switch (type)
69 {
70 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
71 {
72 struct GNUNET_PSYC_MessageHeader *pmsg
73 = (struct GNUNET_PSYC_MessageHeader *) msg;
74 GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %" PRIu32 "\n",
75 GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
76 break;
77 }
78 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
79 {
80 struct GNUNET_PSYC_MessageMethod *meth
81 = (struct GNUNET_PSYC_MessageMethod *) msg;
82 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
83 break;
84 }
85 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
86 {
87 struct GNUNET_PSYC_MessageModifier *mod
88 = (struct GNUNET_PSYC_MessageModifier *) msg;
89 uint16_t name_size = ntohs (mod->name_size);
90 char oper = ' ' < mod->oper ? mod->oper : ' ';
91 GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
92 ntohs (mod->value_size), ((char *) &mod[1]) + name_size + 1);
93 break;
94 }
95 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
96 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
97 GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
98 break;
99 }
100}
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 33684b125..88947be60 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -37,7 +37,7 @@
37 37
38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) 38#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
39 39
40#define DEBUG_SERVICE 0 40#define DEBUG_SERVICE 1
41 41
42 42
43/** 43/**
@@ -66,7 +66,8 @@ struct GNUNET_PSYC_MasterTransmitHandle *mth;
66 66
67struct TransmitClosure 67struct TransmitClosure
68{ 68{
69 struct GNUNET_PSYC_MasterTransmitHandle *handle; 69 struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit;
70 struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit;
70 struct GNUNET_ENV_Environment *env; 71 struct GNUNET_ENV_Environment *env;
71 char *data[16]; 72 char *data[16];
72 const char *mod_value; 73 const char *mod_value;
@@ -78,12 +79,30 @@ struct TransmitClosure
78 79
79struct TransmitClosure *tmit; 80struct TransmitClosure *tmit;
80 81
82
83enum
84{
85 TEST_NONE,
86 TEST_SLAVE_TRANSMIT,
87 TEST_MASTER_TRANSMIT,
88} test;
89
90
91static void
92master_transmit ();
93
94
81/** 95/**
82 * Clean up all resources used. 96 * Clean up all resources used.
83 */ 97 */
84static void 98static void
85cleanup () 99cleanup ()
86{ 100{
101 if (NULL != slv)
102 {
103 GNUNET_PSYC_slave_part (slv);
104 slv = NULL;
105 }
87 if (NULL != mst) 106 if (NULL != mst)
88 { 107 {
89 GNUNET_PSYC_master_stop (mst); 108 GNUNET_PSYC_master_stop (mst);
@@ -133,6 +152,8 @@ end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
133static void 152static void
134end () 153end ()
135{ 154{
155 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending tests.\n");
156
136 if (end_badly_task != GNUNET_SCHEDULER_NO_TASK) 157 if (end_badly_task != GNUNET_SCHEDULER_NO_TASK)
137 { 158 {
138 GNUNET_SCHEDULER_cancel (end_badly_task); 159 GNUNET_SCHEDULER_cancel (end_badly_task);
@@ -144,8 +165,8 @@ end ()
144 165
145 166
146static void 167static void
147message (void *cls, uint64_t message_id, uint32_t flags, 168master_message (void *cls, uint64_t message_id, uint32_t flags,
148 const struct GNUNET_MessageHeader *msg) 169 const struct GNUNET_MessageHeader *msg)
149{ 170{
150 if (NULL == msg) 171 if (NULL == msg)
151 { 172 {
@@ -158,12 +179,64 @@ message (void *cls, uint64_t message_id, uint32_t flags,
158 uint16_t size = ntohs (msg->size); 179 uint16_t size = ntohs (msg->size);
159 180
160 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 181 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
161 "Got message part of type %u and size %u " 182 "Master got message part of type %u and size %u "
162 "belonging to message ID %llu with flags %u\n", 183 "belonging to message ID %llu with flags %u\n",
163 type, size, message_id, flags); 184 type, size, message_id, flags);
164 185
165 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type) 186 switch (test)
166 end (); 187 {
188 case TEST_SLAVE_TRANSMIT:
189 if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
190 {
191 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
192 "Unexpected request flags: %lu\n", flags);
193 GNUNET_assert (0);
194 return;
195 }
196 // FIXME: check rest of message
197
198 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
199 master_transmit ();
200 break;
201
202 case TEST_MASTER_TRANSMIT:
203 break;
204
205 default:
206 GNUNET_assert (0);
207 }
208}
209
210
211static void
212slave_message (void *cls, uint64_t message_id, uint32_t flags,
213 const struct GNUNET_MessageHeader *msg)
214{
215 if (NULL == msg)
216 {
217 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
218 "Error while receiving message %llu\n", message_id);
219 return;
220 }
221
222 uint16_t type = ntohs (msg->type);
223 uint16_t size = ntohs (msg->size);
224
225 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
226 "Slave got message part of type %u and size %u "
227 "belonging to message ID %llu with flags %u\n",
228 type, size, message_id, flags);
229
230 switch (test)
231 {
232 case TEST_MASTER_TRANSMIT:
233 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
234 end ();
235 break;
236
237 default:
238 GNUNET_assert (0);
239 }
167} 240}
168 241
169 242
@@ -175,7 +248,9 @@ join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
175 struct GNUNET_PSYC_JoinHandle *jh) 248 struct GNUNET_PSYC_JoinHandle *jh)
176{ 249{
177 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 250 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
178 "Got join request."); 251 "Got join request: %s (%zu vars)", method_name, variable_count);
252 GNUNET_PSYC_join_decision (jh, GNUNET_YES, 0, NULL, "_notice_join", NULL,
253 "you're in", 9);
179} 254}
180 255
181 256
@@ -185,7 +260,7 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
185 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); 260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
186 struct TransmitClosure *tmit = cls; 261 struct TransmitClosure *tmit = cls;
187 tmit->paused = GNUNET_NO; 262 tmit->paused = GNUNET_NO;
188 GNUNET_PSYC_master_transmit_resume (tmit->handle); 263 GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
189} 264}
190 265
191 266
@@ -204,35 +279,8 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
204 uint16_t name_size = 0; 279 uint16_t name_size = 0;
205 size_t value_size = 0; 280 size_t value_size = 0;
206 281
207 if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) 282 if (NULL != oper)
208 { /* Modifier continuation */ 283 { /* New modifier */
209 value = tmit->mod_value;
210 if (tmit->mod_value_size <= *data_size)
211 {
212 value_size = tmit->mod_value_size;
213 tmit->mod_value = NULL;
214 }
215 else
216 {
217 value_size = *data_size;
218 tmit->mod_value += value_size;
219 }
220 tmit->mod_value_size -= value_size;
221
222 if (*data_size < value_size)
223 {
224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
225 "value larger than buffer: %u < %zu\n",
226 *data_size, value_size);
227 *data_size = 0;
228 return GNUNET_NO;
229 }
230
231 *data_size = value_size;
232 memcpy (data, value, value_size);
233 }
234 else if (NULL != oper)
235 {
236 if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name, 284 if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
237 (void *) &value, &value_size)) 285 (void *) &value, &value_size))
238 { /* No more modifiers, continue with data */ 286 { /* No more modifiers, continue with data */
@@ -259,6 +307,33 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
259 ((char *)data)[name_size] = '\0'; 307 ((char *)data)[name_size] = '\0';
260 memcpy ((char *)data + name_size + 1, value, value_size); 308 memcpy ((char *)data + name_size + 1, value, value_size);
261 } 309 }
310 else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
311 { /* Modifier continuation */
312 value = tmit->mod_value;
313 if (tmit->mod_value_size <= *data_size)
314 {
315 value_size = tmit->mod_value_size;
316 tmit->mod_value = NULL;
317 }
318 else
319 {
320 value_size = *data_size;
321 tmit->mod_value += value_size;
322 }
323 tmit->mod_value_size -= value_size;
324
325 if (*data_size < value_size)
326 {
327 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
328 "value larger than buffer: %u < %zu\n",
329 *data_size, value_size);
330 *data_size = 0;
331 return GNUNET_NO;
332 }
333
334 *data_size = value_size;
335 memcpy (data, value, value_size);
336 }
262 337
263 return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO; 338 return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
264} 339}
@@ -268,6 +343,12 @@ static int
268tmit_notify_data (void *cls, uint16_t *data_size, void *data) 343tmit_notify_data (void *cls, uint16_t *data_size, void *data)
269{ 344{
270 struct TransmitClosure *tmit = cls; 345 struct TransmitClosure *tmit = cls;
346 if (0 == tmit->data_count)
347 {
348 *data_size = 0;
349 return GNUNET_YES;
350 }
351
271 uint16_t size = strlen (tmit->data[tmit->n]); 352 uint16_t size = strlen (tmit->data[tmit->n]);
272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
273 "Transmit notify data: %lu bytes available, " 354 "Transmit notify data: %lu bytes available, "
@@ -300,10 +381,52 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data)
300 381
301 382
302static void 383static void
303master_started (void *cls, uint64_t max_message_id) 384slave_joined (void *cls, uint64_t max_message_id)
304{ 385{
305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 386 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id);
306 "Master started: %" PRIu64 "\n", max_message_id); 387 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
388
389 test = TEST_SLAVE_TRANSMIT;
390
391 tmit = GNUNET_new (struct TransmitClosure);
392 tmit->env = GNUNET_ENV_environment_create ();
393 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
394 "_abc", "abc def", 7);
395 GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
396 "_abc_def", "abc def ghi", 11);
397 tmit->n = 0;
398 tmit->data[0] = "slave test";
399 tmit->data_count = 1;
400 tmit->slv_tmit
401 = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod,
402 tmit_notify_data, tmit,
403 GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
404}
405
406static void
407slave_join ()
408{
409 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
410
411 struct GNUNET_PeerIdentity origin;
412 struct GNUNET_PeerIdentity relays[16];
413 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
414 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
415 "_foo", "bar baz", 7);
416 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
417 "_foo_bar", "foo bar baz", 11);
418 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
419 16, relays, &slave_message, &join_request, &slave_joined,
420 NULL, "_request_join", env, "some data", 9);
421 GNUNET_ENV_environment_destroy (env);
422}
423
424
425static void
426master_transmit ()
427{
428 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
429 test = TEST_MASTER_TRANSMIT;
307 430
308 tmit = GNUNET_new (struct TransmitClosure); 431 tmit = GNUNET_new (struct TransmitClosure);
309 tmit->env = GNUNET_ENV_environment_create (); 432 tmit->env = GNUNET_ENV_environment_create ();
@@ -315,17 +438,19 @@ master_started (void *cls, uint64_t max_message_id)
315 tmit->data[1] = "foo bar"; 438 tmit->data[1] = "foo bar";
316 tmit->data[2] = "foo bar baz"; 439 tmit->data[2] = "foo bar baz";
317 tmit->data_count = 3; 440 tmit->data_count = 3;
318 tmit->handle 441 tmit->mst_tmit
319 = GNUNET_PSYC_master_transmit (mst, "_test", tmit_notify_mod, 442 = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod,
320 tmit_notify_data, tmit, 443 tmit_notify_data, tmit,
321 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); 444 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
322} 445}
323 446
324 447
325static void 448static void
326slave_joined (void *cls, uint64_t max_message_id) 449master_started (void *cls, uint64_t max_message_id)
327{ 450{
328 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id); 451 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
452 "Master started: %" PRIu64 "\n", max_message_id);
453 slave_join ();
329} 454}
330 455
331 456
@@ -355,21 +480,9 @@ run (void *cls,
355 GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); 480 GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
356 GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key); 481 GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key);
357 482
483 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
358 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, 484 mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
359 &message, &join_request, &master_started, NULL); 485 &master_message, &join_request, &master_started, NULL);
360 return; /* FIXME: test slave */
361
362 struct GNUNET_PeerIdentity origin;
363 struct GNUNET_PeerIdentity relays[16];
364 struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
365 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
366 "_foo", "bar baz", 7);
367 GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
368 "_foo_bar", "foo bar baz", 11);
369 slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
370 16, relays, &message, &join_request, &slave_joined,
371 NULL, "_request_join", env, "some data", 9);
372 GNUNET_ENV_environment_destroy (env);
373} 486}
374 487
375 488
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c
index 356216998..6ca0236d5 100644
--- a/src/psycstore/plugin_psycstore_sqlite.c
+++ b/src/psycstore/plugin_psycstore_sqlite.c
@@ -435,7 +435,7 @@ database_setup (struct Plugin *plugin)
435 &plugin->select_membership); 435 &plugin->select_membership);
436 436
437 sql_prepare (plugin->dbh, 437 sql_prepare (plugin->dbh,
438 "INSERT INTO messages\n" 438 "INSERT OR IGNORE INTO messages\n"
439 " (channel_id, hop_counter, signature, purpose,\n" 439 " (channel_id, hop_counter, signature, purpose,\n"
440 " fragment_id, fragment_offset, message_id,\n" 440 " fragment_id, fragment_offset, message_id,\n"
441 " group_generation, multicast_flags, psycstore_flags, data)\n" 441 " group_generation, multicast_flags, psycstore_flags, data)\n"