aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2014-11-27 13:31:52 +0000
committerChristian Grothoff <christian@grothoff.org>2014-11-27 13:31:52 +0000
commit1d96a7f8dc2aa6311eae76e60a92eb2a2b397fe2 (patch)
tree309d6241bf67a6a549071aa0ec3d9af82d030d7b /src/set
parentce9b32618b6ee488352ef0eb506c744868145f82 (diff)
downloadgnunet-1d96a7f8dc2aa6311eae76e60a92eb2a2b397fe2.tar.gz
gnunet-1d96a7f8dc2aa6311eae76e60a92eb2a2b397fe2.zip
use and respect send_more field in IterAckMessage
Diffstat (limited to 'src/set')
-rw-r--r--src/set/gnunet-service-set.c225
-rw-r--r--src/set/set.h3
-rw-r--r--src/set/set_api.c463
3 files changed, 379 insertions, 312 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 97ee1480d..5e1b89936 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -176,7 +176,6 @@ get_incoming (uint32_t id)
176 for (op = incoming_head; NULL != op; op = op->next) 176 for (op = incoming_head; NULL != op; op = op->next)
177 if (op->suggest_id == id) 177 if (op->suggest_id == id)
178 { 178 {
179 // FIXME: remove this assertion once the corresponding bug is gone!
180 GNUNET_assert (GNUNET_YES == op->is_incoming); 179 GNUNET_assert (GNUNET_YES == op->is_incoming);
181 return op; 180 return op;
182 } 181 }
@@ -325,8 +324,8 @@ _GSS_operation_destroy (struct Operation *op,
325 GNUNET_assert (GNUNET_NO == op->is_incoming); 324 GNUNET_assert (GNUNET_NO == op->is_incoming);
326 GNUNET_assert (NULL != op->spec); 325 GNUNET_assert (NULL != op->spec);
327 set = op->spec->set; 326 set = op->spec->set;
328 GNUNET_CONTAINER_DLL_remove (op->spec->set->ops_head, 327 GNUNET_CONTAINER_DLL_remove (set->ops_head,
329 op->spec->set->ops_tail, 328 set->ops_tail,
330 op); 329 op);
331 op->vt->cancel (op); 330 op->vt->cancel (op);
332 op->vt = NULL; 331 op->vt = NULL;
@@ -511,12 +510,12 @@ static struct Listener *
511listener_get_by_target (enum GNUNET_SET_OperationType op, 510listener_get_by_target (enum GNUNET_SET_OperationType op,
512 const struct GNUNET_HashCode *app_id) 511 const struct GNUNET_HashCode *app_id)
513{ 512{
514 struct Listener *l; 513 struct Listener *listener;
515 514
516 for (l = listeners_head; NULL != l; l = l->next) 515 for (listener = listeners_head; NULL != listener; listener = listener->next)
517 if ( (l->operation == op) && 516 if ( (listener->operation == op) &&
518 (0 == GNUNET_CRYPTO_hash_cmp (app_id, &l->app_id)) ) 517 (0 == GNUNET_CRYPTO_hash_cmp (app_id, &listener->app_id)) )
519 return l; 518 return listener;
520 return NULL; 519 return NULL;
521} 520}
522 521
@@ -997,7 +996,7 @@ handle_client_remove (void *cls,
997 996
998 997
999/** 998/**
1000 * Called when a client wants to evaluate a set operation with another 999 * Called when a client wants to initiate a set operation with another
1001 * peer. Initiates the CADET connection to the listener and sends the 1000 * peer. Initiates the CADET connection to the listener and sends the
1002 * request. 1001 * request.
1003 * 1002 *
@@ -1068,6 +1067,7 @@ handle_client_iter_ack (void *cls,
1068 struct GNUNET_SERVER_Client *client, 1067 struct GNUNET_SERVER_Client *client,
1069 const struct GNUNET_MessageHeader *m) 1068 const struct GNUNET_MessageHeader *m)
1070{ 1069{
1070 const struct GNUNET_SET_IterAckMessage *ack;
1071 struct Set *set; 1071 struct Set *set;
1072 1072
1073 set = set_get (client); 1073 set = set_get (client);
@@ -1086,9 +1086,18 @@ handle_client_iter_ack (void *cls,
1086 GNUNET_SERVER_client_disconnect (client); 1086 GNUNET_SERVER_client_disconnect (client);
1087 return; 1087 return;
1088 } 1088 }
1089 ack = (const struct GNUNET_SET_IterAckMessage *) m;
1089 GNUNET_SERVER_receive_done (client, 1090 GNUNET_SERVER_receive_done (client,
1090 GNUNET_OK); 1091 GNUNET_OK);
1091 send_client_element (set); 1092 if (ntohl (ack->send_more))
1093 {
1094 send_client_element (set);
1095 }
1096 else
1097 {
1098 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
1099 set->iter = NULL;
1100 }
1092} 1101}
1093 1102
1094 1103
@@ -1111,19 +1120,17 @@ handle_client_cancel (void *cls,
1111 struct Operation *op; 1120 struct Operation *op;
1112 int found; 1121 int found;
1113 1122
1114 // client without a set requested an operation
1115 set = set_get (client); 1123 set = set_get (client);
1116 if (NULL == set) 1124 if (NULL == set)
1117 { 1125 {
1126 /* client without a set requested an operation */
1118 GNUNET_break (0); 1127 GNUNET_break (0);
1119 GNUNET_SERVER_client_disconnect (client); 1128 GNUNET_SERVER_client_disconnect (client);
1120 return; 1129 return;
1121 } 1130 }
1122
1123 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1131 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1124 "client requested cancel for op %u\n", 1132 "Client requested cancel for op %u\n",
1125 ntohl (msg->request_id)); 1133 ntohl (msg->request_id));
1126
1127 found = GNUNET_NO; 1134 found = GNUNET_NO;
1128 for (op = set->ops_head; NULL != op; op = op->next) 1135 for (op = set->ops_head; NULL != op; op = op->next)
1129 { 1136 {
@@ -1133,27 +1140,30 @@ handle_client_cancel (void *cls,
1133 break; 1140 break;
1134 } 1141 }
1135 } 1142 }
1136 1143 if (GNUNET_NO == found)
1137 /* It may happen that the operation was destroyed due to 1144 {
1138 * the other peer disconnecting. The client may not know about this 1145 /* It may happen that the operation was already destroyed due to
1139 * yet and try to cancel the (non non-existent) operation. 1146 * the other peer disconnecting. The client may not know about this
1140 */ 1147 * yet and try to cancel the (just barely non-existent) operation.
1141 if (GNUNET_NO != found) 1148 * So this is not a hard error.
1149 */
1150 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1151 "Client canceled non-existent op\n");
1152 }
1153 else
1154 {
1142 _GSS_operation_destroy (op, 1155 _GSS_operation_destroy (op,
1143 GNUNET_YES); 1156 GNUNET_YES);
1144 else 1157 }
1145 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1158 GNUNET_SERVER_receive_done (client,
1146 "client canceled non-existent op\n"); 1159 GNUNET_OK);
1147
1148
1149 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1150} 1160}
1151 1161
1152 1162
1153/** 1163/**
1154 * Handle a request from the client to accept 1164 * Handle a request from the client to accept a set operation that
1155 * a set operation that came from a remote peer. 1165 * came from a remote peer. We forward the accept to the associated
1156 * We forward the accept to the associated operation for handling 1166 * operation for handling
1157 * 1167 *
1158 * @param cls unused 1168 * @param cls unused
1159 * @param client the client 1169 * @param client the client
@@ -1167,28 +1177,27 @@ handle_client_accept (void *cls,
1167 struct Set *set; 1177 struct Set *set;
1168 const struct GNUNET_SET_AcceptMessage *msg; 1178 const struct GNUNET_SET_AcceptMessage *msg;
1169 struct Operation *op; 1179 struct Operation *op;
1180 struct GNUNET_SET_ResultMessage *result_message;
1181 struct GNUNET_MQ_Envelope *ev;
1170 1182
1171 msg = (const struct GNUNET_SET_AcceptMessage *) mh; 1183 msg = (const struct GNUNET_SET_AcceptMessage *) mh;
1172
1173 // client without a set requested an operation
1174 set = set_get (client); 1184 set = set_get (client);
1175
1176 if (NULL == set) 1185 if (NULL == set)
1177 { 1186 {
1187 /* client without a set requested to accept */
1178 GNUNET_break (0); 1188 GNUNET_break (0);
1179 GNUNET_SERVER_client_disconnect (client); 1189 GNUNET_SERVER_client_disconnect (client);
1180 return; 1190 return;
1181 } 1191 }
1182
1183 op = get_incoming (ntohl (msg->accept_reject_id)); 1192 op = get_incoming (ntohl (msg->accept_reject_id));
1184
1185 /* it is not an error if the set op does not exist -- it may
1186 * have been destroyed when the partner peer disconnected. */
1187 if (NULL == op) 1193 if (NULL == op)
1188 { 1194 {
1189 struct GNUNET_SET_ResultMessage *result_message; 1195 /* It is not an error if the set op does not exist -- it may
1190 struct GNUNET_MQ_Envelope *ev; 1196 * have been destroyed when the partner peer disconnected. */
1191 ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SET_RESULT); 1197 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1198 "Client accepted request that is no longer active\n");
1199 ev = GNUNET_MQ_msg (result_message,
1200 GNUNET_MESSAGE_TYPE_SET_RESULT);
1192 result_message->request_id = msg->request_id; 1201 result_message->request_id = msg->request_id;
1193 result_message->element_type = 0; 1202 result_message->element_type = 0;
1194 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE); 1203 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
@@ -1198,33 +1207,24 @@ handle_client_accept (void *cls,
1198 } 1207 }
1199 1208
1200 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1201 "client accepting %u\n", 1210 "Client accepting request %u\n",
1202 ntohl (msg->accept_reject_id)); 1211 ntohl (msg->accept_reject_id));
1203
1204 GNUNET_assert (GNUNET_YES == op->is_incoming);
1205
1206
1207 op->spec->set = set;
1208
1209 GNUNET_assert (GNUNET_YES == op->is_incoming); 1212 GNUNET_assert (GNUNET_YES == op->is_incoming);
1210 op->is_incoming = GNUNET_NO; 1213 op->is_incoming = GNUNET_NO;
1211 GNUNET_CONTAINER_DLL_remove (incoming_head, 1214 GNUNET_CONTAINER_DLL_remove (incoming_head,
1212 incoming_tail, 1215 incoming_tail,
1213 op); 1216 op);
1214 1217 op->spec->set = set;
1215 GNUNET_assert (NULL != op->spec->set);
1216 GNUNET_assert (NULL != op->spec->set->vt);
1217
1218 GNUNET_CONTAINER_DLL_insert (set->ops_head, 1218 GNUNET_CONTAINER_DLL_insert (set->ops_head,
1219 set->ops_tail, 1219 set->ops_tail,
1220 op); 1220 op);
1221
1222 op->spec->client_request_id = ntohl (msg->request_id); 1221 op->spec->client_request_id = ntohl (msg->request_id);
1223 op->spec->result_mode = ntohl (msg->result_mode); 1222 op->spec->result_mode = ntohl (msg->result_mode);
1224 op->generation_created = set->current_generation++; 1223 op->generation_created = set->current_generation++;
1225 op->vt = op->spec->set->vt; 1224 op->vt = set->vt;
1226 set->vt->accept (op); 1225 op->vt->accept (op);
1227 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1226 GNUNET_SERVER_receive_done (client,
1227 GNUNET_OK);
1228} 1228}
1229 1229
1230 1230
@@ -1240,10 +1240,8 @@ shutdown_task (void *cls,
1240{ 1240{
1241 while (NULL != incoming_head) 1241 while (NULL != incoming_head)
1242 incoming_destroy (incoming_head); 1242 incoming_destroy (incoming_head);
1243
1244 while (NULL != listeners_head) 1243 while (NULL != listeners_head)
1245 listener_destroy (listeners_head); 1244 listener_destroy (listeners_head);
1246
1247 while (NULL != sets_head) 1245 while (NULL != sets_head)
1248 set_destroy (sets_head); 1246 set_destroy (sets_head);
1249 1247
@@ -1280,7 +1278,7 @@ incoming_timeout_cb (void *cls,
1280 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 1278 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1281 return; 1279 return;
1282 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1280 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1283 "remote peer timed out\n"); 1281 "Remote peer's incoming request timed out\n");
1284 incoming_destroy (incoming); 1282 incoming_destroy (incoming);
1285} 1283}
1286 1284
@@ -1319,7 +1317,7 @@ handle_incoming_disconnect (struct Operation *op)
1319 * @param port Port this channel is for. 1317 * @param port Port this channel is for.
1320 * @param options Unused. 1318 * @param options Unused.
1321 * @return initial channel context for the channel 1319 * @return initial channel context for the channel
1322 * (can be NULL -- that's not an error) 1320 * returns NULL on error
1323 */ 1321 */
1324static void * 1322static void *
1325channel_new_cb (void *cls, 1323channel_new_cb (void *cls,
@@ -1365,7 +1363,7 @@ channel_new_cb (void *cls,
1365 * GNUNET_CADET_channel_destroy() on the channel. 1363 * GNUNET_CADET_channel_destroy() on the channel.
1366 * 1364 *
1367 * The peer_disconnect function is part of a a virtual table set initially either 1365 * The peer_disconnect function is part of a a virtual table set initially either
1368 * when a peer creates a new channel with us (channel_new_cb), or once we create 1366 * when a peer creates a new channel with us (#channel_new_cb()), or once we create
1369 * a new channel ourselves (evaluate). 1367 * a new channel ourselves (evaluate).
1370 * 1368 *
1371 * Once we know the exact type of operation (union/intersection), the vt is 1369 * Once we know the exact type of operation (union/intersection), the vt is
@@ -1384,7 +1382,7 @@ channel_end_cb (void *cls,
1384 struct Operation *op = channel_ctx; 1382 struct Operation *op = channel_ctx;
1385 1383
1386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1384 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1387 "channel end cb called\n"); 1385 "channel_end_cb called\n");
1388 op->channel = NULL; 1386 op->channel = NULL;
1389 /* the vt can be null if a client already requested canceling op. */ 1387 /* the vt can be null if a client already requested canceling op. */
1390 if (NULL != op->vt) 1388 if (NULL != op->vt)
@@ -1393,14 +1391,13 @@ channel_end_cb (void *cls,
1393 "calling peer disconnect due to channel end\n"); 1391 "calling peer disconnect due to channel end\n");
1394 op->vt->peer_disconnect (op); 1392 op->vt->peer_disconnect (op);
1395 } 1393 }
1396 1394 if (GNUNET_YES != op->keep)
1397 if (GNUNET_YES == op->keep) 1395 {
1398 return; 1396 /* cadet will never call us with the context again! */
1399 1397 GNUNET_free (op);
1400 /* cadet will never call us with the context again! */ 1398 }
1401 GNUNET_free (channel_ctx);
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1399 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403 "channel end cb finished\n"); 1400 "channel_end_cb finished\n");
1404} 1401}
1405 1402
1406 1403
@@ -1432,16 +1429,17 @@ dispatch_p2p_message (void *cls,
1432 int ret; 1429 int ret;
1433 1430
1434 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1435 "dispatching cadet message (type: %u)\n", 1432 "Dispatching cadet message (type: %u)\n",
1436 ntohs (message->type)); 1433 ntohs (message->type));
1437 /* do this before the handler, as the handler might kill the channel */ 1434 /* do this before the handler, as the handler might kill the channel */
1438 GNUNET_CADET_receive_done (channel); 1435 GNUNET_CADET_receive_done (channel);
1439 if (NULL != op->vt) 1436 if (NULL != op->vt)
1440 ret = op->vt->msg_handler (op, message); 1437 ret = op->vt->msg_handler (op,
1438 message);
1441 else 1439 else
1442 ret = GNUNET_SYSERR; 1440 ret = GNUNET_SYSERR;
1443 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1441 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1444 "handled cadet message (type: %u)\n", 1442 "Handled cadet message (type: %u)\n",
1445 ntohs (message->type)); 1443 ntohs (message->type));
1446 return ret; 1444 return ret;
1447} 1445}
@@ -1460,34 +1458,48 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
1460 const struct GNUNET_CONFIGURATION_Handle *cfg) 1458 const struct GNUNET_CONFIGURATION_Handle *cfg)
1461{ 1459{
1462 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { 1460 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
1463 {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 1461 { &handle_client_accept, NULL,
1464 sizeof (struct GNUNET_SET_AcceptMessage)}, 1462 GNUNET_MESSAGE_TYPE_SET_ACCEPT,
1465 {handle_client_iter_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ITER_ACK, 0}, 1463 sizeof (struct GNUNET_SET_AcceptMessage)},
1466 {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, 1464 { &handle_client_iter_ack, NULL,
1467 {handle_client_create_set, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 1465 GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
1468 sizeof (struct GNUNET_SET_CreateMessage)}, 1466 sizeof (struct GNUNET_SET_IterAckMessage) },
1469 {handle_client_iterate, NULL, GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST, 1467 { &handle_client_add, NULL,
1470 sizeof (struct GNUNET_MessageHeader)}, 1468 GNUNET_MESSAGE_TYPE_SET_ADD,
1471 {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0}, 1469 0},
1472 {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 1470 { &handle_client_create_set, NULL,
1473 sizeof (struct GNUNET_SET_ListenMessage)}, 1471 GNUNET_MESSAGE_TYPE_SET_CREATE,
1474 {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 1472 sizeof (struct GNUNET_SET_CreateMessage)},
1475 sizeof (struct GNUNET_SET_RejectMessage)}, 1473 { &handle_client_iterate, NULL,
1476 {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, 1474 GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
1477 {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 1475 sizeof (struct GNUNET_MessageHeader)},
1478 sizeof (struct GNUNET_SET_CancelMessage)}, 1476 { &handle_client_evaluate, NULL,
1479 {NULL, NULL, 0, 0} 1477 GNUNET_MESSAGE_TYPE_SET_EVALUATE,
1478 0},
1479 { &handle_client_listen, NULL,
1480 GNUNET_MESSAGE_TYPE_SET_LISTEN,
1481 sizeof (struct GNUNET_SET_ListenMessage)},
1482 { &handle_client_reject, NULL,
1483 GNUNET_MESSAGE_TYPE_SET_REJECT,
1484 sizeof (struct GNUNET_SET_RejectMessage)},
1485 { &handle_client_remove, NULL,
1486 GNUNET_MESSAGE_TYPE_SET_REMOVE,
1487 0},
1488 { &handle_client_cancel, NULL,
1489 GNUNET_MESSAGE_TYPE_SET_CANCEL,
1490 sizeof (struct GNUNET_SET_CancelMessage)},
1491 { NULL, NULL, 0, 0}
1480 }; 1492 };
1481 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = { 1493 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1482 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, 1494 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
1483 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0}, 1495 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
1484 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, 1496 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
1485 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0}, 1497 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
1486 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, 1498 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
1487 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, 1499 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
1488 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0}, 1500 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
1489 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, 1501 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
1490 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART, 0}, 1502 { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART, 0},
1491 {NULL, 0, 0} 1503 {NULL, 0, 0}
1492 }; 1504 };
1493 static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; 1505 static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
@@ -1495,11 +1507,15 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
1495 configuration = cfg; 1507 configuration = cfg;
1496 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, 1508 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1497 &shutdown_task, NULL); 1509 &shutdown_task, NULL);
1498 GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); 1510 GNUNET_SERVER_disconnect_notify (server,
1499 GNUNET_SERVER_add_handlers (server, server_handlers); 1511 &handle_client_disconnect, NULL);
1500 1512 GNUNET_SERVER_add_handlers (server,
1501 cadet = GNUNET_CADET_connect (cfg, NULL, channel_new_cb, channel_end_cb, 1513 server_handlers);
1502 cadet_handlers, cadet_ports); 1514 cadet = GNUNET_CADET_connect (cfg, NULL,
1515 &channel_new_cb,
1516 &channel_end_cb,
1517 cadet_handlers,
1518 cadet_ports);
1503 if (NULL == cadet) 1519 if (NULL == cadet)
1504 { 1520 {
1505 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1521 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1517,14 +1533,15 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
1517 * @return 0 ok, 1 on error 1533 * @return 0 ok, 1 on error
1518 */ 1534 */
1519int 1535int
1520main (int argc, char *const *argv) 1536main (int argc,
1537 char *const *argv)
1521{ 1538{
1522 int ret; 1539 int ret;
1523 1540
1524 ret = GNUNET_SERVICE_run (argc, argv, "set", 1541 ret = GNUNET_SERVICE_run (argc, argv, "set",
1525 GNUNET_SERVICE_OPTION_NONE, &run, NULL); 1542 GNUNET_SERVICE_OPTION_NONE,
1543 &run, NULL);
1526 return (GNUNET_OK == ret) ? 0 : 1; 1544 return (GNUNET_OK == ret) ? 0 : 1;
1527} 1545}
1528 1546
1529/* end of gnunet-service-set.c */ 1547/* end of gnunet-service-set.c */
1530
diff --git a/src/set/set.h b/src/set/set.h
index b470ebf92..01c96896d 100644
--- a/src/set/set.h
+++ b/src/set/set.h
@@ -207,7 +207,8 @@ struct GNUNET_SET_ResultMessage
207 uint32_t request_id GNUNET_PACKED; 207 uint32_t request_id GNUNET_PACKED;
208 208
209 /** 209 /**
210 * Was the evaluation successful? 210 * Was the evaluation successful? Contains
211 * an `enum GNUNET_SET_Status` in NBO.
211 */ 212 */
212 uint16_t result_status GNUNET_PACKED; 213 uint16_t result_status GNUNET_PACKED;
213 214
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 90cba446c..d62475013 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors) 3 (C) 2012-2014 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -17,7 +17,6 @@
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 Boston, MA 02111-1307, USA.
19*/ 19*/
20
21/** 20/**
22 * @file set/set_api.c 21 * @file set/set_api.c
23 * @brief api for the set service 22 * @brief api for the set service
@@ -44,7 +43,7 @@ struct GNUNET_SET_Handle
44 struct GNUNET_CLIENT_Connection *client; 43 struct GNUNET_CLIENT_Connection *client;
45 44
46 /** 45 /**
47 * Message queue for 'client'. 46 * Message queue for @e client.
48 */ 47 */
49 struct GNUNET_MQ_Handle *mq; 48 struct GNUNET_MQ_Handle *mq;
50 49
@@ -59,30 +58,30 @@ struct GNUNET_SET_Handle
59 struct GNUNET_SET_OperationHandle *ops_tail; 58 struct GNUNET_SET_OperationHandle *ops_tail;
60 59
61 /** 60 /**
62 * Should the set be destroyed once all operations are gone? 61 * Callback for the current iteration over the set,
62 * NULL if no iterator is active.
63 */ 63 */
64 int destroy_requested; 64 GNUNET_SET_ElementIterator iterator;
65 65
66 /** 66 /**
67 * Has the set become invalid (e.g. service died)? 67 * Closure for @e iterator
68 */ 68 */
69 int invalid; 69 void *iterator_cls;
70 70
71 /** 71 /**
72 * Callback for the current iteration over the set, 72 * Should the set be destroyed once all operations are gone?
73 * NULL if no iterator is active.
74 */ 73 */
75 GNUNET_SET_ElementIterator iterator; 74 int destroy_requested;
76 75
77 /** 76 /**
78 * Closure for 'iterator' 77 * Has the set become invalid (e.g. service died)?
79 */ 78 */
80 void *iterator_cls; 79 int invalid;
81}; 80};
82 81
83 82
84/** 83/**
85 * Opaque handle to a set operation request from another peer. 84 * Handle for a set operation request from another peer.
86 */ 85 */
87struct GNUNET_SET_Request 86struct GNUNET_SET_Request
88{ 87{
@@ -94,15 +93,14 @@ struct GNUNET_SET_Request
94 93
95 /** 94 /**
96 * Has the request been accepted already? 95 * Has the request been accepted already?
97 * GNUNET_YES/GNUNET_NO 96 * #GNUNET_YES/#GNUNET_NO
98 */ 97 */
99 int accepted; 98 int accepted;
100}; 99};
101 100
102 101
103/** 102/**
104 * Handle to an operation. 103 * Handle to an operation. Only known to the service after committing
105 * Only known to the service after commiting
106 * the handle with a set. 104 * the handle with a set.
107 */ 105 */
108struct GNUNET_SET_OperationHandle 106struct GNUNET_SET_OperationHandle
@@ -114,7 +112,7 @@ struct GNUNET_SET_OperationHandle
114 GNUNET_SET_ResultIterator result_cb; 112 GNUNET_SET_ResultIterator result_cb;
115 113
116 /** 114 /**
117 * Closure for result_cb. 115 * Closure for @e result_cb.
118 */ 116 */
119 void *result_cls; 117 void *result_cls;
120 118
@@ -125,11 +123,6 @@ struct GNUNET_SET_OperationHandle
125 struct GNUNET_SET_Handle *set; 123 struct GNUNET_SET_Handle *set;
126 124
127 /** 125 /**
128 * Request ID to identify the operation within the set.
129 */
130 uint32_t request_id;
131
132 /**
133 * Message sent to the server on calling conclude, 126 * Message sent to the server on calling conclude,
134 * NULL if conclude has been called. 127 * NULL if conclude has been called.
135 */ 128 */
@@ -150,6 +143,11 @@ struct GNUNET_SET_OperationHandle
150 * Handles are kept in a linked list. 143 * Handles are kept in a linked list.
151 */ 144 */
152 struct GNUNET_SET_OperationHandle *next; 145 struct GNUNET_SET_OperationHandle *next;
146
147 /**
148 * Request ID to identify the operation within the set.
149 */
150 uint32_t request_id;
153}; 151};
154 152
155 153
@@ -182,16 +180,11 @@ struct GNUNET_SET_ListenHandle
182 GNUNET_SET_ListenCallback listen_cb; 180 GNUNET_SET_ListenCallback listen_cb;
183 181
184 /** 182 /**
185 * Closure for listen_cb. 183 * Closure for @e listen_cb.
186 */ 184 */
187 void *listen_cls; 185 void *listen_cls;
188 186
189 /** 187 /**
190 * Operation we listen for.
191 */
192 enum GNUNET_SET_OperationType operation;
193
194 /**
195 * Application ID we listen for. 188 * Application ID we listen for.
196 */ 189 */
197 struct GNUNET_HashCode app_id; 190 struct GNUNET_HashCode app_id;
@@ -205,59 +198,78 @@ struct GNUNET_SET_ListenHandle
205 * Task for reconnecting when the listener fails. 198 * Task for reconnecting when the listener fails.
206 */ 199 */
207 GNUNET_SCHEDULER_TaskIdentifier reconnect_task; 200 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
208};
209 201
210 202 /**
211/* forward declaration */ 203 * Operation we listen for.
212static void 204 */
213listen_connect (void *cls, 205 enum GNUNET_SET_OperationType operation;
214 const struct GNUNET_SCHEDULER_TaskContext *tc); 206};
215 207
216 208
217/** 209/**
218 * Handle element for iteration over the set. 210 * Handle element for iteration over the set. Notifies the
211 * iterator and sends an acknowledgement to the service.
219 * 212 *
220 * @param cls the set 213 * @param cls the set
221 * @param mh the message 214 * @param mh the message
222 */ 215 */
223static void 216static void
224handle_iter_element (void *cls, const struct GNUNET_MessageHeader *mh) 217handle_iter_element (void *cls,
218 const struct GNUNET_MessageHeader *mh)
225{ 219{
226 struct GNUNET_SET_Handle *set = cls; 220 struct GNUNET_SET_Handle *set = cls;
221 GNUNET_SET_ElementIterator iter = set->iterator;
227 struct GNUNET_SET_Element element; 222 struct GNUNET_SET_Element element;
228 const struct GNUNET_SET_IterResponseMessage *msg = 223 const struct GNUNET_SET_IterResponseMessage *msg;
229 (const struct GNUNET_SET_IterResponseMessage *) mh;
230 struct GNUNET_SET_IterAckMessage *ack_msg; 224 struct GNUNET_SET_IterAckMessage *ack_msg;
231 struct GNUNET_MQ_Envelope *ev; 225 struct GNUNET_MQ_Envelope *ev;
226 uint16_t msize;
232 227
233 if (NULL == set->iterator) 228 msize = ntohs (mh->size);
234 return; 229 if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
235 230 {
236 element.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_IterResponseMessage); 231 /* message malformed */
237 element.element_type = htons (msg->element_type); 232 GNUNET_break (0);
238 element.data = &msg[1]; 233 set->iterator = NULL;
239 set->iterator (set->iterator_cls, &element); 234 iter (set->iterator_cls,
240 ev = GNUNET_MQ_msg (ack_msg, GNUNET_MESSAGE_TYPE_SET_ITER_ACK); 235 NULL);
241 ack_msg->send_more = htonl (1); 236 iter = NULL;
237 }
238 if (NULL != iter)
239 {
240 msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
241 element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
242 element.element_type = htons (msg->element_type);
243 element.data = &msg[1];
244 iter (set->iterator_cls,
245 &element);
246 }
247 ev = GNUNET_MQ_msg (ack_msg,
248 GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
249 ack_msg->send_more = htonl ((NULL != iter));
242 GNUNET_MQ_send (set->mq, ev); 250 GNUNET_MQ_send (set->mq, ev);
243} 251}
244 252
245 253
246/** 254/**
247 * Handle element for iteration over the set. 255 * Handle message signalling conclusion of iteration over the set.
256 * Notifies the iterator that we are done.
248 * 257 *
249 * @param cls the set 258 * @param cls the set
250 * @param mh the message 259 * @param mh the message
251 */ 260 */
252static void 261static void
253handle_iter_done (void *cls, const struct GNUNET_MessageHeader *mh) 262handle_iter_done (void *cls,
263 const struct GNUNET_MessageHeader *mh)
254{ 264{
255 struct GNUNET_SET_Handle *set = cls; 265 struct GNUNET_SET_Handle *set = cls;
266 GNUNET_SET_ElementIterator iter = set->iterator;
256 267
257 if (NULL == set->iterator) 268 if (NULL == iter)
258 return; 269 return;
259 270 set->iterator = NULL;
260 set->iterator (set->iterator_cls, NULL); 271 iter (set->iterator_cls,
272 NULL);
261} 273}
262 274
263 275
@@ -268,47 +280,53 @@ handle_iter_done (void *cls, const struct GNUNET_MessageHeader *mh)
268 * @param mh the message 280 * @param mh the message
269 */ 281 */
270static void 282static void
271handle_result (void *cls, const struct GNUNET_MessageHeader *mh) 283handle_result (void *cls,
284 const struct GNUNET_MessageHeader *mh)
272{ 285{
273 const struct GNUNET_SET_ResultMessage *msg;
274 struct GNUNET_SET_Handle *set = cls; 286 struct GNUNET_SET_Handle *set = cls;
287 const struct GNUNET_SET_ResultMessage *msg;
275 struct GNUNET_SET_OperationHandle *oh; 288 struct GNUNET_SET_OperationHandle *oh;
276 struct GNUNET_SET_Element e; 289 struct GNUNET_SET_Element e;
277 enum GNUNET_SET_Status result_status; 290 enum GNUNET_SET_Status result_status;
278 291
279 msg = (const struct GNUNET_SET_ResultMessage *) mh; 292 msg = (const struct GNUNET_SET_ResultMessage *) mh;
280 GNUNET_assert (NULL != set);
281 GNUNET_assert (NULL != set->mq); 293 GNUNET_assert (NULL != set->mq);
282
283 result_status = ntohs (msg->result_status); 294 result_status = ntohs (msg->result_status);
284 295 oh = GNUNET_MQ_assoc_get (set->mq,
285 oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id)); 296 ntohl (msg->request_id));
286 // 'oh' can be NULL if we canceled the operation, but the service
287 // did not get the cancel message yet.
288 if (NULL == oh) 297 if (NULL == oh)
289 { 298 {
290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ignoring result from canceled operation\n"); 299 /* 'oh' can be NULL if we canceled the operation, but the service
300 did not get the cancel message yet. */
301 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
302 "Ignoring result from canceled operation\n");
291 return; 303 return;
292 } 304 }
293 /* status is not STATUS_OK => there's no attached element,
294 * and this is the last result message we get */
295 if (GNUNET_SET_STATUS_OK != result_status) 305 if (GNUNET_SET_STATUS_OK != result_status)
296 { 306 {
307 /* status is not STATUS_OK => there's no attached element,
308 * and this is the last result message we get */
297 GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id)); 309 GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
298 GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh); 310 GNUNET_CONTAINER_DLL_remove (set->ops_head,
299 if (GNUNET_YES == oh->set->destroy_requested) 311 set->ops_tail,
300 GNUNET_SET_destroy (oh->set); 312 oh);
313 if ( (GNUNET_YES == set->destroy_requested) &&
314 (NULL == set->ops_head) )
315 GNUNET_SET_destroy (set);
301 if (NULL != oh->result_cb) 316 if (NULL != oh->result_cb)
302 oh->result_cb (oh->result_cls, NULL, result_status); 317 oh->result_cb (oh->result_cls,
318 NULL,
319 result_status);
303 GNUNET_free (oh); 320 GNUNET_free (oh);
304 return; 321 return;
305 } 322 }
306
307 e.data = &msg[1]; 323 e.data = &msg[1];
308 e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); 324 e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
309 e.element_type = msg->element_type; 325 e.element_type = msg->element_type;
310 if (NULL != oh->result_cb) 326 if (NULL != oh->result_cb)
311 oh->result_cb (oh->result_cls, &e, result_status); 327 oh->result_cb (oh->result_cls,
328 &e,
329 result_status);
312} 330}
313 331
314 332
@@ -360,93 +378,30 @@ handle_request (void *cls,
360} 378}
361 379
362 380
363static void
364handle_client_listener_error (void *cls,
365 enum GNUNET_MQ_Error error)
366{
367 struct GNUNET_SET_ListenHandle *lh = cls;
368
369 LOG (GNUNET_ERROR_TYPE_DEBUG,
370 "listener broke down, re-connecting\n");
371 GNUNET_CLIENT_disconnect (lh->client);
372 lh->client = NULL;
373 GNUNET_MQ_destroy (lh->mq);
374 lh->mq = NULL;
375 lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
376 &listen_connect, lh);
377 lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
378}
379
380
381/** 381/**
382 * Destroy the set handle if no operations are left, mark the set 382 * Destroy the given set operation.
383 * for destruction otherwise.
384 * 383 *
385 * @param set set handle to destroy 384 * @param oh set operation to destroy
386 */ 385 */
387static int 386static void
388set_destroy (struct GNUNET_SET_Handle *set) 387set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
389{
390 if (NULL != set->ops_head)
391 {
392 set->destroy_requested = GNUNET_YES;
393 return GNUNET_NO;
394 }
395 LOG (GNUNET_ERROR_TYPE_DEBUG, "Really destroying set\n");
396 GNUNET_CLIENT_disconnect (set->client);
397 set->client = NULL;
398 GNUNET_MQ_destroy (set->mq);
399 set->mq = NULL;
400 GNUNET_free (set);
401 return GNUNET_YES;
402}
403
404
405/**
406 * Cancel the given set operation. We need to send an explicit cancel message,
407 * as all operations one one set communicate using one handle.
408 *
409 * In contrast to #GNUNET_SET_operation_cancel(), this function indicates whether
410 * the set of the operation has been destroyed because all operations are done and
411 * the set's destruction was requested before.
412 *
413 * @param oh set operation to cancel
414 * @return #GNUNET_YES if the set of the operation was destroyed
415 */
416static int
417set_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
418{ 388{
419 int ret = GNUNET_NO; 389 struct GNUNET_SET_Handle *set = oh->set;
390 struct GNUNET_SET_OperationHandle *h_assoc;
420 391
421 if (NULL != oh->conclude_mqm) 392 if (NULL != oh->conclude_mqm)
422 GNUNET_MQ_discard (oh->conclude_mqm); 393 GNUNET_MQ_discard (oh->conclude_mqm);
423
424 /* is the operation already commited? */ 394 /* is the operation already commited? */
425 if (NULL != oh->set) 395 if (NULL != set)
426 { 396 {
427 struct GNUNET_SET_OperationHandle *h_assoc; 397 GNUNET_CONTAINER_DLL_remove (set->ops_head,
428 struct GNUNET_SET_CancelMessage *m; 398 set->ops_tail,
429 struct GNUNET_MQ_Envelope *mqm;
430
431 GNUNET_CONTAINER_DLL_remove (oh->set->ops_head,
432 oh->set->ops_tail,
433 oh); 399 oh);
434 h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, 400 h_assoc = GNUNET_MQ_assoc_remove (set->mq,
435 oh->request_id); 401 oh->request_id);
436 GNUNET_assert ((h_assoc == NULL) || (h_assoc == oh)); 402 GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
437 mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
438 m->request_id = htonl (oh->request_id);
439 GNUNET_MQ_send (oh->set->mq, mqm);
440
441 if (GNUNET_YES == oh->set->destroy_requested)
442 {
443 LOG (GNUNET_ERROR_TYPE_DEBUG,
444 "Destroying set after operation cancel\n");
445 ret = set_destroy (oh->set);
446 }
447 } 403 }
448 GNUNET_free (oh); 404 GNUNET_free (oh);
449 return ret;
450} 405}
451 406
452 407
@@ -460,27 +415,58 @@ set_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
460void 415void
461GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) 416GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
462{ 417{
463 (void) set_operation_cancel (oh); 418 struct GNUNET_SET_Handle *set = oh->set;
419 struct GNUNET_SET_CancelMessage *m;
420 struct GNUNET_MQ_Envelope *mqm;
421
422 if (NULL != set)
423 {
424 mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
425 m->request_id = htonl (oh->request_id);
426 GNUNET_MQ_send (set->mq, mqm);
427 }
428 set_operation_destroy (oh);
429 if ( (NULL != set) &&
430 (GNUNET_YES == set->destroy_requested) &&
431 (NULL == set->ops_head) )
432 {
433 LOG (GNUNET_ERROR_TYPE_DEBUG,
434 "Destroying set after operation cancel\n");
435 GNUNET_SET_destroy (set);
436 }
464} 437}
465 438
466 439
440/**
441 * We encountered an error communicating with the set service while
442 * performing a set operation. Report to the application.
443 *
444 * @param cls the `struct GNUNET_SET_Handle`
445 * @param error error code
446 */
467static void 447static void
468handle_client_set_error (void *cls, enum GNUNET_MQ_Error error) 448handle_client_set_error (void *cls,
449 enum GNUNET_MQ_Error error)
469{ 450{
470 struct GNUNET_SET_Handle *set = cls; 451 struct GNUNET_SET_Handle *set = cls;
471 452
472 LOG (GNUNET_ERROR_TYPE_DEBUG, 453 LOG (GNUNET_ERROR_TYPE_DEBUG,
473 "handling client set error\n"); 454 "Handling client set error\n");
474
475 while (NULL != set->ops_head) 455 while (NULL != set->ops_head)
476 { 456 {
477 if (NULL != set->ops_head->result_cb) 457 if (NULL != set->ops_head->result_cb)
478 set->ops_head->result_cb (set->ops_head->result_cls, NULL, 458 set->ops_head->result_cb (set->ops_head->result_cls,
459 NULL,
479 GNUNET_SET_STATUS_FAILURE); 460 GNUNET_SET_STATUS_FAILURE);
480 if (GNUNET_YES == set_operation_cancel (set->ops_head)) 461 set_operation_destroy (set->ops_head);
481 return; /* stop if the set is destroyed */
482 } 462 }
483 set->invalid = GNUNET_YES; 463 set->invalid = GNUNET_YES;
464 if (GNUNET_YES == set->destroy_requested)
465 {
466 LOG (GNUNET_ERROR_TYPE_DEBUG,
467 "Destroying set after operation failure\n");
468 GNUNET_SET_destroy (set);
469 }
484} 470}
485 471
486 472
@@ -500,9 +486,11 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
500 enum GNUNET_SET_OperationType op) 486 enum GNUNET_SET_OperationType op)
501{ 487{
502 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { 488 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
503 {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0}, 489 { &handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0},
504 {handle_iter_element, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT, 0}, 490 { &handle_iter_element, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT, 0},
505 {handle_iter_done, GNUNET_MESSAGE_TYPE_SET_ITER_DONE, 0}, 491 { &handle_iter_done,
492 GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
493 sizeof (struct GNUNET_MessageHeader) },
506 GNUNET_MQ_HANDLERS_END 494 GNUNET_MQ_HANDLERS_END
507 }; 495 };
508 struct GNUNET_SET_Handle *set; 496 struct GNUNET_SET_Handle *set;
@@ -511,12 +499,17 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
511 499
512 set = GNUNET_new (struct GNUNET_SET_Handle); 500 set = GNUNET_new (struct GNUNET_SET_Handle);
513 set->client = GNUNET_CLIENT_connect ("set", cfg); 501 set->client = GNUNET_CLIENT_connect ("set", cfg);
514 LOG (GNUNET_ERROR_TYPE_DEBUG, "set client created\n"); 502 if (NULL == set->client)
515 GNUNET_assert (NULL != set->client); 503 {
516 set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, 504 GNUNET_free (set);
517 handle_client_set_error, set); 505 return NULL;
506 }
507 set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
508 mq_handlers,
509 &handle_client_set_error, set);
518 GNUNET_assert (NULL != set->mq); 510 GNUNET_assert (NULL != set->mq);
519 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); 511 mqm = GNUNET_MQ_msg (msg,
512 GNUNET_MESSAGE_TYPE_SET_CREATE);
520 msg->operation = htonl (op); 513 msg->operation = htonl (op);
521 GNUNET_MQ_send (set->mq, mqm); 514 GNUNET_MQ_send (set->mq, mqm);
522 return set; 515 return set;
@@ -524,10 +517,10 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
524 517
525 518
526/** 519/**
527 * Add an element to the given set. 520 * Add an element to the given set. After the element has been added
528 * After the element has been added (in the sense of being 521 * (in the sense of being transmitted to the set service), @a cont
529 * transmitted to the set service), cont will be called. 522 * will be called. Multiple calls to GNUNET_SET_add_element() can be
530 * Calls to add_element can be queued 523 * queued.
531 * 524 *
532 * @param set set to add element to 525 * @param set set to add element to
533 * @param element element to add to the set 526 * @param element element to add to the set
@@ -551,21 +544,24 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
551 cont (cont_cls); 544 cont (cont_cls);
552 return GNUNET_SYSERR; 545 return GNUNET_SYSERR;
553 } 546 }
554 547 mqm = GNUNET_MQ_msg_extra (msg, element->size,
555 mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); 548 GNUNET_MESSAGE_TYPE_SET_ADD);
556 msg->element_type = element->element_type; 549 msg->element_type = element->element_type;
557 memcpy (&msg[1], element->data, element->size); 550 memcpy (&msg[1],
558 GNUNET_MQ_notify_sent (mqm, cont, cont_cls); 551 element->data,
552 element->size);
553 GNUNET_MQ_notify_sent (mqm,
554 cont, cont_cls);
559 GNUNET_MQ_send (set->mq, mqm); 555 GNUNET_MQ_send (set->mq, mqm);
560 return GNUNET_OK; 556 return GNUNET_OK;
561} 557}
562 558
563 559
564/** 560/**
565 * Remove an element to the given set. 561 * Remove an element to the given set. After the element has been
566 * After the element has been removed (in the sense of the 562 * removed (in the sense of the request being transmitted to the set
567 * request being transmitted to the set service), cont will be called. 563 * service), @a cont will be called. Multiple calls to
568 * Calls to remove_element can be queued 564 * GNUNET_SET_remove_element() can be queued
569 * 565 *
570 * @param set set to remove element from 566 * @param set set to remove element from
571 * @param element element to remove from the set 567 * @param element element to remove from the set
@@ -589,27 +585,49 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
589 cont (cont_cls); 585 cont (cont_cls);
590 return GNUNET_SYSERR; 586 return GNUNET_SYSERR;
591 } 587 }
592
593 mqm = GNUNET_MQ_msg_extra (msg, 588 mqm = GNUNET_MQ_msg_extra (msg,
594 element->size, 589 element->size,
595 GNUNET_MESSAGE_TYPE_SET_REMOVE); 590 GNUNET_MESSAGE_TYPE_SET_REMOVE);
596 msg->element_type = element->element_type; 591 msg->element_type = element->element_type;
597 memcpy (&msg[1], element->data, element->size); 592 memcpy (&msg[1],
598 GNUNET_MQ_notify_sent (mqm, cont, cont_cls); 593 element->data,
594 element->size);
595 GNUNET_MQ_notify_sent (mqm,
596 cont, cont_cls);
599 GNUNET_MQ_send (set->mq, mqm); 597 GNUNET_MQ_send (set->mq, mqm);
600 return GNUNET_OK; 598 return GNUNET_OK;
601} 599}
602 600
603 601
604/** 602/**
605 * Destroy the set handle, and free all associated resources. 603 * Destroy the set handle if no operations are left, mark the set
604 * for destruction otherwise.
606 * 605 *
607 * @param set set handle to destroy 606 * @param set set handle to destroy
608 */ 607 */
609void 608void
610GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) 609GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
611{ 610{
612 (void) set_destroy (set); 611 if (NULL != set->ops_head)
612 {
613 LOG (GNUNET_ERROR_TYPE_DEBUG,
614 "Set operations are pending, delaying set destruction\n");
615 set->destroy_requested = GNUNET_YES;
616 return;
617 }
618 LOG (GNUNET_ERROR_TYPE_DEBUG,
619 "Really destroying set\n");
620 if (NULL != set->client)
621 {
622 GNUNET_CLIENT_disconnect (set->client);
623 set->client = NULL;
624 }
625 if (NULL != set->mq)
626 {
627 GNUNET_MQ_destroy (set->mq);
628 set->mq = NULL;
629 }
630 GNUNET_free (set);
613} 631}
614 632
615 633
@@ -656,43 +674,76 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
656 674
657 675
658/** 676/**
659 * Connect to the set service in order to listen 677 * Connect to the set service in order to listen for requests.
660 * for request.
661 * 678 *
662 * @param cls the listen handle to connect 679 * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
663 * @param tc task context if invoked as a task, NULL otherwise 680 * @param tc task context if invoked as a task, NULL otherwise
664 */ 681 */
665static void 682static void
666listen_connect (void *cls, 683listen_connect (void *cls,
667 const struct GNUNET_SCHEDULER_TaskContext *tc) 684 const struct GNUNET_SCHEDULER_TaskContext *tc);
685
686
687/**
688 * Our connection with the set service encountered an error,
689 * re-initialize with exponential back-off.
690 *
691 * @param cls the `struct GNUNET_SET_ListenHandle *`
692 * @param error reason for the disconnect
693 */
694static void
695handle_client_listener_error (void *cls,
696 enum GNUNET_MQ_Error error)
668{ 697{
669 struct GNUNET_MQ_Envelope *mqm;
670 struct GNUNET_SET_ListenMessage *msg;
671 struct GNUNET_SET_ListenHandle *lh = cls; 698 struct GNUNET_SET_ListenHandle *lh = cls;
699
700 LOG (GNUNET_ERROR_TYPE_DEBUG,
701 "Listener broke down (%d), re-connecting\n",
702 (int) error);
703 GNUNET_CLIENT_disconnect (lh->client);
704 lh->client = NULL;
705 GNUNET_MQ_destroy (lh->mq);
706 lh->mq = NULL;
707 lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
708 &listen_connect, lh);
709 lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
710}
711
712
713/**
714 * Connect to the set service in order to listen for requests.
715 *
716 * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
717 * @param tc task context if invoked as a task, NULL otherwise
718 */
719static void
720listen_connect (void *cls,
721 const struct GNUNET_SCHEDULER_TaskContext *tc)
722{
672 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { 723 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
673 {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, 724 { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
674 GNUNET_MQ_HANDLERS_END 725 GNUNET_MQ_HANDLERS_END
675 }; 726 };
727 struct GNUNET_SET_ListenHandle *lh = cls;
728 struct GNUNET_MQ_Envelope *mqm;
729 struct GNUNET_SET_ListenMessage *msg;
676 730
677 if ((tc != NULL) &&(tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) 731 if ( (NULL != tc) &&
732 (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
678 { 733 {
679 LOG (GNUNET_ERROR_TYPE_DEBUG, "listener not reconnecting due to shutdown\n"); 734 LOG (GNUNET_ERROR_TYPE_DEBUG,
735 "Listener not reconnecting due to shutdown\n");
680 return; 736 return;
681 } 737 }
682
683 lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK; 738 lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
684 739
685 GNUNET_assert (NULL == lh->client); 740 GNUNET_assert (NULL == lh->client);
686 lh->client = GNUNET_CLIENT_connect ("set", lh->cfg); 741 lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
687 if (NULL == lh->client) 742 if (NULL == lh->client)
688 {
689 LOG (GNUNET_ERROR_TYPE_ERROR,
690 "could not connect to set (wrong configuration?), giving up listening\n");
691 return; 743 return;
692 }
693 GNUNET_assert (NULL == lh->mq); 744 GNUNET_assert (NULL == lh->mq);
694 lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, 745 lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
695 handle_client_listener_error, lh); 746 &handle_client_listener_error, lh);
696 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN); 747 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
697 msg->operation = htonl (lh->operation); 748 msg->operation = htonl (lh->operation);
698 msg->app_id = lh->app_id; 749 msg->app_id = lh->app_id;
@@ -709,7 +760,7 @@ listen_connect (void *cls,
709 * @param app_id id of the application that handles set operation requests 760 * @param app_id id of the application that handles set operation requests
710 * @param listen_cb called for each incoming request matching the operation 761 * @param listen_cb called for each incoming request matching the operation
711 * and application id 762 * and application id
712 * @param listen_cls handle for listen_cb 763 * @param listen_cls handle for @a listen_cb
713 * @return a handle that can be used to cancel the listen operation 764 * @return a handle that can be used to cancel the listen operation
714 */ 765 */
715struct GNUNET_SET_ListenHandle * 766struct GNUNET_SET_ListenHandle *
@@ -729,6 +780,11 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
729 lh->app_id = *app_id; 780 lh->app_id = *app_id;
730 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 781 lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
731 listen_connect (lh, NULL); 782 listen_connect (lh, NULL);
783 if (NULL == lh->client)
784 {
785 GNUNET_free (lh);
786 return NULL;
787 }
732 return lh; 788 return lh;
733} 789}
734 790
@@ -741,8 +797,8 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
741void 797void
742GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) 798GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
743{ 799{
744 LOG (GNUNET_ERROR_TYPE_DEBUG, "canceling listener\n"); 800 LOG (GNUNET_ERROR_TYPE_DEBUG,
745 /* listener's connection may have failed, thus mq/client could be NULL */ 801 "Canceling listener\n");
746 if (NULL != lh->mq) 802 if (NULL != lh->mq)
747 { 803 {
748 GNUNET_MQ_destroy (lh->mq); 804 GNUNET_MQ_destroy (lh->mq);
@@ -786,21 +842,16 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request,
786 struct GNUNET_SET_OperationHandle *oh; 842 struct GNUNET_SET_OperationHandle *oh;
787 struct GNUNET_SET_AcceptMessage *msg; 843 struct GNUNET_SET_AcceptMessage *msg;
788 844
789 GNUNET_assert (NULL != request);
790 GNUNET_assert (GNUNET_NO == request->accepted); 845 GNUNET_assert (GNUNET_NO == request->accepted);
791 request->accepted = GNUNET_YES; 846 request->accepted = GNUNET_YES;
792
793 oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
794 oh->result_cb = result_cb;
795 oh->result_cls = result_cls;
796
797 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); 847 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
798 msg->accept_reject_id = htonl (request->accept_id); 848 msg->accept_reject_id = htonl (request->accept_id);
799 msg->result_mode = htonl (result_mode); 849 msg->result_mode = htonl (result_mode);
800 850 oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
851 oh->result_cb = result_cb;
852 oh->result_cls = result_cls;
801 oh->conclude_mqm = mqm; 853 oh->conclude_mqm = mqm;
802 oh->request_id_addr = &msg->request_id; 854 oh->request_id_addr = &msg->request_id;
803
804 return oh; 855 return oh;
805} 856}
806 857
@@ -840,9 +891,9 @@ GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
840 891
841 892
842/** 893/**
843 * Iterate over all elements in the given set. 894 * Iterate over all elements in the given set. Note that this
844 * Note that this operation involves transferring every element of the set 895 * operation involves transferring every element of the set from the
845 * from the service to the client, and is thus costly. 896 * service to the client, and is thus costly.
846 * 897 *
847 * @param set the set to iterate over 898 * @param set the set to iterate over
848 * @param iter the iterator to call for each element 899 * @param iter the iterator to call for each element
@@ -858,15 +909,13 @@ GNUNET_SET_iterate (struct GNUNET_SET_Handle *set,
858{ 909{
859 struct GNUNET_MQ_Envelope *ev; 910 struct GNUNET_MQ_Envelope *ev;
860 911
861
862 GNUNET_assert (NULL != iter); 912 GNUNET_assert (NULL != iter);
863
864 if (GNUNET_YES == set->invalid) 913 if (GNUNET_YES == set->invalid)
865 return GNUNET_SYSERR; 914 return GNUNET_SYSERR;
866 if (NULL != set->iterator) 915 if (NULL != set->iterator)
867 return GNUNET_NO; 916 return GNUNET_NO;
868 LOG (GNUNET_ERROR_TYPE_DEBUG, 917 LOG (GNUNET_ERROR_TYPE_DEBUG,
869 "iterating set\n"); 918 "Iterating over set\n");
870 set->iterator = iter; 919 set->iterator = iter;
871 set->iterator_cls = iter_cls; 920 set->iterator_cls = iter_cls;
872 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST); 921 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);