aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-30 22:27:18 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-30 22:27:18 +0000
commitc4f4203aaff577fdbe60a7a0976e913dbb77f03e (patch)
tree2fca5cb9cdc7b8c8d046c81453313c275d7dd5cd /src
parente9dbabbabb9edd41bb7d7cb907826abdd8edb257 (diff)
downloadgnunet-c4f4203aaff577fdbe60a7a0976e913dbb77f03e.tar.gz
gnunet-c4f4203aaff577fdbe60a7a0976e913dbb77f03e.zip
enable setting per-envelope or per-queue transmission options with MQ API
Diffstat (limited to 'src')
-rw-r--r--src/core/core_api_2.c49
-rw-r--r--src/include/gnunet_core_service.h44
-rw-r--r--src/include/gnunet_mq_lib.h55
-rw-r--r--src/util/mq.c112
4 files changed, 250 insertions, 10 deletions
diff --git a/src/core/core_api_2.c b/src/core/core_api_2.c
index d45c98e93..70dd1e0f0 100644
--- a/src/core/core_api_2.c
+++ b/src/core/core_api_2.c
@@ -257,6 +257,26 @@ handle_mq_error (void *cls,
257 257
258 258
259/** 259/**
260 * Inquire with CORE what options should be set for a message
261 * so that it is transmitted with the given @a priority and
262 * the given @a cork value.
263 *
264 * @param cork desired corking
265 * @param priority desired message priority
266 * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options()
267 * @return `extra` argument to give to #GNUNET_MQ_set_options()
268 */
269const void *
270GNUNET_CORE_get_mq_options (int cork,
271 enum GNUNET_CORE_Priority priority,
272 uint64_t *flags)
273{
274 *flags = ((uint64_t) priority) + (((uint64_t) cork) << 32);
275 return NULL;
276}
277
278
279/**
260 * Implement sending functionality of a message queue for 280 * Implement sending functionality of a message queue for
261 * us sending messages to a peer. 281 * us sending messages to a peer.
262 * 282 *
@@ -275,12 +295,20 @@ core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
275 struct SendMessage *sm; 295 struct SendMessage *sm;
276 struct GNUNET_MQ_Envelope *env; 296 struct GNUNET_MQ_Envelope *env;
277 uint16_t msize; 297 uint16_t msize;
278 int cork 298 uint64_t flags;
279 = GNUNET_NO; // FIXME 299 int cork;
280 enum GNUNET_CORE_Priority priority 300 enum GNUNET_CORE_Priority priority;
281 = GNUNET_CORE_PRIO_BEST_EFFORT; // FIXME
282 301
283 GNUNET_assert (NULL == pr->env); 302 GNUNET_assert (NULL == pr->env);
303 /* extract options from envelope */
304 env = GNUNET_MQ_get_current_envelope (mq);
305 GNUNET_break (NULL ==
306 GNUNET_MQ_env_get_options (env,
307 &flags));
308 cork = (int) (flags >> 32);
309 priority = (uint32_t) flags;
310
311 /* check message size for sanity */
284 msize = ntohs (msg->size); 312 msize = ntohs (msg->size);
285 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage)) 313 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct SendMessage))
286 { 314 {
@@ -288,6 +316,8 @@ core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
288 GNUNET_MQ_impl_send_continue (mq); 316 GNUNET_MQ_impl_send_continue (mq);
289 return; 317 return;
290 } 318 }
319
320 /* ask core for transmission */
291 LOG (GNUNET_ERROR_TYPE_DEBUG, 321 LOG (GNUNET_ERROR_TYPE_DEBUG,
292 "Asking core for transmission of %u bytes to `%s'\n", 322 "Asking core for transmission of %u bytes to `%s'\n",
293 (unsigned int) msize, 323 (unsigned int) msize,
@@ -302,6 +332,8 @@ core_mq_send_impl (struct GNUNET_MQ_Handle *mq,
302 smr->smr_id = htons (++pr->smr_id_gen); 332 smr->smr_id = htons (++pr->smr_id_gen);
303 GNUNET_MQ_send (h->mq, 333 GNUNET_MQ_send (h->mq,
304 env); 334 env);
335
336 /* prepare message with actual transmission data */
305 pr->env = GNUNET_MQ_msg_nested_mh (sm, 337 pr->env = GNUNET_MQ_msg_nested_mh (sm,
306 GNUNET_MESSAGE_TYPE_CORE_SEND, 338 GNUNET_MESSAGE_TYPE_CORE_SEND,
307 msg); 339 msg);
@@ -385,6 +417,8 @@ connect_peer (struct GNUNET_CORE_Handle *h,
385 const struct GNUNET_PeerIdentity *peer) 417 const struct GNUNET_PeerIdentity *peer)
386{ 418{
387 struct PeerRecord *pr; 419 struct PeerRecord *pr;
420 uint64_t flags;
421 const void *extra;
388 422
389 pr = GNUNET_new (struct PeerRecord); 423 pr = GNUNET_new (struct PeerRecord);
390 pr->peer = *peer; 424 pr->peer = *peer;
@@ -401,6 +435,13 @@ connect_peer (struct GNUNET_CORE_Handle *h,
401 h->handlers, 435 h->handlers,
402 &core_mq_error_handler, 436 &core_mq_error_handler,
403 pr); 437 pr);
438 /* get our default options */
439 extra = GNUNET_CORE_get_mq_options (GNUNET_NO,
440 GNUNET_CORE_PRIO_BEST_EFFORT,
441 &flags);
442 GNUNET_MQ_set_options (pr->mq,
443 flags,
444 extra);
404 if (NULL != h->connects) 445 if (NULL != h->connects)
405 { 446 {
406 pr->client_cls = h->connects (h->cls, 447 pr->client_cls = h->connects (h->cls,
diff --git a/src/include/gnunet_core_service.h b/src/include/gnunet_core_service.h
index fa74415d6..6ec486b26 100644
--- a/src/include/gnunet_core_service.h
+++ b/src/include/gnunet_core_service.h
@@ -247,6 +247,16 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
247 int outbound_hdr_only, 247 int outbound_hdr_only,
248 const struct GNUNET_CORE_MessageHandler *handlers); 248 const struct GNUNET_CORE_MessageHandler *handlers);
249 249
250/**
251 * Disconnect from the core service. This function can only
252 * be called *after* all pending #GNUNET_CORE_notify_transmit_ready
253 * requests have been explicitly cancelled.
254 *
255 * @param handle connection to core to disconnect
256 */
257void
258GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle);
259
250 260
251/** 261/**
252 * Connect to the core service. Note that the connection may complete 262 * Connect to the core service. Note that the connection may complete
@@ -282,14 +292,40 @@ GNUNET_CORE_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
282 292
283 293
284/** 294/**
285 * Disconnect from the core service. This function can only 295 * Disconnect from the core service.
286 * be called *after* all pending #GNUNET_CORE_notify_transmit_ready
287 * requests have been explicitly cancelled.
288 * 296 *
289 * @param handle connection to core to disconnect 297 * @param handle connection to core to disconnect
290 */ 298 */
291void 299void
292GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle); 300GNUNET_CORE_disconnecT (struct GNUNET_CORE_Handle *handle);
301
302
303/**
304 * Inquire with CORE what options should be set for a message
305 * so that it is transmitted with the given @a priority and
306 * the given @a cork value.
307 *
308 * @param cork desired corking
309 * @param priority desired message priority
310 * @param[out] flags set to `flags` value for #GNUNET_MQ_set_options()
311 * @return `extra` argument to give to #GNUNET_MQ_set_options()
312 */
313const void *
314GNUNET_CORE_get_mq_options (int cork,
315 enum GNUNET_CORE_Priority priority,
316 uint64_t *flags);
317
318
319/**
320 * Obtain the message queue for a connected peer.
321 *
322 * @param h the core handle
323 * @param pid the identity of the peer
324 * @return NULL if @a pid is not connected
325 */
326struct GNUNET_MQ_Handle *
327GNUNET_CORE_get_mq (const struct GNUNET_CORE_Handle *h,
328 const struct GNUNET_PeerIdentity *pid);
293 329
294 330
295/** 331/**
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h
index 43cefca9f..35313263d 100644
--- a/src/include/gnunet_mq_lib.h
+++ b/src/include/gnunet_mq_lib.h
@@ -106,7 +106,7 @@
106 106
107 107
108/** 108/**
109 * Implementation of the GNUNET_MQ_extract_nexted_mh macro. 109 * Implementation of the #GNUNET_MQ_extract_nexted_mh macro.
110 * 110 *
111 * @param mh message header to extract nested message header from 111 * @param mh message header to extract nested message header from
112 * @param base_size size of the message before the nested message's header appears 112 * @param base_size size of the message before the nested message's header appears
@@ -305,7 +305,7 @@ struct GNUNET_MQ_MessageHandler
305/** 305/**
306 * End-marker for the handlers array 306 * End-marker for the handlers array
307 */ 307 */
308#define GNUNET_MQ_handler_end() {NULL, NULL, NULL, 0, 0} 308#define GNUNET_MQ_handler_end() { NULL, NULL, NULL, 0, 0 }
309 309
310 310
311/** 311/**
@@ -434,6 +434,57 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm);
434 434
435 435
436/** 436/**
437 * Function to obtain the current envelope from
438 * within #GNUNET_MQ_SendImpl implementations.
439 *
440 * @param mq message queue to interrogate
441 * @return the current envelope
442 */
443struct GNUNET_MQ_Envelope *
444GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq);
445
446
447/**
448 * Set application-specific options for this envelope.
449 * Overrides the options set for the queue with
450 * #GNUNET_MQ_set_options() for this message only.
451 *
452 * @param env message to set options for
453 * @param flags flags to use (meaning is queue-specific)
454 * @param extra additional buffer for further data (also queue-specific)
455 */
456void
457GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
458 uint64_t flags,
459 const void *extra);
460
461
462/**
463 * Get application-specific options for this envelope.
464 *
465 * @param env message to set options for
466 * @param[out] flags set to flags to use (meaning is queue-specific)
467 * @return extra additional buffer for further data (also queue-specific)
468 */
469const void *
470GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
471 uint64_t *flags);
472
473
474/**
475 * Set application-specific options for this queue.
476 *
477 * @param mq message queue to set options for
478 * @param flags flags to use (meaning is queue-specific)
479 * @param extra additional buffer for further data (also queue-specific)
480 */
481void
482GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
483 uint64_t flags,
484 const void *extra);
485
486
487/**
437 * Obtain the current length of the message queue. 488 * Obtain the current length of the message queue.
438 * 489 *
439 * @param mq queue to inspect 490 * @param mq queue to inspect
diff --git a/src/util/mq.c b/src/util/mq.c
index 1638d7e0c..a4ea5e39d 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -63,6 +63,26 @@ struct GNUNET_MQ_Envelope
63 * Closure for @e send_cb 63 * Closure for @e send_cb
64 */ 64 */
65 void *sent_cls; 65 void *sent_cls;
66
67 /**
68 * Flags that were set for this envelope by
69 * #GNUNET_MQ_env_set_options(). Only valid if
70 * @e have_custom_options is set.
71 */
72 uint64_t flags;
73
74 /**
75 * Additional options buffer set for this envelope by
76 * #GNUNET_MQ_env_set_options(). Only valid if
77 * @e have_custom_options is set.
78 */
79 const void *extra;
80
81 /**
82 * Did the application call #GNUNET_MQ_env_set_options()?
83 */
84 int have_custom_options;
85
66}; 86};
67 87
68 88
@@ -135,6 +155,18 @@ struct GNUNET_MQ_Handle
135 struct GNUNET_SCHEDULER_Task *continue_task; 155 struct GNUNET_SCHEDULER_Task *continue_task;
136 156
137 /** 157 /**
158 * Additional options buffer set for this queue by
159 * #GNUNET_MQ_set_options(). Default is 0.
160 */
161 const void *default_extra;
162
163 /**
164 * Flags that were set for this queue by
165 * #GNUNET_MQ_set_options(). Default is 0.
166 */
167 uint64_t default_flags;
168
169 /**
138 * Next id that should be used for the @e assoc_map, 170 * Next id that should be used for the @e assoc_map,
139 * initialized lazily to a random value together with 171 * initialized lazily to a random value together with
140 * @e assoc_map 172 * @e assoc_map
@@ -1040,4 +1072,84 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
1040 GNUNET_free (ev); 1072 GNUNET_free (ev);
1041} 1073}
1042 1074
1075
1076/**
1077 * Function to obtain the current envelope from
1078 * within #GNUNET_MQ_SendImpl implementations.
1079 *
1080 * @param mq message queue to interrogate
1081 * @return the current envelope
1082 */
1083struct GNUNET_MQ_Envelope *
1084GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
1085{
1086 return mq->current_envelope;
1087}
1088
1089
1090/**
1091 * Set application-specific options for this envelope.
1092 * Overrides the options set for the queue with
1093 * #GNUNET_MQ_set_options() for this message only.
1094 *
1095 * @param env message to set options for
1096 * @param flags flags to use (meaning is queue-specific)
1097 * @param extra additional buffer for further data (also queue-specific)
1098 */
1099void
1100GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
1101 uint64_t flags,
1102 const void *extra)
1103{
1104 env->flags = flags;
1105 env->extra = extra;
1106 env->have_custom_options = GNUNET_YES;
1107}
1108
1109
1110/**
1111 * Get application-specific options for this envelope.
1112 *
1113 * @param env message to set options for
1114 * @param[out] flags set to flags to use (meaning is queue-specific)
1115 * @return extra additional buffer for further data (also queue-specific)
1116 */
1117const void *
1118GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env,
1119 uint64_t *flags)
1120{
1121 struct GNUNET_MQ_Handle *mq = env->parent_queue;
1122
1123 if (GNUNET_YES == env->have_custom_options)
1124 {
1125 *flags = env->flags;
1126 return env->extra;
1127 }
1128 if (NULL == mq)
1129 {
1130 *flags = 0;
1131 return NULL;
1132 }
1133 *flags = mq->default_flags;
1134 return mq->default_extra;
1135}
1136
1137
1138/**
1139 * Set application-specific options for this queue.
1140 *
1141 * @param mq message queue to set options for
1142 * @param flags flags to use (meaning is queue-specific)
1143 * @param extra additional buffer for further data (also queue-specific)
1144 */
1145void
1146GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
1147 uint64_t flags,
1148 const void *extra)
1149{
1150 mq->default_flags = flags;
1151 mq->default_extra = extra;
1152}
1153
1154
1043/* end of mq.c */ 1155/* end of mq.c */