aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-03-11 18:15:38 +0100
committerChristian Grothoff <christian@grothoff.org>2017-03-11 18:15:38 +0100
commitabdec5e11ff11bb10d32c013e11344a54786f80f (patch)
treec2b8eb6705efa8ac8278a6024d8ab19222471f0e /src/set/gnunet-service-set_union.c
parent4e981fb2bd74f21c33adf05d7999b05704d6909b (diff)
downloadgnunet-abdec5e11ff11bb10d32c013e11344a54786f80f.tar.gz
gnunet-abdec5e11ff11bb10d32c013e11344a54786f80f.zip
cleaning up set handlers, eliminating 2nd level demultiplexing and improving use of types
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c645
1 files changed, 373 insertions, 272 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index b5b602074..200bd4b8e 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 Copyright (C) 2013-2016 GNUnet e.V. 3 Copyright (C) 2013-2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -19,15 +19,16 @@
19*/ 19*/
20/** 20/**
21 * @file set/gnunet-service-set_union.c 21 * @file set/gnunet-service-set_union.c
22
23 * @brief two-peer set operations 22 * @brief two-peer set operations
24 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
28#include "gnunet_statistics_service.h" 28#include "gnunet_statistics_service.h"
29#include "gnunet-service-set.h" 29#include "gnunet-service-set.h"
30#include "ibf.h" 30#include "ibf.h"
31#include "gnunet-service-set_union.h"
31#include "gnunet-service-set_union_strata_estimator.h" 32#include "gnunet-service-set_union_strata_estimator.h"
32#include "gnunet-service-set_protocol.h" 33#include "gnunet-service-set_protocol.h"
33#include <gcrypt.h> 34#include <gcrypt.h>
@@ -813,42 +814,56 @@ send_full_set (struct Operation *op)
813 * Handle a strata estimator from a remote peer 814 * Handle a strata estimator from a remote peer
814 * 815 *
815 * @param cls the union operation 816 * @param cls the union operation
816 * @param mh the message 817 * @param msg the message
817 * @param is_compressed #GNUNET_YES if the estimator is compressed
818 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
819 * #GNUNET_OK otherwise
820 */ 818 */
821static int 819int
822handle_p2p_strata_estimator (void *cls, 820check_union_p2p_strata_estimator (void *cls,
823 const struct GNUNET_MessageHeader *mh, 821 const struct StrataEstimatorMessage *msg)
824 int is_compressed)
825{ 822{
826 struct Operation *op = cls; 823 struct Operation *op = cls;
827 struct StrataEstimator *remote_se; 824 int is_compressed;
828 struct StrataEstimatorMessage *msg = (void *) mh;
829 unsigned int diff;
830 uint64_t other_size;
831 size_t len; 825 size_t len;
832 826
833 GNUNET_STATISTICS_update (_GSS_statistics,
834 "# bytes of SE received",
835 ntohs (mh->size),
836 GNUNET_NO);
837
838 if (op->state->phase != PHASE_EXPECT_SE) 827 if (op->state->phase != PHASE_EXPECT_SE)
839 { 828 {
840 GNUNET_break (0); 829 GNUNET_break (0);
841 fail_union_operation (op);
842 return GNUNET_SYSERR; 830 return GNUNET_SYSERR;
843 } 831 }
844 len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage); 832 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
833 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
845 if ( (GNUNET_NO == is_compressed) && 834 if ( (GNUNET_NO == is_compressed) &&
846 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) 835 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
847 { 836 {
848 fail_union_operation (op);
849 GNUNET_break (0); 837 GNUNET_break (0);
850 return GNUNET_SYSERR; 838 return GNUNET_SYSERR;
851 } 839 }
840 return GNUNET_OK;
841}
842
843
844/**
845 * Handle a strata estimator from a remote peer
846 *
847 * @param cls the union operation
848 * @param msg the message
849 */
850void
851handle_union_p2p_strata_estimator (void *cls,
852 const struct StrataEstimatorMessage *msg)
853{
854 struct Operation *op = cls;
855 struct StrataEstimator *remote_se;
856 unsigned int diff;
857 uint64_t other_size;
858 size_t len;
859 int is_compressed;
860
861 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
862 GNUNET_STATISTICS_update (_GSS_statistics,
863 "# bytes of SE received",
864 ntohs (msg->header.size),
865 GNUNET_NO);
866 len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
852 other_size = GNUNET_ntohll (msg->set_size); 867 other_size = GNUNET_ntohll (msg->set_size);
853 remote_se = strata_estimator_create (SE_STRATA_COUNT, 868 remote_se = strata_estimator_create (SE_STRATA_COUNT,
854 SE_IBF_SIZE, 869 SE_IBF_SIZE,
@@ -857,7 +872,7 @@ handle_p2p_strata_estimator (void *cls,
857 { 872 {
858 /* insufficient resources, fail */ 873 /* insufficient resources, fail */
859 fail_union_operation (op); 874 fail_union_operation (op);
860 return GNUNET_SYSERR; 875 return;
861 } 876 }
862 if (GNUNET_OK != 877 if (GNUNET_OK !=
863 strata_estimator_read (&msg[1], 878 strata_estimator_read (&msg[1],
@@ -866,18 +881,16 @@ handle_p2p_strata_estimator (void *cls,
866 remote_se)) 881 remote_se))
867 { 882 {
868 /* decompression failed */ 883 /* decompression failed */
869 fail_union_operation (op);
870 strata_estimator_destroy (remote_se); 884 strata_estimator_destroy (remote_se);
871 return GNUNET_SYSERR; 885 fail_union_operation (op);
886 return;
872 } 887 }
873 GNUNET_assert (NULL != op->state->se); 888 GNUNET_assert (NULL != op->state->se);
874 diff = strata_estimator_difference (remote_se, 889 diff = strata_estimator_difference (remote_se,
875 op->state->se); 890 op->state->se);
876 891
877 if (diff > 200) 892 if (diff > 200)
878 diff = diff * 3 / 2; 893 diff = diff * 3 / 2;
879
880
881 894
882 strata_estimator_destroy (remote_se); 895 strata_estimator_destroy (remote_se);
883 strata_estimator_destroy (op->state->se); 896 strata_estimator_destroy (op->state->se);
@@ -885,12 +898,14 @@ handle_p2p_strata_estimator (void *cls,
885 LOG (GNUNET_ERROR_TYPE_DEBUG, 898 LOG (GNUNET_ERROR_TYPE_DEBUG,
886 "got se diff=%d, using ibf size %d\n", 899 "got se diff=%d, using ibf size %d\n",
887 diff, 900 diff,
888 1<<get_order_from_difference (diff)); 901 1U << get_order_from_difference (diff));
889 902
890 { 903 {
891 char *set_debug; 904 char *set_debug;
905
892 set_debug = getenv ("GNUNET_SET_BENCHMARK"); 906 set_debug = getenv ("GNUNET_SET_BENCHMARK");
893 if ( (NULL != set_debug) && (0 == strcmp (set_debug, "1")) ) 907 if ( (NULL != set_debug) &&
908 (0 == strcmp (set_debug, "1")) )
894 { 909 {
895 FILE *f = fopen ("set.log", "a"); 910 FILE *f = fopen ("set.log", "a");
896 fprintf (f, "%llu\n", (unsigned long long) diff); 911 fprintf (f, "%llu\n", (unsigned long long) diff);
@@ -898,15 +913,16 @@ handle_p2p_strata_estimator (void *cls,
898 } 913 }
899 } 914 }
900 915
901 if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound)) 916 if ( (GNUNET_YES == op->spec->byzantine) &&
917 (other_size < op->spec->byzantine_lower_bound) )
902 { 918 {
903 GNUNET_break (0); 919 GNUNET_break (0);
904 fail_union_operation (op); 920 fail_union_operation (op);
905 return GNUNET_SYSERR; 921 return;
906 } 922 }
907 923
908 924 if ( (GNUNET_YES == op->spec->force_full) ||
909 if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4)) 925 (diff > op->state->initial_size / 4))
910 { 926 {
911 LOG (GNUNET_ERROR_TYPE_INFO, 927 LOG (GNUNET_ERROR_TYPE_INFO,
912 "Sending full set (diff=%d, own set=%u)\n", 928 "Sending full set (diff=%d, own set=%u)\n",
@@ -923,6 +939,7 @@ handle_p2p_strata_estimator (void *cls,
923 else 939 else
924 { 940 {
925 struct GNUNET_MQ_Envelope *ev; 941 struct GNUNET_MQ_Envelope *ev;
942
926 op->state->phase = PHASE_EXPECT_IBF; 943 op->state->phase = PHASE_EXPECT_IBF;
927 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); 944 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
928 GNUNET_MQ_send (op->mq, ev); 945 GNUNET_MQ_send (op->mq, ev);
@@ -942,11 +959,10 @@ handle_p2p_strata_estimator (void *cls,
942 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 959 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
943 "Failed to send IBF, closing connection\n"); 960 "Failed to send IBF, closing connection\n");
944 fail_union_operation (op); 961 fail_union_operation (op);
945 return GNUNET_SYSERR; 962 return;
946 } 963 }
947 } 964 }
948 965 GNUNET_CADET_receive_done (op->channel);
949 return GNUNET_OK;
950} 966}
951 967
952 968
@@ -1164,99 +1180,116 @@ decode_and_send (struct Operation *op)
1164 1180
1165 1181
1166/** 1182/**
1167 * Handle an IBF message from a remote peer. 1183 * Check an IBF message from a remote peer.
1168 * 1184 *
1169 * Reassemble the IBF from multiple pieces, and 1185 * Reassemble the IBF from multiple pieces, and
1170 * process the whole IBF once possible. 1186 * process the whole IBF once possible.
1171 * 1187 *
1172 * @param cls the union operation 1188 * @param cls the union operation
1173 * @param mh the header of the message 1189 * @param msg the header of the message
1174 * @return #GNUNET_SYSERR if the tunnel should be disconnected, 1190 * @return #GNUNET_OK if @a msg is well-formed
1175 * #GNUNET_OK otherwise
1176 */ 1191 */
1177static int 1192int
1178handle_p2p_ibf (void *cls, 1193check_union_p2p_ibf (void *cls,
1179 const struct GNUNET_MessageHeader *mh) 1194 const struct IBFMessage *msg)
1180{ 1195{
1181 struct Operation *op = cls; 1196 struct Operation *op = cls;
1182 const struct IBFMessage *msg;
1183 unsigned int buckets_in_message; 1197 unsigned int buckets_in_message;
1184 1198
1185 if (ntohs (mh->size) < sizeof (struct IBFMessage)) 1199 if (OT_UNION != op->type)
1186 { 1200 {
1187 GNUNET_break_op (0); 1201 GNUNET_break_op (0);
1188 fail_union_operation (op);
1189 return GNUNET_SYSERR; 1202 return GNUNET_SYSERR;
1190 } 1203 }
1191 msg = (const struct IBFMessage *) mh; 1204 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1192 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || 1205 if (0 == buckets_in_message)
1193 (op->state->phase == PHASE_EXPECT_IBF) )
1194 { 1206 {
1195 op->state->phase = PHASE_EXPECT_IBF_CONT; 1207 GNUNET_break_op (0);
1196 GNUNET_assert (NULL == op->state->remote_ibf); 1208 return GNUNET_SYSERR;
1197 LOG (GNUNET_ERROR_TYPE_DEBUG,
1198 "Creating new ibf of size %u\n",
1199 1 << msg->order);
1200 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1201 op->state->salt_receive = ntohl (msg->salt);
1202 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1203 if (NULL == op->state->remote_ibf)
1204 {
1205 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1206 "Failed to parse remote IBF, closing connection\n");
1207 fail_union_operation (op);
1208 return GNUNET_SYSERR;
1209 }
1210 op->state->ibf_buckets_received = 0;
1211 if (0 != ntohl (msg->offset))
1212 {
1213 GNUNET_break_op (0);
1214 fail_union_operation (op);
1215 return GNUNET_SYSERR;
1216 }
1217 } 1209 }
1218 else if (op->state->phase == PHASE_EXPECT_IBF_CONT) 1210 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1211 {
1212 GNUNET_break_op (0);
1213 return GNUNET_SYSERR;
1214 }
1215 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1219 { 1216 {
1220 if (ntohl (msg->offset) != op->state->ibf_buckets_received) 1217 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1221 { 1218 {
1222 GNUNET_break_op (0); 1219 GNUNET_break_op (0);
1223 fail_union_operation (op);
1224 return GNUNET_SYSERR; 1220 return GNUNET_SYSERR;
1225 } 1221 }
1226 if (1<<msg->order != op->state->remote_ibf->size) 1222 if (1<<msg->order != op->state->remote_ibf->size)
1227 { 1223 {
1228 GNUNET_break_op (0); 1224 GNUNET_break_op (0);
1229 fail_union_operation (op);
1230 return GNUNET_SYSERR; 1225 return GNUNET_SYSERR;
1231 } 1226 }
1232 if (ntohl (msg->salt) != op->state->salt_receive) 1227 if (ntohl (msg->salt) != op->state->salt_receive)
1233 { 1228 {
1234 GNUNET_break_op (0); 1229 GNUNET_break_op (0);
1235 fail_union_operation (op);
1236 return GNUNET_SYSERR; 1230 return GNUNET_SYSERR;
1237 } 1231 }
1238 } 1232 }
1239 else 1233 else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1234 (op->state->phase != PHASE_EXPECT_IBF) )
1240 { 1235 {
1241 GNUNET_assert (0); 1236 GNUNET_break_op (0);
1237 return GNUNET_SYSERR;
1242 } 1238 }
1243 1239
1244 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; 1240 return GNUNET_OK;
1241}
1245 1242
1246 if (0 == buckets_in_message) 1243
1244/**
1245 * Handle an IBF message from a remote peer.
1246 *
1247 * Reassemble the IBF from multiple pieces, and
1248 * process the whole IBF once possible.
1249 *
1250 * @param cls the union operation
1251 * @param msg the header of the message
1252 */
1253void
1254handle_union_p2p_ibf (void *cls,
1255 const struct IBFMessage *msg)
1256{
1257 struct Operation *op = cls;
1258 unsigned int buckets_in_message;
1259
1260 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1261 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1262 (op->state->phase == PHASE_EXPECT_IBF) )
1247 { 1263 {
1248 GNUNET_break_op (0); 1264 op->state->phase = PHASE_EXPECT_IBF_CONT;
1249 fail_union_operation (op); 1265 GNUNET_assert (NULL == op->state->remote_ibf);
1250 return GNUNET_SYSERR; 1266 LOG (GNUNET_ERROR_TYPE_DEBUG,
1267 "Creating new ibf of size %u\n",
1268 1 << msg->order);
1269 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1270 op->state->salt_receive = ntohl (msg->salt);
1271 LOG (GNUNET_ERROR_TYPE_DEBUG,
1272 "Receiving new IBF with salt %u\n",
1273 op->state->salt_receive);
1274 if (NULL == op->state->remote_ibf)
1275 {
1276 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1277 "Failed to parse remote IBF, closing connection\n");
1278 fail_union_operation (op);
1279 return;
1280 }
1281 op->state->ibf_buckets_received = 0;
1282 if (0 != ntohl (msg->offset))
1283 {
1284 GNUNET_break_op (0);
1285 fail_union_operation (op);
1286 return;
1287 }
1251 } 1288 }
1252 1289 else
1253 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1254 { 1290 {
1255 GNUNET_break_op (0); 1291 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1256 fail_union_operation (op);
1257 return GNUNET_SYSERR;
1258 } 1292 }
1259
1260 GNUNET_assert (NULL != op->state->remote_ibf); 1293 GNUNET_assert (NULL != op->state->remote_ibf);
1261 1294
1262 ibf_read_slice (&msg[1], 1295 ibf_read_slice (&msg[1],
@@ -1276,10 +1309,11 @@ handle_p2p_ibf (void *cls,
1276 /* Internal error, best we can do is shut down */ 1309 /* Internal error, best we can do is shut down */
1277 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1310 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1278 "Failed to decode IBF, closing connection\n"); 1311 "Failed to decode IBF, closing connection\n");
1279 return GNUNET_SYSERR; 1312 fail_union_operation (op);
1313 return;
1280 } 1314 }
1281 } 1315 }
1282 return GNUNET_OK; 1316 GNUNET_CADET_receive_done (op->channel);
1283} 1317}
1284 1318
1285 1319
@@ -1343,6 +1377,11 @@ send_done_and_destroy (void *cls)
1343} 1377}
1344 1378
1345 1379
1380/**
1381 * Tests if the operation is finished, and if so notify.
1382 *
1383 * @param op operation to check
1384 */
1346static void 1385static void
1347maybe_finish (struct Operation *op) 1386maybe_finish (struct Operation *op)
1348{ 1387{
@@ -1382,46 +1421,59 @@ maybe_finish (struct Operation *op)
1382 1421
1383 1422
1384/** 1423/**
1385 * Handle an element message from a remote peer. 1424 * Check an element message from a remote peer.
1386 * Sent by the other peer either because we decoded an IBF and placed a demand,
1387 * or because the other peer switched to full set transmission.
1388 * 1425 *
1389 * @param cls the union operation 1426 * @param cls the union operation
1390 * @param mh the message 1427 * @param emsg the message
1391 */ 1428 */
1392static void 1429int
1393handle_p2p_elements (void *cls, 1430check_union_p2p_elements (void *cls,
1394 const struct GNUNET_MessageHeader *mh) 1431 const struct GNUNET_SET_ElementMessage *emsg)
1395{ 1432{
1396 struct Operation *op = cls; 1433 struct Operation *op = cls;
1397 struct ElementEntry *ee;
1398 const struct GNUNET_SET_ElementMessage *emsg;
1399 uint16_t element_size;
1400 1434
1401 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) 1435 if (OT_UNION != op->type)
1402 { 1436 {
1403 GNUNET_break_op (0); 1437 GNUNET_break_op (0);
1404 fail_union_operation (op); 1438 return GNUNET_SYSERR;
1405 return;
1406 } 1439 }
1407 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) 1440 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1408 { 1441 {
1409 GNUNET_break_op (0); 1442 GNUNET_break_op (0);
1410 fail_union_operation (op); 1443 return GNUNET_SYSERR;
1411 return;
1412 } 1444 }
1445 return GNUNET_OK;
1446}
1447
1413 1448
1414 emsg = (const struct GNUNET_SET_ElementMessage *) mh; 1449/**
1450 * Handle an element message from a remote peer.
1451 * Sent by the other peer either because we decoded an IBF and placed a demand,
1452 * or because the other peer switched to full set transmission.
1453 *
1454 * @param cls the union operation
1455 * @param emsg the message
1456 */
1457void
1458handle_union_p2p_elements (void *cls,
1459 const struct GNUNET_SET_ElementMessage *emsg)
1460{
1461 struct Operation *op = cls;
1462 struct ElementEntry *ee;
1463 struct KeyEntry *ke;
1464 uint16_t element_size;
1415 1465
1416 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); 1466 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1417 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); 1467 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1418 GNUNET_memcpy (&ee[1], &emsg[1], element_size); 1468 GNUNET_memcpy (&ee[1],
1469 &emsg[1],
1470 element_size);
1419 ee->element.size = element_size; 1471 ee->element.size = element_size;
1420 ee->element.data = &ee[1]; 1472 ee->element.data = &ee[1];
1421 ee->element.element_type = ntohs (emsg->element_type); 1473 ee->element.element_type = ntohs (emsg->element_type);
1422 ee->remote = GNUNET_YES; 1474 ee->remote = GNUNET_YES;
1423 GNUNET_SET_element_hash (&ee->element, &ee->element_hash); 1475 GNUNET_SET_element_hash (&ee->element,
1424 1476 &ee->element_hash);
1425 if (GNUNET_NO == 1477 if (GNUNET_NO ==
1426 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, 1478 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1427 &ee->element_hash, 1479 &ee->element_hash,
@@ -1429,7 +1481,6 @@ handle_p2p_elements (void *cls,
1429 { 1481 {
1430 /* We got something we didn't demand, since it's not in our map. */ 1482 /* We got something we didn't demand, since it's not in our map. */
1431 GNUNET_break_op (0); 1483 GNUNET_break_op (0);
1432 GNUNET_free (ee);
1433 fail_union_operation (op); 1484 fail_union_operation (op);
1434 return; 1485 return;
1435 } 1486 }
@@ -1448,10 +1499,9 @@ handle_p2p_elements (void *cls,
1448 1, 1499 1,
1449 GNUNET_NO); 1500 GNUNET_NO);
1450 1501
1451 op->state->received_total += 1; 1502 op->state->received_total++;
1452
1453 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1454 1503
1504 ke = op_get_element (op, &ee->element_hash);
1455 if (NULL != ke) 1505 if (NULL != ke)
1456 { 1506 {
1457 /* Got repeated element. Should not happen since 1507 /* Got repeated element. Should not happen since
@@ -1467,7 +1517,7 @@ handle_p2p_elements (void *cls,
1467 { 1517 {
1468 LOG (GNUNET_ERROR_TYPE_DEBUG, 1518 LOG (GNUNET_ERROR_TYPE_DEBUG,
1469 "Registering new element from remote peer\n"); 1519 "Registering new element from remote peer\n");
1470 op->state->received_fresh += 1; 1520 op->state->received_fresh++;
1471 op_register_element (op, ee, GNUNET_YES); 1521 op_register_element (op, ee, GNUNET_YES);
1472 /* only send results immediately if the client wants it */ 1522 /* only send results immediately if the client wants it */
1473 switch (op->spec->result_mode) 1523 switch (op->spec->result_mode)
@@ -1485,43 +1535,57 @@ handle_p2p_elements (void *cls,
1485 } 1535 }
1486 } 1536 }
1487 1537
1488 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) 1538 if ( (op->state->received_total > 8) &&
1539 (op->state->received_fresh < op->state->received_total / 3) )
1489 { 1540 {
1490 /* The other peer gave us lots of old elements, there's something wrong. */ 1541 /* The other peer gave us lots of old elements, there's something wrong. */
1491 GNUNET_break_op (0); 1542 GNUNET_break_op (0);
1492 fail_union_operation (op); 1543 fail_union_operation (op);
1493 return; 1544 return;
1494 } 1545 }
1495 1546 GNUNET_CADET_receive_done (op->channel);
1496 maybe_finish (op); 1547 maybe_finish (op);
1497} 1548}
1498 1549
1499 1550
1500/** 1551/**
1501 * Handle an element message from a remote peer. 1552 * Check a full element message from a remote peer.
1502 * 1553 *
1503 * @param cls the union operation 1554 * @param cls the union operation
1504 * @param mh the message 1555 * @param emsg the message
1505 */ 1556 */
1506static void 1557int
1507handle_p2p_full_element (void *cls, 1558check_union_p2p_full_element (void *cls,
1508 const struct GNUNET_MessageHeader *mh) 1559 const struct GNUNET_SET_ElementMessage *emsg)
1509{ 1560{
1510 struct Operation *op = cls; 1561 struct Operation *op = cls;
1511 struct ElementEntry *ee;
1512 const struct GNUNET_SET_ElementMessage *emsg;
1513 uint16_t element_size;
1514 1562
1515 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) 1563 if (OT_UNION != op->type)
1516 { 1564 {
1517 GNUNET_break_op (0); 1565 GNUNET_break_op (0);
1518 fail_union_operation (op); 1566 return GNUNET_SYSERR;
1519 return;
1520 } 1567 }
1568 // FIXME: check that we expect full elements here?
1569 return GNUNET_OK;
1570}
1521 1571
1522 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1523 1572
1524 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); 1573/**
1574 * Handle an element message from a remote peer.
1575 *
1576 * @param cls the union operation
1577 * @param emsg the message
1578 */
1579void
1580handle_union_p2p_full_element (void *cls,
1581 const struct GNUNET_SET_ElementMessage *emsg)
1582{
1583 struct Operation *op = cls;
1584 struct ElementEntry *ee;
1585 struct KeyEntry *ke;
1586 uint16_t element_size;
1587
1588 element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
1525 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); 1589 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1526 GNUNET_memcpy (&ee[1], &emsg[1], element_size); 1590 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1527 ee->element.size = element_size; 1591 ee->element.size = element_size;
@@ -1544,10 +1608,9 @@ handle_p2p_full_element (void *cls,
1544 1, 1608 1,
1545 GNUNET_NO); 1609 GNUNET_NO);
1546 1610
1547 op->state->received_total += 1; 1611 op->state->received_total++;
1548
1549 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1550 1612
1613 ke = op_get_element (op, &ee->element_hash);
1551 if (NULL != ke) 1614 if (NULL != ke)
1552 { 1615 {
1553 /* Got repeated element. Should not happen since 1616 /* Got repeated element. Should not happen since
@@ -1563,7 +1626,7 @@ handle_p2p_full_element (void *cls,
1563 { 1626 {
1564 LOG (GNUNET_ERROR_TYPE_DEBUG, 1627 LOG (GNUNET_ERROR_TYPE_DEBUG,
1565 "Registering new element from remote peer\n"); 1628 "Registering new element from remote peer\n");
1566 op->state->received_fresh += 1; 1629 op->state->received_fresh++;
1567 op_register_element (op, ee, GNUNET_YES); 1630 op_register_element (op, ee, GNUNET_YES);
1568 /* only send results immediately if the client wants it */ 1631 /* only send results immediately if the client wants it */
1569 switch (op->spec->result_mode) 1632 switch (op->spec->result_mode)
@@ -1581,8 +1644,8 @@ handle_p2p_full_element (void *cls,
1581 } 1644 }
1582 } 1645 }
1583 1646
1584 if ( (GNUNET_YES == op->spec->byzantine) && 1647 if ( (GNUNET_YES == op->spec->byzantine) &&
1585 (op->state->received_total > 384 + op->state->received_fresh * 4) && 1648 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1586 (op->state->received_fresh < op->state->received_total / 6) ) 1649 (op->state->received_fresh < op->state->received_total / 6) )
1587 { 1650 {
1588 /* The other peer gave us lots of old elements, there's something wrong. */ 1651 /* The other peer gave us lots of old elements, there's something wrong. */
@@ -1594,51 +1657,73 @@ handle_p2p_full_element (void *cls,
1594 fail_union_operation (op); 1657 fail_union_operation (op);
1595 return; 1658 return;
1596 } 1659 }
1660 GNUNET_CADET_receive_done (op->channel);
1597} 1661}
1598 1662
1663
1599/** 1664/**
1600 * Send offers (for GNUNET_Hash-es) in response 1665 * Send offers (for GNUNET_Hash-es) in response
1601 * to inquiries (for IBF_Key-s). 1666 * to inquiries (for IBF_Key-s).
1602 * 1667 *
1603 * @param cls the union operation 1668 * @param cls the union operation
1604 * @param mh the message 1669 * @param msg the message
1605 */ 1670 */
1606static void 1671int
1607handle_p2p_inquiry (void *cls, 1672check_union_p2p_inquiry (void *cls,
1608 const struct GNUNET_MessageHeader *mh) 1673 const struct InquiryMessage *msg)
1609{ 1674{
1610 struct Operation *op = cls; 1675 struct Operation *op = cls;
1611 const struct IBF_Key *ibf_key;
1612 unsigned int num_keys; 1676 unsigned int num_keys;
1613 struct InquiryMessage *msg;
1614 1677
1615 /* look up elements and send them */ 1678 if (OT_UNION != op->type)
1679 {
1680 GNUNET_break_op (0);
1681 return GNUNET_SYSERR;
1682 }
1616 if (op->state->phase != PHASE_INVENTORY_PASSIVE) 1683 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1617 { 1684 {
1618 GNUNET_break_op (0); 1685 GNUNET_break_op (0);
1619 fail_union_operation (op); 1686 return GNUNET_SYSERR;
1620 return;
1621 } 1687 }
1622 num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage)) 1688 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1623 / sizeof (struct IBF_Key); 1689 / sizeof (struct IBF_Key);
1624 if ((ntohs (mh->size) - sizeof (struct InquiryMessage)) 1690 if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1625 != num_keys * sizeof (struct IBF_Key)) 1691 != num_keys * sizeof (struct IBF_Key))
1626 { 1692 {
1627 GNUNET_break_op (0); 1693 GNUNET_break_op (0);
1628 fail_union_operation (op); 1694 return GNUNET_SYSERR;
1629 return;
1630 } 1695 }
1696 return GNUNET_OK;
1697}
1631 1698
1632 msg = (struct InquiryMessage *) mh;
1633 1699
1700/**
1701 * Send offers (for GNUNET_Hash-es) in response
1702 * to inquiries (for IBF_Key-s).
1703 *
1704 * @param cls the union operation
1705 * @param msg the message
1706 */
1707void
1708handle_union_p2p_inquiry (void *cls,
1709 const struct InquiryMessage *msg)
1710{
1711 struct Operation *op = cls;
1712 const struct IBF_Key *ibf_key;
1713 unsigned int num_keys;
1714
1715 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1716 / sizeof (struct IBF_Key);
1634 ibf_key = (const struct IBF_Key *) &msg[1]; 1717 ibf_key = (const struct IBF_Key *) &msg[1];
1635 while (0 != num_keys--) 1718 while (0 != num_keys--)
1636 { 1719 {
1637 struct IBF_Key unsalted_key; 1720 struct IBF_Key unsalted_key;
1721
1638 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); 1722 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1639 send_offers_for_key (op, unsalted_key); 1723 send_offers_for_key (op, unsalted_key);
1640 ibf_key++; 1724 ibf_key++;
1641 } 1725 }
1726 GNUNET_CADET_receive_done (op->channel);
1642} 1727}
1643 1728
1644 1729
@@ -1677,27 +1762,34 @@ send_missing_elements_iter (void *cls,
1677 1762
1678 1763
1679/** 1764/**
1680 * Handle a 1765 * Handle a request for full set transmission.
1681 * 1766 *
1682 * @parem cls closure, a set union operation 1767 * @parem cls closure, a set union operation
1683 * @param mh the demand message 1768 * @param mh the demand message
1684 */ 1769 */
1685static void 1770void
1686handle_p2p_request_full (void *cls, 1771handle_union_p2p_request_full (void *cls,
1687 const struct GNUNET_MessageHeader *mh) 1772 const struct GNUNET_MessageHeader *mh)
1688{ 1773{
1689 struct Operation *op = cls; 1774 struct Operation *op = cls;
1690 1775
1691 if (PHASE_EXPECT_IBF != op->state->phase) 1776 if (OT_UNION != op->type)
1692 { 1777 {
1778 GNUNET_break_op (0);
1693 fail_union_operation (op); 1779 fail_union_operation (op);
1780 return;
1781 }
1782 if (PHASE_EXPECT_IBF != op->state->phase)
1783 {
1694 GNUNET_break_op (0); 1784 GNUNET_break_op (0);
1785 fail_union_operation (op);
1695 return; 1786 return;
1696 } 1787 }
1697 1788
1698 // FIXME: we need to check that our set is larger than the 1789 // FIXME: we need to check that our set is larger than the
1699 // byzantine_lower_bound by some threshold 1790 // byzantine_lower_bound by some threshold
1700 send_full_set (op); 1791 send_full_set (op);
1792 GNUNET_CADET_receive_done (op->channel);
1701} 1793}
1702 1794
1703 1795
@@ -1707,56 +1799,97 @@ handle_p2p_request_full (void *cls,
1707 * @parem cls closure, a set union operation 1799 * @parem cls closure, a set union operation
1708 * @param mh the demand message 1800 * @param mh the demand message
1709 */ 1801 */
1710static void 1802void
1711handle_p2p_full_done (void *cls, 1803handle_union_p2p_full_done (void *cls,
1712 const struct GNUNET_MessageHeader *mh) 1804 const struct GNUNET_MessageHeader *mh)
1713{ 1805{
1714 struct Operation *op = cls; 1806 struct Operation *op = cls;
1715 1807
1716 if (PHASE_EXPECT_IBF == op->state->phase) 1808 switch (op->state->phase)
1717 { 1809 {
1718 struct GNUNET_MQ_Envelope *ev; 1810 case PHASE_EXPECT_IBF:
1719 1811 {
1720 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); 1812 struct GNUNET_MQ_Envelope *ev;
1721 1813
1722 /* send all the elements that did not come from the remote peer */ 1814 LOG (GNUNET_ERROR_TYPE_DEBUG,
1723 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 1815 "got FULL DONE, sending elements that other peer is missing\n");
1724 &send_missing_elements_iter,
1725 op);
1726 1816
1727 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 1817 /* send all the elements that did not come from the remote peer */
1728 GNUNET_MQ_send (op->mq, ev); 1818 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1729 op->state->phase = PHASE_DONE; 1819 &send_missing_elements_iter,
1820 op);
1730 1821
1731 /* we now wait until the other peer shuts the tunnel down*/ 1822 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1823 GNUNET_MQ_send (op->mq, ev);
1824 op->state->phase = PHASE_DONE;
1825 /* we now wait until the other peer shuts the tunnel down*/
1826 }
1827 break;
1828 case PHASE_FULL_SENDING:
1829 {
1830 LOG (GNUNET_ERROR_TYPE_DEBUG,
1831 "got FULL DONE, finishing\n");
1832 /* We sent the full set, and got the response for that. We're done. */
1833 op->state->phase = PHASE_DONE;
1834 GNUNET_CADET_receive_done (op->channel);
1835 send_done_and_destroy (op);
1836 return;
1837 }
1838 break;
1839 default:
1840 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1841 "Handle full done phase is %u\n",
1842 (unsigned) op->state->phase);
1843 GNUNET_break_op (0);
1844 fail_union_operation (op);
1845 return;
1732 } 1846 }
1733 else if (PHASE_FULL_SENDING == op->state->phase) 1847 GNUNET_CADET_receive_done (op->channel);
1848}
1849
1850
1851/**
1852 * Check a demand by the other peer for elements based on a list
1853 * of `struct GNUNET_HashCode`s.
1854 *
1855 * @parem cls closure, a set union operation
1856 * @param mh the demand message
1857 * @return #GNUNET_OK if @a mh is well-formed
1858 */
1859int
1860check_union_p2p_demand (void *cls,
1861 const struct GNUNET_MessageHeader *mh)
1862{
1863 struct Operation *op = cls;
1864 unsigned int num_hashes;
1865
1866 if (OT_UNION != op->type)
1734 { 1867 {
1735 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); 1868 GNUNET_break_op (0);
1736 /* We sent the full set, and got the response for that. We're done. */ 1869 return GNUNET_SYSERR;
1737 op->state->phase = PHASE_DONE;
1738 send_done_and_destroy (op);
1739 } 1870 }
1740 else 1871 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1872 / sizeof (struct GNUNET_HashCode);
1873 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1874 != num_hashes * sizeof (struct GNUNET_HashCode))
1741 { 1875 {
1742 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1743 GNUNET_break_op (0); 1876 GNUNET_break_op (0);
1744 fail_union_operation (op); 1877 return GNUNET_SYSERR;
1745 return;
1746 } 1878 }
1879 return GNUNET_OK;
1747} 1880}
1748 1881
1749 1882
1750/** 1883/**
1751 * Handle a demand by the other peer for elements based on a list 1884 * Handle a demand by the other peer for elements based on a list
1752 * of GNUNET_HashCode-s. 1885 * of `struct GNUNET_HashCode`s.
1753 * 1886 *
1754 * @parem cls closure, a set union operation 1887 * @parem cls closure, a set union operation
1755 * @param mh the demand message 1888 * @param mh the demand message
1756 */ 1889 */
1757static void 1890void
1758handle_p2p_demand (void *cls, 1891handle_union_p2p_demand (void *cls,
1759 const struct GNUNET_MessageHeader *mh) 1892 const struct GNUNET_MessageHeader *mh)
1760{ 1893{
1761 struct Operation *op = cls; 1894 struct Operation *op = cls;
1762 struct ElementEntry *ee; 1895 struct ElementEntry *ee;
@@ -1767,19 +1900,12 @@ handle_p2p_demand (void *cls,
1767 1900
1768 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 1901 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1769 / sizeof (struct GNUNET_HashCode); 1902 / sizeof (struct GNUNET_HashCode);
1770 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1771 != num_hashes * sizeof (struct GNUNET_HashCode))
1772 {
1773 GNUNET_break_op (0);
1774 fail_union_operation (op);
1775 return;
1776 }
1777
1778 for (hash = (const struct GNUNET_HashCode *) &mh[1]; 1903 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1779 num_hashes > 0; 1904 num_hashes > 0;
1780 hash++, num_hashes--) 1905 hash++, num_hashes--)
1781 { 1906 {
1782 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); 1907 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1908 hash);
1783 if (NULL == ee) 1909 if (NULL == ee)
1784 { 1910 {
1785 /* Demand for non-existing element. */ 1911 /* Demand for non-existing element. */
@@ -1823,31 +1949,35 @@ handle_p2p_demand (void *cls,
1823 break; 1949 break;
1824 } 1950 }
1825 } 1951 }
1952 GNUNET_CADET_receive_done (op->channel);
1826} 1953}
1827 1954
1828 1955
1829/** 1956/**
1830 * Handle offers (of GNUNET_HashCode-s) and 1957 * Check offer (of `struct GNUNET_HashCode`s).
1831 * respond with demands (of GNUNET_HashCode-s).
1832 * 1958 *
1833 * @param cls the union operation 1959 * @param cls the union operation
1834 * @param mh the message 1960 * @param mh the message
1961 * @return #GNUNET_OK if @a mh is well-formed
1835 */ 1962 */
1836static void 1963int
1837handle_p2p_offer (void *cls, 1964check_union_p2p_offer (void *cls,
1838 const struct GNUNET_MessageHeader *mh) 1965 const struct GNUNET_MessageHeader *mh)
1839{ 1966{
1840 struct Operation *op = cls; 1967 struct Operation *op = cls;
1841 const struct GNUNET_HashCode *hash;
1842 unsigned int num_hashes; 1968 unsigned int num_hashes;
1843 1969
1970 if (OT_UNION != op->type)
1971 {
1972 GNUNET_break_op (0);
1973 return GNUNET_SYSERR;
1974 }
1844 /* look up elements and send them */ 1975 /* look up elements and send them */
1845 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && 1976 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1846 (op->state->phase != PHASE_INVENTORY_ACTIVE)) 1977 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1847 { 1978 {
1848 GNUNET_break_op (0); 1979 GNUNET_break_op (0);
1849 fail_union_operation (op); 1980 return GNUNET_SYSERR;
1850 return;
1851 } 1981 }
1852 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 1982 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1853 / sizeof (struct GNUNET_HashCode); 1983 / sizeof (struct GNUNET_HashCode);
@@ -1855,10 +1985,29 @@ handle_p2p_offer (void *cls,
1855 != num_hashes * sizeof (struct GNUNET_HashCode)) 1985 != num_hashes * sizeof (struct GNUNET_HashCode))
1856 { 1986 {
1857 GNUNET_break_op (0); 1987 GNUNET_break_op (0);
1858 fail_union_operation (op); 1988 return GNUNET_SYSERR;
1859 return;
1860 } 1989 }
1990 return GNUNET_OK;
1991}
1861 1992
1993
1994/**
1995 * Handle offers (of `struct GNUNET_HashCode`s) and
1996 * respond with demands (of `struct GNUNET_HashCode`s).
1997 *
1998 * @param cls the union operation
1999 * @param mh the message
2000 */
2001void
2002handle_union_p2p_offer (void *cls,
2003 const struct GNUNET_MessageHeader *mh)
2004{
2005 struct Operation *op = cls;
2006 const struct GNUNET_HashCode *hash;
2007 unsigned int num_hashes;
2008
2009 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2010 / sizeof (struct GNUNET_HashCode);
1862 for (hash = (const struct GNUNET_HashCode *) &mh[1]; 2011 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1863 num_hashes > 0; 2012 num_hashes > 0;
1864 hash++, num_hashes--) 2013 hash++, num_hashes--)
@@ -1897,6 +2046,7 @@ handle_p2p_offer (void *cls,
1897 *(struct GNUNET_HashCode *) &demands[1] = *hash; 2046 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1898 GNUNET_MQ_send (op->mq, ev); 2047 GNUNET_MQ_send (op->mq, ev);
1899 } 2048 }
2049 GNUNET_CADET_receive_done (op->channel);
1900} 2050}
1901 2051
1902 2052
@@ -1906,16 +2056,22 @@ handle_p2p_offer (void *cls,
1906 * @param cls the union operation 2056 * @param cls the union operation
1907 * @param mh the message 2057 * @param mh the message
1908 */ 2058 */
1909static void 2059void
1910handle_p2p_done (void *cls, 2060handle_union_p2p_done (void *cls,
1911 const struct GNUNET_MessageHeader *mh) 2061 const struct GNUNET_MessageHeader *mh)
1912{ 2062{
1913 struct Operation *op = cls; 2063 struct Operation *op = cls;
1914 2064
1915 if (op->state->phase == PHASE_INVENTORY_PASSIVE) 2065 if (OT_UNION != op->type)
1916 { 2066 {
2067 GNUNET_break_op (0);
2068 fail_union_operation (op);
2069 return;
2070 }
2071 switch (op->state->phase)
2072 {
2073 case PHASE_INVENTORY_PASSIVE:
1917 /* We got all requests, but still have to send our elements in response. */ 2074 /* We got all requests, but still have to send our elements in response. */
1918
1919 op->state->phase = PHASE_FINISH_WAITING; 2075 op->state->phase = PHASE_FINISH_WAITING;
1920 2076
1921 LOG (GNUNET_ERROR_TYPE_DEBUG, 2077 LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1929,11 +2085,10 @@ handle_p2p_done (void *cls,
1929 * all our demands are satisfied, so that the active 2085 * all our demands are satisfied, so that the active
1930 * peer can quit if we gave him everything. 2086 * peer can quit if we gave him everything.
1931 */ 2087 */
2088 GNUNET_CADET_receive_done (op->channel);
1932 maybe_finish (op); 2089 maybe_finish (op);
1933 return; 2090 return;
1934 } 2091 case PHASE_INVENTORY_ACTIVE:
1935 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1936 {
1937 LOG (GNUNET_ERROR_TYPE_DEBUG, 2092 LOG (GNUNET_ERROR_TYPE_DEBUG,
1938 "got DONE (as active partner), waiting to finish\n"); 2093 "got DONE (as active partner), waiting to finish\n");
1939 /* All demands of the other peer are satisfied, 2094 /* All demands of the other peer are satisfied,
@@ -1944,11 +2099,14 @@ handle_p2p_done (void *cls,
1944 * to the other peer once our demands are met. 2099 * to the other peer once our demands are met.
1945 */ 2100 */
1946 op->state->phase = PHASE_FINISH_CLOSING; 2101 op->state->phase = PHASE_FINISH_CLOSING;
2102 GNUNET_CADET_receive_done (op->channel);
1947 maybe_finish (op); 2103 maybe_finish (op);
1948 return; 2104 return;
2105 default:
2106 GNUNET_break_op (0);
2107 fail_union_operation (op);
2108 return;
1949 } 2109 }
1950 GNUNET_break_op (0);
1951 fail_union_operation (op);
1952} 2110}
1953 2111
1954 2112
@@ -2119,62 +2277,6 @@ union_set_destroy (struct SetState *set_state)
2119 2277
2120 2278
2121/** 2279/**
2122 * Dispatch messages for a union operation.
2123 *
2124 * @param op the state of the union evaluate operation
2125 * @param mh the received message
2126 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
2127 * #GNUNET_OK otherwise
2128 */
2129int
2130union_handle_p2p_message (struct Operation *op,
2131 const struct GNUNET_MessageHeader *mh)
2132{
2133 //LOG (GNUNET_ERROR_TYPE_DEBUG,
2134 // "received p2p message (t: %u, s: %u)\n",
2135 // ntohs (mh->type),
2136 // ntohs (mh->size));
2137 switch (ntohs (mh->type))
2138 {
2139 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
2140 return handle_p2p_ibf (op, mh);
2141 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
2142 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
2143 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
2144 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
2145 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
2146 handle_p2p_elements (op, mh);
2147 break;
2148 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2149 handle_p2p_full_element (op, mh);
2150 break;
2151 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
2152 handle_p2p_inquiry (op, mh);
2153 break;
2154 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
2155 handle_p2p_done (op, mh);
2156 break;
2157 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
2158 handle_p2p_offer (op, mh);
2159 break;
2160 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
2161 handle_p2p_demand (op, mh);
2162 break;
2163 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2164 handle_p2p_full_done (op, mh);
2165 break;
2166 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2167 handle_p2p_request_full (op, mh);
2168 break;
2169 default:
2170 /* Something wrong with cadet's message handlers? */
2171 GNUNET_assert (0);
2172 }
2173 return GNUNET_OK;
2174}
2175
2176
2177/**
2178 * Handler for peer-disconnects, notifies the client 2280 * Handler for peer-disconnects, notifies the client
2179 * about the aborted operation in case the op was not concluded. 2281 * about the aborted operation in case the op was not concluded.
2180 * 2282 *
@@ -2240,7 +2342,6 @@ _GSS_union_vt ()
2240{ 2342{
2241 static const struct SetVT union_vt = { 2343 static const struct SetVT union_vt = {
2242 .create = &union_set_create, 2344 .create = &union_set_create,
2243 .msg_handler = &union_handle_p2p_message,
2244 .add = &union_add, 2345 .add = &union_add,
2245 .remove = &union_remove, 2346 .remove = &union_remove,
2246 .destroy_set = &union_set_destroy, 2347 .destroy_set = &union_set_destroy,