aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2015-08-30 00:44:11 +0000
committerFlorian Dold <florian.dold@gmail.com>2015-08-30 00:44:11 +0000
commit7936cab9bd741a9ff30362c8495daa29f1874b2e (patch)
tree33606962451dcb4b15dabff20f605a5c58f1cc72 /src/set/gnunet-service-set.c
parent38963d1e81332032e0ac774f4f2c6b804c38802a (diff)
downloadgnunet-7936cab9bd741a9ff30362c8495daa29f1874b2e.tar.gz
gnunet-7936cab9bd741a9ff30362c8495daa29f1874b2e.zip
work in progress: fix set bug, implement lazy copy
Diffstat (limited to 'src/set/gnunet-service-set.c')
-rw-r--r--src/set/gnunet-service-set.c414
1 files changed, 367 insertions, 47 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index f2c5c4322..b2ad01d1b 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -72,6 +72,16 @@ struct Listener
72}; 72};
73 73
74 74
75struct LazyCopyRequest
76{
77 struct Set *source_set;
78 uint32_t cookie;
79
80 struct LazyCopyRequest *prev;
81 struct LazyCopyRequest *next;
82};
83
84
75/** 85/**
76 * Configuration of our local peer. 86 * Configuration of our local peer.
77 */ 87 */
@@ -115,6 +125,11 @@ static struct Operation *incoming_head;
115 */ 125 */
116static struct Operation *incoming_tail; 126static struct Operation *incoming_tail;
117 127
128static struct LazyCopyRequest *lazy_copy_head;
129static struct LazyCopyRequest *lazy_copy_tail;
130
131static uint32_t lazy_copy_cookie = 1;
132
118/** 133/**
119 * Counter for allocating unique IDs for clients, used to identify 134 * Counter for allocating unique IDs for clients, used to identify
120 * incoming operation requests from remote peers, that the client can 135 * incoming operation requests from remote peers, that the client can
@@ -253,20 +268,20 @@ garbage_collect_cb (void *cls,
253 const struct GNUNET_HashCode *key, 268 const struct GNUNET_HashCode *key,
254 void *value) 269 void *value)
255{ 270{
256 struct GarbageContext *gc = cls; 271 //struct GarbageContext *gc = cls;
257 struct ElementEntry *ee = value; 272 //struct ElementEntry *ee = value;
258 273
259 if (GNUNET_YES != ee->removed) 274 //if (GNUNET_YES != ee->removed)
260 return GNUNET_OK; 275 // return GNUNET_OK;
261 if ( (gc->max_op_generation < ee->generation_added) || 276 //if ( (gc->max_op_generation < ee->generation_added) ||
262 (ee->generation_removed > gc->min_op_generation) ) 277 // (ee->generation_removed > gc->min_op_generation) )
263 { 278 //{
264 GNUNET_assert (GNUNET_YES == 279 // GNUNET_assert (GNUNET_YES ==
265 GNUNET_CONTAINER_multihashmap_remove (gc->map, 280 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
266 key, 281 // key,
267 ee)); 282 // ee));
268 GNUNET_free (ee); 283 // GNUNET_free (ee);
269 } 284 //}
270 return GNUNET_OK; 285 return GNUNET_OK;
271} 286}
272 287
@@ -293,12 +308,91 @@ collect_generation_garbage (struct Set *set)
293 gc.max_op_generation = GNUNET_MAX (gc.max_op_generation, 308 gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
294 op->generation_created); 309 op->generation_created);
295 } 310 }
296 gc.map = set->elements; 311 gc.map = set->content->elements;
297 GNUNET_CONTAINER_multihashmap_iterate (set->elements, 312 GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
298 &garbage_collect_cb, 313 &garbage_collect_cb,
299 &gc); 314 &gc);
300} 315}
301 316
317int
318is_excluded_generation (unsigned int generation,
319 struct GenerationRange *excluded,
320 unsigned int excluded_size)
321{
322 unsigned int i;
323
324 for (i = 0; i < excluded_size; i++)
325 {
326 if ( (generation >= excluded[i].start) && (generation < excluded[i].end) )
327 return GNUNET_YES;
328 }
329
330 return GNUNET_NO;
331}
332
333
334int
335is_element_of_generation (struct ElementEntry *ee,
336 unsigned int query_generation,
337 struct GenerationRange *excluded,
338 unsigned int excluded_size)
339{
340 struct MutationEvent *mut;
341 int is_present;
342
343 if (NULL == ee->mutations)
344 return GNUNET_YES;
345
346 if (GNUNET_YES == is_excluded_generation (query_generation, excluded, excluded_size))
347 {
348 GNUNET_break (0);
349 return GNUNET_NO;
350 }
351
352 is_present = GNUNET_YES;
353
354 // Could be made faster with binary search, but lists
355 // are small, so why bother.
356 for (mut = ee->mutations; 0 != mut->generation; mut++)
357 {
358 if ( (mut->generation > query_generation) ||
359 (GNUNET_YES == is_excluded_generation (mut->generation, excluded, excluded_size)) )
360 {
361 continue;
362 }
363
364 // This would be an inconsistency in how we manage mutations.
365 if ( (GNUNET_YES == is_present) && (GNUNET_YES == mut->added) )
366 GNUNET_assert (0);
367
368 is_present = mut->added;
369 }
370
371 return GNUNET_YES;
372}
373
374
375int
376_GSS_is_element_of_set (struct ElementEntry *ee,
377 struct Set *set)
378{
379 return is_element_of_generation (ee,
380 set->current_generation,
381 set->excluded_generations,
382 set->excluded_generations_size);
383}
384
385
386int
387_GSS_is_element_of_operation (struct ElementEntry *ee,
388 struct Operation *op)
389{
390 return is_element_of_generation (ee,
391 op->generation_created,
392 op->spec->set->excluded_generations,
393 op->spec->set->excluded_generations_size);
394}
395
302 396
303/** 397/**
304 * Destroy the given operation. Call the implementation-specific 398 * Destroy the given operation. Call the implementation-specific
@@ -371,6 +465,8 @@ destroy_elements_iterator (void *cls,
371{ 465{
372 struct ElementEntry *ee = value; 466 struct ElementEntry *ee = value;
373 467
468 GNUNET_free_non_null (ee->mutations);
469
374 GNUNET_free (ee); 470 GNUNET_free (ee);
375 return GNUNET_YES; 471 return GNUNET_YES;
376} 472}
@@ -412,17 +508,31 @@ set_destroy (struct Set *set)
412 set->iter = NULL; 508 set->iter = NULL;
413 set->iteration_id++; 509 set->iteration_id++;
414 } 510 }
415 if (NULL != set->elements)
416 { 511 {
417 GNUNET_CONTAINER_multihashmap_iterate (set->elements, 512 struct SetContent *content;
418 &destroy_elements_iterator, 513
419 NULL); 514 content = set->content;
420 GNUNET_CONTAINER_multihashmap_destroy (set->elements); 515 set->content = NULL;
421 set->elements = NULL; 516 GNUNET_assert (0 != content->refcount);
517 content->refcount -= 1;
518 if (0 == content->refcount)
519 {
520 GNUNET_assert (NULL != content->elements);
521 GNUNET_CONTAINER_multihashmap_iterate (content->elements,
522 &destroy_elements_iterator,
523 NULL);
524 GNUNET_CONTAINER_multihashmap_destroy (content->elements);
525 content->elements = NULL;
526 }
422 } 527 }
528 GNUNET_free_non_null (set->excluded_generations);
529 set->excluded_generations = NULL;
423 GNUNET_CONTAINER_DLL_remove (sets_head, 530 GNUNET_CONTAINER_DLL_remove (sets_head,
424 sets_tail, 531 sets_tail,
425 set); 532 set);
533
534 // FIXME: remove from lazy copy requests
535
426 GNUNET_free (set); 536 GNUNET_free (set);
427} 537}
428 538
@@ -722,10 +832,10 @@ handle_client_iterate (void *cls,
722 } 832 }
723 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 833 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724 "Iterating set with %u elements\n", 834 "Iterating set with %u elements\n",
725 GNUNET_CONTAINER_multihashmap_size (set->elements)); 835 GNUNET_CONTAINER_multihashmap_size (set->content->elements));
726 GNUNET_SERVER_receive_done (client, 836 GNUNET_SERVER_receive_done (client,
727 GNUNET_OK); 837 GNUNET_OK);
728 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->elements); 838 set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
729 send_client_element (set); 839 send_client_element (set);
730} 840}
731 841
@@ -773,8 +883,11 @@ handle_client_create_set (void *cls,
773 GNUNET_SERVER_client_disconnect (client); 883 GNUNET_SERVER_client_disconnect (client);
774 return; 884 return;
775 } 885 }
886 set->operation = ntohl (msg->operation);
776 set->state = set->vt->create (); 887 set->state = set->vt->create ();
777 set->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 888 set->content = GNUNET_new (struct SetContent);
889 set->content->refcount = 1;
890 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
778 set->client = client; 891 set->client = client;
779 set->client_mq = GNUNET_MQ_queue_for_server_client (client); 892 set->client_mq = GNUNET_MQ_queue_for_server_client (client);
780 GNUNET_CONTAINER_DLL_insert (sets_head, 893 GNUNET_CONTAINER_DLL_insert (sets_head,
@@ -895,7 +1008,7 @@ handle_client_add (void *cls,
895 const struct GNUNET_SET_ElementMessage *msg; 1008 const struct GNUNET_SET_ElementMessage *msg;
896 struct GNUNET_SET_Element el; 1009 struct GNUNET_SET_Element el;
897 struct ElementEntry *ee; 1010 struct ElementEntry *ee;
898 struct ElementEntry *ee_dup; 1011 struct GNUNET_HashCode hash;
899 1012
900 set = set_get (client); 1013 set = set_get (client);
901 if (NULL == set) 1014 if (NULL == set)
@@ -913,28 +1026,43 @@ handle_client_add (void *cls,
913 "Client inserts element of size %u\n", 1026 "Client inserts element of size %u\n",
914 el.size); 1027 el.size);
915 el.data = &msg[1]; 1028 el.data = &msg[1];
916 ee = GNUNET_malloc (el.size + sizeof *ee); 1029 GNUNET_CRYPTO_hash (el.data,
917 ee->element.size = el.size;
918 memcpy (&ee[1],
919 el.data,
920 el.size);
921 ee->element.data = &ee[1];
922 ee->generation_added = set->current_generation;
923 ee->remote = GNUNET_NO;
924 GNUNET_CRYPTO_hash (ee->element.data,
925 el.size, 1030 el.size,
926 &ee->element_hash); 1031 &hash);
927 ee_dup = GNUNET_CONTAINER_multihashmap_get (set->elements, 1032
928 &ee->element_hash); 1033 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
929 if (NULL != ee_dup) 1034 &hash);
1035
1036 if (NULL == ee)
930 { 1037 {
1038 ee = GNUNET_malloc (el.size + sizeof *ee);
1039 ee->element.size = el.size;
1040 memcpy (&ee[1],
1041 el.data,
1042 el.size);
1043 ee->element.data = &ee[1];
1044 ee->remote = GNUNET_NO;
1045 ee->mutations = NULL;
1046 ee->mutations_size = 0;
1047 ee->element_hash = hash;
1048 } else if (GNUNET_YES == _GSS_is_element_of_set (ee, set)) {
931 /* same element inserted twice */ 1049 /* same element inserted twice */
932 GNUNET_break (0); 1050 GNUNET_break (0);
933 GNUNET_free (ee);
934 return; 1051 return;
935 } 1052 }
1053
1054 if (0 != set->current_generation)
1055 {
1056 struct MutationEvent mut = {
1057 .generation = set->current_generation,
1058 .added = GNUNET_YES
1059 };
1060 GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
1061 ee->mutations_size += 1;
1062 }
1063
936 GNUNET_break (GNUNET_YES == 1064 GNUNET_break (GNUNET_YES ==
937 GNUNET_CONTAINER_multihashmap_put (set->elements, 1065 GNUNET_CONTAINER_multihashmap_put (set->content->elements,
938 &ee->element_hash, 1066 &ee->element_hash,
939 ee, 1067 ee,
940 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 1068 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
@@ -980,7 +1108,7 @@ handle_client_remove (void *cls,
980 GNUNET_CRYPTO_hash (el.data, 1108 GNUNET_CRYPTO_hash (el.data,
981 el.size, 1109 el.size,
982 &hash); 1110 &hash);
983 ee = GNUNET_CONTAINER_multihashmap_get (set->elements, 1111 ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
984 &hash); 1112 &hash);
985 if (NULL == ee) 1113 if (NULL == ee)
986 { 1114 {
@@ -988,18 +1116,65 @@ handle_client_remove (void *cls,
988 GNUNET_break (0); 1116 GNUNET_break (0);
989 return; 1117 return;
990 } 1118 }
991 if (GNUNET_YES == ee->removed) 1119 if (GNUNET_NO == _GSS_is_element_of_set (ee, set))
992 { 1120 {
993 /* Client tried to remove element twice */ 1121 /* Client tried to remove element twice */
994 GNUNET_break (0); 1122 GNUNET_break (0);
995 return; 1123 return;
996 } 1124 }
997 ee->removed = GNUNET_YES; 1125 else if (0 == set->current_generation)
998 ee->generation_removed = set->current_generation; 1126 {
1127 // If current_generation is 0, then there are no running set operations
1128 // or lazy copies, thus we can safely remove the element.
1129 (void) GNUNET_CONTAINER_multihashmap_remove_all (set->content->elements, &hash);
1130 }
1131 else
1132 {
1133 struct MutationEvent mut = {
1134 .generation = set->current_generation,
1135 .added = GNUNET_NO
1136 };
1137 GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
1138 ee->mutations_size += 1;
1139 }
999 set->vt->remove (set->state, ee); 1140 set->vt->remove (set->state, ee);
1000} 1141}
1001 1142
1002 1143
1144
1145/**
1146 * Advance the current generation of a set,
1147 * adding exclusion ranges if necessary.
1148 *
1149 * @param set the set where we want to advance the generation
1150 */
1151static void
1152advance_generation (struct Set *set)
1153{
1154 struct GenerationRange r;
1155
1156 if (set->current_generation == set->content->latest_generation)
1157 {
1158 set->content->latest_generation += 1;
1159 set->current_generation += 1;
1160 return;
1161 }
1162
1163 GNUNET_assert (set->current_generation < set->content->latest_generation);
1164
1165 r.start = set->current_generation + 1;
1166 r.end = set->content->latest_generation + 1;
1167
1168 set->content->latest_generation = r.end;
1169 set->current_generation = r.end;
1170
1171 GNUNET_array_append (set->excluded_generations,
1172 set->excluded_generations_size,
1173 r);
1174
1175 set->excluded_generations_size += 1;
1176}
1177
1003/** 1178/**
1004 * Called when a client wants to initiate a set operation with another 1179 * Called when a client wants to initiate a set operation with another
1005 * peer. Initiates the CADET connection to the listener and sends the 1180 * peer. Initiates the CADET connection to the listener and sends the
@@ -1040,7 +1215,12 @@ handle_client_evaluate (void *cls,
1040 context = GNUNET_MQ_extract_nested_mh (msg); 1215 context = GNUNET_MQ_extract_nested_mh (msg);
1041 op = GNUNET_new (struct Operation); 1216 op = GNUNET_new (struct Operation);
1042 op->spec = spec; 1217 op->spec = spec;
1043 op->generation_created = set->current_generation++; 1218
1219 // Advance generation values, so that
1220 // mutations won't interfer with the running operation.
1221 op->generation_created = set->current_generation;
1222 advance_generation (set);
1223
1044 op->vt = set->vt; 1224 op->vt = set->vt;
1045 GNUNET_CONTAINER_DLL_insert (set->ops_head, 1225 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1046 set->ops_tail, 1226 set->ops_tail,
@@ -1109,6 +1289,135 @@ handle_client_iter_ack (void *cls,
1109 1289
1110/** 1290/**
1111 * Handle a request from the client to 1291 * Handle a request from the client to
1292 * copy a set.
1293 *
1294 * @param cls unused
1295 * @param client the client
1296 * @param mh the message
1297 */
1298static void
1299handle_client_copy_lazy_prepare (void *cls,
1300 struct GNUNET_SERVER_Client *client,
1301 const struct GNUNET_MessageHeader *mh)
1302{
1303 struct Set *set;
1304 struct LazyCopyRequest *cr;
1305
1306 set = set_get (client);
1307 if (NULL == set)
1308 {
1309 /* client without a set requested an operation */
1310 GNUNET_break (0);
1311 GNUNET_SERVER_client_disconnect (client);
1312 return;
1313 }
1314
1315 cr = GNUNET_new (struct LazyCopyRequest);
1316
1317 cr->cookie = lazy_copy_cookie;
1318 lazy_copy_cookie += 1;
1319 cr->source_set = set;
1320
1321 GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
1322 lazy_copy_tail,
1323 cr);
1324 GNUNET_SERVER_receive_done (client,
1325 GNUNET_OK);
1326}
1327
1328
1329/**
1330 * Handle a request from the client to
1331 * connect to a copy of a set.
1332 *
1333 * @param cls unused
1334 * @param client the client
1335 * @param mh the message
1336 */
1337static void
1338handle_client_copy_lazy_connect (void *cls,
1339 struct GNUNET_SERVER_Client *client,
1340 const struct GNUNET_MessageHeader *mh)
1341{
1342 struct LazyCopyRequest *cr;
1343 const struct GNUNET_SET_CopyLazyConnectMessage *msg =
1344 (const struct GNUNET_SET_CopyLazyConnectMessage *) mh;
1345 struct Set *set;
1346 int found;
1347
1348 if (NULL != set_get (client))
1349 {
1350 /* There can only be one set per client */
1351 GNUNET_break (0);
1352 GNUNET_SERVER_client_disconnect (client);
1353 return;
1354 }
1355
1356 found = GNUNET_NO;
1357
1358 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1359 {
1360 if (cr->cookie == msg->cookie)
1361 {
1362 found = GNUNET_YES;
1363 break;
1364 }
1365 }
1366
1367 if (GNUNET_NO == found)
1368 {
1369 /* client asked for copy with cookie we don't know */
1370 GNUNET_break (0);
1371 GNUNET_SERVER_client_disconnect (client);
1372 return;
1373 }
1374
1375 GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
1376 lazy_copy_tail,
1377 cr);
1378
1379 set = GNUNET_new (struct Set);
1380
1381 switch (cr->source_set->operation)
1382 {
1383 case GNUNET_SET_OPERATION_INTERSECTION:
1384 set->vt = _GSS_intersection_vt ();
1385 break;
1386 case GNUNET_SET_OPERATION_UNION:
1387 set->vt = _GSS_union_vt ();
1388 break;
1389 default:
1390 GNUNET_assert (0);
1391 return;
1392 }
1393
1394 if (NULL == set->vt->copy_state) {
1395 /* Lazy copy not supported for this set operation */
1396 GNUNET_break (0);
1397 GNUNET_free (set);
1398 GNUNET_free (cr);
1399 return;
1400 }
1401
1402 set->operation = cr->source_set->operation;
1403 set->state = set->vt->copy_state (set);
1404 set->content = cr->source_set->content;
1405 set->content->refcount += 1;
1406 set->client = client;
1407 set->client_mq = GNUNET_MQ_queue_for_server_client (client);
1408 GNUNET_CONTAINER_DLL_insert (sets_head,
1409 sets_tail,
1410 set);
1411
1412 GNUNET_free (cr);
1413
1414 GNUNET_SERVER_receive_done (client,
1415 GNUNET_OK);
1416}
1417
1418
1419/**
1420 * Handle a request from the client to
1112 * cancel a running set operation. 1421 * cancel a running set operation.
1113 * 1422 *
1114 * @param cls unused 1423 * @param cls unused
@@ -1226,7 +1535,12 @@ handle_client_accept (void *cls,
1226 op); 1535 op);
1227 op->spec->client_request_id = ntohl (msg->request_id); 1536 op->spec->client_request_id = ntohl (msg->request_id);
1228 op->spec->result_mode = ntohl (msg->result_mode); 1537 op->spec->result_mode = ntohl (msg->result_mode);
1229 op->generation_created = set->current_generation++; 1538
1539 // Advance generation values, so that
1540 // mutations won't interfer with the running operation.
1541 op->generation_created = set->current_generation;
1542 advance_generation (set);
1543
1230 op->vt = set->vt; 1544 op->vt = set->vt;
1231 op->vt->accept (op); 1545 op->vt->accept (op);
1232 GNUNET_SERVER_receive_done (client, 1546 GNUNET_SERVER_receive_done (client,
@@ -1497,6 +1811,12 @@ run (void *cls,
1497 { &handle_client_cancel, NULL, 1811 { &handle_client_cancel, NULL,
1498 GNUNET_MESSAGE_TYPE_SET_CANCEL, 1812 GNUNET_MESSAGE_TYPE_SET_CANCEL,
1499 sizeof (struct GNUNET_SET_CancelMessage)}, 1813 sizeof (struct GNUNET_SET_CancelMessage)},
1814 { &handle_client_copy_lazy_prepare, NULL,
1815 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
1816 sizeof (struct GNUNET_MessageHeader)},
1817 { &handle_client_copy_lazy_connect, NULL,
1818 GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
1819 sizeof (struct GNUNET_SET_CopyLazyConnectMessage)},
1500 { NULL, NULL, 0, 0} 1820 { NULL, NULL, 0, 0}
1501 }; 1821 };
1502 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = { 1822 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {