aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_clients.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2022-01-02 20:24:02 +0100
committerChristian Grothoff <christian@grothoff.org>2022-01-02 20:24:02 +0100
commite3ff017054eb35dedd95ae4fe66c82b88e3bbdc3 (patch)
treeabb821d63f09d0982d6f6802ec1c55234dcc0f5c /src/dht/gnunet-service-dht_clients.c
parent329f0458fa1fce45ce1c31e86771ffefb22e941e (diff)
downloadgnunet-e3ff017054eb35dedd95ae4fe66c82b88e3bbdc3.tar.gz
gnunet-e3ff017054eb35dedd95ae4fe66c82b88e3bbdc3.zip
-non-trivial refactoring/cleanup of the DHT code
Diffstat (limited to 'src/dht/gnunet-service-dht_clients.c')
-rw-r--r--src/dht/gnunet-service-dht_clients.c340
1 files changed, 110 insertions, 230 deletions
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index aa41f519c..b520cda41 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -425,6 +425,7 @@ transmit_next_request_task (void *cls)
425{ 425{
426 struct ClientQueryRecord *cqr; 426 struct ClientQueryRecord *cqr;
427 427
428 (void) cls;
428 retry_task = NULL; 429 retry_task = NULL;
429 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap))) 430 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
430 { 431 {
@@ -486,19 +487,23 @@ handle_dht_local_put (void *cls,
486{ 487{
487 struct ClientHandle *ch = cls; 488 struct ClientHandle *ch = cls;
488 uint16_t size = ntohs (dht_msg->header.size); 489 uint16_t size = ntohs (dht_msg->header.size);
489 uint32_t type = ntohl (dht_msg->type);
490 struct GNUNET_TIME_Absolute expiration
491 = GNUNET_TIME_absolute_ntoh (dht_msg->expiration);
492 enum GNUNET_DHT_RouteOption options 490 enum GNUNET_DHT_RouteOption options
493 = (enum GNUNET_DHT_RouteOption) ntohl (dht_msg->options); 491 = (enum GNUNET_DHT_RouteOption) ntohl (dht_msg->options);
494 uint32_t replication_level 492 uint32_t replication_level
495 = ntohl (dht_msg->desired_replication_level); 493 = ntohl (dht_msg->desired_replication_level);
494 struct GDS_DATACACHE_BlockData bd = {
495 .key = dht_msg->key,
496 .expiration_time = GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
497 .data = &dht_msg[1],
498 .data_size = size - sizeof (*dht_msg),
499 .type = ntohl (dht_msg->type)
500 };
496 501
497 LOG (GNUNET_ERROR_TYPE_DEBUG, 502 LOG (GNUNET_ERROR_TYPE_DEBUG,
498 "Handling local PUT of %lu-bytes for query %s of type %u\n", 503 "Handling local PUT of %lu-bytes for query %s of type %u\n",
499 (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)), 504 (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)),
500 GNUNET_h2s (&dht_msg->key), 505 GNUNET_h2s (&dht_msg->key),
501 (unsigned int) type); 506 (unsigned int) bd.type);
502 GNUNET_STATISTICS_update (GDS_stats, 507 GNUNET_STATISTICS_update (GDS_stats,
503 "# PUT requests received from clients", 508 "# PUT requests received from clients",
504 1, 509 1,
@@ -507,14 +512,10 @@ handle_dht_local_put (void *cls,
507 "CLIENT-PUT %s\n", 512 "CLIENT-PUT %s\n",
508 GNUNET_h2s_full (&dht_msg->key)); 513 GNUNET_h2s_full (&dht_msg->key));
509 /* give to local clients */ 514 /* give to local clients */
510 GDS_CLIENTS_handle_reply (expiration, 515 GDS_CLIENTS_handle_reply (&bd,
511 &dht_msg->key, 516 &bd.key,
512 &dht_msg->key, 517 0, NULL /* get path */);
513 0, NULL, /* get path */ 518
514 0, NULL, /* put path */
515 type,
516 size - sizeof(struct GNUNET_DHT_ClientPutMessage),
517 &dht_msg[1]);
518 { 519 {
519 struct GNUNET_CONTAINER_BloomFilter *peer_bf; 520 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
520 521
@@ -526,26 +527,14 @@ handle_dht_local_put (void *cls,
526 if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || 527 if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
527 (GDS_am_closest_peer (&dht_msg->key, 528 (GDS_am_closest_peer (&dht_msg->key,
528 peer_bf))) 529 peer_bf)))
529 GDS_DATACACHE_handle_put ( 530 GDS_DATACACHE_handle_put (&bd);
530 expiration,
531 &dht_msg->key,
532 0, NULL, /* put path */
533 type,
534 size - sizeof(struct GNUNET_DHT_ClientPutMessage),
535 &dht_msg[1]);
536 /* route to other peers */ 531 /* route to other peers */
537 if (GNUNET_OK != 532 if (GNUNET_OK !=
538 GDS_NEIGHBOURS_handle_put ( 533 GDS_NEIGHBOURS_handle_put (&bd,
539 type, 534 options,
540 options, 535 replication_level,
541 replication_level, 536 0 /* hop count */,
542 expiration, 537 peer_bf))
543 0 /* hop count */,
544 peer_bf,
545 &dht_msg->key,
546 0, NULL, /* put path */
547 &dht_msg[1],
548 size - sizeof(struct GNUNET_DHT_ClientPutMessage)))
549 { 538 {
550 GNUNET_STATISTICS_update (GDS_stats, 539 GNUNET_STATISTICS_update (GDS_stats,
551 "# Local PUT requests not routed", 540 "# Local PUT requests not routed",
@@ -556,15 +545,9 @@ handle_dht_local_put (void *cls,
556 } 545 }
557 GDS_CLIENTS_process_put ( 546 GDS_CLIENTS_process_put (
558 options, 547 options,
559 type, 548 &bd,
560 0, /* hop count */ 549 0, /* hop count */
561 replication_level, 550 replication_level);
562 1, /* path length */
563 GDS_NEIGHBOURS_get_id (),
564 expiration,
565 &dht_msg->key,
566 &dht_msg[1],
567 size - sizeof(struct GNUNET_DHT_ClientPutMessage));
568 GNUNET_SERVICE_client_continue (ch->client); 551 GNUNET_SERVICE_client_continue (ch->client);
569} 552}
570 553
@@ -573,37 +556,16 @@ handle_dht_local_put (void *cls,
573 * Handle a result from local datacache for a GET operation. 556 * Handle a result from local datacache for a GET operation.
574 * 557 *
575 * @param cls the `struct ClientHandle` of the client doing the query 558 * @param cls the `struct ClientHandle` of the client doing the query
576 * @param type type of the block 559 * @param bd details about the block that was found
577 * @param expiration_time when does the content expire
578 * @param key key for the content
579 * @param put_path_length number of entries in @a put_path
580 * @param put_path peers the original PUT traversed (if tracked)
581 * @param get_path_length number of entries in @a get_path
582 * @param get_path peers this reply has traversed so far (if tracked)
583 * @param data payload of the reply
584 * @param data_size number of bytes in @a data
585 */ 560 */
586static void 561static void
587handle_local_result (void *cls, 562handle_local_result (void *cls,
588 enum GNUNET_BLOCK_Type type, 563 const struct GDS_DATACACHE_BlockData *bd)
589 struct GNUNET_TIME_Absolute expiration_time,
590 const struct GNUNET_HashCode *key,
591 unsigned int put_path_length,
592 const struct GNUNET_PeerIdentity *put_path,
593 unsigned int get_path_length,
594 const struct GNUNET_PeerIdentity *get_path,
595 const void *data,
596 size_t data_size)
597{ 564{
598 // FIXME: this may deserve some clean up: inline the function, 565 /* FIXME: use 'cls' instead of looking up the client? */
599 // possibly avoid even looking up the client! 566 GDS_CLIENTS_handle_reply (bd,
600 GDS_CLIENTS_handle_reply (expiration_time, 567 &bd->key,
601 key, 568 0, NULL /* get_path */);
602 key,
603 0, NULL,
604 put_path_length, put_path,
605 type,
606 data_size, data);
607} 569}
608 570
609 571
@@ -713,6 +675,9 @@ struct FindByUniqueIdContext
713 */ 675 */
714 struct ClientQueryRecord *cqr; 676 struct ClientQueryRecord *cqr;
715 677
678 /**
679 * Unique ID to look for.
680 */
716 uint64_t unique_id; 681 uint64_t unique_id;
717}; 682};
718 683
@@ -968,9 +933,9 @@ handle_dht_local_monitor_stop (
968struct ForwardReplyContext 933struct ForwardReplyContext
969{ 934{
970 /** 935 /**
971 * Expiration time of the reply. 936 * Block details.
972 */ 937 */
973 struct GNUNET_TIME_Absolute expiration; 938 const struct GDS_DATACACHE_BlockData *bd;
974 939
975 /** 940 /**
976 * GET path taken. 941 * GET path taken.
@@ -978,39 +943,10 @@ struct ForwardReplyContext
978 const struct GNUNET_PeerIdentity *get_path; 943 const struct GNUNET_PeerIdentity *get_path;
979 944
980 /** 945 /**
981 * PUT path taken.
982 */
983 const struct GNUNET_PeerIdentity *put_path;
984
985 /**
986 * Hash under which the payload is stored.
987 */
988 const struct GNUNET_HashCode *query_hash;
989
990 /**
991 * Embedded payload.
992 */
993 const void *data;
994
995 /**
996 * Number of bytes in data.
997 */
998 size_t data_size;
999
1000 /**
1001 * Number of entries in @e get_path. 946 * Number of entries in @e get_path.
1002 */ 947 */
1003 unsigned int get_path_length; 948 unsigned int get_path_length;
1004 949
1005 /**
1006 * Number of entries in @e put_path.
1007 */
1008 unsigned int put_path_length;
1009
1010 /**
1011 * Type of the data.
1012 */
1013 enum GNUNET_BLOCK_Type type;
1014}; 950};
1015 951
1016 952
@@ -1019,15 +955,15 @@ struct ForwardReplyContext
1019 * each of the matching clients. With some tricky recycling 955 * each of the matching clients. With some tricky recycling
1020 * of the buffer. 956 * of the buffer.
1021 * 957 *
1022 * @param cls the 'struct ForwardReplyContext' 958 * @param cls the `struct ForwardReplyContext`
1023 * @param key current key 959 * @param query_hash hash of the query for which this may be a reply
1024 * @param value value in the hash map, a ClientQueryRecord 960 * @param value value in the hash map, a ClientQueryRecord
1025 * @return #GNUNET_YES (we should continue to iterate), 961 * @return #GNUNET_YES (we should continue to iterate),
1026 * if the result is mal-formed, #GNUNET_NO 962 * if the result is mal-formed, #GNUNET_NO
1027 */ 963 */
1028static enum GNUNET_GenericReturnValue 964static enum GNUNET_GenericReturnValue
1029forward_reply (void *cls, 965forward_reply (void *cls,
1030 const struct GNUNET_HashCode *key, 966 const struct GNUNET_HashCode *query_hash,
1031 void *value) 967 void *value)
1032{ 968{
1033 struct ForwardReplyContext *frc = cls; 969 struct ForwardReplyContext *frc = cls;
@@ -1041,13 +977,13 @@ forward_reply (void *cls,
1041 977
1042 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, 978 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1043 "CLIENT-RESULT %s\n", 979 "CLIENT-RESULT %s\n",
1044 GNUNET_h2s_full (frc->query_hash)); 980 GNUNET_h2s_full (&frc->bd->key));
1045 if ((record->type != GNUNET_BLOCK_TYPE_ANY) && 981 if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
1046 (record->type != frc->type)) 982 (record->type != frc->bd->type) )
1047 { 983 {
1048 LOG (GNUNET_ERROR_TYPE_DEBUG, 984 LOG (GNUNET_ERROR_TYPE_DEBUG,
1049 "Record type mismatch, not passing request for key %s to local client\n", 985 "Record type mismatch, not passing request for key %s to local client\n",
1050 GNUNET_h2s (key)); 986 GNUNET_h2s (&frc->bd->key));
1051 GNUNET_STATISTICS_update (GDS_stats, 987 GNUNET_STATISTICS_update (GDS_stats,
1052 "# Key match, type mismatches in REPLY to CLIENT", 988 "# Key match, type mismatches in REPLY to CLIENT",
1053 1, 989 1,
@@ -1055,8 +991,8 @@ forward_reply (void *cls,
1055 return GNUNET_YES; /* type mismatch */ 991 return GNUNET_YES; /* type mismatch */
1056 } 992 }
1057 if ( (0 == (record->msg_options & GNUNET_DHT_RO_FIND_PEER)) && 993 if ( (0 == (record->msg_options & GNUNET_DHT_RO_FIND_PEER)) &&
1058 (0 != GNUNET_memcmp (key, 994 (0 != GNUNET_memcmp (&frc->bd->key,
1059 frc->query_hash)) ) 995 query_hash)) )
1060 { 996 {
1061 GNUNET_STATISTICS_update (GDS_stats, 997 GNUNET_STATISTICS_update (GDS_stats,
1062 "# Inexact key match, but exact match required", 998 "# Inexact key match, but exact match required",
@@ -1064,37 +1000,36 @@ forward_reply (void *cls,
1064 GNUNET_NO); 1000 GNUNET_NO);
1065 return GNUNET_YES; /* type mismatch */ 1001 return GNUNET_YES; /* type mismatch */
1066 } 1002 }
1067 GNUNET_CRYPTO_hash (frc->data, 1003 GNUNET_CRYPTO_hash (frc->bd->data,
1068 frc->data_size, 1004 frc->bd->data_size,
1069 &ch); 1005 &ch);
1070 for (unsigned int i = 0; i < record->seen_replies_count; i++) 1006 for (unsigned int i = 0; i < record->seen_replies_count; i++)
1071 if (0 == memcmp (&record->seen_replies[i], 1007 if (0 ==
1072 &ch, 1008 GNUNET_memcmp (&record->seen_replies[i],
1073 sizeof(struct GNUNET_HashCode))) 1009 &ch))
1074 { 1010 {
1075 LOG (GNUNET_ERROR_TYPE_DEBUG, 1011 LOG (GNUNET_ERROR_TYPE_DEBUG,
1076 "Duplicate reply, not passing request for key %s to local client\n", 1012 "Duplicate reply, not passing request for key %s to local client\n",
1077 GNUNET_h2s (key)); 1013 GNUNET_h2s (&frc->bd->key));
1078 GNUNET_STATISTICS_update (GDS_stats, 1014 GNUNET_STATISTICS_update (GDS_stats,
1079 gettext_noop 1015 "# Duplicate REPLIES to CLIENT request dropped",
1080 ( 1016 1,
1081 "# Duplicate REPLIES to CLIENT request dropped"), 1017 GNUNET_NO);
1082 1, GNUNET_NO);
1083 return GNUNET_YES; /* duplicate */ 1018 return GNUNET_YES; /* duplicate */
1084 } 1019 }
1085 eval 1020 eval
1086 = GNUNET_BLOCK_check_reply (GDS_block_context, 1021 = GNUNET_BLOCK_check_reply (GDS_block_context,
1087 record->type, 1022 record->type,
1088 NULL, 1023 NULL,
1089 key, 1024 &frc->bd->key,
1090 record->xquery, 1025 record->xquery,
1091 record->xquery_size, 1026 record->xquery_size,
1092 frc->data, 1027 frc->bd->data,
1093 frc->data_size); 1028 frc->bd->data_size);
1094 LOG (GNUNET_ERROR_TYPE_DEBUG, 1029 LOG (GNUNET_ERROR_TYPE_DEBUG,
1095 "Evaluation result is %d for key %s for local client's query\n", 1030 "Evaluation result is %d for key %s for local client's query\n",
1096 (int) eval, 1031 (int) eval,
1097 GNUNET_h2s (key)); 1032 GNUNET_h2s (&frc->bd->key));
1098 switch (eval) 1033 switch (eval)
1099 { 1034 {
1100 case GNUNET_BLOCK_REPLY_OK_LAST: 1035 case GNUNET_BLOCK_REPLY_OK_LAST:
@@ -1125,29 +1060,29 @@ forward_reply (void *cls,
1125 1, 1060 1,
1126 GNUNET_NO); 1061 GNUNET_NO);
1127 env = GNUNET_MQ_msg_extra (reply, 1062 env = GNUNET_MQ_msg_extra (reply,
1128 frc->data_size 1063 frc->bd->data_size
1129 + (frc->get_path_length + frc->put_path_length) 1064 + (frc->get_path_length + frc->bd->put_path_length)
1130 * sizeof(struct GNUNET_PeerIdentity), 1065 * sizeof(struct GNUNET_PeerIdentity),
1131 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT); 1066 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1132 reply->type = htonl (frc->type); 1067 reply->type = htonl (frc->bd->type);
1133 reply->get_path_length = htonl (frc->get_path_length); 1068 reply->get_path_length = htonl (frc->get_path_length);
1134 reply->put_path_length = htonl (frc->put_path_length); 1069 reply->put_path_length = htonl (frc->bd->put_path_length);
1135 reply->unique_id = record->unique_id; 1070 reply->unique_id = record->unique_id;
1136 reply->expiration = GNUNET_TIME_absolute_hton (frc->expiration); 1071 reply->expiration = GNUNET_TIME_absolute_hton (frc->bd->expiration_time);
1137 reply->key = *key; 1072 reply->key = frc->bd->key;
1138 paths = (struct GNUNET_PeerIdentity *) &reply[1]; 1073 paths = (struct GNUNET_PeerIdentity *) &reply[1];
1139 GNUNET_memcpy (paths, 1074 GNUNET_memcpy (paths,
1140 frc->put_path, 1075 frc->bd->put_path,
1141 sizeof(struct GNUNET_PeerIdentity) * frc->put_path_length); 1076 sizeof(struct GNUNET_PeerIdentity) * frc->bd->put_path_length);
1142 GNUNET_memcpy (&paths[frc->put_path_length], 1077 GNUNET_memcpy (&paths[frc->bd->put_path_length],
1143 frc->get_path, 1078 frc->get_path,
1144 sizeof(struct GNUNET_PeerIdentity) * frc->get_path_length); 1079 sizeof(struct GNUNET_PeerIdentity) * frc->get_path_length);
1145 GNUNET_memcpy (&paths[frc->get_path_length + frc->put_path_length], 1080 GNUNET_memcpy (&paths[frc->get_path_length + frc->bd->put_path_length],
1146 frc->data, 1081 frc->bd->data,
1147 frc->data_size); 1082 frc->bd->data_size);
1148 LOG (GNUNET_ERROR_TYPE_DEBUG, 1083 LOG (GNUNET_ERROR_TYPE_DEBUG,
1149 "Sending reply to query %s for client %p\n", 1084 "Sending reply to query %s for client %p\n",
1150 GNUNET_h2s (key), 1085 GNUNET_h2s (query_hash),
1151 record->ch->client); 1086 record->ch->client);
1152 GNUNET_MQ_send (record->ch->mq, 1087 GNUNET_MQ_send (record->ch->mq,
1153 env); 1088 env);
@@ -1157,66 +1092,38 @@ forward_reply (void *cls,
1157} 1092}
1158 1093
1159 1094
1160/**
1161 * Handle a reply we've received from another peer. If the reply
1162 * matches any of our pending queries, forward it to the respective
1163 * client(s).
1164 *
1165 * @param expiration when will the reply expire
1166 * @param key the key of the query that triggered the reply
1167 * @param query_hash the query hash of the response
1168 * @param get_path_length number of peers in @a get_path
1169 * @param get_path path the reply took on get
1170 * @param put_path_length number of peers in @a put_path
1171 * @param put_path path the reply took on put
1172 * @param type type of the reply
1173 * @param data_size number of bytes in @a data
1174 * @param data application payload data
1175 */
1176void 1095void
1177GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, 1096GDS_CLIENTS_handle_reply (const struct GDS_DATACACHE_BlockData *bd,
1178 const struct GNUNET_HashCode *key,
1179 const struct GNUNET_HashCode *query_hash, 1097 const struct GNUNET_HashCode *query_hash,
1180 unsigned int get_path_length, 1098 unsigned int get_path_length,
1181 const struct GNUNET_PeerIdentity *get_path, 1099 const struct GNUNET_PeerIdentity *get_path)
1182 unsigned int put_path_length,
1183 const struct GNUNET_PeerIdentity *put_path,
1184 enum GNUNET_BLOCK_Type type,
1185 size_t data_size,
1186 const void *data)
1187{ 1100{
1188 struct ForwardReplyContext frc; 1101 struct ForwardReplyContext frc;
1189 size_t msize; 1102 size_t msize = sizeof (struct GNUNET_DHT_ClientResultMessage)
1103 + bd->data_size
1104 + (get_path_length + bd->put_path_length)
1105 * sizeof(struct GNUNET_PeerIdentity);
1190 1106
1191 msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size
1192 + (get_path_length + put_path_length)
1193 * sizeof(struct GNUNET_PeerIdentity);
1194 if (msize >= GNUNET_MAX_MESSAGE_SIZE) 1107 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1195 { 1108 {
1196 GNUNET_break (0); 1109 GNUNET_break (0);
1197 return; 1110 return;
1198 } 1111 }
1199 frc.expiration = expiration; 1112 frc.bd = bd;
1200 frc.query_hash = query_hash;
1201 frc.get_path = get_path; 1113 frc.get_path = get_path;
1202 frc.put_path = put_path;
1203 frc.data = data;
1204 frc.data_size = data_size;
1205 frc.get_path_length = get_path_length; 1114 frc.get_path_length = get_path_length;
1206 frc.put_path_length = put_path_length;
1207 frc.type = type;
1208 LOG (GNUNET_ERROR_TYPE_DEBUG, 1115 LOG (GNUNET_ERROR_TYPE_DEBUG,
1209 "Forwarding reply for key %s to client\n", 1116 "Forwarding reply for query hash %s to client\n",
1210 GNUNET_h2s (key)); 1117 GNUNET_h2s (query_hash));
1211 if (0 == 1118 if (0 ==
1212 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, 1119 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1213 key, 1120 query_hash,
1214 &forward_reply, 1121 &forward_reply,
1215 &frc)) 1122 &frc))
1216 { 1123 {
1217 LOG (GNUNET_ERROR_TYPE_DEBUG, 1124 LOG (GNUNET_ERROR_TYPE_DEBUG,
1218 "No matching client for reply for key %s\n", 1125 "No matching client for reply for query %s\n",
1219 GNUNET_h2s (key)); 1126 GNUNET_h2s (query_hash));
1220 GNUNET_STATISTICS_update (GDS_stats, 1127 GNUNET_STATISTICS_update (GDS_stats,
1221 "# REPLIES ignored for CLIENTS (no match)", 1128 "# REPLIES ignored for CLIENTS (no match)",
1222 1, 1129 1,
@@ -1298,15 +1205,9 @@ GDS_CLIENTS_process_get (uint32_t options,
1298 1205
1299 1206
1300void 1207void
1301GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, 1208GDS_CLIENTS_process_get_resp (const struct GDS_DATACACHE_BlockData *bd,
1302 const struct GNUNET_PeerIdentity *get_path, 1209 const struct GNUNET_PeerIdentity *get_path,
1303 unsigned int get_path_length, 1210 unsigned int get_path_length)
1304 const struct GNUNET_PeerIdentity *put_path,
1305 unsigned int put_path_length,
1306 struct GNUNET_TIME_Absolute exp,
1307 const struct GNUNET_HashCode *key,
1308 const void *data,
1309 size_t size)
1310{ 1211{
1311 struct ClientHandle **cl = NULL; 1212 struct ClientHandle **cl = NULL;
1312 unsigned int cl_size = 0; 1213 unsigned int cl_size = 0;
@@ -1316,9 +1217,9 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1316 m = m->next) 1217 m = m->next)
1317 { 1218 {
1318 if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) || 1219 if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1319 (m->type == type) ) && 1220 (m->type == bd->type) ) &&
1320 ( (GNUNET_is_zero (&m->key)) || 1221 ( (GNUNET_is_zero (&m->key)) ||
1321 (0 == GNUNET_memcmp (key, 1222 (0 == GNUNET_memcmp (&bd->key,
1322 &m->key)) ) ) 1223 &m->key)) ) )
1323 { 1224 {
1324 struct GNUNET_MQ_Envelope *env; 1225 struct GNUNET_MQ_Envelope *env;
@@ -1336,27 +1237,27 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1336 GNUNET_array_append (cl, 1237 GNUNET_array_append (cl,
1337 cl_size, 1238 cl_size,
1338 m->ch); 1239 m->ch);
1339 msize = size; 1240 msize = bd->data_size;
1340 msize += (get_path_length + put_path_length) 1241 msize += (get_path_length + bd->put_path_length)
1341 * sizeof(struct GNUNET_PeerIdentity); 1242 * sizeof(struct GNUNET_PeerIdentity);
1342 env = GNUNET_MQ_msg_extra (mmsg, 1243 env = GNUNET_MQ_msg_extra (mmsg,
1343 msize, 1244 msize,
1344 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP); 1245 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1345 mmsg->type = htonl (type); 1246 mmsg->type = htonl (bd->type);
1346 mmsg->put_path_length = htonl (put_path_length); 1247 mmsg->put_path_length = htonl (bd->put_path_length);
1347 mmsg->get_path_length = htonl (get_path_length); 1248 mmsg->get_path_length = htonl (get_path_length);
1348 mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp); 1249 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1349 mmsg->key = *key; 1250 mmsg->key = bd->key;
1350 path = (struct GNUNET_PeerIdentity *) &mmsg[1]; 1251 path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1351 GNUNET_memcpy (path, 1252 GNUNET_memcpy (path,
1352 put_path, 1253 bd->put_path,
1353 put_path_length * sizeof(struct GNUNET_PeerIdentity)); 1254 bd->put_path_length * sizeof(struct GNUNET_PeerIdentity));
1354 GNUNET_memcpy (path, 1255 GNUNET_memcpy (path,
1355 get_path, 1256 get_path,
1356 get_path_length * sizeof(struct GNUNET_PeerIdentity)); 1257 get_path_length * sizeof(struct GNUNET_PeerIdentity));
1357 GNUNET_memcpy (&path[get_path_length], 1258 GNUNET_memcpy (&path[get_path_length],
1358 data, 1259 bd->data,
1359 size); 1260 bd->data_size);
1360 GNUNET_MQ_send (m->ch->mq, 1261 GNUNET_MQ_send (m->ch->mq,
1361 env); 1262 env);
1362 } 1263 }
@@ -1365,32 +1266,11 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1365} 1266}
1366 1267
1367 1268
1368/**
1369 * Check if some client is monitoring PUT messages and notify
1370 * them in that case. The @a path should include our own peer ID.
1371 *
1372 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1373 * @param type The type of data in the request.
1374 * @param hop_count Hop count so far.
1375 * @param path_length number of entries in path (or 0 if not recorded).
1376 * @param path peers on the PUT path (or NULL if not recorded).
1377 * @param desired_replication_level Desired replication level.
1378 * @param exp Expiration time of the data.
1379 * @param key Key under which data is to be stored.
1380 * @param data Pointer to the data carried.
1381 * @param size Number of bytes in data.
1382 */
1383void 1269void
1384GDS_CLIENTS_process_put (uint32_t options, 1270GDS_CLIENTS_process_put (enum GNUNET_DHT_RouteOption options,
1385 enum GNUNET_BLOCK_Type type, 1271 const struct GDS_DATACACHE_BlockData *bd,
1386 uint32_t hop_count, 1272 uint32_t hop_count,
1387 uint32_t desired_replication_level, 1273 uint32_t desired_replication_level)
1388 unsigned int path_length,
1389 const struct GNUNET_PeerIdentity *path,
1390 struct GNUNET_TIME_Absolute exp,
1391 const struct GNUNET_HashCode *key,
1392 const void *data,
1393 size_t size)
1394{ 1274{
1395 struct ClientHandle **cl = NULL; 1275 struct ClientHandle **cl = NULL;
1396 unsigned int cl_size = 0; 1276 unsigned int cl_size = 0;
@@ -1400,10 +1280,10 @@ GDS_CLIENTS_process_put (uint32_t options,
1400 m = m->next) 1280 m = m->next)
1401 { 1281 {
1402 if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) || 1282 if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1403 (m->type == type) ) && 1283 (m->type == bd->type) ) &&
1404 ( (GNUNET_is_zero (&m->key)) || 1284 ( (GNUNET_is_zero (&m->key)) ||
1405 (0 == 1285 (0 ==
1406 GNUNET_memcmp (key, 1286 GNUNET_memcmp (&bd->key,
1407 &m->key)) ) ) 1287 &m->key)) ) )
1408 { 1288 {
1409 struct GNUNET_MQ_Envelope *env; 1289 struct GNUNET_MQ_Envelope *env;
@@ -1421,25 +1301,25 @@ GDS_CLIENTS_process_put (uint32_t options,
1421 GNUNET_array_append (cl, 1301 GNUNET_array_append (cl,
1422 cl_size, 1302 cl_size,
1423 m->ch); 1303 m->ch);
1424 msize = size; 1304 msize = bd->data_size;
1425 msize += path_length * sizeof(struct GNUNET_PeerIdentity); 1305 msize += bd->put_path_length * sizeof(struct GNUNET_PeerIdentity);
1426 env = GNUNET_MQ_msg_extra (mmsg, 1306 env = GNUNET_MQ_msg_extra (mmsg,
1427 msize, 1307 msize,
1428 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); 1308 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1429 mmsg->options = htonl (options); 1309 mmsg->options = htonl (options);
1430 mmsg->type = htonl (type); 1310 mmsg->type = htonl (bd->type);
1431 mmsg->hop_count = htonl (hop_count); 1311 mmsg->hop_count = htonl (hop_count);
1432 mmsg->desired_replication_level = htonl (desired_replication_level); 1312 mmsg->desired_replication_level = htonl (desired_replication_level);
1433 mmsg->put_path_length = htonl (path_length); 1313 mmsg->put_path_length = htonl (bd->put_path_length);
1434 mmsg->key = *key; 1314 mmsg->key = bd->key;
1435 mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp); 1315 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1436 msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; 1316 msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1437 GNUNET_memcpy (msg_path, 1317 GNUNET_memcpy (msg_path,
1438 path, 1318 bd->put_path,
1439 path_length * sizeof(struct GNUNET_PeerIdentity)); 1319 bd->put_path_length * sizeof(struct GNUNET_PeerIdentity));
1440 GNUNET_memcpy (&msg_path[path_length], 1320 GNUNET_memcpy (&msg_path[bd->put_path_length],
1441 data, 1321 bd->data,
1442 size); 1322 bd->data_size);
1443 GNUNET_MQ_send (m->ch->mq, 1323 GNUNET_MQ_send (m->ch->mq,
1444 env); 1324 env);
1445 } 1325 }