diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-21 19:07:03 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-21 19:07:03 +0000 |
commit | 7262f6743a026cf110d3aedc7e39951280d65fa1 (patch) | |
tree | 6e48d45fafe9dc0da62d23cee31ae4cc37b109ac /src/namecache | |
parent | 3d7b29ec1c5d1c2de96cf4c9badaa112e86ef899 (diff) | |
download | gnunet-7262f6743a026cf110d3aedc7e39951280d65fa1.tar.gz gnunet-7262f6743a026cf110d3aedc7e39951280d65fa1.zip |
convert namecache to new MQ API
Diffstat (limited to 'src/namecache')
-rw-r--r-- | src/namecache/namecache_api.c | 518 |
1 files changed, 183 insertions, 335 deletions
diff --git a/src/namecache/namecache_api.c b/src/namecache/namecache_api.c index 15a750448..51cbacf12 100644 --- a/src/namecache/namecache_api.c +++ b/src/namecache/namecache_api.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2010-2013 GNUnet e.V. | 3 | Copyright (C) 2010-2013, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -91,31 +91,6 @@ struct GNUNET_NAMECACHE_QueueEntry | |||
91 | 91 | ||
92 | 92 | ||
93 | /** | 93 | /** |
94 | * Message in linked list we should send to the service. The | ||
95 | * actual binary message follows this struct. | ||
96 | */ | ||
97 | struct PendingMessage | ||
98 | { | ||
99 | |||
100 | /** | ||
101 | * Kept in a DLL. | ||
102 | */ | ||
103 | struct PendingMessage *next; | ||
104 | |||
105 | /** | ||
106 | * Kept in a DLL. | ||
107 | */ | ||
108 | struct PendingMessage *prev; | ||
109 | |||
110 | /** | ||
111 | * Size of the message. | ||
112 | */ | ||
113 | size_t size; | ||
114 | |||
115 | }; | ||
116 | |||
117 | |||
118 | /** | ||
119 | * Connection to the NAMECACHE service. | 94 | * Connection to the NAMECACHE service. |
120 | */ | 95 | */ |
121 | struct GNUNET_NAMECACHE_Handle | 96 | struct GNUNET_NAMECACHE_Handle |
@@ -127,9 +102,9 @@ struct GNUNET_NAMECACHE_Handle | |||
127 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 102 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
128 | 103 | ||
129 | /** | 104 | /** |
130 | * Socket (if available). | 105 | * Message queue to service. |
131 | */ | 106 | */ |
132 | struct GNUNET_CLIENT_Connection *client; | 107 | struct GNUNET_MQ_Handle *mq; |
133 | 108 | ||
134 | /** | 109 | /** |
135 | * Currently pending transmission request (or NULL). | 110 | * Currently pending transmission request (or NULL). |
@@ -137,16 +112,6 @@ struct GNUNET_NAMECACHE_Handle | |||
137 | struct GNUNET_CLIENT_TransmitHandle *th; | 112 | struct GNUNET_CLIENT_TransmitHandle *th; |
138 | 113 | ||
139 | /** | 114 | /** |
140 | * Head of linked list of pending messages to send to the service | ||
141 | */ | ||
142 | struct PendingMessage *pending_head; | ||
143 | |||
144 | /** | ||
145 | * Tail of linked list of pending messages to send to the service | ||
146 | */ | ||
147 | struct PendingMessage *pending_tail; | ||
148 | |||
149 | /** | ||
150 | * Head of pending namecache queue entries | 115 | * Head of pending namecache queue entries |
151 | */ | 116 | */ |
152 | struct GNUNET_NAMECACHE_QueueEntry *op_head; | 117 | struct GNUNET_NAMECACHE_QueueEntry *op_head; |
@@ -159,7 +124,7 @@ struct GNUNET_NAMECACHE_Handle | |||
159 | /** | 124 | /** |
160 | * Reconnect task | 125 | * Reconnect task |
161 | */ | 126 | */ |
162 | struct GNUNET_SCHEDULER_Task * reconnect_task; | 127 | struct GNUNET_SCHEDULER_Task *reconnect_task; |
163 | 128 | ||
164 | /** | 129 | /** |
165 | * Delay introduced before we reconnect. | 130 | * Delay introduced before we reconnect. |
@@ -172,11 +137,6 @@ struct GNUNET_NAMECACHE_Handle | |||
172 | int reconnect; | 137 | int reconnect; |
173 | 138 | ||
174 | /** | 139 | /** |
175 | * Did we start to receive yet? | ||
176 | */ | ||
177 | int is_receiving; | ||
178 | |||
179 | /** | ||
180 | * The last operation id used for a NAMECACHE operation | 140 | * The last operation id used for a NAMECACHE operation |
181 | */ | 141 | */ |
182 | uint32_t last_op_id_used; | 142 | uint32_t last_op_id_used; |
@@ -194,285 +154,165 @@ force_reconnect (struct GNUNET_NAMECACHE_Handle *h); | |||
194 | 154 | ||
195 | 155 | ||
196 | /** | 156 | /** |
197 | * Handle an incoming message of type | 157 | * Find queue entry for the given @a rid. |
198 | * #GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE. | ||
199 | * | 158 | * |
200 | * @param qe the respective entry in the message queue | 159 | * @param h handle to search |
201 | * @param msg the message we received | 160 | * @param rid request ID to look for |
202 | * @param size the message size | 161 | * @return NULL if not found, otherwise the queue entry (removed from the queue) |
203 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and we did NOT notify the client | ||
204 | */ | 162 | */ |
205 | static int | 163 | static struct GNUNET_NAMECACHE_QueueEntry * |
206 | handle_lookup_block_response (struct GNUNET_NAMECACHE_QueueEntry *qe, | 164 | find_qe (struct GNUNET_NAMECACHE_Handle *h, |
207 | const struct LookupBlockResponseMessage *msg, | 165 | uint32_t rid) |
208 | size_t size) | ||
209 | { | 166 | { |
210 | struct GNUNET_GNSRECORD_Block *block; | 167 | struct GNUNET_NAMECACHE_QueueEntry *qe; |
211 | char buf[size + sizeof (struct GNUNET_GNSRECORD_Block) | ||
212 | - sizeof (struct LookupBlockResponseMessage)]; | ||
213 | |||
214 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
215 | "Received `%s'\n", | ||
216 | "LOOKUP_BLOCK_RESPONSE"); | ||
217 | if (0 == GNUNET_TIME_absolute_ntoh (msg->expire).abs_value_us) | ||
218 | { | ||
219 | /* no match found */ | ||
220 | if (NULL != qe->block_proc) | ||
221 | qe->block_proc (qe->block_proc_cls, NULL); | ||
222 | return GNUNET_OK; | ||
223 | } | ||
224 | 168 | ||
225 | block = (struct GNUNET_GNSRECORD_Block *) buf; | 169 | for (qe = h->op_head; qe != NULL; qe = qe->next) |
226 | block->signature = msg->signature; | ||
227 | block->derived_key = msg->derived_key; | ||
228 | block->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN); | ||
229 | block->purpose.size = htonl (size - sizeof (struct LookupBlockResponseMessage) + | ||
230 | sizeof (struct GNUNET_TIME_AbsoluteNBO) + | ||
231 | sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose)); | ||
232 | block->expiration_time = msg->expire; | ||
233 | memcpy (&block[1], | ||
234 | &msg[1], | ||
235 | size - sizeof (struct LookupBlockResponseMessage)); | ||
236 | if (GNUNET_OK != | ||
237 | GNUNET_GNSRECORD_block_verify (block)) | ||
238 | { | 170 | { |
239 | GNUNET_break (0); | 171 | if (qe->op_id == rid) |
240 | return GNUNET_SYSERR; | 172 | { |
173 | GNUNET_CONTAINER_DLL_remove (h->op_head, | ||
174 | h->op_tail, | ||
175 | qe); | ||
176 | return qe; | ||
177 | } | ||
241 | } | 178 | } |
242 | if (NULL != qe->block_proc) | 179 | return NULL; |
243 | qe->block_proc (qe->block_proc_cls, block); | ||
244 | else | ||
245 | GNUNET_break (0); | ||
246 | return GNUNET_OK; | ||
247 | } | 180 | } |
248 | 181 | ||
249 | 182 | ||
250 | /** | 183 | /** |
251 | * Handle an incoming message of type | 184 | * Handle an incoming message of type |
252 | * #GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE | 185 | * #GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE. |
253 | * | 186 | * |
254 | * @param qe the respective entry in the message queue | 187 | * @param cls the `struct GNUNET_NAMECACHE_Handle` |
255 | * @param msg the message we received | 188 | * @param msg the message we received |
256 | * @param size the message size | ||
257 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and we did NOT notify the client | ||
258 | */ | 189 | */ |
259 | static int | 190 | static int |
260 | handle_block_cache_response (struct GNUNET_NAMECACHE_QueueEntry *qe, | 191 | check_lookup_block_response (void *cls, |
261 | const struct BlockCacheResponseMessage *msg, | 192 | const struct LookupBlockResponseMessage *msg) |
262 | size_t size) | ||
263 | { | 193 | { |
264 | int res; | 194 | /* any length will do, format validation is in handler */ |
265 | |||
266 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
267 | "Received `%s'\n", | ||
268 | "BLOCK_CACHE_RESPONSE"); | ||
269 | res = ntohl (msg->op_result); | ||
270 | /* TODO: add actual error message from namecache to response... */ | ||
271 | if (NULL != qe->cont) | ||
272 | qe->cont (qe->cont_cls, | ||
273 | res, | ||
274 | (GNUNET_OK == res) | ||
275 | ? NULL | ||
276 | : _("Namecache failed to cache block")); | ||
277 | return GNUNET_OK; | 195 | return GNUNET_OK; |
278 | } | 196 | } |
279 | 197 | ||
280 | 198 | ||
281 | /** | 199 | /** |
282 | * Handle incoming messages for record operations | 200 | * Handle an incoming message of type |
201 | * #GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE. | ||
283 | * | 202 | * |
284 | * @param qe the respective zone iteration handle | 203 | * @param cls the `struct GNUNET_NAMECACHE_Handle` |
285 | * @param msg the message we received | 204 | * @param msg the message we received |
286 | * @param type the message type in host byte order | ||
287 | * @param size the message size | ||
288 | * @return #GNUNET_OK on success, #GNUNET_NO if we notified the client about | ||
289 | * the error, #GNUNET_SYSERR on error and we did NOT notify the client | ||
290 | */ | ||
291 | static int | ||
292 | manage_record_operations (struct GNUNET_NAMECACHE_QueueEntry *qe, | ||
293 | const struct GNUNET_MessageHeader *msg, | ||
294 | uint16_t type, | ||
295 | size_t size) | ||
296 | { | ||
297 | /* handle different message type */ | ||
298 | switch (type) | ||
299 | { | ||
300 | case GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE: | ||
301 | if (size < sizeof (struct LookupBlockResponseMessage)) | ||
302 | { | ||
303 | GNUNET_break (0); | ||
304 | return GNUNET_SYSERR; | ||
305 | } | ||
306 | return handle_lookup_block_response (qe, (const struct LookupBlockResponseMessage *) msg, size); | ||
307 | case GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE: | ||
308 | if (size != sizeof (struct BlockCacheResponseMessage)) | ||
309 | { | ||
310 | GNUNET_break (0); | ||
311 | return GNUNET_SYSERR; | ||
312 | } | ||
313 | return handle_block_cache_response (qe, (const struct BlockCacheResponseMessage *) msg, size); | ||
314 | default: | ||
315 | GNUNET_break (0); | ||
316 | return GNUNET_SYSERR; | ||
317 | } | ||
318 | } | ||
319 | |||
320 | |||
321 | /** | ||
322 | * Type of a function to call when we receive a message | ||
323 | * from the service. | ||
324 | * | ||
325 | * @param cls the `struct GNUNET_NAMECACHE_SchedulingHandle` | ||
326 | * @param msg message received, NULL on timeout or fatal error | ||
327 | */ | 205 | */ |
328 | static void | 206 | static void |
329 | process_namecache_message (void *cls, | 207 | handle_lookup_block_response (void *cls, |
330 | const struct GNUNET_MessageHeader *msg) | 208 | const struct LookupBlockResponseMessage *msg) |
331 | { | 209 | { |
332 | struct GNUNET_NAMECACHE_Handle *h = cls; | 210 | struct GNUNET_NAMECACHE_Handle *h = cls; |
333 | const struct GNUNET_NAMECACHE_Header *gm; | 211 | size_t size; |
334 | struct GNUNET_NAMECACHE_QueueEntry *qe; | 212 | struct GNUNET_NAMECACHE_QueueEntry *qe; |
335 | uint16_t size; | ||
336 | uint16_t type; | ||
337 | uint32_t r_id; | ||
338 | int ret; | ||
339 | 213 | ||
340 | if (NULL == msg) | 214 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
341 | { | 215 | "Received LOOKUP_BLOCK_RESPONSE\n"); |
342 | force_reconnect (h); | 216 | qe = find_qe (h, |
217 | ntohl (msg->gns_header.r_id)); | ||
218 | if (NULL == qe) | ||
343 | return; | 219 | return; |
344 | } | 220 | if (0 == GNUNET_TIME_absolute_ntoh (msg->expire).abs_value_us) |
345 | size = ntohs (msg->size); | ||
346 | type = ntohs (msg->type); | ||
347 | if (size < sizeof (struct GNUNET_NAMECACHE_Header)) | ||
348 | { | 221 | { |
349 | GNUNET_break_op (0); | 222 | /* no match found */ |
350 | GNUNET_CLIENT_receive (h->client, | 223 | if (NULL != qe->block_proc) |
351 | &process_namecache_message, h, | 224 | qe->block_proc (qe->block_proc_cls, |
352 | GNUNET_TIME_UNIT_FOREVER_REL); | 225 | NULL); |
226 | GNUNET_free (qe); | ||
353 | return; | 227 | return; |
354 | } | 228 | } |
355 | gm = (const struct GNUNET_NAMECACHE_Header *) msg; | 229 | size = ntohs (msg->gns_header.header.size) |
356 | r_id = ntohl (gm->r_id); | 230 | - sizeof (struct LookupBlockResponseMessage); |
357 | |||
358 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
359 | "Received message type %u size %u op %u\n", | ||
360 | (unsigned int) type, | ||
361 | (unsigned int) size, | ||
362 | (unsigned int) r_id); | ||
363 | |||
364 | /* Is it a record related operation ? */ | ||
365 | for (qe = h->op_head; qe != NULL; qe = qe->next) | ||
366 | if (qe->op_id == r_id) | ||
367 | break; | ||
368 | if (NULL != qe) | ||
369 | { | 231 | { |
370 | ret = manage_record_operations (qe, msg, type, size); | 232 | char buf[size + sizeof (struct GNUNET_GNSRECORD_Block)] GNUNET_ALIGN; |
371 | if (GNUNET_SYSERR == ret) | 233 | struct GNUNET_GNSRECORD_Block *block; |
234 | |||
235 | block = (struct GNUNET_GNSRECORD_Block *) buf; | ||
236 | block->signature = msg->signature; | ||
237 | block->derived_key = msg->derived_key; | ||
238 | block->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN); | ||
239 | block->purpose.size = htonl (size + | ||
240 | sizeof (struct GNUNET_TIME_AbsoluteNBO) + | ||
241 | sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose)); | ||
242 | block->expiration_time = msg->expire; | ||
243 | memcpy (&block[1], | ||
244 | &msg[1], | ||
245 | size); | ||
246 | if (GNUNET_OK != | ||
247 | GNUNET_GNSRECORD_block_verify (block)) | ||
372 | { | 248 | { |
373 | /* protocol error, need to reconnect */ | 249 | GNUNET_break (0); |
374 | h->reconnect = GNUNET_YES; | 250 | if (NULL != qe->block_proc) |
251 | qe->block_proc (qe->block_proc_cls, | ||
252 | NULL); | ||
253 | force_reconnect (h); | ||
375 | } | 254 | } |
376 | else | 255 | else |
377 | { | 256 | { |
378 | /* client was notified about success or failure, clean up 'qe' */ | 257 | if (NULL != qe->block_proc) |
379 | GNUNET_CONTAINER_DLL_remove (h->op_head, | 258 | qe->block_proc (qe->block_proc_cls, |
380 | h->op_tail, | 259 | block); |
381 | qe); | ||
382 | GNUNET_free (qe); | ||
383 | } | 260 | } |
384 | } | 261 | } |
385 | if (GNUNET_YES == h->reconnect) | 262 | GNUNET_free (qe); |
386 | { | ||
387 | force_reconnect (h); | ||
388 | return; | ||
389 | } | ||
390 | GNUNET_CLIENT_receive (h->client, &process_namecache_message, h, | ||
391 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
392 | } | 263 | } |
393 | 264 | ||
394 | 265 | ||
395 | /** | 266 | /** |
396 | * Transmit messages from the message queue to the service | 267 | * Handle an incoming message of type |
397 | * (if there are any, and if we are not already trying). | 268 | * #GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE |
398 | * | ||
399 | * @param h handle to use | ||
400 | */ | ||
401 | static void | ||
402 | do_transmit (struct GNUNET_NAMECACHE_Handle *h); | ||
403 | |||
404 | |||
405 | /** | ||
406 | * We can now transmit a message to NAMECACHE. Do it. | ||
407 | * | 269 | * |
408 | * @param cls the `struct GNUNET_NAMECACHE_Handle` | 270 | * @param cls the `struct GNUNET_NAMECACHE_Handle` |
409 | * @param size number of bytes we can transmit | 271 | * @param msg the message we received |
410 | * @param buf where to copy the messages | 272 | * @param size the message size |
411 | * @return number of bytes copied into @a buf | 273 | * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and we did NOT notify the client |
412 | */ | 274 | */ |
413 | static size_t | 275 | static void |
414 | transmit_message_to_namecache (void *cls, | 276 | handle_block_cache_response (void *cls, |
415 | size_t size, | 277 | const struct BlockCacheResponseMessage *msg) |
416 | void *buf) | ||
417 | { | 278 | { |
418 | struct GNUNET_NAMECACHE_Handle *h = cls; | 279 | struct GNUNET_NAMECACHE_Handle *h = cls; |
419 | struct PendingMessage *p; | 280 | struct GNUNET_NAMECACHE_QueueEntry *qe; |
420 | size_t ret; | 281 | int res; |
421 | char *cbuf; | ||
422 | 282 | ||
423 | h->th = NULL; | 283 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
424 | if ((0 == size) || (NULL == buf)) | 284 | "Received BLOCK_CACHE_RESPONSE\n"); |
425 | { | 285 | qe = find_qe (h, |
426 | force_reconnect (h); | 286 | ntohl (msg->gns_header.r_id)); |
427 | return 0; | 287 | if (NULL == qe) |
428 | } | 288 | return; |
429 | ret = 0; | 289 | res = ntohl (msg->op_result); |
430 | cbuf = buf; | 290 | /* TODO: add actual error message from namecache to response... */ |
431 | while ( (NULL != (p = h->pending_head)) && | 291 | if (NULL != qe->cont) |
432 | (p->size <= size) ) | 292 | qe->cont (qe->cont_cls, |
433 | { | 293 | res, |
434 | memcpy (&cbuf[ret], &p[1], p->size); | 294 | (GNUNET_OK == res) |
435 | ret += p->size; | 295 | ? NULL |
436 | size -= p->size; | 296 | : _("Namecache failed to cache block")); |
437 | GNUNET_CONTAINER_DLL_remove (h->pending_head, | 297 | GNUNET_free (qe); |
438 | h->pending_tail, | ||
439 | p); | ||
440 | if (GNUNET_NO == h->is_receiving) | ||
441 | { | ||
442 | h->is_receiving = GNUNET_YES; | ||
443 | GNUNET_CLIENT_receive (h->client, | ||
444 | &process_namecache_message, h, | ||
445 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
446 | } | ||
447 | GNUNET_free (p); | ||
448 | } | ||
449 | do_transmit (h); | ||
450 | return ret; | ||
451 | } | 298 | } |
452 | 299 | ||
453 | 300 | ||
454 | /** | 301 | /** |
455 | * Transmit messages from the message queue to the service | 302 | * Generic error handler, called with the appropriate error code and |
456 | * (if there are any, and if we are not already trying). | 303 | * the same closure specified at the creation of the message queue. |
304 | * Not every message queue implementation supports an error handler. | ||
457 | * | 305 | * |
458 | * @param h handle to use | 306 | * @param cls closure with the `struct GNUNET_NAMECACHE_Handle *` |
307 | * @param error error code | ||
459 | */ | 308 | */ |
460 | static void | 309 | static void |
461 | do_transmit (struct GNUNET_NAMECACHE_Handle *h) | 310 | mq_error_handler (void *cls, |
311 | enum GNUNET_MQ_Error error) | ||
462 | { | 312 | { |
463 | struct PendingMessage *p; | 313 | struct GNUNET_NAMECACHE_Handle *h = cls; |
464 | 314 | ||
465 | if (NULL != h->th) | 315 | force_reconnect (h); |
466 | return; /* transmission request already pending */ | ||
467 | if (NULL == (p = h->pending_head)) | ||
468 | return; /* transmission queue empty */ | ||
469 | if (NULL == h->client) | ||
470 | return; /* currently reconnecting */ | ||
471 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, p->size, | ||
472 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
473 | GNUNET_NO, &transmit_message_to_namecache, | ||
474 | h); | ||
475 | GNUNET_break (NULL != h->th); | ||
476 | } | 316 | } |
477 | 317 | ||
478 | 318 | ||
@@ -484,10 +324,23 @@ do_transmit (struct GNUNET_NAMECACHE_Handle *h) | |||
484 | static void | 324 | static void |
485 | reconnect (struct GNUNET_NAMECACHE_Handle *h) | 325 | reconnect (struct GNUNET_NAMECACHE_Handle *h) |
486 | { | 326 | { |
487 | GNUNET_assert (NULL == h->client); | 327 | GNUNET_MQ_hd_var_size (lookup_block_response, |
488 | h->client = GNUNET_CLIENT_connect ("namecache", h->cfg); | 328 | GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE, |
489 | GNUNET_assert (NULL != h->client); | 329 | struct LookupBlockResponseMessage); |
490 | do_transmit (h); | 330 | GNUNET_MQ_hd_fixed_size (block_cache_response, |
331 | GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE, | ||
332 | struct BlockCacheResponseMessage); | ||
333 | struct GNUNET_MQ_MessageHandler handlers[] = { | ||
334 | make_lookup_block_response_handler (h), | ||
335 | make_block_cache_response_handler (h), | ||
336 | GNUNET_MQ_handler_end () | ||
337 | }; | ||
338 | GNUNET_assert (NULL == h->mq); | ||
339 | h->mq = GNUNET_CLIENT_connecT (h->cfg, | ||
340 | "namecache", | ||
341 | handlers, | ||
342 | &mq_error_handler, | ||
343 | h); | ||
491 | } | 344 | } |
492 | 345 | ||
493 | 346 | ||
@@ -514,17 +367,24 @@ reconnect_task (void *cls) | |||
514 | static void | 367 | static void |
515 | force_reconnect (struct GNUNET_NAMECACHE_Handle *h) | 368 | force_reconnect (struct GNUNET_NAMECACHE_Handle *h) |
516 | { | 369 | { |
517 | if (NULL != h->th) | 370 | struct GNUNET_NAMECACHE_QueueEntry *qe; |
371 | |||
372 | h->reconnect = GNUNET_NO; | ||
373 | GNUNET_MQ_destroy (h->mq); | ||
374 | h->mq = NULL; | ||
375 | while (NULL != (qe = h->op_head)) | ||
518 | { | 376 | { |
519 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | 377 | GNUNET_CONTAINER_DLL_remove (h->op_head, |
520 | h->th = NULL; | 378 | h->op_tail, |
379 | qe); | ||
380 | if (NULL != qe->cont) | ||
381 | qe->cont (qe->cont_cls, | ||
382 | GNUNET_SYSERR, | ||
383 | _("Error communicating with namecache service")); | ||
384 | GNUNET_free (qe); | ||
521 | } | 385 | } |
522 | h->reconnect = GNUNET_NO; | ||
523 | GNUNET_CLIENT_disconnect (h->client); | ||
524 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 386 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
525 | "Reconnecting to namecache\n"); | 387 | "Reconnecting to namecache\n"); |
526 | h->is_receiving = GNUNET_NO; | ||
527 | h->client = NULL; | ||
528 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); | 388 | h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); |
529 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, | 389 | h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, |
530 | &reconnect_task, | 390 | &reconnect_task, |
@@ -558,8 +418,12 @@ GNUNET_NAMECACHE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
558 | 418 | ||
559 | h = GNUNET_new (struct GNUNET_NAMECACHE_Handle); | 419 | h = GNUNET_new (struct GNUNET_NAMECACHE_Handle); |
560 | h->cfg = cfg; | 420 | h->cfg = cfg; |
561 | h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect_task, h); | 421 | reconnect (h); |
562 | h->last_op_id_used = 0; | 422 | if (NULL == h->mq) |
423 | { | ||
424 | GNUNET_free (h); | ||
425 | return NULL; | ||
426 | } | ||
563 | return h; | 427 | return h; |
564 | } | 428 | } |
565 | 429 | ||
@@ -573,31 +437,20 @@ GNUNET_NAMECACHE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
573 | void | 437 | void |
574 | GNUNET_NAMECACHE_disconnect (struct GNUNET_NAMECACHE_Handle *h) | 438 | GNUNET_NAMECACHE_disconnect (struct GNUNET_NAMECACHE_Handle *h) |
575 | { | 439 | { |
576 | struct PendingMessage *p; | ||
577 | struct GNUNET_NAMECACHE_QueueEntry *q; | 440 | struct GNUNET_NAMECACHE_QueueEntry *q; |
578 | 441 | ||
579 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up\n"); | ||
580 | GNUNET_assert (NULL != h); | ||
581 | if (NULL != h->th) | ||
582 | { | ||
583 | GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); | ||
584 | h->th = NULL; | ||
585 | } | ||
586 | while (NULL != (p = h->pending_head)) | ||
587 | { | ||
588 | GNUNET_CONTAINER_DLL_remove (h->pending_head, h->pending_tail, p); | ||
589 | GNUNET_free (p); | ||
590 | } | ||
591 | GNUNET_break (NULL == h->op_head); | 442 | GNUNET_break (NULL == h->op_head); |
592 | while (NULL != (q = h->op_head)) | 443 | while (NULL != (q = h->op_head)) |
593 | { | 444 | { |
594 | GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, q); | 445 | GNUNET_CONTAINER_DLL_remove (h->op_head, |
446 | h->op_tail, | ||
447 | q); | ||
595 | GNUNET_free (q); | 448 | GNUNET_free (q); |
596 | } | 449 | } |
597 | if (NULL != h->client) | 450 | if (NULL != h->mq) |
598 | { | 451 | { |
599 | GNUNET_CLIENT_disconnect (h->client); | 452 | GNUNET_MQ_destroy (h->mq); |
600 | h->client = NULL; | 453 | h->mq = NULL; |
601 | } | 454 | } |
602 | if (NULL != h->reconnect_task) | 455 | if (NULL != h->reconnect_task) |
603 | { | 456 | { |
@@ -615,8 +468,8 @@ GNUNET_NAMECACHE_disconnect (struct GNUNET_NAMECACHE_Handle *h) | |||
615 | * @param h handle to the namecache | 468 | * @param h handle to the namecache |
616 | * @param block block to store | 469 | * @param block block to store |
617 | * @param cont continuation to call when done | 470 | * @param cont continuation to call when done |
618 | * @param cont_cls closure for cont | 471 | * @param cont_cls closure for @a cont |
619 | * @return handle to abort the request | 472 | * @return handle to abort the request, NULL on error |
620 | */ | 473 | */ |
621 | struct GNUNET_NAMECACHE_QueueEntry * | 474 | struct GNUNET_NAMECACHE_QueueEntry * |
622 | GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, | 475 | GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, |
@@ -625,13 +478,13 @@ GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, | |||
625 | void *cont_cls) | 478 | void *cont_cls) |
626 | { | 479 | { |
627 | struct GNUNET_NAMECACHE_QueueEntry *qe; | 480 | struct GNUNET_NAMECACHE_QueueEntry *qe; |
628 | struct PendingMessage *pe; | ||
629 | struct BlockCacheMessage *msg; | 481 | struct BlockCacheMessage *msg; |
482 | struct GNUNET_MQ_Envelope *env; | ||
630 | uint32_t rid; | 483 | uint32_t rid; |
631 | size_t blen; | 484 | size_t blen; |
632 | size_t msg_size; | ||
633 | 485 | ||
634 | GNUNET_assert (NULL != h); | 486 | if (NULL == h->mq) |
487 | return NULL; | ||
635 | blen = ntohl (block->purpose.size) | 488 | blen = ntohl (block->purpose.size) |
636 | - sizeof (struct GNUNET_TIME_AbsoluteNBO) | 489 | - sizeof (struct GNUNET_TIME_AbsoluteNBO) |
637 | - sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose); | 490 | - sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose); |
@@ -641,27 +494,22 @@ GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, | |||
641 | qe->cont = cont; | 494 | qe->cont = cont; |
642 | qe->cont_cls = cont_cls; | 495 | qe->cont_cls = cont_cls; |
643 | qe->op_id = rid; | 496 | qe->op_id = rid; |
644 | GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, qe); | 497 | GNUNET_CONTAINER_DLL_insert_tail (h->op_head, |
645 | 498 | h->op_tail, | |
646 | /* setup msg */ | 499 | qe); |
647 | msg_size = sizeof (struct BlockCacheMessage) + blen; | 500 | /* send msg */ |
648 | pe = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size); | 501 | env = GNUNET_MQ_msg_extra (msg, |
649 | pe->size = msg_size; | 502 | blen, |
650 | msg = (struct BlockCacheMessage *) &pe[1]; | 503 | GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE); |
651 | msg->gns_header.header.type = htons (GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE); | ||
652 | msg->gns_header.header.size = htons (msg_size); | ||
653 | msg->gns_header.r_id = htonl (rid); | 504 | msg->gns_header.r_id = htonl (rid); |
654 | msg->expire = block->expiration_time; | 505 | msg->expire = block->expiration_time; |
655 | msg->signature = block->signature; | 506 | msg->signature = block->signature; |
656 | msg->derived_key = block->derived_key; | 507 | msg->derived_key = block->derived_key; |
657 | memcpy (&msg[1], &block[1], blen); | 508 | memcpy (&msg[1], |
658 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 509 | &block[1], |
659 | "Sending `%s' message with size %u and expiration %s\n", | 510 | blen); |
660 | "NAMECACHE_BLOCK_CACHE", | 511 | GNUNET_MQ_send (h->mq, |
661 | (unsigned int) msg_size, | 512 | env); |
662 | GNUNET_STRINGS_absolute_time_to_string (GNUNET_TIME_absolute_ntoh (msg->expire))); | ||
663 | GNUNET_CONTAINER_DLL_insert_tail (h->pending_head, h->pending_tail, pe); | ||
664 | do_transmit (h); | ||
665 | return qe; | 513 | return qe; |
666 | } | 514 | } |
667 | 515 | ||
@@ -674,41 +522,40 @@ GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, | |||
674 | * @param derived_hash hash of zone key combined with name to lookup | 522 | * @param derived_hash hash of zone key combined with name to lookup |
675 | * @param proc function to call on the matching block, or with | 523 | * @param proc function to call on the matching block, or with |
676 | * NULL if there is no matching block | 524 | * NULL if there is no matching block |
677 | * @param proc_cls closure for proc | 525 | * @param proc_cls closure for @a proc |
678 | * @return a handle that can be used to cancel | 526 | * @return a handle that can be used to cancel, NULL on error |
679 | */ | 527 | */ |
680 | struct GNUNET_NAMECACHE_QueueEntry * | 528 | struct GNUNET_NAMECACHE_QueueEntry * |
681 | GNUNET_NAMECACHE_lookup_block (struct GNUNET_NAMECACHE_Handle *h, | 529 | GNUNET_NAMECACHE_lookup_block (struct GNUNET_NAMECACHE_Handle *h, |
682 | const struct GNUNET_HashCode *derived_hash, | 530 | const struct GNUNET_HashCode *derived_hash, |
683 | GNUNET_NAMECACHE_BlockProcessor proc, void *proc_cls) | 531 | GNUNET_NAMECACHE_BlockProcessor proc, |
532 | void *proc_cls) | ||
684 | { | 533 | { |
685 | struct GNUNET_NAMECACHE_QueueEntry *qe; | 534 | struct GNUNET_NAMECACHE_QueueEntry *qe; |
686 | struct PendingMessage *pe; | ||
687 | struct LookupBlockMessage *msg; | 535 | struct LookupBlockMessage *msg; |
688 | size_t msg_size; | 536 | struct GNUNET_MQ_Envelope *env; |
689 | uint32_t rid; | 537 | uint32_t rid; |
690 | 538 | ||
539 | if (NULL == h->mq) | ||
540 | return NULL; | ||
691 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 541 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
692 | "Looking for block under %s\n", | 542 | "Looking for block under %s\n", |
693 | GNUNET_h2s (derived_hash)); | 543 | GNUNET_h2s (derived_hash)); |
694 | rid = get_op_id(h); | 544 | rid = get_op_id (h); |
695 | qe = GNUNET_new (struct GNUNET_NAMECACHE_QueueEntry); | 545 | qe = GNUNET_new (struct GNUNET_NAMECACHE_QueueEntry); |
696 | qe->nsh = h; | 546 | qe->nsh = h; |
697 | qe->block_proc = proc; | 547 | qe->block_proc = proc; |
698 | qe->block_proc_cls = proc_cls; | 548 | qe->block_proc_cls = proc_cls; |
699 | qe->op_id = rid; | 549 | qe->op_id = rid; |
700 | GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, qe); | 550 | GNUNET_CONTAINER_DLL_insert_tail (h->op_head, |
701 | 551 | h->op_tail, | |
702 | msg_size = sizeof (struct LookupBlockMessage); | 552 | qe); |
703 | pe = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size); | 553 | env = GNUNET_MQ_msg (msg, |
704 | pe->size = msg_size; | 554 | GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK); |
705 | msg = (struct LookupBlockMessage *) &pe[1]; | ||
706 | msg->gns_header.header.type = htons (GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK); | ||
707 | msg->gns_header.header.size = htons (msg_size); | ||
708 | msg->gns_header.r_id = htonl (rid); | 555 | msg->gns_header.r_id = htonl (rid); |
709 | msg->query = *derived_hash; | 556 | msg->query = *derived_hash; |
710 | GNUNET_CONTAINER_DLL_insert_tail (h->pending_head, h->pending_tail, pe); | 557 | GNUNET_MQ_send (h->mq, |
711 | do_transmit (h); | 558 | env); |
712 | return qe; | 559 | return qe; |
713 | } | 560 | } |
714 | 561 | ||
@@ -724,8 +571,9 @@ GNUNET_NAMECACHE_cancel (struct GNUNET_NAMECACHE_QueueEntry *qe) | |||
724 | { | 571 | { |
725 | struct GNUNET_NAMECACHE_Handle *h = qe->nsh; | 572 | struct GNUNET_NAMECACHE_Handle *h = qe->nsh; |
726 | 573 | ||
727 | GNUNET_assert (NULL != qe); | 574 | GNUNET_CONTAINER_DLL_remove (h->op_head, |
728 | GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, qe); | 575 | h->op_tail, |
576 | qe); | ||
729 | GNUNET_free(qe); | 577 | GNUNET_free(qe); |
730 | } | 578 | } |
731 | 579 | ||