diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-30 22:27:18 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-30 22:27:18 +0000 |
commit | c4f4203aaff577fdbe60a7a0976e913dbb77f03e (patch) | |
tree | 2fca5cb9cdc7b8c8d046c81453313c275d7dd5cd /src | |
parent | e9dbabbabb9edd41bb7d7cb907826abdd8edb257 (diff) | |
download | gnunet-c4f4203aaff577fdbe60a7a0976e913dbb77f03e.tar.gz gnunet-c4f4203aaff577fdbe60a7a0976e913dbb77f03e.zip |
enable setting per-envelope or per-queue transmission options with MQ API
Diffstat (limited to 'src')
-rw-r--r-- | src/core/core_api_2.c | 49 | ||||
-rw-r--r-- | src/include/gnunet_core_service.h | 44 | ||||
-rw-r--r-- | src/include/gnunet_mq_lib.h | 55 | ||||
-rw-r--r-- | src/util/mq.c | 112 |
4 files changed, 250 insertions, 10 deletions
diff --git a/src/core/core_api_2.c b/src/core/core_api_2.c index d45c98e93..70dd1e0f0 100644 --- a/src/core/core_api_2.c +++ b/src/core/core_api_2.c | |||
@@ -257,6 +257,26 @@ handle_mq_error (void *cls, | |||
257 | 257 | ||
258 | 258 | ||
259 | /** | 259 | /** |
260 | * Inquire with CORE what options should be set for a message | ||
261 | * so that it is transmitted with the given @a priority and | ||
262 | * the given @a cork value. | ||
263 | * | ||
264 | * @param cork desired corking | ||
265 | * @param priority desired message priority | ||
266 | * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options() | ||
267 | * @return `extra` argument to give to #GNUNET_MQ_set_options() | ||
268 | */ | ||
269 | const void * | ||
270 | GNUNET_CORE_get_mq_options (int cork, | ||
271 | enum GNUNET_CORE_Priority priority, | ||
272 | uint64_t *flags) | ||
273 | { | ||
274 | *flags = ((uint64_t) priority) + (((uint64_t) cork) << 32); | ||
275 | return NULL; | ||
276 | } | ||
277 | |||
278 | |||
279 | /** | ||
260 | * Implement sending functionality of a message queue for | 280 | * Implement sending functionality of a message queue for |
261 | * us sending messages to a peer. | 281 | * us sending messages to a peer. |
262 | * | 282 | * |
@@ -275,12 +295,20 @@ core_mq_send_impl (struct GNUNET_MQ_Handle *mq, | |||
275 | struct SendMessage *sm; | 295 | struct SendMessage *sm; |
276 | struct GNUNET_MQ_Envelope *env; | 296 | struct GNUNET_MQ_Envelope *env; |
277 | uint16_t msize; | 297 | uint16_t msize; |
278 | int cork | 298 | uint64_t flags; |
279 | = GNUNET_NO; // FIXME | 299 | int cork; |
280 | enum GNUNET_CORE_Priority priority | 300 | enum GNUNET_CORE_Priority priority; |
281 | = GNUNET_CORE_PRIO_BEST_EFFORT; // FIXME | ||
282 | 301 | ||
283 | GNUNET_assert (NULL == pr->env); | 302 | GNUNET_assert (NULL == pr->env); |
303 | /* extract options from envelope */ | ||
304 | env = GNUNET_MQ_get_current_envelope (mq); | ||
305 | GNUNET_break (NULL == | ||
306 | GNUNET_MQ_env_get_options (env, | ||
307 | &flags)); | ||
308 | cork = (int) (flags >> 32); | ||
309 | priority = (uint32_t) flags; | ||
310 | |||
311 | /* check message size for sanity */ | ||
284 | msize = ntohs (msg->size); | 312 | msize = ntohs (msg->size); |
285 | if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage)) | 313 | if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage)) |
286 | { | 314 | { |
@@ -288,6 +316,8 @@ core_mq_send_impl (struct GNUNET_MQ_Handle *mq, | |||
288 | GNUNET_MQ_impl_send_continue (mq); | 316 | GNUNET_MQ_impl_send_continue (mq); |
289 | return; | 317 | return; |
290 | } | 318 | } |
319 | |||
320 | /* ask core for transmission */ | ||
291 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 321 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
292 | "Asking core for transmission of %u bytes to `%s'\n", | 322 | "Asking core for transmission of %u bytes to `%s'\n", |
293 | (unsigned int) msize, | 323 | (unsigned int) msize, |
@@ -302,6 +332,8 @@ core_mq_send_impl (struct GNUNET_MQ_Handle *mq, | |||
302 | smr->smr_id = htons (++pr->smr_id_gen); | 332 | smr->smr_id = htons (++pr->smr_id_gen); |
303 | GNUNET_MQ_send (h->mq, | 333 | GNUNET_MQ_send (h->mq, |
304 | env); | 334 | env); |
335 | |||
336 | /* prepare message with actual transmission data */ | ||
305 | pr->env = GNUNET_MQ_msg_nested_mh (sm, | 337 | pr->env = GNUNET_MQ_msg_nested_mh (sm, |
306 | GNUNET_MESSAGE_TYPE_CORE_SEND, | 338 | GNUNET_MESSAGE_TYPE_CORE_SEND, |
307 | msg); | 339 | msg); |
@@ -385,6 +417,8 @@ connect_peer (struct GNUNET_CORE_Handle *h, | |||
385 | const struct GNUNET_PeerIdentity *peer) | 417 | const struct GNUNET_PeerIdentity *peer) |
386 | { | 418 | { |
387 | struct PeerRecord *pr; | 419 | struct PeerRecord *pr; |
420 | uint64_t flags; | ||
421 | const void *extra; | ||
388 | 422 | ||
389 | pr = GNUNET_new (struct PeerRecord); | 423 | pr = GNUNET_new (struct PeerRecord); |
390 | pr->peer = *peer; | 424 | pr->peer = *peer; |
@@ -401,6 +435,13 @@ connect_peer (struct GNUNET_CORE_Handle *h, | |||
401 | h->handlers, | 435 | h->handlers, |
402 | &core_mq_error_handler, | 436 | &core_mq_error_handler, |
403 | pr); | 437 | pr); |
438 | /* get our default options */ | ||
439 | extra = GNUNET_CORE_get_mq_options (GNUNET_NO, | ||
440 | GNUNET_CORE_PRIO_BEST_EFFORT, | ||
441 | &flags); | ||
442 | GNUNET_MQ_set_options (pr->mq, | ||
443 | flags, | ||
444 | extra); | ||
404 | if (NULL != h->connects) | 445 | if (NULL != h->connects) |
405 | { | 446 | { |
406 | pr->client_cls = h->connects (h->cls, | 447 | pr->client_cls = h->connects (h->cls, |
diff --git a/src/include/gnunet_core_service.h b/src/include/gnunet_core_service.h index fa74415d6..6ec486b26 100644 --- a/src/include/gnunet_core_service.h +++ b/src/include/gnunet_core_service.h | |||
@@ -247,6 +247,16 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
247 | int outbound_hdr_only, | 247 | int outbound_hdr_only, |
248 | const struct GNUNET_CORE_MessageHandler *handlers); | 248 | const struct GNUNET_CORE_MessageHandler *handlers); |
249 | 249 | ||
250 | /** | ||
251 | * Disconnect from the core service. This function can only | ||
252 | * be called *after* all pending #GNUNET_CORE_notify_transmit_ready | ||
253 | * requests have been explicitly cancelled. | ||
254 | * | ||
255 | * @param handle connection to core to disconnect | ||
256 | */ | ||
257 | void | ||
258 | GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle); | ||
259 | |||
250 | 260 | ||
251 | /** | 261 | /** |
252 | * Connect to the core service. Note that the connection may complete | 262 | * Connect to the core service. Note that the connection may complete |
@@ -282,14 +292,40 @@ GNUNET_CORE_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
282 | 292 | ||
283 | 293 | ||
284 | /** | 294 | /** |
285 | * Disconnect from the core service. This function can only | 295 | * Disconnect from the core service. |
286 | * be called *after* all pending #GNUNET_CORE_notify_transmit_ready | ||
287 | * requests have been explicitly cancelled. | ||
288 | * | 296 | * |
289 | * @param handle connection to core to disconnect | 297 | * @param handle connection to core to disconnect |
290 | */ | 298 | */ |
291 | void | 299 | void |
292 | GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle); | 300 | GNUNET_CORE_disconnecT (struct GNUNET_CORE_Handle *handle); |
301 | |||
302 | |||
303 | /** | ||
304 | * Inquire with CORE what options should be set for a message | ||
305 | * so that it is transmitted with the given @a priority and | ||
306 | * the given @a cork value. | ||
307 | * | ||
308 | * @param cork desired corking | ||
309 | * @param priority desired message priority | ||
310 | * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options() | ||
311 | * @return `extra` argument to give to #GNUNET_MQ_set_options() | ||
312 | */ | ||
313 | const void * | ||
314 | GNUNET_CORE_get_mq_options (int cork, | ||
315 | enum GNUNET_CORE_Priority priority, | ||
316 | uint64_t *flags); | ||
317 | |||
318 | |||
319 | /** | ||
320 | * Obtain the message queue for a connected peer. | ||
321 | * | ||
322 | * @param h the core handle | ||
323 | * @param pid the identity of the peer | ||
324 | * @return NULL if @a pid is not connected | ||
325 | */ | ||
326 | struct GNUNET_MQ_Handle * | ||
327 | GNUNET_CORE_get_mq (const struct GNUNET_CORE_Handle *h, | ||
328 | const struct GNUNET_PeerIdentity *pid); | ||
293 | 329 | ||
294 | 330 | ||
295 | /** | 331 | /** |
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h index 43cefca9f..35313263d 100644 --- a/src/include/gnunet_mq_lib.h +++ b/src/include/gnunet_mq_lib.h | |||
@@ -106,7 +106,7 @@ | |||
106 | 106 | ||
107 | 107 | ||
108 | /** | 108 | /** |
109 | * Implementation of the GNUNET_MQ_extract_nexted_mh macro. | 109 | * Implementation of the #GNUNET_MQ_extract_nexted_mh macro. |
110 | * | 110 | * |
111 | * @param mh message header to extract nested message header from | 111 | * @param mh message header to extract nested message header from |
112 | * @param base_size size of the message before the nested message's header appears | 112 | * @param base_size size of the message before the nested message's header appears |
@@ -305,7 +305,7 @@ struct GNUNET_MQ_MessageHandler | |||
305 | /** | 305 | /** |
306 | * End-marker for the handlers array | 306 | * End-marker for the handlers array |
307 | */ | 307 | */ |
308 | #define GNUNET_MQ_handler_end() {NULL, NULL, NULL, 0, 0} | 308 | #define GNUNET_MQ_handler_end() { NULL, NULL, NULL, 0, 0 } |
309 | 309 | ||
310 | 310 | ||
311 | /** | 311 | /** |
@@ -434,6 +434,57 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm); | |||
434 | 434 | ||
435 | 435 | ||
436 | /** | 436 | /** |
437 | * Function to obtain the current envelope from | ||
438 | * within #GNUNET_MQ_SendImpl implementations. | ||
439 | * | ||
440 | * @param mq message queue to interrogate | ||
441 | * @return the current envelope | ||
442 | */ | ||
443 | struct GNUNET_MQ_Envelope * | ||
444 | GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq); | ||
445 | |||
446 | |||
447 | /** | ||
448 | * Set application-specific options for this envelope. | ||
449 | * Overrides the options set for the queue with | ||
450 | * #GNUNET_MQ_set_options() for this message only. | ||
451 | * | ||
452 | * @param env message to set options for | ||
453 | * @param flags flags to use (meaning is queue-specific) | ||
454 | * @param extra additional buffer for further data (also queue-specific) | ||
455 | */ | ||
456 | void | ||
457 | GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env, | ||
458 | uint64_t flags, | ||
459 | const void *extra); | ||
460 | |||
461 | |||
462 | /** | ||
463 | * Get application-specific options for this envelope. | ||
464 | * | ||
465 | * @param env message to set options for | ||
466 | * @param[out] flags set to flags to use (meaning is queue-specific) | ||
467 | * @return extra additional buffer for further data (also queue-specific) | ||
468 | */ | ||
469 | const void * | ||
470 | GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env, | ||
471 | uint64_t *flags); | ||
472 | |||
473 | |||
474 | /** | ||
475 | * Set application-specific options for this queue. | ||
476 | * | ||
477 | * @param mq message queue to set options for | ||
478 | * @param flags flags to use (meaning is queue-specific) | ||
479 | * @param extra additional buffer for further data (also queue-specific) | ||
480 | */ | ||
481 | void | ||
482 | GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, | ||
483 | uint64_t flags, | ||
484 | const void *extra); | ||
485 | |||
486 | |||
487 | /** | ||
437 | * Obtain the current length of the message queue. | 488 | * Obtain the current length of the message queue. |
438 | * | 489 | * |
439 | * @param mq queue to inspect | 490 | * @param mq queue to inspect |
diff --git a/src/util/mq.c b/src/util/mq.c index 1638d7e0c..a4ea5e39d 100644 --- a/src/util/mq.c +++ b/src/util/mq.c | |||
@@ -63,6 +63,26 @@ struct GNUNET_MQ_Envelope | |||
63 | * Closure for @e send_cb | 63 | * Closure for @e send_cb |
64 | */ | 64 | */ |
65 | void *sent_cls; | 65 | void *sent_cls; |
66 | |||
67 | /** | ||
68 | * Flags that were set for this envelope by | ||
69 | * #GNUNET_MQ_env_set_options(). Only valid if | ||
70 | * @e have_custom_options is set. | ||
71 | */ | ||
72 | uint64_t flags; | ||
73 | |||
74 | /** | ||
75 | * Additional options buffer set for this envelope by | ||
76 | * #GNUNET_MQ_env_set_options(). Only valid if | ||
77 | * @e have_custom_options is set. | ||
78 | */ | ||
79 | const void *extra; | ||
80 | |||
81 | /** | ||
82 | * Did the application call #GNUNET_MQ_env_set_options()? | ||
83 | */ | ||
84 | int have_custom_options; | ||
85 | |||
66 | }; | 86 | }; |
67 | 87 | ||
68 | 88 | ||
@@ -135,6 +155,18 @@ struct GNUNET_MQ_Handle | |||
135 | struct GNUNET_SCHEDULER_Task *continue_task; | 155 | struct GNUNET_SCHEDULER_Task *continue_task; |
136 | 156 | ||
137 | /** | 157 | /** |
158 | * Additional options buffer set for this queue by | ||
159 | * #GNUNET_MQ_set_options(). Default is 0. | ||
160 | */ | ||
161 | const void *default_extra; | ||
162 | |||
163 | /** | ||
164 | * Flags that were set for this queue by | ||
165 | * #GNUNET_MQ_set_options(). Default is 0. | ||
166 | */ | ||
167 | uint64_t default_flags; | ||
168 | |||
169 | /** | ||
138 | * Next id that should be used for the @e assoc_map, | 170 | * Next id that should be used for the @e assoc_map, |
139 | * initialized lazily to a random value together with | 171 | * initialized lazily to a random value together with |
140 | * @e assoc_map | 172 | * @e assoc_map |
@@ -1040,4 +1072,84 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev) | |||
1040 | GNUNET_free (ev); | 1072 | GNUNET_free (ev); |
1041 | } | 1073 | } |
1042 | 1074 | ||
1075 | |||
1076 | /** | ||
1077 | * Function to obtain the current envelope from | ||
1078 | * within #GNUNET_MQ_SendImpl implementations. | ||
1079 | * | ||
1080 | * @param mq message queue to interrogate | ||
1081 | * @return the current envelope | ||
1082 | */ | ||
1083 | struct GNUNET_MQ_Envelope * | ||
1084 | GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq) | ||
1085 | { | ||
1086 | return mq->current_envelope; | ||
1087 | } | ||
1088 | |||
1089 | |||
1090 | /** | ||
1091 | * Set application-specific options for this envelope. | ||
1092 | * Overrides the options set for the queue with | ||
1093 | * #GNUNET_MQ_set_options() for this message only. | ||
1094 | * | ||
1095 | * @param env message to set options for | ||
1096 | * @param flags flags to use (meaning is queue-specific) | ||
1097 | * @param extra additional buffer for further data (also queue-specific) | ||
1098 | */ | ||
1099 | void | ||
1100 | GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env, | ||
1101 | uint64_t flags, | ||
1102 | const void *extra) | ||
1103 | { | ||
1104 | env->flags = flags; | ||
1105 | env->extra = extra; | ||
1106 | env->have_custom_options = GNUNET_YES; | ||
1107 | } | ||
1108 | |||
1109 | |||
1110 | /** | ||
1111 | * Get application-specific options for this envelope. | ||
1112 | * | ||
1113 | * @param env message to set options for | ||
1114 | * @param[out] flags set to flags to use (meaning is queue-specific) | ||
1115 | * @return extra additional buffer for further data (also queue-specific) | ||
1116 | */ | ||
1117 | const void * | ||
1118 | GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env, | ||
1119 | uint64_t *flags) | ||
1120 | { | ||
1121 | struct GNUNET_MQ_Handle *mq = env->parent_queue; | ||
1122 | |||
1123 | if (GNUNET_YES == env->have_custom_options) | ||
1124 | { | ||
1125 | *flags = env->flags; | ||
1126 | return env->extra; | ||
1127 | } | ||
1128 | if (NULL == mq) | ||
1129 | { | ||
1130 | *flags = 0; | ||
1131 | return NULL; | ||
1132 | } | ||
1133 | *flags = mq->default_flags; | ||
1134 | return mq->default_extra; | ||
1135 | } | ||
1136 | |||
1137 | |||
1138 | /** | ||
1139 | * Set application-specific options for this queue. | ||
1140 | * | ||
1141 | * @param mq message queue to set options for | ||
1142 | * @param flags flags to use (meaning is queue-specific) | ||
1143 | * @param extra additional buffer for further data (also queue-specific) | ||
1144 | */ | ||
1145 | void | ||
1146 | GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq, | ||
1147 | uint64_t flags, | ||
1148 | const void *extra) | ||
1149 | { | ||
1150 | mq->default_flags = flags; | ||
1151 | mq->default_extra = extra; | ||
1152 | } | ||
1153 | |||
1154 | |||
1043 | /* end of mq.c */ | 1155 | /* end of mq.c */ |