aboutsummaryrefslogtreecommitdiff
path: root/src/dht
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-09-26 18:53:28 +0000
committerChristian Grothoff <christian@grothoff.org>2016-09-26 18:53:28 +0000
commit60ff113fe4e7bb71d5696063b9a9b81eba60a108 (patch)
tree8f3ed8cc47be49299a01d8ce1385f012bf19043b /src/dht
parent808e4a55f8ac3516a766873502b9bb020ef3d066 (diff)
downloadgnunet-60ff113fe4e7bb71d5696063b9a9b81eba60a108.tar.gz
gnunet-60ff113fe4e7bb71d5696063b9a9b81eba60a108.zip
refactor DHT for new service API
Diffstat (limited to 'src/dht')
-rw-r--r--src/dht/Makefile.am1
-rw-r--r--src/dht/gnunet-service-dht.c61
-rw-r--r--src/dht/gnunet-service-dht.h113
-rw-r--r--src/dht/gnunet-service-dht_clients.c1099
-rw-r--r--src/dht/gnunet-service-dht_clients.h153
-rw-r--r--src/dht/gnunet-service-dht_datacache.c1
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c3
7 files changed, 642 insertions, 789 deletions
diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am
index f44721094..f7dc5df6d 100644
--- a/src/dht/Makefile.am
+++ b/src/dht/Makefile.am
@@ -63,7 +63,6 @@ noinst_PROGRAMS = \
63 63
64gnunet_service_dht_SOURCES = \ 64gnunet_service_dht_SOURCES = \
65 gnunet-service-dht.c gnunet-service-dht.h \ 65 gnunet-service-dht.c gnunet-service-dht.h \
66 gnunet-service-dht_clients.c gnunet-service-dht_clients.h \
67 gnunet-service-dht_datacache.c gnunet-service-dht_datacache.h \ 66 gnunet-service-dht_datacache.c gnunet-service-dht_datacache.h \
68 gnunet-service-dht_hello.c gnunet-service-dht_hello.h \ 67 gnunet-service-dht_hello.c gnunet-service-dht_hello.h \
69 gnunet-service-dht_nse.c gnunet-service-dht_nse.h \ 68 gnunet-service-dht_nse.c gnunet-service-dht_nse.h \
diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c
index 1e3dd339d..a2ba2e8b0 100644
--- a/src/dht/gnunet-service-dht.c
+++ b/src/dht/gnunet-service-dht.c
@@ -33,7 +33,6 @@
33#include "gnunet_dht_service.h" 33#include "gnunet_dht_service.h"
34#include "gnunet_statistics_service.h" 34#include "gnunet_statistics_service.h"
35#include "gnunet-service-dht.h" 35#include "gnunet-service-dht.h"
36#include "gnunet-service-dht_clients.h"
37#include "gnunet-service-dht_datacache.h" 36#include "gnunet-service-dht_datacache.h"
38#include "gnunet-service-dht_hello.h" 37#include "gnunet-service-dht_hello.h"
39#include "gnunet-service-dht_neighbours.h" 38#include "gnunet-service-dht_neighbours.h"
@@ -47,6 +46,11 @@
47struct GNUNET_STATISTICS_Handle *GDS_stats; 46struct GNUNET_STATISTICS_Handle *GDS_stats;
48 47
49/** 48/**
49 * Handle for the service.
50 */
51struct GNUNET_SERVICE_Handle *GDS_service;
52
53/**
50 * Our handle to the BLOCK library. 54 * Our handle to the BLOCK library.
51 */ 55 */
52struct GNUNET_BLOCK_Context *GDS_block_context; 56struct GNUNET_BLOCK_Context *GDS_block_context;
@@ -57,11 +61,6 @@ struct GNUNET_BLOCK_Context *GDS_block_context;
57const struct GNUNET_CONFIGURATION_Handle *GDS_cfg; 61const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
58 62
59/** 63/**
60 * Handle to our server.
61 */
62struct GNUNET_SERVER_Handle *GDS_server;
63
64/**
65 * Our HELLO 64 * Our HELLO
66 */ 65 */
67struct GNUNET_MessageHeader *GDS_my_hello; 66struct GNUNET_MessageHeader *GDS_my_hello;
@@ -77,6 +76,10 @@ static struct GNUNET_TRANSPORT_HelloGetHandle *ghh;
77struct GNUNET_TIME_Relative hello_expiration; 76struct GNUNET_TIME_Relative hello_expiration;
78 77
79 78
79/* Code shared between different DHT implementations */
80#include "gnunet-service-dht_clients.c"
81
82
80/** 83/**
81 * Receive the HELLO from transport service, free current and replace 84 * Receive the HELLO from transport service, free current and replace
82 * if necessary. 85 * if necessary.
@@ -90,7 +93,9 @@ process_hello (void *cls,
90{ 93{
91 GNUNET_free_non_null (GDS_my_hello); 94 GNUNET_free_non_null (GDS_my_hello);
92 GDS_my_hello = GNUNET_malloc (ntohs (message->size)); 95 GDS_my_hello = GNUNET_malloc (ntohs (message->size));
93 GNUNET_memcpy (GDS_my_hello, message, ntohs (message->size)); 96 GNUNET_memcpy (GDS_my_hello,
97 message,
98 ntohs (message->size));
94} 99}
95 100
96 101
@@ -133,17 +138,16 @@ shutdown_task (void *cls)
133 * Process dht requests. 138 * Process dht requests.
134 * 139 *
135 * @param cls closure 140 * @param cls closure
136 * @param server the initialized server
137 * @param c configuration to use 141 * @param c configuration to use
142 * @param service the initialized service
138 */ 143 */
139static void 144static void
140run (void *cls, 145run (void *cls,
141 struct GNUNET_SERVER_Handle *server, 146 const struct GNUNET_CONFIGURATION_Handle *c,
142 const struct GNUNET_CONFIGURATION_Handle *c) 147 struct GNUNET_SERVICE_Handle *service)
143{ 148{
144 GDS_cfg = c; 149 GDS_cfg = c;
145 GDS_server = server; 150 GDS_service = service;
146 GNUNET_SERVER_suspend (server);
147 if (GNUNET_OK != 151 if (GNUNET_OK !=
148 GNUNET_CONFIGURATION_get_value_time (c, 152 GNUNET_CONFIGURATION_get_value_time (c,
149 "transport", 153 "transport",
@@ -153,7 +157,10 @@ run (void *cls,
153 hello_expiration = GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION; 157 hello_expiration = GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION;
154 } 158 }
155 GDS_block_context = GNUNET_BLOCK_context_create (GDS_cfg); 159 GDS_block_context = GNUNET_BLOCK_context_create (GDS_cfg);
156 GDS_stats = GNUNET_STATISTICS_create ("dht", GDS_cfg); 160 GDS_stats = GNUNET_STATISTICS_create ("dht",
161 GDS_cfg);
162 GNUNET_SERVICE_suspend (GDS_service);
163 GDS_CLIENTS_init ();
157 GDS_ROUTING_init (); 164 GDS_ROUTING_init ();
158 GDS_NSE_init (); 165 GDS_NSE_init ();
159 GDS_DATACACHE_init (); 166 GDS_DATACACHE_init ();
@@ -172,28 +179,10 @@ run (void *cls,
172} 179}
173 180
174 181
175/** 182/* Finally, define the main method */
176 * The main function for the dht service. 183GDS_DHT_SERVICE_INIT(&run);
177 * 184
178 * @param argc number of arguments from the command line 185
179 * @param argv command line arguments 186
180 * @return 0 ok, 1 on error
181 */
182int
183main (int argc,
184 char *const *argv)
185{
186 int ret;
187
188 ret = (GNUNET_OK ==
189 GNUNET_SERVICE_run (argc,
190 argv,
191 "dht",
192 GNUNET_SERVICE_OPTION_NONE,
193 &run,
194 NULL)) ? 0 : 1;
195 GDS_CLIENTS_done ();
196 return ret;
197}
198 187
199/* end of gnunet-service-dht.c */ 188/* end of gnunet-service-dht.c */
diff --git a/src/dht/gnunet-service-dht.h b/src/dht/gnunet-service-dht.h
index 4684c2324..bc7a48b5a 100644
--- a/src/dht/gnunet-service-dht.h
+++ b/src/dht/gnunet-service-dht.h
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2011 GNUnet e.V. 3 Copyright (C) 2009-2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -29,6 +29,7 @@
29#include "gnunet_util_lib.h" 29#include "gnunet_util_lib.h"
30#include "gnunet_statistics_service.h" 30#include "gnunet_statistics_service.h"
31#include "gnunet_transport_service.h" 31#include "gnunet_transport_service.h"
32#include "gnunet_block_lib.h"
32 33
33#define DEBUG_DHT GNUNET_EXTRA_LOGGING 34#define DEBUG_DHT GNUNET_EXTRA_LOGGING
34 35
@@ -38,6 +39,11 @@
38extern const struct GNUNET_CONFIGURATION_Handle *GDS_cfg; 39extern const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
39 40
40/** 41/**
42 * Handle for the service.
43 */
44extern struct GNUNET_SERVICE_Handle *GDS_service;
45
46/**
41 * Our handle to the BLOCK library. 47 * Our handle to the BLOCK library.
42 */ 48 */
43extern struct GNUNET_BLOCK_Context *GDS_block_context; 49extern struct GNUNET_BLOCK_Context *GDS_block_context;
@@ -48,14 +54,111 @@ extern struct GNUNET_BLOCK_Context *GDS_block_context;
48extern struct GNUNET_STATISTICS_Handle *GDS_stats; 54extern struct GNUNET_STATISTICS_Handle *GDS_stats;
49 55
50/** 56/**
51 * Handle to our server. 57 * Our HELLO
52 */ 58 */
53extern struct GNUNET_SERVER_Handle *GDS_server; 59extern struct GNUNET_MessageHeader *GDS_my_hello;
60
61
54 62
55/** 63/**
56 * Our HELLO 64 * Handle a reply we've received from another peer. If the reply
65 * matches any of our pending queries, forward it to the respective
66 * client(s).
67 *
68 * @param expiration when will the reply expire
69 * @param key the query this reply is for
70 * @param get_path_length number of peers in @a get_path
71 * @param get_path path the reply took on get
72 * @param put_path_length number of peers in @a put_path
73 * @param put_path path the reply took on put
74 * @param type type of the reply
75 * @param data_size number of bytes in @a data
76 * @param data application payload data
57 */ 77 */
58extern struct GNUNET_MessageHeader *GDS_my_hello; 78void
79GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
80 const struct GNUNET_HashCode *key,
81 unsigned int get_path_length,
82 const struct GNUNET_PeerIdentity *get_path,
83 unsigned int put_path_length,
84 const struct GNUNET_PeerIdentity *put_path,
85 enum GNUNET_BLOCK_Type type, size_t data_size,
86 const void *data);
87
59 88
89/**
90 * Check if some client is monitoring GET messages and notify
91 * them in that case.
92 *
93 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
94 * @param type The type of data in the request.
95 * @param hop_count Hop count so far.
96 * @param path_length number of entries in path (or 0 if not recorded).
97 * @param path peers on the GET path (or NULL if not recorded).
98 * @param desired_replication_level Desired replication level.
99 * @param key Key of the requested data.
100 */
101void
102GDS_CLIENTS_process_get (uint32_t options,
103 enum GNUNET_BLOCK_Type type,
104 uint32_t hop_count,
105 uint32_t desired_replication_level,
106 unsigned int path_length,
107 const struct GNUNET_PeerIdentity *path,
108 const struct GNUNET_HashCode *key);
109
110
111/**
112 * Check if some client is monitoring GET RESP messages and notify
113 * them in that case.
114 *
115 * @param type The type of data in the result.
116 * @param get_path Peers on GET path (or NULL if not recorded).
117 * @param get_path_length number of entries in @a get_path.
118 * @param put_path peers on the PUT path (or NULL if not recorded).
119 * @param put_path_length number of entries in @a get_path.
120 * @param exp Expiration time of the data.
121 * @param key Key of the @a data.
122 * @param data Pointer to the result data.
123 * @param size Number of bytes in @a data.
124 */
125void
126GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
127 const struct GNUNET_PeerIdentity *get_path,
128 unsigned int get_path_length,
129 const struct GNUNET_PeerIdentity *put_path,
130 unsigned int put_path_length,
131 struct GNUNET_TIME_Absolute exp,
132 const struct GNUNET_HashCode * key,
133 const void *data,
134 size_t size);
135
136
137/**
138 * Check if some client is monitoring PUT messages and notify
139 * them in that case.
140 *
141 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
142 * @param type The type of data in the request.
143 * @param hop_count Hop count so far.
144 * @param path_length number of entries in path (or 0 if not recorded).
145 * @param path peers on the PUT path (or NULL if not recorded).
146 * @param desired_replication_level Desired replication level.
147 * @param exp Expiration time of the data.
148 * @param key Key under which data is to be stored.
149 * @param data Pointer to the data carried.
150 * @param size Number of bytes in data.
151 */
152void
153GDS_CLIENTS_process_put (uint32_t options,
154 enum GNUNET_BLOCK_Type type,
155 uint32_t hop_count,
156 uint32_t desired_replication_level,
157 unsigned int path_length,
158 const struct GNUNET_PeerIdentity *path,
159 struct GNUNET_TIME_Absolute exp,
160 const struct GNUNET_HashCode * key,
161 const void *data,
162 size_t size);
60 163
61#endif 164#endif
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index 9dbeef6bd..0e344b566 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011 GNUnet e.V. 3 Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -30,7 +30,6 @@
30#include "gnunet_protocols.h" 30#include "gnunet_protocols.h"
31#include "gnunet_statistics_service.h" 31#include "gnunet_statistics_service.h"
32#include "gnunet-service-dht.h" 32#include "gnunet-service-dht.h"
33#include "gnunet-service-dht_clients.h"
34#include "gnunet-service-dht_datacache.h" 33#include "gnunet-service-dht_datacache.h"
35#include "gnunet-service-dht_neighbours.h" 34#include "gnunet-service-dht_neighbours.h"
36#include "dht.h" 35#include "dht.h"
@@ -43,70 +42,13 @@
43 42
44#define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__) 43#define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__)
45 44
46/**
47 * Linked list of messages to send to clients.
48 */
49struct PendingMessage
50{
51 /**
52 * Pointer to next item in the list
53 */
54 struct PendingMessage *next;
55
56 /**
57 * Pointer to previous item in the list
58 */
59 struct PendingMessage *prev;
60
61 /**
62 * Actual message to be sent, allocated at the end of the struct:
63 * // msg = (cast) &pm[1];
64 * // GNUNET_memcpy (&pm[1], data, len);
65 */
66 const struct GNUNET_MessageHeader *msg;
67
68};
69
70 45
71/** 46/**
72 * Struct containing information about a client, 47 * Struct containing information about a client,
73 * handle to connect to it, and any pending messages 48 * handle to connect to it, and any pending messages
74 * that need to be sent to it. 49 * that need to be sent to it.
75 */ 50 */
76struct ClientList 51struct ClientHandle;
77{
78 /**
79 * Linked list of active clients
80 */
81 struct ClientList *next;
82
83 /**
84 * Linked list of active clients
85 */
86 struct ClientList *prev;
87
88 /**
89 * The handle to this client
90 */
91 struct GNUNET_SERVER_Client *client_handle;
92
93 /**
94 * Handle to the current transmission request, NULL
95 * if none pending.
96 */
97 struct GNUNET_SERVER_TransmitHandle *transmit_handle;
98
99 /**
100 * Linked list of pending messages for this client
101 */
102 struct PendingMessage *pending_head;
103
104 /**
105 * Tail of linked list of pending messages for this client
106 */
107 struct PendingMessage *pending_tail;
108
109};
110 52
111 53
112/** 54/**
@@ -121,9 +63,19 @@ struct ClientQueryRecord
121 struct GNUNET_HashCode key; 63 struct GNUNET_HashCode key;
122 64
123 /** 65 /**
66 * Kept in a DLL with @e client.
67 */
68 struct ClientQueryRecord *next;
69
70 /**
71 * Kept in a DLL with @e client.
72 */
73 struct ClientQueryRecord *prev;
74
75 /**
124 * Client responsible for the request. 76 * Client responsible for the request.
125 */ 77 */
126 struct ClientList *client; 78 struct ClientHandle *ch;
127 79
128 /** 80 /**
129 * Extended query (see gnunet_block_lib.h), allocated at the end of this struct. 81 * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
@@ -193,22 +145,22 @@ struct ClientMonitorRecord
193 /** 145 /**
194 * Next element in DLL. 146 * Next element in DLL.
195 */ 147 */
196 struct ClientMonitorRecord *next; 148 struct ClientMonitorRecord *next;
197 149
198 /** 150 /**
199 * Previous element in DLL. 151 * Previous element in DLL.
200 */ 152 */
201 struct ClientMonitorRecord *prev; 153 struct ClientMonitorRecord *prev;
202 154
203 /** 155 /**
204 * Type of blocks that are of interest 156 * Type of blocks that are of interest
205 */ 157 */
206 enum GNUNET_BLOCK_Type type; 158 enum GNUNET_BLOCK_Type type;
207 159
208 /** 160 /**
209 * Key of data of interest, NULL for all. 161 * Key of data of interest, NULL for all.
210 */ 162 */
211 struct GNUNET_HashCode *key; 163 struct GNUNET_HashCode *key;
212 164
213 /** 165 /**
214 * Flag whether to notify about GET messages. 166 * Flag whether to notify about GET messages.
@@ -228,19 +180,39 @@ struct ClientMonitorRecord
228 /** 180 /**
229 * Client to notify of these requests. 181 * Client to notify of these requests.
230 */ 182 */
231 struct ClientList *client; 183 struct ClientHandle *ch;
232}; 184};
233 185
234 186
235/** 187/**
236 * List of active clients. 188 * Struct containing information about a client,
189 * handle to connect to it, and any pending messages
190 * that need to be sent to it.
237 */ 191 */
238static struct ClientList *client_head; 192struct ClientHandle
193{
194 /**
195 * Linked list of active queries of this client.
196 */
197 struct ClientQueryRecord *cqr_head;
198
199 /**
200 * Linked list of active queries of this client.
201 */
202 struct ClientQueryRecord *cqr_tail;
203
204 /**
205 * The handle to this client
206 */
207 struct GNUNET_SERVICE_Client *client;
208
209 /**
210 * The message queue to this client
211 */
212 struct GNUNET_MQ_Handle *mq;
213
214};
239 215
240/**
241 * List of active clients.
242 */
243static struct ClientList *client_tail;
244 216
245/** 217/**
246 * List of active monitoring requests. 218 * List of active monitoring requests.
@@ -265,88 +237,55 @@ static struct GNUNET_CONTAINER_Heap *retry_heap;
265/** 237/**
266 * Task that re-transmits requests (using retry_heap). 238 * Task that re-transmits requests (using retry_heap).
267 */ 239 */
268static struct GNUNET_SCHEDULER_Task * retry_task; 240static struct GNUNET_SCHEDULER_Task *retry_task;
269 241
270 242
271/** 243/**
272 * Task run to check for messages that need to be sent to a client. 244 * Free data structures associated with the given query.
273 * 245 *
274 * @param client a ClientList, containing the client and any messages to be sent to it 246 * @param record record to remove
275 */ 247 */
276static void 248static void
277process_pending_messages (struct ClientList *client); 249remove_client_record (struct ClientQueryRecord *record)
278
279
280/**
281 * Add a PendingMessage to the clients list of messages to be sent
282 *
283 * @param client the active client to send the message to
284 * @param pending_message the actual message to send
285 */
286static void
287add_pending_message (struct ClientList *client,
288 struct PendingMessage *pending_message)
289{ 250{
290 GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail, 251 struct ClientHandle *ch = record->ch;
291 pending_message);
292 process_pending_messages (client);
293}
294 252
295 253 GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
296/** 254 ch->cqr_tail,
297 * Find a client if it exists, add it otherwise. 255 record);
298 * 256 GNUNET_assert (GNUNET_YES ==
299 * @param client the server handle to the client 257 GNUNET_CONTAINER_multihashmap_remove (forward_map,
300 * 258 &record->key,
301 * @return the client if found, a new client otherwise 259 record));
302 */ 260 if (NULL != record->hnode)
303static struct ClientList * 261 GNUNET_CONTAINER_heap_remove_node (record->hnode);
304find_active_client (struct GNUNET_SERVER_Client *client) 262 GNUNET_array_grow (record->seen_replies,
305{ 263 record->seen_replies_count,
306 struct ClientList *pos = client_head; 264 0);
307 struct ClientList *ret; 265 GNUNET_free (record);
308
309 while (pos != NULL)
310 {
311 if (pos->client_handle == client)
312 return pos;
313 pos = pos->next;
314 }
315 ret = GNUNET_new (struct ClientList);
316 ret->client_handle = client;
317 GNUNET_CONTAINER_DLL_insert (client_head, client_tail, ret);
318 return ret;
319} 266}
320 267
321 268
322/** 269/**
323 * Iterator over hash map entries that frees all entries 270 * Functions with this signature are called whenever a local client is
324 * associated with the given client. 271 * connects to us.
325 * 272 *
326 * @param cls client to search for in source routes 273 * @param cls closure (NULL for dht)
327 * @param key current key code (ignored) 274 * @param client identification of the client
328 * @param value value in the hash map, a ClientQueryRecord 275 * @param mq message queue for talking to @a client
329 * @return #GNUNET_YES (we should continue to iterate) 276 * @return our `struct ClientHandle` for @a client
330 */ 277 */
331static int 278static void *
332remove_client_records (void *cls, const struct GNUNET_HashCode * key, void *value) 279client_connect_cb (void *cls,
280 struct GNUNET_SERVICE_Client *client,
281 struct GNUNET_MQ_Handle *mq)
333{ 282{
334 struct ClientList *client = cls; 283 struct ClientHandle *ch;
335 struct ClientQueryRecord *record = value;
336 284
337 if (record->client != client) 285 ch = GNUNET_new (struct ClientHandle);
338 return GNUNET_YES; 286 ch->client = client;
339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 287 ch->mq = mq;
340 "Removing client %p's record for key %s\n", client, 288 return ch;
341 GNUNET_h2s (key));
342 GNUNET_assert (GNUNET_YES ==
343 GNUNET_CONTAINER_multihashmap_remove (forward_map, key,
344 record));
345 if (NULL != record->hnode)
346 GNUNET_CONTAINER_heap_remove_node (record->hnode);
347 GNUNET_array_grow (record->seen_replies, record->seen_replies_count, 0);
348 GNUNET_free (record);
349 return GNUNET_YES;
350} 289}
351 290
352 291
@@ -355,48 +294,44 @@ remove_client_records (void *cls, const struct GNUNET_HashCode * key, void *valu
355 * is disconnected on the network level. 294 * is disconnected on the network level.
356 * 295 *
357 * @param cls closure (NULL for dht) 296 * @param cls closure (NULL for dht)
358 * @param client identification of the client; NULL 297 * @param client identification of the client
359 * for the last call when the server is destroyed 298 * @param app_ctx our `struct ClientHandle` for @a client
360 */ 299 */
361static void 300static void
362handle_client_disconnect (void *cls, 301client_disconnect_cb (void *cls,
363 struct GNUNET_SERVER_Client *client) 302 struct GNUNET_SERVICE_Client *client,
303 void *app_ctx)
364{ 304{
365 struct ClientList *pos; 305 struct ClientHandle *ch = app_ctx;
366 struct PendingMessage *reply; 306 struct ClientQueryRecord *cqr;
367 struct ClientMonitorRecord *monitor; 307 struct ClientMonitorRecord *monitor;
368 308
369 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
370 "Local client %p disconnects\n", 310 "Local client %p disconnects\n",
371 client); 311 ch);
372 pos = find_active_client (client);
373 GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos);
374 if (pos->transmit_handle != NULL)
375 GNUNET_SERVER_notify_transmit_ready_cancel (pos->transmit_handle);
376 while (NULL != (reply = pos->pending_head))
377 {
378 GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply);
379 GNUNET_free (reply);
380 }
381 monitor = monitor_head; 312 monitor = monitor_head;
382 while (NULL != monitor) 313 while (NULL != monitor)
383 { 314 {
384 if (monitor->client == pos) 315 if (monitor->ch == ch)
385 { 316 {
386 struct ClientMonitorRecord *next; 317 struct ClientMonitorRecord *next;
387 318
388 GNUNET_free_non_null (monitor->key);
389 next = monitor->next; 319 next = monitor->next;
390 GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, monitor); 320 GNUNET_free_non_null (monitor->key);
321 GNUNET_CONTAINER_DLL_remove (monitor_head,
322 monitor_tail,
323 monitor);
391 GNUNET_free (monitor); 324 GNUNET_free (monitor);
392 monitor = next; 325 monitor = next;
393 } 326 }
394 else 327 else
328 {
395 monitor = monitor->next; 329 monitor = monitor->next;
330 }
396 } 331 }
397 GNUNET_CONTAINER_multihashmap_iterate (forward_map, &remove_client_records, 332 while (NULL != (cqr = ch->cqr_head))
398 pos); 333 remove_client_record (cqr);
399 GNUNET_free (pos); 334 GNUNET_free (ch);
400} 335}
401 336
402 337
@@ -413,27 +348,35 @@ transmit_request (struct ClientQueryRecord *cqr)
413 struct GNUNET_CONTAINER_BloomFilter *peer_bf; 348 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
414 349
415 GNUNET_STATISTICS_update (GDS_stats, 350 GNUNET_STATISTICS_update (GDS_stats,
416 gettext_noop 351 gettext_noop ("# GET requests from clients injected"),
417 ("# GET requests from clients injected"), 1, 352 1,
418 GNUNET_NO); 353 GNUNET_NO);
419 reply_bf_mutator = 354 reply_bf_mutator =
420 (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 355 (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
421 UINT32_MAX); 356 UINT32_MAX);
422 reply_bf = 357 reply_bf
423 GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator, cqr->seen_replies, 358 = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
359 cqr->seen_replies,
424 cqr->seen_replies_count); 360 cqr->seen_replies_count);
425 peer_bf = 361 peer_bf
426 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, 362 = GNUNET_CONTAINER_bloomfilter_init (NULL,
363 DHT_BLOOM_SIZE,
427 GNUNET_CONSTANTS_BLOOMFILTER_K); 364 GNUNET_CONSTANTS_BLOOMFILTER_K);
428 LOG (GNUNET_ERROR_TYPE_DEBUG, 365 LOG (GNUNET_ERROR_TYPE_DEBUG,
429 "Initiating GET for %s, replication %u, already have %u replies\n", 366 "Initiating GET for %s, replication %u, already have %u replies\n",
430 GNUNET_h2s (&cqr->key), 367 GNUNET_h2s (&cqr->key),
431 cqr->replication, 368 cqr->replication,
432 cqr->seen_replies_count); 369 cqr->seen_replies_count);
433 GDS_NEIGHBOURS_handle_get (cqr->type, cqr->msg_options, cqr->replication, 370 GDS_NEIGHBOURS_handle_get (cqr->type,
371 cqr->msg_options,
372 cqr->replication,
434 0 /* hop count */ , 373 0 /* hop count */ ,
435 &cqr->key, cqr->xquery, cqr->xquery_size, reply_bf, 374 &cqr->key,
436 reply_bf_mutator, peer_bf); 375 cqr->xquery,
376 cqr->xquery_size,
377 reply_bf,
378 reply_bf_mutator,
379 peer_bf);
437 GNUNET_CONTAINER_bloomfilter_free (reply_bf); 380 GNUNET_CONTAINER_bloomfilter_free (reply_bf);
438 GNUNET_CONTAINER_bloomfilter_free (peer_bf); 381 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
439 382
@@ -445,7 +388,7 @@ transmit_request (struct ClientQueryRecord *cqr)
445 388
446 389
447/** 390/**
448 * Task that looks at the 'retry_heap' and transmits all of the requests 391 * Task that looks at the #retry_heap and transmits all of the requests
449 * on the heap that are ready for transmission. Then re-schedules 392 * on the heap that are ready for transmission. Then re-schedules
450 * itself (unless the heap is empty). 393 * itself (unless the heap is empty).
451 * 394 *
@@ -465,51 +408,62 @@ transmit_next_request_task (void *cls)
465 if (delay.rel_value_us > 0) 408 if (delay.rel_value_us > 0)
466 { 409 {
467 cqr->hnode = 410 cqr->hnode =
468 GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 411 GNUNET_CONTAINER_heap_insert (retry_heap,
412 cqr,
469 cqr->retry_time.abs_value_us); 413 cqr->retry_time.abs_value_us);
470 retry_task = 414 retry_task =
471 GNUNET_SCHEDULER_add_delayed (delay, &transmit_next_request_task, 415 GNUNET_SCHEDULER_add_delayed (delay,
416 &transmit_next_request_task,
472 NULL); 417 NULL);
473 return; 418 return;
474 } 419 }
475 transmit_request (cqr); 420 transmit_request (cqr);
476 cqr->hnode = 421 cqr->hnode
477 GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 422 = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
478 cqr->retry_time.abs_value_us); 423 cqr->retry_time.abs_value_us);
479 } 424 }
480} 425}
481 426
482 427
483/** 428/**
429 * Check DHT PUT messages from the client.
430 *
431 * @param cls the client we received this message from
432 * @param dht_msg the actual message received
433 * @return #GNUNET_OK (always)
434 */
435static int
436check_dht_local_put (void *cls,
437 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
438{
439 /* always well-formed */
440 return GNUNET_OK;
441}
442
443
444/**
484 * Handler for PUT messages. 445 * Handler for PUT messages.
485 * 446 *
486 * @param cls closure for the service 447 * @param cls the client we received this message from
487 * @param client the client we received this message from 448 * @param dht_msg the actual message received
488 * @param message the actual message received
489 */ 449 */
490static void 450static void
491handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, 451handle_dht_local_put (void *cls,
492 const struct GNUNET_MessageHeader *message) 452 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
493{ 453{
494 const struct GNUNET_DHT_ClientPutMessage *dht_msg; 454 struct ClientHandle *ch = cls;
495 struct GNUNET_CONTAINER_BloomFilter *peer_bf; 455 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
496 uint16_t size; 456 uint16_t size;
497 struct PendingMessage *pm; 457 struct GNUNET_MQ_Envelope *env;
498 struct GNUNET_DHT_ClientPutConfirmationMessage *conf; 458 struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
499 459
500 size = ntohs (message->size); 460 size = ntohs (dht_msg->header.size);
501 if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
502 {
503 GNUNET_break (0);
504 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
505 return;
506 }
507 GNUNET_STATISTICS_update (GDS_stats, 461 GNUNET_STATISTICS_update (GDS_stats,
508 gettext_noop 462 gettext_noop ("# PUT requests received from clients"),
509 ("# PUT requests received from clients"), 1, 463 1,
510 GNUNET_NO); 464 GNUNET_NO);
511 dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; 465 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
512 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N CLIENT-PUT %s\n", 466 "CLIENT-PUT %s\n",
513 GNUNET_h2s_full (&dht_msg->key)); 467 GNUNET_h2s_full (&dht_msg->key));
514 /* give to local clients */ 468 /* give to local clients */
515 LOG (GNUNET_ERROR_TYPE_DEBUG, 469 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -517,26 +471,38 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
517 size - sizeof (struct GNUNET_DHT_ClientPutMessage), 471 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
518 GNUNET_h2s (&dht_msg->key)); 472 GNUNET_h2s (&dht_msg->key));
519 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), 473 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
520 &dht_msg->key, 0, NULL, 0, NULL, 474 &dht_msg->key,
475 0,
476 NULL,
477 0,
478 NULL,
521 ntohl (dht_msg->type), 479 ntohl (dht_msg->type),
522 size - sizeof (struct GNUNET_DHT_ClientPutMessage), 480 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
523 &dht_msg[1]); 481 &dht_msg[1]);
524 /* store locally */ 482 /* store locally */
525 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), 483 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
526 &dht_msg->key, 0, NULL, ntohl (dht_msg->type), 484 &dht_msg->key,
485 0,
486 NULL,
487 ntohl (dht_msg->type),
527 size - sizeof (struct GNUNET_DHT_ClientPutMessage), 488 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
528 &dht_msg[1]); 489 &dht_msg[1]);
529 /* route to other peers */ 490 /* route to other peers */
530 peer_bf = 491 peer_bf
531 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, 492 = GNUNET_CONTAINER_bloomfilter_init (NULL,
493 DHT_BLOOM_SIZE,
532 GNUNET_CONSTANTS_BLOOMFILTER_K); 494 GNUNET_CONSTANTS_BLOOMFILTER_K);
533 GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), ntohl (dht_msg->options), 495 GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
496 ntohl (dht_msg->options),
534 ntohl (dht_msg->desired_replication_level), 497 ntohl (dht_msg->desired_replication_level),
535 GNUNET_TIME_absolute_ntoh (dht_msg->expiration), 498 GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
536 0 /* hop count */ , 499 0 /* hop count */ ,
537 peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1], 500 peer_bf,
538 size - 501 &dht_msg->key,
539 sizeof (struct GNUNET_DHT_ClientPutMessage)); 502 0,
503 NULL,
504 &dht_msg[1],
505 size - sizeof (struct GNUNET_DHT_ClientPutMessage));
540 GDS_CLIENTS_process_put (ntohl (dht_msg->options), 506 GDS_CLIENTS_process_put (ntohl (dht_msg->options),
541 ntohl (dht_msg->type), 507 ntohl (dht_msg->type),
542 0, 508 0,
@@ -548,45 +514,50 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
548 &dht_msg[1], 514 &dht_msg[1],
549 size - sizeof (struct GNUNET_DHT_ClientPutMessage)); 515 size - sizeof (struct GNUNET_DHT_ClientPutMessage));
550 GNUNET_CONTAINER_bloomfilter_free (peer_bf); 516 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
551 pm = GNUNET_malloc (sizeof (struct PendingMessage) + 517 env = GNUNET_MQ_msg (conf,
552 sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)); 518 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
553 conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1];
554 conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
555 conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
556 conf->reserved = htonl (0); 519 conf->reserved = htonl (0);
557 conf->unique_id = dht_msg->unique_id; 520 conf->unique_id = dht_msg->unique_id;
558 pm->msg = &conf->header; 521 GNUNET_MQ_send (ch->mq,
559 add_pending_message (find_active_client (client), pm); 522 env);
560 GNUNET_SERVER_receive_done (client, GNUNET_OK); 523 GNUNET_SERVICE_client_continue (ch->client);
524}
525
526
527/**
528 * Check DHT GET messages from the client.
529 *
530 * @param cls the client we received this message from
531 * @param message the actual message received
532 * @return #GNUNET_OK (always)
533 */
534static int
535check_dht_local_get (void *cls,
536 const struct GNUNET_DHT_ClientGetMessage *get)
537{
538 /* always well-formed */
539 return GNUNET_OK;
561} 540}
562 541
563 542
564/** 543/**
565 * Handler for DHT GET messages from the client. 544 * Handler for DHT GET messages from the client.
566 * 545 *
567 * @param cls closure for the service 546 * @param cls the client we received this message from
568 * @param client the client we received this message from
569 * @param message the actual message received 547 * @param message the actual message received
570 */ 548 */
571static void 549static void
572handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, 550handle_dht_local_get (void *cls,
573 const struct GNUNET_MessageHeader *message) 551 const struct GNUNET_DHT_ClientGetMessage *get)
574{ 552{
575 const struct GNUNET_DHT_ClientGetMessage *get; 553 struct ClientHandle *ch = cls;
576 struct ClientQueryRecord *cqr; 554 struct ClientQueryRecord *cqr;
577 size_t xquery_size; 555 size_t xquery_size;
578 const char *xquery; 556 const char *xquery;
579 uint16_t size; 557 uint16_t size;
580 558
581 size = ntohs (message->size); 559 size = ntohs (get->header.size);
582 if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
583 {
584 GNUNET_break (0);
585 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
586 return;
587 }
588 xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage); 560 xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
589 get = (const struct GNUNET_DHT_ClientGetMessage *) message;
590 xquery = (const char *) &get[1]; 561 xquery = (const char *) &get[1];
591 GNUNET_STATISTICS_update (GDS_stats, 562 GNUNET_STATISTICS_update (GDS_stats,
592 gettext_noop 563 gettext_noop
@@ -594,15 +565,17 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
594 GNUNET_NO); 565 GNUNET_NO);
595 LOG (GNUNET_ERROR_TYPE_DEBUG, 566 LOG (GNUNET_ERROR_TYPE_DEBUG,
596 "Received GET request for %s from local client %p, xq: %.*s\n", 567 "Received GET request for %s from local client %p, xq: %.*s\n",
597 GNUNET_h2s (&get->key), client, xquery_size, xquery); 568 GNUNET_h2s (&get->key),
598 569 ch->client,
599 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N CLIENT-GET %s\n", 570 xquery_size,
571 xquery);
572 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
573 "CLIENT-GET %s\n",
600 GNUNET_h2s_full (&get->key)); 574 GNUNET_h2s_full (&get->key));
601 575
602
603 cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); 576 cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
604 cqr->key = get->key; 577 cqr->key = get->key;
605 cqr->client = find_active_client (client); 578 cqr->ch = ch;
606 cqr->xquery = (void *) &cqr[1]; 579 cqr->xquery = (void *) &cqr[1];
607 GNUNET_memcpy (&cqr[1], xquery, xquery_size); 580 GNUNET_memcpy (&cqr[1], xquery, xquery_size);
608 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 581 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
@@ -613,8 +586,12 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
613 cqr->replication = ntohl (get->desired_replication_level); 586 cqr->replication = ntohl (get->desired_replication_level);
614 cqr->msg_options = ntohl (get->options); 587 cqr->msg_options = ntohl (get->options);
615 cqr->type = ntohl (get->type); 588 cqr->type = ntohl (get->type);
616 // FIXME use cqr->key, set multihashmap create to GNUNET_YES 589 GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
617 GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, 590 ch->cqr_tail,
591 cqr);
592 GNUNET_CONTAINER_multihashmap_put (forward_map,
593 &cqr->key,
594 cqr,
618 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 595 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
619 GDS_CLIENTS_process_get (ntohl (get->options), 596 GDS_CLIENTS_process_get (ntohl (get->options),
620 ntohl (get->type), 597 ntohl (get->type),
@@ -635,13 +612,12 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
635 xquery_size, 612 xquery_size,
636 NULL, 613 NULL,
637 0); 614 0);
638 GNUNET_SERVER_receive_done (client, 615 GNUNET_SERVICE_client_continue (ch->client);
639 GNUNET_OK);
640} 616}
641 617
642 618
643/** 619/**
644 * Closure for 'find_by_unique_id'. 620 * Closure for #find_by_unique_id().
645 */ 621 */
646struct FindByUniqueIdContext 622struct FindByUniqueIdContext
647{ 623{
@@ -661,8 +637,8 @@ struct FindByUniqueIdContext
661 * 637 *
662 * @param cls the search context 638 * @param cls the search context
663 * @param key query for the lookup (not used) 639 * @param key query for the lookup (not used)
664 * @param value the 'struct ClientQueryRecord' 640 * @param value the `struct ClientQueryRecord`
665 * @return GNUNET_YES to continue iteration (result not yet found) 641 * @return #GNUNET_YES to continue iteration (result not yet found)
666 */ 642 */
667static int 643static int
668find_by_unique_id (void *cls, 644find_by_unique_id (void *cls,
@@ -680,17 +656,41 @@ find_by_unique_id (void *cls,
680 656
681 657
682/** 658/**
659 * Check "GET result seen" messages from the client.
660 *
661 * @param cls the client we received this message from
662 * @param message the actual message received
663 * @return #GNUNET_OK if @a seen is well-formed
664 */
665static int
666check_dht_local_get_result_seen (void *cls,
667 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
668{
669 uint16_t size;
670 unsigned int hash_count;
671
672 size = ntohs (seen->header.size);
673 hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
674 if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
675 {
676 GNUNET_break (0);
677 return GNUNET_SYSERR;
678 }
679 return GNUNET_OK;
680}
681
682
683/**
683 * Handler for "GET result seen" messages from the client. 684 * Handler for "GET result seen" messages from the client.
684 * 685 *
685 * @param cls closure for the service 686 * @param cls the client we received this message from
686 * @param client the client we received this message from
687 * @param message the actual message received 687 * @param message the actual message received
688 */ 688 */
689static void 689static void
690handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client, 690handle_dht_local_get_result_seen (void *cls,
691 const struct GNUNET_MessageHeader *message) 691 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
692{ 692{
693 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen; 693 struct ClientHandle *ch = cls;
694 uint16_t size; 694 uint16_t size;
695 unsigned int hash_count; 695 unsigned int hash_count;
696 unsigned int old_count; 696 unsigned int old_count;
@@ -698,21 +698,8 @@ handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client
698 struct FindByUniqueIdContext fui_ctx; 698 struct FindByUniqueIdContext fui_ctx;
699 struct ClientQueryRecord *cqr; 699 struct ClientQueryRecord *cqr;
700 700
701 size = ntohs (message->size); 701 size = ntohs (seen->header.size);
702 if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage))
703 {
704 GNUNET_break (0);
705 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
706 return;
707 }
708 seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message;
709 hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); 702 hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
710 if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
711 {
712 GNUNET_break (0);
713 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
714 return;
715 }
716 hc = (const struct GNUNET_HashCode*) &seen[1]; 703 hc = (const struct GNUNET_HashCode*) &seen[1];
717 fui_ctx.unique_id = seen->unique_id; 704 fui_ctx.unique_id = seen->unique_id;
718 fui_ctx.cqr = NULL; 705 fui_ctx.cqr = NULL;
@@ -723,7 +710,7 @@ handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client
723 if (NULL == (cqr = fui_ctx.cqr)) 710 if (NULL == (cqr = fui_ctx.cqr))
724 { 711 {
725 GNUNET_break (0); 712 GNUNET_break (0);
726 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 713 GNUNET_SERVICE_client_drop (ch->client);
727 return; 714 return;
728 } 715 }
729 /* finally, update 'seen' list */ 716 /* finally, update 'seen' list */
@@ -732,20 +719,20 @@ handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client
732 cqr->seen_replies_count, 719 cqr->seen_replies_count,
733 cqr->seen_replies_count + hash_count); 720 cqr->seen_replies_count + hash_count);
734 GNUNET_memcpy (&cqr->seen_replies[old_count], 721 GNUNET_memcpy (&cqr->seen_replies[old_count],
735 hc, 722 hc,
736 sizeof (struct GNUNET_HashCode) * hash_count); 723 sizeof (struct GNUNET_HashCode) * hash_count);
737} 724}
738 725
739 726
740/** 727/**
741 * Closure for 'remove_by_unique_id'. 728 * Closure for #remove_by_unique_id().
742 */ 729 */
743struct RemoveByUniqueIdContext 730struct RemoveByUniqueIdContext
744{ 731{
745 /** 732 /**
746 * Client that issued the removal request. 733 * Client that issued the removal request.
747 */ 734 */
748 struct ClientList *client; 735 struct ClientHandle *ch;
749 736
750 /** 737 /**
751 * Unique ID of the request. 738 * Unique ID of the request.
@@ -761,20 +748,24 @@ struct RemoveByUniqueIdContext
761 * @param cls unique ID and client to search for in source routes 748 * @param cls unique ID and client to search for in source routes
762 * @param key current key code 749 * @param key current key code
763 * @param value value in the hash map, a ClientQueryRecord 750 * @param value value in the hash map, a ClientQueryRecord
764 * @return GNUNET_YES (we should continue to iterate) 751 * @return #GNUNET_YES (we should continue to iterate)
765 */ 752 */
766static int 753static int
767remove_by_unique_id (void *cls, const struct GNUNET_HashCode * key, void *value) 754remove_by_unique_id (void *cls,
755 const struct GNUNET_HashCode *key,
756 void *value)
768{ 757{
769 const struct RemoveByUniqueIdContext *ctx = cls; 758 const struct RemoveByUniqueIdContext *ctx = cls;
770 struct ClientQueryRecord *record = value; 759 struct ClientQueryRecord *cqr = value;
771 760
772 if (record->unique_id != ctx->unique_id) 761 if (cqr->unique_id != ctx->unique_id)
773 return GNUNET_YES; 762 return GNUNET_YES;
774 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 763 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
775 "Removing client %p's record for key %s (by unique id)\n", 764 "Removing client %p's record for key %s (by unique id)\n",
776 ctx->client->client_handle, GNUNET_h2s (key)); 765 ctx->ch->client,
777 return remove_client_records (ctx->client, key, record); 766 GNUNET_h2s (key));
767 remove_client_record (cqr);
768 return GNUNET_YES;
778} 769}
779 770
780 771
@@ -782,18 +773,15 @@ remove_by_unique_id (void *cls, const struct GNUNET_HashCode * key, void *value)
782 * Handler for any generic DHT stop messages, calls the appropriate handler 773 * Handler for any generic DHT stop messages, calls the appropriate handler
783 * depending on message type (if processed locally) 774 * depending on message type (if processed locally)
784 * 775 *
785 * @param cls closure for the service 776 * @param cls client we received this message from
786 * @param client the client we received this message from
787 * @param message the actual message received 777 * @param message the actual message received
788 * 778 *
789 */ 779 */
790static void 780static void
791handle_dht_local_get_stop (void *cls, 781handle_dht_local_get_stop (void *cls,
792 struct GNUNET_SERVER_Client *client, 782 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
793 const struct GNUNET_MessageHeader *message)
794{ 783{
795 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg = 784 struct ClientHandle *ch = cls;
796 (const struct GNUNET_DHT_ClientGetStopMessage *) message;
797 struct RemoveByUniqueIdContext ctx; 785 struct RemoveByUniqueIdContext ctx;
798 786
799 GNUNET_STATISTICS_update (GDS_stats, 787 GNUNET_STATISTICS_update (GDS_stats,
@@ -803,179 +791,98 @@ handle_dht_local_get_stop (void *cls,
803 LOG (GNUNET_ERROR_TYPE_DEBUG, 791 LOG (GNUNET_ERROR_TYPE_DEBUG,
804 "Received GET STOP request for %s from local client %p\n", 792 "Received GET STOP request for %s from local client %p\n",
805 GNUNET_h2s (&dht_stop_msg->key), 793 GNUNET_h2s (&dht_stop_msg->key),
806 client); 794 ch->client);
807 ctx.client = find_active_client (client); 795 ctx.ch = ch;
808 ctx.unique_id = dht_stop_msg->unique_id; 796 ctx.unique_id = dht_stop_msg->unique_id;
809 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key, 797 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
810 &remove_by_unique_id, &ctx); 798 &dht_stop_msg->key,
811 GNUNET_SERVER_receive_done (client, GNUNET_OK); 799 &remove_by_unique_id,
800 &ctx);
801 GNUNET_SERVICE_client_continue (ch->client);
812} 802}
813 803
814 804
815/** 805/**
816 * Handler for monitor start messages 806 * Handler for monitor start messages
817 * 807 *
818 * @param cls closure for the service 808 * @param cls the client we received this message from
819 * @param client the client we received this message from 809 * @param msg the actual message received
820 * @param message the actual message received
821 * 810 *
822 */ 811 */
823static void 812static void
824handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client, 813handle_dht_local_monitor (void *cls,
825 const struct GNUNET_MessageHeader *message) 814 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
826{ 815{
816 struct ClientHandle *ch = cls;
827 struct ClientMonitorRecord *r; 817 struct ClientMonitorRecord *r;
828 const struct GNUNET_DHT_MonitorStartStopMessage *msg;
829 818
830 msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
831 r = GNUNET_new (struct ClientMonitorRecord); 819 r = GNUNET_new (struct ClientMonitorRecord);
832 820 r->ch = ch;
833 r->client = find_active_client(client); 821 r->type = ntohl (msg->type);
834 r->type = ntohl(msg->type); 822 r->get = ntohs (msg->get);
835 r->get = ntohs(msg->get); 823 r->get_resp = ntohs (msg->get_resp);
836 r->get_resp = ntohs(msg->get_resp); 824 r->put = ntohs (msg->put);
837 r->put = ntohs(msg->put); 825 if (0 == ntohs (msg->filter_key))
838 if (0 == ntohs(msg->filter_key)) 826 {
839 r->key = NULL; 827 r->key = NULL;
828 }
840 else 829 else
841 { 830 {
842 r->key = GNUNET_new (struct GNUNET_HashCode); 831 r->key = GNUNET_new (struct GNUNET_HashCode);
843 GNUNET_memcpy (r->key, &msg->key, sizeof (struct GNUNET_HashCode)); 832 GNUNET_memcpy (r->key,
833 &msg->key,
834 sizeof (struct GNUNET_HashCode));
844 } 835 }
845 GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r); 836 GNUNET_CONTAINER_DLL_insert (monitor_head,
846 GNUNET_SERVER_receive_done (client, GNUNET_OK); 837 monitor_tail,
838 r);
839 GNUNET_SERVICE_client_continue (ch->client);
847} 840}
848 841
849 842
850/** 843/**
851 * Handler for monitor stop messages 844 * Handler for monitor stop messages
852 * 845 *
853 * @param cls closure for the service 846 * @param cls the client we received this message from
854 * @param client the client we received this message from 847 * @param msg the actual message received
855 * @param message the actual message received
856 *
857 */ 848 */
858static void 849static void
859handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client, 850handle_dht_local_monitor_stop (void *cls,
860 const struct GNUNET_MessageHeader *message) 851 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
861{ 852{
853 struct ClientHandle *ch = cls;
862 struct ClientMonitorRecord *r; 854 struct ClientMonitorRecord *r;
863 const struct GNUNET_DHT_MonitorStartStopMessage *msg;
864 int keys_match; 855 int keys_match;
865 856
866 msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; 857 GNUNET_SERVICE_client_continue (ch->client);
867 r = monitor_head; 858 for (r = monitor_head; NULL != r; r = r->next)
868
869 while (NULL != r)
870 { 859 {
871 if (NULL == r->key) 860 if (NULL == r->key)
872 keys_match = (0 == ntohs(msg->filter_key)); 861 {
862 keys_match = (0 == ntohs(msg->filter_key));
863 }
873 else 864 else
874 { 865 {
875 keys_match = (0 != ntohs(msg->filter_key) 866 keys_match = ( (0 != ntohs(msg->filter_key)) &&
876 && !memcmp(r->key, &msg->key, sizeof(struct GNUNET_HashCode))); 867 (! memcmp (r->key,
868 &msg->key,
869 sizeof(struct GNUNET_HashCode))) );
877 } 870 }
878 if (find_active_client(client) == r->client 871 if ( (ch == r->ch) &&
879 && ntohl(msg->type) == r->type 872 (ntohl(msg->type) == r->type) &&
880 && r->get == msg->get 873 (r->get == msg->get) &&
881 && r->get_resp == msg->get_resp 874 (r->get_resp == msg->get_resp) &&
882 && r->put == msg->put 875 (r->put == msg->put) &&
883 && keys_match 876 keys_match )
884 )
885 { 877 {
886 GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r); 878 GNUNET_CONTAINER_DLL_remove (monitor_head,
887 GNUNET_free_non_null (r->key); 879 monitor_tail,
888 GNUNET_free (r); 880 r);
889 GNUNET_SERVER_receive_done (client, GNUNET_OK); 881 GNUNET_free_non_null (r->key);
890 return; /* Delete only ONE entry */ 882 GNUNET_free (r);
883 return; /* Delete only ONE entry */
891 } 884 }
892 r = r->next;
893 } 885 }
894
895 GNUNET_SERVER_receive_done (client, GNUNET_OK);
896}
897
898
899/**
900 * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
901 * request. A ClientList is passed as closure, take the head of the list
902 * and copy it into buf, which has the result of sending the message to the
903 * client.
904 *
905 * @param cls closure to this call
906 * @param size maximum number of bytes available to send
907 * @param buf where to copy the actual message to
908 *
909 * @return the number of bytes actually copied, 0 indicates failure
910 */
911static size_t
912send_reply_to_client (void *cls, size_t size, void *buf)
913{
914 struct ClientList *client = cls;
915 char *cbuf = buf;
916 struct PendingMessage *reply;
917 size_t off;
918 size_t msize;
919
920 client->transmit_handle = NULL;
921 if (buf == NULL)
922 {
923 /* client disconnected */
924 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
925 "Client %p disconnected, pending messages will be discarded\n",
926 client->client_handle);
927 return 0;
928 }
929 off = 0;
930 while ((NULL != (reply = client->pending_head)) &&
931 (size >= off + (msize = ntohs (reply->msg->size))))
932 {
933 GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
934 reply);
935 GNUNET_memcpy (&cbuf[off], reply->msg, msize);
936 GNUNET_free (reply);
937 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938 "Transmitting %u bytes to client %p\n",
939 (unsigned int) msize,
940 client->client_handle);
941 off += msize;
942 }
943 process_pending_messages (client);
944 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
945 "Transmitted %u/%u bytes to client %p\n",
946 (unsigned int) off,
947 (unsigned int) size,
948 client->client_handle);
949 return off;
950}
951
952
953/**
954 * Task run to check for messages that need to be sent to a client.
955 *
956 * @param client a ClientList, containing the client and any messages to be sent to it
957 */
958static void
959process_pending_messages (struct ClientList *client)
960{
961 if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
962 {
963 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
964 "Not asking for transmission to %p now: %s\n",
965 client->client_handle,
966 client->pending_head ==
967 NULL ? "no more messages" : "request already pending");
968 return;
969 }
970 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971 "Asking for transmission of %u bytes to client %p\n",
972 ntohs (client->pending_head->msg->size), client->client_handle);
973 client->transmit_handle =
974 GNUNET_SERVER_notify_transmit_ready (client->client_handle,
975 ntohs (client->pending_head->
976 msg->size),
977 GNUNET_TIME_UNIT_FOREVER_REL,
978 &send_reply_to_client, client);
979} 886}
980 887
981 888
@@ -986,19 +893,24 @@ struct ForwardReplyContext
986{ 893{
987 894
988 /** 895 /**
989 * Actual message to send to matching clients. 896 * Expiration time of the reply.
990 */ 897 */
991 struct PendingMessage *pm; 898 struct GNUNET_TIME_Absolute expiration;
992 899
993 /** 900 /**
994 * Embedded payload. 901 * GET path taken.
995 */ 902 */
996 const void *data; 903 const struct GNUNET_PeerIdentity *get_path;
997 904
998 /** 905 /**
999 * Type of the data. 906 * PUT path taken.
1000 */ 907 */
1001 enum GNUNET_BLOCK_Type type; 908 const struct GNUNET_PeerIdentity *put_path;
909
910 /**
911 * Embedded payload.
912 */
913 const void *data;
1002 914
1003 /** 915 /**
1004 * Number of bytes in data. 916 * Number of bytes in data.
@@ -1006,9 +918,19 @@ struct ForwardReplyContext
1006 size_t data_size; 918 size_t data_size;
1007 919
1008 /** 920 /**
1009 * Do we need to copy @a pm because it was already used? 921 * Number of entries in @e get_path.
1010 */ 922 */
1011 int do_copy; 923 unsigned int get_path_length;
924
925 /**
926 * Number of entries in @e put_path.
927 */
928 unsigned int put_path_length;
929
930 /**
931 * Type of the data.
932 */
933 enum GNUNET_BLOCK_Type type;
1012 934
1013}; 935};
1014 936
@@ -1031,17 +953,18 @@ forward_reply (void *cls,
1031{ 953{
1032 struct ForwardReplyContext *frc = cls; 954 struct ForwardReplyContext *frc = cls;
1033 struct ClientQueryRecord *record = value; 955 struct ClientQueryRecord *record = value;
1034 struct PendingMessage *pm; 956 struct GNUNET_MQ_Envelope *env;
1035 struct GNUNET_DHT_ClientResultMessage *reply; 957 struct GNUNET_DHT_ClientResultMessage *reply;
1036 enum GNUNET_BLOCK_EvaluationResult eval; 958 enum GNUNET_BLOCK_EvaluationResult eval;
1037 int do_free; 959 int do_free;
1038 struct GNUNET_HashCode ch; 960 struct GNUNET_HashCode ch;
1039 unsigned int i; 961 struct GNUNET_PeerIdentity *paths;
1040 962
1041 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, 963 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1042 "R5N CLIENT-RESULT %s\n", 964 "CLIENT-RESULT %s\n",
1043 GNUNET_h2s_full (key)); 965 GNUNET_h2s_full (key));
1044 if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type)) 966 if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
967 (record->type != frc->type))
1045 { 968 {
1046 LOG (GNUNET_ERROR_TYPE_DEBUG, 969 LOG (GNUNET_ERROR_TYPE_DEBUG,
1047 "Record type missmatch, not passing request for key %s to local client\n", 970 "Record type missmatch, not passing request for key %s to local client\n",
@@ -1053,8 +976,10 @@ forward_reply (void *cls,
1053 return GNUNET_YES; /* type mismatch */ 976 return GNUNET_YES; /* type mismatch */
1054 } 977 }
1055 GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch); 978 GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
1056 for (i = 0; i < record->seen_replies_count; i++) 979 for (unsigned int i = 0; i < record->seen_replies_count; i++)
1057 if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (struct GNUNET_HashCode))) 980 if (0 == memcmp (&record->seen_replies[i],
981 &ch,
982 sizeof (struct GNUNET_HashCode)))
1058 { 983 {
1059 LOG (GNUNET_ERROR_TYPE_DEBUG, 984 LOG (GNUNET_ERROR_TYPE_DEBUG,
1060 "Duplicate reply, not passing request for key %s to local client\n", 985 "Duplicate reply, not passing request for key %s to local client\n",
@@ -1065,8 +990,8 @@ forward_reply (void *cls,
1065 1, GNUNET_NO); 990 1, GNUNET_NO);
1066 return GNUNET_YES; /* duplicate */ 991 return GNUNET_YES; /* duplicate */
1067 } 992 }
1068 eval = 993 eval
1069 GNUNET_BLOCK_evaluate (GDS_block_context, 994 = GNUNET_BLOCK_evaluate (GDS_block_context,
1070 record->type, 995 record->type,
1071 GNUNET_BLOCK_EO_NONE, 996 GNUNET_BLOCK_EO_NONE,
1072 key, 997 key,
@@ -1086,7 +1011,9 @@ forward_reply (void *cls,
1086 do_free = GNUNET_YES; 1011 do_free = GNUNET_YES;
1087 break; 1012 break;
1088 case GNUNET_BLOCK_EVALUATION_OK_MORE: 1013 case GNUNET_BLOCK_EVALUATION_OK_MORE:
1089 GNUNET_array_append (record->seen_replies, record->seen_replies_count, ch); 1014 GNUNET_array_append (record->seen_replies,
1015 record->seen_replies_count,
1016 ch);
1090 do_free = GNUNET_NO; 1017 do_free = GNUNET_NO;
1091 break; 1018 break;
1092 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: 1019 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
@@ -1112,34 +1039,38 @@ forward_reply (void *cls,
1112 GNUNET_break (0); 1039 GNUNET_break (0);
1113 return GNUNET_NO; 1040 return GNUNET_NO;
1114 } 1041 }
1115 if (GNUNET_NO == frc->do_copy)
1116 {
1117 /* first time, we can use the original data */
1118 pm = frc->pm;
1119 frc->do_copy = GNUNET_YES;
1120 }
1121 else
1122 {
1123 /* two clients waiting for same reply, must copy for queueing */
1124 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
1125 ntohs (frc->pm->msg->size));
1126 GNUNET_memcpy (pm, frc->pm,
1127 sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
1128 pm->next = pm->prev = NULL;
1129 pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
1130 }
1131 GNUNET_STATISTICS_update (GDS_stats, 1042 GNUNET_STATISTICS_update (GDS_stats,
1132 gettext_noop ("# RESULTS queued for clients"), 1, 1043 gettext_noop ("# RESULTS queued for clients"),
1044 1,
1133 GNUNET_NO); 1045 GNUNET_NO);
1134 reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; 1046 env = GNUNET_MQ_msg_extra (reply,
1047 frc->data_size +
1048 (frc->get_path_length + frc->put_path_length) * sizeof (struct GNUNET_PeerIdentity),
1049 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1050 reply->type = htonl (frc->type);
1051 reply->get_path_length = htonl (frc->get_path_length);
1052 reply->put_path_length = htonl (frc->put_path_length);
1135 reply->unique_id = record->unique_id; 1053 reply->unique_id = record->unique_id;
1054 reply->expiration = GNUNET_TIME_absolute_hton (frc->expiration);
1055 reply->key = *key;
1056 paths = (struct GNUNET_PeerIdentity *) &reply[1];
1057 GNUNET_memcpy (paths,
1058 frc->put_path,
1059 sizeof (struct GNUNET_PeerIdentity) * frc->put_path_length);
1060 GNUNET_memcpy (&paths[frc->put_path_length],
1061 frc->get_path,
1062 sizeof (struct GNUNET_PeerIdentity) * frc->get_path_length);
1063 GNUNET_memcpy (&paths[frc->get_path_length + frc->put_path_length],
1064 frc->data,
1065 frc->data_size);
1136 LOG (GNUNET_ERROR_TYPE_DEBUG, 1066 LOG (GNUNET_ERROR_TYPE_DEBUG,
1137 "Queueing reply to query %s for client %p\n", 1067 "Sending reply to query %s for client %p\n",
1138 GNUNET_h2s (key), 1068 GNUNET_h2s (key),
1139 record->client->client_handle); 1069 record->ch->client);
1140 add_pending_message (record->client, pm); 1070 GNUNET_MQ_send (record->ch->mq,
1071 env);
1141 if (GNUNET_YES == do_free) 1072 if (GNUNET_YES == do_free)
1142 remove_client_records (record->client, key, record); 1073 remove_client_record (record);
1143 return GNUNET_YES; 1074 return GNUNET_YES;
1144} 1075}
1145 1076
@@ -1166,74 +1097,48 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
1166 const struct GNUNET_PeerIdentity *get_path, 1097 const struct GNUNET_PeerIdentity *get_path,
1167 unsigned int put_path_length, 1098 unsigned int put_path_length,
1168 const struct GNUNET_PeerIdentity *put_path, 1099 const struct GNUNET_PeerIdentity *put_path,
1169 enum GNUNET_BLOCK_Type type, size_t data_size, 1100 enum GNUNET_BLOCK_Type type,
1101 size_t data_size,
1170 const void *data) 1102 const void *data)
1171{ 1103{
1172 struct ForwardReplyContext frc; 1104 struct ForwardReplyContext frc;
1173 struct PendingMessage *pm;
1174 struct GNUNET_DHT_ClientResultMessage *reply;
1175 struct GNUNET_PeerIdentity *paths;
1176 size_t msize; 1105 size_t msize;
1177 1106
1178 if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map, key)) 1107 msize = sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size +
1108 (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1109 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1110 {
1111 GNUNET_break (0);
1112 return;
1113 }
1114 if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map,
1115 key))
1179 { 1116 {
1180 LOG (GNUNET_ERROR_TYPE_DEBUG, 1117 LOG (GNUNET_ERROR_TYPE_DEBUG,
1181 "No matching client for reply for key %s\n", 1118 "No matching client for reply for key %s\n",
1182 GNUNET_h2s (key)); 1119 GNUNET_h2s (key));
1183 GNUNET_STATISTICS_update (GDS_stats, 1120 GNUNET_STATISTICS_update (GDS_stats,
1184 gettext_noop 1121 gettext_noop ("# REPLIES ignored for CLIENTS (no match)"),
1185 ("# REPLIES ignored for CLIENTS (no match)"), 1, 1122 1,
1186 GNUNET_NO); 1123 GNUNET_NO);
1187 return; /* no matching request, fast exit! */ 1124 return; /* no matching request, fast exit! */
1188 } 1125 }
1189 msize = 1126 frc.expiration = expiration;
1190 sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size + 1127 frc.get_path = get_path;
1191 (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity); 1128 frc.put_path = put_path;
1192 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1193 {
1194 GNUNET_break (0);
1195 return;
1196 }
1197 LOG (GNUNET_ERROR_TYPE_DEBUG,
1198 "Forwarding reply for key %s to client\n",
1199 GNUNET_h2s (key));
1200
1201 pm = GNUNET_malloc (msize + sizeof (struct PendingMessage));
1202 reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
1203 pm->msg = &reply->header;
1204 reply->header.size = htons ((uint16_t) msize);
1205 reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1206 reply->type = htonl (type);
1207 reply->get_path_length = htonl (get_path_length);
1208 reply->put_path_length = htonl (put_path_length);
1209 reply->unique_id = 0; /* filled in later */
1210 reply->expiration = GNUNET_TIME_absolute_hton (expiration);
1211 reply->key = *key;
1212 paths = (struct GNUNET_PeerIdentity *) &reply[1];
1213 GNUNET_memcpy (paths, put_path,
1214 sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1215 GNUNET_memcpy (&paths[put_path_length], get_path,
1216 sizeof (struct GNUNET_PeerIdentity) * get_path_length);
1217 GNUNET_memcpy (&paths[get_path_length + put_path_length], data, data_size);
1218 frc.do_copy = GNUNET_NO;
1219 frc.pm = pm;
1220 frc.data = data; 1129 frc.data = data;
1221 frc.data_size = data_size; 1130 frc.data_size = data_size;
1131 frc.get_path_length = get_path_length;
1132 frc.put_path_length = put_path_length;
1222 frc.type = type; 1133 frc.type = type;
1134 LOG (GNUNET_ERROR_TYPE_DEBUG,
1135 "Forwarding reply for key %s to client\n",
1136 GNUNET_h2s (key));
1223 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, 1137 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1224 key, 1138 key,
1225 &forward_reply, 1139 &forward_reply,
1226 &frc); 1140 &frc);
1227 1141
1228 if (GNUNET_NO == frc.do_copy)
1229 {
1230 /* did not match any of the requests, free! */
1231 GNUNET_STATISTICS_update (GDS_stats,
1232 gettext_noop
1233 ("# REPLIES ignored for CLIENTS (no match)"), 1,
1234 GNUNET_NO);
1235 GNUNET_free (pm);
1236 }
1237} 1142}
1238 1143
1239 1144
@@ -1259,18 +1164,21 @@ GDS_CLIENTS_process_get (uint32_t options,
1259 const struct GNUNET_HashCode * key) 1164 const struct GNUNET_HashCode * key)
1260{ 1165{
1261 struct ClientMonitorRecord *m; 1166 struct ClientMonitorRecord *m;
1262 struct ClientList **cl; 1167 struct ClientHandle **cl;
1263 unsigned int cl_size; 1168 unsigned int cl_size;
1264 1169
1265 cl = NULL; 1170 cl = NULL;
1266 cl_size = 0; 1171 cl_size = 0;
1267 for (m = monitor_head; NULL != m; m = m->next) 1172 for (m = monitor_head; NULL != m; m = m->next)
1268 { 1173 {
1269 if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && 1174 if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1270 (NULL == m->key || 1175 (m->type == type) ) &&
1271 memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) 1176 ( (NULL == m->key) ||
1177 (0 == memcmp (key,
1178 m->key,
1179 sizeof(struct GNUNET_HashCode))) ) )
1272 { 1180 {
1273 struct PendingMessage *pm; 1181 struct GNUNET_MQ_Envelope *env;
1274 struct GNUNET_DHT_MonitorGetMessage *mmsg; 1182 struct GNUNET_DHT_MonitorGetMessage *mmsg;
1275 struct GNUNET_PeerIdentity *msg_path; 1183 struct GNUNET_PeerIdentity *msg_path;
1276 size_t msize; 1184 size_t msize;
@@ -1278,31 +1186,30 @@ GDS_CLIENTS_process_get (uint32_t options,
1278 1186
1279 /* Don't send duplicates */ 1187 /* Don't send duplicates */
1280 for (i = 0; i < cl_size; i++) 1188 for (i = 0; i < cl_size; i++)
1281 if (cl[i] == m->client) 1189 if (cl[i] == m->ch)
1282 break; 1190 break;
1283 if (i < cl_size) 1191 if (i < cl_size)
1284 continue; 1192 continue;
1285 GNUNET_array_append (cl, cl_size, m->client); 1193 GNUNET_array_append (cl,
1194 cl_size,
1195 m->ch);
1286 1196
1287 msize = path_length * sizeof (struct GNUNET_PeerIdentity); 1197 msize = path_length * sizeof (struct GNUNET_PeerIdentity);
1288 msize += sizeof (struct GNUNET_DHT_MonitorGetMessage); 1198 env = GNUNET_MQ_msg_extra (mmsg,
1289 msize += sizeof (struct PendingMessage); 1199 msize,
1290 pm = GNUNET_malloc (msize); 1200 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1291 mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1];
1292 pm->msg = &mmsg->header;
1293 mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
1294 mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1295 mmsg->options = htonl(options); 1201 mmsg->options = htonl(options);
1296 mmsg->type = htonl(type); 1202 mmsg->type = htonl(type);
1297 mmsg->hop_count = htonl(hop_count); 1203 mmsg->hop_count = htonl(hop_count);
1298 mmsg->desired_replication_level = htonl(desired_replication_level); 1204 mmsg->desired_replication_level = htonl(desired_replication_level);
1299 mmsg->get_path_length = htonl(path_length); 1205 mmsg->get_path_length = htonl(path_length);
1300 GNUNET_memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); 1206 mmsg->key = *key;
1301 msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; 1207 msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1302 if (path_length > 0) 1208 GNUNET_memcpy (msg_path,
1303 GNUNET_memcpy (msg_path, path, 1209 path,
1304 path_length * sizeof (struct GNUNET_PeerIdentity)); 1210 path_length * sizeof (struct GNUNET_PeerIdentity));
1305 add_pending_message (m->client, pm); 1211 GNUNET_MQ_send (m->ch->mq,
1212 env);
1306 } 1213 }
1307 } 1214 }
1308 GNUNET_free_non_null (cl); 1215 GNUNET_free_non_null (cl);
@@ -1335,7 +1242,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1335 size_t size) 1242 size_t size)
1336{ 1243{
1337 struct ClientMonitorRecord *m; 1244 struct ClientMonitorRecord *m;
1338 struct ClientList **cl; 1245 struct ClientHandle **cl;
1339 unsigned int cl_size; 1246 unsigned int cl_size;
1340 1247
1341 cl = NULL; 1248 cl = NULL;
@@ -1346,7 +1253,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1346 (NULL == m->key || 1253 (NULL == m->key ||
1347 memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) 1254 memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1348 { 1255 {
1349 struct PendingMessage *pm; 1256 struct GNUNET_MQ_Envelope *env;
1350 struct GNUNET_DHT_MonitorGetRespMessage *mmsg; 1257 struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1351 struct GNUNET_PeerIdentity *path; 1258 struct GNUNET_PeerIdentity *path;
1352 size_t msize; 1259 size_t msize;
@@ -1354,40 +1261,37 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1354 1261
1355 /* Don't send duplicates */ 1262 /* Don't send duplicates */
1356 for (i = 0; i < cl_size; i++) 1263 for (i = 0; i < cl_size; i++)
1357 if (cl[i] == m->client) 1264 if (cl[i] == m->ch)
1358 break; 1265 break;
1359 if (i < cl_size) 1266 if (i < cl_size)
1360 continue; 1267 continue;
1361 GNUNET_array_append (cl, cl_size, m->client); 1268 GNUNET_array_append (cl,
1269 cl_size,
1270 m->ch);
1362 1271
1363 msize = size; 1272 msize = size;
1364 msize += (get_path_length + put_path_length) 1273 msize += (get_path_length + put_path_length)
1365 * sizeof (struct GNUNET_PeerIdentity); 1274 * sizeof (struct GNUNET_PeerIdentity);
1366 msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage); 1275 env = GNUNET_MQ_msg_extra (mmsg,
1367 msize += sizeof (struct PendingMessage); 1276 msize,
1368 pm = GNUNET_malloc (msize); 1277 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1369 mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1];
1370 pm->msg = (struct GNUNET_MessageHeader *) mmsg;
1371 mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
1372 mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1373 mmsg->type = htonl(type); 1278 mmsg->type = htonl(type);
1374 mmsg->put_path_length = htonl(put_path_length); 1279 mmsg->put_path_length = htonl(put_path_length);
1375 mmsg->get_path_length = htonl(get_path_length); 1280 mmsg->get_path_length = htonl(get_path_length);
1376 path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1377 if (put_path_length > 0)
1378 {
1379 GNUNET_memcpy (path, put_path,
1380 put_path_length * sizeof (struct GNUNET_PeerIdentity));
1381 path = &path[put_path_length];
1382 }
1383 if (get_path_length > 0)
1384 GNUNET_memcpy (path, get_path,
1385 get_path_length * sizeof (struct GNUNET_PeerIdentity));
1386 mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); 1281 mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1387 GNUNET_memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); 1282 mmsg->key = *key;
1388 if (size > 0) 1283 path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1389 GNUNET_memcpy (&path[get_path_length], data, size); 1284 GNUNET_memcpy (path,
1390 add_pending_message (m->client, pm); 1285 put_path,
1286 put_path_length * sizeof (struct GNUNET_PeerIdentity));
1287 GNUNET_memcpy (path,
1288 get_path,
1289 get_path_length * sizeof (struct GNUNET_PeerIdentity));
1290 GNUNET_memcpy (&path[get_path_length],
1291 data,
1292 size);
1293 GNUNET_MQ_send (m->ch->mq,
1294 env);
1391 } 1295 }
1392 } 1296 }
1393 GNUNET_free_non_null (cl); 1297 GNUNET_free_non_null (cl);
@@ -1422,7 +1326,7 @@ GDS_CLIENTS_process_put (uint32_t options,
1422 size_t size) 1326 size_t size)
1423{ 1327{
1424 struct ClientMonitorRecord *m; 1328 struct ClientMonitorRecord *m;
1425 struct ClientList **cl; 1329 struct ClientHandle **cl;
1426 unsigned int cl_size; 1330 unsigned int cl_size;
1427 1331
1428 cl = NULL; 1332 cl = NULL;
@@ -1433,7 +1337,7 @@ GDS_CLIENTS_process_put (uint32_t options,
1433 (NULL == m->key || 1337 (NULL == m->key ||
1434 memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) 1338 memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1435 { 1339 {
1436 struct PendingMessage *pm; 1340 struct GNUNET_MQ_Envelope *env;
1437 struct GNUNET_DHT_MonitorPutMessage *mmsg; 1341 struct GNUNET_DHT_MonitorPutMessage *mmsg;
1438 struct GNUNET_PeerIdentity *msg_path; 1342 struct GNUNET_PeerIdentity *msg_path;
1439 size_t msize; 1343 size_t msize;
@@ -1441,38 +1345,35 @@ GDS_CLIENTS_process_put (uint32_t options,
1441 1345
1442 /* Don't send duplicates */ 1346 /* Don't send duplicates */
1443 for (i = 0; i < cl_size; i++) 1347 for (i = 0; i < cl_size; i++)
1444 if (cl[i] == m->client) 1348 if (cl[i] == m->ch)
1445 break; 1349 break;
1446 if (i < cl_size) 1350 if (i < cl_size)
1447 continue; 1351 continue;
1448 GNUNET_array_append (cl, cl_size, m->client); 1352 GNUNET_array_append (cl,
1353 cl_size,
1354 m->ch);
1449 1355
1450 msize = size; 1356 msize = size;
1451 msize += path_length * sizeof (struct GNUNET_PeerIdentity); 1357 msize += path_length * sizeof (struct GNUNET_PeerIdentity);
1452 msize += sizeof (struct GNUNET_DHT_MonitorPutMessage); 1358 env = GNUNET_MQ_msg_extra (mmsg,
1453 msize += sizeof (struct PendingMessage); 1359 msize,
1454 pm = GNUNET_malloc (msize); 1360 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1455 mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1];
1456 pm->msg = (struct GNUNET_MessageHeader *) mmsg;
1457 mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
1458 mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1459 mmsg->options = htonl(options); 1361 mmsg->options = htonl(options);
1460 mmsg->type = htonl(type); 1362 mmsg->type = htonl(type);
1461 mmsg->hop_count = htonl(hop_count); 1363 mmsg->hop_count = htonl(hop_count);
1462 mmsg->desired_replication_level = htonl(desired_replication_level); 1364 mmsg->desired_replication_level = htonl (desired_replication_level);
1463 mmsg->put_path_length = htonl(path_length); 1365 mmsg->put_path_length = htonl (path_length);
1366 mmsg->key = *key;
1367 mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp);
1464 msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; 1368 msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1465 if (path_length > 0) 1369 GNUNET_memcpy (msg_path,
1466 { 1370 path,
1467 GNUNET_memcpy (msg_path, 1371 path_length * sizeof (struct GNUNET_PeerIdentity));
1468 path, 1372 GNUNET_memcpy (&msg_path[path_length],
1469 path_length * sizeof (struct GNUNET_PeerIdentity)); 1373 data,
1470 } 1374 size);
1471 mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); 1375 GNUNET_MQ_send (m->ch->mq,
1472 GNUNET_memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); 1376 env);
1473 if (size > 0)
1474 GNUNET_memcpy (&msg_path[path_length], data, size);
1475 add_pending_message (m->client, pm);
1476 } 1377 }
1477 } 1378 }
1478 GNUNET_free_non_null (cl); 1379 GNUNET_free_non_null (cl);
@@ -1484,43 +1385,21 @@ GDS_CLIENTS_process_put (uint32_t options,
1484 * 1385 *
1485 * @param server the initialized server 1386 * @param server the initialized server
1486 */ 1387 */
1487void 1388static void
1488GDS_CLIENTS_init () 1389GDS_CLIENTS_init ()
1489{ 1390{
1490 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { 1391 forward_map
1491 {&handle_dht_local_put, NULL, 1392 = GNUNET_CONTAINER_multihashmap_create (1024,
1492 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0}, 1393 GNUNET_YES);
1493 {&handle_dht_local_get, NULL, 1394 retry_heap
1494 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0}, 1395 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1495 {&handle_dht_local_get_stop, NULL,
1496 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP,
1497 sizeof (struct GNUNET_DHT_ClientGetStopMessage)},
1498 {&handle_dht_local_monitor, NULL,
1499 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START,
1500 sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
1501 {&handle_dht_local_monitor_stop, NULL,
1502 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP,
1503 sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
1504 {&handle_dht_local_get_result_seen, NULL,
1505 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0},
1506 {NULL, NULL, 0, 0}
1507 };
1508
1509 forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);
1510 retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1511 GNUNET_SERVER_resume (GDS_server);
1512 GNUNET_SERVER_add_handlers (GDS_server,
1513 plugin_handlers);
1514 GNUNET_SERVER_disconnect_notify (GDS_server,
1515 &handle_client_disconnect,
1516 NULL);
1517} 1396}
1518 1397
1519 1398
1520/** 1399/**
1521 * Shutdown client subsystem. 1400 * Shutdown client subsystem.
1522 */ 1401 */
1523void 1402static void
1524GDS_CLIENTS_stop () 1403GDS_CLIENTS_stop ()
1525{ 1404{
1526 if (NULL != retry_task) 1405 if (NULL != retry_task)
@@ -1532,13 +1411,51 @@ GDS_CLIENTS_stop ()
1532 1411
1533 1412
1534/** 1413/**
1535 * Shutdown client subsystem. 1414 * Define "main" method using service macro.
1415 *
1416 * @param run name of the initializaton method for the service
1536 */ 1417 */
1537void 1418#define GDS_DHT_SERVICE_INIT(run) \
1419 GNUNET_SERVICE_MAIN \
1420 ("dht", \
1421 GNUNET_SERVICE_OPTION_NONE, \
1422 run, \
1423 &client_connect_cb, \
1424 &client_disconnect_cb, \
1425 NULL, \
1426 GNUNET_MQ_hd_var_size (dht_local_put, \
1427 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1428 struct GNUNET_DHT_ClientPutMessage, \
1429 NULL), \
1430 GNUNET_MQ_hd_var_size (dht_local_get, \
1431 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1432 struct GNUNET_DHT_ClientGetMessage, \
1433 NULL), \
1434 GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1435 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1436 struct GNUNET_DHT_ClientGetStopMessage, \
1437 NULL), \
1438 GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1439 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1440 struct GNUNET_DHT_MonitorStartStopMessage, \
1441 NULL), \
1442 GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1443 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1444 struct GNUNET_DHT_MonitorStartStopMessage, \
1445 NULL), \
1446 GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1447 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1448 struct GNUNET_DHT_ClientGetResultSeenMessage , \
1449 NULL), \
1450 GNUNET_MQ_handler_end ())
1451
1452
1453/**
1454 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1455 */
1456void __attribute__ ((destructor))
1538GDS_CLIENTS_done () 1457GDS_CLIENTS_done ()
1539{ 1458{
1540 GNUNET_assert (NULL == client_head);
1541 GNUNET_assert (NULL == client_tail);
1542 if (NULL != retry_heap) 1459 if (NULL != retry_heap)
1543 { 1460 {
1544 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); 1461 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h
deleted file mode 100644
index 8a0931f6a..000000000
--- a/src/dht/gnunet-service-dht_clients.h
+++ /dev/null
@@ -1,153 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011 GNUnet e.V.
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_clients.h
23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#ifndef GNUNET_SERVICE_DHT_CLIENT_H
28#define GNUNET_SERVICE_DHT_CLIENT_H
29
30#include "gnunet_util_lib.h"
31#include "gnunet_block_lib.h"
32
33/**
34 * Handle a reply we've received from another peer. If the reply
35 * matches any of our pending queries, forward it to the respective
36 * client(s).
37 *
38 * @param expiration when will the reply expire
39 * @param key the query this reply is for
40 * @param get_path_length number of peers in @a get_path
41 * @param get_path path the reply took on get
42 * @param put_path_length number of peers in @a put_path
43 * @param put_path path the reply took on put
44 * @param type type of the reply
45 * @param data_size number of bytes in @a data
46 * @param data application payload data
47 */
48void
49GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
50 const struct GNUNET_HashCode *key,
51 unsigned int get_path_length,
52 const struct GNUNET_PeerIdentity *get_path,
53 unsigned int put_path_length,
54 const struct GNUNET_PeerIdentity *put_path,
55 enum GNUNET_BLOCK_Type type, size_t data_size,
56 const void *data);
57
58
59/**
60 * Check if some client is monitoring GET messages and notify
61 * them in that case.
62 *
63 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
64 * @param type The type of data in the request.
65 * @param hop_count Hop count so far.
66 * @param path_length number of entries in path (or 0 if not recorded).
67 * @param path peers on the GET path (or NULL if not recorded).
68 * @param desired_replication_level Desired replication level.
69 * @param key Key of the requested data.
70 */
71void
72GDS_CLIENTS_process_get (uint32_t options,
73 enum GNUNET_BLOCK_Type type,
74 uint32_t hop_count,
75 uint32_t desired_replication_level,
76 unsigned int path_length,
77 const struct GNUNET_PeerIdentity *path,
78 const struct GNUNET_HashCode *key);
79
80
81/**
82 * Check if some client is monitoring GET RESP messages and notify
83 * them in that case.
84 *
85 * @param type The type of data in the result.
86 * @param get_path Peers on GET path (or NULL if not recorded).
87 * @param get_path_length number of entries in @a get_path.
88 * @param put_path peers on the PUT path (or NULL if not recorded).
89 * @param put_path_length number of entries in @a get_path.
90 * @param exp Expiration time of the data.
91 * @param key Key of the @a data.
92 * @param data Pointer to the result data.
93 * @param size Number of bytes in @a data.
94 */
95void
96GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
97 const struct GNUNET_PeerIdentity *get_path,
98 unsigned int get_path_length,
99 const struct GNUNET_PeerIdentity *put_path,
100 unsigned int put_path_length,
101 struct GNUNET_TIME_Absolute exp,
102 const struct GNUNET_HashCode * key,
103 const void *data,
104 size_t size);
105
106
107/**
108 * Check if some client is monitoring PUT messages and notify
109 * them in that case.
110 *
111 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
112 * @param type The type of data in the request.
113 * @param hop_count Hop count so far.
114 * @param path_length number of entries in path (or 0 if not recorded).
115 * @param path peers on the PUT path (or NULL if not recorded).
116 * @param desired_replication_level Desired replication level.
117 * @param exp Expiration time of the data.
118 * @param key Key under which data is to be stored.
119 * @param data Pointer to the data carried.
120 * @param size Number of bytes in data.
121 */
122void
123GDS_CLIENTS_process_put (uint32_t options,
124 enum GNUNET_BLOCK_Type type,
125 uint32_t hop_count,
126 uint32_t desired_replication_level,
127 unsigned int path_length,
128 const struct GNUNET_PeerIdentity *path,
129 struct GNUNET_TIME_Absolute exp,
130 const struct GNUNET_HashCode * key,
131 const void *data,
132 size_t size);
133
134/**
135 * Initialize client subsystem.
136 */
137void
138GDS_CLIENTS_init (void);
139
140/**
141 * Shutdown client subsystem.
142 */
143void
144GDS_CLIENTS_stop (void);
145
146
147/**
148 * Shutdown client subsystem.
149 */
150void
151GDS_CLIENTS_done (void);
152
153#endif
diff --git a/src/dht/gnunet-service-dht_datacache.c b/src/dht/gnunet-service-dht_datacache.c
index 322b5af17..12c79764d 100644
--- a/src/dht/gnunet-service-dht_datacache.c
+++ b/src/dht/gnunet-service-dht_datacache.c
@@ -25,7 +25,6 @@
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_datacache_lib.h" 27#include "gnunet_datacache_lib.h"
28#include "gnunet-service-dht_clients.h"
29#include "gnunet-service-dht_datacache.h" 28#include "gnunet-service-dht_datacache.h"
30#include "gnunet-service-dht_routing.h" 29#include "gnunet-service-dht_routing.h"
31#include "gnunet-service-dht.h" 30#include "gnunet-service-dht.h"
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index 683e0991a..39fb43495 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -39,7 +39,6 @@
39#include "gnunet_dht_service.h" 39#include "gnunet_dht_service.h"
40#include "gnunet_statistics_service.h" 40#include "gnunet_statistics_service.h"
41#include "gnunet-service-dht.h" 41#include "gnunet-service-dht.h"
42#include "gnunet-service-dht_clients.h"
43#include "gnunet-service-dht_datacache.h" 42#include "gnunet-service-dht_datacache.h"
44#include "gnunet-service-dht_hello.h" 43#include "gnunet-service-dht_hello.h"
45#include "gnunet-service-dht_neighbours.h" 44#include "gnunet-service-dht_neighbours.h"
@@ -1587,7 +1586,7 @@ core_init (void *cls,
1587 GNUNET_CRYPTO_hash (identity, 1586 GNUNET_CRYPTO_hash (identity,
1588 sizeof (struct GNUNET_PeerIdentity), 1587 sizeof (struct GNUNET_PeerIdentity),
1589 &my_identity_hash); 1588 &my_identity_hash);
1590 GDS_CLIENTS_init (); 1589 GNUNET_SERVICE_resume (GDS_service);
1591} 1590}
1592 1591
1593 1592