aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c89
1 files changed, 70 insertions, 19 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index fc7e578e6..4ca10f0b4 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -769,7 +769,7 @@ send_full_element_iterator (void *cls,
769 struct GNUNET_SET_Element *el = &ee->element; 769 struct GNUNET_SET_Element *el = &ee->element;
770 struct GNUNET_MQ_Envelope *ev; 770 struct GNUNET_MQ_Envelope *ev;
771 771
772 LOG (GNUNET_ERROR_TYPE_INFO, 772 LOG (GNUNET_ERROR_TYPE_DEBUG,
773 "Sending element %s\n", 773 "Sending element %s\n",
774 GNUNET_h2s (key)); 774 GNUNET_h2s (key));
775 ev = GNUNET_MQ_msg_extra (emsg, 775 ev = GNUNET_MQ_msg_extra (emsg,
@@ -796,7 +796,7 @@ send_full_set (struct Operation *op)
796 struct GNUNET_MQ_Envelope *ev; 796 struct GNUNET_MQ_Envelope *ev;
797 797
798 op->state->phase = PHASE_FULL_SENDING; 798 op->state->phase = PHASE_FULL_SENDING;
799 LOG (GNUNET_ERROR_TYPE_INFO, 799 LOG (GNUNET_ERROR_TYPE_DEBUG,
800 "Dedicing to transmit the full set\n"); 800 "Dedicing to transmit the full set\n");
801 /* FIXME: use a more memory-friendly way of doing this with an 801 /* FIXME: use a more memory-friendly way of doing this with an
802 iterator, just as we do in the non-full case! */ 802 iterator, just as we do in the non-full case! */
@@ -924,7 +924,7 @@ handle_union_p2p_strata_estimator (void *cls,
924 (diff > op->state->initial_size / 4) || 924 (diff > op->state->initial_size / 4) ||
925 (0 == other_size) ) 925 (0 == other_size) )
926 { 926 {
927 LOG (GNUNET_ERROR_TYPE_INFO, 927 LOG (GNUNET_ERROR_TYPE_DEBUG,
928 "Deciding to go for full set transmission (diff=%d, own set=%u)\n", 928 "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
929 diff, 929 diff,
930 op->state->initial_size); 930 op->state->initial_size);
@@ -941,7 +941,7 @@ handle_union_p2p_strata_estimator (void *cls,
941 { 941 {
942 struct GNUNET_MQ_Envelope *ev; 942 struct GNUNET_MQ_Envelope *ev;
943 943
944 LOG (GNUNET_ERROR_TYPE_INFO, 944 LOG (GNUNET_ERROR_TYPE_DEBUG,
945 "Telling other peer that we expect its full set\n"); 945 "Telling other peer that we expect its full set\n");
946 op->state->phase = PHASE_EXPECT_IBF; 946 op->state->phase = PHASE_EXPECT_IBF;
947 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); 947 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
@@ -1299,7 +1299,7 @@ handle_union_p2p_ibf (void *cls,
1299 else 1299 else
1300 { 1300 {
1301 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); 1301 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1302 LOG (GNUNET_ERROR_TYPE_INFO, 1302 LOG (GNUNET_ERROR_TYPE_DEBUG,
1303 "Received more of IBF\n"); 1303 "Received more of IBF\n");
1304 } 1304 }
1305 GNUNET_assert (NULL != op->state->remote_ibf); 1305 GNUNET_assert (NULL != op->state->remote_ibf);
@@ -1369,18 +1369,55 @@ send_client_element (struct Operation *op,
1369 1369
1370 1370
1371/** 1371/**
1372 * Destroy remote channel.
1373 *
1374 * @param op operation
1375 */
1376void destroy_channel (struct Operation *op)
1377{
1378 struct GNUNET_CADET_Channel *channel;
1379
1380 if (NULL != (channel = op->channel))
1381 {
1382 /* This will free op; called conditionally as this helper function
1383 is also called from within the channel disconnect handler. */
1384 op->channel = NULL;
1385 GNUNET_CADET_channel_destroy (channel);
1386 }
1387}
1388
1389
1390/**
1372 * Signal to the client that the operation has finished and 1391 * Signal to the client that the operation has finished and
1373 * destroy the operation. 1392 * destroy the operation.
1374 * 1393 *
1375 * @param cls operation to destroy 1394 * @param cls operation to destroy
1376 */ 1395 */
1377static void 1396static void
1378send_done_and_destroy (void *cls) 1397send_client_done (void *cls)
1379{ 1398{
1380 struct Operation *op = cls; 1399 struct Operation *op = cls;
1381 struct GNUNET_MQ_Envelope *ev; 1400 struct GNUNET_MQ_Envelope *ev;
1382 struct GNUNET_SET_ResultMessage *rm; 1401 struct GNUNET_SET_ResultMessage *rm;
1383 1402
1403 if (GNUNET_YES == op->state->client_done_sent) {
1404 return;
1405 }
1406
1407 if (PHASE_DONE != op->state->phase) {
1408 LOG (GNUNET_ERROR_TYPE_ERROR,
1409 "union operation failed\n");
1410 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1411 rm->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1412 rm->request_id = htonl (op->client_request_id);
1413 rm->element_type = htons (0);
1414 GNUNET_MQ_send (op->set->cs->mq,
1415 ev);
1416 return;
1417 }
1418
1419 op->state->client_done_sent = GNUNET_YES;
1420
1384 LOG (GNUNET_ERROR_TYPE_INFO, 1421 LOG (GNUNET_ERROR_TYPE_INFO,
1385 "Signalling client that union operation is done\n"); 1422 "Signalling client that union operation is done\n");
1386 ev = GNUNET_MQ_msg (rm, 1423 ev = GNUNET_MQ_msg (rm,
@@ -1391,9 +1428,6 @@ send_done_and_destroy (void *cls)
1391 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); 1428 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1392 GNUNET_MQ_send (op->set->cs->mq, 1429 GNUNET_MQ_send (op->set->cs->mq,
1393 ev); 1430 ev);
1394 /* Will also call the union-specific cancel function. */
1395 _GSS_operation_destroy (op,
1396 GNUNET_YES);
1397} 1431}
1398 1432
1399 1433
@@ -1422,7 +1456,7 @@ maybe_finish (struct Operation *op)
1422 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); 1456 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1423 GNUNET_MQ_send (op->mq, 1457 GNUNET_MQ_send (op->mq,
1424 ev); 1458 ev);
1425 /* We now wait until the other peer closes the channel 1459 /* We now wait until the other peer sends P2P_OVER
1426 * after it got all elements from us. */ 1460 * after it got all elements from us. */
1427 } 1461 }
1428 } 1462 }
@@ -1433,8 +1467,11 @@ maybe_finish (struct Operation *op)
1433 num_demanded); 1467 num_demanded);
1434 if (0 == num_demanded) 1468 if (0 == num_demanded)
1435 { 1469 {
1470 struct GNUNET_MQ_Envelope *ev;
1471
1436 op->state->phase = PHASE_DONE; 1472 op->state->phase = PHASE_DONE;
1437 send_done_and_destroy (op); 1473 send_client_done (op);
1474 destroy_channel (op);
1438 } 1475 }
1439 } 1476 }
1440} 1477}
@@ -1732,7 +1769,7 @@ handle_union_p2p_inquiry (void *cls,
1732 const struct IBF_Key *ibf_key; 1769 const struct IBF_Key *ibf_key;
1733 unsigned int num_keys; 1770 unsigned int num_keys;
1734 1771
1735 LOG (GNUNET_ERROR_TYPE_INFO, 1772 LOG (GNUNET_ERROR_TYPE_DEBUG,
1736 "Received union inquiry\n"); 1773 "Received union inquiry\n");
1737 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) 1774 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1738 / sizeof (struct IBF_Key); 1775 / sizeof (struct IBF_Key);
@@ -1800,7 +1837,7 @@ handle_union_p2p_request_full (void *cls,
1800{ 1837{
1801 struct Operation *op = cls; 1838 struct Operation *op = cls;
1802 1839
1803 LOG (GNUNET_ERROR_TYPE_INFO, 1840 LOG (GNUNET_ERROR_TYPE_DEBUG,
1804 "Received request for full set transmission\n"); 1841 "Received request for full set transmission\n");
1805 if (GNUNET_SET_OPERATION_UNION != op->set->operation) 1842 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1806 { 1843 {
@@ -1849,28 +1886,28 @@ handle_union_p2p_full_done (void *cls,
1849 op); 1886 op);
1850 1887
1851 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 1888 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1852 GNUNET_MQ_notify_sent (ev,
1853 &send_done_and_destroy,
1854 op);
1855 GNUNET_MQ_send (op->mq, 1889 GNUNET_MQ_send (op->mq,
1856 ev); 1890 ev);
1857 op->state->phase = PHASE_DONE; 1891 op->state->phase = PHASE_DONE;
1858 /* we now wait until the other peer shuts the tunnel down*/ 1892 /* we now wait until the other peer sends us the OVER message*/
1859 } 1893 }
1860 break; 1894 break;
1861 case PHASE_FULL_SENDING: 1895 case PHASE_FULL_SENDING:
1862 { 1896 {
1897 struct GNUNET_MQ_Envelope *ev;
1898
1863 LOG (GNUNET_ERROR_TYPE_DEBUG, 1899 LOG (GNUNET_ERROR_TYPE_DEBUG,
1864 "got FULL DONE, finishing\n"); 1900 "got FULL DONE, finishing\n");
1865 /* We sent the full set, and got the response for that. We're done. */ 1901 /* We sent the full set, and got the response for that. We're done. */
1866 op->state->phase = PHASE_DONE; 1902 op->state->phase = PHASE_DONE;
1867 GNUNET_CADET_receive_done (op->channel); 1903 GNUNET_CADET_receive_done (op->channel);
1868 send_done_and_destroy (op); 1904 send_client_done (op);
1905 destroy_channel (op);
1869 return; 1906 return;
1870 } 1907 }
1871 break; 1908 break;
1872 default: 1909 default:
1873 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1910 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1874 "Handle full done phase is %u\n", 1911 "Handle full done phase is %u\n",
1875 (unsigned) op->state->phase); 1912 (unsigned) op->state->phase);
1876 GNUNET_break_op (0); 1913 GNUNET_break_op (0);
@@ -2144,6 +2181,19 @@ handle_union_p2p_done (void *cls,
2144 } 2181 }
2145} 2182}
2146 2183
2184/**
2185 * Handle a over message from a remote peer
2186 *
2187 * @param cls the union operation
2188 * @param mh the message
2189 */
2190void
2191handle_union_p2p_over (void *cls,
2192 const struct GNUNET_MessageHeader *mh)
2193{
2194 send_client_done (cls);
2195}
2196
2147 2197
2148/** 2198/**
2149 * Initiate operation to evaluate a set union with a remote peer. 2199 * Initiate operation to evaluate a set union with a remote peer.
@@ -2372,6 +2422,7 @@ union_copy_state (struct SetState *state)
2372static void 2422static void
2373union_channel_death (struct Operation *op) 2423union_channel_death (struct Operation *op)
2374{ 2424{
2425 send_client_done (op);
2375 _GSS_operation_destroy (op, 2426 _GSS_operation_destroy (op,
2376 GNUNET_YES); 2427 GNUNET_YES);
2377} 2428}