aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-01-25 18:34:41 +0100
committerChristian Grothoff <christian@grothoff.org>2019-01-25 18:34:57 +0100
commit8e157def22a61958cd4b6c791da505a38062cc1d (patch)
treec7a53e64681bca90ad4cb69f5fe5e6f9fa0bd8e8 /src
parent7536db9fc52657812e14c6890f2a95db8ca685b2 (diff)
downloadgnunet-8e157def22a61958cd4b6c791da505a38062cc1d.tar.gz
gnunet-8e157def22a61958cd4b6c791da505a38062cc1d.zip
add prototypes for handlers for incoming messages
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_mq_lib.h45
-rw-r--r--src/include/gnunet_protocols.h5
-rw-r--r--src/transport/gnunet-service-tng.c565
-rw-r--r--src/util/mq.c44
4 files changed, 589 insertions, 70 deletions
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h
index 3f67dc365..2a459636a 100644
--- a/src/include/gnunet_mq_lib.h
+++ b/src/include/gnunet_mq_lib.h
@@ -528,6 +528,51 @@ struct GNUNET_MQ_MessageHandler
528 528
529 529
530/** 530/**
531 * Insert code for a "check_" function that verifies that
532 * a given variable-length message received over the network
533 * is followed by another variable-length message that fits
534 * exactly with the given size. If the message @a m
535 * is not followed by another `struct GNUNET_MessageHeader`
536 * with a size that adds up to the total size, an error is logged
537 * and the function is returned with #GNUNET_NO.
538 *
539 * @param an IPC message with proper type to determine
540 * the size, starting with a `struct GNUNET_MessageHeader`
541 */
542#define GNUNET_MQ_check_boxed_message(m) \
543 { \
544 const struct GNUNET_MessageHeader *inbox = \
545 (const struct GNUNET_MessageHeader *) &m[1]; \
546 const struct GNUNET_MessageHeader *hdr = \
547 (const struct GNUNET_MessageHeader *) m; \
548 uint16_t slen = ntohs (hdr->size) - sizeof (*m); \
549 if ( (slen < sizeof (struct GNUNET_MessageHeader))||\
550 (slen != ntohs (inbox->size)) ) \
551 { \
552 GNUNET_break (0); \
553 return GNUNET_NO; \
554 } \
555 }
556
557
558/**
559 * Call the message message handler that was registered
560 * for the type of the given message in the given @a handlers list.
561 *
562 * This function is indended to be used for the implementation
563 * of message queues.
564 *
565 * @param handlers a set of handlers
566 * @param mh message to dispatch
567 * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
568 * #GNUNET_SYSERR if message was rejected by check function
569 */
570int
571GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
572 const struct GNUNET_MessageHeader *mh);
573
574
575/**
531 * Create a new envelope. 576 * Create a new envelope.
532 * 577 *
533 * @param mhp message header to store the allocated message header in, can be NULL 578 * @param mhp message header to store the allocated message header in, can be NULL
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 8593005d7..a8d716b3f 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3125,6 +3125,11 @@ extern "C"
3125#define GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX 1219 3125#define GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX 1219
3126 3126
3127/** 3127/**
3128 * Transport affirming receipt of an ephemeral key.
3129 */
3130#define GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION 1220
3131
3132/**
3128 * Message sent to indicate to the transport that a monitor 3133 * Message sent to indicate to the transport that a monitor
3129 * wants to observe certain events. 3134 * wants to observe certain events.
3130 */ 3135 */
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 8febbdfff..3cccf5173 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -33,7 +33,7 @@
33 * transport-to-transport traffic) 33 * transport-to-transport traffic)
34 * 34 *
35 * Implement: 35 * Implement:
36 * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc. 36 * - manage defragmentation, retransmission, track RTT, loss, etc.
37 * 37 *
38 * Easy: 38 * Easy:
39 * - use ATS bandwidth allocation callback and schedule transmissions! 39 * - use ATS bandwidth allocation callback and schedule transmissions!
@@ -165,8 +165,8 @@ struct TransportBackchannelEncapsulationMessage
165 165
166 166
167/** 167/**
168 * Message by which a peer confirms that it is using an 168 * Body by which a peqer confirms that it is using an ephemeral
169 * ephemeral key. 169 * key.
170 */ 170 */
171struct EphemeralConfirmation 171struct EphemeralConfirmation
172{ 172{
@@ -192,6 +192,37 @@ struct EphemeralConfirmation
192 192
193 193
194/** 194/**
195 * Message by which a peqer confirms that it is using an ephemeral
196 * key.
197 */
198struct EphemeralConfirmationMessage
199{
200
201 /**
202 * Message header, type is #GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION
203 */
204 struct GNUNET_MessageHeader header;
205
206 /**
207 * Must be zero.
208 */
209 uint32_t reserved;
210
211 /**
212 * How long is this signature over the ephemeral key
213 * valid?
214 */
215 struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
216
217 /**
218 * Ephemeral key setup by the sender for @e target, used
219 * to encrypt the payload.
220 */
221 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
222};
223
224
225/**
195 * Plaintext of the variable-size payload that is encrypted 226 * Plaintext of the variable-size payload that is encrypted
196 * within a `struct TransportBackchannelEncapsulationMessage` 227 * within a `struct TransportBackchannelEncapsulationMessage`
197 */ 228 */
@@ -863,7 +894,12 @@ struct PendingMessage
863 * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX) 894 * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
864 */ 895 */
865 struct PendingMessage *prev_frag; 896 struct PendingMessage *prev_frag;
866 897
898 /**
899 * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
900 */
901 struct PendingMessage *bpm;
902
867 /** 903 /**
868 * Target of the request. 904 * Target of the request.
869 */ 905 */
@@ -1798,6 +1834,36 @@ free_fragment_tree (struct PendingMessage *root)
1798 1834
1799 1835
1800/** 1836/**
1837 * Release memory associated with @a pm and remove @a pm from associated
1838 * data structures. @a pm must be a top-level pending message and not
1839 * a fragment in the tree. The entire tree is freed (if applicable).
1840 *
1841 * @param pm the pending message to free
1842 */
1843static void
1844free_pending_message (struct PendingMessage *pm)
1845{
1846 struct TransportClient *tc = pm->client;
1847 struct Neighbour *target = pm->target;
1848
1849 if (NULL != tc)
1850 {
1851 GNUNET_CONTAINER_MDLL_remove (client,
1852 tc->details.core.pending_msg_head,
1853 tc->details.core.pending_msg_tail,
1854 pm);
1855 }
1856 GNUNET_CONTAINER_MDLL_remove (neighbour,
1857 target->pending_msg_head,
1858 target->pending_msg_tail,
1859 pm);
1860 free_fragment_tree (pm);
1861 GNUNET_free_non_null (pm->bpm);
1862 GNUNET_free (pm);
1863}
1864
1865
1866/**
1801 * Send a response to the @a pm that we have processed a 1867 * Send a response to the @a pm that we have processed a
1802 * "send" request with status @a success. We 1868 * "send" request with status @a success. We
1803 * transmitted @a bytes_physical on the actual wire. 1869 * transmitted @a bytes_physical on the actual wire.
@@ -1829,17 +1895,8 @@ client_send_response (struct PendingMessage *pm,
1829 som->peer = target->pid; 1895 som->peer = target->pid;
1830 GNUNET_MQ_send (tc->mq, 1896 GNUNET_MQ_send (tc->mq,
1831 env); 1897 env);
1832 GNUNET_CONTAINER_MDLL_remove (client,
1833 tc->details.core.pending_msg_head,
1834 tc->details.core.pending_msg_tail,
1835 pm);
1836 } 1898 }
1837 GNUNET_CONTAINER_MDLL_remove (neighbour, 1899 free_pending_message (pm);
1838 target->pending_msg_head,
1839 target->pending_msg_tail,
1840 pm);
1841 free_fragment_tree (pm);
1842 GNUNET_free (pm);
1843} 1900}
1844 1901
1845 1902
@@ -2176,36 +2233,291 @@ handle_del_address (void *cls,
2176 2233
2177 2234
2178/** 2235/**
2236 * Context from #handle_incoming_msg(). Closure for many
2237 * message handlers below.
2238 */
2239struct CommunicatorMessageContext
2240{
2241 /**
2242 * Which communicator provided us with the message.
2243 */
2244 struct TransportClient *tc;
2245
2246 /**
2247 * Additional information for flow control and about the sender.
2248 */
2249 struct GNUNET_TRANSPORT_IncomingMessage im;
2250};
2251
2252
2253/**
2254 * Send ACK to communicator (if requested) and free @a cmc.
2255 *
2256 * @param cmc context for which we are done handling the message
2257 */
2258static void
2259finish_cmc_handling (struct CommunicatorMessageContext *cmc)
2260{
2261 // FIXME: if (0 != ntohl (im->fc_on)) => send ACK when done to communicator for flow control!
2262 GNUNET_SERVICE_client_continue (cmc->tc->client);
2263
2264 GNUNET_free (cmc);
2265}
2266
2267
2268/**
2269 * Communicator gave us an unencapsulated message to pass
2270 * as-is to CORE. Process the request.
2271 *
2272 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2273 * @param mh the message that was received
2274 */
2275static void
2276handle_raw_message (void *cls,
2277 const struct GNUNET_MessageHeader *mh)
2278{
2279 struct CommunicatorMessageContext *cmc = cls;
2280
2281 // FIXME: do work!
2282 finish_cmc_handling (cmc);
2283}
2284
2285
2286/**
2287 * Communicator gave us a fragment box. Check the message.
2288 *
2289 * @param cls a `struct CommunicatorMessageContext`
2290 * @param fb the send message that was sent
2291 * @return #GNUNET_YES if message is well-formed
2292 */
2293static int
2294check_fragment_box (void *cls,
2295 const struct TransportFragmentBox *fb)
2296{
2297 // FIXME! check that off + size-of-payload <= total-length!
2298 return GNUNET_YES;
2299}
2300
2301
2302/**
2303 * Communicator gave us a fragment. Process the request.
2304 *
2305 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2306 * @param fb the message that was received
2307 */
2308static void
2309handle_fragment_box (void *cls,
2310 const struct TransportFragmentBox *fb)
2311{
2312 struct CommunicatorMessageContext *cmc = cls;
2313
2314 // FIXME: do work!
2315 finish_cmc_handling (cmc);
2316}
2317
2318
2319/**
2320 * Communicator gave us a fragment acknowledgement. Process the request.
2321 *
2322 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2323 * @param fa the message that was received
2324 */
2325static void
2326handle_fragment_ack (void *cls,
2327 const struct TransportFragmentAckMessage *fa)
2328{
2329 struct CommunicatorMessageContext *cmc = cls;
2330
2331 // FIXME: do work!
2332 finish_cmc_handling (cmc);
2333}
2334
2335
2336/**
2337 * Communicator gave us a reliability box. Check the message.
2338 *
2339 * @param cls a `struct CommunicatorMessageContext`
2340 * @param rb the send message that was sent
2341 * @return #GNUNET_YES if message is well-formed
2342 */
2343static int
2344check_reliability_box (void *cls,
2345 const struct TransportReliabilityBox *rb)
2346{
2347 GNUNET_MQ_check_boxed_message (rb);
2348 return GNUNET_YES;
2349}
2350
2351
2352/**
2353 * Communicator gave us a reliability box. Process the request.
2354 *
2355 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2356 * @param rb the message that was received
2357 */
2358static void
2359handle_reliability_box (void *cls,
2360 const struct TransportReliabilityBox *rb)
2361{
2362 struct CommunicatorMessageContext *cmc = cls;
2363
2364 // FIXME: do work!
2365 finish_cmc_handling (cmc);
2366}
2367
2368
2369/**
2370 * Communicator gave us a reliability ack. Process the request.
2371 *
2372 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2373 * @param ra the message that was received
2374 */
2375static void
2376handle_reliability_ack (void *cls,
2377 const struct TransportReliabilityAckMessage *ra)
2378{
2379 struct CommunicatorMessageContext *cmc = cls;
2380
2381 // FIXME: do work!
2382 finish_cmc_handling (cmc);
2383}
2384
2385
2386/**
2387 * Communicator gave us a backchannel encapsulation. Check the message.
2388 *
2389 * @param cls a `struct CommunicatorMessageContext`
2390 * @param be the send message that was sent
2391 * @return #GNUNET_YES if message is well-formed
2392 */
2393static int
2394check_backchannel_encapsulation (void *cls,
2395 const struct TransportBackchannelEncapsulationMessage *be)
2396{
2397 // FIXME: do work!
2398 return GNUNET_YES;
2399}
2400
2401
2402/**
2403 * Communicator gave us a backchannel encapsulation. Process the request.
2404 *
2405 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2406 * @param be the message that was received
2407 */
2408static void
2409handle_backchannel_encapsulation (void *cls,
2410 const struct TransportBackchannelEncapsulationMessage *be)
2411{
2412 struct CommunicatorMessageContext *cmc = cls;
2413
2414 // FIXME: do work!
2415 finish_cmc_handling (cmc);
2416}
2417
2418
2419/**
2420 * Communicator gave us an ephemeral confirmation. Process the request.
2421 *
2422 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2423 * @param ec the message that was received
2424 */
2425static void
2426handle_ephemeral_confirmation (void *cls,
2427 const struct EphemeralConfirmationMessage *ec)
2428{
2429 struct CommunicatorMessageContext *cmc = cls;
2430
2431 // FIXME: do work!
2432 finish_cmc_handling (cmc);
2433}
2434
2435
2436/**
2437 * Communicator gave us a DV learn message. Check the message.
2438 *
2439 * @param cls a `struct CommunicatorMessageContext`
2440 * @param dvl the send message that was sent
2441 * @return #GNUNET_YES if message is well-formed
2442 */
2443static int
2444check_dv_learn (void *cls,
2445 const struct TransportDVLearn *dvl)
2446{
2447 // FIXME: do work!
2448 return GNUNET_YES;
2449}
2450
2451
2452/**
2453 * Communicator gave us a DV learn message. Process the request.
2454 *
2455 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2456 * @param dvl the message that was received
2457 */
2458static void
2459handle_dv_learn (void *cls,
2460 const struct TransportDVLearn *dvl)
2461{
2462 struct CommunicatorMessageContext *cmc = cls;
2463
2464 // FIXME: do work!
2465 finish_cmc_handling (cmc);
2466}
2467
2468
2469/**
2470 * Communicator gave us a DV box. Check the message.
2471 *
2472 * @param cls a `struct CommunicatorMessageContext`
2473 * @param dvb the send message that was sent
2474 * @return #GNUNET_YES if message is well-formed
2475 */
2476static int
2477check_dv_box (void *cls,
2478 const struct TransportDVBox *dvb)
2479{
2480 // FIXME: do work!
2481 return GNUNET_YES;
2482}
2483
2484
2485/**
2486 * Communicator gave us a DV box. Process the request.
2487 *
2488 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2489 * @param dvb the message that was received
2490 */
2491static void
2492handle_dv_box (void *cls,
2493 const struct TransportDVBox *dvb)
2494{
2495 struct CommunicatorMessageContext *cmc = cls;
2496
2497 // FIXME: do work!
2498 finish_cmc_handling (cmc);
2499}
2500
2501
2502/**
2179 * Client notified us about transmission from a peer. Process the request. 2503 * Client notified us about transmission from a peer. Process the request.
2180 * 2504 *
2181 * @param cls the client 2505 * @param cls a `struct TransportClient` which sent us the message
2182 * @param obm the send message that was sent 2506 * @param obm the send message that was sent
2507 * @return #GNUNET_YES if message is well-formed
2183 */ 2508 */
2184static int 2509static int
2185check_incoming_msg (void *cls, 2510check_incoming_msg (void *cls,
2186 const struct GNUNET_TRANSPORT_IncomingMessage *im) 2511 const struct GNUNET_TRANSPORT_IncomingMessage *im)
2187{ 2512{
2188 struct TransportClient *tc = cls; 2513 struct TransportClient *tc = cls;
2189 uint16_t size;
2190 const struct GNUNET_MessageHeader *obmm;
2191 2514
2192 if (CT_COMMUNICATOR != tc->type) 2515 if (CT_COMMUNICATOR != tc->type)
2193 { 2516 {
2194 GNUNET_break (0); 2517 GNUNET_break (0);
2195 return GNUNET_SYSERR; 2518 return GNUNET_SYSERR;
2196 } 2519 }
2197 size = ntohs (im->header.size) - sizeof (*im); 2520 GNUNET_MQ_check_boxed_message (im);
2198 if (size < sizeof (struct GNUNET_MessageHeader))
2199 {
2200 GNUNET_break (0);
2201 return GNUNET_SYSERR;
2202 }
2203 obmm = (const struct GNUNET_MessageHeader *) &im[1];
2204 if (size != ntohs (obmm->size))
2205 {
2206 GNUNET_break (0);
2207 return GNUNET_SYSERR;
2208 }
2209 return GNUNET_OK; 2521 return GNUNET_OK;
2210} 2522}
2211 2523
@@ -2213,7 +2525,6 @@ check_incoming_msg (void *cls,
2213/** 2525/**
2214 * Incoming meessage. Process the request. 2526 * Incoming meessage. Process the request.
2215 * 2527 *
2216 * @param cls the client
2217 * @param im the send message that was received 2528 * @param im the send message that was received
2218 */ 2529 */
2219static void 2530static void
@@ -2221,8 +2532,61 @@ handle_incoming_msg (void *cls,
2221 const struct GNUNET_TRANSPORT_IncomingMessage *im) 2532 const struct GNUNET_TRANSPORT_IncomingMessage *im)
2222{ 2533{
2223 struct TransportClient *tc = cls; 2534 struct TransportClient *tc = cls;
2535 struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
2536 struct GNUNET_MQ_MessageHandler handlers[] = {
2537 GNUNET_MQ_hd_var_size (fragment_box,
2538 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
2539 struct TransportFragmentBox,
2540 &cmc),
2541 GNUNET_MQ_hd_fixed_size (fragment_ack,
2542 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
2543 struct TransportFragmentAckMessage,
2544 &cmc),
2545 GNUNET_MQ_hd_var_size (reliability_box,
2546 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
2547 struct TransportReliabilityBox,
2548 &cmc),
2549 GNUNET_MQ_hd_fixed_size (reliability_ack,
2550 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
2551 struct TransportReliabilityAckMessage,
2552 &cmc),
2553 GNUNET_MQ_hd_var_size (backchannel_encapsulation,
2554 GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
2555 struct TransportBackchannelEncapsulationMessage,
2556 &cmc),
2557 GNUNET_MQ_hd_fixed_size (ephemeral_confirmation,
2558 GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION,
2559 struct EphemeralConfirmationMessage,
2560 &cmc),
2561 GNUNET_MQ_hd_var_size (dv_learn,
2562 GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
2563 struct TransportDVLearn,
2564 &cmc),
2565 GNUNET_MQ_hd_var_size (dv_box,
2566 GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
2567 struct TransportDVBox,
2568 &cmc),
2569 GNUNET_MQ_handler_end()
2570 };
2571 int ret;
2224 2572
2225 GNUNET_SERVICE_client_continue (tc->client); 2573 cmc->tc = tc;
2574 cmc->im = *im;
2575 ret = GNUNET_MQ_handle_message (handlers,
2576 (const struct GNUNET_MessageHeader *) &im[1]);
2577 if (GNUNET_SYSERR == ret)
2578 {
2579 GNUNET_break (0);
2580 GNUNET_SERVICE_client_drop (tc->client);
2581 GNUNET_free (cmc);
2582 return;
2583 }
2584 if (GNUNET_NO == ret)
2585 {
2586 /* unencapsulated 'raw' message */
2587 handle_raw_message (&cmc,
2588 (const struct GNUNET_MessageHeader *) &im[1]);
2589 }
2226} 2590}
2227 2591
2228 2592
@@ -2269,6 +2633,23 @@ tracker_update_in_cb (void *cls)
2269 2633
2270 2634
2271/** 2635/**
2636 * If necessary, generates the UUID for a @a pm
2637 *
2638 * @param pm pending message to generate UUID for.
2639 */
2640static void
2641set_pending_message_uuid (struct PendingMessage *pm)
2642{
2643 if (pm->msg_uuid_set)
2644 return;
2645 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
2646 &pm->msg_uuid,
2647 sizeof (pm->msg_uuid));
2648 pm->msg_uuid_set = GNUNET_YES;
2649}
2650
2651
2652/**
2272 * Fragment the given @a pm to the given @a mtu. Adds 2653 * Fragment the given @a pm to the given @a mtu. Adds
2273 * additional fragments to the neighbour as well. If the 2654 * additional fragments to the neighbour as well. If the
2274 * @a mtu is too small, generates and error for the @a pm 2655 * @a mtu is too small, generates and error for the @a pm
@@ -2284,13 +2665,7 @@ fragment_message (struct PendingMessage *pm,
2284{ 2665{
2285 struct PendingMessage *ff; 2666 struct PendingMessage *ff;
2286 2667
2287 if (GNUNET_NO == pm->msg_uuid_set) 2668 set_pending_message_uuid (pm);
2288 {
2289 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
2290 &pm->msg_uuid,
2291 sizeof (pm->msg_uuid));
2292 pm->msg_uuid_set = GNUNET_YES;
2293 }
2294 2669
2295 /* This invariant is established in #handle_add_queue_message() */ 2670 /* This invariant is established in #handle_add_queue_message() */
2296 GNUNET_assert (mtu > sizeof (struct TransportFragmentBox)); 2671 GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
@@ -2390,24 +2765,50 @@ fragment_message (struct PendingMessage *pm,
2390static struct PendingMessage * 2765static struct PendingMessage *
2391reliability_box_message (struct PendingMessage *pm) 2766reliability_box_message (struct PendingMessage *pm)
2392{ 2767{
2393 if (PMT_CORE != pm->pmt) 2768 struct TransportReliabilityBox rbox;
2394 { 2769 struct PendingMessage *bpm;
2395 /* already fragmented or reliability boxed, or control message: do nothing */ 2770 char *msg;
2396 return pm; 2771
2397 } 2772 if (PMT_CORE != pm->pmt)
2398 2773 return pm; /* already fragmented or reliability boxed, or control message: do nothing */
2399 if (0) // FIXME 2774 if (NULL != pm->bpm)
2775 return pm->bpm; /* already computed earlier: do nothing */
2776 GNUNET_assert (NULL == pm->head_frag);
2777 if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
2400 { 2778 {
2401 /* failed hard */ 2779 /* failed hard */
2402 // FIMXE: bitch 2780 GNUNET_break (0);
2403 client_send_response (pm, 2781 client_send_response (pm,
2404 GNUNET_NO, 2782 GNUNET_NO,
2405 0); 2783 0);
2406 return NULL; 2784 return NULL;
2407 } 2785 }
2408 2786 bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
2409 /* FIXME: return boxed PM here! */ 2787 sizeof (rbox) +
2410 return NULL; 2788 pm->bytes_msg);
2789 bpm->target = pm->target;
2790 bpm->frag_parent = pm;
2791 GNUNET_CONTAINER_MDLL_insert (frag,
2792 pm->head_frag,
2793 pm->tail_frag,
2794 bpm);
2795 bpm->timeout = pm->timeout;
2796 bpm->pmt = PMT_RELIABILITY_BOX;
2797 bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
2798 set_pending_message_uuid (bpm);
2799 rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
2800 rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
2801 rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
2802 rbox.msg_uuid = pm->msg_uuid;
2803 msg = (char *) &bpm[1];
2804 memcpy (msg,
2805 &rbox,
2806 sizeof (rbox));
2807 memcpy (&msg[sizeof (rbox)],
2808 &pm[1],
2809 pm->bytes_msg);
2810 pm->bpm = bpm;
2811 return bpm;
2411} 2812}
2412 2813
2413 2814
@@ -2542,26 +2943,64 @@ transmit_on_queue (void *cls)
2542 else if (PMT_CORE != pm->pmt) 2943 else if (PMT_CORE != pm->pmt)
2543 { 2944 {
2544 /* This was an acknowledgement of some type, always free */ 2945 /* This was an acknowledgement of some type, always free */
2545 2946 free_pending_message (pm);
2546 struct Neighbour *neighbour = pm->target;
2547 GNUNET_CONTAINER_MDLL_remove (neighbour,
2548 neighbour->pending_msg_head,
2549 neighbour->pending_msg_tail,
2550 pm);
2551 GNUNET_free (pm);
2552 } 2947 }
2553 else 2948 else
2554 { 2949 {
2555 /* message not finished, waiting for acknowledgement */ 2950 /* message not finished, waiting for acknowledgement */
2556 // FIXME: update time by which we might retransmit 's' based on 2951 struct Neighbour *neighbour = pm->target;
2557 // queue characteristics (i.e. RTT) 2952 /* Update time by which we might retransmit 's' based on queue
2558 2953 characteristics (i.e. RTT); it takes one RTT for the message to
2559 // FIXME: move 'pm' back in the transmission queue (simplistic: to 2954 arrive and the ACK to come back in the best case; but the other
2560 // the end, better: with position depending on type, timeout, 2955 side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
2561 // etc.) 2956 retransmitting. Note that in the future this heuristic should
2957 likely be improved further (measure RTT stability, consider
2958 message urgency and size when delaying ACKs, etc.) */
2959 s->next_attempt = GNUNET_TIME_relative_to_absolute
2960 (GNUNET_TIME_relative_multiply (queue->rtt,
2961 4));
2962 if (s == pm)
2963 {
2964 struct PendingMessage *pos;
2965
2966 /* re-insert sort in neighbour list */
2967 GNUNET_CONTAINER_MDLL_remove (neighbour,
2968 neighbour->pending_msg_head,
2969 neighbour->pending_msg_tail,
2970 pm);
2971 pos = neighbour->pending_msg_tail;
2972 while ( (NULL != pos) &&
2973 (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
2974 pos = pos->prev_neighbour;
2975 GNUNET_CONTAINER_MDLL_insert_after (neighbour,
2976 neighbour->pending_msg_head,
2977 neighbour->pending_msg_tail,
2978 pos,
2979 pm);
2980 }
2981 else
2982 {
2983 /* re-insert sort in fragment list */
2984 struct PendingMessage *fp = s->frag_parent;
2985 struct PendingMessage *pos;
2986
2987 GNUNET_CONTAINER_MDLL_remove (frag,
2988 fp->head_frag,
2989 fp->tail_frag,
2990 s);
2991 pos = fp->tail_frag;
2992 while ( (NULL != pos) &&
2993 (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
2994 pos = pos->prev_frag;
2995 GNUNET_CONTAINER_MDLL_insert_after (frag,
2996 fp->head_frag,
2997 fp->tail_frag,
2998 pos,
2999 s);
3000 }
2562 } 3001 }
2563 3002
2564 /* finally, re-schedule self */ 3003 /* finally, re-schedule queue transmission task itself */
2565 schedule_transmit_on_queue (queue); 3004 schedule_transmit_on_queue (queue);
2566} 3005}
2567 3006
diff --git a/src/util/mq.c b/src/util/mq.c
index 4dfcb72be..d2f5add19 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -215,6 +215,35 @@ void
215GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, 215GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
216 const struct GNUNET_MessageHeader *mh) 216 const struct GNUNET_MessageHeader *mh)
217{ 217{
218 int ret;
219
220 ret = GNUNET_MQ_handle_message (mq->handlers,
221 mh);
222 if (GNUNET_SYSERR == ret)
223 {
224 GNUNET_MQ_inject_error (mq,
225 GNUNET_MQ_ERROR_MALFORMED);
226 return;
227 }
228}
229
230
231/**
232 * Call the message message handler that was registered
233 * for the type of the given message in the given @a handlers list.
234 *
235 * This function is indended to be used for the implementation
236 * of message queues.
237 *
238 * @param handlers a set of handlers
239 * @param mh message to dispatch
240 * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
241 * #GNUNET_SYSERR if message was rejected by check function
242 */
243int
244GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
245 const struct GNUNET_MessageHeader *mh)
246{
218 const struct GNUNET_MQ_MessageHandler *handler; 247 const struct GNUNET_MQ_MessageHandler *handler;
219 int handled = GNUNET_NO; 248 int handled = GNUNET_NO;
220 uint16_t msize = ntohs (mh->size); 249 uint16_t msize = ntohs (mh->size);
@@ -224,9 +253,9 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
224 "Received message of type %u and size %u\n", 253 "Received message of type %u and size %u\n",
225 mtype, msize); 254 mtype, msize);
226 255
227 if (NULL == mq->handlers) 256 if (NULL == handlers)
228 goto done; 257 goto done;
229 for (handler = mq->handlers; NULL != handler->cb; handler++) 258 for (handler = handlers; NULL != handler->cb; handler++)
230 { 259 {
231 if (handler->type == mtype) 260 if (handler->type == mtype)
232 { 261 {
@@ -240,9 +269,7 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
240 LOG (GNUNET_ERROR_TYPE_ERROR, 269 LOG (GNUNET_ERROR_TYPE_ERROR,
241 "Received malformed message of type %u\n", 270 "Received malformed message of type %u\n",
242 (unsigned int) handler->type); 271 (unsigned int) handler->type);
243 GNUNET_MQ_inject_error (mq, 272 return GNUNET_SYSERR;
244 GNUNET_MQ_ERROR_MALFORMED);
245 break;
246 } 273 }
247 if ( (NULL == handler->mv) || 274 if ( (NULL == handler->mv) ||
248 (GNUNET_OK == 275 (GNUNET_OK ==
@@ -257,17 +284,20 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
257 LOG (GNUNET_ERROR_TYPE_ERROR, 284 LOG (GNUNET_ERROR_TYPE_ERROR,
258 "Received malformed message of type %u\n", 285 "Received malformed message of type %u\n",
259 (unsigned int) handler->type); 286 (unsigned int) handler->type);
260 GNUNET_MQ_inject_error (mq, 287 return GNUNET_SYSERR;
261 GNUNET_MQ_ERROR_MALFORMED);
262 } 288 }
263 break; 289 break;
264 } 290 }
265 } 291 }
266 done: 292 done:
267 if (GNUNET_NO == handled) 293 if (GNUNET_NO == handled)
294 {
268 LOG (GNUNET_ERROR_TYPE_INFO, 295 LOG (GNUNET_ERROR_TYPE_INFO,
269 "No handler for message of type %u and size %u\n", 296 "No handler for message of type %u and size %u\n",
270 mtype, msize); 297 mtype, msize);
298 return GNUNET_NO;
299 }
300 return GNUNET_OK;
271} 301}
272 302
273 303