diff options
author | Omar Tarabai <tarabai@devegypt.com> | 2014-05-16 16:45:43 +0000 |
---|---|---|
committer | Omar Tarabai <tarabai@devegypt.com> | 2014-05-16 16:45:43 +0000 |
commit | 3c9e8b1b0f7f83f27fefb02bd67b481c67cad0c8 (patch) | |
tree | 525ae2f8cec03f298783aa693d465f789171cf51 /src/peerstore/peerstore_api.c | |
parent | aeaf5c97d7d115d99f30e86be66c622b7a6ebf4f (diff) | |
download | gnunet-3c9e8b1b0f7f83f27fefb02bd67b481c67cad0c8.tar.gz gnunet-3c9e8b1b0f7f83f27fefb02bd67b481c67cad0c8.zip |
peerstore API now uses MQ
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r-- | src/peerstore/peerstore_api.c | 178 |
1 files changed, 143 insertions, 35 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index aa798e653..323ba45d0 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c | |||
@@ -50,6 +50,21 @@ struct GNUNET_PEERSTORE_Handle | |||
50 | */ | 50 | */ |
51 | struct GNUNET_CLIENT_Connection *client; | 51 | struct GNUNET_CLIENT_Connection *client; |
52 | 52 | ||
53 | /** | ||
54 | * Message queue | ||
55 | */ | ||
56 | struct GNUNET_MQ_Handle *mq; | ||
57 | |||
58 | /** | ||
59 | * Head of active STORE requests. | ||
60 | */ | ||
61 | struct GNUNET_PEERSTORE_StoreContext *store_head; | ||
62 | |||
63 | /** | ||
64 | * Tail of active STORE requests. | ||
65 | */ | ||
66 | struct GNUNET_PEERSTORE_StoreContext *store_tail; | ||
67 | |||
53 | }; | 68 | }; |
54 | 69 | ||
55 | /** | 70 | /** |
@@ -57,6 +72,15 @@ struct GNUNET_PEERSTORE_Handle | |||
57 | */ | 72 | */ |
58 | struct GNUNET_PEERSTORE_StoreContext | 73 | struct GNUNET_PEERSTORE_StoreContext |
59 | { | 74 | { |
75 | /** | ||
76 | * Kept in a DLL. | ||
77 | */ | ||
78 | struct GNUNET_PEERSTORE_StoreContext *next; | ||
79 | |||
80 | /** | ||
81 | * Kept in a DLL. | ||
82 | */ | ||
83 | struct GNUNET_PEERSTORE_StoreContext *prev; | ||
60 | 84 | ||
61 | /** | 85 | /** |
62 | * Handle to the PEERSTORE service. | 86 | * Handle to the PEERSTORE service. |
@@ -64,6 +88,11 @@ struct GNUNET_PEERSTORE_StoreContext | |||
64 | struct GNUNET_PEERSTORE_Handle *h; | 88 | struct GNUNET_PEERSTORE_Handle *h; |
65 | 89 | ||
66 | /** | 90 | /** |
91 | * MQ Envelope with store request message | ||
92 | */ | ||
93 | struct GNUNET_MQ_Envelope *ev; | ||
94 | |||
95 | /** | ||
67 | * Continuation called with service response | 96 | * Continuation called with service response |
68 | */ | 97 | */ |
69 | GNUNET_PEERSTORE_Continuation cont; | 98 | GNUNET_PEERSTORE_Continuation cont; |
@@ -73,9 +102,27 @@ struct GNUNET_PEERSTORE_StoreContext | |||
73 | */ | 102 | */ |
74 | void *cont_cls; | 103 | void *cont_cls; |
75 | 104 | ||
105 | /** | ||
106 | * #GNUNET_YES / #GNUNET_NO | ||
107 | * if sent, cannot be canceled | ||
108 | */ | ||
109 | int request_sent; | ||
110 | |||
76 | }; | 111 | }; |
77 | 112 | ||
78 | /******************************************************************************/ | 113 | /******************************************************************************/ |
114 | /******************* DECLARATIONS *********************/ | ||
115 | /******************************************************************************/ | ||
116 | |||
117 | /** | ||
118 | * When a response for store request is received | ||
119 | * | ||
120 | * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' | ||
121 | * @param msg message received, NULL on timeout or fatal error | ||
122 | */ | ||
123 | void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg); | ||
124 | |||
125 | /******************************************************************************/ | ||
79 | /******************* CONNECTION FUNCTIONS *********************/ | 126 | /******************* CONNECTION FUNCTIONS *********************/ |
80 | /******************************************************************************/ | 127 | /******************************************************************************/ |
81 | 128 | ||
@@ -85,7 +132,7 @@ struct GNUNET_PEERSTORE_StoreContext | |||
85 | * @param h handle to the service | 132 | * @param h handle to the service |
86 | */ | 133 | */ |
87 | static void | 134 | static void |
88 | reconnect (struct GNUNET_PEERSTORE_Handle *h) | 135 | reconnect (struct GNUNET_PEERSTORE_Handle *h) //FIXME: MQ friendly |
89 | { | 136 | { |
90 | 137 | ||
91 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); | 138 | LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); |
@@ -98,6 +145,12 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) | |||
98 | 145 | ||
99 | } | 146 | } |
100 | 147 | ||
148 | static void | ||
149 | handle_client_error (void *cls, enum GNUNET_MQ_Error error) //FIXME: implement | ||
150 | { | ||
151 | //struct GNUNET_PEERSTORE_Handle *h = cls; | ||
152 | } | ||
153 | |||
101 | /** | 154 | /** |
102 | * Connect to the PEERSTORE service. | 155 | * Connect to the PEERSTORE service. |
103 | * | 156 | * |
@@ -106,15 +159,30 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) | |||
106 | struct GNUNET_PEERSTORE_Handle * | 159 | struct GNUNET_PEERSTORE_Handle * |
107 | GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | 160 | GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) |
108 | { | 161 | { |
109 | struct GNUNET_CLIENT_Connection *client; | ||
110 | struct GNUNET_PEERSTORE_Handle *h; | 162 | struct GNUNET_PEERSTORE_Handle *h; |
163 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { | ||
164 | {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)}, | ||
165 | {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, sizeof(struct GNUNET_MessageHeader)}, | ||
166 | GNUNET_MQ_HANDLERS_END | ||
167 | }; | ||
111 | 168 | ||
112 | client = GNUNET_CLIENT_connect ("peerstore", cfg); | ||
113 | if(NULL == client) | ||
114 | return NULL; | ||
115 | h = GNUNET_new (struct GNUNET_PEERSTORE_Handle); | 169 | h = GNUNET_new (struct GNUNET_PEERSTORE_Handle); |
116 | h->client = client; | 170 | h->client = GNUNET_CLIENT_connect ("peerstore", cfg); |
171 | if(NULL == h->client) | ||
172 | { | ||
173 | GNUNET_free(h); | ||
174 | return NULL; | ||
175 | } | ||
117 | h->cfg = cfg; | 176 | h->cfg = cfg; |
177 | h->mq = GNUNET_MQ_queue_for_connection_client(h->client, | ||
178 | mq_handlers, | ||
179 | &handle_client_error, | ||
180 | h); | ||
181 | if(NULL == h->mq) | ||
182 | { | ||
183 | GNUNET_free(h); | ||
184 | return NULL; | ||
185 | } | ||
118 | LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n"); | 186 | LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n"); |
119 | return h; | 187 | return h; |
120 | } | 188 | } |
@@ -128,6 +196,11 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) | |||
128 | void | 196 | void |
129 | GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) | 197 | GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) |
130 | { | 198 | { |
199 | if(NULL != h->mq) | ||
200 | { | ||
201 | GNUNET_MQ_destroy(h->mq); | ||
202 | h->mq = NULL; | ||
203 | } | ||
131 | if (NULL != h->client) | 204 | if (NULL != h->client) |
132 | { | 205 | { |
133 | GNUNET_CLIENT_disconnect (h->client); | 206 | GNUNET_CLIENT_disconnect (h->client); |
@@ -139,7 +212,7 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) | |||
139 | 212 | ||
140 | 213 | ||
141 | /******************************************************************************/ | 214 | /******************************************************************************/ |
142 | /******************* ADD FUNCTIONS *********************/ | 215 | /******************* STORE FUNCTIONS *********************/ |
143 | /******************************************************************************/ | 216 | /******************************************************************************/ |
144 | 217 | ||
145 | /** | 218 | /** |
@@ -148,41 +221,77 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) | |||
148 | * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' | 221 | * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' |
149 | * @param msg message received, NULL on timeout or fatal error | 222 | * @param msg message received, NULL on timeout or fatal error |
150 | */ | 223 | */ |
151 | void store_response_receiver (void *cls, const struct GNUNET_MessageHeader *msg) | 224 | void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg) //FIXME: MQ friendly |
152 | { | 225 | { |
153 | struct GNUNET_PEERSTORE_StoreContext *sc = cls; | 226 | struct GNUNET_PEERSTORE_Handle *h = cls; |
227 | struct GNUNET_PEERSTORE_StoreContext *sc; | ||
154 | uint16_t msg_type; | 228 | uint16_t msg_type; |
229 | GNUNET_PEERSTORE_Continuation cont; | ||
230 | void *cont_cls; | ||
155 | 231 | ||
156 | if(NULL == sc->cont) | 232 | sc = h->store_head; |
233 | if(NULL == sc) | ||
234 | { | ||
235 | GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this should not happen.\n"); | ||
236 | reconnect(h); | ||
157 | return; | 237 | return; |
158 | if(NULL == msg) | 238 | } |
239 | cont = sc->cont; | ||
240 | cont_cls = sc->cont_cls; | ||
241 | GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc); | ||
242 | GNUNET_free(sc); | ||
243 | if(NULL == msg) /* Connection error */ | ||
159 | { | 244 | { |
160 | sc->cont(sc->cont_cls, GNUNET_SYSERR); | 245 | if(NULL != cont) |
161 | reconnect(sc->h); | 246 | cont(cont_cls, GNUNET_SYSERR); |
247 | reconnect(h); | ||
162 | return; | 248 | return; |
163 | } | 249 | } |
164 | msg_type = ntohs(msg->type); | 250 | if(NULL != cont) /* Run continuation */ |
165 | if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type) | ||
166 | sc->cont(sc->cont_cls, GNUNET_OK); | ||
167 | else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type) | ||
168 | sc->cont(sc->cont_cls, GNUNET_SYSERR); | ||
169 | else | ||
170 | { | 251 | { |
171 | LOG(GNUNET_ERROR_TYPE_ERROR, "Invalid response from `PEERSTORE' service.\n"); | 252 | msg_type = ntohs(msg->type); |
172 | sc->cont(sc->cont_cls, GNUNET_SYSERR); | 253 | if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type) |
254 | cont(cont_cls, GNUNET_OK); | ||
255 | else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type) | ||
256 | cont(cont_cls, GNUNET_SYSERR); | ||
173 | } | 257 | } |
174 | 258 | ||
175 | } | 259 | } |
176 | 260 | ||
177 | /** | 261 | /** |
262 | * Callback after MQ envelope is sent | ||
263 | * | ||
264 | * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' | ||
265 | */ | ||
266 | void store_request_sent (void *cls) | ||
267 | { | ||
268 | struct GNUNET_PEERSTORE_StoreContext *sc = cls; | ||
269 | |||
270 | sc->request_sent = GNUNET_YES; | ||
271 | } | ||
272 | |||
273 | /** | ||
178 | * Cancel a store request | 274 | * Cancel a store request |
179 | * | 275 | * |
180 | * @param sc Store request context | 276 | * @param sc Store request context |
181 | */ | 277 | */ |
182 | void | 278 | void |
183 | GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) | 279 | GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) //FIXME: MQ friendly |
184 | { | 280 | { |
185 | sc->cont = NULL; | 281 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, |
282 | "Canceling store request.\n"); | ||
283 | if(GNUNET_NO == sc->request_sent) | ||
284 | { | ||
285 | if(NULL != sc->ev) | ||
286 | GNUNET_MQ_discard(sc->ev); | ||
287 | GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc); | ||
288 | GNUNET_free(sc); | ||
289 | } | ||
290 | else | ||
291 | { /* request already sent, will have to wait for response */ | ||
292 | sc->cont = NULL; | ||
293 | } | ||
294 | |||
186 | } | 295 | } |
187 | 296 | ||
188 | /** | 297 | /** |
@@ -209,29 +318,28 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | |||
209 | GNUNET_PEERSTORE_Continuation cont, | 318 | GNUNET_PEERSTORE_Continuation cont, |
210 | void *cont_cls) | 319 | void *cont_cls) |
211 | { | 320 | { |
321 | struct GNUNET_MQ_Envelope *ev; | ||
212 | struct GNUNET_PEERSTORE_StoreContext *sc; | 322 | struct GNUNET_PEERSTORE_StoreContext *sc; |
213 | struct StoreRecordMessage *srm; | ||
214 | 323 | ||
215 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 324 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
216 | "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n", | 325 | "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n", |
217 | size, sub_system, GNUNET_i2s (peer), key); | 326 | size, sub_system, GNUNET_i2s (peer), key); |
218 | sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext); | 327 | ev = PEERSTORE_create_record_mq_envelope(sub_system, |
219 | sc->cont = cont; | ||
220 | sc->cont_cls = cont_cls; | ||
221 | sc->h = h; | ||
222 | srm = PEERSTORE_create_record_message(sub_system, | ||
223 | peer, | 328 | peer, |
224 | key, | 329 | key, |
225 | value, | 330 | value, |
226 | size, | 331 | size, |
227 | expiry, | 332 | expiry, |
228 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); | 333 | GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); |
229 | GNUNET_CLIENT_transmit_and_get_response(h->client, | 334 | GNUNET_MQ_send(h->mq, ev); |
230 | (const struct GNUNET_MessageHeader *)srm, | 335 | GNUNET_MQ_notify_sent(ev, &store_request_sent, ev); |
231 | GNUNET_TIME_UNIT_FOREVER_REL, | 336 | sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext); |
232 | GNUNET_YES, | 337 | sc->ev = ev; |
233 | &store_response_receiver, | 338 | sc->cont = cont; |
234 | sc); | 339 | sc->cont_cls = cont_cls; |
340 | sc->h = h; | ||
341 | sc->request_sent = GNUNET_NO; | ||
342 | GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc); | ||
235 | return sc; | 343 | return sc; |
236 | 344 | ||
237 | } | 345 | } |