aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-10-23 16:03:54 +0000
committerChristian Grothoff <christian@grothoff.org>2016-10-23 16:03:54 +0000
commit48f8bbc215fc84a295993fb5bc529a9fe9b11b7e (patch)
tree1bdaede9381fe465fec7e7149d35a562a0fa7dc8 /src/util
parent4eecf868f0ed39d472884cf6b415bb3b3460dee7 (diff)
downloadgnunet-48f8bbc215fc84a295993fb5bc529a9fe9b11b7e.tar.gz
gnunet-48f8bbc215fc84a295993fb5bc529a9fe9b11b7e.zip
move to new client API: remove old client API
Diffstat (limited to 'src/util')
-rw-r--r--src/util/Makefile.am1
-rw-r--r--src/util/client.c1299
-rw-r--r--src/util/mq.c197
3 files changed, 0 insertions, 1497 deletions
diff --git a/src/util/Makefile.am b/src/util/Makefile.am
index 776927219..f28861f46 100644
--- a/src/util/Makefile.am
+++ b/src/util/Makefile.am
@@ -59,7 +59,6 @@ test_common_logging_dummy_LDADD = \
59libgnunetutil_la_SOURCES = \ 59libgnunetutil_la_SOURCES = \
60 bandwidth.c \ 60 bandwidth.c \
61 bio.c \ 61 bio.c \
62 client.c \
63 client_new.c \ 62 client_new.c \
64 common_allocation.c \ 63 common_allocation.c \
65 common_endian.c \ 64 common_endian.c \
diff --git a/src/util/client.c b/src/util/client.c
deleted file mode 100644
index 47db91c8e..000000000
--- a/src/util/client.c
+++ /dev/null
@@ -1,1299 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2001-2013 GNUnet e.V.
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
19*/
20
21/**
22 * @file util/client.c
23 * @brief code for access to services
24 * @author Christian Grothoff
25 *
26 * Generic TCP code for reliable, record-oriented TCP
27 * connections between clients and service providers.
28 */
29#include "platform.h"
30#include "gnunet_protocols.h"
31#include "gnunet_util_lib.h"
32#include "gnunet_socks.h"
33
34
35/**
36 * How often do we re-try tranmsitting requests before giving up?
37 * Note that if we succeeded transmitting a request but failed to read
38 * a response, we do NOT re-try.
39 */
40#define MAX_ATTEMPTS 50
41
42#define LOG(kind,...) GNUNET_log_from (kind, "util",__VA_ARGS__)
43
44/**
45 * Handle for a transmission request.
46 */
47struct GNUNET_CLIENT_TransmitHandle
48{
49 /**
50 * Connection state.
51 */
52 struct GNUNET_CLIENT_Connection *client;
53
54 /**
55 * Function to call to get the data for transmission.
56 */
57 GNUNET_CONNECTION_TransmitReadyNotify notify;
58
59 /**
60 * Closure for @e notify.
61 */
62 void *notify_cls;
63
64 /**
65 * Handle to the transmission with the underlying
66 * connection.
67 */
68 struct GNUNET_CONNECTION_TransmitHandle *th;
69
70 /**
71 * If we are re-trying and are delaying to do so,
72 * handle to the scheduled task managing the delay.
73 */
74 struct GNUNET_SCHEDULER_Task *reconnect_task;
75
76 /**
77 * Timeout for the operation overall.
78 */
79 struct GNUNET_TIME_Absolute timeout;
80
81 /**
82 * Number of bytes requested.
83 */
84 size_t size;
85
86 /**
87 * Are we allowed to re-try to connect without telling
88 * the user (of this API) about the connection troubles?
89 */
90 int auto_retry;
91
92 /**
93 * Number of attempts left for transmitting the request. We may
94 * fail the first time (say because the service is not yet up), in
95 * which case (if auto_retry is set) we wait a bit and re-try
96 * (timeout permitting).
97 */
98 unsigned int attempts_left;
99
100};
101
102
103/**
104 * Struct to refer to a GNUnet TCP connection.
105 * This is more than just a socket because if the server
106 * drops the connection, the client automatically tries
107 * to reconnect (and for that needs connection information).
108 */
109struct GNUNET_CLIENT_Connection
110{
111
112 /**
113 * The connection handle, NULL if not live
114 */
115 struct GNUNET_CONNECTION_Handle *connection;
116
117 /**
118 * Our configuration.
119 */
120 const struct GNUNET_CONFIGURATION_Handle *cfg;
121
122 /**
123 * Name of the service we interact with.
124 */
125 char *service_name;
126
127 /**
128 * Handler for current receiver task.
129 */
130 GNUNET_CLIENT_MessageHandler receiver_handler;
131
132 /**
133 * Closure for @e receiver_handler.
134 */
135 void *receiver_handler_cls;
136
137 /**
138 * Handle for a pending transmission request, NULL if there is
139 * none pending.
140 */
141 struct GNUNET_CLIENT_TransmitHandle *th;
142
143 /**
144 * If we are re-trying and are delaying to do so,
145 * handle to the scheduled task managing the delay.
146 */
147 struct GNUNET_SCHEDULER_Task * receive_task;
148
149 /**
150 * Buffer for received message.
151 */
152 char *received_buf;
153
154 /**
155 * Timeout for receiving a response (absolute time).
156 */
157 struct GNUNET_TIME_Absolute receive_timeout;
158
159 /**
160 * Current value for our incremental back-off (for
161 * connect re-tries).
162 */
163 struct GNUNET_TIME_Relative back_off;
164
165 /**
166 * Number of bytes in received_buf that are valid.
167 */
168 size_t received_pos;
169
170 /**
171 * Size of received_buf.
172 */
173 unsigned int received_size;
174
175 /**
176 * Do we have a complete response in received_buf?
177 */
178 int msg_complete;
179
180 /**
181 * Are we currently busy doing receive-processing?
182 * #GNUNET_YES if so, #GNUNET_NO if not. #GNUNET_SYSERR
183 * if the connection has failed (but we may not have
184 * closed the handle itself yet).
185 */
186 int in_receive;
187
188 /**
189 * Is this the first message we are sending to the service?
190 */
191 int first_message;
192
193 /**
194 * How often have we tried to connect?
195 */
196 unsigned int attempts;
197
198};
199
200
201/**
202 * Try connecting to the server using UNIX domain sockets.
203 *
204 * @param service_name name of service to connect to
205 * @param cfg configuration to use
206 * @return NULL on error, connection to UNIX otherwise
207 */
208static struct GNUNET_CONNECTION_Handle *
209try_unixpath (const char *service_name,
210 const struct GNUNET_CONFIGURATION_Handle *cfg)
211{
212#if AF_UNIX
213 struct GNUNET_CONNECTION_Handle *connection;
214 char *unixpath;
215 struct sockaddr_un s_un;
216
217 unixpath = NULL;
218 if ((GNUNET_OK == GNUNET_CONFIGURATION_get_value_filename (cfg, service_name, "UNIXPATH", &unixpath)) &&
219 (0 < strlen (unixpath)))
220 {
221 /* We have a non-NULL unixpath, need to validate it */
222 if (strlen (unixpath) >= sizeof (s_un.sun_path))
223 {
224 LOG (GNUNET_ERROR_TYPE_WARNING,
225 _("UNIXPATH `%s' too long, maximum length is %llu\n"), unixpath,
226 (unsigned long long) sizeof (s_un.sun_path));
227 unixpath = GNUNET_NETWORK_shorten_unixpath (unixpath);
228 LOG (GNUNET_ERROR_TYPE_INFO,
229 _("Using `%s' instead\n"), unixpath);
230 if (NULL == unixpath)
231 return NULL;
232 }
233 connection = GNUNET_CONNECTION_create_from_connect_to_unixpath (cfg, unixpath);
234 if (NULL != connection)
235 {
236 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to unixpath `%s'!\n",
237 unixpath);
238 GNUNET_free (unixpath);
239 return connection;
240 }
241 }
242 GNUNET_free_non_null (unixpath);
243#endif
244 return NULL;
245}
246
247
248/**
249 * Test whether the configuration has proper values for connection
250 * (UNIXPATH || (PORT && HOSTNAME)).
251 *
252 * @param service_name name of service to connect to
253 * @param cfg configuration to use
254 * @return #GNUNET_OK if the configuration is valid, #GNUNET_SYSERR if not
255 */
256static int
257test_service_configuration (const char *service_name,
258 const struct GNUNET_CONFIGURATION_Handle *cfg)
259{
260 int ret = GNUNET_SYSERR;
261 char *hostname = NULL;
262 unsigned long long port;
263#if AF_UNIX
264 char *unixpath = NULL;
265
266 if ((GNUNET_OK == GNUNET_CONFIGURATION_get_value_filename (cfg, service_name, "UNIXPATH", &unixpath)) &&
267 (0 < strlen (unixpath)))
268 ret = GNUNET_OK;
269 GNUNET_free_non_null (unixpath);
270#endif
271
272 if ( (GNUNET_YES ==
273 GNUNET_CONFIGURATION_have_value (cfg, service_name, "PORT")) &&
274 (GNUNET_OK ==
275 GNUNET_CONFIGURATION_get_value_number (cfg, service_name, "PORT", &port)) &&
276 (port <= 65535) && (0 != port) &&
277 (GNUNET_OK ==
278 GNUNET_CONFIGURATION_get_value_string (cfg, service_name, "HOSTNAME",
279 &hostname)) &&
280 (0 != strlen (hostname)) )
281 ret = GNUNET_OK;
282 GNUNET_free_non_null (hostname);
283 return ret;
284}
285
286
287/**
288 * Try to connect to the service.
289 *
290 * @param service_name name of service to connect to
291 * @param cfg configuration to use
292 * @param attempt counter used to alternate between IP and UNIX domain sockets
293 * @return NULL on error
294 */
295static struct GNUNET_CONNECTION_Handle *
296do_connect (const char *service_name,
297 const struct GNUNET_CONFIGURATION_Handle *cfg,
298 unsigned int attempt)
299{
300 struct GNUNET_CONNECTION_Handle *connection;
301 char *hostname;
302 unsigned long long port;
303
304 /* Never use a local source if a proxy is configured */
305 if (GNUNET_YES == GNUNET_SOCKS_check_service (service_name,cfg))
306 return GNUNET_SOCKS_do_connect (service_name,cfg);
307
308 connection = NULL;
309 if (0 == (attempt % 2))
310 {
311 /* on even rounds, try UNIX first */
312 connection = try_unixpath (service_name, cfg);
313 if (NULL != connection)
314 return connection;
315 }
316 if (GNUNET_YES ==
317 GNUNET_CONFIGURATION_have_value (cfg, service_name, "PORT"))
318 {
319 if ((GNUNET_OK !=
320 GNUNET_CONFIGURATION_get_value_number (cfg, service_name, "PORT", &port))
321 || (port > 65535) ||
322 (GNUNET_OK !=
323 GNUNET_CONFIGURATION_get_value_string (cfg, service_name, "HOSTNAME",
324 &hostname)))
325 {
326 LOG (GNUNET_ERROR_TYPE_WARNING,
327 _
328 ("Could not determine valid hostname and port for service `%s' from configuration.\n"),
329 service_name);
330 return NULL;
331 }
332 if (0 == strlen (hostname))
333 {
334 GNUNET_free (hostname);
335 LOG (GNUNET_ERROR_TYPE_WARNING,
336 _("Need a non-empty hostname for service `%s'.\n"), service_name);
337 return NULL;
338 }
339 }
340 else
341 {
342 /* unspecified means 0 (disabled) */
343 port = 0;
344 hostname = NULL;
345 }
346 if (0 == port)
347 {
348 /* if port is 0, try UNIX */
349 connection = try_unixpath (service_name, cfg);
350 if (NULL != connection)
351 {
352 GNUNET_free_non_null (hostname);
353 return connection;
354 }
355 LOG (GNUNET_ERROR_TYPE_DEBUG,
356 "Port is 0 for service `%s', UNIXPATH did not work, returning NULL!\n",
357 service_name);
358 GNUNET_free_non_null (hostname);
359 return NULL;
360 }
361 connection = GNUNET_CONNECTION_create_from_connect (cfg, hostname, port);
362 GNUNET_free (hostname);
363 return connection;
364}
365
366
367/**
368 * Create a message queue to connect to a GNUnet service.
369 * If handlers are specfied, receive messages from the connection.
370 *
371 * @param connection the client connection
372 * @param handlers handlers for receiving messages, can be NULL
373 * @param error_handler error handler
374 * @param error_handler_cls closure for the @a error_handler
375 * @return the message queue, NULL on error
376 */
377struct GNUNET_MQ_Handle *
378GNUNET_CLIENT_connecTX (const struct GNUNET_CONFIGURATION_Handle *cfg,
379 const char *service_name,
380 const struct GNUNET_MQ_MessageHandler *handlers,
381 GNUNET_MQ_ErrorHandler error_handler,
382 void *error_handler_cls)
383{
384 struct GNUNET_CLIENT_Connection *c;
385
386 c = GNUNET_CLIENT_connect (service_name,
387 cfg);
388 if (NULL == c)
389 return NULL;
390 return GNUNET_MQ_queue_for_connection_client (c,
391 handlers,
392 error_handler,
393 error_handler_cls);
394}
395
396
397/**
398 * Get a connection with a service.
399 *
400 * @param service_name name of the service
401 * @param cfg configuration to use
402 * @return NULL on error (service unknown to configuration)
403 */
404struct GNUNET_CLIENT_Connection *
405GNUNET_CLIENT_connect (const char *service_name,
406 const struct GNUNET_CONFIGURATION_Handle *cfg)
407{
408 struct GNUNET_CLIENT_Connection *client;
409 struct GNUNET_CONNECTION_Handle *connection;
410
411 if (GNUNET_OK !=
412 test_service_configuration (service_name,
413 cfg))
414 return NULL;
415 connection = do_connect (service_name, cfg, 0);
416 client = GNUNET_new (struct GNUNET_CLIENT_Connection);
417 client->first_message = GNUNET_YES;
418 client->attempts = 1;
419 client->connection = connection;
420 client->service_name = GNUNET_strdup (service_name);
421 client->cfg = cfg;
422 client->back_off = GNUNET_TIME_UNIT_MILLISECONDS;
423 return client;
424}
425
426
427/**
428 * Destroy connection with the service. This will automatically
429 * cancel any pending "receive" request (however, the handler will
430 * *NOT* be called, not even with a NULL message). Any pending
431 * transmission request will also be cancelled UNLESS the callback for
432 * the transmission request has already been called, in which case the
433 * transmission 'finish_pending_write' argument determines whether or
434 * not the write is guaranteed to complete before the socket is fully
435 * destroyed (unless, of course, there is an error with the server in
436 * which case the message may still be lost).
437 *
438 * @param client handle to the service connection
439 */
440void
441GNUNET_CLIENT_disconnect (struct GNUNET_CLIENT_Connection *client)
442{
443 if (GNUNET_YES == client->in_receive)
444 {
445 GNUNET_CONNECTION_receive_cancel (client->connection);
446 client->in_receive = GNUNET_NO;
447 }
448 if (NULL != client->th)
449 {
450 GNUNET_CLIENT_notify_transmit_ready_cancel (client->th);
451 client->th = NULL;
452 }
453 if (NULL != client->connection)
454 {
455 GNUNET_CONNECTION_destroy (client->connection);
456 client->connection = NULL;
457 }
458 if (NULL != client->receive_task)
459 {
460 GNUNET_SCHEDULER_cancel (client->receive_task);
461 client->receive_task = NULL;
462 }
463 client->receiver_handler = NULL;
464 GNUNET_array_grow (client->received_buf,
465 client->received_size,
466 0);
467 GNUNET_free (client->service_name);
468 GNUNET_free (client);
469}
470
471
472/**
473 * Check if message is complete. Sets the "msg_complete" member
474 * in the client struct.
475 *
476 * @param client connection with the buffer to check
477 */
478static void
479check_complete (struct GNUNET_CLIENT_Connection *client)
480{
481 if ((client->received_pos >= sizeof (struct GNUNET_MessageHeader)) &&
482 (client->received_pos >=
483 ntohs (((const struct GNUNET_MessageHeader *) client->received_buf)->
484 size)))
485 client->msg_complete = GNUNET_YES;
486}
487
488
489/**
490 * Callback function for data received from the network. Note that
491 * both @a available and @a errCode would be 0 if the read simply timed out.
492 *
493 * @param cls closure
494 * @param buf pointer to received data
495 * @param available number of bytes availabe in @a buf,
496 * possibly 0 (on errors)
497 * @param addr address of the sender
498 * @param addrlen size of @a addr
499 * @param errCode value of errno (on errors receiving)
500 */
501static void
502receive_helper (void *cls,
503 const void *buf,
504 size_t available,
505 const struct sockaddr *addr,
506 socklen_t addrlen,
507 int errCode)
508{
509 struct GNUNET_CLIENT_Connection *client = cls;
510 struct GNUNET_TIME_Relative remaining;
511 GNUNET_CLIENT_MessageHandler receive_handler;
512 void *receive_handler_cls;
513
514 GNUNET_assert (GNUNET_NO == client->msg_complete);
515 GNUNET_assert (GNUNET_YES == client->in_receive);
516 client->in_receive = GNUNET_NO;
517 if ( (0 == available) ||
518 (NULL == client->connection) ||
519 (0 != errCode) )
520 {
521 /* signal timeout! */
522 LOG (GNUNET_ERROR_TYPE_DEBUG,
523 "Timeout in receive_helper, available %u, client->connection %s, errCode `%s'\n",
524 (unsigned int) available,
525 NULL == client->connection ? "NULL" : "non-NULL",
526 STRERROR (errCode));
527 /* remember failure */
528 client->in_receive = GNUNET_SYSERR;
529 if (NULL != (receive_handler = client->receiver_handler))
530 {
531 receive_handler_cls = client->receiver_handler_cls;
532 client->receiver_handler = NULL;
533 receive_handler (receive_handler_cls,
534 NULL);
535 }
536 return;
537 }
538 /* FIXME: optimize for common fast case where buf contains the
539 * entire message and we need no copying... */
540
541 /* slow path: append to array */
542 if (client->received_size < client->received_pos + available)
543 GNUNET_array_grow (client->received_buf, client->received_size,
544 client->received_pos + available);
545 GNUNET_memcpy (&client->received_buf[client->received_pos], buf, available);
546 client->received_pos += available;
547 check_complete (client);
548 /* check for timeout */
549 remaining = GNUNET_TIME_absolute_get_remaining (client->receive_timeout);
550 if (0 == remaining.rel_value_us)
551 {
552 /* signal timeout! */
553 if (NULL != (receive_handler = client->receiver_handler))
554 {
555 client->receiver_handler = NULL;
556 receive_handler (client->receiver_handler_cls, NULL);
557 }
558 return;
559 }
560 /* back to receive -- either for more data or to call callback! */
561 GNUNET_CLIENT_receive (client, client->receiver_handler,
562 client->receiver_handler_cls, remaining);
563}
564
565
566/**
567 * Continuation to call the receive callback.
568 *
569 * @param cls our handle to the client connection
570 */
571static void
572receive_task (void *cls)
573{
574 struct GNUNET_CLIENT_Connection *client = cls;
575 GNUNET_CLIENT_MessageHandler handler = client->receiver_handler;
576 const struct GNUNET_MessageHeader *cmsg =
577 (const struct GNUNET_MessageHeader *) client->received_buf;
578 void *handler_cls = client->receiver_handler_cls;
579 uint16_t msize = ntohs (cmsg->size);
580 char mbuf[msize] GNUNET_ALIGN;
581 struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) mbuf;
582
583 client->receive_task = NULL;
584 if ( (GNUNET_SYSERR == client->in_receive) &&
585 (GNUNET_YES != client->msg_complete) )
586 {
587 /* Connection failure, signal to caller! */
588 client->receiver_handler = NULL;
589 if (NULL != handler)
590 handler (handler_cls,
591 NULL);
592 return;
593 }
594 LOG (GNUNET_ERROR_TYPE_DEBUG,
595 "Received message of type %u and size %u from %s service.\n",
596 ntohs (cmsg->type),
597 msize,
598 client->service_name);
599 GNUNET_assert (GNUNET_YES == client->msg_complete);
600 GNUNET_assert (client->received_pos >= msize);
601 GNUNET_memcpy (msg, cmsg, msize);
602 memmove (client->received_buf,
603 &client->received_buf[msize],
604 client->received_pos - msize);
605 client->received_pos -= msize;
606 client->msg_complete = GNUNET_NO;
607 client->receiver_handler = NULL;
608 check_complete (client);
609 if (NULL != handler)
610 handler (handler_cls, msg);
611}
612
613
614/**
615 * Read from the service.
616 *
617 * @param client the service
618 * @param handler function to call with the message
619 * @param handler_cls closure for @a handler
620 * @param timeout how long to wait until timing out
621 */
622void
623GNUNET_CLIENT_receive (struct GNUNET_CLIENT_Connection *client,
624 GNUNET_CLIENT_MessageHandler handler,
625 void *handler_cls,
626 struct GNUNET_TIME_Relative timeout)
627{
628 if (NULL == client->connection)
629 {
630 /* already disconnected, fail instantly! */
631 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
632 "Client API violation for service `%s'\n",
633 client->service_name);
634 GNUNET_break (0); /* this should not happen in well-written code! */
635 if (NULL != handler)
636 handler (handler_cls,
637 NULL);
638 return;
639 }
640 client->receiver_handler = handler;
641 client->receiver_handler_cls = handler_cls;
642 client->receive_timeout = GNUNET_TIME_relative_to_absolute (timeout);
643 if ( (GNUNET_YES == client->msg_complete) ||
644 (GNUNET_SYSERR == client->in_receive) )
645 {
646 GNUNET_assert (NULL == client->receive_task);
647 client->receive_task = GNUNET_SCHEDULER_add_now (&receive_task,
648 client);
649 return;
650 }
651 LOG (GNUNET_ERROR_TYPE_DEBUG,
652 "calling GNUNET_CONNECTION_receive\n");
653 GNUNET_assert (GNUNET_NO == client->in_receive);
654 client->in_receive = GNUNET_YES;
655 GNUNET_CONNECTION_receive (client->connection,
656 GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
657 timeout,
658 &receive_helper,
659 client);
660}
661
662
663/**
664 * Handle for a test to check if a service is running.
665 */
666struct GNUNET_CLIENT_TestHandle
667{
668 /**
669 * Function to call with the result of the test.
670 */
671 GNUNET_CLIENT_TestResultCallback cb;
672
673 /**
674 * Closure for @e cb.
675 */
676 void *cb_cls;
677
678 /**
679 * Client connection we are using for the test, if any.
680 */
681 struct GNUNET_CLIENT_Connection *client;
682
683 /**
684 * Handle for the transmission request, if any.
685 */
686 struct GNUNET_CLIENT_TransmitHandle *th;
687
688 /**
689 * Deadline for calling @e cb.
690 */
691 struct GNUNET_TIME_Absolute test_deadline;
692
693 /**
694 * ID of task used for asynchronous operations.
695 */
696 struct GNUNET_SCHEDULER_Task *task;
697
698 /**
699 * Final result to report back (once known).
700 */
701 int result;
702};
703
704
705/**
706 * Abort testing for service.
707 *
708 * @param th test handle
709 */
710void
711GNUNET_CLIENT_service_test_cancel (struct GNUNET_CLIENT_TestHandle *th)
712{
713 if (NULL != th->th)
714 {
715 GNUNET_CLIENT_notify_transmit_ready_cancel (th->th);
716 th->th = NULL;
717 }
718 if (NULL != th->client)
719 {
720 GNUNET_CLIENT_disconnect (th->client);
721 th->client = NULL;
722 }
723 if (NULL != th->task)
724 {
725 GNUNET_SCHEDULER_cancel (th->task);
726 th->task = NULL;
727 }
728 GNUNET_free (th);
729}
730
731
732/**
733 * Task that reports back the result by calling the callback
734 * and then cleans up.
735 *
736 * @param cls the `struct GNUNET_CLIENT_TestHandle`
737 */
738static void
739report_result (void *cls)
740{
741 struct GNUNET_CLIENT_TestHandle *th = cls;
742
743 th->task = NULL;
744 th->cb (th->cb_cls, th->result);
745 GNUNET_CLIENT_service_test_cancel (th);
746}
747
748
749/**
750 * Report service test result asynchronously back to callback.
751 *
752 * @param th test handle with the result and the callback
753 * @param result result to report
754 */
755static void
756service_test_report (struct GNUNET_CLIENT_TestHandle *th,
757 int result)
758{
759 th->result = result;
760 th->task = GNUNET_SCHEDULER_add_now (&report_result,
761 th);
762}
763
764
765/**
766 * Receive confirmation from test, service is up.
767 *
768 * @param cls closure with the `struct GNUNET_CLIENT_TestHandle`
769 * @param msg message received, NULL on timeout or fatal error
770 */
771static void
772confirm_handler (void *cls,
773 const struct GNUNET_MessageHeader *msg)
774{
775 struct GNUNET_CLIENT_TestHandle *th = cls;
776
777 /* We may want to consider looking at the reply in more
778 * detail in the future, for example, is this the
779 * correct service? FIXME! */
780 if (NULL != msg)
781 {
782 LOG (GNUNET_ERROR_TYPE_DEBUG,
783 "Received confirmation that service is running.\n");
784 service_test_report (th, GNUNET_YES);
785 }
786 else
787 {
788 service_test_report (th, GNUNET_NO);
789 }
790}
791
792
793/**
794 * Send the 'TEST' message to the service. If successful, prepare to
795 * receive the reply.
796 *
797 * @param cls the `struct GNUNET_CLIENT_TestHandle` of the test
798 * @param size number of bytes available in @a buf
799 * @param buf where to write the message
800 * @return number of bytes written to @a buf
801 */
802static size_t
803write_test (void *cls, size_t size, void *buf)
804{
805 struct GNUNET_CLIENT_TestHandle *th = cls;
806 struct GNUNET_MessageHeader *msg;
807
808 th->th = NULL;
809 if (size < sizeof (struct GNUNET_MessageHeader))
810 {
811 LOG (GNUNET_ERROR_TYPE_DEBUG,
812 "Failed to transmit TEST request.\n");
813 service_test_report (th, GNUNET_NO);
814 return 0; /* client disconnected */
815 }
816 LOG (GNUNET_ERROR_TYPE_DEBUG,
817 "Transmitting `%s' request.\n",
818 "TEST");
819 msg = (struct GNUNET_MessageHeader *) buf;
820 msg->type = htons (GNUNET_MESSAGE_TYPE_TEST);
821 msg->size = htons (sizeof (struct GNUNET_MessageHeader));
822 GNUNET_CLIENT_receive (th->client,
823 &confirm_handler, th,
824 GNUNET_TIME_absolute_get_remaining
825 (th->test_deadline));
826 return sizeof (struct GNUNET_MessageHeader);
827}
828
829
830/**
831 * Test if the service is running. If we are given a UNIXPATH or a
832 * local address, we do this NOT by trying to connect to the service,
833 * but by trying to BIND to the same port. If the BIND fails, we know
834 * the service is running.
835 *
836 * @param service name of the service to wait for
837 * @param cfg configuration to use
838 * @param timeout how long to wait at most
839 * @param cb function to call with the result
840 * @param cb_cls closure for @a cb
841 * @return handle to cancel the test
842 */
843struct GNUNET_CLIENT_TestHandle *
844GNUNET_CLIENT_service_test (const char *service,
845 const struct GNUNET_CONFIGURATION_Handle *cfg,
846 struct GNUNET_TIME_Relative timeout,
847 GNUNET_CLIENT_TestResultCallback cb,
848 void *cb_cls)
849{
850 struct GNUNET_CLIENT_TestHandle *th;
851 char *hostname;
852 unsigned long long port;
853 struct GNUNET_NETWORK_Handle *sock;
854
855 th = GNUNET_new (struct GNUNET_CLIENT_TestHandle);
856 th->cb = cb;
857 th->cb_cls = cb_cls;
858 th->test_deadline = GNUNET_TIME_relative_to_absolute (timeout);
859 LOG (GNUNET_ERROR_TYPE_DEBUG,
860 "Testing if service `%s' is running.\n",
861 service);
862#ifdef AF_UNIX
863 {
864 /* probe UNIX support */
865 struct sockaddr_un s_un;
866 char *unixpath;
867 int abstract;
868
869 unixpath = NULL;
870 if ((GNUNET_OK ==
871 GNUNET_CONFIGURATION_get_value_filename (cfg,
872 service,
873 "UNIXPATH",
874 &unixpath)) &&
875 (0 < strlen (unixpath))) /* We have a non-NULL unixpath, does that mean it's valid? */
876 {
877 if (strlen (unixpath) >= sizeof (s_un.sun_path))
878 {
879 LOG (GNUNET_ERROR_TYPE_WARNING,
880 _("UNIXPATH `%s' too long, maximum length is %llu\n"),
881 unixpath,
882 (unsigned long long) sizeof (s_un.sun_path));
883 unixpath = GNUNET_NETWORK_shorten_unixpath (unixpath);
884 LOG (GNUNET_ERROR_TYPE_INFO,
885 _("Using `%s' instead\n"), unixpath);
886 }
887 }
888#ifdef LINUX
889 abstract = GNUNET_CONFIGURATION_get_value_yesno (cfg,
890 "TESTING",
891 "USE_ABSTRACT_SOCKETS");
892#else
893 abstract = GNUNET_NO;
894#endif
895 if ((NULL != unixpath) && (GNUNET_YES != abstract))
896 {
897 if (GNUNET_SYSERR == GNUNET_DISK_directory_create_for_file (unixpath))
898 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
899 "mkdir", unixpath);
900 }
901 if (NULL != unixpath)
902 {
903 sock = GNUNET_NETWORK_socket_create (PF_UNIX, SOCK_STREAM, 0);
904 if (NULL != sock)
905 {
906 memset (&s_un, 0, sizeof (s_un));
907 s_un.sun_family = AF_UNIX;
908 strncpy (s_un.sun_path, unixpath, sizeof (s_un.sun_path) - 1);
909 if (GNUNET_YES == abstract)
910 s_un.sun_path[0] = '\0';
911#if HAVE_SOCKADDR_IN_SIN_LEN
912 s_un.sun_len = (u_char) sizeof (struct sockaddr_un);
913#endif
914 if (GNUNET_OK !=
915 GNUNET_NETWORK_socket_bind (sock, (const struct sockaddr *) &s_un,
916 sizeof (struct sockaddr_un)))
917 {
918 /* failed to bind => service must be running */
919 GNUNET_free (unixpath);
920 (void) GNUNET_NETWORK_socket_close (sock);
921 service_test_report (th, GNUNET_YES);
922 return th;
923 }
924 (void) GNUNET_NETWORK_socket_close (sock);
925 /* let's try IP */
926 }
927 }
928 GNUNET_free_non_null (unixpath);
929 }
930#endif
931
932 hostname = NULL;
933 if ((GNUNET_OK !=
934 GNUNET_CONFIGURATION_get_value_number (cfg, service, "PORT", &port)) ||
935 (port > 65535) ||
936 (GNUNET_OK !=
937 GNUNET_CONFIGURATION_get_value_string (cfg, service, "HOSTNAME",
938 &hostname)))
939 {
940 /* UNIXPATH failed (if possible) AND IP failed => error */
941 service_test_report (th, GNUNET_SYSERR);
942 return th;
943 }
944
945 if (0 == strcmp ("localhost", hostname)
946#if !LINUX
947 && 0
948#endif
949 )
950 {
951 /* can test using 'bind' */
952 struct sockaddr_in s_in;
953
954 memset (&s_in, 0, sizeof (s_in));
955#if HAVE_SOCKADDR_IN_SIN_LEN
956 s_in.sin_len = sizeof (struct sockaddr_in);
957#endif
958 s_in.sin_family = AF_INET;
959 s_in.sin_port = htons (port);
960
961 sock = GNUNET_NETWORK_socket_create (AF_INET, SOCK_STREAM, 0);
962 if (NULL != sock)
963 {
964 if (GNUNET_OK !=
965 GNUNET_NETWORK_socket_bind (sock, (const struct sockaddr *) &s_in,
966 sizeof (s_in)))
967 {
968 /* failed to bind => service must be running */
969 GNUNET_free (hostname);
970 (void) GNUNET_NETWORK_socket_close (sock);
971 service_test_report (th, GNUNET_YES);
972 return th;
973 }
974 (void) GNUNET_NETWORK_socket_close (sock);
975 }
976 }
977
978 if (0 == strcmp ("ip6-localhost", hostname)
979#if !LINUX
980 && 0
981#endif
982 )
983 {
984 /* can test using 'bind' */
985 struct sockaddr_in6 s_in6;
986
987 memset (&s_in6, 0, sizeof (s_in6));
988#if HAVE_SOCKADDR_IN_SIN_LEN
989 s_in6.sin6_len = sizeof (struct sockaddr_in6);
990#endif
991 s_in6.sin6_family = AF_INET6;
992 s_in6.sin6_port = htons (port);
993
994 sock = GNUNET_NETWORK_socket_create (AF_INET6, SOCK_STREAM, 0);
995 if (NULL != sock)
996 {
997 if (GNUNET_OK !=
998 GNUNET_NETWORK_socket_bind (sock, (const struct sockaddr *) &s_in6,
999 sizeof (s_in6)))
1000 {
1001 /* failed to bind => service must be running */
1002 GNUNET_free (hostname);
1003 (void) GNUNET_NETWORK_socket_close (sock);
1004 service_test_report (th, GNUNET_YES);
1005 return th;
1006 }
1007 (void) GNUNET_NETWORK_socket_close (sock);
1008 }
1009 }
1010
1011 if (((0 == strcmp ("localhost", hostname)) ||
1012 (0 == strcmp ("ip6-localhost", hostname)))
1013#if !LINUX
1014 && 0
1015#endif
1016 )
1017 {
1018 /* all binds succeeded => claim service not running right now */
1019 GNUNET_free_non_null (hostname);
1020 service_test_report (th, GNUNET_NO);
1021 return th;
1022 }
1023 GNUNET_free_non_null (hostname);
1024
1025 /* non-localhost, try 'connect' method */
1026 th->client = GNUNET_CLIENT_connect (service, cfg);
1027 if (NULL == th->client)
1028 {
1029 LOG (GNUNET_ERROR_TYPE_INFO,
1030 _("Could not connect to service `%s', configuration broken.\n"),
1031 service);
1032 service_test_report (th, GNUNET_SYSERR);
1033 return th;
1034 }
1035 th->th = GNUNET_CLIENT_notify_transmit_ready (th->client,
1036 sizeof (struct GNUNET_MessageHeader),
1037 timeout, GNUNET_YES,
1038 &write_test, th);
1039 if (NULL == th->th)
1040 {
1041 LOG (GNUNET_ERROR_TYPE_WARNING,
1042 _("Failure to transmit request to service `%s'\n"), service);
1043 service_test_report (th, GNUNET_SYSERR);
1044 return th;
1045 }
1046 return th;
1047}
1048
1049
1050/**
1051 * Connection notifies us about failure or success of
1052 * a transmission request. Either pass it on to our
1053 * user or, if possible, retry.
1054 *
1055 * @param cls our `struct GNUNET_CLIENT_TransmissionHandle`
1056 * @param size number of bytes available for transmission
1057 * @param buf where to write them
1058 * @return number of bytes written to @a buf
1059 */
1060static size_t
1061client_notify (void *cls, size_t size, void *buf);
1062
1063
1064/**
1065 * This task is run if we should re-try connection to the
1066 * service after a while.
1067 *
1068 * @param cls our `struct GNUNET_CLIENT_TransmitHandle` of the request
1069 */
1070static void
1071client_delayed_retry (void *cls)
1072{
1073 struct GNUNET_CLIENT_TransmitHandle *th = cls;
1074 struct GNUNET_TIME_Relative delay;
1075
1076 th->reconnect_task = NULL;
1077 th->client->connection =
1078 do_connect (th->client->service_name,
1079 th->client->cfg,
1080 th->client->attempts++);
1081 th->client->first_message = GNUNET_YES;
1082 if (NULL == th->client->connection)
1083 {
1084 /* could happen if we're out of sockets */
1085 delay = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_remaining (th->timeout),
1086 th->client->back_off);
1087 th->client->back_off = GNUNET_TIME_STD_BACKOFF (th->client->back_off);
1088 LOG (GNUNET_ERROR_TYPE_DEBUG,
1089 "Transmission failed %u times, trying again in %s.\n",
1090 MAX_ATTEMPTS - th->attempts_left,
1091 GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1092 GNUNET_assert (NULL == th->th);
1093 GNUNET_assert (NULL == th->reconnect_task);
1094 th->reconnect_task =
1095 GNUNET_SCHEDULER_add_delayed (delay,
1096 &client_delayed_retry,
1097 th);
1098 return;
1099 }
1100 th->th =
1101 GNUNET_CONNECTION_notify_transmit_ready (th->client->connection, th->size,
1102 GNUNET_TIME_absolute_get_remaining
1103 (th->timeout),
1104 &client_notify,
1105 th);
1106 if (NULL == th->th)
1107 {
1108 GNUNET_break (0);
1109 th->client->th = NULL;
1110 th->notify (th->notify_cls, 0, NULL);
1111 GNUNET_free (th);
1112 return;
1113 }
1114}
1115
1116
1117/**
1118 * Connection notifies us about failure or success of a transmission
1119 * request. Either pass it on to our user or, if possible, retry.
1120 *
1121 * @param cls our `struct GNUNET_CLIENT_TransmissionHandle`
1122 * @param size number of bytes available for transmission
1123 * @param buf where to write them
1124 * @return number of bytes written to @a buf
1125 */
1126static size_t
1127client_notify (void *cls,
1128 size_t size,
1129 void *buf)
1130{
1131 struct GNUNET_CLIENT_TransmitHandle *th = cls;
1132 struct GNUNET_CLIENT_Connection *client = th->client;
1133 size_t ret;
1134 struct GNUNET_TIME_Relative delay;
1135
1136 LOG (GNUNET_ERROR_TYPE_DEBUG,
1137 "client_notify is running\n");
1138 th->th = NULL;
1139 client->th = NULL;
1140 if (NULL == buf)
1141 {
1142 delay = GNUNET_TIME_absolute_get_remaining (th->timeout);
1143 delay.rel_value_us /= 2;
1144 if ( (GNUNET_YES != th->auto_retry) ||
1145 (0 == --th->attempts_left) ||
1146 (delay.rel_value_us < 1) )
1147 {
1148 LOG (GNUNET_ERROR_TYPE_DEBUG,
1149 "Transmission failed %u times, giving up.\n",
1150 MAX_ATTEMPTS - th->attempts_left);
1151 GNUNET_break (0 ==
1152 th->notify (th->notify_cls, 0, NULL));
1153 GNUNET_free (th);
1154 return 0;
1155 }
1156 /* auto-retry */
1157 LOG (GNUNET_ERROR_TYPE_DEBUG,
1158 "Failed to connect to `%s', automatically trying again.\n",
1159 client->service_name);
1160 if (GNUNET_YES == client->in_receive)
1161 {
1162 GNUNET_CONNECTION_receive_cancel (client->connection);
1163 client->in_receive = GNUNET_NO;
1164 }
1165 GNUNET_CONNECTION_destroy (client->connection);
1166 client->connection = NULL;
1167 delay = GNUNET_TIME_relative_min (delay, client->back_off);
1168 client->back_off =
1169 GNUNET_TIME_relative_min (GNUNET_TIME_relative_multiply
1170 (client->back_off, 2),
1171 GNUNET_TIME_UNIT_SECONDS);
1172 LOG (GNUNET_ERROR_TYPE_DEBUG,
1173 "Transmission failed %u times, trying again in %s.\n",
1174 MAX_ATTEMPTS - th->attempts_left,
1175 GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1176 client->th = th;
1177 GNUNET_assert (NULL == th->reconnect_task);
1178 GNUNET_assert (NULL == th->th);
1179 th->reconnect_task =
1180 GNUNET_SCHEDULER_add_delayed (delay,
1181 &client_delayed_retry,
1182 th);
1183 return 0;
1184 }
1185 GNUNET_assert (size >= th->size);
1186 ret = th->notify (th->notify_cls, size, buf);
1187 GNUNET_free (th);
1188 if (sizeof (struct GNUNET_MessageHeader) <= ret)
1189 {
1190 LOG (GNUNET_ERROR_TYPE_DEBUG,
1191 "Transmitting message of type %u and size %u to %s service.\n",
1192 ntohs (((struct GNUNET_MessageHeader *) buf)->type),
1193 ntohs (((struct GNUNET_MessageHeader *) buf)->size),
1194 client->service_name);
1195 }
1196 return ret;
1197}
1198
1199
1200/**
1201 * Ask the client to call us once the specified number of bytes
1202 * are free in the transmission buffer. Will never call the @a notify
1203 * callback in this task, but always first go into the scheduler.
1204 *
1205 * @param client connection to the service
1206 * @param size number of bytes to send
1207 * @param timeout after how long should we give up (and call
1208 * notify with buf NULL and size 0)?
1209 * @param auto_retry if the connection to the service dies, should we
1210 * automatically re-connect and retry (within the timeout period)
1211 * or should we immediately fail in this case? Pass GNUNET_YES
1212 * if the caller does not care about temporary connection errors,
1213 * for example because the protocol is stateless
1214 * @param notify function to call
1215 * @param notify_cls closure for @a notify
1216 * @return NULL if our buffer will never hold size bytes,
1217 * a handle if the notify callback was queued (can be used to cancel)
1218 */
1219struct GNUNET_CLIENT_TransmitHandle *
1220GNUNET_CLIENT_notify_transmit_ready (struct GNUNET_CLIENT_Connection *client,
1221 size_t size,
1222 struct GNUNET_TIME_Relative timeout,
1223 int auto_retry,
1224 GNUNET_CONNECTION_TransmitReadyNotify notify,
1225 void *notify_cls)
1226{
1227 struct GNUNET_CLIENT_TransmitHandle *th;
1228
1229 if (NULL != client->th)
1230 {
1231 /* If this breaks, you most likley called this function twice without waiting
1232 * for completion or canceling the request */
1233 GNUNET_assert (0);
1234 return NULL;
1235 }
1236 th = GNUNET_new (struct GNUNET_CLIENT_TransmitHandle);
1237 th->client = client;
1238 th->size = size;
1239 th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1240 /* always auto-retry on first message to service */
1241 th->auto_retry = (GNUNET_YES == client->first_message) ? GNUNET_YES : auto_retry;
1242 client->first_message = GNUNET_NO;
1243 th->notify = notify;
1244 th->notify_cls = notify_cls;
1245 th->attempts_left = MAX_ATTEMPTS;
1246 client->th = th;
1247 if (NULL == client->connection)
1248 {
1249 GNUNET_assert (NULL == th->th);
1250 GNUNET_assert (NULL == th->reconnect_task);
1251 th->reconnect_task =
1252 GNUNET_SCHEDULER_add_delayed (client->back_off,
1253 &client_delayed_retry,
1254 th);
1255 }
1256 else
1257 {
1258 th->th = GNUNET_CONNECTION_notify_transmit_ready (client->connection,
1259 size,
1260 timeout,
1261 &client_notify,
1262 th);
1263 if (NULL == th->th)
1264 {
1265 GNUNET_break (0);
1266 GNUNET_free (th);
1267 client->th = NULL;
1268 return NULL;
1269 }
1270 }
1271 return th;
1272}
1273
1274
1275/**
1276 * Cancel a request for notification.
1277 *
1278 * @param th handle from the original request.
1279 */
1280void
1281GNUNET_CLIENT_notify_transmit_ready_cancel (struct GNUNET_CLIENT_TransmitHandle *th)
1282{
1283 if (NULL != th->reconnect_task)
1284 {
1285 GNUNET_assert (NULL == th->th);
1286 GNUNET_SCHEDULER_cancel (th->reconnect_task);
1287 th->reconnect_task = NULL;
1288 }
1289 else
1290 {
1291 GNUNET_assert (NULL != th->th);
1292 GNUNET_CONNECTION_notify_transmit_ready_cancel (th->th);
1293 }
1294 th->client->th = NULL;
1295 GNUNET_free (th);
1296}
1297
1298
1299/* end of client.c */
diff --git a/src/util/mq.c b/src/util/mq.c
index ba947d5b8..193823c93 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -220,34 +220,6 @@ struct ServerClientSocketState
220 220
221 221
222/** 222/**
223 * Implementation-specific state for connection to
224 * service (MQ for clients).
225 */
226struct ClientConnectionState
227{
228 /**
229 * Did we call receive alread alreadyy?
230 */
231 int receive_active;
232
233 /**
234 * Do we also want to receive?
235 */
236 int receive_requested;
237
238 /**
239 * Connection to the service.
240 */
241 struct GNUNET_CLIENT_Connection *connection;
242
243 /**
244 * Active transmission request (or NULL).
245 */
246 struct GNUNET_CLIENT_TransmitHandle *th;
247};
248
249
250/**
251 * Call the message message handler that was registered 223 * Call the message message handler that was registered
252 * for the type of the given message in the given message queue. 224 * for the type of the given message in the given message queue.
253 * 225 *
@@ -775,175 +747,6 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client)
775 747
776 748
777/** 749/**
778 * Type of a function to call when we receive a message
779 * from the service.
780 *
781 * @param cls closure
782 * @param msg message received, NULL on timeout or fatal error
783 */
784static void
785handle_client_message (void *cls,
786 const struct GNUNET_MessageHeader *msg)
787{
788 struct GNUNET_MQ_Handle *mq = cls;
789 struct ClientConnectionState *state;
790
791 state = mq->impl_state;
792 if (NULL == msg)
793 {
794 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
795 return;
796 }
797 GNUNET_CLIENT_receive (state->connection,
798 &handle_client_message,
799 mq,
800 GNUNET_TIME_UNIT_FOREVER_REL);
801 GNUNET_MQ_inject_message (mq, msg);
802}
803
804
805/**
806 * Transmit a queued message to the session's client.
807 *
808 * @param cls consensus session
809 * @param size number of bytes available in @a buf
810 * @param buf where the callee should write the message
811 * @return number of bytes written to buf
812 */
813static size_t
814connection_client_transmit_queued (void *cls,
815 size_t size,
816 void *buf)
817{
818 struct GNUNET_MQ_Handle *mq = cls;
819 const struct GNUNET_MessageHeader *msg;
820 struct ClientConnectionState *state = mq->impl_state;
821 size_t msg_size;
822
823 GNUNET_assert (NULL != mq);
824 state->th = NULL;
825 msg = GNUNET_MQ_impl_current (mq);
826
827 if (NULL == buf)
828 {
829 GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
830 return 0;
831 }
832
833 if ( (GNUNET_YES == state->receive_requested) &&
834 (GNUNET_NO == state->receive_active) )
835 {
836 state->receive_active = GNUNET_YES;
837 GNUNET_CLIENT_receive (state->connection,
838 &handle_client_message,
839 mq,
840 GNUNET_TIME_UNIT_FOREVER_REL);
841 }
842
843 msg_size = ntohs (msg->size);
844 GNUNET_assert (size >= msg_size);
845 GNUNET_memcpy (buf, msg, msg_size);
846 state->th = NULL;
847
848 GNUNET_MQ_impl_send_continue (mq);
849
850 return msg_size;
851}
852
853
854static void
855connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq,
856 void *impl_state)
857{
858 struct ClientConnectionState *state = impl_state;
859
860 if (NULL != state->th)
861 {
862 GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
863 state->th = NULL;
864 }
865 GNUNET_CLIENT_disconnect (state->connection);
866 GNUNET_free (impl_state);
867}
868
869
870static void
871connection_client_send_impl (struct GNUNET_MQ_Handle *mq,
872 const struct GNUNET_MessageHeader *msg,
873 void *impl_state)
874{
875 struct ClientConnectionState *state = impl_state;
876
877 GNUNET_assert (NULL != state);
878 GNUNET_assert (NULL == state->th);
879 state->th =
880 GNUNET_CLIENT_notify_transmit_ready (state->connection,
881 ntohs (msg->size),
882 GNUNET_TIME_UNIT_FOREVER_REL,
883 GNUNET_NO,
884 &connection_client_transmit_queued,
885 mq);
886 GNUNET_assert (NULL != state->th);
887}
888
889
890static void
891connection_client_cancel_impl (struct GNUNET_MQ_Handle *mq,
892 void *impl_state)
893{
894 struct ClientConnectionState *state = impl_state;
895
896 if (NULL != state->th)
897 {
898 GNUNET_CLIENT_notify_transmit_ready_cancel (state->th);
899 state->th = NULL;
900 }
901 else if (NULL != mq->send_task)
902 {
903 GNUNET_SCHEDULER_cancel (mq->send_task);
904 mq->send_task = NULL;
905 }
906 else
907 GNUNET_assert (0);
908}
909
910
911struct GNUNET_MQ_Handle *
912GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
913 const struct GNUNET_MQ_MessageHandler *handlers,
914 GNUNET_MQ_ErrorHandler error_handler,
915 void *error_handler_cls)
916{
917 struct GNUNET_MQ_Handle *mq;
918 struct ClientConnectionState *state;
919 unsigned int i;
920
921 mq = GNUNET_new (struct GNUNET_MQ_Handle);
922 if (NULL != handlers)
923 {
924 for (i=0;NULL != handlers[i].cb; i++) ;
925 mq->handlers = GNUNET_new_array (i + 1,
926 struct GNUNET_MQ_MessageHandler);
927 GNUNET_memcpy (mq->handlers,
928 handlers,
929 i * sizeof (struct GNUNET_MQ_MessageHandler));
930 }
931 mq->error_handler = error_handler;
932 mq->error_handler_cls = error_handler_cls;
933 state = GNUNET_new (struct ClientConnectionState);
934 state->connection = connection;
935 mq->impl_state = state;
936 mq->send_impl = &connection_client_send_impl;
937 mq->destroy_impl = &connection_client_destroy_impl;
938 mq->cancel_impl = &connection_client_cancel_impl;
939 if (NULL != handlers)
940 state->receive_requested = GNUNET_YES;
941
942 return mq;
943}
944
945
946/**
947 * Associate the assoc_data in mq with a unique request id. 750 * Associate the assoc_data in mq with a unique request id.
948 * 751 *
949 * @param mq message queue, id will be unique for the queue 752 * @param mq message queue, id will be unique for the queue