diff options
author | Omar Tarabai <tarabai@devegypt.com> | 2014-06-03 13:50:37 +0000 |
---|---|---|
committer | Omar Tarabai <tarabai@devegypt.com> | 2014-06-03 13:50:37 +0000 |
commit | a04d70028c0615e7910f68328db9c61bba67128b (patch) | |
tree | 8d27cc7e8ad6b25cf01ac2cbae6f2839deec00f8 /src/peerstore | |
parent | eb985bf2eae6054292e3cbffbfeb4aaea3e394d5 (diff) | |
download | gnunet-a04d70028c0615e7910f68328db9c61bba67128b.tar.gz gnunet-a04d70028c0615e7910f68328db9c61bba67128b.zip |
peerstore: fixes in watch functionality
Diffstat (limited to 'src/peerstore')
-rw-r--r-- | src/peerstore/gnunet-service-peerstore.c | 32 | ||||
-rw-r--r-- | src/peerstore/peerstore_api.c | 78 | ||||
-rw-r--r-- | src/peerstore/test_peerstore_api.c | 11 |
3 files changed, 26 insertions, 95 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c index 706fcaaae..82961f685 100644 --- a/src/peerstore/gnunet-service-peerstore.c +++ b/src/peerstore/gnunet-service-peerstore.c | |||
@@ -144,7 +144,7 @@ int record_iterator(void *cls, | |||
144 | struct GNUNET_PEERSTORE_Record *record, | 144 | struct GNUNET_PEERSTORE_Record *record, |
145 | char *emsg) | 145 | char *emsg) |
146 | { | 146 | { |
147 | struct GNUNET_SERVER_TransmitContext *tc = cls; | 147 | struct GNUNET_SERVER_Client *client = cls; |
148 | struct StoreRecordMessage *srm; | 148 | struct StoreRecordMessage *srm; |
149 | 149 | ||
150 | srm = PEERSTORE_create_record_message(record->sub_system, | 150 | srm = PEERSTORE_create_record_message(record->sub_system, |
@@ -154,7 +154,7 @@ int record_iterator(void *cls, | |||
154 | record->value_size, | 154 | record->value_size, |
155 | record->expiry, | 155 | record->expiry, |
156 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD); | 156 | GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD); |
157 | GNUNET_SERVER_transmit_context_append_message(tc, (const struct GNUNET_MessageHeader *)srm); | 157 | GNUNET_SERVER_notification_context_unicast(nc, client, (struct GNUNET_MessageHeader *)srm, GNUNET_NO); |
158 | return GNUNET_YES; | 158 | return GNUNET_YES; |
159 | } | 159 | } |
160 | 160 | ||
@@ -189,19 +189,17 @@ int watch_notifier_it(void *cls, | |||
189 | record->expiry, | 189 | record->expiry, |
190 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD); | 190 | GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD); |
191 | GNUNET_SERVER_notification_context_unicast(nc, client, | 191 | GNUNET_SERVER_notification_context_unicast(nc, client, |
192 | (const struct GNUNET_MessageHeader *)srm, GNUNET_YES); | 192 | (const struct GNUNET_MessageHeader *)srm, GNUNET_NO); |
193 | return GNUNET_YES; | 193 | return GNUNET_YES; |
194 | } | 194 | } |
195 | 195 | ||
196 | /** | 196 | /** |
197 | * Given a new record, notifies watchers | 197 | * Given a new record, notifies watchers |
198 | * | 198 | * |
199 | * @cls closure, a 'struct GNUNET_PEERSTORE_Record *' | 199 | * @param record changed record to update watchers with |
200 | * @tc unused | ||
201 | */ | 200 | */ |
202 | void watch_notifier (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 201 | void watch_notifier (struct GNUNET_PEERSTORE_Record *record) |
203 | { | 202 | { |
204 | struct GNUNET_PEERSTORE_Record *record = cls; | ||
205 | struct GNUNET_HashCode keyhash; | 203 | struct GNUNET_HashCode keyhash; |
206 | 204 | ||
207 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n"); | 205 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n"); |
@@ -265,7 +263,7 @@ void handle_iterate (void *cls, | |||
265 | const struct GNUNET_MessageHeader *message) | 263 | const struct GNUNET_MessageHeader *message) |
266 | { | 264 | { |
267 | struct GNUNET_PEERSTORE_Record *record; | 265 | struct GNUNET_PEERSTORE_Record *record; |
268 | struct GNUNET_SERVER_TransmitContext *tc; | 266 | struct GNUNET_MessageHeader *endmsg; |
269 | 267 | ||
270 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request from client.\n"); | 268 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request from client.\n"); |
271 | record = PEERSTORE_parse_record_message(message); | 269 | record = PEERSTORE_parse_record_message(message); |
@@ -281,20 +279,21 @@ void handle_iterate (void *cls, | |||
281 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); | 279 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); |
282 | return; | 280 | return; |
283 | } | 281 | } |
284 | tc = GNUNET_SERVER_transmit_context_create (client); | 282 | GNUNET_SERVER_notification_context_add(nc, client); |
285 | if(GNUNET_OK == db->iterate_records(db->cls, | 283 | if(GNUNET_OK == db->iterate_records(db->cls, |
286 | record->sub_system, | 284 | record->sub_system, |
287 | record->peer, | 285 | record->peer, |
288 | record->key, | 286 | record->key, |
289 | &record_iterator, | 287 | &record_iterator, |
290 | tc)) | 288 | client)) |
291 | { | 289 | { |
292 | GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END); | 290 | endmsg = GNUNET_new(struct GNUNET_MessageHeader); |
293 | GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); | 291 | endmsg->size = htons(sizeof(struct GNUNET_MessageHeader)); |
292 | endmsg->type = htons(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END); | ||
293 | GNUNET_SERVER_notification_context_unicast(nc, client, endmsg, GNUNET_NO); | ||
294 | } | 294 | } |
295 | else | 295 | else |
296 | { | 296 | { |
297 | GNUNET_free(tc); | ||
298 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); | 297 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); |
299 | } | 298 | } |
300 | GNUNET_free(record); /* FIXME: destroy record */ | 299 | GNUNET_free(record); /* FIXME: destroy record */ |
@@ -312,7 +311,6 @@ void handle_store (void *cls, | |||
312 | const struct GNUNET_MessageHeader *message) | 311 | const struct GNUNET_MessageHeader *message) |
313 | { | 312 | { |
314 | struct GNUNET_PEERSTORE_Record *record; | 313 | struct GNUNET_PEERSTORE_Record *record; |
315 | struct GNUNET_SERVER_TransmitContext *tc; | ||
316 | 314 | ||
317 | record = PEERSTORE_parse_record_message(message); | 315 | record = PEERSTORE_parse_record_message(message); |
318 | if(NULL == record) | 316 | if(NULL == record) |
@@ -348,10 +346,8 @@ void handle_store (void *cls, | |||
348 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); | 346 | GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); |
349 | return; | 347 | return; |
350 | } | 348 | } |
351 | tc = GNUNET_SERVER_transmit_context_create (client); | 349 | GNUNET_SERVER_receive_done(client, GNUNET_OK); |
352 | GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK); | 350 | watch_notifier(record); |
353 | GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); | ||
354 | GNUNET_SCHEDULER_add_continuation(&watch_notifier, record, -1); | ||
355 | } | 351 | } |
356 | 352 | ||
357 | /** | 353 | /** |
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index 8748625b7..62b4c3705 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c | |||
@@ -117,12 +117,6 @@ struct GNUNET_PEERSTORE_StoreContext | |||
117 | */ | 117 | */ |
118 | void *cont_cls; | 118 | void *cont_cls; |
119 | 119 | ||
120 | /** | ||
121 | * #GNUNET_YES / #GNUNET_NO | ||
122 | * if sent, cannot be canceled | ||
123 | */ | ||
124 | int request_sent; | ||
125 | |||
126 | }; | 120 | }; |
127 | 121 | ||
128 | /** | 122 | /** |
@@ -227,14 +221,6 @@ struct GNUNET_PEERSTORE_WatchContext | |||
227 | /******************************************************************************/ | 221 | /******************************************************************************/ |
228 | 222 | ||
229 | /** | 223 | /** |
230 | * When a response for store request is received | ||
231 | * | ||
232 | * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' | ||
233 | * @param msg message received, NULL on timeout or fatal error | ||
234 | */ | ||
235 | void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg); | ||
236 | |||
237 | /** | ||
238 | * When a response for iterate request is received | 224 | * When a response for iterate request is received |
239 | * | 225 | * |
240 | * @param cls a 'struct GNUNET_PEERSTORE_Handle *' | 226 | * @param cls a 'struct GNUNET_PEERSTORE_Handle *' |
@@ -262,7 +248,6 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h); | |||
262 | * MQ message handlers | 248 | * MQ message handlers |
263 | */ | 249 | */ |
264 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { | 250 | static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { |
265 | {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)}, | ||
266 | {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0}, | 251 | {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0}, |
267 | {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)}, | 252 | {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)}, |
268 | {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0}, | 253 | {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0}, |
@@ -376,42 +361,6 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) | |||
376 | /******************************************************************************/ | 361 | /******************************************************************************/ |
377 | 362 | ||
378 | /** | 363 | /** |
379 | * When a response for store request is received | ||
380 | * | ||
381 | * @param cls a 'struct GNUNET_PEERSTORE_Handle *' | ||
382 | * @param msg message received, NULL on timeout or fatal error | ||
383 | */ | ||
384 | void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg) | ||
385 | { | ||
386 | struct GNUNET_PEERSTORE_Handle *h = cls; | ||
387 | struct GNUNET_PEERSTORE_StoreContext *sc; | ||
388 | GNUNET_PEERSTORE_Continuation cont; | ||
389 | void *cont_cls; | ||
390 | |||
391 | sc = h->store_head; | ||
392 | if(NULL == sc) | ||
393 | { | ||
394 | LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this should not happen.\n"); | ||
395 | reconnect(h); | ||
396 | return; | ||
397 | } | ||
398 | cont = sc->cont; | ||
399 | cont_cls = sc->cont_cls; | ||
400 | GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc); | ||
401 | GNUNET_free(sc); | ||
402 | if(NULL == msg) /* Connection error */ | ||
403 | { | ||
404 | if(NULL != cont) | ||
405 | cont(cont_cls, GNUNET_SYSERR); | ||
406 | reconnect(h); | ||
407 | return; | ||
408 | } | ||
409 | if(NULL != cont) /* Run continuation */ | ||
410 | cont(cont_cls, GNUNET_OK); | ||
411 | |||
412 | } | ||
413 | |||
414 | /** | ||
415 | * Callback after MQ envelope is sent | 364 | * Callback after MQ envelope is sent |
416 | * | 365 | * |
417 | * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' | 366 | * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' |
@@ -419,9 +368,15 @@ void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg) | |||
419 | void store_request_sent (void *cls) | 368 | void store_request_sent (void *cls) |
420 | { | 369 | { |
421 | struct GNUNET_PEERSTORE_StoreContext *sc = cls; | 370 | struct GNUNET_PEERSTORE_StoreContext *sc = cls; |
371 | GNUNET_PEERSTORE_Continuation cont; | ||
372 | void *cont_cls; | ||
422 | 373 | ||
423 | sc->request_sent = GNUNET_YES; | ||
424 | sc->ev = NULL; | 374 | sc->ev = NULL; |
375 | cont = sc->cont; | ||
376 | cont_cls = sc->cont_cls; | ||
377 | GNUNET_PEERSTORE_store_cancel(sc); | ||
378 | if(NULL != cont) | ||
379 | cont(cont_cls, GNUNET_OK); | ||
425 | } | 380 | } |
426 | 381 | ||
427 | /** | 382 | /** |
@@ -434,21 +389,13 @@ GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) | |||
434 | { | 389 | { |
435 | LOG(GNUNET_ERROR_TYPE_DEBUG, | 390 | LOG(GNUNET_ERROR_TYPE_DEBUG, |
436 | "Canceling store request.\n"); | 391 | "Canceling store request.\n"); |
437 | if(GNUNET_NO == sc->request_sent) | 392 | if(NULL != sc->ev) |
438 | { | 393 | { |
439 | if(NULL != sc->ev) | 394 | GNUNET_MQ_send_cancel(sc->ev); |
440 | { | 395 | sc->ev = NULL; |
441 | GNUNET_MQ_send_cancel(sc->ev); | ||
442 | sc->ev = NULL; | ||
443 | } | ||
444 | GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc); | ||
445 | GNUNET_free(sc); | ||
446 | } | ||
447 | else | ||
448 | { /* request already sent, will have to wait for response */ | ||
449 | sc->cont = NULL; | ||
450 | } | 396 | } |
451 | 397 | GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc); | |
398 | GNUNET_free(sc); | ||
452 | } | 399 | } |
453 | 400 | ||
454 | /** | 401 | /** |
@@ -493,7 +440,6 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, | |||
493 | sc->cont = cont; | 440 | sc->cont = cont; |
494 | sc->cont_cls = cont_cls; | 441 | sc->cont_cls = cont_cls; |
495 | sc->h = h; | 442 | sc->h = h; |
496 | sc->request_sent = GNUNET_NO; | ||
497 | GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc); | 443 | GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc); |
498 | GNUNET_MQ_notify_sent(ev, &store_request_sent, sc); | 444 | GNUNET_MQ_notify_sent(ev, &store_request_sent, sc); |
499 | GNUNET_MQ_send(h->mq, ev); | 445 | GNUNET_MQ_send(h->mq, ev); |
diff --git a/src/peerstore/test_peerstore_api.c b/src/peerstore/test_peerstore_api.c index 02c8815c5..968467231 100644 --- a/src/peerstore/test_peerstore_api.c +++ b/src/peerstore/test_peerstore_api.c | |||
@@ -129,17 +129,6 @@ run (void *cls, | |||
129 | NULL); | 129 | NULL); |
130 | } | 130 | } |
131 | 131 | ||
132 | int iterator (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
133 | { | ||
134 | struct GNUNET_CONTAINER_MultiHashMap *map = cls; | ||
135 | uint32_t *x = value; | ||
136 | |||
137 | printf("Received value: %d\n", *x); | ||
138 | if(*x == 2) | ||
139 | GNUNET_CONTAINER_multihashmap_remove(map, key, value); | ||
140 | return GNUNET_YES; | ||
141 | } | ||
142 | |||
143 | int | 132 | int |
144 | main (int argc, char *argv[]) | 133 | main (int argc, char *argv[]) |
145 | { | 134 | { |