aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore
diff options
context:
space:
mode:
authorOmar Tarabai <tarabai@devegypt.com>2014-06-03 13:50:37 +0000
committerOmar Tarabai <tarabai@devegypt.com>2014-06-03 13:50:37 +0000
commita04d70028c0615e7910f68328db9c61bba67128b (patch)
tree8d27cc7e8ad6b25cf01ac2cbae6f2839deec00f8 /src/peerstore
parenteb985bf2eae6054292e3cbffbfeb4aaea3e394d5 (diff)
downloadgnunet-a04d70028c0615e7910f68328db9c61bba67128b.tar.gz
gnunet-a04d70028c0615e7910f68328db9c61bba67128b.zip
peerstore: fixes in watch functionality
Diffstat (limited to 'src/peerstore')
-rw-r--r--src/peerstore/gnunet-service-peerstore.c32
-rw-r--r--src/peerstore/peerstore_api.c78
-rw-r--r--src/peerstore/test_peerstore_api.c11
3 files changed, 26 insertions, 95 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c
index 706fcaaae..82961f685 100644
--- a/src/peerstore/gnunet-service-peerstore.c
+++ b/src/peerstore/gnunet-service-peerstore.c
@@ -144,7 +144,7 @@ int record_iterator(void *cls,
144 struct GNUNET_PEERSTORE_Record *record, 144 struct GNUNET_PEERSTORE_Record *record,
145 char *emsg) 145 char *emsg)
146{ 146{
147 struct GNUNET_SERVER_TransmitContext *tc = cls; 147 struct GNUNET_SERVER_Client *client = cls;
148 struct StoreRecordMessage *srm; 148 struct StoreRecordMessage *srm;
149 149
150 srm = PEERSTORE_create_record_message(record->sub_system, 150 srm = PEERSTORE_create_record_message(record->sub_system,
@@ -154,7 +154,7 @@ int record_iterator(void *cls,
154 record->value_size, 154 record->value_size,
155 record->expiry, 155 record->expiry,
156 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD); 156 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
157 GNUNET_SERVER_transmit_context_append_message(tc, (const struct GNUNET_MessageHeader *)srm); 157 GNUNET_SERVER_notification_context_unicast(nc, client, (struct GNUNET_MessageHeader *)srm, GNUNET_NO);
158 return GNUNET_YES; 158 return GNUNET_YES;
159} 159}
160 160
@@ -189,19 +189,17 @@ int watch_notifier_it(void *cls,
189 record->expiry, 189 record->expiry,
190 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD); 190 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
191 GNUNET_SERVER_notification_context_unicast(nc, client, 191 GNUNET_SERVER_notification_context_unicast(nc, client,
192 (const struct GNUNET_MessageHeader *)srm, GNUNET_YES); 192 (const struct GNUNET_MessageHeader *)srm, GNUNET_NO);
193 return GNUNET_YES; 193 return GNUNET_YES;
194} 194}
195 195
196/** 196/**
197 * Given a new record, notifies watchers 197 * Given a new record, notifies watchers
198 * 198 *
199 * @cls closure, a 'struct GNUNET_PEERSTORE_Record *' 199 * @param record changed record to update watchers with
200 * @tc unused
201 */ 200 */
202void watch_notifier (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 201void watch_notifier (struct GNUNET_PEERSTORE_Record *record)
203{ 202{
204 struct GNUNET_PEERSTORE_Record *record = cls;
205 struct GNUNET_HashCode keyhash; 203 struct GNUNET_HashCode keyhash;
206 204
207 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n"); 205 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n");
@@ -265,7 +263,7 @@ void handle_iterate (void *cls,
265 const struct GNUNET_MessageHeader *message) 263 const struct GNUNET_MessageHeader *message)
266{ 264{
267 struct GNUNET_PEERSTORE_Record *record; 265 struct GNUNET_PEERSTORE_Record *record;
268 struct GNUNET_SERVER_TransmitContext *tc; 266 struct GNUNET_MessageHeader *endmsg;
269 267
270 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request from client.\n"); 268 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request from client.\n");
271 record = PEERSTORE_parse_record_message(message); 269 record = PEERSTORE_parse_record_message(message);
@@ -281,20 +279,21 @@ void handle_iterate (void *cls,
281 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); 279 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
282 return; 280 return;
283 } 281 }
284 tc = GNUNET_SERVER_transmit_context_create (client); 282 GNUNET_SERVER_notification_context_add(nc, client);
285 if(GNUNET_OK == db->iterate_records(db->cls, 283 if(GNUNET_OK == db->iterate_records(db->cls,
286 record->sub_system, 284 record->sub_system,
287 record->peer, 285 record->peer,
288 record->key, 286 record->key,
289 &record_iterator, 287 &record_iterator,
290 tc)) 288 client))
291 { 289 {
292 GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END); 290 endmsg = GNUNET_new(struct GNUNET_MessageHeader);
293 GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); 291 endmsg->size = htons(sizeof(struct GNUNET_MessageHeader));
292 endmsg->type = htons(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
293 GNUNET_SERVER_notification_context_unicast(nc, client, endmsg, GNUNET_NO);
294 } 294 }
295 else 295 else
296 { 296 {
297 GNUNET_free(tc);
298 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); 297 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
299 } 298 }
300 GNUNET_free(record); /* FIXME: destroy record */ 299 GNUNET_free(record); /* FIXME: destroy record */
@@ -312,7 +311,6 @@ void handle_store (void *cls,
312 const struct GNUNET_MessageHeader *message) 311 const struct GNUNET_MessageHeader *message)
313{ 312{
314 struct GNUNET_PEERSTORE_Record *record; 313 struct GNUNET_PEERSTORE_Record *record;
315 struct GNUNET_SERVER_TransmitContext *tc;
316 314
317 record = PEERSTORE_parse_record_message(message); 315 record = PEERSTORE_parse_record_message(message);
318 if(NULL == record) 316 if(NULL == record)
@@ -348,10 +346,8 @@ void handle_store (void *cls,
348 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); 346 GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
349 return; 347 return;
350 } 348 }
351 tc = GNUNET_SERVER_transmit_context_create (client); 349 GNUNET_SERVER_receive_done(client, GNUNET_OK);
352 GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK); 350 watch_notifier(record);
353 GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
354 GNUNET_SCHEDULER_add_continuation(&watch_notifier, record, -1);
355} 351}
356 352
357/** 353/**
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index 8748625b7..62b4c3705 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -117,12 +117,6 @@ struct GNUNET_PEERSTORE_StoreContext
117 */ 117 */
118 void *cont_cls; 118 void *cont_cls;
119 119
120 /**
121 * #GNUNET_YES / #GNUNET_NO
122 * if sent, cannot be canceled
123 */
124 int request_sent;
125
126}; 120};
127 121
128/** 122/**
@@ -227,14 +221,6 @@ struct GNUNET_PEERSTORE_WatchContext
227/******************************************************************************/ 221/******************************************************************************/
228 222
229/** 223/**
230 * When a response for store request is received
231 *
232 * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
233 * @param msg message received, NULL on timeout or fatal error
234 */
235void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg);
236
237/**
238 * When a response for iterate request is received 224 * When a response for iterate request is received
239 * 225 *
240 * @param cls a 'struct GNUNET_PEERSTORE_Handle *' 226 * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
@@ -262,7 +248,6 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h);
262 * MQ message handlers 248 * MQ message handlers
263 */ 249 */
264static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { 250static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
265 {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)},
266 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0}, 251 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
267 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)}, 252 {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)},
268 {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0}, 253 {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
@@ -376,42 +361,6 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
376/******************************************************************************/ 361/******************************************************************************/
377 362
378/** 363/**
379 * When a response for store request is received
380 *
381 * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
382 * @param msg message received, NULL on timeout or fatal error
383 */
384void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
385{
386 struct GNUNET_PEERSTORE_Handle *h = cls;
387 struct GNUNET_PEERSTORE_StoreContext *sc;
388 GNUNET_PEERSTORE_Continuation cont;
389 void *cont_cls;
390
391 sc = h->store_head;
392 if(NULL == sc)
393 {
394 LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this should not happen.\n");
395 reconnect(h);
396 return;
397 }
398 cont = sc->cont;
399 cont_cls = sc->cont_cls;
400 GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
401 GNUNET_free(sc);
402 if(NULL == msg) /* Connection error */
403 {
404 if(NULL != cont)
405 cont(cont_cls, GNUNET_SYSERR);
406 reconnect(h);
407 return;
408 }
409 if(NULL != cont) /* Run continuation */
410 cont(cont_cls, GNUNET_OK);
411
412}
413
414/**
415 * Callback after MQ envelope is sent 364 * Callback after MQ envelope is sent
416 * 365 *
417 * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' 366 * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
@@ -419,9 +368,15 @@ void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
419void store_request_sent (void *cls) 368void store_request_sent (void *cls)
420{ 369{
421 struct GNUNET_PEERSTORE_StoreContext *sc = cls; 370 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
371 GNUNET_PEERSTORE_Continuation cont;
372 void *cont_cls;
422 373
423 sc->request_sent = GNUNET_YES;
424 sc->ev = NULL; 374 sc->ev = NULL;
375 cont = sc->cont;
376 cont_cls = sc->cont_cls;
377 GNUNET_PEERSTORE_store_cancel(sc);
378 if(NULL != cont)
379 cont(cont_cls, GNUNET_OK);
425} 380}
426 381
427/** 382/**
@@ -434,21 +389,13 @@ GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
434{ 389{
435 LOG(GNUNET_ERROR_TYPE_DEBUG, 390 LOG(GNUNET_ERROR_TYPE_DEBUG,
436 "Canceling store request.\n"); 391 "Canceling store request.\n");
437 if(GNUNET_NO == sc->request_sent) 392 if(NULL != sc->ev)
438 { 393 {
439 if(NULL != sc->ev) 394 GNUNET_MQ_send_cancel(sc->ev);
440 { 395 sc->ev = NULL;
441 GNUNET_MQ_send_cancel(sc->ev);
442 sc->ev = NULL;
443 }
444 GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
445 GNUNET_free(sc);
446 }
447 else
448 { /* request already sent, will have to wait for response */
449 sc->cont = NULL;
450 } 396 }
451 397 GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
398 GNUNET_free(sc);
452} 399}
453 400
454/** 401/**
@@ -493,7 +440,6 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
493 sc->cont = cont; 440 sc->cont = cont;
494 sc->cont_cls = cont_cls; 441 sc->cont_cls = cont_cls;
495 sc->h = h; 442 sc->h = h;
496 sc->request_sent = GNUNET_NO;
497 GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc); 443 GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
498 GNUNET_MQ_notify_sent(ev, &store_request_sent, sc); 444 GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
499 GNUNET_MQ_send(h->mq, ev); 445 GNUNET_MQ_send(h->mq, ev);
diff --git a/src/peerstore/test_peerstore_api.c b/src/peerstore/test_peerstore_api.c
index 02c8815c5..968467231 100644
--- a/src/peerstore/test_peerstore_api.c
+++ b/src/peerstore/test_peerstore_api.c
@@ -129,17 +129,6 @@ run (void *cls,
129 NULL); 129 NULL);
130} 130}
131 131
132int iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
133{
134 struct GNUNET_CONTAINER_MultiHashMap *map = cls;
135 uint32_t *x = value;
136
137 printf("Received value: %d\n", *x);
138 if(*x == 2)
139 GNUNET_CONTAINER_multihashmap_remove(map, key, value);
140 return GNUNET_YES;
141}
142
143int 132int
144main (int argc, char *argv[]) 133main (int argc, char *argv[])
145{ 134{