diff options
author | Christian Grothoff <christian@grothoff.org> | 2022-01-02 20:24:02 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2022-01-02 20:24:02 +0100 |
commit | e3ff017054eb35dedd95ae4fe66c82b88e3bbdc3 (patch) | |
tree | abb821d63f09d0982d6f6802ec1c55234dcc0f5c /src/dht/gnunet-service-dht_clients.c | |
parent | 329f0458fa1fce45ce1c31e86771ffefb22e941e (diff) | |
download | gnunet-e3ff017054eb35dedd95ae4fe66c82b88e3bbdc3.tar.gz gnunet-e3ff017054eb35dedd95ae4fe66c82b88e3bbdc3.zip |
-non-trivial refactoring/cleanup of the DHT code
Diffstat (limited to 'src/dht/gnunet-service-dht_clients.c')
-rw-r--r-- | src/dht/gnunet-service-dht_clients.c | 340 |
1 files changed, 110 insertions, 230 deletions
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index aa41f519c..b520cda41 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c | |||
@@ -425,6 +425,7 @@ transmit_next_request_task (void *cls) | |||
425 | { | 425 | { |
426 | struct ClientQueryRecord *cqr; | 426 | struct ClientQueryRecord *cqr; |
427 | 427 | ||
428 | (void) cls; | ||
428 | retry_task = NULL; | 429 | retry_task = NULL; |
429 | while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap))) | 430 | while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap))) |
430 | { | 431 | { |
@@ -486,19 +487,23 @@ handle_dht_local_put (void *cls, | |||
486 | { | 487 | { |
487 | struct ClientHandle *ch = cls; | 488 | struct ClientHandle *ch = cls; |
488 | uint16_t size = ntohs (dht_msg->header.size); | 489 | uint16_t size = ntohs (dht_msg->header.size); |
489 | uint32_t type = ntohl (dht_msg->type); | ||
490 | struct GNUNET_TIME_Absolute expiration | ||
491 | = GNUNET_TIME_absolute_ntoh (dht_msg->expiration); | ||
492 | enum GNUNET_DHT_RouteOption options | 490 | enum GNUNET_DHT_RouteOption options |
493 | = (enum GNUNET_DHT_RouteOption) ntohl (dht_msg->options); | 491 | = (enum GNUNET_DHT_RouteOption) ntohl (dht_msg->options); |
494 | uint32_t replication_level | 492 | uint32_t replication_level |
495 | = ntohl (dht_msg->desired_replication_level); | 493 | = ntohl (dht_msg->desired_replication_level); |
494 | struct GDS_DATACACHE_BlockData bd = { | ||
495 | .key = dht_msg->key, | ||
496 | .expiration_time = GNUNET_TIME_absolute_ntoh (dht_msg->expiration), | ||
497 | .data = &dht_msg[1], | ||
498 | .data_size = size - sizeof (*dht_msg), | ||
499 | .type = ntohl (dht_msg->type) | ||
500 | }; | ||
496 | 501 | ||
497 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 502 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
498 | "Handling local PUT of %lu-bytes for query %s of type %u\n", | 503 | "Handling local PUT of %lu-bytes for query %s of type %u\n", |
499 | (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)), | 504 | (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)), |
500 | GNUNET_h2s (&dht_msg->key), | 505 | GNUNET_h2s (&dht_msg->key), |
501 | (unsigned int) type); | 506 | (unsigned int) bd.type); |
502 | GNUNET_STATISTICS_update (GDS_stats, | 507 | GNUNET_STATISTICS_update (GDS_stats, |
503 | "# PUT requests received from clients", | 508 | "# PUT requests received from clients", |
504 | 1, | 509 | 1, |
@@ -507,14 +512,10 @@ handle_dht_local_put (void *cls, | |||
507 | "CLIENT-PUT %s\n", | 512 | "CLIENT-PUT %s\n", |
508 | GNUNET_h2s_full (&dht_msg->key)); | 513 | GNUNET_h2s_full (&dht_msg->key)); |
509 | /* give to local clients */ | 514 | /* give to local clients */ |
510 | GDS_CLIENTS_handle_reply (expiration, | 515 | GDS_CLIENTS_handle_reply (&bd, |
511 | &dht_msg->key, | 516 | &bd.key, |
512 | &dht_msg->key, | 517 | 0, NULL /* get path */); |
513 | 0, NULL, /* get path */ | 518 | |
514 | 0, NULL, /* put path */ | ||
515 | type, | ||
516 | size - sizeof(struct GNUNET_DHT_ClientPutMessage), | ||
517 | &dht_msg[1]); | ||
518 | { | 519 | { |
519 | struct GNUNET_CONTAINER_BloomFilter *peer_bf; | 520 | struct GNUNET_CONTAINER_BloomFilter *peer_bf; |
520 | 521 | ||
@@ -526,26 +527,14 @@ handle_dht_local_put (void *cls, | |||
526 | if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || | 527 | if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || |
527 | (GDS_am_closest_peer (&dht_msg->key, | 528 | (GDS_am_closest_peer (&dht_msg->key, |
528 | peer_bf))) | 529 | peer_bf))) |
529 | GDS_DATACACHE_handle_put ( | 530 | GDS_DATACACHE_handle_put (&bd); |
530 | expiration, | ||
531 | &dht_msg->key, | ||
532 | 0, NULL, /* put path */ | ||
533 | type, | ||
534 | size - sizeof(struct GNUNET_DHT_ClientPutMessage), | ||
535 | &dht_msg[1]); | ||
536 | /* route to other peers */ | 531 | /* route to other peers */ |
537 | if (GNUNET_OK != | 532 | if (GNUNET_OK != |
538 | GDS_NEIGHBOURS_handle_put ( | 533 | GDS_NEIGHBOURS_handle_put (&bd, |
539 | type, | 534 | options, |
540 | options, | 535 | replication_level, |
541 | replication_level, | 536 | 0 /* hop count */, |
542 | expiration, | 537 | peer_bf)) |
543 | 0 /* hop count */, | ||
544 | peer_bf, | ||
545 | &dht_msg->key, | ||
546 | 0, NULL, /* put path */ | ||
547 | &dht_msg[1], | ||
548 | size - sizeof(struct GNUNET_DHT_ClientPutMessage))) | ||
549 | { | 538 | { |
550 | GNUNET_STATISTICS_update (GDS_stats, | 539 | GNUNET_STATISTICS_update (GDS_stats, |
551 | "# Local PUT requests not routed", | 540 | "# Local PUT requests not routed", |
@@ -556,15 +545,9 @@ handle_dht_local_put (void *cls, | |||
556 | } | 545 | } |
557 | GDS_CLIENTS_process_put ( | 546 | GDS_CLIENTS_process_put ( |
558 | options, | 547 | options, |
559 | type, | 548 | &bd, |
560 | 0, /* hop count */ | 549 | 0, /* hop count */ |
561 | replication_level, | 550 | replication_level); |
562 | 1, /* path length */ | ||
563 | GDS_NEIGHBOURS_get_id (), | ||
564 | expiration, | ||
565 | &dht_msg->key, | ||
566 | &dht_msg[1], | ||
567 | size - sizeof(struct GNUNET_DHT_ClientPutMessage)); | ||
568 | GNUNET_SERVICE_client_continue (ch->client); | 551 | GNUNET_SERVICE_client_continue (ch->client); |
569 | } | 552 | } |
570 | 553 | ||
@@ -573,37 +556,16 @@ handle_dht_local_put (void *cls, | |||
573 | * Handle a result from local datacache for a GET operation. | 556 | * Handle a result from local datacache for a GET operation. |
574 | * | 557 | * |
575 | * @param cls the `struct ClientHandle` of the client doing the query | 558 | * @param cls the `struct ClientHandle` of the client doing the query |
576 | * @param type type of the block | 559 | * @param bd details about the block that was found |
577 | * @param expiration_time when does the content expire | ||
578 | * @param key key for the content | ||
579 | * @param put_path_length number of entries in @a put_path | ||
580 | * @param put_path peers the original PUT traversed (if tracked) | ||
581 | * @param get_path_length number of entries in @a get_path | ||
582 | * @param get_path peers this reply has traversed so far (if tracked) | ||
583 | * @param data payload of the reply | ||
584 | * @param data_size number of bytes in @a data | ||
585 | */ | 560 | */ |
586 | static void | 561 | static void |
587 | handle_local_result (void *cls, | 562 | handle_local_result (void *cls, |
588 | enum GNUNET_BLOCK_Type type, | 563 | const struct GDS_DATACACHE_BlockData *bd) |
589 | struct GNUNET_TIME_Absolute expiration_time, | ||
590 | const struct GNUNET_HashCode *key, | ||
591 | unsigned int put_path_length, | ||
592 | const struct GNUNET_PeerIdentity *put_path, | ||
593 | unsigned int get_path_length, | ||
594 | const struct GNUNET_PeerIdentity *get_path, | ||
595 | const void *data, | ||
596 | size_t data_size) | ||
597 | { | 564 | { |
598 | // FIXME: this may deserve some clean up: inline the function, | 565 | /* FIXME: use 'cls' instead of looking up the client? */ |
599 | // possibly avoid even looking up the client! | 566 | GDS_CLIENTS_handle_reply (bd, |
600 | GDS_CLIENTS_handle_reply (expiration_time, | 567 | &bd->key, |
601 | key, | 568 | 0, NULL /* get_path */); |
602 | key, | ||
603 | 0, NULL, | ||
604 | put_path_length, put_path, | ||
605 | type, | ||
606 | data_size, data); | ||
607 | } | 569 | } |
608 | 570 | ||
609 | 571 | ||
@@ -713,6 +675,9 @@ struct FindByUniqueIdContext | |||
713 | */ | 675 | */ |
714 | struct ClientQueryRecord *cqr; | 676 | struct ClientQueryRecord *cqr; |
715 | 677 | ||
678 | /** | ||
679 | * Unique ID to look for. | ||
680 | */ | ||
716 | uint64_t unique_id; | 681 | uint64_t unique_id; |
717 | }; | 682 | }; |
718 | 683 | ||
@@ -968,9 +933,9 @@ handle_dht_local_monitor_stop ( | |||
968 | struct ForwardReplyContext | 933 | struct ForwardReplyContext |
969 | { | 934 | { |
970 | /** | 935 | /** |
971 | * Expiration time of the reply. | 936 | * Block details. |
972 | */ | 937 | */ |
973 | struct GNUNET_TIME_Absolute expiration; | 938 | const struct GDS_DATACACHE_BlockData *bd; |
974 | 939 | ||
975 | /** | 940 | /** |
976 | * GET path taken. | 941 | * GET path taken. |
@@ -978,39 +943,10 @@ struct ForwardReplyContext | |||
978 | const struct GNUNET_PeerIdentity *get_path; | 943 | const struct GNUNET_PeerIdentity *get_path; |
979 | 944 | ||
980 | /** | 945 | /** |
981 | * PUT path taken. | ||
982 | */ | ||
983 | const struct GNUNET_PeerIdentity *put_path; | ||
984 | |||
985 | /** | ||
986 | * Hash under which the payload is stored. | ||
987 | */ | ||
988 | const struct GNUNET_HashCode *query_hash; | ||
989 | |||
990 | /** | ||
991 | * Embedded payload. | ||
992 | */ | ||
993 | const void *data; | ||
994 | |||
995 | /** | ||
996 | * Number of bytes in data. | ||
997 | */ | ||
998 | size_t data_size; | ||
999 | |||
1000 | /** | ||
1001 | * Number of entries in @e get_path. | 946 | * Number of entries in @e get_path. |
1002 | */ | 947 | */ |
1003 | unsigned int get_path_length; | 948 | unsigned int get_path_length; |
1004 | 949 | ||
1005 | /** | ||
1006 | * Number of entries in @e put_path. | ||
1007 | */ | ||
1008 | unsigned int put_path_length; | ||
1009 | |||
1010 | /** | ||
1011 | * Type of the data. | ||
1012 | */ | ||
1013 | enum GNUNET_BLOCK_Type type; | ||
1014 | }; | 950 | }; |
1015 | 951 | ||
1016 | 952 | ||
@@ -1019,15 +955,15 @@ struct ForwardReplyContext | |||
1019 | * each of the matching clients. With some tricky recycling | 955 | * each of the matching clients. With some tricky recycling |
1020 | * of the buffer. | 956 | * of the buffer. |
1021 | * | 957 | * |
1022 | * @param cls the 'struct ForwardReplyContext' | 958 | * @param cls the `struct ForwardReplyContext` |
1023 | * @param key current key | 959 | * @param query_hash hash of the query for which this may be a reply |
1024 | * @param value value in the hash map, a ClientQueryRecord | 960 | * @param value value in the hash map, a ClientQueryRecord |
1025 | * @return #GNUNET_YES (we should continue to iterate), | 961 | * @return #GNUNET_YES (we should continue to iterate), |
1026 | * if the result is mal-formed, #GNUNET_NO | 962 | * if the result is mal-formed, #GNUNET_NO |
1027 | */ | 963 | */ |
1028 | static enum GNUNET_GenericReturnValue | 964 | static enum GNUNET_GenericReturnValue |
1029 | forward_reply (void *cls, | 965 | forward_reply (void *cls, |
1030 | const struct GNUNET_HashCode *key, | 966 | const struct GNUNET_HashCode *query_hash, |
1031 | void *value) | 967 | void *value) |
1032 | { | 968 | { |
1033 | struct ForwardReplyContext *frc = cls; | 969 | struct ForwardReplyContext *frc = cls; |
@@ -1041,13 +977,13 @@ forward_reply (void *cls, | |||
1041 | 977 | ||
1042 | LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, | 978 | LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, |
1043 | "CLIENT-RESULT %s\n", | 979 | "CLIENT-RESULT %s\n", |
1044 | GNUNET_h2s_full (frc->query_hash)); | 980 | GNUNET_h2s_full (&frc->bd->key)); |
1045 | if ((record->type != GNUNET_BLOCK_TYPE_ANY) && | 981 | if ( (record->type != GNUNET_BLOCK_TYPE_ANY) && |
1046 | (record->type != frc->type)) | 982 | (record->type != frc->bd->type) ) |
1047 | { | 983 | { |
1048 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 984 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1049 | "Record type mismatch, not passing request for key %s to local client\n", | 985 | "Record type mismatch, not passing request for key %s to local client\n", |
1050 | GNUNET_h2s (key)); | 986 | GNUNET_h2s (&frc->bd->key)); |
1051 | GNUNET_STATISTICS_update (GDS_stats, | 987 | GNUNET_STATISTICS_update (GDS_stats, |
1052 | "# Key match, type mismatches in REPLY to CLIENT", | 988 | "# Key match, type mismatches in REPLY to CLIENT", |
1053 | 1, | 989 | 1, |
@@ -1055,8 +991,8 @@ forward_reply (void *cls, | |||
1055 | return GNUNET_YES; /* type mismatch */ | 991 | return GNUNET_YES; /* type mismatch */ |
1056 | } | 992 | } |
1057 | if ( (0 == (record->msg_options & GNUNET_DHT_RO_FIND_PEER)) && | 993 | if ( (0 == (record->msg_options & GNUNET_DHT_RO_FIND_PEER)) && |
1058 | (0 != GNUNET_memcmp (key, | 994 | (0 != GNUNET_memcmp (&frc->bd->key, |
1059 | frc->query_hash)) ) | 995 | query_hash)) ) |
1060 | { | 996 | { |
1061 | GNUNET_STATISTICS_update (GDS_stats, | 997 | GNUNET_STATISTICS_update (GDS_stats, |
1062 | "# Inexact key match, but exact match required", | 998 | "# Inexact key match, but exact match required", |
@@ -1064,37 +1000,36 @@ forward_reply (void *cls, | |||
1064 | GNUNET_NO); | 1000 | GNUNET_NO); |
1065 | return GNUNET_YES; /* type mismatch */ | 1001 | return GNUNET_YES; /* type mismatch */ |
1066 | } | 1002 | } |
1067 | GNUNET_CRYPTO_hash (frc->data, | 1003 | GNUNET_CRYPTO_hash (frc->bd->data, |
1068 | frc->data_size, | 1004 | frc->bd->data_size, |
1069 | &ch); | 1005 | &ch); |
1070 | for (unsigned int i = 0; i < record->seen_replies_count; i++) | 1006 | for (unsigned int i = 0; i < record->seen_replies_count; i++) |
1071 | if (0 == memcmp (&record->seen_replies[i], | 1007 | if (0 == |
1072 | &ch, | 1008 | GNUNET_memcmp (&record->seen_replies[i], |
1073 | sizeof(struct GNUNET_HashCode))) | 1009 | &ch)) |
1074 | { | 1010 | { |
1075 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1011 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1076 | "Duplicate reply, not passing request for key %s to local client\n", | 1012 | "Duplicate reply, not passing request for key %s to local client\n", |
1077 | GNUNET_h2s (key)); | 1013 | GNUNET_h2s (&frc->bd->key)); |
1078 | GNUNET_STATISTICS_update (GDS_stats, | 1014 | GNUNET_STATISTICS_update (GDS_stats, |
1079 | gettext_noop | 1015 | "# Duplicate REPLIES to CLIENT request dropped", |
1080 | ( | 1016 | 1, |
1081 | "# Duplicate REPLIES to CLIENT request dropped"), | 1017 | GNUNET_NO); |
1082 | 1, GNUNET_NO); | ||
1083 | return GNUNET_YES; /* duplicate */ | 1018 | return GNUNET_YES; /* duplicate */ |
1084 | } | 1019 | } |
1085 | eval | 1020 | eval |
1086 | = GNUNET_BLOCK_check_reply (GDS_block_context, | 1021 | = GNUNET_BLOCK_check_reply (GDS_block_context, |
1087 | record->type, | 1022 | record->type, |
1088 | NULL, | 1023 | NULL, |
1089 | key, | 1024 | &frc->bd->key, |
1090 | record->xquery, | 1025 | record->xquery, |
1091 | record->xquery_size, | 1026 | record->xquery_size, |
1092 | frc->data, | 1027 | frc->bd->data, |
1093 | frc->data_size); | 1028 | frc->bd->data_size); |
1094 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1029 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1095 | "Evaluation result is %d for key %s for local client's query\n", | 1030 | "Evaluation result is %d for key %s for local client's query\n", |
1096 | (int) eval, | 1031 | (int) eval, |
1097 | GNUNET_h2s (key)); | 1032 | GNUNET_h2s (&frc->bd->key)); |
1098 | switch (eval) | 1033 | switch (eval) |
1099 | { | 1034 | { |
1100 | case GNUNET_BLOCK_REPLY_OK_LAST: | 1035 | case GNUNET_BLOCK_REPLY_OK_LAST: |
@@ -1125,29 +1060,29 @@ forward_reply (void *cls, | |||
1125 | 1, | 1060 | 1, |
1126 | GNUNET_NO); | 1061 | GNUNET_NO); |
1127 | env = GNUNET_MQ_msg_extra (reply, | 1062 | env = GNUNET_MQ_msg_extra (reply, |
1128 | frc->data_size | 1063 | frc->bd->data_size |
1129 | + (frc->get_path_length + frc->put_path_length) | 1064 | + (frc->get_path_length + frc->bd->put_path_length) |
1130 | * sizeof(struct GNUNET_PeerIdentity), | 1065 | * sizeof(struct GNUNET_PeerIdentity), |
1131 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT); | 1066 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT); |
1132 | reply->type = htonl (frc->type); | 1067 | reply->type = htonl (frc->bd->type); |
1133 | reply->get_path_length = htonl (frc->get_path_length); | 1068 | reply->get_path_length = htonl (frc->get_path_length); |
1134 | reply->put_path_length = htonl (frc->put_path_length); | 1069 | reply->put_path_length = htonl (frc->bd->put_path_length); |
1135 | reply->unique_id = record->unique_id; | 1070 | reply->unique_id = record->unique_id; |
1136 | reply->expiration = GNUNET_TIME_absolute_hton (frc->expiration); | 1071 | reply->expiration = GNUNET_TIME_absolute_hton (frc->bd->expiration_time); |
1137 | reply->key = *key; | 1072 | reply->key = frc->bd->key; |
1138 | paths = (struct GNUNET_PeerIdentity *) &reply[1]; | 1073 | paths = (struct GNUNET_PeerIdentity *) &reply[1]; |
1139 | GNUNET_memcpy (paths, | 1074 | GNUNET_memcpy (paths, |
1140 | frc->put_path, | 1075 | frc->bd->put_path, |
1141 | sizeof(struct GNUNET_PeerIdentity) * frc->put_path_length); | 1076 | sizeof(struct GNUNET_PeerIdentity) * frc->bd->put_path_length); |
1142 | GNUNET_memcpy (&paths[frc->put_path_length], | 1077 | GNUNET_memcpy (&paths[frc->bd->put_path_length], |
1143 | frc->get_path, | 1078 | frc->get_path, |
1144 | sizeof(struct GNUNET_PeerIdentity) * frc->get_path_length); | 1079 | sizeof(struct GNUNET_PeerIdentity) * frc->get_path_length); |
1145 | GNUNET_memcpy (&paths[frc->get_path_length + frc->put_path_length], | 1080 | GNUNET_memcpy (&paths[frc->get_path_length + frc->bd->put_path_length], |
1146 | frc->data, | 1081 | frc->bd->data, |
1147 | frc->data_size); | 1082 | frc->bd->data_size); |
1148 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1083 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1149 | "Sending reply to query %s for client %p\n", | 1084 | "Sending reply to query %s for client %p\n", |
1150 | GNUNET_h2s (key), | 1085 | GNUNET_h2s (query_hash), |
1151 | record->ch->client); | 1086 | record->ch->client); |
1152 | GNUNET_MQ_send (record->ch->mq, | 1087 | GNUNET_MQ_send (record->ch->mq, |
1153 | env); | 1088 | env); |
@@ -1157,66 +1092,38 @@ forward_reply (void *cls, | |||
1157 | } | 1092 | } |
1158 | 1093 | ||
1159 | 1094 | ||
1160 | /** | ||
1161 | * Handle a reply we've received from another peer. If the reply | ||
1162 | * matches any of our pending queries, forward it to the respective | ||
1163 | * client(s). | ||
1164 | * | ||
1165 | * @param expiration when will the reply expire | ||
1166 | * @param key the key of the query that triggered the reply | ||
1167 | * @param query_hash the query hash of the response | ||
1168 | * @param get_path_length number of peers in @a get_path | ||
1169 | * @param get_path path the reply took on get | ||
1170 | * @param put_path_length number of peers in @a put_path | ||
1171 | * @param put_path path the reply took on put | ||
1172 | * @param type type of the reply | ||
1173 | * @param data_size number of bytes in @a data | ||
1174 | * @param data application payload data | ||
1175 | */ | ||
1176 | void | 1095 | void |
1177 | GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, | 1096 | GDS_CLIENTS_handle_reply (const struct GDS_DATACACHE_BlockData *bd, |
1178 | const struct GNUNET_HashCode *key, | ||
1179 | const struct GNUNET_HashCode *query_hash, | 1097 | const struct GNUNET_HashCode *query_hash, |
1180 | unsigned int get_path_length, | 1098 | unsigned int get_path_length, |
1181 | const struct GNUNET_PeerIdentity *get_path, | 1099 | const struct GNUNET_PeerIdentity *get_path) |
1182 | unsigned int put_path_length, | ||
1183 | const struct GNUNET_PeerIdentity *put_path, | ||
1184 | enum GNUNET_BLOCK_Type type, | ||
1185 | size_t data_size, | ||
1186 | const void *data) | ||
1187 | { | 1100 | { |
1188 | struct ForwardReplyContext frc; | 1101 | struct ForwardReplyContext frc; |
1189 | size_t msize; | 1102 | size_t msize = sizeof (struct GNUNET_DHT_ClientResultMessage) |
1103 | + bd->data_size | ||
1104 | + (get_path_length + bd->put_path_length) | ||
1105 | * sizeof(struct GNUNET_PeerIdentity); | ||
1190 | 1106 | ||
1191 | msize = sizeof(struct GNUNET_DHT_ClientResultMessage) + data_size | ||
1192 | + (get_path_length + put_path_length) | ||
1193 | * sizeof(struct GNUNET_PeerIdentity); | ||
1194 | if (msize >= GNUNET_MAX_MESSAGE_SIZE) | 1107 | if (msize >= GNUNET_MAX_MESSAGE_SIZE) |
1195 | { | 1108 | { |
1196 | GNUNET_break (0); | 1109 | GNUNET_break (0); |
1197 | return; | 1110 | return; |
1198 | } | 1111 | } |
1199 | frc.expiration = expiration; | 1112 | frc.bd = bd; |
1200 | frc.query_hash = query_hash; | ||
1201 | frc.get_path = get_path; | 1113 | frc.get_path = get_path; |
1202 | frc.put_path = put_path; | ||
1203 | frc.data = data; | ||
1204 | frc.data_size = data_size; | ||
1205 | frc.get_path_length = get_path_length; | 1114 | frc.get_path_length = get_path_length; |
1206 | frc.put_path_length = put_path_length; | ||
1207 | frc.type = type; | ||
1208 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1115 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1209 | "Forwarding reply for key %s to client\n", | 1116 | "Forwarding reply for query hash %s to client\n", |
1210 | GNUNET_h2s (key)); | 1117 | GNUNET_h2s (query_hash)); |
1211 | if (0 == | 1118 | if (0 == |
1212 | GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, | 1119 | GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, |
1213 | key, | 1120 | query_hash, |
1214 | &forward_reply, | 1121 | &forward_reply, |
1215 | &frc)) | 1122 | &frc)) |
1216 | { | 1123 | { |
1217 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1124 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1218 | "No matching client for reply for key %s\n", | 1125 | "No matching client for reply for query %s\n", |
1219 | GNUNET_h2s (key)); | 1126 | GNUNET_h2s (query_hash)); |
1220 | GNUNET_STATISTICS_update (GDS_stats, | 1127 | GNUNET_STATISTICS_update (GDS_stats, |
1221 | "# REPLIES ignored for CLIENTS (no match)", | 1128 | "# REPLIES ignored for CLIENTS (no match)", |
1222 | 1, | 1129 | 1, |
@@ -1298,15 +1205,9 @@ GDS_CLIENTS_process_get (uint32_t options, | |||
1298 | 1205 | ||
1299 | 1206 | ||
1300 | void | 1207 | void |
1301 | GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, | 1208 | GDS_CLIENTS_process_get_resp (const struct GDS_DATACACHE_BlockData *bd, |
1302 | const struct GNUNET_PeerIdentity *get_path, | 1209 | const struct GNUNET_PeerIdentity *get_path, |
1303 | unsigned int get_path_length, | 1210 | unsigned int get_path_length) |
1304 | const struct GNUNET_PeerIdentity *put_path, | ||
1305 | unsigned int put_path_length, | ||
1306 | struct GNUNET_TIME_Absolute exp, | ||
1307 | const struct GNUNET_HashCode *key, | ||
1308 | const void *data, | ||
1309 | size_t size) | ||
1310 | { | 1211 | { |
1311 | struct ClientHandle **cl = NULL; | 1212 | struct ClientHandle **cl = NULL; |
1312 | unsigned int cl_size = 0; | 1213 | unsigned int cl_size = 0; |
@@ -1316,9 +1217,9 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, | |||
1316 | m = m->next) | 1217 | m = m->next) |
1317 | { | 1218 | { |
1318 | if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) || | 1219 | if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) || |
1319 | (m->type == type) ) && | 1220 | (m->type == bd->type) ) && |
1320 | ( (GNUNET_is_zero (&m->key)) || | 1221 | ( (GNUNET_is_zero (&m->key)) || |
1321 | (0 == GNUNET_memcmp (key, | 1222 | (0 == GNUNET_memcmp (&bd->key, |
1322 | &m->key)) ) ) | 1223 | &m->key)) ) ) |
1323 | { | 1224 | { |
1324 | struct GNUNET_MQ_Envelope *env; | 1225 | struct GNUNET_MQ_Envelope *env; |
@@ -1336,27 +1237,27 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, | |||
1336 | GNUNET_array_append (cl, | 1237 | GNUNET_array_append (cl, |
1337 | cl_size, | 1238 | cl_size, |
1338 | m->ch); | 1239 | m->ch); |
1339 | msize = size; | 1240 | msize = bd->data_size; |
1340 | msize += (get_path_length + put_path_length) | 1241 | msize += (get_path_length + bd->put_path_length) |
1341 | * sizeof(struct GNUNET_PeerIdentity); | 1242 | * sizeof(struct GNUNET_PeerIdentity); |
1342 | env = GNUNET_MQ_msg_extra (mmsg, | 1243 | env = GNUNET_MQ_msg_extra (mmsg, |
1343 | msize, | 1244 | msize, |
1344 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP); | 1245 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP); |
1345 | mmsg->type = htonl (type); | 1246 | mmsg->type = htonl (bd->type); |
1346 | mmsg->put_path_length = htonl (put_path_length); | 1247 | mmsg->put_path_length = htonl (bd->put_path_length); |
1347 | mmsg->get_path_length = htonl (get_path_length); | 1248 | mmsg->get_path_length = htonl (get_path_length); |
1348 | mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp); | 1249 | mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time); |
1349 | mmsg->key = *key; | 1250 | mmsg->key = bd->key; |
1350 | path = (struct GNUNET_PeerIdentity *) &mmsg[1]; | 1251 | path = (struct GNUNET_PeerIdentity *) &mmsg[1]; |
1351 | GNUNET_memcpy (path, | 1252 | GNUNET_memcpy (path, |
1352 | put_path, | 1253 | bd->put_path, |
1353 | put_path_length * sizeof(struct GNUNET_PeerIdentity)); | 1254 | bd->put_path_length * sizeof(struct GNUNET_PeerIdentity)); |
1354 | GNUNET_memcpy (path, | 1255 | GNUNET_memcpy (path, |
1355 | get_path, | 1256 | get_path, |
1356 | get_path_length * sizeof(struct GNUNET_PeerIdentity)); | 1257 | get_path_length * sizeof(struct GNUNET_PeerIdentity)); |
1357 | GNUNET_memcpy (&path[get_path_length], | 1258 | GNUNET_memcpy (&path[get_path_length], |
1358 | data, | 1259 | bd->data, |
1359 | size); | 1260 | bd->data_size); |
1360 | GNUNET_MQ_send (m->ch->mq, | 1261 | GNUNET_MQ_send (m->ch->mq, |
1361 | env); | 1262 | env); |
1362 | } | 1263 | } |
@@ -1365,32 +1266,11 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, | |||
1365 | } | 1266 | } |
1366 | 1267 | ||
1367 | 1268 | ||
1368 | /** | ||
1369 | * Check if some client is monitoring PUT messages and notify | ||
1370 | * them in that case. The @a path should include our own peer ID. | ||
1371 | * | ||
1372 | * @param options Options, for instance RecordRoute, DemultiplexEverywhere. | ||
1373 | * @param type The type of data in the request. | ||
1374 | * @param hop_count Hop count so far. | ||
1375 | * @param path_length number of entries in path (or 0 if not recorded). | ||
1376 | * @param path peers on the PUT path (or NULL if not recorded). | ||
1377 | * @param desired_replication_level Desired replication level. | ||
1378 | * @param exp Expiration time of the data. | ||
1379 | * @param key Key under which data is to be stored. | ||
1380 | * @param data Pointer to the data carried. | ||
1381 | * @param size Number of bytes in data. | ||
1382 | */ | ||
1383 | void | 1269 | void |
1384 | GDS_CLIENTS_process_put (uint32_t options, | 1270 | GDS_CLIENTS_process_put (enum GNUNET_DHT_RouteOption options, |
1385 | enum GNUNET_BLOCK_Type type, | 1271 | const struct GDS_DATACACHE_BlockData *bd, |
1386 | uint32_t hop_count, | 1272 | uint32_t hop_count, |
1387 | uint32_t desired_replication_level, | 1273 | uint32_t desired_replication_level) |
1388 | unsigned int path_length, | ||
1389 | const struct GNUNET_PeerIdentity *path, | ||
1390 | struct GNUNET_TIME_Absolute exp, | ||
1391 | const struct GNUNET_HashCode *key, | ||
1392 | const void *data, | ||
1393 | size_t size) | ||
1394 | { | 1274 | { |
1395 | struct ClientHandle **cl = NULL; | 1275 | struct ClientHandle **cl = NULL; |
1396 | unsigned int cl_size = 0; | 1276 | unsigned int cl_size = 0; |
@@ -1400,10 +1280,10 @@ GDS_CLIENTS_process_put (uint32_t options, | |||
1400 | m = m->next) | 1280 | m = m->next) |
1401 | { | 1281 | { |
1402 | if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) || | 1282 | if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) || |
1403 | (m->type == type) ) && | 1283 | (m->type == bd->type) ) && |
1404 | ( (GNUNET_is_zero (&m->key)) || | 1284 | ( (GNUNET_is_zero (&m->key)) || |
1405 | (0 == | 1285 | (0 == |
1406 | GNUNET_memcmp (key, | 1286 | GNUNET_memcmp (&bd->key, |
1407 | &m->key)) ) ) | 1287 | &m->key)) ) ) |
1408 | { | 1288 | { |
1409 | struct GNUNET_MQ_Envelope *env; | 1289 | struct GNUNET_MQ_Envelope *env; |
@@ -1421,25 +1301,25 @@ GDS_CLIENTS_process_put (uint32_t options, | |||
1421 | GNUNET_array_append (cl, | 1301 | GNUNET_array_append (cl, |
1422 | cl_size, | 1302 | cl_size, |
1423 | m->ch); | 1303 | m->ch); |
1424 | msize = size; | 1304 | msize = bd->data_size; |
1425 | msize += path_length * sizeof(struct GNUNET_PeerIdentity); | 1305 | msize += bd->put_path_length * sizeof(struct GNUNET_PeerIdentity); |
1426 | env = GNUNET_MQ_msg_extra (mmsg, | 1306 | env = GNUNET_MQ_msg_extra (mmsg, |
1427 | msize, | 1307 | msize, |
1428 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); | 1308 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); |
1429 | mmsg->options = htonl (options); | 1309 | mmsg->options = htonl (options); |
1430 | mmsg->type = htonl (type); | 1310 | mmsg->type = htonl (bd->type); |
1431 | mmsg->hop_count = htonl (hop_count); | 1311 | mmsg->hop_count = htonl (hop_count); |
1432 | mmsg->desired_replication_level = htonl (desired_replication_level); | 1312 | mmsg->desired_replication_level = htonl (desired_replication_level); |
1433 | mmsg->put_path_length = htonl (path_length); | 1313 | mmsg->put_path_length = htonl (bd->put_path_length); |
1434 | mmsg->key = *key; | 1314 | mmsg->key = bd->key; |
1435 | mmsg->expiration_time = GNUNET_TIME_absolute_hton (exp); | 1315 | mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time); |
1436 | msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; | 1316 | msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; |
1437 | GNUNET_memcpy (msg_path, | 1317 | GNUNET_memcpy (msg_path, |
1438 | path, | 1318 | bd->put_path, |
1439 | path_length * sizeof(struct GNUNET_PeerIdentity)); | 1319 | bd->put_path_length * sizeof(struct GNUNET_PeerIdentity)); |
1440 | GNUNET_memcpy (&msg_path[path_length], | 1320 | GNUNET_memcpy (&msg_path[bd->put_path_length], |
1441 | data, | 1321 | bd->data, |
1442 | size); | 1322 | bd->data_size); |
1443 | GNUNET_MQ_send (m->ch->mq, | 1323 | GNUNET_MQ_send (m->ch->mq, |
1444 | env); | 1324 | env); |
1445 | } | 1325 | } |