diff options
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r-- | src/set/gnunet-service-set_union.c | 116 |
1 files changed, 72 insertions, 44 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index c651a0381..6d9658ee5 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -245,8 +245,7 @@ struct ElementEntry | |||
245 | 245 | ||
246 | 246 | ||
247 | /** | 247 | /** |
248 | * Information about the element used for | 248 | * Entries in the key-to-element map of the union set. |
249 | * a specific union operation. | ||
250 | */ | 249 | */ |
251 | struct KeyEntry | 250 | struct KeyEntry |
252 | { | 251 | { |
@@ -401,11 +400,14 @@ destroy_key_to_element_iter (void *cls, | |||
401 | static void | 400 | static void |
402 | destroy_union_operation (struct UnionEvaluateOperation *eo) | 401 | destroy_union_operation (struct UnionEvaluateOperation *eo) |
403 | { | 402 | { |
403 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); | ||
404 | |||
404 | if (NULL != eo->mq) | 405 | if (NULL != eo->mq) |
405 | { | 406 | { |
406 | GNUNET_MQ_destroy (eo->mq); | 407 | GNUNET_MQ_destroy (eo->mq); |
407 | eo->mq = NULL; | 408 | eo->mq = NULL; |
408 | } | 409 | } |
410 | |||
409 | if (NULL != eo->socket) | 411 | if (NULL != eo->socket) |
410 | { | 412 | { |
411 | GNUNET_STREAM_close (eo->socket); | 413 | GNUNET_STREAM_close (eo->socket); |
@@ -433,12 +435,16 @@ destroy_union_operation (struct UnionEvaluateOperation *eo) | |||
433 | eo->key_to_element = NULL; | 435 | eo->key_to_element = NULL; |
434 | } | 436 | } |
435 | 437 | ||
436 | |||
437 | GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, | 438 | GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, |
438 | eo->set->state.u->ops_tail, | 439 | eo->set->state.u->ops_tail, |
439 | eo); | 440 | eo); |
440 | GNUNET_free (eo); | 441 | GNUNET_free (eo); |
441 | /* FIXME: free and destroy everything else */ | 442 | |
443 | |||
444 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); | ||
445 | |||
446 | |||
447 | /* FIXME: do a garbage collection of the set generations */ | ||
442 | } | 448 | } |
443 | 449 | ||
444 | 450 | ||
@@ -452,7 +458,7 @@ static void | |||
452 | fail_union_operation (struct UnionEvaluateOperation *eo) | 458 | fail_union_operation (struct UnionEvaluateOperation *eo) |
453 | { | 459 | { |
454 | struct GNUNET_MQ_Message *mqm; | 460 | struct GNUNET_MQ_Message *mqm; |
455 | struct ResultMessage *msg; | 461 | struct GNUNET_SET_ResultMessage *msg; |
456 | 462 | ||
457 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); | 463 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); |
458 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); | 464 | msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); |
@@ -495,20 +501,25 @@ send_operation_request (struct UnionEvaluateOperation *eo) | |||
495 | struct GNUNET_MQ_Message *mqm; | 501 | struct GNUNET_MQ_Message *mqm; |
496 | struct OperationRequestMessage *msg; | 502 | struct OperationRequestMessage *msg; |
497 | 503 | ||
498 | mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST); | 504 | mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg); |
499 | if (NULL != eo->context_msg) | 505 | |
500 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) | 506 | if (NULL == mqm) |
501 | { | 507 | { |
502 | /* the context message is too large */ | 508 | /* the context message is too large */ |
503 | GNUNET_break (0); | 509 | GNUNET_break (0); |
504 | GNUNET_SERVER_client_disconnect (eo->set->client); | 510 | GNUNET_SERVER_client_disconnect (eo->set->client); |
505 | GNUNET_MQ_discard (mqm); | 511 | return; |
506 | return; | 512 | } |
507 | } | ||
508 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); | 513 | msg->operation = htons (GNUNET_SET_OPERATION_UNION); |
509 | msg->app_id = eo->app_id; | 514 | msg->app_id = eo->app_id; |
510 | GNUNET_MQ_send (eo->mq, mqm); | 515 | GNUNET_MQ_send (eo->mq, mqm); |
511 | 516 | ||
517 | if (NULL != eo->context_msg) | ||
518 | { | ||
519 | GNUNET_free (eo->context_msg); | ||
520 | eo->context_msg = NULL; | ||
521 | } | ||
522 | |||
512 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); | 523 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); |
513 | } | 524 | } |
514 | 525 | ||
@@ -537,7 +548,7 @@ insert_element_iterator (void *cls, | |||
537 | { | 548 | { |
538 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) | 549 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) |
539 | { | 550 | { |
540 | new_k->next_colliding = old_k; | 551 | new_k->next_colliding = old_k->next_colliding; |
541 | old_k->next_colliding = new_k; | 552 | old_k->next_colliding = new_k; |
542 | return GNUNET_NO; | 553 | return GNUNET_NO; |
543 | } | 554 | } |
@@ -568,12 +579,11 @@ insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) | |||
568 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, | 579 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, |
569 | (uint32_t) ibf_key.key_val, | 580 | (uint32_t) ibf_key.key_val, |
570 | insert_element_iterator, k); | 581 | insert_element_iterator, k); |
582 | |||
571 | /* was the element inserted into a colliding bucket? */ | 583 | /* was the element inserted into a colliding bucket? */ |
572 | if (GNUNET_SYSERR == ret) | 584 | if (GNUNET_SYSERR == ret) |
573 | { | ||
574 | GNUNET_assert (NULL != k->next_colliding); | ||
575 | return; | 585 | return; |
576 | } | 586 | |
577 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, | 587 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, |
578 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 588 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); |
579 | } | 589 | } |
@@ -781,8 +791,8 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | |||
781 | */ | 791 | */ |
782 | static int | 792 | static int |
783 | send_element_iterator (void *cls, | 793 | send_element_iterator (void *cls, |
784 | uint32_t key, | 794 | uint32_t key, |
785 | void *value) | 795 | void *value) |
786 | { | 796 | { |
787 | struct SendElementClosure *sec = cls; | 797 | struct SendElementClosure *sec = cls; |
788 | struct IBF_Key ibf_key = sec->ibf_key; | 798 | struct IBF_Key ibf_key = sec->ibf_key; |
@@ -795,15 +805,18 @@ send_element_iterator (void *cls, | |||
795 | { | 805 | { |
796 | const struct GNUNET_SET_Element *const element = &ke->element->element; | 806 | const struct GNUNET_SET_Element *const element = &ke->element->element; |
797 | struct GNUNET_MQ_Message *mqm; | 807 | struct GNUNET_MQ_Message *mqm; |
808 | struct GNUNET_MessageHeader *mh; | ||
798 | 809 | ||
799 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); | 810 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); |
800 | mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); | 811 | mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); |
801 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) | 812 | if (NULL == mqm) |
802 | { | 813 | { |
814 | /* element too large */ | ||
803 | GNUNET_break (0); | 815 | GNUNET_break (0); |
804 | GNUNET_MQ_discard (mqm); | ||
805 | continue; | 816 | continue; |
806 | } | 817 | } |
818 | memcpy (&mh[1], element->data, element->size); | ||
819 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); | ||
807 | GNUNET_MQ_send (eo->mq, mqm); | 820 | GNUNET_MQ_send (eo->mq, mqm); |
808 | ke = ke->next_colliding; | 821 | ke = ke->next_colliding; |
809 | } | 822 | } |
@@ -975,34 +988,42 @@ send_client_element (struct UnionEvaluateOperation *eo, | |||
975 | struct GNUNET_SET_Element *element) | 988 | struct GNUNET_SET_Element *element) |
976 | { | 989 | { |
977 | struct GNUNET_MQ_Message *mqm; | 990 | struct GNUNET_MQ_Message *mqm; |
978 | struct ResultMessage *rm; | 991 | struct GNUNET_SET_ResultMessage *rm; |
979 | 992 | ||
980 | GNUNET_assert (0 != eo->request_id); | 993 | GNUNET_assert (0 != eo->request_id); |
981 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 994 | mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); |
982 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | 995 | if (NULL == mqm) |
983 | rm->request_id = htonl (eo->request_id); | ||
984 | if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) | ||
985 | { | 996 | { |
986 | GNUNET_MQ_discard (mqm); | 997 | GNUNET_MQ_discard (mqm); |
987 | GNUNET_break (0); | 998 | GNUNET_break (0); |
988 | return; | 999 | return; |
989 | } | 1000 | } |
990 | 1001 | rm->result_status = htons (GNUNET_SET_STATUS_OK); | |
1002 | rm->request_id = htonl (eo->request_id); | ||
1003 | memcpy (&rm[1], element->data, element->size); | ||
991 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 1004 | GNUNET_MQ_send (eo->set->client_mq, mqm); |
992 | } | 1005 | } |
993 | 1006 | ||
994 | 1007 | ||
995 | /** | 1008 | /** |
996 | * Callback used for notifications | 1009 | * Completion callback for shutdown |
997 | * | 1010 | * |
998 | * @param cls closure | 1011 | * @param cls the closure from GNUNET_STREAM_shutdown call |
1012 | * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR, | ||
1013 | * SHUT_RDWR) | ||
999 | */ | 1014 | */ |
1000 | static void | 1015 | /* |
1001 | client_done_sent_cb (void *cls) | 1016 | static void |
1017 | stream_shutdown_cb (void *cls, | ||
1018 | int operation) | ||
1002 | { | 1019 | { |
1003 | //struct UnionEvaluateOperation *eo = cls; | 1020 | //struct UnionEvaluateOperation *eo = cls; |
1004 | /* FIXME: destroy eo */ | 1021 | |
1022 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n"); | ||
1023 | |||
1024 | // destroy_union_operation (eo); | ||
1005 | } | 1025 | } |
1026 | */ | ||
1006 | 1027 | ||
1007 | 1028 | ||
1008 | /** | 1029 | /** |
@@ -1018,16 +1039,15 @@ static void | |||
1018 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) | 1039 | send_client_done_and_destroy (struct UnionEvaluateOperation *eo) |
1019 | { | 1040 | { |
1020 | struct GNUNET_MQ_Message *mqm; | 1041 | struct GNUNET_MQ_Message *mqm; |
1021 | struct ResultMessage *rm; | 1042 | struct GNUNET_SET_ResultMessage *rm; |
1022 | 1043 | ||
1023 | GNUNET_assert (0 != eo->request_id); | 1044 | GNUNET_assert (0 != eo->request_id); |
1024 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | 1045 | mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); |
1025 | rm->request_id = htonl (eo->request_id); | 1046 | rm->request_id = htonl (eo->request_id); |
1026 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | 1047 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); |
1027 | GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo); | ||
1028 | GNUNET_MQ_send (eo->set->client_mq, mqm); | 1048 | GNUNET_MQ_send (eo->set->client_mq, mqm); |
1029 | 1049 | ||
1030 | /* FIXME: destroy the eo */ | 1050 | // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo); |
1031 | } | 1051 | } |
1032 | 1052 | ||
1033 | 1053 | ||
@@ -1199,18 +1219,25 @@ stream_open_cb (void *cls, | |||
1199 | * @parem set the set to evaluate the operation with | 1219 | * @parem set the set to evaluate the operation with |
1200 | */ | 1220 | */ |
1201 | void | 1221 | void |
1202 | _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) | 1222 | _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) |
1203 | { | 1223 | { |
1204 | struct UnionEvaluateOperation *eo; | 1224 | struct UnionEvaluateOperation *eo; |
1225 | struct GNUNET_MessageHeader *context_msg; | ||
1205 | 1226 | ||
1206 | eo = GNUNET_new (struct UnionEvaluateOperation); | 1227 | eo = GNUNET_new (struct UnionEvaluateOperation); |
1207 | eo->peer = m->peer; | 1228 | eo->peer = m->target_peer; |
1208 | eo->set = set; | 1229 | eo->set = set; |
1209 | eo->request_id = htonl (m->request_id); | 1230 | eo->request_id = htonl (m->request_id); |
1210 | GNUNET_assert (0 != eo->request_id); | 1231 | GNUNET_assert (0 != eo->request_id); |
1211 | eo->se = strata_estimator_dup (set->state.u->se); | 1232 | eo->se = strata_estimator_dup (set->state.u->se); |
1212 | eo->salt = ntohs (m->salt); | 1233 | eo->salt = ntohs (m->salt); |
1213 | eo->app_id = m->app_id; | 1234 | eo->app_id = m->app_id; |
1235 | |||
1236 | context_msg = GNUNET_MQ_extract_nested_mh (m); | ||
1237 | if (NULL != context_msg) | ||
1238 | { | ||
1239 | eo->context_msg = GNUNET_copy_message (context_msg); | ||
1240 | } | ||
1214 | 1241 | ||
1215 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 1242 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
1216 | "evaluating union operation, (app %s)\n", | 1243 | "evaluating union operation, (app %s)\n", |
@@ -1235,7 +1262,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) | |||
1235 | * @param incoming information about the requesting remote peer | 1262 | * @param incoming information about the requesting remote peer |
1236 | */ | 1263 | */ |
1237 | void | 1264 | void |
1238 | _GSS_union_accept (struct AcceptMessage *m, struct Set *set, | 1265 | _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, |
1239 | struct Incoming *incoming) | 1266 | struct Incoming *incoming) |
1240 | { | 1267 | { |
1241 | struct UnionEvaluateOperation *eo; | 1268 | struct UnionEvaluateOperation *eo; |
@@ -1250,7 +1277,6 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set, | |||
1250 | GNUNET_assert (0 != ntohl (m->request_id)); | 1277 | GNUNET_assert (0 != ntohl (m->request_id)); |
1251 | eo->request_id = ntohl (m->request_id); | 1278 | eo->request_id = ntohl (m->request_id); |
1252 | eo->se = strata_estimator_dup (set->state.u->se); | 1279 | eo->se = strata_estimator_dup (set->state.u->se); |
1253 | eo->set = set; // FIXME: redundant!? | ||
1254 | eo->mq = incoming->mq; | 1280 | eo->mq = incoming->mq; |
1255 | /* transfer ownership of mq and socket from incoming to eo */ | 1281 | /* transfer ownership of mq and socket from incoming to eo */ |
1256 | incoming->mq = NULL; | 1282 | incoming->mq = NULL; |
@@ -1299,7 +1325,7 @@ _GSS_union_set_create (void) | |||
1299 | * @param set set to add the element to | 1325 | * @param set set to add the element to |
1300 | */ | 1326 | */ |
1301 | void | 1327 | void |
1302 | _GSS_union_add (struct ElementMessage *m, struct Set *set) | 1328 | _GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set) |
1303 | { | 1329 | { |
1304 | struct ElementEntry *ee; | 1330 | struct ElementEntry *ee; |
1305 | struct ElementEntry *ee_dup; | 1331 | struct ElementEntry *ee_dup; |
@@ -1357,7 +1383,9 @@ _GSS_union_set_destroy (struct Set *set) | |||
1357 | destroy_elements (set->state.u); | 1383 | destroy_elements (set->state.u); |
1358 | 1384 | ||
1359 | while (NULL != set->state.u->ops_head) | 1385 | while (NULL != set->state.u->ops_head) |
1386 | { | ||
1360 | destroy_union_operation (set->state.u->ops_head); | 1387 | destroy_union_operation (set->state.u->ops_head); |
1388 | } | ||
1361 | } | 1389 | } |
1362 | 1390 | ||
1363 | /** | 1391 | /** |
@@ -1368,7 +1396,7 @@ _GSS_union_set_destroy (struct Set *set) | |||
1368 | * @param set set to remove the element from | 1396 | * @param set set to remove the element from |
1369 | */ | 1397 | */ |
1370 | void | 1398 | void |
1371 | _GSS_union_remove (struct ElementMessage *m, struct Set *set) | 1399 | _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) |
1372 | { | 1400 | { |
1373 | struct GNUNET_HashCode hash; | 1401 | struct GNUNET_HashCode hash; |
1374 | struct ElementEntry *ee; | 1402 | struct ElementEntry *ee; |