aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/cadet_api.c
diff options
context:
space:
mode:
authorBart Polot <bart.polot+voyager@gmail.com>2017-01-31 02:58:54 +0100
committerBart Polot <bart.polot+voyager@gmail.com>2017-01-31 02:58:54 +0100
commitbc38effeecf8300b99de2f367e01c9d02fcafb78 (patch)
tree6366d9f50b482b7636479eaab8103ee82250d607 /src/cadet/cadet_api.c
parentb2c3389e8de9d24794d4b4bb499e14408101d433 (diff)
downloadgnunet-bc38effeecf8300b99de2f367e01c9d02fcafb78.tar.gz
gnunet-bc38effeecf8300b99de2f367e01c9d02fcafb78.zip
Implement the connect and create_channel call for mq api
Diffstat (limited to 'src/cadet/cadet_api.c')
-rw-r--r--src/cadet/cadet_api.c308
1 files changed, 298 insertions, 10 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index 2b50f781c..3491bd75f 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -38,6 +38,8 @@
38 38
39/** 39/**
40 * Transmission queue to the service 40 * Transmission queue to the service
41 *
42 * @deprecated
41 */ 43 */
42struct GNUNET_CADET_TransmitHandle 44struct GNUNET_CADET_TransmitHandle
43{ 45{
@@ -117,17 +119,26 @@ union CadetInfoCB
117struct GNUNET_CADET_Handle 119struct GNUNET_CADET_Handle
118{ 120{
119 /** 121 /**
122 * Flag to indicate old or MQ API.
123 */
124 int mq_api;
125
126 /**
120 * Message queue (if available). 127 * Message queue (if available).
121 */ 128 */
122 struct GNUNET_MQ_Handle *mq; 129 struct GNUNET_MQ_Handle *mq;
123 130
124 /** 131 /**
125 * Set of handlers used for processing incoming messages in the channels 132 * Set of handlers used for processing incoming messages in the channels
133 *
134 * @deprecated
126 */ 135 */
127 const struct GNUNET_CADET_MessageHandler *message_handlers; 136 const struct GNUNET_CADET_MessageHandler *message_handlers;
128 137
129 /** 138 /**
130 * Number of handlers in the handlers array. 139 * Number of handlers in the handlers array.
140 *
141 * @deprecated
131 */ 142 */
132 unsigned int n_handlers; 143 unsigned int n_handlers;
133 144
@@ -153,16 +164,22 @@ struct GNUNET_CADET_Handle
153 164
154 /** 165 /**
155 * Closure for all the handlers given by the client 166 * Closure for all the handlers given by the client
167 *
168 * @deprecated
156 */ 169 */
157 void *cls; 170 void *cls;
158 171
159 /** 172 /**
160 * Messages to send to the service, head. 173 * Messages to send to the service, head.
174 *
175 * @deprecated
161 */ 176 */
162 struct GNUNET_CADET_TransmitHandle *th_head; 177 struct GNUNET_CADET_TransmitHandle *th_head;
163 178
164 /** 179 /**
165 * Messages to send to the service, tail. 180 * Messages to send to the service, tail.
181 *
182 * @deprecated
166 */ 183 */
167 struct GNUNET_CADET_TransmitHandle *th_tail; 184 struct GNUNET_CADET_TransmitHandle *th_tail;
168 185
@@ -241,9 +258,9 @@ struct GNUNET_CADET_Channel
241 struct GNUNET_CADET_ClientChannelNumber ccn; 258 struct GNUNET_CADET_ClientChannelNumber ccn;
242 259
243 /** 260 /**
244 * Channel's port, if any. 261 * Channel's port, if incoming.
245 */ 262 */
246 struct GNUNET_CADET_Port *port; 263 struct GNUNET_CADET_Port *incoming_port;
247 264
248 /** 265 /**
249 * Other end of the channel. 266 * Other end of the channel.
@@ -262,9 +279,30 @@ struct GNUNET_CADET_Channel
262 279
263 /** 280 /**
264 * Are we allowed to send to the service? 281 * Are we allowed to send to the service?
282 *
283 * @deprecated
265 */ 284 */
266 unsigned int allow_send; 285 unsigned int allow_send;
267 286
287 /****************************************************************************/
288 /***************************** MQ ************************************/
289 /****************************************************************************/
290
291 /**
292 * Message Queue for the channel.
293 */
294 struct GNUNET_MQ_Handle *mq;
295
296 /**
297 * Window change handler.
298 */
299 GNUNET_CADET_WindowSizeEventHandler window_changes;
300
301 /**
302 * Disconnect handler.
303 */
304 GNUNET_CADET_DisconnectEventHandler disconnects;
305
268}; 306};
269 307
270 308
@@ -611,7 +649,7 @@ handle_channel_created (void *cls,
611 ch->peer = GNUNET_PEER_intern (&msg->peer); 649 ch->peer = GNUNET_PEER_intern (&msg->peer);
612 ch->cadet = h; 650 ch->cadet = h;
613 ch->ccn = ccn; 651 ch->ccn = ccn;
614 ch->port = port; 652 ch->incoming_port = port;
615 ch->options = ntohl (msg->opt); 653 ch->options = ntohl (msg->opt);
616 654
617 LOG (GNUNET_ERROR_TYPE_DEBUG, 655 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -2047,9 +2085,9 @@ cadet_mq_ntr (void *cls, size_t size,
2047 * @param impl_state state of the implementation 2085 * @param impl_state state of the implementation
2048 */ 2086 */
2049static void 2087static void
2050cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq, 2088cadet_mq_send_impl_old (struct GNUNET_MQ_Handle *mq,
2051 const struct GNUNET_MessageHeader *msg, 2089 const struct GNUNET_MessageHeader *msg,
2052 void *impl_state) 2090 void *impl_state)
2053{ 2091{
2054 struct CadetMQState *state = impl_state; 2092 struct CadetMQState *state = impl_state;
2055 2093
@@ -2075,8 +2113,8 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
2075 * @param impl_state state of the implementation 2113 * @param impl_state state of the implementation
2076 */ 2114 */
2077static void 2115static void
2078cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, 2116cadet_mq_destroy_impl_old (struct GNUNET_MQ_Handle *mq,
2079 void *impl_state) 2117 void *impl_state)
2080{ 2118{
2081 struct CadetMQState *state = impl_state; 2119 struct CadetMQState *state = impl_state;
2082 2120
@@ -2104,8 +2142,8 @@ GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel)
2104 state = GNUNET_new (struct CadetMQState); 2142 state = GNUNET_new (struct CadetMQState);
2105 state->channel = channel; 2143 state->channel = channel;
2106 2144
2107 mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl, 2145 mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl_old,
2108 &cadet_mq_destroy_impl, 2146 &cadet_mq_destroy_impl_old,
2109 NULL, /* FIXME: cancel impl. */ 2147 NULL, /* FIXME: cancel impl. */
2110 state, 2148 state,
2111 NULL, /* no msg handlers */ 2149 NULL, /* no msg handlers */
@@ -2136,3 +2174,253 @@ GC_u2h (uint32_t port)
2136 2174
2137 return &hash; 2175 return &hash;
2138} 2176}
2177
2178
2179
2180/******************************************************************************/
2181/******************************* MQ-BASED API *********************************/
2182/******************************************************************************/
2183
2184/**
2185 * Connect to the MQ-based cadet service.
2186 *
2187 * @param cfg Configuration to use.
2188 *
2189 * @return Handle to the cadet service NULL on error.
2190 */
2191struct GNUNET_CADET_Handle *
2192GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg)
2193{
2194 struct GNUNET_CADET_Handle *h;
2195
2196 LOG (GNUNET_ERROR_TYPE_DEBUG,
2197 "GNUNET_CADET_connecT()\n");
2198 h = GNUNET_new (struct GNUNET_CADET_Handle);
2199 h->cfg = cfg;
2200 h->mq_api = GNUNET_YES;
2201 h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
2202 do_reconnect (h);
2203 if (h->mq == NULL)
2204 {
2205 GNUNET_break (0);
2206 GNUNET_CADET_disconnect (h);
2207 return NULL;
2208 }
2209 h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
2210 h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
2211 h->reconnect_task = NULL;
2212
2213 return h;
2214}
2215
2216
2217/**
2218 * Open a port to receive incomming MQ-based channels.
2219 *
2220 * @param h CADET handle.
2221 * @param port Hash identifying the port.
2222 * @param connects Function called when an incoming channel is connected.
2223 * @param connects_cls Closure for the @a connects handler.
2224 * @param window_changes Function called when the transmit window size changes.
2225 * @param disconnects Function called when a channel is disconnected.
2226 * @param handlers Callbacks for messages we care about, NULL-terminated.
2227 *
2228 * @return Port handle.
2229 */
2230struct GNUNET_CADET_Port *
2231GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
2232 const struct GNUNET_HashCode *port,
2233 GNUNET_CADET_ConnectEventHandler connects,
2234 void * connects_cls,
2235 GNUNET_CADET_WindowSizeEventHandler window_changes,
2236 GNUNET_CADET_DisconnectEventHandler disconnects,
2237 const struct GNUNET_MQ_MessageHandler *handlers)
2238{
2239 return NULL;
2240}
2241
2242
2243/**
2244 * Implement sending functionality of a message queue for
2245 * us sending messages to a peer.
2246 *
2247 * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
2248 * in order to label the message with the channel ID and send the
2249 * encapsulated message to the service.
2250 *
2251 * @param mq the message queue
2252 * @param msg the message to send
2253 * @param impl_state state of the implementation
2254 */
2255static void
2256cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
2257 const struct GNUNET_MessageHeader *msg,
2258 void *impl_state)
2259{
2260 struct GNUNET_CADET_Channel *ch = impl_state;
2261 struct GNUNET_CADET_Handle *h = ch->cadet;
2262 uint16_t msize;
2263 struct GNUNET_MQ_Envelope *env;
2264 struct GNUNET_CADET_LocalData *cadet_msg;
2265
2266
2267 if (NULL == h->mq)
2268 {
2269 /* We're currently reconnecting, pretend this worked */
2270 GNUNET_MQ_impl_send_continue (mq);
2271 return;
2272 }
2273
2274 /* check message size for sanity */
2275 msize = ntohs (msg->size);
2276 if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
2277 {
2278 GNUNET_break (0);
2279 GNUNET_MQ_impl_send_continue (mq);
2280 return;
2281 }
2282
2283 env = GNUNET_MQ_msg_nested_mh (cadet_msg,
2284 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
2285 msg);
2286 cadet_msg->ccn = ch->ccn;
2287 GNUNET_MQ_send (h->mq, env);
2288 GNUNET_MQ_impl_send_continue (mq);
2289}
2290
2291
2292/**
2293 * Handle destruction of a message queue. Implementations must not
2294 * free @a mq, but should take care of @a impl_state.
2295 *
2296 * @param mq the message queue to destroy
2297 * @param impl_state state of the implementation
2298 */
2299static void
2300cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
2301 void *impl_state)
2302{
2303 struct GNUNET_CADET_Channel *ch = impl_state;
2304
2305 GNUNET_assert (mq == ch->mq);
2306 ch->mq = NULL;
2307}
2308
2309
2310/**
2311 * We had an error processing a message we forwarded from a peer to
2312 * the CADET service. We should just complain about it but otherwise
2313 * continue processing.
2314 *
2315 * @param cls closure
2316 * @param error error code
2317 */
2318static void
2319cadet_mq_error_handler (void *cls,
2320 enum GNUNET_MQ_Error error)
2321{
2322 GNUNET_break_op (0);
2323}
2324
2325
2326/**
2327 * Implementation function that cancels the currently sent message.
2328 * Should basically undo whatever #mq_send_impl() did.
2329 *
2330 * @param mq message queue
2331 * @param impl_state state specific to the implementation
2332 */
2333static void
2334cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
2335 void *impl_state)
2336{
2337 struct GNUNET_CADET_Channel *ch = impl_state;
2338
2339 LOG (GNUNET_ERROR_TYPE_WARNING,
2340 "Cannot cancel mq message on channel %X of %p\n",
2341 ch->ccn.channel_of_client, ch->cadet);
2342
2343 GNUNET_break (0);
2344}
2345
2346
2347/**
2348 * Create a new channel towards a remote peer.
2349 *
2350 * If the destination port is not open by any peer or the destination peer
2351 * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
2352 * for this channel.
2353 *
2354 * @param h CADET handle.
2355 * @param channel_cls Closure for the channel. It's given to:
2356 * - The disconnect handler @a disconnects
2357 * - Each message type callback in @a handlers
2358 * @param destination Peer identity the channel should go to.
2359 * @param port Identification of the destination port.
2360 * @param options CadetOption flag field, with all desired option bits set to 1.
2361 * @param window_changes Function called when the transmit window size changes.
2362 * @param disconnects Function called when the channel is disconnected.
2363 * @param handlers Callbacks for messages we care about, NULL-terminated.
2364 *
2365 * @return Handle to the channel.
2366 */
2367struct GNUNET_CADET_Channel *
2368GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h,
2369 void *channel_cls,
2370 const struct GNUNET_PeerIdentity *destination,
2371 const struct GNUNET_HashCode *port,
2372 enum GNUNET_CADET_ChannelOption options,
2373 GNUNET_CADET_WindowSizeEventHandler window_changes,
2374 GNUNET_CADET_DisconnectEventHandler disconnects,
2375 const struct GNUNET_MQ_MessageHandler *handlers)
2376{
2377 struct GNUNET_CADET_Channel *ch;
2378 struct GNUNET_CADET_ClientChannelNumber ccn;
2379 struct GNUNET_CADET_LocalChannelCreateMessage *msg;
2380 struct GNUNET_MQ_Envelope *env;
2381
2382 /* Save parameters */
2383 ccn.channel_of_client = htonl (0);
2384 ch = create_channel (h, ccn);
2385 ch->ctx = channel_cls;
2386 ch->peer = GNUNET_PEER_intern (destination);
2387 ch->options = options;
2388 ch->window_changes = window_changes;
2389 ch->disconnects = disconnects;
2390
2391 /* Create MQ for channel */
2392 ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
2393 &cadet_mq_destroy_impl,
2394 &cadet_mq_cancel_impl,
2395 ch,
2396 handlers,
2397 &cadet_mq_error_handler,
2398 ch);
2399 GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
2400
2401 /* Request channel creation to service */
2402 env = GNUNET_MQ_msg (msg,
2403 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
2404 msg->ccn = ch->ccn;
2405 msg->port = *port;
2406 msg->peer = *destination;
2407 msg->opt = htonl (options);
2408 GNUNET_MQ_send (h->mq,
2409 env);
2410
2411 return ch;
2412}
2413
2414
2415/**
2416 * Obtain the message queue for a connected peer.
2417 *
2418 * @param channel The channel handle from which to get the MQ.
2419 *
2420 * @return NULL if @a channel is not yet connected.
2421 */
2422struct GNUNET_MQ_Handle *
2423GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel)
2424{
2425 return channel->mq;
2426}