summaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c116
1 files changed, 72 insertions, 44 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index c651a0381..6d9658ee5 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -245,8 +245,7 @@ struct ElementEntry
245 245
246 246
247/** 247/**
248 * Information about the element used for 248 * Entries in the key-to-element map of the union set.
249 * a specific union operation.
250 */ 249 */
251struct KeyEntry 250struct KeyEntry
252{ 251{
@@ -401,11 +400,14 @@ destroy_key_to_element_iter (void *cls,
401static void 400static void
402destroy_union_operation (struct UnionEvaluateOperation *eo) 401destroy_union_operation (struct UnionEvaluateOperation *eo)
403{ 402{
403 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
404
404 if (NULL != eo->mq) 405 if (NULL != eo->mq)
405 { 406 {
406 GNUNET_MQ_destroy (eo->mq); 407 GNUNET_MQ_destroy (eo->mq);
407 eo->mq = NULL; 408 eo->mq = NULL;
408 } 409 }
410
409 if (NULL != eo->socket) 411 if (NULL != eo->socket)
410 { 412 {
411 GNUNET_STREAM_close (eo->socket); 413 GNUNET_STREAM_close (eo->socket);
@@ -433,12 +435,16 @@ destroy_union_operation (struct UnionEvaluateOperation *eo)
433 eo->key_to_element = NULL; 435 eo->key_to_element = NULL;
434 } 436 }
435 437
436
437 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, 438 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
438 eo->set->state.u->ops_tail, 439 eo->set->state.u->ops_tail,
439 eo); 440 eo);
440 GNUNET_free (eo); 441 GNUNET_free (eo);
441 /* FIXME: free and destroy everything else */ 442
443
444 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
445
446
447 /* FIXME: do a garbage collection of the set generations */
442} 448}
443 449
444 450
@@ -452,7 +458,7 @@ static void
452fail_union_operation (struct UnionEvaluateOperation *eo) 458fail_union_operation (struct UnionEvaluateOperation *eo)
453{ 459{
454 struct GNUNET_MQ_Message *mqm; 460 struct GNUNET_MQ_Message *mqm;
455 struct ResultMessage *msg; 461 struct GNUNET_SET_ResultMessage *msg;
456 462
457 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 463 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
458 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 464 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
@@ -495,20 +501,25 @@ send_operation_request (struct UnionEvaluateOperation *eo)
495 struct GNUNET_MQ_Message *mqm; 501 struct GNUNET_MQ_Message *mqm;
496 struct OperationRequestMessage *msg; 502 struct OperationRequestMessage *msg;
497 503
498 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST); 504 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
499 if (NULL != eo->context_msg) 505
500 if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) 506 if (NULL == mqm)
501 { 507 {
502 /* the context message is too large */ 508 /* the context message is too large */
503 GNUNET_break (0); 509 GNUNET_break (0);
504 GNUNET_SERVER_client_disconnect (eo->set->client); 510 GNUNET_SERVER_client_disconnect (eo->set->client);
505 GNUNET_MQ_discard (mqm); 511 return;
506 return; 512 }
507 }
508 msg->operation = htons (GNUNET_SET_OPERATION_UNION); 513 msg->operation = htons (GNUNET_SET_OPERATION_UNION);
509 msg->app_id = eo->app_id; 514 msg->app_id = eo->app_id;
510 GNUNET_MQ_send (eo->mq, mqm); 515 GNUNET_MQ_send (eo->mq, mqm);
511 516
517 if (NULL != eo->context_msg)
518 {
519 GNUNET_free (eo->context_msg);
520 eo->context_msg = NULL;
521 }
522
512 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); 523 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
513} 524}
514 525
@@ -537,7 +548,7 @@ insert_element_iterator (void *cls,
537 { 548 {
538 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) 549 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
539 { 550 {
540 new_k->next_colliding = old_k; 551 new_k->next_colliding = old_k->next_colliding;
541 old_k->next_colliding = new_k; 552 old_k->next_colliding = new_k;
542 return GNUNET_NO; 553 return GNUNET_NO;
543 } 554 }
@@ -568,12 +579,11 @@ insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
568 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, 579 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
569 (uint32_t) ibf_key.key_val, 580 (uint32_t) ibf_key.key_val,
570 insert_element_iterator, k); 581 insert_element_iterator, k);
582
571 /* was the element inserted into a colliding bucket? */ 583 /* was the element inserted into a colliding bucket? */
572 if (GNUNET_SYSERR == ret) 584 if (GNUNET_SYSERR == ret)
573 {
574 GNUNET_assert (NULL != k->next_colliding);
575 return; 585 return;
576 } 586
577 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, 587 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
578 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 588 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
579} 589}
@@ -781,8 +791,8 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
781 */ 791 */
782static int 792static int
783send_element_iterator (void *cls, 793send_element_iterator (void *cls,
784 uint32_t key, 794 uint32_t key,
785 void *value) 795 void *value)
786{ 796{
787 struct SendElementClosure *sec = cls; 797 struct SendElementClosure *sec = cls;
788 struct IBF_Key ibf_key = sec->ibf_key; 798 struct IBF_Key ibf_key = sec->ibf_key;
@@ -795,15 +805,18 @@ send_element_iterator (void *cls,
795 { 805 {
796 const struct GNUNET_SET_Element *const element = &ke->element->element; 806 const struct GNUNET_SET_Element *const element = &ke->element->element;
797 struct GNUNET_MQ_Message *mqm; 807 struct GNUNET_MQ_Message *mqm;
808 struct GNUNET_MessageHeader *mh;
798 809
799 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); 810 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
800 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); 811 mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
801 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) 812 if (NULL == mqm)
802 { 813 {
814 /* element too large */
803 GNUNET_break (0); 815 GNUNET_break (0);
804 GNUNET_MQ_discard (mqm);
805 continue; 816 continue;
806 } 817 }
818 memcpy (&mh[1], element->data, element->size);
819 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
807 GNUNET_MQ_send (eo->mq, mqm); 820 GNUNET_MQ_send (eo->mq, mqm);
808 ke = ke->next_colliding; 821 ke = ke->next_colliding;
809 } 822 }
@@ -975,34 +988,42 @@ send_client_element (struct UnionEvaluateOperation *eo,
975 struct GNUNET_SET_Element *element) 988 struct GNUNET_SET_Element *element)
976{ 989{
977 struct GNUNET_MQ_Message *mqm; 990 struct GNUNET_MQ_Message *mqm;
978 struct ResultMessage *rm; 991 struct GNUNET_SET_ResultMessage *rm;
979 992
980 GNUNET_assert (0 != eo->request_id); 993 GNUNET_assert (0 != eo->request_id);
981 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 994 mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
982 rm->result_status = htons (GNUNET_SET_STATUS_OK); 995 if (NULL == mqm)
983 rm->request_id = htonl (eo->request_id);
984 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
985 { 996 {
986 GNUNET_MQ_discard (mqm); 997 GNUNET_MQ_discard (mqm);
987 GNUNET_break (0); 998 GNUNET_break (0);
988 return; 999 return;
989 } 1000 }
990 1001 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1002 rm->request_id = htonl (eo->request_id);
1003 memcpy (&rm[1], element->data, element->size);
991 GNUNET_MQ_send (eo->set->client_mq, mqm); 1004 GNUNET_MQ_send (eo->set->client_mq, mqm);
992} 1005}
993 1006
994 1007
995/** 1008/**
996 * Callback used for notifications 1009 * Completion callback for shutdown
997 * 1010 *
998 * @param cls closure 1011 * @param cls the closure from GNUNET_STREAM_shutdown call
1012 * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
1013 * SHUT_RDWR)
999 */ 1014 */
1000static void 1015/*
1001client_done_sent_cb (void *cls) 1016static void
1017stream_shutdown_cb (void *cls,
1018 int operation)
1002{ 1019{
1003 //struct UnionEvaluateOperation *eo = cls; 1020 //struct UnionEvaluateOperation *eo = cls;
1004 /* FIXME: destroy eo */ 1021
1022 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n");
1023
1024 // destroy_union_operation (eo);
1005} 1025}
1026*/
1006 1027
1007 1028
1008/** 1029/**
@@ -1018,16 +1039,15 @@ static void
1018send_client_done_and_destroy (struct UnionEvaluateOperation *eo) 1039send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1019{ 1040{
1020 struct GNUNET_MQ_Message *mqm; 1041 struct GNUNET_MQ_Message *mqm;
1021 struct ResultMessage *rm; 1042 struct GNUNET_SET_ResultMessage *rm;
1022 1043
1023 GNUNET_assert (0 != eo->request_id); 1044 GNUNET_assert (0 != eo->request_id);
1024 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 1045 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1025 rm->request_id = htonl (eo->request_id); 1046 rm->request_id = htonl (eo->request_id);
1026 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 1047 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1027 GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo);
1028 GNUNET_MQ_send (eo->set->client_mq, mqm); 1048 GNUNET_MQ_send (eo->set->client_mq, mqm);
1029 1049
1030 /* FIXME: destroy the eo */ 1050 // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo);
1031} 1051}
1032 1052
1033 1053
@@ -1199,18 +1219,25 @@ stream_open_cb (void *cls,
1199 * @parem set the set to evaluate the operation with 1219 * @parem set the set to evaluate the operation with
1200 */ 1220 */
1201void 1221void
1202_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) 1222_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
1203{ 1223{
1204 struct UnionEvaluateOperation *eo; 1224 struct UnionEvaluateOperation *eo;
1225 struct GNUNET_MessageHeader *context_msg;
1205 1226
1206 eo = GNUNET_new (struct UnionEvaluateOperation); 1227 eo = GNUNET_new (struct UnionEvaluateOperation);
1207 eo->peer = m->peer; 1228 eo->peer = m->target_peer;
1208 eo->set = set; 1229 eo->set = set;
1209 eo->request_id = htonl (m->request_id); 1230 eo->request_id = htonl (m->request_id);
1210 GNUNET_assert (0 != eo->request_id); 1231 GNUNET_assert (0 != eo->request_id);
1211 eo->se = strata_estimator_dup (set->state.u->se); 1232 eo->se = strata_estimator_dup (set->state.u->se);
1212 eo->salt = ntohs (m->salt); 1233 eo->salt = ntohs (m->salt);
1213 eo->app_id = m->app_id; 1234 eo->app_id = m->app_id;
1235
1236 context_msg = GNUNET_MQ_extract_nested_mh (m);
1237 if (NULL != context_msg)
1238 {
1239 eo->context_msg = GNUNET_copy_message (context_msg);
1240 }
1214 1241
1215 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1242 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1216 "evaluating union operation, (app %s)\n", 1243 "evaluating union operation, (app %s)\n",
@@ -1235,7 +1262,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
1235 * @param incoming information about the requesting remote peer 1262 * @param incoming information about the requesting remote peer
1236 */ 1263 */
1237void 1264void
1238_GSS_union_accept (struct AcceptMessage *m, struct Set *set, 1265_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
1239 struct Incoming *incoming) 1266 struct Incoming *incoming)
1240{ 1267{
1241 struct UnionEvaluateOperation *eo; 1268 struct UnionEvaluateOperation *eo;
@@ -1250,7 +1277,6 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
1250 GNUNET_assert (0 != ntohl (m->request_id)); 1277 GNUNET_assert (0 != ntohl (m->request_id));
1251 eo->request_id = ntohl (m->request_id); 1278 eo->request_id = ntohl (m->request_id);
1252 eo->se = strata_estimator_dup (set->state.u->se); 1279 eo->se = strata_estimator_dup (set->state.u->se);
1253 eo->set = set; // FIXME: redundant!?
1254 eo->mq = incoming->mq; 1280 eo->mq = incoming->mq;
1255 /* transfer ownership of mq and socket from incoming to eo */ 1281 /* transfer ownership of mq and socket from incoming to eo */
1256 incoming->mq = NULL; 1282 incoming->mq = NULL;
@@ -1299,7 +1325,7 @@ _GSS_union_set_create (void)
1299 * @param set set to add the element to 1325 * @param set set to add the element to
1300 */ 1326 */
1301void 1327void
1302_GSS_union_add (struct ElementMessage *m, struct Set *set) 1328_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1303{ 1329{
1304 struct ElementEntry *ee; 1330 struct ElementEntry *ee;
1305 struct ElementEntry *ee_dup; 1331 struct ElementEntry *ee_dup;
@@ -1357,7 +1383,9 @@ _GSS_union_set_destroy (struct Set *set)
1357 destroy_elements (set->state.u); 1383 destroy_elements (set->state.u);
1358 1384
1359 while (NULL != set->state.u->ops_head) 1385 while (NULL != set->state.u->ops_head)
1386 {
1360 destroy_union_operation (set->state.u->ops_head); 1387 destroy_union_operation (set->state.u->ops_head);
1388 }
1361} 1389}
1362 1390
1363/** 1391/**
@@ -1368,7 +1396,7 @@ _GSS_union_set_destroy (struct Set *set)
1368 * @param set set to remove the element from 1396 * @param set set to remove the element from
1369 */ 1397 */
1370void 1398void
1371_GSS_union_remove (struct ElementMessage *m, struct Set *set) 1399_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
1372{ 1400{
1373 struct GNUNET_HashCode hash; 1401 struct GNUNET_HashCode hash;
1374 struct ElementEntry *ee; 1402 struct ElementEntry *ee;