diff options
author | Gabor X Toth <*@tg-x.net> | 2014-01-06 00:09:37 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-01-06 00:09:37 +0000 |
commit | c04d45b9738e1764d2e2c21efdbeb129f298d5d1 (patch) | |
tree | 9eec32efdd3fe3f9f459630af16058cc47436bce /src | |
parent | 83a0e31631dbc199c37c42f11004e1be544f04a8 (diff) | |
download | gnunet-c04d45b9738e1764d2e2c21efdbeb129f298d5d1.tar.gz gnunet-c04d45b9738e1764d2e2c21efdbeb129f298d5d1.zip |
psyc: ipc messages
Diffstat (limited to 'src')
-rw-r--r-- | src/include/gnunet_env_lib.h | 4 | ||||
-rw-r--r-- | src/include/gnunet_multicast_service.h | 27 | ||||
-rw-r--r-- | src/include/gnunet_protocols.h | 16 | ||||
-rw-r--r-- | src/include/gnunet_psyc_service.h | 168 | ||||
-rw-r--r-- | src/include/gnunet_social_service.h | 2 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 5 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 325 | ||||
-rw-r--r-- | src/psyc/psyc.h | 19 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 569 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 74 |
10 files changed, 882 insertions, 327 deletions
diff --git a/src/include/gnunet_env_lib.h b/src/include/gnunet_env_lib.h index 857ad45c2..89101afc1 100644 --- a/src/include/gnunet_env_lib.h +++ b/src/include/gnunet_env_lib.h | |||
@@ -168,7 +168,9 @@ GNUNET_ENV_environment_add_mod (struct GNUNET_ENV_Environment *env, | |||
168 | * #GNUNET_NO to stop. | 168 | * #GNUNET_NO to stop. |
169 | */ | 169 | */ |
170 | typedef int | 170 | typedef int |
171 | (*GNUNET_ENV_Iterator) (void *cls, struct GNUNET_ENV_Modifier *mod); | 171 | (*GNUNET_ENV_Iterator) (void *cls, enum GNUNET_ENV_Operator oper, |
172 | const char *name, const char *value, | ||
173 | uint32_t value_size); | ||
172 | 174 | ||
173 | 175 | ||
174 | /** | 176 | /** |
diff --git a/src/include/gnunet_multicast_service.h b/src/include/gnunet_multicast_service.h index 58a99c0d8..0abd803a4 100644 --- a/src/include/gnunet_multicast_service.h +++ b/src/include/gnunet_multicast_service.h | |||
@@ -45,11 +45,6 @@ extern "C" | |||
45 | #define GNUNET_MULTICAST_VERSION 0x00000000 | 45 | #define GNUNET_MULTICAST_VERSION 0x00000000 |
46 | 46 | ||
47 | /** | 47 | /** |
48 | * Maximum size of a multicast message fragment. | ||
49 | */ | ||
50 | #define GNUNET_MULTICAST_FRAGMENT_MAX_SIZE 63 * 1024 | ||
51 | |||
52 | /** | ||
53 | * Opaque handle for a multicast group member. | 48 | * Opaque handle for a multicast group member. |
54 | */ | 49 | */ |
55 | struct GNUNET_MULTICAST_Member; | 50 | struct GNUNET_MULTICAST_Member; |
@@ -77,7 +72,14 @@ enum GNUNET_MULTICAST_MessageFlags | |||
77 | */ | 72 | */ |
78 | GNUNET_MULTICAST_MESSAGE_NOT_FRAGMENTED | 73 | GNUNET_MULTICAST_MESSAGE_NOT_FRAGMENTED |
79 | = GNUNET_MULTICAST_MESSAGE_FIRST_FRAGMENT | 74 | = GNUNET_MULTICAST_MESSAGE_FIRST_FRAGMENT |
80 | | GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT | 75 | | GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT, |
76 | |||
77 | /** | ||
78 | * Historic message, used only locally when replaying messages from local | ||
79 | * storage. | ||
80 | */ | ||
81 | GNUNET_MULTICAST_MESSAGE_HISTORIC = 1 << 30 | ||
82 | |||
81 | }; | 83 | }; |
82 | 84 | ||
83 | 85 | ||
@@ -120,7 +122,7 @@ struct GNUNET_MULTICAST_MessageHeader | |||
120 | struct GNUNET_CRYPTO_EccSignaturePurpose purpose; | 122 | struct GNUNET_CRYPTO_EccSignaturePurpose purpose; |
121 | 123 | ||
122 | /** | 124 | /** |
123 | * Number of the message fragment, monotonically increasing. | 125 | * Number of the message fragment, monotonically increasing starting from 1. |
124 | */ | 126 | */ |
125 | uint64_t fragment_id GNUNET_PACKED; | 127 | uint64_t fragment_id GNUNET_PACKED; |
126 | 128 | ||
@@ -150,6 +152,8 @@ struct GNUNET_MULTICAST_MessageHeader | |||
150 | 152 | ||
151 | /** | 153 | /** |
152 | * Flags for this message fragment. | 154 | * Flags for this message fragment. |
155 | * | ||
156 | * @see enum GNUNET_MULTICAST_MessageFlags | ||
153 | */ | 157 | */ |
154 | uint32_t flags GNUNET_PACKED; | 158 | uint32_t flags GNUNET_PACKED; |
155 | 159 | ||
@@ -158,6 +162,15 @@ struct GNUNET_MULTICAST_MessageHeader | |||
158 | 162 | ||
159 | GNUNET_NETWORK_STRUCT_END | 163 | GNUNET_NETWORK_STRUCT_END |
160 | 164 | ||
165 | /** | ||
166 | * Maximum size of a multicast message fragment. | ||
167 | */ | ||
168 | #define GNUNET_MULTICAST_FRAGMENT_MAX_SIZE 63 * 1024 | ||
169 | |||
170 | #define GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD \ | ||
171 | GNUNET_MULTICAST_FRAGMENT_MAX_SIZE \ | ||
172 | - sizeof (struct GNUNET_MULTICAST_MessageHeader) | ||
173 | |||
161 | 174 | ||
162 | /** | 175 | /** |
163 | * Handle that identifies a join request. | 176 | * Handle that identifies a join request. |
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 5b7bc591a..9ca4155e8 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -2116,15 +2116,21 @@ extern "C" | |||
2116 | #define GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM 690 | 2116 | #define GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM 690 |
2117 | 2117 | ||
2118 | 2118 | ||
2119 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 691 | 2119 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE 691 |
2120 | 2120 | ||
2121 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 692 | 2121 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 692 |
2122 | 2122 | ||
2123 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 693 | 2123 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 693 |
2124 | 2124 | ||
2125 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 694 | 2125 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 694 |
2126 | 2126 | ||
2127 | #define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 695 | 2127 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 695 |
2128 | |||
2129 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END 696 | ||
2130 | |||
2131 | #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL 697 | ||
2132 | |||
2133 | #define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 698 | ||
2128 | 2134 | ||
2129 | 2135 | ||
2130 | #define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701 | 2136 | #define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701 |
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index 91dcd0b07..eb17c9351 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h | |||
@@ -161,31 +161,49 @@ enum GNUNET_PSYC_Policy | |||
161 | enum GNUNET_PSYC_MessageFlags | 161 | enum GNUNET_PSYC_MessageFlags |
162 | { | 162 | { |
163 | /** | 163 | /** |
164 | * First fragment of a message. | 164 | * Historic message, retrieved from PSYCstore. |
165 | */ | ||
166 | GNUNET_PSYC_MESSAGE_HISTORIC = 1 | ||
167 | }; | ||
168 | |||
169 | GNUNET_NETWORK_STRUCT_BEGIN | ||
170 | |||
171 | /** | ||
172 | * Header of a PSYC message. | ||
173 | */ | ||
174 | struct GNUNET_PSYC_MessageHeader | ||
175 | { | ||
176 | /** | ||
177 | * Generic message header with size and type information. | ||
165 | */ | 178 | */ |
166 | GNUNET_PSYC_MESSAGE_FIRST_FRAGMENT = 1 << 0, | 179 | struct GNUNET_MessageHeader header; |
167 | 180 | ||
168 | /** | 181 | /** |
169 | * Last fragment of a message. | 182 | * Flags for this message fragment. |
183 | * | ||
184 | * @see enum GNUNET_PSYC_MessageFlags | ||
170 | */ | 185 | */ |
171 | GNUNET_PSYC_MESSAGE_LAST_FRAGMENT = 1 << 1, | 186 | uint32_t flags GNUNET_PACKED; |
172 | 187 | ||
173 | /** | 188 | /** |
174 | * OR'ed flags if message is not fragmented. | 189 | * Number of the message this message part belongs to. |
175 | */ | 190 | */ |
176 | GNUNET_PSYC_MESSAGE_NOT_FRAGMENTED | 191 | uint64_t message_id GNUNET_PACKED; |
177 | = GNUNET_PSYC_MESSAGE_FIRST_FRAGMENT | ||
178 | | GNUNET_PSYC_MESSAGE_LAST_FRAGMENT, | ||
179 | 192 | ||
180 | /** | 193 | /** |
181 | * Historic message, retrieved from PSYCstore. | 194 | * Sending slave's public key. |
195 | * Not set if the message is from the master. | ||
196 | */ | ||
197 | struct GNUNET_CRYPTO_EddsaPublicKey slave_key; | ||
198 | |||
199 | /* Followed by concatenated PSYC message parts: | ||
200 | * messages with GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_* types | ||
182 | */ | 201 | */ |
183 | GNUNET_PSYC_MESSAGE_HISTORIC = 1 << 30 | ||
184 | }; | 202 | }; |
185 | 203 | ||
186 | 204 | ||
187 | /** | 205 | /** |
188 | * M | 206 | * The method of a message. |
189 | */ | 207 | */ |
190 | struct GNUNET_PSYC_MessageMethod | 208 | struct GNUNET_PSYC_MessageMethod |
191 | { | 209 | { |
@@ -194,28 +212,18 @@ struct GNUNET_PSYC_MessageMethod | |||
194 | */ | 212 | */ |
195 | struct GNUNET_MessageHeader header; | 213 | struct GNUNET_MessageHeader header; |
196 | 214 | ||
197 | uint32_t reserved GNUNET_PACKED; | ||
198 | |||
199 | /** | ||
200 | * Number of modifiers in the message. | ||
201 | */ | ||
202 | uint32_t mod_count GNUNET_PACKED; | ||
203 | |||
204 | /** | 215 | /** |
205 | * OR'ed GNUNET_PSYC_MasterTransmitFlags | 216 | * OR'ed GNUNET_PSYC_MasterTransmitFlags |
206 | */ | 217 | */ |
207 | uint32_t flags GNUNET_PACKED; | 218 | uint32_t flags GNUNET_PACKED; |
208 | 219 | ||
209 | /** | ||
210 | * Sending slave's public key. | ||
211 | * NULL if the message is from the master, or when transmitting a message. | ||
212 | */ | ||
213 | struct GNUNET_CRYPTO_EddsaPublicKey slave_key; | ||
214 | |||
215 | /* Followed by NUL-terminated method name. */ | 220 | /* Followed by NUL-terminated method name. */ |
216 | }; | 221 | }; |
217 | 222 | ||
218 | 223 | ||
224 | /** | ||
225 | * A modifier of a message. | ||
226 | */ | ||
219 | struct GNUNET_PSYC_MessageModifier | 227 | struct GNUNET_PSYC_MessageModifier |
220 | { | 228 | { |
221 | /** | 229 | /** |
@@ -241,38 +249,19 @@ struct GNUNET_PSYC_MessageModifier | |||
241 | /* Followed by NUL-terminated name, then the value. */ | 249 | /* Followed by NUL-terminated name, then the value. */ |
242 | }; | 250 | }; |
243 | 251 | ||
252 | GNUNET_NETWORK_STRUCT_END | ||
244 | 253 | ||
245 | enum GNUNET_PSYC_DataStatus | 254 | #define GNUNET_PSYC_MODIFIER_MAX_PAYLOAD \ |
246 | { | 255 | GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD \ |
247 | /** | 256 | - sizeof (struct GNUNET_PSYC_MessageModifier) |
248 | * To be continued. | ||
249 | */ | ||
250 | GNUNET_PSYC_DATA_CONT = 0, | ||
251 | 257 | ||
252 | /** | 258 | #define GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD \ |
253 | * Reached the end of message. | 259 | GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD \ |
254 | */ | 260 | - sizeof (struct GNUNET_MessageHeader) |
255 | GNUNET_PSYC_DATA_END = 1, | ||
256 | |||
257 | /** | ||
258 | * Cancelled before the end. | ||
259 | */ | ||
260 | GNUNET_PSYC_DATA_CANCEL = 2 | ||
261 | }; | ||
262 | |||
263 | |||
264 | struct GNUNET_PSYC_MessageData | ||
265 | { | ||
266 | /** | ||
267 | * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA | ||
268 | */ | ||
269 | struct GNUNET_MessageHeader header; | ||
270 | 261 | ||
271 | /** | 262 | #define GNUNET_PSYC_DATA_MAX_PAYLOAD \ |
272 | * enum GNUNET_PSYC_DataStatus | 263 | GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD \ |
273 | */ | 264 | - sizeof (struct GNUNET_MessageHeader) |
274 | uint8_t status; | ||
275 | }; | ||
276 | 265 | ||
277 | /** | 266 | /** |
278 | * Handle that identifies a join request. | 267 | * Handle that identifies a join request. |
@@ -284,36 +273,21 @@ struct GNUNET_PSYC_JoinHandle; | |||
284 | 273 | ||
285 | 274 | ||
286 | /** | 275 | /** |
287 | * Method called from PSYC upon receiving a message indicating a call to a | 276 | * Method called from PSYC upon receiving part of a message. |
288 | * @e method. | ||
289 | * | 277 | * |
290 | * @param cls Closure. | 278 | * @param cls Closure. |
291 | * @param slave_key Who transmitted the message. | 279 | * @param msg Message part, one of the following types: |
292 | * - NULL for multicast messages from the master. | 280 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_HEADER |
293 | * - The sending slave's public key for unicast requests from one of the | 281 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD |
294 | * slaves to the master. | 282 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER |
295 | * @param message_id Unique message counter for this message. | 283 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT |
296 | * Unique only in combination with the given sender for this channel. | 284 | * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA |
297 | * @param method_name Method name from PSYC. | ||
298 | * @param modifier_count Number of elements in the @a modifiers array. | ||
299 | * @param modifiers State modifiers and transient variables for the message. | ||
300 | * @param data_offset Byte offset of @a data in the overall data of the method. | ||
301 | * @param data Data stream given to the method (might not be zero-terminated | ||
302 | * if data is binary). | ||
303 | * @param data_size Number of bytes in @a data. | ||
304 | * @param frag Fragmentation status for the data. | ||
305 | */ | 285 | */ |
306 | typedef int | 286 | typedef void |
307 | (*GNUNET_PSYC_Method) (void *cls, | 287 | (*GNUNET_PSYC_MessageCallback) (void *cls, |
308 | const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | 288 | uint64_t message_id, |
309 | uint64_t message_id, | 289 | uint32_t flags, |
310 | const char *method_name, | 290 | const struct GNUNET_MessageHeader *msg); |
311 | size_t modifier_count, | ||
312 | const struct GNUNET_ENV_Modifier *modifiers, | ||
313 | uint64_t data_offset, | ||
314 | const void *data, | ||
315 | size_t data_size, | ||
316 | enum GNUNET_PSYC_MessageFlags flags); | ||
317 | 291 | ||
318 | 292 | ||
319 | /** | 293 | /** |
@@ -329,7 +303,7 @@ typedef int | |||
329 | * @param data_size Number of bytes in @a data. | 303 | * @param data_size Number of bytes in @a data. |
330 | * @param jh Join handle to use with GNUNET_PSYC_join_decision() | 304 | * @param jh Join handle to use with GNUNET_PSYC_join_decision() |
331 | */ | 305 | */ |
332 | typedef int | 306 | typedef void |
333 | (*GNUNET_PSYC_JoinCallback) (void *cls, | 307 | (*GNUNET_PSYC_JoinCallback) (void *cls, |
334 | const struct GNUNET_CRYPTO_EddsaPublicKey | 308 | const struct GNUNET_CRYPTO_EddsaPublicKey |
335 | *slave_key, | 309 | *slave_key, |
@@ -413,7 +387,7 @@ typedef void | |||
413 | * one in the future. | 387 | * one in the future. |
414 | * @param policy Channel policy specifying join and history restrictions. | 388 | * @param policy Channel policy specifying join and history restrictions. |
415 | * Used to automate join decisions. | 389 | * Used to automate join decisions. |
416 | * @param method Function to invoke on messages received from slaves. | 390 | * @param message_cb Function to invoke on message parts received from slaves. |
417 | * @param join_cb Function to invoke when a peer wants to join. | 391 | * @param join_cb Function to invoke when a peer wants to join. |
418 | * @param master_started_cb Function to invoke after the channel master started. | 392 | * @param master_started_cb Function to invoke after the channel master started. |
419 | * @param cls Closure for @a method and @a join_cb. | 393 | * @param cls Closure for @a method and @a join_cb. |
@@ -423,7 +397,7 @@ struct GNUNET_PSYC_Master * | |||
423 | GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | 397 | GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, |
424 | const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key, | 398 | const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key, |
425 | enum GNUNET_PSYC_Policy policy, | 399 | enum GNUNET_PSYC_Policy policy, |
426 | GNUNET_PSYC_Method method, | 400 | GNUNET_PSYC_MessageCallback message_cb, |
427 | GNUNET_PSYC_JoinCallback join_cb, | 401 | GNUNET_PSYC_JoinCallback join_cb, |
428 | GNUNET_PSYC_MasterStartCallback master_started_cb, | 402 | GNUNET_PSYC_MasterStartCallback master_started_cb, |
429 | void *cls); | 403 | void *cls); |
@@ -449,7 +423,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
449 | */ | 423 | */ |
450 | typedef int | 424 | typedef int |
451 | (*GNUNET_PSYC_MasterTransmitNotify) (void *cls, | 425 | (*GNUNET_PSYC_MasterTransmitNotify) (void *cls, |
452 | size_t *data_size, | 426 | uint16_t *data_size, |
453 | void *data); | 427 | void *data); |
454 | 428 | ||
455 | 429 | ||
@@ -489,18 +463,17 @@ struct GNUNET_PSYC_MasterTransmitHandle; | |||
489 | * | 463 | * |
490 | * @param master Handle to the PSYC channel. | 464 | * @param master Handle to the PSYC channel. |
491 | * @param method_name Which method should be invoked. | 465 | * @param method_name Which method should be invoked. |
492 | * @param env Environment containing state operations and transient variables | 466 | * @param notify_mod Function to call to obtain modifiers. |
493 | * for the message, or NULL. | 467 | * @param notify_data Function to call to obtain fragments of the data. |
494 | * @param notify Function to call to obtain the arguments. | 468 | * @param notify_cls Closure for @a notify_mod and @a notify_data. |
495 | * @param notify_cls Closure for @a notify. | ||
496 | * @param flags Flags for the message being transmitted. | 469 | * @param flags Flags for the message being transmitted. |
497 | * @return Transmission handle, NULL on error (i.e. more than one request queued). | 470 | * @return Transmission handle, NULL on error (i.e. more than one request queued). |
498 | */ | 471 | */ |
499 | struct GNUNET_PSYC_MasterTransmitHandle * | 472 | struct GNUNET_PSYC_MasterTransmitHandle * |
500 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | 473 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, |
501 | const char *method_name, | 474 | const char *method_name, |
502 | const struct GNUNET_ENV_Environment *env, | 475 | GNUNET_PSYC_MasterTransmitNotify notify_mod, |
503 | GNUNET_PSYC_MasterTransmitNotify notify, | 476 | GNUNET_PSYC_MasterTransmitNotify notify_data, |
504 | void *notify_cls, | 477 | void *notify_cls, |
505 | enum GNUNET_PSYC_MasterTransmitFlags flags); | 478 | enum GNUNET_PSYC_MasterTransmitFlags flags); |
506 | 479 | ||
@@ -567,12 +540,13 @@ typedef void | |||
567 | * @param relay_count Number of peers in the @a relays array. | 540 | * @param relay_count Number of peers in the @a relays array. |
568 | * @param relays Peer identities of members of the multicast group, which serve | 541 | * @param relays Peer identities of members of the multicast group, which serve |
569 | * as relays and used to join the group at. | 542 | * as relays and used to join the group at. |
570 | * @param method Function to invoke on messages received from the channel, | 543 | * @param message_cb Function to invoke on message parts received from the |
571 | * typically at least contains functions for @e join and @e part. | 544 | * channel, typically at least contains method handlers for @e join and |
545 | * @e part. | ||
572 | * @param join_cb function invoked once we have joined with the current | 546 | * @param join_cb function invoked once we have joined with the current |
573 | * message ID of the channel | 547 | * message ID of the channel |
574 | * @param slave_joined_cb Function to invoke when a peer wants to join. | 548 | * @param slave_joined_cb Function to invoke when a peer wants to join. |
575 | * @param cls Closure for @a method_cb and @a slave_joined_cb. | 549 | * @param cls Closure for @a message_cb and @a slave_joined_cb. |
576 | * @param method_name Method name for the join request. | 550 | * @param method_name Method name for the join request. |
577 | * @param env Environment containing transient variables for the request, or NULL. | 551 | * @param env Environment containing transient variables for the request, or NULL. |
578 | * @param data Payload for the join message. | 552 | * @param data Payload for the join message. |
@@ -586,7 +560,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
586 | const struct GNUNET_PeerIdentity *origin, | 560 | const struct GNUNET_PeerIdentity *origin, |
587 | uint32_t relay_count, | 561 | uint32_t relay_count, |
588 | const struct GNUNET_PeerIdentity *relays, | 562 | const struct GNUNET_PeerIdentity *relays, |
589 | GNUNET_PSYC_Method method, | 563 | GNUNET_PSYC_MessageCallback message_cb, |
590 | GNUNET_PSYC_JoinCallback join_cb, | 564 | GNUNET_PSYC_JoinCallback join_cb, |
591 | GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, | 565 | GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, |
592 | void *cls, | 566 | void *cls, |
@@ -809,7 +783,7 @@ struct GNUNET_PSYC_Story; | |||
809 | * @param channel Which channel should be replayed? | 783 | * @param channel Which channel should be replayed? |
810 | * @param start_message_id Earliest interesting point in history. | 784 | * @param start_message_id Earliest interesting point in history. |
811 | * @param end_message_id Last (exclusive) interesting point in history. | 785 | * @param end_message_id Last (exclusive) interesting point in history. |
812 | * @param method Function to invoke on messages received from the story. | 786 | * @param message_cb Function to invoke on message parts received from the story. |
813 | * @param finish_cb Function to call when the requested story has been fully | 787 | * @param finish_cb Function to call when the requested story has been fully |
814 | * told (counting message IDs might not suffice, as some messages | 788 | * told (counting message IDs might not suffice, as some messages |
815 | * might be secret and thus the listener would not know the story is | 789 | * might be secret and thus the listener would not know the story is |
@@ -823,8 +797,8 @@ struct GNUNET_PSYC_Story * | |||
823 | GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, | 797 | GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, |
824 | uint64_t start_message_id, | 798 | uint64_t start_message_id, |
825 | uint64_t end_message_id, | 799 | uint64_t end_message_id, |
826 | GNUNET_PSYC_Method method, | 800 | GNUNET_PSYC_MessageCallback message_cb, |
827 | GNUNET_PSYC_FinishCallback *finish_cb, | 801 | GNUNET_PSYC_FinishCallback finish_cb, |
828 | void *cls); | 802 | void *cls); |
829 | 803 | ||
830 | 804 | ||
diff --git a/src/include/gnunet_social_service.h b/src/include/gnunet_social_service.h index b0c7b63cd..8bd1a959f 100644 --- a/src/include/gnunet_social_service.h +++ b/src/include/gnunet_social_service.h | |||
@@ -587,7 +587,7 @@ struct GNUNET_SOCIAL_HistoryLesson; | |||
587 | * @param slicer Slicer to use to process history. Can be the same as the | 587 | * @param slicer Slicer to use to process history. Can be the same as the |
588 | * slicer of the place, as the HISTORIC flag allows distinguishing | 588 | * slicer of the place, as the HISTORIC flag allows distinguishing |
589 | * old messages from fresh ones. | 589 | * old messages from fresh ones. |
590 | * @param finish_cb Function called after the last message if the history lesson | 590 | * @param finish_cb Function called after the last message in the history lesson |
591 | * is passed through the @a slicer. NULL if not needed. | 591 | * is passed through the @a slicer. NULL if not needed. |
592 | * @param finish_cb_cls Closure for @a finish_cb. | 592 | * @param finish_cb_cls Closure for @a finish_cb. |
593 | * @return Handle to abort history lesson, never NULL (multiple lessons | 593 | * @return Handle to abort history lesson, never NULL (multiple lessons |
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 40fb6cc33..6b784c2f0 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c | |||
@@ -361,14 +361,13 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc | |||
361 | struct GNUNET_MULTICAST_Origin *orig = cls; | 361 | struct GNUNET_MULTICAST_Origin *orig = cls; |
362 | struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; | 362 | struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; |
363 | 363 | ||
364 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; | 364 | size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; |
365 | struct GNUNET_MULTICAST_MessageHeader *msg | 365 | struct GNUNET_MULTICAST_MessageHeader *msg |
366 | = GNUNET_malloc (buf_size); | 366 | = GNUNET_malloc (buf_size); |
367 | buf_size -= sizeof (*msg); | ||
368 | int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); | 367 | int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); |
369 | 368 | ||
370 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) | 369 | if (! (GNUNET_YES == ret || GNUNET_NO == ret) |
371 | || sizeof (*msg) + buf_size > GNUNET_MULTICAST_FRAGMENT_MAX_SIZE) | 370 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) |
372 | { | 371 | { |
373 | LOG (GNUNET_ERROR_TYPE_ERROR, | 372 | LOG (GNUNET_ERROR_TYPE_ERROR, |
374 | "MasterTransmitNotify() returned error or invalid message size.\n"); | 373 | "MasterTransmitNotify() returned error or invalid message size.\n"); |
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index a3b1b8f82..628c39900 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c | |||
@@ -71,9 +71,9 @@ struct TransmitMessage | |||
71 | char *buf; | 71 | char *buf; |
72 | uint16_t size; | 72 | uint16_t size; |
73 | /** | 73 | /** |
74 | * enum GNUNET_PSYC_DataStatus | 74 | * enum MessageState |
75 | */ | 75 | */ |
76 | uint8_t status; | 76 | uint8_t state; |
77 | }; | 77 | }; |
78 | 78 | ||
79 | /** | 79 | /** |
@@ -87,12 +87,21 @@ struct Channel | |||
87 | struct TransmitMessage *tmit_tail; | 87 | struct TransmitMessage *tmit_tail; |
88 | 88 | ||
89 | GNUNET_SCHEDULER_TaskIdentifier tmit_task; | 89 | GNUNET_SCHEDULER_TaskIdentifier tmit_task; |
90 | uint32_t tmit_mod_count; | 90 | |
91 | uint32_t tmit_mod_recvd; | ||
92 | /** | 91 | /** |
93 | * enum GNUNET_PSYC_DataStatus | 92 | * Expected value size for the modifier being received from the PSYC service. |
94 | */ | 93 | */ |
95 | uint8_t tmit_status; | 94 | uint32_t tmit_mod_value_size_expected; |
95 | |||
96 | /** | ||
97 | * Actual value size for the modifier being received from the PSYC service. | ||
98 | */ | ||
99 | uint32_t tmit_mod_value_size; | ||
100 | |||
101 | /** | ||
102 | * enum MessageState | ||
103 | */ | ||
104 | uint8_t tmit_state; | ||
96 | 105 | ||
97 | uint8_t in_transmit; | 106 | uint8_t in_transmit; |
98 | uint8_t is_master; | 107 | uint8_t is_master; |
@@ -112,12 +121,27 @@ struct Master | |||
112 | struct GNUNET_MULTICAST_Origin *origin; | 121 | struct GNUNET_MULTICAST_Origin *origin; |
113 | struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; | 122 | struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; |
114 | 123 | ||
124 | /** | ||
125 | * Maximum message ID for this channel. | ||
126 | * | ||
127 | * Incremented before sending a message, thus the message_id in messages sent | ||
128 | * starts from 1. | ||
129 | */ | ||
115 | uint64_t max_message_id; | 130 | uint64_t max_message_id; |
131 | |||
132 | /** | ||
133 | * ID of the last message that contains any state operations. | ||
134 | * 0 if there is no such message. | ||
135 | */ | ||
116 | uint64_t max_state_message_id; | 136 | uint64_t max_state_message_id; |
137 | |||
138 | /** | ||
139 | * Maximum group generation for this channel. | ||
140 | */ | ||
117 | uint64_t max_group_generation; | 141 | uint64_t max_group_generation; |
118 | 142 | ||
119 | /** | 143 | /** |
120 | * enum GNUNET_PSYC_Policy | 144 | * @see enum GNUNET_PSYC_Policy |
121 | */ | 145 | */ |
122 | uint32_t policy; | 146 | uint32_t policy; |
123 | }; | 147 | }; |
@@ -196,6 +220,7 @@ client_cleanup (struct Channel *ch) | |||
196 | GNUNET_free (ch); | 220 | GNUNET_free (ch); |
197 | } | 221 | } |
198 | 222 | ||
223 | |||
199 | /** | 224 | /** |
200 | * Called whenever a client is disconnected. | 225 | * Called whenever a client is disconnected. |
201 | * Frees our resources associated with that client. | 226 | * Frees our resources associated with that client. |
@@ -234,7 +259,8 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
234 | } | 259 | } |
235 | } | 260 | } |
236 | 261 | ||
237 | void | 262 | |
263 | static void | ||
238 | join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | 264 | join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, |
239 | const struct GNUNET_MessageHeader *join_req, | 265 | const struct GNUNET_MessageHeader *join_req, |
240 | struct GNUNET_MULTICAST_JoinHandle *jh) | 266 | struct GNUNET_MULTICAST_JoinHandle *jh) |
@@ -242,7 +268,8 @@ join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | |||
242 | 268 | ||
243 | } | 269 | } |
244 | 270 | ||
245 | void | 271 | |
272 | static void | ||
246 | membership_test_cb (void *cls, | 273 | membership_test_cb (void *cls, |
247 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | 274 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, |
248 | uint64_t message_id, uint64_t group_generation, | 275 | uint64_t message_id, uint64_t group_generation, |
@@ -251,16 +278,18 @@ membership_test_cb (void *cls, | |||
251 | 278 | ||
252 | } | 279 | } |
253 | 280 | ||
254 | void | 281 | |
282 | static void | ||
255 | replay_fragment_cb (void *cls, | 283 | replay_fragment_cb (void *cls, |
256 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | 284 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, |
257 | uint64_t fragment_id, uint64_t flags, | 285 | uint64_t fragment_id, uint64_t flags, |
258 | struct GNUNET_MULTICAST_ReplayHandle *rh) | 286 | struct GNUNET_MULTICAST_ReplayHandle *rh) |
259 | { | ||
260 | 287 | ||
288 | { | ||
261 | } | 289 | } |
262 | 290 | ||
263 | void | 291 | |
292 | static void | ||
264 | replay_message_cb (void *cls, | 293 | replay_message_cb (void *cls, |
265 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | 294 | const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, |
266 | uint64_t message_id, | 295 | uint64_t message_id, |
@@ -271,56 +300,30 @@ replay_message_cb (void *cls, | |||
271 | 300 | ||
272 | } | 301 | } |
273 | 302 | ||
274 | void | ||
275 | request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | ||
276 | const struct GNUNET_MessageHeader *req, | ||
277 | enum GNUNET_MULTICAST_MessageFlags flags) | ||
278 | { | ||
279 | |||
280 | } | ||
281 | |||
282 | 303 | ||
283 | void | 304 | static void |
284 | fragment_store_result (void *cls, int64_t result, const char *err_msg) | 305 | fragment_store_result (void *cls, int64_t result, const char *err_msg) |
285 | { | 306 | { |
286 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 307 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
287 | "fragment_store() returned %l (%s)\n", result, err_msg); | 308 | "fragment_store() returned %l (%s)\n", result, err_msg); |
288 | } | 309 | } |
289 | 310 | ||
311 | |||
290 | /** | 312 | /** |
291 | * Send PSYC messages in an incoming multicast message to a client. | 313 | * Iterator callback for sending a message to a client. |
314 | * | ||
315 | * @see message_cb() | ||
292 | */ | 316 | */ |
293 | int | 317 | static int |
294 | send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan) | 318 | message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash, |
319 | void *chan) | ||
295 | { | 320 | { |
296 | const struct GNUNET_MULTICAST_MessageHeader *msg = cls; | 321 | const struct GNUNET_MessageHeader *msg = cls; |
297 | struct Channel *ch = chan; | 322 | struct Channel *ch = chan; |
298 | 323 | ||
299 | uint16_t size = ntohs (msg->header.size); | 324 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
300 | uint16_t pos = 0; | 325 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO); |
301 | |||
302 | while (sizeof (*msg) + pos < size) | ||
303 | { | ||
304 | const struct GNUNET_MessageHeader *pmsg | ||
305 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
306 | uint16_t psize = ntohs (pmsg->size); | ||
307 | if (sizeof (*msg) + pos + psize > size) | ||
308 | { | ||
309 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
310 | "Ignoring message of type %u with invalid size. " | ||
311 | "(%u + %u + %u > %u)\n", ntohs (pmsg->type), | ||
312 | sizeof (*msg), pos, psize, size); | ||
313 | break; | ||
314 | } | ||
315 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
316 | "Sending message of type %u and size %u to client.\n", | ||
317 | ntohs (pmsg->type), psize); | ||
318 | 326 | ||
319 | GNUNET_SERVER_notification_context_add (nc, ch->client); | ||
320 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, pmsg, | ||
321 | GNUNET_NO); | ||
322 | pos += psize; | ||
323 | } | ||
324 | return GNUNET_YES; | 327 | return GNUNET_YES; |
325 | } | 328 | } |
326 | 329 | ||
@@ -330,7 +333,7 @@ send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void *chan | |||
330 | * | 333 | * |
331 | * Store it using PSYCstore and send it to all clients of the channel. | 334 | * Store it using PSYCstore and send it to all clients of the channel. |
332 | */ | 335 | */ |
333 | void | 336 | static void |
334 | message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | 337 | message_cb (void *cls, const struct GNUNET_MessageHeader *msg) |
335 | { | 338 | { |
336 | uint16_t type = ntohs (msg->type); | 339 | uint16_t type = ntohs (msg->type); |
@@ -344,34 +347,100 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg) | |||
344 | struct Master *mst = cls; | 347 | struct Master *mst = cls; |
345 | struct Slave *slv = cls; | 348 | struct Slave *slv = cls; |
346 | 349 | ||
347 | struct GNUNET_CRYPTO_EddsaPublicKey *ch_key | 350 | /* const struct GNUNET_MULTICAST_MessageHeader *mmsg |
351 | = (const struct GNUNET_MULTICAST_MessageHeader *) msg; */ | ||
352 | struct GNUNET_CRYPTO_EddsaPublicKey *chan_key | ||
348 | = ch->is_master ? &mst->pub_key : &slv->chan_key; | 353 | = ch->is_master ? &mst->pub_key : &slv->chan_key; |
349 | struct GNUNET_HashCode *ch_key_hash | 354 | struct GNUNET_HashCode *chan_key_hash |
350 | = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash; | 355 | = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash; |
351 | 356 | ||
352 | switch (type) | 357 | switch (type) |
353 | { | 358 | { |
354 | case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: | 359 | case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: |
355 | GNUNET_PSYCSTORE_fragment_store (store, ch_key, | 360 | { |
361 | GNUNET_PSYCSTORE_fragment_store (store, chan_key, | ||
356 | (const struct | 362 | (const struct |
357 | GNUNET_MULTICAST_MessageHeader *) msg, | 363 | GNUNET_MULTICAST_MessageHeader *) msg, |
358 | 0, NULL, NULL); | 364 | 0, NULL, NULL); |
359 | GNUNET_CONTAINER_multihashmap_get_multiple (clients, ch_key_hash, | ||
360 | send_to_client, (void *) msg); | ||
361 | break; | ||
362 | 365 | ||
366 | uint16_t size = ntohs (msg->size); | ||
367 | uint16_t psize = 0; | ||
368 | uint16_t pos = 0; | ||
369 | |||
370 | for (pos = 0; sizeof (*msg) + pos < size; pos += psize) | ||
371 | { | ||
372 | const struct GNUNET_MessageHeader *pmsg | ||
373 | = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
374 | uint16_t psize = ntohs (pmsg->size); | ||
375 | if (sizeof (*msg) + pos + psize > size) | ||
376 | { | ||
377 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
378 | "Message received from multicast contains invalid PSYC " | ||
379 | "message. Not sending to clients.\n"); | ||
380 | return; | ||
381 | } | ||
382 | } | ||
383 | |||
384 | #if TODO | ||
385 | /* FIXME: apply modifiers to state in PSYCstore */ | ||
386 | GNUNET_PSYCSTORE_state_modify (store, chan_key, | ||
387 | GNUNET_ntohll (mmsg->message_id), | ||
388 | meth->mod_count, mods, | ||
389 | rcb, rcb_cls); | ||
390 | #endif | ||
391 | |||
392 | const struct GNUNET_MULTICAST_MessageHeader *mmsg | ||
393 | = (const struct GNUNET_MULTICAST_MessageHeader *) msg; | ||
394 | struct GNUNET_PSYC_MessageHeader *pmsg; | ||
395 | |||
396 | psize = sizeof (*pmsg) + size - sizeof (*mmsg); | ||
397 | pmsg = GNUNET_malloc (psize); | ||
398 | pmsg->header.size = htons (psize); | ||
399 | pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); | ||
400 | pmsg->message_id = mmsg->message_id; | ||
401 | |||
402 | memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); | ||
403 | |||
404 | GNUNET_CONTAINER_multihashmap_get_multiple (clients, chan_key_hash, | ||
405 | message_to_client, | ||
406 | (void *) pmsg); | ||
407 | GNUNET_free (pmsg); | ||
408 | break; | ||
409 | } | ||
363 | default: | 410 | default: |
364 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 411 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
365 | "Ignoring unknown message of type %u and size %u.\n", | 412 | "Discarding unknown message of type %u and size %u.\n", |
366 | type, size); | 413 | type, size); |
367 | } | 414 | } |
368 | } | 415 | } |
369 | 416 | ||
370 | 417 | ||
371 | /** | 418 | /** |
419 | * Send a request received from multicast to a client. | ||
420 | */ | ||
421 | static int | ||
422 | request_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash, | ||
423 | void *chan) | ||
424 | { | ||
425 | /* TODO */ | ||
426 | |||
427 | return GNUNET_YES; | ||
428 | } | ||
429 | |||
430 | |||
431 | static void | ||
432 | request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, | ||
433 | const struct GNUNET_MessageHeader *req, | ||
434 | enum GNUNET_MULTICAST_MessageFlags flags) | ||
435 | { | ||
436 | |||
437 | } | ||
438 | |||
439 | |||
440 | /** | ||
372 | * Response from PSYCstore with the current counter values for a channel master. | 441 | * Response from PSYCstore with the current counter values for a channel master. |
373 | */ | 442 | */ |
374 | void | 443 | static void |
375 | master_counters_cb (void *cls, int result, uint64_t max_fragment_id, | 444 | master_counters_cb (void *cls, int result, uint64_t max_fragment_id, |
376 | uint64_t max_message_id, uint64_t max_group_generation, | 445 | uint64_t max_message_id, uint64_t max_group_generation, |
377 | uint64_t max_state_message_id) | 446 | uint64_t max_state_message_id) |
@@ -513,20 +582,13 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, | |||
513 | static void | 582 | static void |
514 | send_transmit_ack (struct Channel *ch) | 583 | send_transmit_ack (struct Channel *ch) |
515 | { | 584 | { |
516 | struct TransmitAck *res = GNUNET_malloc (sizeof (*res)); | 585 | struct GNUNET_MessageHeader res; |
517 | res->header.size = htons (sizeof (*res)); | 586 | res.size = htons (sizeof (res)); |
518 | res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); | 587 | res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK); |
519 | |||
520 | res->buf_avail = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; | ||
521 | struct TransmitMessage *tmit_msg = ch->tmit_tail; | ||
522 | if (NULL != tmit_msg && GNUNET_PSYC_DATA_CONT == tmit_msg->status) | ||
523 | res->buf_avail -= tmit_msg->size; | ||
524 | res->buf_avail = htons (res->buf_avail); | ||
525 | 588 | ||
526 | GNUNET_SERVER_notification_context_add (nc, ch->client); | 589 | GNUNET_SERVER_notification_context_add (nc, ch->client); |
527 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, | 590 | GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, |
528 | GNUNET_NO); | 591 | GNUNET_NO); |
529 | GNUNET_free (res); | ||
530 | } | 592 | } |
531 | 593 | ||
532 | 594 | ||
@@ -555,7 +617,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
555 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); | 617 | GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); |
556 | GNUNET_free (msg); | 618 | GNUNET_free (msg); |
557 | 619 | ||
558 | int ret = (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; | 620 | int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; |
559 | 621 | ||
560 | if (0 == ch->tmit_task) | 622 | if (0 == ch->tmit_task) |
561 | { | 623 | { |
@@ -638,6 +700,7 @@ transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay) | |||
638 | : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch); | 700 | : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch); |
639 | } | 701 | } |
640 | 702 | ||
703 | |||
641 | /** | 704 | /** |
642 | * Queue incoming message parts from a client for transmission, and send them to | 705 | * Queue incoming message parts from a client for transmission, and send them to |
643 | * the multicast group when the buffer is full or reached the end of message. | 706 | * the multicast group when the buffer is full or reached the end of message. |
@@ -659,21 +722,20 @@ queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) | |||
659 | "for transmission to multicast.\n", | 722 | "for transmission to multicast.\n", |
660 | ntohs (msg->type), size); | 723 | ntohs (msg->type), size); |
661 | 724 | ||
662 | if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) | 725 | if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size) |
663 | return GNUNET_SYSERR; | 726 | return GNUNET_SYSERR; |
664 | 727 | ||
665 | if (NULL == tmit_msg | 728 | if (NULL == tmit_msg |
666 | || tmit_msg->status != GNUNET_PSYC_DATA_CONT | 729 | || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit_msg->size + size) |
667 | || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < tmit_msg->size + size) | ||
668 | { | 730 | { |
669 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 731 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
670 | "Appending message qto new buffer.\n"); | 732 | "Appending message to new buffer.\n"); |
671 | /* Start filling up new buffer */ | 733 | /* Start filling up new buffer */ |
672 | tmit_msg = GNUNET_new (struct TransmitMessage); | 734 | tmit_msg = GNUNET_new (struct TransmitMessage); |
673 | tmit_msg->buf = GNUNET_malloc (size); | 735 | tmit_msg->buf = GNUNET_malloc (size); |
674 | memcpy (tmit_msg->buf, msg, size); | 736 | memcpy (tmit_msg->buf, msg, size); |
675 | tmit_msg->size = size; | 737 | tmit_msg->size = size; |
676 | tmit_msg->status = ch->tmit_status; | 738 | tmit_msg->state = ch->tmit_state; |
677 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); | 739 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); |
678 | } | 740 | } |
679 | else | 741 | else |
@@ -684,23 +746,41 @@ queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) | |||
684 | tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size); | 746 | tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size); |
685 | memcpy (tmit_msg->buf + tmit_msg->size, msg, size); | 747 | memcpy (tmit_msg->buf + tmit_msg->size, msg, size); |
686 | tmit_msg->size += size; | 748 | tmit_msg->size += size; |
687 | tmit_msg->status = ch->tmit_status; | 749 | tmit_msg->state = ch->tmit_state; |
688 | } | 750 | } |
689 | 751 | ||
690 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size); | 752 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size); |
691 | 753 | ||
692 | /* Wait a bit for the remaining message parts from the client | 754 | /* Wait a bit for the remaining message parts from the client |
693 | if there's still some space left in the buffer. */ | 755 | if there's still some space left in the buffer. */ |
694 | if (GNUNET_PSYC_DATA_CONT == tmit_msg->status | 756 | if (tmit_msg->state < MSG_STATE_END |
695 | && (tmit_msg->size + sizeof (struct GNUNET_PSYC_MessageData) | 757 | && (tmit_msg->size + sizeof (struct GNUNET_MessageHeader) |
696 | < GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)) | 758 | < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD)) |
759 | { | ||
697 | tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2); | 760 | tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2); |
761 | } | ||
762 | else | ||
763 | { | ||
764 | send_transmit_ack (ch); | ||
765 | } | ||
698 | 766 | ||
699 | transmit_message (ch, tmit_delay); | 767 | transmit_message (ch, tmit_delay); |
700 | 768 | ||
701 | return GNUNET_OK; | 769 | return GNUNET_OK; |
702 | } | 770 | } |
703 | 771 | ||
772 | |||
773 | static void | ||
774 | transmit_error (struct Channel *ch) | ||
775 | { | ||
776 | struct GNUNET_MessageHeader *msg = GNUNET_malloc (sizeof (*msg)); | ||
777 | msg->size = ntohs (sizeof (*msg)); | ||
778 | msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
779 | queue_message (ch, msg); | ||
780 | |||
781 | GNUNET_SERVER_client_disconnect (ch->client); | ||
782 | } | ||
783 | |||
704 | /** | 784 | /** |
705 | * Incoming method from a client. | 785 | * Incoming method from a client. |
706 | */ | 786 | */ |
@@ -708,28 +788,21 @@ static void | |||
708 | handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, | 788 | handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, |
709 | const struct GNUNET_MessageHeader *msg) | 789 | const struct GNUNET_MessageHeader *msg) |
710 | { | 790 | { |
711 | const struct GNUNET_PSYC_MessageMethod *meth | 791 | /* const struct GNUNET_PSYC_MessageMethod *meth |
712 | = (const struct GNUNET_PSYC_MessageMethod *) msg; | 792 | = (const struct GNUNET_PSYC_MessageMethod *) msg; */ |
713 | struct Channel *ch | 793 | struct Channel *ch |
714 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 794 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
715 | GNUNET_assert (NULL != ch); | 795 | GNUNET_assert (NULL != ch); |
716 | 796 | ||
717 | if (GNUNET_NO != ch->in_transmit) | 797 | if (MSG_STATE_START != ch->tmit_state) |
718 | { | 798 | { |
719 | /* FIXME: already transmitting a message, send back error message. */ | 799 | transmit_error (ch); |
720 | return; | 800 | return; |
721 | } | 801 | } |
722 | 802 | ch->tmit_state = MSG_STATE_METHOD; | |
723 | ch->in_transmit = GNUNET_YES; | ||
724 | ch->tmit_mod_recvd = 0; | ||
725 | ch->tmit_mod_count = ntohl (meth->mod_count); | ||
726 | ch->tmit_status = GNUNET_PSYC_DATA_CONT; | ||
727 | 803 | ||
728 | queue_message (ch, msg); | 804 | queue_message (ch, msg); |
729 | 805 | send_transmit_ack (ch); | |
730 | if (0 == ch->tmit_mod_count) | ||
731 | send_transmit_ack (ch); | ||
732 | |||
733 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 806 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
734 | }; | 807 | }; |
735 | 808 | ||
@@ -741,20 +814,52 @@ static void | |||
741 | handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, | 814 | handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, |
742 | const struct GNUNET_MessageHeader *msg) | 815 | const struct GNUNET_MessageHeader *msg) |
743 | { | 816 | { |
744 | /* | ||
745 | const struct GNUNET_PSYC_MessageModifier *mod | 817 | const struct GNUNET_PSYC_MessageModifier *mod |
746 | = (const struct GNUNET_PSYC_MessageModifier *) msg; | 818 | = (const struct GNUNET_PSYC_MessageModifier *) msg; |
747 | */ | 819 | |
748 | struct Channel *ch | 820 | struct Channel *ch |
749 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 821 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
750 | GNUNET_assert (NULL != ch); | 822 | GNUNET_assert (NULL != ch); |
751 | 823 | ||
752 | ch->tmit_mod_recvd++; | 824 | if (MSG_STATE_METHOD != ch->tmit_state |
825 | || MSG_STATE_MODIFIER != ch->tmit_state | ||
826 | || MSG_STATE_MOD_CONT != ch->tmit_state | ||
827 | || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size) | ||
828 | { | ||
829 | transmit_error (ch); | ||
830 | return; | ||
831 | } | ||
832 | ch->tmit_mod_value_size_expected = ntohl (mod->value_size); | ||
833 | ch->tmit_mod_value_size = ntohs (msg->size) - ntohs(mod->name_size) - 1; | ||
834 | |||
753 | queue_message (ch, msg); | 835 | queue_message (ch, msg); |
836 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
837 | }; | ||
754 | 838 | ||
755 | if (ch->tmit_mod_recvd == ch->tmit_mod_count) | ||
756 | send_transmit_ack (ch); | ||
757 | 839 | ||
840 | /** | ||
841 | * Incoming modifier from a client. | ||
842 | */ | ||
843 | static void | ||
844 | handle_transmit_mod_cont (void *cls, struct GNUNET_SERVER_Client *client, | ||
845 | const struct GNUNET_MessageHeader *msg) | ||
846 | { | ||
847 | struct Channel *ch | ||
848 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | ||
849 | GNUNET_assert (NULL != ch); | ||
850 | |||
851 | ch->tmit_mod_value_size += ntohs (msg->size); | ||
852 | |||
853 | if (MSG_STATE_MODIFIER != ch->tmit_state | ||
854 | || MSG_STATE_MOD_CONT != ch->tmit_state | ||
855 | || ch->tmit_mod_value_size_expected < ch->tmit_mod_value_size) | ||
856 | { | ||
857 | transmit_error (ch); | ||
858 | return; | ||
859 | } | ||
860 | ch->tmit_state = MSG_STATE_MOD_CONT; | ||
861 | |||
862 | queue_message (ch, msg); | ||
758 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 863 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
759 | }; | 864 | }; |
760 | 865 | ||
@@ -766,18 +871,25 @@ static void | |||
766 | handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, | 871 | handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, |
767 | const struct GNUNET_MessageHeader *msg) | 872 | const struct GNUNET_MessageHeader *msg) |
768 | { | 873 | { |
769 | const struct GNUNET_PSYC_MessageData *data | ||
770 | = (const struct GNUNET_PSYC_MessageData *) msg; | ||
771 | struct Channel *ch | 874 | struct Channel *ch |
772 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); | 875 | = GNUNET_SERVER_client_get_user_context (client, struct Channel); |
773 | GNUNET_assert (NULL != ch); | 876 | GNUNET_assert (NULL != ch); |
774 | 877 | ||
775 | ch->tmit_status = ntohs (data->status); | 878 | if (MSG_STATE_METHOD != ch->tmit_state |
879 | || MSG_STATE_MODIFIER != ch->tmit_state | ||
880 | || MSG_STATE_MOD_CONT != ch->tmit_state | ||
881 | || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size) | ||
882 | { | ||
883 | transmit_error (ch); | ||
884 | return; | ||
885 | } | ||
886 | ch->tmit_state = MSG_STATE_DATA; | ||
887 | |||
776 | queue_message (ch, msg); | 888 | queue_message (ch, msg); |
777 | send_transmit_ack (ch); | 889 | send_transmit_ack (ch); |
778 | 890 | ||
779 | if (GNUNET_PSYC_DATA_CONT != ch->tmit_status) | 891 | if (MSG_STATE_END <= ch->tmit_state) |
780 | ch->in_transmit = GNUNET_NO; | 892 | ch->tmit_state = MSG_STATE_START; |
781 | 893 | ||
782 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 894 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
783 | }; | 895 | }; |
@@ -800,16 +912,21 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
800 | 912 | ||
801 | { &handle_slave_join, NULL, | 913 | { &handle_slave_join, NULL, |
802 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, | 914 | GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, |
803 | 915 | #if TODO | |
916 | { &handle_psyc_message, NULL, | ||
917 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, | ||
918 | #endif | ||
804 | { &handle_transmit_method, NULL, | 919 | { &handle_transmit_method, NULL, |
805 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 }, | 920 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 }, |
806 | 921 | ||
807 | { &handle_transmit_modifier, NULL, | 922 | { &handle_transmit_modifier, NULL, |
808 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 }, | 923 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 }, |
809 | 924 | ||
925 | { &handle_transmit_mod_cont, NULL, | ||
926 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT, 0 }, | ||
927 | |||
810 | { &handle_transmit_data, NULL, | 928 | { &handle_transmit_data, NULL, |
811 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 }, | 929 | GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 }, |
812 | |||
813 | { NULL, NULL, 0, 0 } | 930 | { NULL, NULL, 0, 0 } |
814 | }; | 931 | }; |
815 | 932 | ||
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 85d10858e..90c07480a 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h | |||
@@ -29,6 +29,20 @@ | |||
29 | 29 | ||
30 | #include "gnunet_common.h" | 30 | #include "gnunet_common.h" |
31 | 31 | ||
32 | |||
33 | enum MessageState | ||
34 | { | ||
35 | MSG_STATE_START = 0, | ||
36 | MSG_STATE_HEADER = 1, | ||
37 | MSG_STATE_METHOD = 2, | ||
38 | MSG_STATE_MODIFIER = 3, | ||
39 | MSG_STATE_MOD_CONT = 4, | ||
40 | MSG_STATE_DATA = 5, | ||
41 | MSG_STATE_END = 6, | ||
42 | MSG_STATE_CANCEL = 7, | ||
43 | }; | ||
44 | |||
45 | |||
32 | GNUNET_NETWORK_STRUCT_BEGIN | 46 | GNUNET_NETWORK_STRUCT_BEGIN |
33 | 47 | ||
34 | /**** service -> library ****/ | 48 | /**** service -> library ****/ |
@@ -53,8 +67,7 @@ struct OperationResult | |||
53 | */ | 67 | */ |
54 | int64_t result_code GNUNET_PACKED; | 68 | int64_t result_code GNUNET_PACKED; |
55 | 69 | ||
56 | /* followed by 0-terminated error message (on error) */ | 70 | /* followed by NUL-terminated error message (on error) */ |
57 | |||
58 | }; | 71 | }; |
59 | 72 | ||
60 | 73 | ||
@@ -74,6 +87,7 @@ struct CountersResult | |||
74 | }; | 87 | }; |
75 | 88 | ||
76 | 89 | ||
90 | #if REMOVE | ||
77 | /** | 91 | /** |
78 | * Transmit acknowledgment. | 92 | * Transmit acknowledgment. |
79 | * | 93 | * |
@@ -95,6 +109,7 @@ struct TransmitAck | |||
95 | */ | 109 | */ |
96 | uint16_t buf_avail; | 110 | uint16_t buf_avail; |
97 | }; | 111 | }; |
112 | #endif | ||
98 | 113 | ||
99 | 114 | ||
100 | /**** library -> service ****/ | 115 | /**** library -> service ****/ |
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 290f3e375..a5a01fa92 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c | |||
@@ -30,15 +30,17 @@ | |||
30 | * @author Gabor X Toth | 30 | * @author Gabor X Toth |
31 | */ | 31 | */ |
32 | 32 | ||
33 | #include <inttypes.h> | ||
34 | |||
33 | #include "platform.h" | 35 | #include "platform.h" |
34 | #include "gnunet_util_lib.h" | 36 | #include "gnunet_util_lib.h" |
35 | #include "gnunet_env_lib.h" | 37 | #include "gnunet_env_lib.h" |
38 | #include "gnunet_multicast_service.h" | ||
36 | #include "gnunet_psyc_service.h" | 39 | #include "gnunet_psyc_service.h" |
37 | #include "psyc.h" | 40 | #include "psyc.h" |
38 | 41 | ||
39 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) | 42 | #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) |
40 | 43 | ||
41 | |||
42 | struct OperationHandle | 44 | struct OperationHandle |
43 | { | 45 | { |
44 | struct OperationHandle *prev; | 46 | struct OperationHandle *prev; |
@@ -91,31 +93,55 @@ struct GNUNET_PSYC_Channel | |||
91 | */ | 93 | */ |
92 | struct GNUNET_TIME_Relative reconnect_delay; | 94 | struct GNUNET_TIME_Relative reconnect_delay; |
93 | 95 | ||
94 | GNUNET_PSYC_Method method_cb; | 96 | /** |
97 | * Message part callback. | ||
98 | */ | ||
99 | GNUNET_PSYC_MessageCallback message_cb; | ||
100 | |||
101 | /** | ||
102 | * Message part callback for historic message. | ||
103 | */ | ||
104 | GNUNET_PSYC_MessageCallback hist_message_cb; | ||
95 | 105 | ||
106 | /** | ||
107 | * Join handler callback. | ||
108 | */ | ||
96 | GNUNET_PSYC_JoinCallback join_cb; | 109 | GNUNET_PSYC_JoinCallback join_cb; |
97 | 110 | ||
111 | /** | ||
112 | * Closure for @a message_cb and @a join_cb. | ||
113 | */ | ||
98 | void *cb_cls; | 114 | void *cb_cls; |
99 | 115 | ||
100 | /** | 116 | /** |
101 | * Are we polling for incoming messages right now? | 117 | * ID of the message being received from the PSYC service. |
102 | */ | 118 | */ |
103 | int in_receive; | 119 | uint64_t recv_message_id; |
104 | 120 | ||
105 | /** | 121 | /** |
106 | * Are we currently transmitting a message? | 122 | * State of the currently being received message from the PSYC service. |
107 | */ | 123 | */ |
108 | int in_transmit; | 124 | enum MessageState recv_state; |
109 | 125 | ||
110 | /** | 126 | /** |
111 | * Is this a master or slave channel? | 127 | * Flags for the currently being received message from the PSYC service. |
128 | */ | ||
129 | enum GNUNET_PSYC_MessageFlags recv_flags; | ||
130 | |||
131 | /** | ||
132 | * Expected value size for the modifier being received from the PSYC service. | ||
133 | */ | ||
134 | uint32_t recv_mod_value_size_expected; | ||
135 | |||
136 | /** | ||
137 | * Actual value size for the modifier being received from the PSYC service. | ||
112 | */ | 138 | */ |
113 | int is_master; | 139 | uint32_t recv_mod_value_size; |
114 | 140 | ||
115 | /** | 141 | /** |
116 | * Buffer space available for transmitting the next data fragment. | 142 | * Buffer space available for transmitting the next data fragment. |
117 | */ | 143 | */ |
118 | uint16_t tmit_buf_avail; | 144 | uint16_t tmit_size; // FIXME |
119 | 145 | ||
120 | /** | 146 | /** |
121 | * Is transmission paused? | 147 | * Is transmission paused? |
@@ -125,7 +151,22 @@ struct GNUNET_PSYC_Channel | |||
125 | /** | 151 | /** |
126 | * Are we still waiting for a PSYC_TRANSMIT_ACK? | 152 | * Are we still waiting for a PSYC_TRANSMIT_ACK? |
127 | */ | 153 | */ |
128 | uint8_t tmit_ack_pending; | 154 | uint8_t tmit_ack_pending; // FIXME |
155 | |||
156 | /** | ||
157 | * Are we polling for incoming messages right now? | ||
158 | */ | ||
159 | uint8_t in_receive; | ||
160 | |||
161 | /** | ||
162 | * Are we currently transmitting a message? | ||
163 | */ | ||
164 | uint8_t in_transmit; | ||
165 | |||
166 | /** | ||
167 | * Is this a master or slave channel? | ||
168 | */ | ||
169 | uint8_t is_master; | ||
129 | }; | 170 | }; |
130 | 171 | ||
131 | 172 | ||
@@ -135,9 +176,10 @@ struct GNUNET_PSYC_Channel | |||
135 | struct GNUNET_PSYC_MasterTransmitHandle | 176 | struct GNUNET_PSYC_MasterTransmitHandle |
136 | { | 177 | { |
137 | struct GNUNET_PSYC_Master *master; | 178 | struct GNUNET_PSYC_Master *master; |
138 | GNUNET_PSYC_MasterTransmitNotify notify; | 179 | GNUNET_PSYC_MasterTransmitNotify notify_mod; |
180 | GNUNET_PSYC_MasterTransmitNotify notify_data; | ||
139 | void *notify_cls; | 181 | void *notify_cls; |
140 | enum GNUNET_PSYC_DataStatus status; | 182 | enum MessageState state; |
141 | }; | 183 | }; |
142 | 184 | ||
143 | 185 | ||
@@ -254,52 +296,383 @@ transmit_next (struct GNUNET_PSYC_Channel *ch); | |||
254 | 296 | ||
255 | 297 | ||
256 | /** | 298 | /** |
257 | * Request data from client to transmit. | 299 | * Reset data stored related to the last received message. |
300 | */ | ||
301 | static void | ||
302 | recv_reset (struct GNUNET_PSYC_Channel *ch) | ||
303 | { | ||
304 | ch->recv_state = MSG_STATE_START; | ||
305 | ch->recv_flags = 0; | ||
306 | ch->recv_message_id = 0; | ||
307 | ch->recv_mod_value_size =0; | ||
308 | ch->recv_mod_value_size_expected = 0; | ||
309 | } | ||
310 | |||
311 | |||
312 | static void | ||
313 | recv_error (struct GNUNET_PSYC_Channel *ch) | ||
314 | { | ||
315 | recv_reset (ch); | ||
316 | |||
317 | GNUNET_PSYC_MessageCallback message_cb | ||
318 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
319 | ? ch->hist_message_cb | ||
320 | : ch->message_cb; | ||
321 | |||
322 | if (NULL != message_cb) | ||
323 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); | ||
324 | } | ||
325 | |||
326 | /** | ||
327 | * Request a modifier from a client to transmit. | ||
258 | * | 328 | * |
259 | * @param mst Master handle. | 329 | * @param mst Master handle. |
260 | */ | 330 | */ |
261 | static void | 331 | static void |
262 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | 332 | master_transmit_mod (struct GNUNET_PSYC_Master *mst) |
263 | { | 333 | { |
264 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | 334 | struct GNUNET_PSYC_Channel *ch = &mst->ch; |
265 | size_t data_size = ch->tmit_buf_avail; | 335 | uint16_t max_data_size |
266 | struct GNUNET_PSYC_MessageData *pdata; | 336 | = ch->tmit_size > sizeof (struct GNUNET_MessageHeader) |
267 | struct OperationHandle *op | 337 | ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size |
268 | = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size); | 338 | : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size; |
269 | pdata = (struct GNUNET_PSYC_MessageData *) &op[1]; | 339 | uint16_t data_size = max_data_size; |
270 | op->msg = (struct GNUNET_MessageHeader *) pdata; | ||
271 | pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | ||
272 | 340 | ||
273 | switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1])) | 341 | struct GNUNET_MessageHeader *msg; |
342 | struct OperationHandle *op | ||
343 | = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); | ||
344 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
345 | msg->type | ||
346 | = MSG_STATE_MODIFIER == mst->tmit->state | ||
347 | ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER) | ||
348 | : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); | ||
349 | |||
350 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, | ||
351 | &data_size, &msg[1]); | ||
352 | switch (notify_ret) | ||
274 | { | 353 | { |
275 | case GNUNET_NO: | 354 | case GNUNET_NO: |
276 | mst->tmit->status = GNUNET_PSYC_DATA_CONT; | 355 | if (0 != data_size) |
356 | mst->tmit->state = MSG_STATE_MOD_CONT; | ||
277 | break; | 357 | break; |
278 | 358 | ||
279 | case GNUNET_YES: | 359 | case GNUNET_YES: |
280 | mst->tmit->status = GNUNET_PSYC_DATA_END; | 360 | mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER; |
281 | break; | 361 | break; |
282 | 362 | ||
283 | default: | 363 | default: |
284 | mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; | 364 | LOG (GNUNET_ERROR_TYPE_ERROR, |
285 | data_size = 0; | 365 | "MasterTransmitNotify returned error when requesting a modifier.\n"); |
286 | LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n"); | 366 | |
367 | mst->tmit->state = MSG_STATE_START; | ||
368 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
369 | msg->size = htons (sizeof (*msg)); | ||
370 | |||
371 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
372 | transmit_next (ch); | ||
373 | return; | ||
287 | } | 374 | } |
288 | 375 | ||
289 | if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) | 376 | if ((GNUNET_NO == notify_ret && 0 == data_size)) |
290 | { | 377 | { |
291 | /* Transmission paused, nothing to send. */ | 378 | /* Transmission paused, nothing to send. */ |
292 | ch->tmit_paused = GNUNET_YES; | 379 | ch->tmit_paused = GNUNET_YES; |
293 | GNUNET_free (op); | 380 | GNUNET_free (op); |
294 | } | 381 | } |
295 | else | 382 | |
383 | if (0 < data_size) | ||
384 | { | ||
385 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | ||
386 | msg->size = htons (sizeof (*msg) + data_size); | ||
387 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
388 | } | ||
389 | |||
390 | /* End of message. */ | ||
391 | if (GNUNET_YES == notify_ret) | ||
392 | { | ||
393 | op = GNUNET_malloc (sizeof *(op) + sizeof (*msg)); | ||
394 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
395 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | ||
396 | msg->size = htons (sizeof (*msg)); | ||
397 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
398 | } | ||
399 | |||
400 | transmit_next (ch); | ||
401 | } | ||
402 | |||
403 | |||
404 | /** | ||
405 | * Request data from a client to transmit. | ||
406 | * | ||
407 | * @param mst Master handle. | ||
408 | */ | ||
409 | static void | ||
410 | master_transmit_data (struct GNUNET_PSYC_Master *mst) | ||
411 | { | ||
412 | struct GNUNET_PSYC_Channel *ch = &mst->ch; | ||
413 | struct GNUNET_MessageHeader *msg; | ||
414 | uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; | ||
415 | struct OperationHandle *op | ||
416 | = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size); | ||
417 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
418 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); | ||
419 | |||
420 | int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls, | ||
421 | &data_size, &msg[1]); | ||
422 | switch (notify_ret) | ||
296 | { | 423 | { |
297 | GNUNET_assert (data_size <= ch->tmit_buf_avail); | 424 | case GNUNET_NO: |
298 | pdata->header.size = htons (sizeof (*pdata) + data_size); | 425 | if (0 == data_size) |
299 | pdata->status = htons (mst->tmit->status); | 426 | { |
427 | /* Transmission paused, nothing to send. */ | ||
428 | ch->tmit_paused = GNUNET_YES; | ||
429 | GNUNET_free (op); | ||
430 | } | ||
431 | break; | ||
432 | |||
433 | case GNUNET_YES: | ||
434 | mst->tmit->state = MSG_STATE_START; | ||
435 | break; | ||
436 | |||
437 | default: | ||
438 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
439 | "MasterTransmitNotify returned error when requesting data.\n"); | ||
440 | |||
441 | mst->tmit->state = MSG_STATE_START; | ||
442 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); | ||
443 | msg->size = htons (sizeof (*msg)); | ||
444 | |||
300 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 445 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); |
301 | ch->tmit_ack_pending = GNUNET_YES; | ||
302 | transmit_next (ch); | 446 | transmit_next (ch); |
447 | return; | ||
448 | } | ||
449 | |||
450 | if (0 < data_size) | ||
451 | { | ||
452 | GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); | ||
453 | msg->size = htons (sizeof (*msg) + data_size); | ||
454 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
455 | } | ||
456 | |||
457 | /* End of message. */ | ||
458 | if (GNUNET_YES == notify_ret) | ||
459 | { | ||
460 | op = GNUNET_malloc (sizeof *(op) + sizeof (*msg)); | ||
461 | op->msg = msg = (struct GNUNET_MessageHeader *) &op[1]; | ||
462 | msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); | ||
463 | msg->size = htons (sizeof (*msg)); | ||
464 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | ||
465 | } | ||
466 | |||
467 | transmit_next (ch); | ||
468 | } | ||
469 | |||
470 | |||
471 | /** | ||
472 | * Handle incoming message from the PSYC service. | ||
473 | * | ||
474 | * @param ch The channel the message is sent to. | ||
475 | * @param pmsg The message. | ||
476 | */ | ||
477 | static void | ||
478 | handle_psyc_message (struct GNUNET_PSYC_Channel *ch, | ||
479 | const struct GNUNET_PSYC_MessageHeader *pmsg) | ||
480 | { | ||
481 | const struct GNUNET_MessageHeader *msg; | ||
482 | uint16_t msize = ntohs (pmsg->header.size); | ||
483 | uint16_t pos = 0; | ||
484 | uint16_t size = 0; | ||
485 | uint16_t type, size_eq, size_min; | ||
486 | |||
487 | if (MSG_STATE_START == ch->recv_state) | ||
488 | { | ||
489 | ch->recv_message_id = GNUNET_ntohll (pmsg->message_id); | ||
490 | ch->recv_flags = ntohl (pmsg->flags); | ||
491 | } | ||
492 | else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id) | ||
493 | { | ||
494 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
495 | "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", | ||
496 | GNUNET_ntohll (pmsg->message_id), ch->recv_message_id); | ||
497 | GNUNET_break_op (0); | ||
498 | recv_error (ch); | ||
499 | } | ||
500 | else if (ntohl (pmsg->flags) != ch->recv_flags) | ||
501 | { | ||
502 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
503 | "Unexpected message flags. Got: %lu, expected: %lu\n", | ||
504 | ntohl (pmsg->flags), ch->recv_flags); | ||
505 | GNUNET_break_op (0); | ||
506 | recv_error (ch); | ||
507 | } | ||
508 | |||
509 | for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size) | ||
510 | { | ||
511 | msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); | ||
512 | size = ntohs (msg->size); | ||
513 | type = ntohs (msg->type); | ||
514 | size_eq = size_min = 0; | ||
515 | |||
516 | if (msize < sizeof (*pmsg) + pos + size) | ||
517 | { | ||
518 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
519 | "Discarding message of type %u with invalid size. " | ||
520 | "(%u < %u + %u + %u)\n", ntohs (msg->type), | ||
521 | msize, sizeof (*msg), pos, size); | ||
522 | break; | ||
523 | } | ||
524 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
525 | "Received message part of type %u and size %u from PSYC.\n", | ||
526 | ntohs (msg->type), size); | ||
527 | |||
528 | |||
529 | switch (type) | ||
530 | { | ||
531 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
532 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | ||
533 | break; | ||
534 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
535 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | ||
536 | break; | ||
537 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
538 | size_min = sizeof (struct GNUNET_MessageHeader); | ||
539 | break; | ||
540 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
541 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
542 | size_eq = sizeof (struct GNUNET_MessageHeader); | ||
543 | break; | ||
544 | } | ||
545 | |||
546 | if (! ((0 < size_eq && size == size_eq) | ||
547 | || (0 < size_min && size_min <= size))) | ||
548 | { | ||
549 | GNUNET_break (0); | ||
550 | reschedule_connect (ch); | ||
551 | return; | ||
552 | } | ||
553 | |||
554 | switch (type) | ||
555 | { | ||
556 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
557 | { | ||
558 | struct GNUNET_PSYC_MessageMethod *meth | ||
559 | = (struct GNUNET_PSYC_MessageMethod *) msg; | ||
560 | |||
561 | if (MSG_STATE_HEADER != ch->recv_state) | ||
562 | { | ||
563 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
564 | "Discarding out of order message method.\n"); | ||
565 | /* It is normal to receive an incomplete message right after connecting, | ||
566 | * but should not happen later. | ||
567 | * FIXME: add a check for this condition. | ||
568 | */ | ||
569 | GNUNET_break_op (0); | ||
570 | recv_error (ch); | ||
571 | break; | ||
572 | } | ||
573 | |||
574 | if ('\0' != (char *) meth + msg->size - 1) | ||
575 | { | ||
576 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
577 | "Discarding message with malformed method. " | ||
578 | "Message ID: %" PRIu64 "\n", ch->recv_message_id); | ||
579 | GNUNET_break_op (0); | ||
580 | recv_error (ch); | ||
581 | break; | ||
582 | } | ||
583 | GNUNET_PSYC_MessageCallback message_cb | ||
584 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
585 | ? ch->hist_message_cb | ||
586 | : ch->message_cb; | ||
587 | |||
588 | if (NULL != message_cb) | ||
589 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
590 | |||
591 | ch->recv_state = MSG_STATE_METHOD; | ||
592 | break; | ||
593 | } | ||
594 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
595 | { | ||
596 | if (MSG_STATE_MODIFIER != ch->recv_state) | ||
597 | { | ||
598 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
599 | "Discarding out of order message modifier.\n"); | ||
600 | GNUNET_break_op (0); | ||
601 | recv_error (ch); | ||
602 | break; | ||
603 | } | ||
604 | |||
605 | struct GNUNET_PSYC_MessageModifier *mod | ||
606 | = (struct GNUNET_PSYC_MessageModifier *) msg; | ||
607 | |||
608 | uint16_t name_size = ntohs (mod->name_size); | ||
609 | ch->recv_mod_value_size_expected = ntohs (mod->value_size); | ||
610 | ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1; | ||
611 | |||
612 | if (size < sizeof (*mod) + name_size + 1 | ||
613 | || '\0' != (char *) &mod[1] + mod->name_size | ||
614 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | ||
615 | { | ||
616 | LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n"); | ||
617 | GNUNET_break_op (0); | ||
618 | break; | ||
619 | } | ||
620 | |||
621 | ch->recv_state = MSG_STATE_MODIFIER; | ||
622 | |||
623 | GNUNET_PSYC_MessageCallback message_cb | ||
624 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
625 | ? ch->hist_message_cb | ||
626 | : ch->message_cb; | ||
627 | |||
628 | if (NULL != message_cb) | ||
629 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
630 | |||
631 | break; | ||
632 | } | ||
633 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
634 | { | ||
635 | ch->recv_mod_value_size += size - sizeof (*msg); | ||
636 | |||
637 | if (MSG_STATE_MODIFIER != ch->recv_state | ||
638 | || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) | ||
639 | { | ||
640 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
641 | "Discarding out of order message modifier continuation.\n"); | ||
642 | GNUNET_break_op (0); | ||
643 | recv_reset (ch); | ||
644 | break; | ||
645 | } | ||
646 | |||
647 | GNUNET_PSYC_MessageCallback message_cb | ||
648 | = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC | ||
649 | ? ch->hist_message_cb | ||
650 | : ch->message_cb; | ||
651 | |||
652 | if (NULL != message_cb) | ||
653 | message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg); | ||
654 | break; | ||
655 | } | ||
656 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
657 | { | ||
658 | if (ch->recv_state < MSG_STATE_METHOD | ||
659 | || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) | ||
660 | { | ||
661 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
662 | "Discarding out of order message data fragment.\n"); | ||
663 | GNUNET_break_op (0); | ||
664 | recv_reset (ch); | ||
665 | break; | ||
666 | } | ||
667 | |||
668 | ch->recv_state = MSG_STATE_DATA; | ||
669 | break; | ||
670 | } | ||
671 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
672 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
673 | recv_reset (ch); | ||
674 | break; | ||
675 | } | ||
303 | } | 676 | } |
304 | } | 677 | } |
305 | 678 | ||
@@ -319,11 +692,10 @@ message_handler (void *cls, | |||
319 | struct GNUNET_PSYC_Channel *ch = cls; | 692 | struct GNUNET_PSYC_Channel *ch = cls; |
320 | struct GNUNET_PSYC_Master *mst = cls; | 693 | struct GNUNET_PSYC_Master *mst = cls; |
321 | struct GNUNET_PSYC_Slave *slv = cls; | 694 | struct GNUNET_PSYC_Slave *slv = cls; |
322 | struct CountersResult *cres; | ||
323 | struct TransmitAck *tack; | ||
324 | 695 | ||
325 | if (NULL == msg) | 696 | if (NULL == msg) |
326 | { | 697 | { |
698 | GNUNET_break (0); | ||
327 | reschedule_connect (ch); | 699 | reschedule_connect (ch); |
328 | return; | 700 | return; |
329 | } | 701 | } |
@@ -342,8 +714,8 @@ message_handler (void *cls, | |||
342 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | 714 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: |
343 | size_eq = sizeof (struct CountersResult); | 715 | size_eq = sizeof (struct CountersResult); |
344 | break; | 716 | break; |
345 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | 717 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: |
346 | size_eq = sizeof (struct TransmitAck); | 718 | size_min = sizeof (struct GNUNET_PSYC_MessageHeader); |
347 | break; | 719 | break; |
348 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 720 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: |
349 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); | 721 | size_min = sizeof (struct GNUNET_PSYC_MessageMethod); |
@@ -352,11 +724,13 @@ message_handler (void *cls, | |||
352 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); | 724 | size_min = sizeof (struct GNUNET_PSYC_MessageModifier); |
353 | break; | 725 | break; |
354 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | 726 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: |
355 | size_min = sizeof (struct GNUNET_PSYC_MessageData); | 727 | size_min = sizeof (struct GNUNET_MessageHeader); |
728 | break; | ||
729 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
730 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: | ||
731 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | ||
732 | size_eq = sizeof (struct GNUNET_MessageHeader); | ||
356 | break; | 733 | break; |
357 | default: | ||
358 | GNUNET_break_op (0); | ||
359 | return; | ||
360 | } | 734 | } |
361 | 735 | ||
362 | if (! ((0 < size_eq && size == size_eq) | 736 | if (! ((0 < size_eq && size == size_eq) |
@@ -370,38 +744,63 @@ message_handler (void *cls, | |||
370 | switch (type) | 744 | switch (type) |
371 | { | 745 | { |
372 | case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: | 746 | case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: |
373 | cres = (struct CountersResult *) msg; | 747 | { |
748 | struct CountersResult *cres = (struct CountersResult *) msg; | ||
374 | mst->max_message_id = GNUNET_ntohll (cres->max_message_id); | 749 | mst->max_message_id = GNUNET_ntohll (cres->max_message_id); |
375 | if (NULL != mst->start_cb) | 750 | if (NULL != mst->start_cb) |
376 | mst->start_cb (ch->cb_cls, mst->max_message_id); | 751 | mst->start_cb (ch->cb_cls, mst->max_message_id); |
377 | break; | 752 | break; |
378 | 753 | } | |
379 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: | 754 | case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: |
380 | cres = (struct CountersResult *) msg; | 755 | { |
381 | #if TODO | 756 | #if TODO |
757 | struct CountersResult *cres = (struct CountersResult *) msg; | ||
382 | slv->max_message_id = GNUNET_ntohll (cres->max_message_id); | 758 | slv->max_message_id = GNUNET_ntohll (cres->max_message_id); |
383 | if (NULL != slv->join_ack_cb) | 759 | if (NULL != slv->join_ack_cb) |
384 | mst->join_ack_cb (ch->cb_cls, mst->max_message_id); | 760 | mst->join_ack_cb (ch->cb_cls, mst->max_message_id); |
385 | #endif | 761 | #endif |
386 | break; | 762 | break; |
387 | 763 | } | |
388 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: | 764 | case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: |
389 | tack = (struct TransmitAck *) msg; | 765 | { |
766 | ch->tmit_ack_pending = GNUNET_NO; | ||
767 | |||
390 | if (ch->is_master) | 768 | if (ch->is_master) |
391 | { | 769 | { |
392 | GNUNET_assert (NULL != mst->tmit); | 770 | GNUNET_assert (NULL != mst->tmit); |
393 | if (GNUNET_PSYC_DATA_CONT != mst->tmit->status | 771 | switch (mst->tmit->state) |
394 | || NULL == mst->tmit->notify) | ||
395 | { | ||
396 | GNUNET_free (mst->tmit); | ||
397 | mst->tmit = NULL; | ||
398 | } | ||
399 | else | ||
400 | { | 772 | { |
401 | ch->tmit_buf_avail = ntohs (tack->buf_avail); | 773 | case MSG_STATE_MODIFIER: |
402 | ch->tmit_ack_pending = GNUNET_NO; | 774 | if (GNUNET_NO == ch->tmit_paused) |
775 | master_transmit_mod (mst); | ||
776 | break; | ||
777 | |||
778 | case MSG_STATE_MOD_CONT: | ||
779 | if (GNUNET_NO == ch->tmit_paused) | ||
780 | master_transmit_mod (mst); | ||
781 | break; | ||
782 | |||
783 | case MSG_STATE_DATA: | ||
403 | if (GNUNET_NO == ch->tmit_paused) | 784 | if (GNUNET_NO == ch->tmit_paused) |
404 | master_transmit_data (mst); | 785 | master_transmit_data (mst); |
786 | break; | ||
787 | |||
788 | case MSG_STATE_END: | ||
789 | case MSG_STATE_CANCEL: | ||
790 | if (NULL != mst->tmit) | ||
791 | { | ||
792 | GNUNET_free (mst->tmit); | ||
793 | mst->tmit = NULL; | ||
794 | } | ||
795 | else | ||
796 | { | ||
797 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
798 | "Ignoring transmit ack, there's no transmission going on.\n"); | ||
799 | } | ||
800 | break; | ||
801 | default: | ||
802 | LOG (GNUNET_ERROR_TYPE_WARNING, | ||
803 | "Ignoring unexpected transmit ack.\n"); | ||
405 | } | 804 | } |
406 | } | 805 | } |
407 | else | 806 | else |
@@ -409,17 +808,10 @@ message_handler (void *cls, | |||
409 | /* TODO: slave */ | 808 | /* TODO: slave */ |
410 | } | 809 | } |
411 | break; | 810 | break; |
811 | } | ||
412 | 812 | ||
413 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | 813 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: |
414 | 814 | handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg); | |
415 | break; | ||
416 | |||
417 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
418 | |||
419 | break; | ||
420 | |||
421 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
422 | |||
423 | break; | 815 | break; |
424 | } | 816 | } |
425 | 817 | ||
@@ -506,6 +898,7 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
506 | { | 898 | { |
507 | struct GNUNET_PSYC_Channel *ch = cls; | 899 | struct GNUNET_PSYC_Channel *ch = cls; |
508 | 900 | ||
901 | recv_reset (ch); | ||
509 | ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; | 902 | ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; |
510 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 903 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
511 | "Connecting to PSYC service.\n"); | 904 | "Connecting to PSYC service.\n"); |
@@ -588,7 +981,7 @@ disconnect (void *c) | |||
588 | * one in the future. | 981 | * one in the future. |
589 | * @param policy Channel policy specifying join and history restrictions. | 982 | * @param policy Channel policy specifying join and history restrictions. |
590 | * Used to automate join decisions. | 983 | * Used to automate join decisions. |
591 | * @param method Function to invoke on messages received from slaves. | 984 | * @param message_cb Function to invoke on message parts received from slaves. |
592 | * @param join_cb Function to invoke when a peer wants to join. | 985 | * @param join_cb Function to invoke when a peer wants to join. |
593 | * @param master_started_cb Function to invoke after the channel master started. | 986 | * @param master_started_cb Function to invoke after the channel master started. |
594 | * @param cls Closure for @a master_started_cb and @a join_cb. | 987 | * @param cls Closure for @a master_started_cb and @a join_cb. |
@@ -598,7 +991,7 @@ struct GNUNET_PSYC_Master * | |||
598 | GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | 991 | GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, |
599 | const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key, | 992 | const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key, |
600 | enum GNUNET_PSYC_Policy policy, | 993 | enum GNUNET_PSYC_Policy policy, |
601 | GNUNET_PSYC_Method method, | 994 | GNUNET_PSYC_MessageCallback message_cb, |
602 | GNUNET_PSYC_JoinCallback join_cb, | 995 | GNUNET_PSYC_JoinCallback join_cb, |
603 | GNUNET_PSYC_MasterStartCallback master_started_cb, | 996 | GNUNET_PSYC_MasterStartCallback master_started_cb, |
604 | void *cls) | 997 | void *cls) |
@@ -618,7 +1011,7 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
618 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; | 1011 | ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; |
619 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); | 1012 | ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); |
620 | 1013 | ||
621 | ch->method_cb = method; | 1014 | ch->message_cb = message_cb; |
622 | ch->join_cb = join_cb; | 1015 | ch->join_cb = join_cb; |
623 | ch->cb_cls = cls; | 1016 | ch->cb_cls = cls; |
624 | mst->start_cb = master_started_cb; | 1017 | mst->start_cb = master_started_cb; |
@@ -705,19 +1098,17 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) | |||
705 | * | 1098 | * |
706 | * @param master Handle to the PSYC channel. | 1099 | * @param master Handle to the PSYC channel. |
707 | * @param method_name Which method should be invoked. | 1100 | * @param method_name Which method should be invoked. |
708 | * @param env Environment containing state operations and transient variables | 1101 | * @param notify_mod Function to call to obtain modifiers. |
709 | * for the message, or NULL. | 1102 | * @param notify_data Function to call to obtain fragments of the data. |
710 | * @param notify Function to call to obtain the arguments. | 1103 | * @param notify_cls Closure for @a notify_mod and @a notify_data. |
711 | * @param notify_cls Closure for @a notify. | ||
712 | * @param flags Flags for the message being transmitted. | 1104 | * @param flags Flags for the message being transmitted. |
713 | * @return Transmission handle, NULL on error (i.e. more than one request | 1105 | * @return Transmission handle, NULL on error (i.e. more than one request queued). |
714 | * queued). | ||
715 | */ | 1106 | */ |
716 | struct GNUNET_PSYC_MasterTransmitHandle * | 1107 | struct GNUNET_PSYC_MasterTransmitHandle * |
717 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | 1108 | GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, |
718 | const char *method_name, | 1109 | const char *method_name, |
719 | const struct GNUNET_ENV_Environment *env, | 1110 | GNUNET_PSYC_MasterTransmitNotify notify_mod, |
720 | GNUNET_PSYC_MasterTransmitNotify notify, | 1111 | GNUNET_PSYC_MasterTransmitNotify notify_data, |
721 | void *notify_cls, | 1112 | void *notify_cls, |
722 | enum GNUNET_PSYC_MasterTransmitFlags flags) | 1113 | enum GNUNET_PSYC_MasterTransmitFlags flags) |
723 | { | 1114 | { |
@@ -737,18 +1128,17 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, | |||
737 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); | 1128 | pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); |
738 | pmeth->header.size = htons (sizeof (*pmeth) + size); | 1129 | pmeth->header.size = htons (sizeof (*pmeth) + size); |
739 | pmeth->flags = htonl (flags); | 1130 | pmeth->flags = htonl (flags); |
740 | pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); | ||
741 | memcpy (&pmeth[1], method_name, size); | 1131 | memcpy (&pmeth[1], method_name, size); |
742 | 1132 | ||
743 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); | 1133 | GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op); |
744 | GNUNET_ENV_environment_iterate (env, send_modifier, master); | ||
745 | transmit_next (ch); | 1134 | transmit_next (ch); |
746 | 1135 | ||
747 | master->tmit = GNUNET_malloc (sizeof (*master->tmit)); | 1136 | master->tmit = GNUNET_malloc (sizeof (*master->tmit)); |
748 | master->tmit->master = master; | 1137 | master->tmit->master = master; |
749 | master->tmit->notify = notify; | 1138 | master->tmit->notify_mod = notify_mod; |
1139 | master->tmit->notify_data = notify_data; | ||
750 | master->tmit->notify_cls = notify_cls; | 1140 | master->tmit->notify_cls = notify_cls; |
751 | master->tmit->status = GNUNET_PSYC_DATA_CONT; | 1141 | master->tmit->state = MSG_STATE_START; // FIXME |
752 | return master->tmit; | 1142 | return master->tmit; |
753 | } | 1143 | } |
754 | 1144 | ||
@@ -804,12 +1194,13 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) | |||
804 | * @param relay_count Number of peers in the @a relays array. | 1194 | * @param relay_count Number of peers in the @a relays array. |
805 | * @param relays Peer identities of members of the multicast group, which serve | 1195 | * @param relays Peer identities of members of the multicast group, which serve |
806 | * as relays and used to join the group at. | 1196 | * as relays and used to join the group at. |
807 | * @param method Function to invoke on messages received from the channel, | 1197 | * @param message_cb Function to invoke on message parts received from the |
808 | * typically at least contains functions for @e join and @e part. | 1198 | * channel, typically at least contains method handlers for @e join and |
1199 | * @e part. | ||
809 | * @param join_cb function invoked once we have joined with the current | 1200 | * @param join_cb function invoked once we have joined with the current |
810 | * message ID of the channel | 1201 | * message ID of the channel |
811 | * @param slave_joined_cb Function to invoke when a peer wants to join. | 1202 | * @param slave_joined_cb Function to invoke when a peer wants to join. |
812 | * @param cls Closure for @a method_cb and @a slave_joined_cb. | 1203 | * @param cls Closure for @a message_cb and @a slave_joined_cb. |
813 | * @param method_name Method name for the join request. | 1204 | * @param method_name Method name for the join request. |
814 | * @param env Environment containing transient variables for the request, or NULL. | 1205 | * @param env Environment containing transient variables for the request, or NULL. |
815 | * @param data Payload for the join message. | 1206 | * @param data Payload for the join message. |
@@ -823,7 +1214,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
823 | const struct GNUNET_PeerIdentity *origin, | 1214 | const struct GNUNET_PeerIdentity *origin, |
824 | uint32_t relay_count, | 1215 | uint32_t relay_count, |
825 | const struct GNUNET_PeerIdentity *relays, | 1216 | const struct GNUNET_PeerIdentity *relays, |
826 | GNUNET_PSYC_Method method, | 1217 | GNUNET_PSYC_MessageCallback message_cb, |
827 | GNUNET_PSYC_JoinCallback join_cb, | 1218 | GNUNET_PSYC_JoinCallback join_cb, |
828 | GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, | 1219 | GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, |
829 | void *cls, | 1220 | void *cls, |
@@ -845,6 +1236,10 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
845 | req->relay_count = relay_count; | 1236 | req->relay_count = relay_count; |
846 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); | 1237 | memcpy (&req[1], relays, relay_count * sizeof (*relays)); |
847 | 1238 | ||
1239 | ch->message_cb = message_cb; | ||
1240 | ch->join_cb = join_cb; | ||
1241 | ch->cb_cls = cls; | ||
1242 | |||
848 | ch->cfg = cfg; | 1243 | ch->cfg = cfg; |
849 | ch->is_master = GNUNET_NO; | 1244 | ch->is_master = GNUNET_NO; |
850 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; | 1245 | ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; |
@@ -1043,7 +1438,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, | |||
1043 | * @param channel Which channel should be replayed? | 1438 | * @param channel Which channel should be replayed? |
1044 | * @param start_message_id Earliest interesting point in history. | 1439 | * @param start_message_id Earliest interesting point in history. |
1045 | * @param end_message_id Last (exclusive) interesting point in history. | 1440 | * @param end_message_id Last (exclusive) interesting point in history. |
1046 | * @param method Function to invoke on messages received from the story. | 1441 | * @param message_cb Function to invoke on message parts received from the story. |
1047 | * @param finish_cb Function to call when the requested story has been fully | 1442 | * @param finish_cb Function to call when the requested story has been fully |
1048 | * told (counting message IDs might not suffice, as some messages | 1443 | * told (counting message IDs might not suffice, as some messages |
1049 | * might be secret and thus the listener would not know the story is | 1444 | * might be secret and thus the listener would not know the story is |
@@ -1057,8 +1452,8 @@ struct GNUNET_PSYC_Story * | |||
1057 | GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, | 1452 | GNUNET_PSYC_channel_story_tell (struct GNUNET_PSYC_Channel *channel, |
1058 | uint64_t start_message_id, | 1453 | uint64_t start_message_id, |
1059 | uint64_t end_message_id, | 1454 | uint64_t end_message_id, |
1060 | GNUNET_PSYC_Method method, | 1455 | GNUNET_PSYC_MessageCallback message_cb, |
1061 | GNUNET_PSYC_FinishCallback *finish_cb, | 1456 | GNUNET_PSYC_FinishCallback finish_cb, |
1062 | void *cls) | 1457 | void *cls) |
1063 | { | 1458 | { |
1064 | return NULL; | 1459 | return NULL; |
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 2986fdf6a..704819c50 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c | |||
@@ -148,11 +148,16 @@ join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, | |||
148 | struct TransmitClosure | 148 | struct TransmitClosure |
149 | { | 149 | { |
150 | struct GNUNET_PSYC_MasterTransmitHandle *handle; | 150 | struct GNUNET_PSYC_MasterTransmitHandle *handle; |
151 | uint8_t n; | 151 | |
152 | char *mod_names[16]; | ||
153 | char *mod_values[16]; | ||
154 | char *data[16]; | ||
155 | |||
156 | uint8_t mod_count; | ||
157 | uint8_t data_count; | ||
158 | |||
152 | uint8_t paused; | 159 | uint8_t paused; |
153 | uint8_t fragment_count; | 160 | uint8_t n; |
154 | char *fragments[16]; | ||
155 | uint16_t fragment_sizes[16]; | ||
156 | }; | 161 | }; |
157 | 162 | ||
158 | 163 | ||
@@ -167,16 +172,47 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
167 | 172 | ||
168 | 173 | ||
169 | static int | 174 | static int |
170 | transmit_notify (void *cls, size_t *data_size, void *data) | 175 | tmit_notify_mod (void *cls, size_t *data_size, void *data) |
171 | { | 176 | { |
172 | struct TransmitClosure *tmit = cls; | 177 | struct TransmitClosure *tmit = cls; |
173 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 178 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
174 | "Transmit notify: %lu bytes available, " | 179 | "Transmit notify modifier: %lu bytes available, " |
180 | "processing modifier %u/%u.\n", | ||
181 | *data_size, tmit->n + 1, tmit->fragment_count); | ||
182 | /* FIXME: continuation */ | ||
183 | uint16_t name_size = strlen (tmit->mod_names[tmit->n]); | ||
184 | uint16_t value_size = strlen (tmit->mod_values[tmit->n]); | ||
185 | if (name_size + 1 + value_size <= *data_size) | ||
186 | return GNUNET_NO; | ||
187 | |||
188 | *data_size = name_size + 1 + value_size; | ||
189 | memcpy (data, tmit->fragments[tmit->n], *data_size); | ||
190 | |||
191 | if (++tmit->n < tmit->mod_count) | ||
192 | { | ||
193 | return GNUNET_NO; | ||
194 | } | ||
195 | else | ||
196 | { | ||
197 | tmit->n = 0; | ||
198 | return GNUNET_YES; | ||
199 | } | ||
200 | } | ||
201 | |||
202 | |||
203 | static int | ||
204 | tmit_notify_data (void *cls, size_t *data_size, void *data) | ||
205 | { | ||
206 | struct TransmitClosure *tmit = cls; | ||
207 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
208 | "Transmit notify data: %lu bytes available, " | ||
175 | "processing fragment %u/%u.\n", | 209 | "processing fragment %u/%u.\n", |
176 | *data_size, tmit->n + 1, tmit->fragment_count); | 210 | *data_size, tmit->n + 1, tmit->fragment_count); |
177 | GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); | 211 | uint16_t size = strlen (tmit->data[tmit->n]); |
212 | if (size <= *data_size) | ||
213 | return GNUNET_NO; | ||
178 | 214 | ||
179 | if (GNUNET_YES == tmit->paused && tmit->n == tmit->fragment_count - 1) | 215 | if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1) |
180 | { | 216 | { |
181 | /* Send last fragment later. */ | 217 | /* Send last fragment later. */ |
182 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); | 218 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); |
@@ -188,13 +224,13 @@ transmit_notify (void *cls, size_t *data_size, void *data) | |||
188 | return GNUNET_NO; | 224 | return GNUNET_NO; |
189 | } | 225 | } |
190 | 226 | ||
191 | GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); | 227 | *data_size = size; |
192 | *data_size = tmit->fragment_sizes[tmit->n]; | 228 | memcpy (data, tmit->data[tmit->n], size); |
193 | memcpy (data, tmit->fragments[tmit->n], *data_size); | ||
194 | 229 | ||
195 | return ++tmit->n < tmit->fragment_count ? GNUNET_NO : GNUNET_YES; | 230 | return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES; |
196 | } | 231 | } |
197 | 232 | ||
233 | |||
198 | void | 234 | void |
199 | master_started (void *cls, uint64_t max_message_id) | 235 | master_started (void *cls, uint64_t max_message_id) |
200 | { | 236 | { |
@@ -208,15 +244,13 @@ master_started (void *cls, uint64_t max_message_id) | |||
208 | "_foo_bar", "foo bar baz", 11); | 244 | "_foo_bar", "foo bar baz", 11); |
209 | 245 | ||
210 | struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); | 246 | struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); |
211 | tmit->fragment_count = 3; | 247 | tmit->data[0] = "foo"; |
212 | tmit->fragments[0] = "foo"; | 248 | tmit->data[1] = "foo bar"; |
213 | tmit->fragment_sizes[0] = 4; | 249 | tmit->data[2] = "foo bar baz"; |
214 | tmit->fragments[1] = "foo bar"; | 250 | tmit->data_count = 3; |
215 | tmit->fragment_sizes[1] = 7; | ||
216 | tmit->fragments[2] = "foo bar baz"; | ||
217 | tmit->fragment_sizes[2] = 11; | ||
218 | tmit->handle | 251 | tmit->handle |
219 | = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit, | 252 | = GNUNET_PSYC_master_transmit (mst, "_test", tmit_notify_mod, |
253 | tmit_notify_data, tmit, | ||
220 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); | 254 | GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); |
221 | } | 255 | } |
222 | 256 | ||