diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-03-11 18:15:38 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-03-11 18:15:38 +0100 |
commit | abdec5e11ff11bb10d32c013e11344a54786f80f (patch) | |
tree | c2b8eb6705efa8ac8278a6024d8ab19222471f0e /src/set/gnunet-service-set_union.c | |
parent | 4e981fb2bd74f21c33adf05d7999b05704d6909b (diff) | |
download | gnunet-abdec5e11ff11bb10d32c013e11344a54786f80f.tar.gz gnunet-abdec5e11ff11bb10d32c013e11344a54786f80f.zip |
cleaning up set handlers, eliminating 2nd level demultiplexing and improving use of types
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r-- | src/set/gnunet-service-set_union.c | 645 |
1 files changed, 373 insertions, 272 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index b5b602074..200bd4b8e 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet | 2 | This file is part of GNUnet |
3 | Copyright (C) 2013-2016 GNUnet e.V. | 3 | Copyright (C) 2013-2017 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -19,15 +19,16 @@ | |||
19 | */ | 19 | */ |
20 | /** | 20 | /** |
21 | * @file set/gnunet-service-set_union.c | 21 | * @file set/gnunet-service-set_union.c |
22 | |||
23 | * @brief two-peer set operations | 22 | * @brief two-peer set operations |
24 | * @author Florian Dold | 23 | * @author Florian Dold |
24 | * @author Christian Grothoff | ||
25 | */ | 25 | */ |
26 | #include "platform.h" | 26 | #include "platform.h" |
27 | #include "gnunet_util_lib.h" | 27 | #include "gnunet_util_lib.h" |
28 | #include "gnunet_statistics_service.h" | 28 | #include "gnunet_statistics_service.h" |
29 | #include "gnunet-service-set.h" | 29 | #include "gnunet-service-set.h" |
30 | #include "ibf.h" | 30 | #include "ibf.h" |
31 | #include "gnunet-service-set_union.h" | ||
31 | #include "gnunet-service-set_union_strata_estimator.h" | 32 | #include "gnunet-service-set_union_strata_estimator.h" |
32 | #include "gnunet-service-set_protocol.h" | 33 | #include "gnunet-service-set_protocol.h" |
33 | #include <gcrypt.h> | 34 | #include <gcrypt.h> |
@@ -813,42 +814,56 @@ send_full_set (struct Operation *op) | |||
813 | * Handle a strata estimator from a remote peer | 814 | * Handle a strata estimator from a remote peer |
814 | * | 815 | * |
815 | * @param cls the union operation | 816 | * @param cls the union operation |
816 | * @param mh the message | 817 | * @param msg the message |
817 | * @param is_compressed #GNUNET_YES if the estimator is compressed | ||
818 | * @return #GNUNET_SYSERR if the tunnel should be disconnected, | ||
819 | * #GNUNET_OK otherwise | ||
820 | */ | 818 | */ |
821 | static int | 819 | int |
822 | handle_p2p_strata_estimator (void *cls, | 820 | check_union_p2p_strata_estimator (void *cls, |
823 | const struct GNUNET_MessageHeader *mh, | 821 | const struct StrataEstimatorMessage *msg) |
824 | int is_compressed) | ||
825 | { | 822 | { |
826 | struct Operation *op = cls; | 823 | struct Operation *op = cls; |
827 | struct StrataEstimator *remote_se; | 824 | int is_compressed; |
828 | struct StrataEstimatorMessage *msg = (void *) mh; | ||
829 | unsigned int diff; | ||
830 | uint64_t other_size; | ||
831 | size_t len; | 825 | size_t len; |
832 | 826 | ||
833 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
834 | "# bytes of SE received", | ||
835 | ntohs (mh->size), | ||
836 | GNUNET_NO); | ||
837 | |||
838 | if (op->state->phase != PHASE_EXPECT_SE) | 827 | if (op->state->phase != PHASE_EXPECT_SE) |
839 | { | 828 | { |
840 | GNUNET_break (0); | 829 | GNUNET_break (0); |
841 | fail_union_operation (op); | ||
842 | return GNUNET_SYSERR; | 830 | return GNUNET_SYSERR; |
843 | } | 831 | } |
844 | len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage); | 832 | is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type)); |
833 | len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage); | ||
845 | if ( (GNUNET_NO == is_compressed) && | 834 | if ( (GNUNET_NO == is_compressed) && |
846 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) | 835 | (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) |
847 | { | 836 | { |
848 | fail_union_operation (op); | ||
849 | GNUNET_break (0); | 837 | GNUNET_break (0); |
850 | return GNUNET_SYSERR; | 838 | return GNUNET_SYSERR; |
851 | } | 839 | } |
840 | return GNUNET_OK; | ||
841 | } | ||
842 | |||
843 | |||
844 | /** | ||
845 | * Handle a strata estimator from a remote peer | ||
846 | * | ||
847 | * @param cls the union operation | ||
848 | * @param msg the message | ||
849 | */ | ||
850 | void | ||
851 | handle_union_p2p_strata_estimator (void *cls, | ||
852 | const struct StrataEstimatorMessage *msg) | ||
853 | { | ||
854 | struct Operation *op = cls; | ||
855 | struct StrataEstimator *remote_se; | ||
856 | unsigned int diff; | ||
857 | uint64_t other_size; | ||
858 | size_t len; | ||
859 | int is_compressed; | ||
860 | |||
861 | is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type)); | ||
862 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
863 | "# bytes of SE received", | ||
864 | ntohs (msg->header.size), | ||
865 | GNUNET_NO); | ||
866 | len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage); | ||
852 | other_size = GNUNET_ntohll (msg->set_size); | 867 | other_size = GNUNET_ntohll (msg->set_size); |
853 | remote_se = strata_estimator_create (SE_STRATA_COUNT, | 868 | remote_se = strata_estimator_create (SE_STRATA_COUNT, |
854 | SE_IBF_SIZE, | 869 | SE_IBF_SIZE, |
@@ -857,7 +872,7 @@ handle_p2p_strata_estimator (void *cls, | |||
857 | { | 872 | { |
858 | /* insufficient resources, fail */ | 873 | /* insufficient resources, fail */ |
859 | fail_union_operation (op); | 874 | fail_union_operation (op); |
860 | return GNUNET_SYSERR; | 875 | return; |
861 | } | 876 | } |
862 | if (GNUNET_OK != | 877 | if (GNUNET_OK != |
863 | strata_estimator_read (&msg[1], | 878 | strata_estimator_read (&msg[1], |
@@ -866,18 +881,16 @@ handle_p2p_strata_estimator (void *cls, | |||
866 | remote_se)) | 881 | remote_se)) |
867 | { | 882 | { |
868 | /* decompression failed */ | 883 | /* decompression failed */ |
869 | fail_union_operation (op); | ||
870 | strata_estimator_destroy (remote_se); | 884 | strata_estimator_destroy (remote_se); |
871 | return GNUNET_SYSERR; | 885 | fail_union_operation (op); |
886 | return; | ||
872 | } | 887 | } |
873 | GNUNET_assert (NULL != op->state->se); | 888 | GNUNET_assert (NULL != op->state->se); |
874 | diff = strata_estimator_difference (remote_se, | 889 | diff = strata_estimator_difference (remote_se, |
875 | op->state->se); | 890 | op->state->se); |
876 | 891 | ||
877 | if (diff > 200) | 892 | if (diff > 200) |
878 | diff = diff * 3 / 2; | 893 | diff = diff * 3 / 2; |
879 | |||
880 | |||
881 | 894 | ||
882 | strata_estimator_destroy (remote_se); | 895 | strata_estimator_destroy (remote_se); |
883 | strata_estimator_destroy (op->state->se); | 896 | strata_estimator_destroy (op->state->se); |
@@ -885,12 +898,14 @@ handle_p2p_strata_estimator (void *cls, | |||
885 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 898 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
886 | "got se diff=%d, using ibf size %d\n", | 899 | "got se diff=%d, using ibf size %d\n", |
887 | diff, | 900 | diff, |
888 | 1<<get_order_from_difference (diff)); | 901 | 1U << get_order_from_difference (diff)); |
889 | 902 | ||
890 | { | 903 | { |
891 | char *set_debug; | 904 | char *set_debug; |
905 | |||
892 | set_debug = getenv ("GNUNET_SET_BENCHMARK"); | 906 | set_debug = getenv ("GNUNET_SET_BENCHMARK"); |
893 | if ( (NULL != set_debug) && (0 == strcmp (set_debug, "1")) ) | 907 | if ( (NULL != set_debug) && |
908 | (0 == strcmp (set_debug, "1")) ) | ||
894 | { | 909 | { |
895 | FILE *f = fopen ("set.log", "a"); | 910 | FILE *f = fopen ("set.log", "a"); |
896 | fprintf (f, "%llu\n", (unsigned long long) diff); | 911 | fprintf (f, "%llu\n", (unsigned long long) diff); |
@@ -898,15 +913,16 @@ handle_p2p_strata_estimator (void *cls, | |||
898 | } | 913 | } |
899 | } | 914 | } |
900 | 915 | ||
901 | if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound)) | 916 | if ( (GNUNET_YES == op->spec->byzantine) && |
917 | (other_size < op->spec->byzantine_lower_bound) ) | ||
902 | { | 918 | { |
903 | GNUNET_break (0); | 919 | GNUNET_break (0); |
904 | fail_union_operation (op); | 920 | fail_union_operation (op); |
905 | return GNUNET_SYSERR; | 921 | return; |
906 | } | 922 | } |
907 | 923 | ||
908 | 924 | if ( (GNUNET_YES == op->spec->force_full) || | |
909 | if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4)) | 925 | (diff > op->state->initial_size / 4)) |
910 | { | 926 | { |
911 | LOG (GNUNET_ERROR_TYPE_INFO, | 927 | LOG (GNUNET_ERROR_TYPE_INFO, |
912 | "Sending full set (diff=%d, own set=%u)\n", | 928 | "Sending full set (diff=%d, own set=%u)\n", |
@@ -923,6 +939,7 @@ handle_p2p_strata_estimator (void *cls, | |||
923 | else | 939 | else |
924 | { | 940 | { |
925 | struct GNUNET_MQ_Envelope *ev; | 941 | struct GNUNET_MQ_Envelope *ev; |
942 | |||
926 | op->state->phase = PHASE_EXPECT_IBF; | 943 | op->state->phase = PHASE_EXPECT_IBF; |
927 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); | 944 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); |
928 | GNUNET_MQ_send (op->mq, ev); | 945 | GNUNET_MQ_send (op->mq, ev); |
@@ -942,11 +959,10 @@ handle_p2p_strata_estimator (void *cls, | |||
942 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 959 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
943 | "Failed to send IBF, closing connection\n"); | 960 | "Failed to send IBF, closing connection\n"); |
944 | fail_union_operation (op); | 961 | fail_union_operation (op); |
945 | return GNUNET_SYSERR; | 962 | return; |
946 | } | 963 | } |
947 | } | 964 | } |
948 | 965 | GNUNET_CADET_receive_done (op->channel); | |
949 | return GNUNET_OK; | ||
950 | } | 966 | } |
951 | 967 | ||
952 | 968 | ||
@@ -1164,99 +1180,116 @@ decode_and_send (struct Operation *op) | |||
1164 | 1180 | ||
1165 | 1181 | ||
1166 | /** | 1182 | /** |
1167 | * Handle an IBF message from a remote peer. | 1183 | * Check an IBF message from a remote peer. |
1168 | * | 1184 | * |
1169 | * Reassemble the IBF from multiple pieces, and | 1185 | * Reassemble the IBF from multiple pieces, and |
1170 | * process the whole IBF once possible. | 1186 | * process the whole IBF once possible. |
1171 | * | 1187 | * |
1172 | * @param cls the union operation | 1188 | * @param cls the union operation |
1173 | * @param mh the header of the message | 1189 | * @param msg the header of the message |
1174 | * @return #GNUNET_SYSERR if the tunnel should be disconnected, | 1190 | * @return #GNUNET_OK if @a msg is well-formed |
1175 | * #GNUNET_OK otherwise | ||
1176 | */ | 1191 | */ |
1177 | static int | 1192 | int |
1178 | handle_p2p_ibf (void *cls, | 1193 | check_union_p2p_ibf (void *cls, |
1179 | const struct GNUNET_MessageHeader *mh) | 1194 | const struct IBFMessage *msg) |
1180 | { | 1195 | { |
1181 | struct Operation *op = cls; | 1196 | struct Operation *op = cls; |
1182 | const struct IBFMessage *msg; | ||
1183 | unsigned int buckets_in_message; | 1197 | unsigned int buckets_in_message; |
1184 | 1198 | ||
1185 | if (ntohs (mh->size) < sizeof (struct IBFMessage)) | 1199 | if (OT_UNION != op->type) |
1186 | { | 1200 | { |
1187 | GNUNET_break_op (0); | 1201 | GNUNET_break_op (0); |
1188 | fail_union_operation (op); | ||
1189 | return GNUNET_SYSERR; | 1202 | return GNUNET_SYSERR; |
1190 | } | 1203 | } |
1191 | msg = (const struct IBFMessage *) mh; | 1204 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; |
1192 | if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || | 1205 | if (0 == buckets_in_message) |
1193 | (op->state->phase == PHASE_EXPECT_IBF) ) | ||
1194 | { | 1206 | { |
1195 | op->state->phase = PHASE_EXPECT_IBF_CONT; | 1207 | GNUNET_break_op (0); |
1196 | GNUNET_assert (NULL == op->state->remote_ibf); | 1208 | return GNUNET_SYSERR; |
1197 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1198 | "Creating new ibf of size %u\n", | ||
1199 | 1 << msg->order); | ||
1200 | op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | ||
1201 | op->state->salt_receive = ntohl (msg->salt); | ||
1202 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive); | ||
1203 | if (NULL == op->state->remote_ibf) | ||
1204 | { | ||
1205 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1206 | "Failed to parse remote IBF, closing connection\n"); | ||
1207 | fail_union_operation (op); | ||
1208 | return GNUNET_SYSERR; | ||
1209 | } | ||
1210 | op->state->ibf_buckets_received = 0; | ||
1211 | if (0 != ntohl (msg->offset)) | ||
1212 | { | ||
1213 | GNUNET_break_op (0); | ||
1214 | fail_union_operation (op); | ||
1215 | return GNUNET_SYSERR; | ||
1216 | } | ||
1217 | } | 1209 | } |
1218 | else if (op->state->phase == PHASE_EXPECT_IBF_CONT) | 1210 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) |
1211 | { | ||
1212 | GNUNET_break_op (0); | ||
1213 | return GNUNET_SYSERR; | ||
1214 | } | ||
1215 | if (op->state->phase == PHASE_EXPECT_IBF_CONT) | ||
1219 | { | 1216 | { |
1220 | if (ntohl (msg->offset) != op->state->ibf_buckets_received) | 1217 | if (ntohl (msg->offset) != op->state->ibf_buckets_received) |
1221 | { | 1218 | { |
1222 | GNUNET_break_op (0); | 1219 | GNUNET_break_op (0); |
1223 | fail_union_operation (op); | ||
1224 | return GNUNET_SYSERR; | 1220 | return GNUNET_SYSERR; |
1225 | } | 1221 | } |
1226 | if (1<<msg->order != op->state->remote_ibf->size) | 1222 | if (1<<msg->order != op->state->remote_ibf->size) |
1227 | { | 1223 | { |
1228 | GNUNET_break_op (0); | 1224 | GNUNET_break_op (0); |
1229 | fail_union_operation (op); | ||
1230 | return GNUNET_SYSERR; | 1225 | return GNUNET_SYSERR; |
1231 | } | 1226 | } |
1232 | if (ntohl (msg->salt) != op->state->salt_receive) | 1227 | if (ntohl (msg->salt) != op->state->salt_receive) |
1233 | { | 1228 | { |
1234 | GNUNET_break_op (0); | 1229 | GNUNET_break_op (0); |
1235 | fail_union_operation (op); | ||
1236 | return GNUNET_SYSERR; | 1230 | return GNUNET_SYSERR; |
1237 | } | 1231 | } |
1238 | } | 1232 | } |
1239 | else | 1233 | else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && |
1234 | (op->state->phase != PHASE_EXPECT_IBF) ) | ||
1240 | { | 1235 | { |
1241 | GNUNET_assert (0); | 1236 | GNUNET_break_op (0); |
1237 | return GNUNET_SYSERR; | ||
1242 | } | 1238 | } |
1243 | 1239 | ||
1244 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; | 1240 | return GNUNET_OK; |
1241 | } | ||
1245 | 1242 | ||
1246 | if (0 == buckets_in_message) | 1243 | |
1244 | /** | ||
1245 | * Handle an IBF message from a remote peer. | ||
1246 | * | ||
1247 | * Reassemble the IBF from multiple pieces, and | ||
1248 | * process the whole IBF once possible. | ||
1249 | * | ||
1250 | * @param cls the union operation | ||
1251 | * @param msg the header of the message | ||
1252 | */ | ||
1253 | void | ||
1254 | handle_union_p2p_ibf (void *cls, | ||
1255 | const struct IBFMessage *msg) | ||
1256 | { | ||
1257 | struct Operation *op = cls; | ||
1258 | unsigned int buckets_in_message; | ||
1259 | |||
1260 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; | ||
1261 | if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || | ||
1262 | (op->state->phase == PHASE_EXPECT_IBF) ) | ||
1247 | { | 1263 | { |
1248 | GNUNET_break_op (0); | 1264 | op->state->phase = PHASE_EXPECT_IBF_CONT; |
1249 | fail_union_operation (op); | 1265 | GNUNET_assert (NULL == op->state->remote_ibf); |
1250 | return GNUNET_SYSERR; | 1266 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1267 | "Creating new ibf of size %u\n", | ||
1268 | 1 << msg->order); | ||
1269 | op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | ||
1270 | op->state->salt_receive = ntohl (msg->salt); | ||
1271 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1272 | "Receiving new IBF with salt %u\n", | ||
1273 | op->state->salt_receive); | ||
1274 | if (NULL == op->state->remote_ibf) | ||
1275 | { | ||
1276 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
1277 | "Failed to parse remote IBF, closing connection\n"); | ||
1278 | fail_union_operation (op); | ||
1279 | return; | ||
1280 | } | ||
1281 | op->state->ibf_buckets_received = 0; | ||
1282 | if (0 != ntohl (msg->offset)) | ||
1283 | { | ||
1284 | GNUNET_break_op (0); | ||
1285 | fail_union_operation (op); | ||
1286 | return; | ||
1287 | } | ||
1251 | } | 1288 | } |
1252 | 1289 | else | |
1253 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) | ||
1254 | { | 1290 | { |
1255 | GNUNET_break_op (0); | 1291 | GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); |
1256 | fail_union_operation (op); | ||
1257 | return GNUNET_SYSERR; | ||
1258 | } | 1292 | } |
1259 | |||
1260 | GNUNET_assert (NULL != op->state->remote_ibf); | 1293 | GNUNET_assert (NULL != op->state->remote_ibf); |
1261 | 1294 | ||
1262 | ibf_read_slice (&msg[1], | 1295 | ibf_read_slice (&msg[1], |
@@ -1276,10 +1309,11 @@ handle_p2p_ibf (void *cls, | |||
1276 | /* Internal error, best we can do is shut down */ | 1309 | /* Internal error, best we can do is shut down */ |
1277 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 1310 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
1278 | "Failed to decode IBF, closing connection\n"); | 1311 | "Failed to decode IBF, closing connection\n"); |
1279 | return GNUNET_SYSERR; | 1312 | fail_union_operation (op); |
1313 | return; | ||
1280 | } | 1314 | } |
1281 | } | 1315 | } |
1282 | return GNUNET_OK; | 1316 | GNUNET_CADET_receive_done (op->channel); |
1283 | } | 1317 | } |
1284 | 1318 | ||
1285 | 1319 | ||
@@ -1343,6 +1377,11 @@ send_done_and_destroy (void *cls) | |||
1343 | } | 1377 | } |
1344 | 1378 | ||
1345 | 1379 | ||
1380 | /** | ||
1381 | * Tests if the operation is finished, and if so notify. | ||
1382 | * | ||
1383 | * @param op operation to check | ||
1384 | */ | ||
1346 | static void | 1385 | static void |
1347 | maybe_finish (struct Operation *op) | 1386 | maybe_finish (struct Operation *op) |
1348 | { | 1387 | { |
@@ -1382,46 +1421,59 @@ maybe_finish (struct Operation *op) | |||
1382 | 1421 | ||
1383 | 1422 | ||
1384 | /** | 1423 | /** |
1385 | * Handle an element message from a remote peer. | 1424 | * Check an element message from a remote peer. |
1386 | * Sent by the other peer either because we decoded an IBF and placed a demand, | ||
1387 | * or because the other peer switched to full set transmission. | ||
1388 | * | 1425 | * |
1389 | * @param cls the union operation | 1426 | * @param cls the union operation |
1390 | * @param mh the message | 1427 | * @param emsg the message |
1391 | */ | 1428 | */ |
1392 | static void | 1429 | int |
1393 | handle_p2p_elements (void *cls, | 1430 | check_union_p2p_elements (void *cls, |
1394 | const struct GNUNET_MessageHeader *mh) | 1431 | const struct GNUNET_SET_ElementMessage *emsg) |
1395 | { | 1432 | { |
1396 | struct Operation *op = cls; | 1433 | struct Operation *op = cls; |
1397 | struct ElementEntry *ee; | ||
1398 | const struct GNUNET_SET_ElementMessage *emsg; | ||
1399 | uint16_t element_size; | ||
1400 | 1434 | ||
1401 | if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) | 1435 | if (OT_UNION != op->type) |
1402 | { | 1436 | { |
1403 | GNUNET_break_op (0); | 1437 | GNUNET_break_op (0); |
1404 | fail_union_operation (op); | 1438 | return GNUNET_SYSERR; |
1405 | return; | ||
1406 | } | 1439 | } |
1407 | if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) | 1440 | if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) |
1408 | { | 1441 | { |
1409 | GNUNET_break_op (0); | 1442 | GNUNET_break_op (0); |
1410 | fail_union_operation (op); | 1443 | return GNUNET_SYSERR; |
1411 | return; | ||
1412 | } | 1444 | } |
1445 | return GNUNET_OK; | ||
1446 | } | ||
1447 | |||
1413 | 1448 | ||
1414 | emsg = (const struct GNUNET_SET_ElementMessage *) mh; | 1449 | /** |
1450 | * Handle an element message from a remote peer. | ||
1451 | * Sent by the other peer either because we decoded an IBF and placed a demand, | ||
1452 | * or because the other peer switched to full set transmission. | ||
1453 | * | ||
1454 | * @param cls the union operation | ||
1455 | * @param emsg the message | ||
1456 | */ | ||
1457 | void | ||
1458 | handle_union_p2p_elements (void *cls, | ||
1459 | const struct GNUNET_SET_ElementMessage *emsg) | ||
1460 | { | ||
1461 | struct Operation *op = cls; | ||
1462 | struct ElementEntry *ee; | ||
1463 | struct KeyEntry *ke; | ||
1464 | uint16_t element_size; | ||
1415 | 1465 | ||
1416 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); | 1466 | element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage); |
1417 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); | 1467 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); |
1418 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); | 1468 | GNUNET_memcpy (&ee[1], |
1469 | &emsg[1], | ||
1470 | element_size); | ||
1419 | ee->element.size = element_size; | 1471 | ee->element.size = element_size; |
1420 | ee->element.data = &ee[1]; | 1472 | ee->element.data = &ee[1]; |
1421 | ee->element.element_type = ntohs (emsg->element_type); | 1473 | ee->element.element_type = ntohs (emsg->element_type); |
1422 | ee->remote = GNUNET_YES; | 1474 | ee->remote = GNUNET_YES; |
1423 | GNUNET_SET_element_hash (&ee->element, &ee->element_hash); | 1475 | GNUNET_SET_element_hash (&ee->element, |
1424 | 1476 | &ee->element_hash); | |
1425 | if (GNUNET_NO == | 1477 | if (GNUNET_NO == |
1426 | GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, | 1478 | GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, |
1427 | &ee->element_hash, | 1479 | &ee->element_hash, |
@@ -1429,7 +1481,6 @@ handle_p2p_elements (void *cls, | |||
1429 | { | 1481 | { |
1430 | /* We got something we didn't demand, since it's not in our map. */ | 1482 | /* We got something we didn't demand, since it's not in our map. */ |
1431 | GNUNET_break_op (0); | 1483 | GNUNET_break_op (0); |
1432 | GNUNET_free (ee); | ||
1433 | fail_union_operation (op); | 1484 | fail_union_operation (op); |
1434 | return; | 1485 | return; |
1435 | } | 1486 | } |
@@ -1448,10 +1499,9 @@ handle_p2p_elements (void *cls, | |||
1448 | 1, | 1499 | 1, |
1449 | GNUNET_NO); | 1500 | GNUNET_NO); |
1450 | 1501 | ||
1451 | op->state->received_total += 1; | 1502 | op->state->received_total++; |
1452 | |||
1453 | struct KeyEntry *ke = op_get_element (op, &ee->element_hash); | ||
1454 | 1503 | ||
1504 | ke = op_get_element (op, &ee->element_hash); | ||
1455 | if (NULL != ke) | 1505 | if (NULL != ke) |
1456 | { | 1506 | { |
1457 | /* Got repeated element. Should not happen since | 1507 | /* Got repeated element. Should not happen since |
@@ -1467,7 +1517,7 @@ handle_p2p_elements (void *cls, | |||
1467 | { | 1517 | { |
1468 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1518 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1469 | "Registering new element from remote peer\n"); | 1519 | "Registering new element from remote peer\n"); |
1470 | op->state->received_fresh += 1; | 1520 | op->state->received_fresh++; |
1471 | op_register_element (op, ee, GNUNET_YES); | 1521 | op_register_element (op, ee, GNUNET_YES); |
1472 | /* only send results immediately if the client wants it */ | 1522 | /* only send results immediately if the client wants it */ |
1473 | switch (op->spec->result_mode) | 1523 | switch (op->spec->result_mode) |
@@ -1485,43 +1535,57 @@ handle_p2p_elements (void *cls, | |||
1485 | } | 1535 | } |
1486 | } | 1536 | } |
1487 | 1537 | ||
1488 | if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) | 1538 | if ( (op->state->received_total > 8) && |
1539 | (op->state->received_fresh < op->state->received_total / 3) ) | ||
1489 | { | 1540 | { |
1490 | /* The other peer gave us lots of old elements, there's something wrong. */ | 1541 | /* The other peer gave us lots of old elements, there's something wrong. */ |
1491 | GNUNET_break_op (0); | 1542 | GNUNET_break_op (0); |
1492 | fail_union_operation (op); | 1543 | fail_union_operation (op); |
1493 | return; | 1544 | return; |
1494 | } | 1545 | } |
1495 | 1546 | GNUNET_CADET_receive_done (op->channel); | |
1496 | maybe_finish (op); | 1547 | maybe_finish (op); |
1497 | } | 1548 | } |
1498 | 1549 | ||
1499 | 1550 | ||
1500 | /** | 1551 | /** |
1501 | * Handle an element message from a remote peer. | 1552 | * Check a full element message from a remote peer. |
1502 | * | 1553 | * |
1503 | * @param cls the union operation | 1554 | * @param cls the union operation |
1504 | * @param mh the message | 1555 | * @param emsg the message |
1505 | */ | 1556 | */ |
1506 | static void | 1557 | int |
1507 | handle_p2p_full_element (void *cls, | 1558 | check_union_p2p_full_element (void *cls, |
1508 | const struct GNUNET_MessageHeader *mh) | 1559 | const struct GNUNET_SET_ElementMessage *emsg) |
1509 | { | 1560 | { |
1510 | struct Operation *op = cls; | 1561 | struct Operation *op = cls; |
1511 | struct ElementEntry *ee; | ||
1512 | const struct GNUNET_SET_ElementMessage *emsg; | ||
1513 | uint16_t element_size; | ||
1514 | 1562 | ||
1515 | if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) | 1563 | if (OT_UNION != op->type) |
1516 | { | 1564 | { |
1517 | GNUNET_break_op (0); | 1565 | GNUNET_break_op (0); |
1518 | fail_union_operation (op); | 1566 | return GNUNET_SYSERR; |
1519 | return; | ||
1520 | } | 1567 | } |
1568 | // FIXME: check that we expect full elements here? | ||
1569 | return GNUNET_OK; | ||
1570 | } | ||
1521 | 1571 | ||
1522 | emsg = (const struct GNUNET_SET_ElementMessage *) mh; | ||
1523 | 1572 | ||
1524 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); | 1573 | /** |
1574 | * Handle an element message from a remote peer. | ||
1575 | * | ||
1576 | * @param cls the union operation | ||
1577 | * @param emsg the message | ||
1578 | */ | ||
1579 | void | ||
1580 | handle_union_p2p_full_element (void *cls, | ||
1581 | const struct GNUNET_SET_ElementMessage *emsg) | ||
1582 | { | ||
1583 | struct Operation *op = cls; | ||
1584 | struct ElementEntry *ee; | ||
1585 | struct KeyEntry *ke; | ||
1586 | uint16_t element_size; | ||
1587 | |||
1588 | element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage); | ||
1525 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); | 1589 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); |
1526 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); | 1590 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); |
1527 | ee->element.size = element_size; | 1591 | ee->element.size = element_size; |
@@ -1544,10 +1608,9 @@ handle_p2p_full_element (void *cls, | |||
1544 | 1, | 1608 | 1, |
1545 | GNUNET_NO); | 1609 | GNUNET_NO); |
1546 | 1610 | ||
1547 | op->state->received_total += 1; | 1611 | op->state->received_total++; |
1548 | |||
1549 | struct KeyEntry *ke = op_get_element (op, &ee->element_hash); | ||
1550 | 1612 | ||
1613 | ke = op_get_element (op, &ee->element_hash); | ||
1551 | if (NULL != ke) | 1614 | if (NULL != ke) |
1552 | { | 1615 | { |
1553 | /* Got repeated element. Should not happen since | 1616 | /* Got repeated element. Should not happen since |
@@ -1563,7 +1626,7 @@ handle_p2p_full_element (void *cls, | |||
1563 | { | 1626 | { |
1564 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1627 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1565 | "Registering new element from remote peer\n"); | 1628 | "Registering new element from remote peer\n"); |
1566 | op->state->received_fresh += 1; | 1629 | op->state->received_fresh++; |
1567 | op_register_element (op, ee, GNUNET_YES); | 1630 | op_register_element (op, ee, GNUNET_YES); |
1568 | /* only send results immediately if the client wants it */ | 1631 | /* only send results immediately if the client wants it */ |
1569 | switch (op->spec->result_mode) | 1632 | switch (op->spec->result_mode) |
@@ -1581,8 +1644,8 @@ handle_p2p_full_element (void *cls, | |||
1581 | } | 1644 | } |
1582 | } | 1645 | } |
1583 | 1646 | ||
1584 | if ( (GNUNET_YES == op->spec->byzantine) && | 1647 | if ( (GNUNET_YES == op->spec->byzantine) && |
1585 | (op->state->received_total > 384 + op->state->received_fresh * 4) && | 1648 | (op->state->received_total > 384 + op->state->received_fresh * 4) && |
1586 | (op->state->received_fresh < op->state->received_total / 6) ) | 1649 | (op->state->received_fresh < op->state->received_total / 6) ) |
1587 | { | 1650 | { |
1588 | /* The other peer gave us lots of old elements, there's something wrong. */ | 1651 | /* The other peer gave us lots of old elements, there's something wrong. */ |
@@ -1594,51 +1657,73 @@ handle_p2p_full_element (void *cls, | |||
1594 | fail_union_operation (op); | 1657 | fail_union_operation (op); |
1595 | return; | 1658 | return; |
1596 | } | 1659 | } |
1660 | GNUNET_CADET_receive_done (op->channel); | ||
1597 | } | 1661 | } |
1598 | 1662 | ||
1663 | |||
1599 | /** | 1664 | /** |
1600 | * Send offers (for GNUNET_Hash-es) in response | 1665 | * Send offers (for GNUNET_Hash-es) in response |
1601 | * to inquiries (for IBF_Key-s). | 1666 | * to inquiries (for IBF_Key-s). |
1602 | * | 1667 | * |
1603 | * @param cls the union operation | 1668 | * @param cls the union operation |
1604 | * @param mh the message | 1669 | * @param msg the message |
1605 | */ | 1670 | */ |
1606 | static void | 1671 | int |
1607 | handle_p2p_inquiry (void *cls, | 1672 | check_union_p2p_inquiry (void *cls, |
1608 | const struct GNUNET_MessageHeader *mh) | 1673 | const struct InquiryMessage *msg) |
1609 | { | 1674 | { |
1610 | struct Operation *op = cls; | 1675 | struct Operation *op = cls; |
1611 | const struct IBF_Key *ibf_key; | ||
1612 | unsigned int num_keys; | 1676 | unsigned int num_keys; |
1613 | struct InquiryMessage *msg; | ||
1614 | 1677 | ||
1615 | /* look up elements and send them */ | 1678 | if (OT_UNION != op->type) |
1679 | { | ||
1680 | GNUNET_break_op (0); | ||
1681 | return GNUNET_SYSERR; | ||
1682 | } | ||
1616 | if (op->state->phase != PHASE_INVENTORY_PASSIVE) | 1683 | if (op->state->phase != PHASE_INVENTORY_PASSIVE) |
1617 | { | 1684 | { |
1618 | GNUNET_break_op (0); | 1685 | GNUNET_break_op (0); |
1619 | fail_union_operation (op); | 1686 | return GNUNET_SYSERR; |
1620 | return; | ||
1621 | } | 1687 | } |
1622 | num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage)) | 1688 | num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) |
1623 | / sizeof (struct IBF_Key); | 1689 | / sizeof (struct IBF_Key); |
1624 | if ((ntohs (mh->size) - sizeof (struct InquiryMessage)) | 1690 | if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage)) |
1625 | != num_keys * sizeof (struct IBF_Key)) | 1691 | != num_keys * sizeof (struct IBF_Key)) |
1626 | { | 1692 | { |
1627 | GNUNET_break_op (0); | 1693 | GNUNET_break_op (0); |
1628 | fail_union_operation (op); | 1694 | return GNUNET_SYSERR; |
1629 | return; | ||
1630 | } | 1695 | } |
1696 | return GNUNET_OK; | ||
1697 | } | ||
1631 | 1698 | ||
1632 | msg = (struct InquiryMessage *) mh; | ||
1633 | 1699 | ||
1700 | /** | ||
1701 | * Send offers (for GNUNET_Hash-es) in response | ||
1702 | * to inquiries (for IBF_Key-s). | ||
1703 | * | ||
1704 | * @param cls the union operation | ||
1705 | * @param msg the message | ||
1706 | */ | ||
1707 | void | ||
1708 | handle_union_p2p_inquiry (void *cls, | ||
1709 | const struct InquiryMessage *msg) | ||
1710 | { | ||
1711 | struct Operation *op = cls; | ||
1712 | const struct IBF_Key *ibf_key; | ||
1713 | unsigned int num_keys; | ||
1714 | |||
1715 | num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) | ||
1716 | / sizeof (struct IBF_Key); | ||
1634 | ibf_key = (const struct IBF_Key *) &msg[1]; | 1717 | ibf_key = (const struct IBF_Key *) &msg[1]; |
1635 | while (0 != num_keys--) | 1718 | while (0 != num_keys--) |
1636 | { | 1719 | { |
1637 | struct IBF_Key unsalted_key; | 1720 | struct IBF_Key unsalted_key; |
1721 | |||
1638 | unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); | 1722 | unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); |
1639 | send_offers_for_key (op, unsalted_key); | 1723 | send_offers_for_key (op, unsalted_key); |
1640 | ibf_key++; | 1724 | ibf_key++; |
1641 | } | 1725 | } |
1726 | GNUNET_CADET_receive_done (op->channel); | ||
1642 | } | 1727 | } |
1643 | 1728 | ||
1644 | 1729 | ||
@@ -1677,27 +1762,34 @@ send_missing_elements_iter (void *cls, | |||
1677 | 1762 | ||
1678 | 1763 | ||
1679 | /** | 1764 | /** |
1680 | * Handle a | 1765 | * Handle a request for full set transmission. |
1681 | * | 1766 | * |
1682 | * @parem cls closure, a set union operation | 1767 | * @parem cls closure, a set union operation |
1683 | * @param mh the demand message | 1768 | * @param mh the demand message |
1684 | */ | 1769 | */ |
1685 | static void | 1770 | void |
1686 | handle_p2p_request_full (void *cls, | 1771 | handle_union_p2p_request_full (void *cls, |
1687 | const struct GNUNET_MessageHeader *mh) | 1772 | const struct GNUNET_MessageHeader *mh) |
1688 | { | 1773 | { |
1689 | struct Operation *op = cls; | 1774 | struct Operation *op = cls; |
1690 | 1775 | ||
1691 | if (PHASE_EXPECT_IBF != op->state->phase) | 1776 | if (OT_UNION != op->type) |
1692 | { | 1777 | { |
1778 | GNUNET_break_op (0); | ||
1693 | fail_union_operation (op); | 1779 | fail_union_operation (op); |
1780 | return; | ||
1781 | } | ||
1782 | if (PHASE_EXPECT_IBF != op->state->phase) | ||
1783 | { | ||
1694 | GNUNET_break_op (0); | 1784 | GNUNET_break_op (0); |
1785 | fail_union_operation (op); | ||
1695 | return; | 1786 | return; |
1696 | } | 1787 | } |
1697 | 1788 | ||
1698 | // FIXME: we need to check that our set is larger than the | 1789 | // FIXME: we need to check that our set is larger than the |
1699 | // byzantine_lower_bound by some threshold | 1790 | // byzantine_lower_bound by some threshold |
1700 | send_full_set (op); | 1791 | send_full_set (op); |
1792 | GNUNET_CADET_receive_done (op->channel); | ||
1701 | } | 1793 | } |
1702 | 1794 | ||
1703 | 1795 | ||
@@ -1707,56 +1799,97 @@ handle_p2p_request_full (void *cls, | |||
1707 | * @parem cls closure, a set union operation | 1799 | * @parem cls closure, a set union operation |
1708 | * @param mh the demand message | 1800 | * @param mh the demand message |
1709 | */ | 1801 | */ |
1710 | static void | 1802 | void |
1711 | handle_p2p_full_done (void *cls, | 1803 | handle_union_p2p_full_done (void *cls, |
1712 | const struct GNUNET_MessageHeader *mh) | 1804 | const struct GNUNET_MessageHeader *mh) |
1713 | { | 1805 | { |
1714 | struct Operation *op = cls; | 1806 | struct Operation *op = cls; |
1715 | 1807 | ||
1716 | if (PHASE_EXPECT_IBF == op->state->phase) | 1808 | switch (op->state->phase) |
1717 | { | 1809 | { |
1718 | struct GNUNET_MQ_Envelope *ev; | 1810 | case PHASE_EXPECT_IBF: |
1719 | 1811 | { | |
1720 | LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); | 1812 | struct GNUNET_MQ_Envelope *ev; |
1721 | 1813 | ||
1722 | /* send all the elements that did not come from the remote peer */ | 1814 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1723 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, | 1815 | "got FULL DONE, sending elements that other peer is missing\n"); |
1724 | &send_missing_elements_iter, | ||
1725 | op); | ||
1726 | 1816 | ||
1727 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | 1817 | /* send all the elements that did not come from the remote peer */ |
1728 | GNUNET_MQ_send (op->mq, ev); | 1818 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, |
1729 | op->state->phase = PHASE_DONE; | 1819 | &send_missing_elements_iter, |
1820 | op); | ||
1730 | 1821 | ||
1731 | /* we now wait until the other peer shuts the tunnel down*/ | 1822 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); |
1823 | GNUNET_MQ_send (op->mq, ev); | ||
1824 | op->state->phase = PHASE_DONE; | ||
1825 | /* we now wait until the other peer shuts the tunnel down*/ | ||
1826 | } | ||
1827 | break; | ||
1828 | case PHASE_FULL_SENDING: | ||
1829 | { | ||
1830 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1831 | "got FULL DONE, finishing\n"); | ||
1832 | /* We sent the full set, and got the response for that. We're done. */ | ||
1833 | op->state->phase = PHASE_DONE; | ||
1834 | GNUNET_CADET_receive_done (op->channel); | ||
1835 | send_done_and_destroy (op); | ||
1836 | return; | ||
1837 | } | ||
1838 | break; | ||
1839 | default: | ||
1840 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
1841 | "Handle full done phase is %u\n", | ||
1842 | (unsigned) op->state->phase); | ||
1843 | GNUNET_break_op (0); | ||
1844 | fail_union_operation (op); | ||
1845 | return; | ||
1732 | } | 1846 | } |
1733 | else if (PHASE_FULL_SENDING == op->state->phase) | 1847 | GNUNET_CADET_receive_done (op->channel); |
1848 | } | ||
1849 | |||
1850 | |||
1851 | /** | ||
1852 | * Check a demand by the other peer for elements based on a list | ||
1853 | * of `struct GNUNET_HashCode`s. | ||
1854 | * | ||
1855 | * @parem cls closure, a set union operation | ||
1856 | * @param mh the demand message | ||
1857 | * @return #GNUNET_OK if @a mh is well-formed | ||
1858 | */ | ||
1859 | int | ||
1860 | check_union_p2p_demand (void *cls, | ||
1861 | const struct GNUNET_MessageHeader *mh) | ||
1862 | { | ||
1863 | struct Operation *op = cls; | ||
1864 | unsigned int num_hashes; | ||
1865 | |||
1866 | if (OT_UNION != op->type) | ||
1734 | { | 1867 | { |
1735 | LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); | 1868 | GNUNET_break_op (0); |
1736 | /* We sent the full set, and got the response for that. We're done. */ | 1869 | return GNUNET_SYSERR; |
1737 | op->state->phase = PHASE_DONE; | ||
1738 | send_done_and_destroy (op); | ||
1739 | } | 1870 | } |
1740 | else | 1871 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) |
1872 | / sizeof (struct GNUNET_HashCode); | ||
1873 | if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | ||
1874 | != num_hashes * sizeof (struct GNUNET_HashCode)) | ||
1741 | { | 1875 | { |
1742 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase); | ||
1743 | GNUNET_break_op (0); | 1876 | GNUNET_break_op (0); |
1744 | fail_union_operation (op); | 1877 | return GNUNET_SYSERR; |
1745 | return; | ||
1746 | } | 1878 | } |
1879 | return GNUNET_OK; | ||
1747 | } | 1880 | } |
1748 | 1881 | ||
1749 | 1882 | ||
1750 | /** | 1883 | /** |
1751 | * Handle a demand by the other peer for elements based on a list | 1884 | * Handle a demand by the other peer for elements based on a list |
1752 | * of GNUNET_HashCode-s. | 1885 | * of `struct GNUNET_HashCode`s. |
1753 | * | 1886 | * |
1754 | * @parem cls closure, a set union operation | 1887 | * @parem cls closure, a set union operation |
1755 | * @param mh the demand message | 1888 | * @param mh the demand message |
1756 | */ | 1889 | */ |
1757 | static void | 1890 | void |
1758 | handle_p2p_demand (void *cls, | 1891 | handle_union_p2p_demand (void *cls, |
1759 | const struct GNUNET_MessageHeader *mh) | 1892 | const struct GNUNET_MessageHeader *mh) |
1760 | { | 1893 | { |
1761 | struct Operation *op = cls; | 1894 | struct Operation *op = cls; |
1762 | struct ElementEntry *ee; | 1895 | struct ElementEntry *ee; |
@@ -1767,19 +1900,12 @@ handle_p2p_demand (void *cls, | |||
1767 | 1900 | ||
1768 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | 1901 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) |
1769 | / sizeof (struct GNUNET_HashCode); | 1902 | / sizeof (struct GNUNET_HashCode); |
1770 | if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | ||
1771 | != num_hashes * sizeof (struct GNUNET_HashCode)) | ||
1772 | { | ||
1773 | GNUNET_break_op (0); | ||
1774 | fail_union_operation (op); | ||
1775 | return; | ||
1776 | } | ||
1777 | |||
1778 | for (hash = (const struct GNUNET_HashCode *) &mh[1]; | 1903 | for (hash = (const struct GNUNET_HashCode *) &mh[1]; |
1779 | num_hashes > 0; | 1904 | num_hashes > 0; |
1780 | hash++, num_hashes--) | 1905 | hash++, num_hashes--) |
1781 | { | 1906 | { |
1782 | ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); | 1907 | ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, |
1908 | hash); | ||
1783 | if (NULL == ee) | 1909 | if (NULL == ee) |
1784 | { | 1910 | { |
1785 | /* Demand for non-existing element. */ | 1911 | /* Demand for non-existing element. */ |
@@ -1823,31 +1949,35 @@ handle_p2p_demand (void *cls, | |||
1823 | break; | 1949 | break; |
1824 | } | 1950 | } |
1825 | } | 1951 | } |
1952 | GNUNET_CADET_receive_done (op->channel); | ||
1826 | } | 1953 | } |
1827 | 1954 | ||
1828 | 1955 | ||
1829 | /** | 1956 | /** |
1830 | * Handle offers (of GNUNET_HashCode-s) and | 1957 | * Check offer (of `struct GNUNET_HashCode`s). |
1831 | * respond with demands (of GNUNET_HashCode-s). | ||
1832 | * | 1958 | * |
1833 | * @param cls the union operation | 1959 | * @param cls the union operation |
1834 | * @param mh the message | 1960 | * @param mh the message |
1961 | * @return #GNUNET_OK if @a mh is well-formed | ||
1835 | */ | 1962 | */ |
1836 | static void | 1963 | int |
1837 | handle_p2p_offer (void *cls, | 1964 | check_union_p2p_offer (void *cls, |
1838 | const struct GNUNET_MessageHeader *mh) | 1965 | const struct GNUNET_MessageHeader *mh) |
1839 | { | 1966 | { |
1840 | struct Operation *op = cls; | 1967 | struct Operation *op = cls; |
1841 | const struct GNUNET_HashCode *hash; | ||
1842 | unsigned int num_hashes; | 1968 | unsigned int num_hashes; |
1843 | 1969 | ||
1970 | if (OT_UNION != op->type) | ||
1971 | { | ||
1972 | GNUNET_break_op (0); | ||
1973 | return GNUNET_SYSERR; | ||
1974 | } | ||
1844 | /* look up elements and send them */ | 1975 | /* look up elements and send them */ |
1845 | if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && | 1976 | if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && |
1846 | (op->state->phase != PHASE_INVENTORY_ACTIVE)) | 1977 | (op->state->phase != PHASE_INVENTORY_ACTIVE)) |
1847 | { | 1978 | { |
1848 | GNUNET_break_op (0); | 1979 | GNUNET_break_op (0); |
1849 | fail_union_operation (op); | 1980 | return GNUNET_SYSERR; |
1850 | return; | ||
1851 | } | 1981 | } |
1852 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | 1982 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) |
1853 | / sizeof (struct GNUNET_HashCode); | 1983 | / sizeof (struct GNUNET_HashCode); |
@@ -1855,10 +1985,29 @@ handle_p2p_offer (void *cls, | |||
1855 | != num_hashes * sizeof (struct GNUNET_HashCode)) | 1985 | != num_hashes * sizeof (struct GNUNET_HashCode)) |
1856 | { | 1986 | { |
1857 | GNUNET_break_op (0); | 1987 | GNUNET_break_op (0); |
1858 | fail_union_operation (op); | 1988 | return GNUNET_SYSERR; |
1859 | return; | ||
1860 | } | 1989 | } |
1990 | return GNUNET_OK; | ||
1991 | } | ||
1861 | 1992 | ||
1993 | |||
1994 | /** | ||
1995 | * Handle offers (of `struct GNUNET_HashCode`s) and | ||
1996 | * respond with demands (of `struct GNUNET_HashCode`s). | ||
1997 | * | ||
1998 | * @param cls the union operation | ||
1999 | * @param mh the message | ||
2000 | */ | ||
2001 | void | ||
2002 | handle_union_p2p_offer (void *cls, | ||
2003 | const struct GNUNET_MessageHeader *mh) | ||
2004 | { | ||
2005 | struct Operation *op = cls; | ||
2006 | const struct GNUNET_HashCode *hash; | ||
2007 | unsigned int num_hashes; | ||
2008 | |||
2009 | num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) | ||
2010 | / sizeof (struct GNUNET_HashCode); | ||
1862 | for (hash = (const struct GNUNET_HashCode *) &mh[1]; | 2011 | for (hash = (const struct GNUNET_HashCode *) &mh[1]; |
1863 | num_hashes > 0; | 2012 | num_hashes > 0; |
1864 | hash++, num_hashes--) | 2013 | hash++, num_hashes--) |
@@ -1897,6 +2046,7 @@ handle_p2p_offer (void *cls, | |||
1897 | *(struct GNUNET_HashCode *) &demands[1] = *hash; | 2046 | *(struct GNUNET_HashCode *) &demands[1] = *hash; |
1898 | GNUNET_MQ_send (op->mq, ev); | 2047 | GNUNET_MQ_send (op->mq, ev); |
1899 | } | 2048 | } |
2049 | GNUNET_CADET_receive_done (op->channel); | ||
1900 | } | 2050 | } |
1901 | 2051 | ||
1902 | 2052 | ||
@@ -1906,16 +2056,22 @@ handle_p2p_offer (void *cls, | |||
1906 | * @param cls the union operation | 2056 | * @param cls the union operation |
1907 | * @param mh the message | 2057 | * @param mh the message |
1908 | */ | 2058 | */ |
1909 | static void | 2059 | void |
1910 | handle_p2p_done (void *cls, | 2060 | handle_union_p2p_done (void *cls, |
1911 | const struct GNUNET_MessageHeader *mh) | 2061 | const struct GNUNET_MessageHeader *mh) |
1912 | { | 2062 | { |
1913 | struct Operation *op = cls; | 2063 | struct Operation *op = cls; |
1914 | 2064 | ||
1915 | if (op->state->phase == PHASE_INVENTORY_PASSIVE) | 2065 | if (OT_UNION != op->type) |
1916 | { | 2066 | { |
2067 | GNUNET_break_op (0); | ||
2068 | fail_union_operation (op); | ||
2069 | return; | ||
2070 | } | ||
2071 | switch (op->state->phase) | ||
2072 | { | ||
2073 | case PHASE_INVENTORY_PASSIVE: | ||
1917 | /* We got all requests, but still have to send our elements in response. */ | 2074 | /* We got all requests, but still have to send our elements in response. */ |
1918 | |||
1919 | op->state->phase = PHASE_FINISH_WAITING; | 2075 | op->state->phase = PHASE_FINISH_WAITING; |
1920 | 2076 | ||
1921 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2077 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1929,11 +2085,10 @@ handle_p2p_done (void *cls, | |||
1929 | * all our demands are satisfied, so that the active | 2085 | * all our demands are satisfied, so that the active |
1930 | * peer can quit if we gave him everything. | 2086 | * peer can quit if we gave him everything. |
1931 | */ | 2087 | */ |
2088 | GNUNET_CADET_receive_done (op->channel); | ||
1932 | maybe_finish (op); | 2089 | maybe_finish (op); |
1933 | return; | 2090 | return; |
1934 | } | 2091 | case PHASE_INVENTORY_ACTIVE: |
1935 | if (op->state->phase == PHASE_INVENTORY_ACTIVE) | ||
1936 | { | ||
1937 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 2092 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1938 | "got DONE (as active partner), waiting to finish\n"); | 2093 | "got DONE (as active partner), waiting to finish\n"); |
1939 | /* All demands of the other peer are satisfied, | 2094 | /* All demands of the other peer are satisfied, |
@@ -1944,11 +2099,14 @@ handle_p2p_done (void *cls, | |||
1944 | * to the other peer once our demands are met. | 2099 | * to the other peer once our demands are met. |
1945 | */ | 2100 | */ |
1946 | op->state->phase = PHASE_FINISH_CLOSING; | 2101 | op->state->phase = PHASE_FINISH_CLOSING; |
2102 | GNUNET_CADET_receive_done (op->channel); | ||
1947 | maybe_finish (op); | 2103 | maybe_finish (op); |
1948 | return; | 2104 | return; |
2105 | default: | ||
2106 | GNUNET_break_op (0); | ||
2107 | fail_union_operation (op); | ||
2108 | return; | ||
1949 | } | 2109 | } |
1950 | GNUNET_break_op (0); | ||
1951 | fail_union_operation (op); | ||
1952 | } | 2110 | } |
1953 | 2111 | ||
1954 | 2112 | ||
@@ -2119,62 +2277,6 @@ union_set_destroy (struct SetState *set_state) | |||
2119 | 2277 | ||
2120 | 2278 | ||
2121 | /** | 2279 | /** |
2122 | * Dispatch messages for a union operation. | ||
2123 | * | ||
2124 | * @param op the state of the union evaluate operation | ||
2125 | * @param mh the received message | ||
2126 | * @return #GNUNET_SYSERR if the tunnel should be disconnected, | ||
2127 | * #GNUNET_OK otherwise | ||
2128 | */ | ||
2129 | int | ||
2130 | union_handle_p2p_message (struct Operation *op, | ||
2131 | const struct GNUNET_MessageHeader *mh) | ||
2132 | { | ||
2133 | //LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
2134 | // "received p2p message (t: %u, s: %u)\n", | ||
2135 | // ntohs (mh->type), | ||
2136 | // ntohs (mh->size)); | ||
2137 | switch (ntohs (mh->type)) | ||
2138 | { | ||
2139 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: | ||
2140 | return handle_p2p_ibf (op, mh); | ||
2141 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE: | ||
2142 | return handle_p2p_strata_estimator (op, mh, GNUNET_NO); | ||
2143 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC: | ||
2144 | return handle_p2p_strata_estimator (op, mh, GNUNET_YES); | ||
2145 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: | ||
2146 | handle_p2p_elements (op, mh); | ||
2147 | break; | ||
2148 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT: | ||
2149 | handle_p2p_full_element (op, mh); | ||
2150 | break; | ||
2151 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: | ||
2152 | handle_p2p_inquiry (op, mh); | ||
2153 | break; | ||
2154 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE: | ||
2155 | handle_p2p_done (op, mh); | ||
2156 | break; | ||
2157 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER: | ||
2158 | handle_p2p_offer (op, mh); | ||
2159 | break; | ||
2160 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: | ||
2161 | handle_p2p_demand (op, mh); | ||
2162 | break; | ||
2163 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE: | ||
2164 | handle_p2p_full_done (op, mh); | ||
2165 | break; | ||
2166 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL: | ||
2167 | handle_p2p_request_full (op, mh); | ||
2168 | break; | ||
2169 | default: | ||
2170 | /* Something wrong with cadet's message handlers? */ | ||
2171 | GNUNET_assert (0); | ||
2172 | } | ||
2173 | return GNUNET_OK; | ||
2174 | } | ||
2175 | |||
2176 | |||
2177 | /** | ||
2178 | * Handler for peer-disconnects, notifies the client | 2280 | * Handler for peer-disconnects, notifies the client |
2179 | * about the aborted operation in case the op was not concluded. | 2281 | * about the aborted operation in case the op was not concluded. |
2180 | * | 2282 | * |
@@ -2240,7 +2342,6 @@ _GSS_union_vt () | |||
2240 | { | 2342 | { |
2241 | static const struct SetVT union_vt = { | 2343 | static const struct SetVT union_vt = { |
2242 | .create = &union_set_create, | 2344 | .create = &union_set_create, |
2243 | .msg_handler = &union_handle_p2p_message, | ||
2244 | .add = &union_add, | 2345 | .add = &union_add, |
2245 | .remove = &union_remove, | 2346 | .remove = &union_remove, |
2246 | .destroy_set = &union_set_destroy, | 2347 | .destroy_set = &union_set_destroy, |