aboutsummaryrefslogtreecommitdiff
path: root/src/namecache
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-21 19:07:03 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-21 19:07:03 +0000
commit7262f6743a026cf110d3aedc7e39951280d65fa1 (patch)
tree6e48d45fafe9dc0da62d23cee31ae4cc37b109ac /src/namecache
parent3d7b29ec1c5d1c2de96cf4c9badaa112e86ef899 (diff)
downloadgnunet-7262f6743a026cf110d3aedc7e39951280d65fa1.tar.gz
gnunet-7262f6743a026cf110d3aedc7e39951280d65fa1.zip
convert namecache to new MQ API
Diffstat (limited to 'src/namecache')
-rw-r--r--src/namecache/namecache_api.c518
1 files changed, 183 insertions, 335 deletions
diff --git a/src/namecache/namecache_api.c b/src/namecache/namecache_api.c
index 15a750448..51cbacf12 100644
--- a/src/namecache/namecache_api.c
+++ b/src/namecache/namecache_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2010-2013 GNUnet e.V. 3 Copyright (C) 2010-2013, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -91,31 +91,6 @@ struct GNUNET_NAMECACHE_QueueEntry
91 91
92 92
93/** 93/**
94 * Message in linked list we should send to the service. The
95 * actual binary message follows this struct.
96 */
97struct PendingMessage
98{
99
100 /**
101 * Kept in a DLL.
102 */
103 struct PendingMessage *next;
104
105 /**
106 * Kept in a DLL.
107 */
108 struct PendingMessage *prev;
109
110 /**
111 * Size of the message.
112 */
113 size_t size;
114
115};
116
117
118/**
119 * Connection to the NAMECACHE service. 94 * Connection to the NAMECACHE service.
120 */ 95 */
121struct GNUNET_NAMECACHE_Handle 96struct GNUNET_NAMECACHE_Handle
@@ -127,9 +102,9 @@ struct GNUNET_NAMECACHE_Handle
127 const struct GNUNET_CONFIGURATION_Handle *cfg; 102 const struct GNUNET_CONFIGURATION_Handle *cfg;
128 103
129 /** 104 /**
130 * Socket (if available). 105 * Message queue to service.
131 */ 106 */
132 struct GNUNET_CLIENT_Connection *client; 107 struct GNUNET_MQ_Handle *mq;
133 108
134 /** 109 /**
135 * Currently pending transmission request (or NULL). 110 * Currently pending transmission request (or NULL).
@@ -137,16 +112,6 @@ struct GNUNET_NAMECACHE_Handle
137 struct GNUNET_CLIENT_TransmitHandle *th; 112 struct GNUNET_CLIENT_TransmitHandle *th;
138 113
139 /** 114 /**
140 * Head of linked list of pending messages to send to the service
141 */
142 struct PendingMessage *pending_head;
143
144 /**
145 * Tail of linked list of pending messages to send to the service
146 */
147 struct PendingMessage *pending_tail;
148
149 /**
150 * Head of pending namecache queue entries 115 * Head of pending namecache queue entries
151 */ 116 */
152 struct GNUNET_NAMECACHE_QueueEntry *op_head; 117 struct GNUNET_NAMECACHE_QueueEntry *op_head;
@@ -159,7 +124,7 @@ struct GNUNET_NAMECACHE_Handle
159 /** 124 /**
160 * Reconnect task 125 * Reconnect task
161 */ 126 */
162 struct GNUNET_SCHEDULER_Task * reconnect_task; 127 struct GNUNET_SCHEDULER_Task *reconnect_task;
163 128
164 /** 129 /**
165 * Delay introduced before we reconnect. 130 * Delay introduced before we reconnect.
@@ -172,11 +137,6 @@ struct GNUNET_NAMECACHE_Handle
172 int reconnect; 137 int reconnect;
173 138
174 /** 139 /**
175 * Did we start to receive yet?
176 */
177 int is_receiving;
178
179 /**
180 * The last operation id used for a NAMECACHE operation 140 * The last operation id used for a NAMECACHE operation
181 */ 141 */
182 uint32_t last_op_id_used; 142 uint32_t last_op_id_used;
@@ -194,285 +154,165 @@ force_reconnect (struct GNUNET_NAMECACHE_Handle *h);
194 154
195 155
196/** 156/**
197 * Handle an incoming message of type 157 * Find queue entry for the given @a rid.
198 * #GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE.
199 * 158 *
200 * @param qe the respective entry in the message queue 159 * @param h handle to search
201 * @param msg the message we received 160 * @param rid request ID to look for
202 * @param size the message size 161 * @return NULL if not found, otherwise the queue entry (removed from the queue)
203 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and we did NOT notify the client
204 */ 162 */
205static int 163static struct GNUNET_NAMECACHE_QueueEntry *
206handle_lookup_block_response (struct GNUNET_NAMECACHE_QueueEntry *qe, 164find_qe (struct GNUNET_NAMECACHE_Handle *h,
207 const struct LookupBlockResponseMessage *msg, 165 uint32_t rid)
208 size_t size)
209{ 166{
210 struct GNUNET_GNSRECORD_Block *block; 167 struct GNUNET_NAMECACHE_QueueEntry *qe;
211 char buf[size + sizeof (struct GNUNET_GNSRECORD_Block)
212 - sizeof (struct LookupBlockResponseMessage)];
213
214 LOG (GNUNET_ERROR_TYPE_DEBUG,
215 "Received `%s'\n",
216 "LOOKUP_BLOCK_RESPONSE");
217 if (0 == GNUNET_TIME_absolute_ntoh (msg->expire).abs_value_us)
218 {
219 /* no match found */
220 if (NULL != qe->block_proc)
221 qe->block_proc (qe->block_proc_cls, NULL);
222 return GNUNET_OK;
223 }
224 168
225 block = (struct GNUNET_GNSRECORD_Block *) buf; 169 for (qe = h->op_head; qe != NULL; qe = qe->next)
226 block->signature = msg->signature;
227 block->derived_key = msg->derived_key;
228 block->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN);
229 block->purpose.size = htonl (size - sizeof (struct LookupBlockResponseMessage) +
230 sizeof (struct GNUNET_TIME_AbsoluteNBO) +
231 sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose));
232 block->expiration_time = msg->expire;
233 memcpy (&block[1],
234 &msg[1],
235 size - sizeof (struct LookupBlockResponseMessage));
236 if (GNUNET_OK !=
237 GNUNET_GNSRECORD_block_verify (block))
238 { 170 {
239 GNUNET_break (0); 171 if (qe->op_id == rid)
240 return GNUNET_SYSERR; 172 {
173 GNUNET_CONTAINER_DLL_remove (h->op_head,
174 h->op_tail,
175 qe);
176 return qe;
177 }
241 } 178 }
242 if (NULL != qe->block_proc) 179 return NULL;
243 qe->block_proc (qe->block_proc_cls, block);
244 else
245 GNUNET_break (0);
246 return GNUNET_OK;
247} 180}
248 181
249 182
250/** 183/**
251 * Handle an incoming message of type 184 * Handle an incoming message of type
252 * #GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE 185 * #GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE.
253 * 186 *
254 * @param qe the respective entry in the message queue 187 * @param cls the `struct GNUNET_NAMECACHE_Handle`
255 * @param msg the message we received 188 * @param msg the message we received
256 * @param size the message size
257 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and we did NOT notify the client
258 */ 189 */
259static int 190static int
260handle_block_cache_response (struct GNUNET_NAMECACHE_QueueEntry *qe, 191check_lookup_block_response (void *cls,
261 const struct BlockCacheResponseMessage *msg, 192 const struct LookupBlockResponseMessage *msg)
262 size_t size)
263{ 193{
264 int res; 194 /* any length will do, format validation is in handler */
265
266 LOG (GNUNET_ERROR_TYPE_DEBUG,
267 "Received `%s'\n",
268 "BLOCK_CACHE_RESPONSE");
269 res = ntohl (msg->op_result);
270 /* TODO: add actual error message from namecache to response... */
271 if (NULL != qe->cont)
272 qe->cont (qe->cont_cls,
273 res,
274 (GNUNET_OK == res)
275 ? NULL
276 : _("Namecache failed to cache block"));
277 return GNUNET_OK; 195 return GNUNET_OK;
278} 196}
279 197
280 198
281/** 199/**
282 * Handle incoming messages for record operations 200 * Handle an incoming message of type
201 * #GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE.
283 * 202 *
284 * @param qe the respective zone iteration handle 203 * @param cls the `struct GNUNET_NAMECACHE_Handle`
285 * @param msg the message we received 204 * @param msg the message we received
286 * @param type the message type in host byte order
287 * @param size the message size
288 * @return #GNUNET_OK on success, #GNUNET_NO if we notified the client about
289 * the error, #GNUNET_SYSERR on error and we did NOT notify the client
290 */
291static int
292manage_record_operations (struct GNUNET_NAMECACHE_QueueEntry *qe,
293 const struct GNUNET_MessageHeader *msg,
294 uint16_t type,
295 size_t size)
296{
297 /* handle different message type */
298 switch (type)
299 {
300 case GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE:
301 if (size < sizeof (struct LookupBlockResponseMessage))
302 {
303 GNUNET_break (0);
304 return GNUNET_SYSERR;
305 }
306 return handle_lookup_block_response (qe, (const struct LookupBlockResponseMessage *) msg, size);
307 case GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE:
308 if (size != sizeof (struct BlockCacheResponseMessage))
309 {
310 GNUNET_break (0);
311 return GNUNET_SYSERR;
312 }
313 return handle_block_cache_response (qe, (const struct BlockCacheResponseMessage *) msg, size);
314 default:
315 GNUNET_break (0);
316 return GNUNET_SYSERR;
317 }
318}
319
320
321/**
322 * Type of a function to call when we receive a message
323 * from the service.
324 *
325 * @param cls the `struct GNUNET_NAMECACHE_SchedulingHandle`
326 * @param msg message received, NULL on timeout or fatal error
327 */ 205 */
328static void 206static void
329process_namecache_message (void *cls, 207handle_lookup_block_response (void *cls,
330 const struct GNUNET_MessageHeader *msg) 208 const struct LookupBlockResponseMessage *msg)
331{ 209{
332 struct GNUNET_NAMECACHE_Handle *h = cls; 210 struct GNUNET_NAMECACHE_Handle *h = cls;
333 const struct GNUNET_NAMECACHE_Header *gm; 211 size_t size;
334 struct GNUNET_NAMECACHE_QueueEntry *qe; 212 struct GNUNET_NAMECACHE_QueueEntry *qe;
335 uint16_t size;
336 uint16_t type;
337 uint32_t r_id;
338 int ret;
339 213
340 if (NULL == msg) 214 LOG (GNUNET_ERROR_TYPE_DEBUG,
341 { 215 "Received LOOKUP_BLOCK_RESPONSE\n");
342 force_reconnect (h); 216 qe = find_qe (h,
217 ntohl (msg->gns_header.r_id));
218 if (NULL == qe)
343 return; 219 return;
344 } 220 if (0 == GNUNET_TIME_absolute_ntoh (msg->expire).abs_value_us)
345 size = ntohs (msg->size);
346 type = ntohs (msg->type);
347 if (size < sizeof (struct GNUNET_NAMECACHE_Header))
348 { 221 {
349 GNUNET_break_op (0); 222 /* no match found */
350 GNUNET_CLIENT_receive (h->client, 223 if (NULL != qe->block_proc)
351 &process_namecache_message, h, 224 qe->block_proc (qe->block_proc_cls,
352 GNUNET_TIME_UNIT_FOREVER_REL); 225 NULL);
226 GNUNET_free (qe);
353 return; 227 return;
354 } 228 }
355 gm = (const struct GNUNET_NAMECACHE_Header *) msg; 229 size = ntohs (msg->gns_header.header.size)
356 r_id = ntohl (gm->r_id); 230 - sizeof (struct LookupBlockResponseMessage);
357
358 LOG (GNUNET_ERROR_TYPE_DEBUG,
359 "Received message type %u size %u op %u\n",
360 (unsigned int) type,
361 (unsigned int) size,
362 (unsigned int) r_id);
363
364 /* Is it a record related operation ? */
365 for (qe = h->op_head; qe != NULL; qe = qe->next)
366 if (qe->op_id == r_id)
367 break;
368 if (NULL != qe)
369 { 231 {
370 ret = manage_record_operations (qe, msg, type, size); 232 char buf[size + sizeof (struct GNUNET_GNSRECORD_Block)] GNUNET_ALIGN;
371 if (GNUNET_SYSERR == ret) 233 struct GNUNET_GNSRECORD_Block *block;
234
235 block = (struct GNUNET_GNSRECORD_Block *) buf;
236 block->signature = msg->signature;
237 block->derived_key = msg->derived_key;
238 block->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN);
239 block->purpose.size = htonl (size +
240 sizeof (struct GNUNET_TIME_AbsoluteNBO) +
241 sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose));
242 block->expiration_time = msg->expire;
243 memcpy (&block[1],
244 &msg[1],
245 size);
246 if (GNUNET_OK !=
247 GNUNET_GNSRECORD_block_verify (block))
372 { 248 {
373 /* protocol error, need to reconnect */ 249 GNUNET_break (0);
374 h->reconnect = GNUNET_YES; 250 if (NULL != qe->block_proc)
251 qe->block_proc (qe->block_proc_cls,
252 NULL);
253 force_reconnect (h);
375 } 254 }
376 else 255 else
377 { 256 {
378 /* client was notified about success or failure, clean up 'qe' */ 257 if (NULL != qe->block_proc)
379 GNUNET_CONTAINER_DLL_remove (h->op_head, 258 qe->block_proc (qe->block_proc_cls,
380 h->op_tail, 259 block);
381 qe);
382 GNUNET_free (qe);
383 } 260 }
384 } 261 }
385 if (GNUNET_YES == h->reconnect) 262 GNUNET_free (qe);
386 {
387 force_reconnect (h);
388 return;
389 }
390 GNUNET_CLIENT_receive (h->client, &process_namecache_message, h,
391 GNUNET_TIME_UNIT_FOREVER_REL);
392} 263}
393 264
394 265
395/** 266/**
396 * Transmit messages from the message queue to the service 267 * Handle an incoming message of type
397 * (if there are any, and if we are not already trying). 268 * #GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE
398 *
399 * @param h handle to use
400 */
401static void
402do_transmit (struct GNUNET_NAMECACHE_Handle *h);
403
404
405/**
406 * We can now transmit a message to NAMECACHE. Do it.
407 * 269 *
408 * @param cls the `struct GNUNET_NAMECACHE_Handle` 270 * @param cls the `struct GNUNET_NAMECACHE_Handle`
409 * @param size number of bytes we can transmit 271 * @param msg the message we received
410 * @param buf where to copy the messages 272 * @param size the message size
411 * @return number of bytes copied into @a buf 273 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and we did NOT notify the client
412 */ 274 */
413static size_t 275static void
414transmit_message_to_namecache (void *cls, 276handle_block_cache_response (void *cls,
415 size_t size, 277 const struct BlockCacheResponseMessage *msg)
416 void *buf)
417{ 278{
418 struct GNUNET_NAMECACHE_Handle *h = cls; 279 struct GNUNET_NAMECACHE_Handle *h = cls;
419 struct PendingMessage *p; 280 struct GNUNET_NAMECACHE_QueueEntry *qe;
420 size_t ret; 281 int res;
421 char *cbuf;
422 282
423 h->th = NULL; 283 LOG (GNUNET_ERROR_TYPE_DEBUG,
424 if ((0 == size) || (NULL == buf)) 284 "Received BLOCK_CACHE_RESPONSE\n");
425 { 285 qe = find_qe (h,
426 force_reconnect (h); 286 ntohl (msg->gns_header.r_id));
427 return 0; 287 if (NULL == qe)
428 } 288 return;
429 ret = 0; 289 res = ntohl (msg->op_result);
430 cbuf = buf; 290 /* TODO: add actual error message from namecache to response... */
431 while ( (NULL != (p = h->pending_head)) && 291 if (NULL != qe->cont)
432 (p->size <= size) ) 292 qe->cont (qe->cont_cls,
433 { 293 res,
434 memcpy (&cbuf[ret], &p[1], p->size); 294 (GNUNET_OK == res)
435 ret += p->size; 295 ? NULL
436 size -= p->size; 296 : _("Namecache failed to cache block"));
437 GNUNET_CONTAINER_DLL_remove (h->pending_head, 297 GNUNET_free (qe);
438 h->pending_tail,
439 p);
440 if (GNUNET_NO == h->is_receiving)
441 {
442 h->is_receiving = GNUNET_YES;
443 GNUNET_CLIENT_receive (h->client,
444 &process_namecache_message, h,
445 GNUNET_TIME_UNIT_FOREVER_REL);
446 }
447 GNUNET_free (p);
448 }
449 do_transmit (h);
450 return ret;
451} 298}
452 299
453 300
454/** 301/**
455 * Transmit messages from the message queue to the service 302 * Generic error handler, called with the appropriate error code and
456 * (if there are any, and if we are not already trying). 303 * the same closure specified at the creation of the message queue.
304 * Not every message queue implementation supports an error handler.
457 * 305 *
458 * @param h handle to use 306 * @param cls closure with the `struct GNUNET_NAMECACHE_Handle *`
307 * @param error error code
459 */ 308 */
460static void 309static void
461do_transmit (struct GNUNET_NAMECACHE_Handle *h) 310mq_error_handler (void *cls,
311 enum GNUNET_MQ_Error error)
462{ 312{
463 struct PendingMessage *p; 313 struct GNUNET_NAMECACHE_Handle *h = cls;
464 314
465 if (NULL != h->th) 315 force_reconnect (h);
466 return; /* transmission request already pending */
467 if (NULL == (p = h->pending_head))
468 return; /* transmission queue empty */
469 if (NULL == h->client)
470 return; /* currently reconnecting */
471 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, p->size,
472 GNUNET_TIME_UNIT_FOREVER_REL,
473 GNUNET_NO, &transmit_message_to_namecache,
474 h);
475 GNUNET_break (NULL != h->th);
476} 316}
477 317
478 318
@@ -484,10 +324,23 @@ do_transmit (struct GNUNET_NAMECACHE_Handle *h)
484static void 324static void
485reconnect (struct GNUNET_NAMECACHE_Handle *h) 325reconnect (struct GNUNET_NAMECACHE_Handle *h)
486{ 326{
487 GNUNET_assert (NULL == h->client); 327 GNUNET_MQ_hd_var_size (lookup_block_response,
488 h->client = GNUNET_CLIENT_connect ("namecache", h->cfg); 328 GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK_RESPONSE,
489 GNUNET_assert (NULL != h->client); 329 struct LookupBlockResponseMessage);
490 do_transmit (h); 330 GNUNET_MQ_hd_fixed_size (block_cache_response,
331 GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE_RESPONSE,
332 struct BlockCacheResponseMessage);
333 struct GNUNET_MQ_MessageHandler handlers[] = {
334 make_lookup_block_response_handler (h),
335 make_block_cache_response_handler (h),
336 GNUNET_MQ_handler_end ()
337 };
338 GNUNET_assert (NULL == h->mq);
339 h->mq = GNUNET_CLIENT_connecT (h->cfg,
340 "namecache",
341 handlers,
342 &mq_error_handler,
343 h);
491} 344}
492 345
493 346
@@ -514,17 +367,24 @@ reconnect_task (void *cls)
514static void 367static void
515force_reconnect (struct GNUNET_NAMECACHE_Handle *h) 368force_reconnect (struct GNUNET_NAMECACHE_Handle *h)
516{ 369{
517 if (NULL != h->th) 370 struct GNUNET_NAMECACHE_QueueEntry *qe;
371
372 h->reconnect = GNUNET_NO;
373 GNUNET_MQ_destroy (h->mq);
374 h->mq = NULL;
375 while (NULL != (qe = h->op_head))
518 { 376 {
519 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); 377 GNUNET_CONTAINER_DLL_remove (h->op_head,
520 h->th = NULL; 378 h->op_tail,
379 qe);
380 if (NULL != qe->cont)
381 qe->cont (qe->cont_cls,
382 GNUNET_SYSERR,
383 _("Error communicating with namecache service"));
384 GNUNET_free (qe);
521 } 385 }
522 h->reconnect = GNUNET_NO;
523 GNUNET_CLIENT_disconnect (h->client);
524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525 "Reconnecting to namecache\n"); 387 "Reconnecting to namecache\n");
526 h->is_receiving = GNUNET_NO;
527 h->client = NULL;
528 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); 388 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
529 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, 389 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
530 &reconnect_task, 390 &reconnect_task,
@@ -558,8 +418,12 @@ GNUNET_NAMECACHE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
558 418
559 h = GNUNET_new (struct GNUNET_NAMECACHE_Handle); 419 h = GNUNET_new (struct GNUNET_NAMECACHE_Handle);
560 h->cfg = cfg; 420 h->cfg = cfg;
561 h->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect_task, h); 421 reconnect (h);
562 h->last_op_id_used = 0; 422 if (NULL == h->mq)
423 {
424 GNUNET_free (h);
425 return NULL;
426 }
563 return h; 427 return h;
564} 428}
565 429
@@ -573,31 +437,20 @@ GNUNET_NAMECACHE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
573void 437void
574GNUNET_NAMECACHE_disconnect (struct GNUNET_NAMECACHE_Handle *h) 438GNUNET_NAMECACHE_disconnect (struct GNUNET_NAMECACHE_Handle *h)
575{ 439{
576 struct PendingMessage *p;
577 struct GNUNET_NAMECACHE_QueueEntry *q; 440 struct GNUNET_NAMECACHE_QueueEntry *q;
578 441
579 LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up\n");
580 GNUNET_assert (NULL != h);
581 if (NULL != h->th)
582 {
583 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
584 h->th = NULL;
585 }
586 while (NULL != (p = h->pending_head))
587 {
588 GNUNET_CONTAINER_DLL_remove (h->pending_head, h->pending_tail, p);
589 GNUNET_free (p);
590 }
591 GNUNET_break (NULL == h->op_head); 442 GNUNET_break (NULL == h->op_head);
592 while (NULL != (q = h->op_head)) 443 while (NULL != (q = h->op_head))
593 { 444 {
594 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, q); 445 GNUNET_CONTAINER_DLL_remove (h->op_head,
446 h->op_tail,
447 q);
595 GNUNET_free (q); 448 GNUNET_free (q);
596 } 449 }
597 if (NULL != h->client) 450 if (NULL != h->mq)
598 { 451 {
599 GNUNET_CLIENT_disconnect (h->client); 452 GNUNET_MQ_destroy (h->mq);
600 h->client = NULL; 453 h->mq = NULL;
601 } 454 }
602 if (NULL != h->reconnect_task) 455 if (NULL != h->reconnect_task)
603 { 456 {
@@ -615,8 +468,8 @@ GNUNET_NAMECACHE_disconnect (struct GNUNET_NAMECACHE_Handle *h)
615 * @param h handle to the namecache 468 * @param h handle to the namecache
616 * @param block block to store 469 * @param block block to store
617 * @param cont continuation to call when done 470 * @param cont continuation to call when done
618 * @param cont_cls closure for cont 471 * @param cont_cls closure for @a cont
619 * @return handle to abort the request 472 * @return handle to abort the request, NULL on error
620 */ 473 */
621struct GNUNET_NAMECACHE_QueueEntry * 474struct GNUNET_NAMECACHE_QueueEntry *
622GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h, 475GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h,
@@ -625,13 +478,13 @@ GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h,
625 void *cont_cls) 478 void *cont_cls)
626{ 479{
627 struct GNUNET_NAMECACHE_QueueEntry *qe; 480 struct GNUNET_NAMECACHE_QueueEntry *qe;
628 struct PendingMessage *pe;
629 struct BlockCacheMessage *msg; 481 struct BlockCacheMessage *msg;
482 struct GNUNET_MQ_Envelope *env;
630 uint32_t rid; 483 uint32_t rid;
631 size_t blen; 484 size_t blen;
632 size_t msg_size;
633 485
634 GNUNET_assert (NULL != h); 486 if (NULL == h->mq)
487 return NULL;
635 blen = ntohl (block->purpose.size) 488 blen = ntohl (block->purpose.size)
636 - sizeof (struct GNUNET_TIME_AbsoluteNBO) 489 - sizeof (struct GNUNET_TIME_AbsoluteNBO)
637 - sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose); 490 - sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose);
@@ -641,27 +494,22 @@ GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h,
641 qe->cont = cont; 494 qe->cont = cont;
642 qe->cont_cls = cont_cls; 495 qe->cont_cls = cont_cls;
643 qe->op_id = rid; 496 qe->op_id = rid;
644 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, qe); 497 GNUNET_CONTAINER_DLL_insert_tail (h->op_head,
645 498 h->op_tail,
646 /* setup msg */ 499 qe);
647 msg_size = sizeof (struct BlockCacheMessage) + blen; 500 /* send msg */
648 pe = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size); 501 env = GNUNET_MQ_msg_extra (msg,
649 pe->size = msg_size; 502 blen,
650 msg = (struct BlockCacheMessage *) &pe[1]; 503 GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE);
651 msg->gns_header.header.type = htons (GNUNET_MESSAGE_TYPE_NAMECACHE_BLOCK_CACHE);
652 msg->gns_header.header.size = htons (msg_size);
653 msg->gns_header.r_id = htonl (rid); 504 msg->gns_header.r_id = htonl (rid);
654 msg->expire = block->expiration_time; 505 msg->expire = block->expiration_time;
655 msg->signature = block->signature; 506 msg->signature = block->signature;
656 msg->derived_key = block->derived_key; 507 msg->derived_key = block->derived_key;
657 memcpy (&msg[1], &block[1], blen); 508 memcpy (&msg[1],
658 LOG (GNUNET_ERROR_TYPE_DEBUG, 509 &block[1],
659 "Sending `%s' message with size %u and expiration %s\n", 510 blen);
660 "NAMECACHE_BLOCK_CACHE", 511 GNUNET_MQ_send (h->mq,
661 (unsigned int) msg_size, 512 env);
662 GNUNET_STRINGS_absolute_time_to_string (GNUNET_TIME_absolute_ntoh (msg->expire)));
663 GNUNET_CONTAINER_DLL_insert_tail (h->pending_head, h->pending_tail, pe);
664 do_transmit (h);
665 return qe; 513 return qe;
666} 514}
667 515
@@ -674,41 +522,40 @@ GNUNET_NAMECACHE_block_cache (struct GNUNET_NAMECACHE_Handle *h,
674 * @param derived_hash hash of zone key combined with name to lookup 522 * @param derived_hash hash of zone key combined with name to lookup
675 * @param proc function to call on the matching block, or with 523 * @param proc function to call on the matching block, or with
676 * NULL if there is no matching block 524 * NULL if there is no matching block
677 * @param proc_cls closure for proc 525 * @param proc_cls closure for @a proc
678 * @return a handle that can be used to cancel 526 * @return a handle that can be used to cancel, NULL on error
679 */ 527 */
680struct GNUNET_NAMECACHE_QueueEntry * 528struct GNUNET_NAMECACHE_QueueEntry *
681GNUNET_NAMECACHE_lookup_block (struct GNUNET_NAMECACHE_Handle *h, 529GNUNET_NAMECACHE_lookup_block (struct GNUNET_NAMECACHE_Handle *h,
682 const struct GNUNET_HashCode *derived_hash, 530 const struct GNUNET_HashCode *derived_hash,
683 GNUNET_NAMECACHE_BlockProcessor proc, void *proc_cls) 531 GNUNET_NAMECACHE_BlockProcessor proc,
532 void *proc_cls)
684{ 533{
685 struct GNUNET_NAMECACHE_QueueEntry *qe; 534 struct GNUNET_NAMECACHE_QueueEntry *qe;
686 struct PendingMessage *pe;
687 struct LookupBlockMessage *msg; 535 struct LookupBlockMessage *msg;
688 size_t msg_size; 536 struct GNUNET_MQ_Envelope *env;
689 uint32_t rid; 537 uint32_t rid;
690 538
539 if (NULL == h->mq)
540 return NULL;
691 LOG (GNUNET_ERROR_TYPE_DEBUG, 541 LOG (GNUNET_ERROR_TYPE_DEBUG,
692 "Looking for block under %s\n", 542 "Looking for block under %s\n",
693 GNUNET_h2s (derived_hash)); 543 GNUNET_h2s (derived_hash));
694 rid = get_op_id(h); 544 rid = get_op_id (h);
695 qe = GNUNET_new (struct GNUNET_NAMECACHE_QueueEntry); 545 qe = GNUNET_new (struct GNUNET_NAMECACHE_QueueEntry);
696 qe->nsh = h; 546 qe->nsh = h;
697 qe->block_proc = proc; 547 qe->block_proc = proc;
698 qe->block_proc_cls = proc_cls; 548 qe->block_proc_cls = proc_cls;
699 qe->op_id = rid; 549 qe->op_id = rid;
700 GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, qe); 550 GNUNET_CONTAINER_DLL_insert_tail (h->op_head,
701 551 h->op_tail,
702 msg_size = sizeof (struct LookupBlockMessage); 552 qe);
703 pe = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size); 553 env = GNUNET_MQ_msg (msg,
704 pe->size = msg_size; 554 GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK);
705 msg = (struct LookupBlockMessage *) &pe[1];
706 msg->gns_header.header.type = htons (GNUNET_MESSAGE_TYPE_NAMECACHE_LOOKUP_BLOCK);
707 msg->gns_header.header.size = htons (msg_size);
708 msg->gns_header.r_id = htonl (rid); 555 msg->gns_header.r_id = htonl (rid);
709 msg->query = *derived_hash; 556 msg->query = *derived_hash;
710 GNUNET_CONTAINER_DLL_insert_tail (h->pending_head, h->pending_tail, pe); 557 GNUNET_MQ_send (h->mq,
711 do_transmit (h); 558 env);
712 return qe; 559 return qe;
713} 560}
714 561
@@ -724,8 +571,9 @@ GNUNET_NAMECACHE_cancel (struct GNUNET_NAMECACHE_QueueEntry *qe)
724{ 571{
725 struct GNUNET_NAMECACHE_Handle *h = qe->nsh; 572 struct GNUNET_NAMECACHE_Handle *h = qe->nsh;
726 573
727 GNUNET_assert (NULL != qe); 574 GNUNET_CONTAINER_DLL_remove (h->op_head,
728 GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, qe); 575 h->op_tail,
576 qe);
729 GNUNET_free(qe); 577 GNUNET_free(qe);
730} 578}
731 579