aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore/peerstore_api.c
diff options
context:
space:
mode:
authorOmar Tarabai <tarabai@devegypt.com>2014-05-16 16:45:43 +0000
committerOmar Tarabai <tarabai@devegypt.com>2014-05-16 16:45:43 +0000
commit3c9e8b1b0f7f83f27fefb02bd67b481c67cad0c8 (patch)
tree525ae2f8cec03f298783aa693d465f789171cf51 /src/peerstore/peerstore_api.c
parentaeaf5c97d7d115d99f30e86be66c622b7a6ebf4f (diff)
downloadgnunet-3c9e8b1b0f7f83f27fefb02bd67b481c67cad0c8.tar.gz
gnunet-3c9e8b1b0f7f83f27fefb02bd67b481c67cad0c8.zip
peerstore API now uses MQ
Diffstat (limited to 'src/peerstore/peerstore_api.c')
-rw-r--r--src/peerstore/peerstore_api.c178
1 files changed, 143 insertions, 35 deletions
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index aa798e653..323ba45d0 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -50,6 +50,21 @@ struct GNUNET_PEERSTORE_Handle
50 */ 50 */
51 struct GNUNET_CLIENT_Connection *client; 51 struct GNUNET_CLIENT_Connection *client;
52 52
53 /**
54 * Message queue
55 */
56 struct GNUNET_MQ_Handle *mq;
57
58 /**
59 * Head of active STORE requests.
60 */
61 struct GNUNET_PEERSTORE_StoreContext *store_head;
62
63 /**
64 * Tail of active STORE requests.
65 */
66 struct GNUNET_PEERSTORE_StoreContext *store_tail;
67
53}; 68};
54 69
55/** 70/**
@@ -57,6 +72,15 @@ struct GNUNET_PEERSTORE_Handle
57 */ 72 */
58struct GNUNET_PEERSTORE_StoreContext 73struct GNUNET_PEERSTORE_StoreContext
59{ 74{
75 /**
76 * Kept in a DLL.
77 */
78 struct GNUNET_PEERSTORE_StoreContext *next;
79
80 /**
81 * Kept in a DLL.
82 */
83 struct GNUNET_PEERSTORE_StoreContext *prev;
60 84
61 /** 85 /**
62 * Handle to the PEERSTORE service. 86 * Handle to the PEERSTORE service.
@@ -64,6 +88,11 @@ struct GNUNET_PEERSTORE_StoreContext
64 struct GNUNET_PEERSTORE_Handle *h; 88 struct GNUNET_PEERSTORE_Handle *h;
65 89
66 /** 90 /**
91 * MQ Envelope with store request message
92 */
93 struct GNUNET_MQ_Envelope *ev;
94
95 /**
67 * Continuation called with service response 96 * Continuation called with service response
68 */ 97 */
69 GNUNET_PEERSTORE_Continuation cont; 98 GNUNET_PEERSTORE_Continuation cont;
@@ -73,9 +102,27 @@ struct GNUNET_PEERSTORE_StoreContext
73 */ 102 */
74 void *cont_cls; 103 void *cont_cls;
75 104
105 /**
106 * #GNUNET_YES / #GNUNET_NO
107 * if sent, cannot be canceled
108 */
109 int request_sent;
110
76}; 111};
77 112
78/******************************************************************************/ 113/******************************************************************************/
114/******************* DECLARATIONS *********************/
115/******************************************************************************/
116
117/**
118 * When a response for store request is received
119 *
120 * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
121 * @param msg message received, NULL on timeout or fatal error
122 */
123void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg);
124
125/******************************************************************************/
79/******************* CONNECTION FUNCTIONS *********************/ 126/******************* CONNECTION FUNCTIONS *********************/
80/******************************************************************************/ 127/******************************************************************************/
81 128
@@ -85,7 +132,7 @@ struct GNUNET_PEERSTORE_StoreContext
85 * @param h handle to the service 132 * @param h handle to the service
86 */ 133 */
87static void 134static void
88reconnect (struct GNUNET_PEERSTORE_Handle *h) 135reconnect (struct GNUNET_PEERSTORE_Handle *h) //FIXME: MQ friendly
89{ 136{
90 137
91 LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); 138 LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
@@ -98,6 +145,12 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
98 145
99} 146}
100 147
148static void
149handle_client_error (void *cls, enum GNUNET_MQ_Error error) //FIXME: implement
150{
151 //struct GNUNET_PEERSTORE_Handle *h = cls;
152}
153
101/** 154/**
102 * Connect to the PEERSTORE service. 155 * Connect to the PEERSTORE service.
103 * 156 *
@@ -106,15 +159,30 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
106struct GNUNET_PEERSTORE_Handle * 159struct GNUNET_PEERSTORE_Handle *
107GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) 160GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
108{ 161{
109 struct GNUNET_CLIENT_Connection *client;
110 struct GNUNET_PEERSTORE_Handle *h; 162 struct GNUNET_PEERSTORE_Handle *h;
163 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
164 {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)},
165 {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, sizeof(struct GNUNET_MessageHeader)},
166 GNUNET_MQ_HANDLERS_END
167 };
111 168
112 client = GNUNET_CLIENT_connect ("peerstore", cfg);
113 if(NULL == client)
114 return NULL;
115 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle); 169 h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
116 h->client = client; 170 h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
171 if(NULL == h->client)
172 {
173 GNUNET_free(h);
174 return NULL;
175 }
117 h->cfg = cfg; 176 h->cfg = cfg;
177 h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
178 mq_handlers,
179 &handle_client_error,
180 h);
181 if(NULL == h->mq)
182 {
183 GNUNET_free(h);
184 return NULL;
185 }
118 LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n"); 186 LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
119 return h; 187 return h;
120} 188}
@@ -128,6 +196,11 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
128void 196void
129GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) 197GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
130{ 198{
199 if(NULL != h->mq)
200 {
201 GNUNET_MQ_destroy(h->mq);
202 h->mq = NULL;
203 }
131 if (NULL != h->client) 204 if (NULL != h->client)
132 { 205 {
133 GNUNET_CLIENT_disconnect (h->client); 206 GNUNET_CLIENT_disconnect (h->client);
@@ -139,7 +212,7 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
139 212
140 213
141/******************************************************************************/ 214/******************************************************************************/
142/******************* ADD FUNCTIONS *********************/ 215/******************* STORE FUNCTIONS *********************/
143/******************************************************************************/ 216/******************************************************************************/
144 217
145/** 218/**
@@ -148,41 +221,77 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
148 * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' 221 * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
149 * @param msg message received, NULL on timeout or fatal error 222 * @param msg message received, NULL on timeout or fatal error
150 */ 223 */
151void store_response_receiver (void *cls, const struct GNUNET_MessageHeader *msg) 224void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg) //FIXME: MQ friendly
152{ 225{
153 struct GNUNET_PEERSTORE_StoreContext *sc = cls; 226 struct GNUNET_PEERSTORE_Handle *h = cls;
227 struct GNUNET_PEERSTORE_StoreContext *sc;
154 uint16_t msg_type; 228 uint16_t msg_type;
229 GNUNET_PEERSTORE_Continuation cont;
230 void *cont_cls;
155 231
156 if(NULL == sc->cont) 232 sc = h->store_head;
233 if(NULL == sc)
234 {
235 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this should not happen.\n");
236 reconnect(h);
157 return; 237 return;
158 if(NULL == msg) 238 }
239 cont = sc->cont;
240 cont_cls = sc->cont_cls;
241 GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
242 GNUNET_free(sc);
243 if(NULL == msg) /* Connection error */
159 { 244 {
160 sc->cont(sc->cont_cls, GNUNET_SYSERR); 245 if(NULL != cont)
161 reconnect(sc->h); 246 cont(cont_cls, GNUNET_SYSERR);
247 reconnect(h);
162 return; 248 return;
163 } 249 }
164 msg_type = ntohs(msg->type); 250 if(NULL != cont) /* Run continuation */
165 if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
166 sc->cont(sc->cont_cls, GNUNET_OK);
167 else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
168 sc->cont(sc->cont_cls, GNUNET_SYSERR);
169 else
170 { 251 {
171 LOG(GNUNET_ERROR_TYPE_ERROR, "Invalid response from `PEERSTORE' service.\n"); 252 msg_type = ntohs(msg->type);
172 sc->cont(sc->cont_cls, GNUNET_SYSERR); 253 if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
254 cont(cont_cls, GNUNET_OK);
255 else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
256 cont(cont_cls, GNUNET_SYSERR);
173 } 257 }
174 258
175} 259}
176 260
177/** 261/**
262 * Callback after MQ envelope is sent
263 *
264 * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
265 */
266void store_request_sent (void *cls)
267{
268 struct GNUNET_PEERSTORE_StoreContext *sc = cls;
269
270 sc->request_sent = GNUNET_YES;
271}
272
273/**
178 * Cancel a store request 274 * Cancel a store request
179 * 275 *
180 * @param sc Store request context 276 * @param sc Store request context
181 */ 277 */
182void 278void
183GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) 279GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) //FIXME: MQ friendly
184{ 280{
185 sc->cont = NULL; 281 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
282 "Canceling store request.\n");
283 if(GNUNET_NO == sc->request_sent)
284 {
285 if(NULL != sc->ev)
286 GNUNET_MQ_discard(sc->ev);
287 GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
288 GNUNET_free(sc);
289 }
290 else
291 { /* request already sent, will have to wait for response */
292 sc->cont = NULL;
293 }
294
186} 295}
187 296
188/** 297/**
@@ -209,29 +318,28 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
209 GNUNET_PEERSTORE_Continuation cont, 318 GNUNET_PEERSTORE_Continuation cont,
210 void *cont_cls) 319 void *cont_cls)
211{ 320{
321 struct GNUNET_MQ_Envelope *ev;
212 struct GNUNET_PEERSTORE_StoreContext *sc; 322 struct GNUNET_PEERSTORE_StoreContext *sc;
213 struct StoreRecordMessage *srm;
214 323
215 LOG (GNUNET_ERROR_TYPE_DEBUG, 324 LOG (GNUNET_ERROR_TYPE_DEBUG,
216 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n", 325 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
217 size, sub_system, GNUNET_i2s (peer), key); 326 size, sub_system, GNUNET_i2s (peer), key);
218 sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext); 327 ev = PEERSTORE_create_record_mq_envelope(sub_system,
219 sc->cont = cont;
220 sc->cont_cls = cont_cls;
221 sc->h = h;
222 srm = PEERSTORE_create_record_message(sub_system,
223 peer, 328 peer,
224 key, 329 key,
225 value, 330 value,
226 size, 331 size,
227 expiry, 332 expiry,
228 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); 333 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
229 GNUNET_CLIENT_transmit_and_get_response(h->client, 334 GNUNET_MQ_send(h->mq, ev);
230 (const struct GNUNET_MessageHeader *)srm, 335 GNUNET_MQ_notify_sent(ev, &store_request_sent, ev);
231 GNUNET_TIME_UNIT_FOREVER_REL, 336 sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
232 GNUNET_YES, 337 sc->ev = ev;
233 &store_response_receiver, 338 sc->cont = cont;
234 sc); 339 sc->cont_cls = cont_cls;
340 sc->h = h;
341 sc->request_sent = GNUNET_NO;
342 GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
235 return sc; 343 return sc;
236 344
237} 345}