aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/gnunet-service-datastore.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-24 20:17:39 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-24 20:17:39 +0000
commit3140154d46212e08e0d73ed891a66213a6813075 (patch)
tree018a55a3899207664b388fcf47a679ca54ca6fbf /src/datastore/gnunet-service-datastore.c
parentd5fd881c2a044474b54ddf03b6ab8be8d2b75927 (diff)
downloadgnunet-3140154d46212e08e0d73ed891a66213a6813075.tar.gz
gnunet-3140154d46212e08e0d73ed891a66213a6813075.zip
refactoring datastore API to use MQ API, also fixing misc. bugs in new mysql backend
Diffstat (limited to 'src/datastore/gnunet-service-datastore.c')
-rw-r--r--src/datastore/gnunet-service-datastore.c111
1 files changed, 81 insertions, 30 deletions
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index a67d1c772..57527991c 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -416,16 +416,20 @@ delete_expired (void *cls)
416 * @param expiration expiration time for the content 416 * @param expiration expiration time for the content
417 * @param uid unique identifier for the datum; 417 * @param uid unique identifier for the datum;
418 * maybe 0 if no unique identifier is available 418 * maybe 0 if no unique identifier is available
419 * 419 * @return #GNUNET_SYSERR to abort the iteration, #GNUNET_OK to continue
420 * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
421 * (continue on call to "next", of course), 420 * (continue on call to "next", of course),
422 * GNUNET_NO to delete the item and continue (if supported) 421 * #GNUNET_NO to delete the item and continue (if supported)
423 */ 422 */
424static int 423static int
425quota_processor (void *cls, const struct GNUNET_HashCode * key, uint32_t size, 424quota_processor (void *cls,
426 const void *data, enum GNUNET_BLOCK_Type type, 425 const struct GNUNET_HashCode *key,
427 uint32_t priority, uint32_t anonymity, 426 uint32_t size,
428 struct GNUNET_TIME_Absolute expiration, uint64_t uid) 427 const void *data,
428 enum GNUNET_BLOCK_Type type,
429 uint32_t priority,
430 uint32_t anonymity,
431 struct GNUNET_TIME_Absolute expiration,
432 uint64_t uid)
429{ 433{
430 unsigned long long *need = cls; 434 unsigned long long *need = cls;
431 435
@@ -473,12 +477,15 @@ manage_space (unsigned long long need)
473 unsigned long long last; 477 unsigned long long last;
474 478
475 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 479 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
476 "Asked to free up %llu bytes of cache space\n", need); 480 "Asked to free up %llu bytes of cache space\n",
481 need);
477 last = 0; 482 last = 0;
478 while ((need > 0) && (last != need)) 483 while ((need > 0) && (last != need))
479 { 484 {
480 last = need; 485 last = need;
481 plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need); 486 plugin->api->get_expiration (plugin->api->cls,
487 &quota_processor,
488 &need);
482 } 489 }
483} 490}
484 491
@@ -1068,7 +1075,7 @@ handle_put (void *cls,
1068 1075
1069 1076
1070/** 1077/**
1071 * Handle GET-message. 1078 * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET-message.
1072 * 1079 *
1073 * @param cls closure 1080 * @param cls closure
1074 * @param client identification of the client 1081 * @param client identification of the client
@@ -1080,28 +1087,52 @@ handle_get (void *cls,
1080 const struct GNUNET_MessageHeader *message) 1087 const struct GNUNET_MessageHeader *message)
1081{ 1088{
1082 const struct GetMessage *msg; 1089 const struct GetMessage *msg;
1083 uint16_t size;
1084 1090
1085 size = ntohs (message->size);
1086 if ((size != sizeof (struct GetMessage)) &&
1087 (size != sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode)))
1088 {
1089 GNUNET_break (0);
1090 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1091 return;
1092 }
1093 msg = (const struct GetMessage *) message; 1091 msg = (const struct GetMessage *) message;
1094 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1092 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1093 "Processing GET request of type %u\n",
1094 ntohl (msg->type));
1095 GNUNET_STATISTICS_update (stats,
1096 gettext_noop ("# GET requests received"),
1097 1,
1098 GNUNET_NO);
1099 GNUNET_SERVER_client_keep (client);
1100 plugin->api->get_key (plugin->api->cls,
1101 GNUNET_ntohll (msg->offset),
1102 NULL,
1103 NULL,
1104 ntohl (msg->type),
1105 &transmit_item,
1106 client);
1107}
1108
1109/**
1110 * Handle #GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY-message.
1111 *
1112 * @param cls closure
1113 * @param client identification of the client
1114 * @param message the actual message
1115 */
1116static void
1117handle_get_key (void *cls,
1118 struct GNUNET_SERVER_Client *client,
1119 const struct GNUNET_MessageHeader *message)
1120{
1121 const struct GetKeyMessage *msg;
1122
1123 msg = (const struct GetKeyMessage *) message;
1124 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095 "Processing GET request for `%s' of type %u\n", 1125 "Processing GET request for `%s' of type %u\n",
1096 GNUNET_h2s (&msg->key), 1126 GNUNET_h2s (&msg->key),
1097 ntohl (msg->type)); 1127 ntohl (msg->type));
1098 GNUNET_STATISTICS_update (stats, 1128 GNUNET_STATISTICS_update (stats,
1099 gettext_noop ("# GET requests received"), 1129 gettext_noop ("# GET KEY requests received"),
1100 1, 1130 1,
1101 GNUNET_NO); 1131 GNUNET_NO);
1102 GNUNET_SERVER_client_keep (client); 1132 GNUNET_SERVER_client_keep (client);
1103 if ( (size == sizeof (struct GetMessage)) && 1133 if (GNUNET_YES !=
1104 (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)) ) 1134 GNUNET_CONTAINER_bloomfilter_test (filter,
1135 &msg->key))
1105 { 1136 {
1106 /* don't bother database... */ 1137 /* don't bother database... */
1107 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1138 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1112,14 +1143,19 @@ handle_get (void *cls,
1112 ("# requests filtered by bloomfilter"), 1143 ("# requests filtered by bloomfilter"),
1113 1, 1144 1,
1114 GNUNET_NO); 1145 GNUNET_NO);
1115 transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 1146 transmit_item (client,
1147 NULL, 0, NULL, 0, 0, 0,
1148 GNUNET_TIME_UNIT_ZERO_ABS,
1116 0); 1149 0);
1117 return; 1150 return;
1118 } 1151 }
1119 plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset), 1152 plugin->api->get_key (plugin->api->cls,
1120 ((size == 1153 GNUNET_ntohll (msg->offset),
1121 sizeof (struct GetMessage)) ? &msg->key : NULL), NULL, 1154 &msg->key,
1122 ntohl (msg->type), &transmit_item, client); 1155 NULL,
1156 ntohl (msg->type),
1157 &transmit_item,
1158 client);
1123} 1159}
1124 1160
1125 1161
@@ -1369,7 +1405,8 @@ disk_utilization_change_cb (void *cls,
1369 _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"), 1405 _("Datastore payload must have been inaccurate (%lld < %lld). Recomputing it.\n"),
1370 (long long) payload, 1406 (long long) payload,
1371 (long long) -delta); 1407 (long long) -delta);
1372 plugin->api->estimate_size (plugin->api->cls, &payload); 1408 plugin->api->estimate_size (plugin->api->cls,
1409 &payload);
1373 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1410 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1374 _("New payload: %lld\n"), 1411 _("New payload: %lld\n"),
1375 (long long) payload); 1412 (long long) payload);
@@ -1474,7 +1511,10 @@ static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1474 {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0}, 1511 {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
1475 {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 1512 {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
1476 sizeof (struct UpdateMessage)}, 1513 sizeof (struct UpdateMessage)},
1477 {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0}, 1514 {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET,
1515 sizeof (struct GetMessage) },
1516 {&handle_get_key, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY,
1517 sizeof (struct GetKeyMessage) },
1478 {&handle_get_replication, NULL, 1518 {&handle_get_replication, NULL,
1479 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, 1519 GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
1480 sizeof (struct GNUNET_MessageHeader)}, 1520 sizeof (struct GNUNET_MessageHeader)},
@@ -1555,6 +1595,10 @@ process_stat_done (void *cls,
1555 "Failed to obtain value from statistics service, recomputing it\n"); 1595 "Failed to obtain value from statistics service, recomputing it\n");
1556 plugin->api->estimate_size (plugin->api->cls, 1596 plugin->api->estimate_size (plugin->api->cls,
1557 &payload); 1597 &payload);
1598 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1599 _("New payload: %lld\n"),
1600 (long long) payload);
1601
1558 } 1602 }
1559 if (GNUNET_YES == refresh_bf) 1603 if (GNUNET_YES == refresh_bf)
1560 { 1604 {
@@ -1624,7 +1668,13 @@ cleaning_task (void *cls)
1624 expired_kill_task = NULL; 1668 expired_kill_task = NULL;
1625 } 1669 }
1626 if (GNUNET_YES == do_drop) 1670 if (GNUNET_YES == do_drop)
1671 {
1672 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673 "Dropping database!\n");
1627 plugin->api->drop (plugin->api->cls); 1674 plugin->api->drop (plugin->api->cls);
1675 payload = 0;
1676 last_sync++;
1677 }
1628 if (NULL != plugin) 1678 if (NULL != plugin)
1629 { 1679 {
1630 unload_plugin (plugin); 1680 unload_plugin (plugin);
@@ -1651,7 +1701,8 @@ cleaning_task (void *cls)
1651 sync_stats (); 1701 sync_stats ();
1652 if (NULL != stats) 1702 if (NULL != stats)
1653 { 1703 {
1654 GNUNET_STATISTICS_destroy (stats, GNUNET_YES); 1704 GNUNET_STATISTICS_destroy (stats,
1705 GNUNET_YES);
1655 stats = NULL; 1706 stats = NULL;
1656 } 1707 }
1657 GNUNET_free (quota_stat_name); 1708 GNUNET_free (quota_stat_name);