aboutsummaryrefslogtreecommitdiff
path: root/src/dht
diff options
context:
space:
mode:
authorBart Polot <bart@net.in.tum.de>2012-04-20 17:18:14 +0000
committerBart Polot <bart@net.in.tum.de>2012-04-20 17:18:14 +0000
commit90c9abc573f95de334a1f61faa089e79046d2f11 (patch)
tree89b596897cf1a17ea9c048748ec063b063f8d634 /src/dht
parent6c889a1786be40c0d023e8971411bc327af352c6 (diff)
downloadgnunet-90c9abc573f95de334a1f61faa089e79046d2f11.tar.gz
gnunet-90c9abc573f95de334a1f61faa089e79046d2f11.zip
- Rewritten DHT monitoring
Diffstat (limited to 'src/dht')
-rw-r--r--src/dht/dht.h44
-rw-r--r--src/dht/dht_api.c238
-rw-r--r--src/dht/gnunet-service-dht_clients.c283
-rw-r--r--src/dht/gnunet-service-dht_clients.h86
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c36
5 files changed, 564 insertions, 123 deletions
diff --git a/src/dht/dht.h b/src/dht/dht.h
index 936895b83..a7126aed7 100644
--- a/src/dht/dht.h
+++ b/src/dht/dht.h
@@ -249,6 +249,48 @@ struct GNUNET_DHT_MonitorPutMessage
249 249
250 250
251/** 251/**
252 * Message to request monitoring messages, clients --> DHT service.
253 */
254struct GNUNET_DHT_MonitorStartMessage
255{
256 /**
257 * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_START
258 */
259 struct GNUNET_MessageHeader header;
260
261 /**
262 * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
263 */
264 uint32_t type GNUNET_PACKED;
265
266 /**
267 * Flag whether to notify about GET messages.
268 */
269 int16_t get GNUNET_PACKED;
270
271 /**
272 * Flag whether to notify about GET_REPONSE messages.
273 */
274 int16_t get_resp GNUNET_PACKED;
275
276 /**
277 * Flag whether to notify about PUT messages.
278 */
279 int16_t put GNUNET_PACKED;
280
281 /**
282 * Flag whether to use the provided key to filter messages.
283 */
284 int16_t filter_key GNUNET_PACKED;
285
286 /**
287 * The key to filter messages by..
288 */
289 GNUNET_HashCode key;
290};
291
292
293/**
252 * Message to monitor get requests going through peer, DHT service --> clients. 294 * Message to monitor get requests going through peer, DHT service --> clients.
253 */ 295 */
254struct GNUNET_DHT_MonitorGetMessage 296struct GNUNET_DHT_MonitorGetMessage
@@ -296,7 +338,7 @@ struct GNUNET_DHT_MonitorGetMessage
296/** 338/**
297 * Message to monitor get results going through peer, DHT service --> clients. 339 * Message to monitor get results going through peer, DHT service --> clients.
298 */ 340 */
299struct GNUNET_DHT_MonitorGetResultMessage 341struct GNUNET_DHT_MonitorGetRespMessage
300{ 342{
301 /** 343 /**
302 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT 344 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 1030b7999..3f850b3e7 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -171,9 +171,19 @@ struct GNUNET_DHT_MonitorHandle
171 GNUNET_HashCode *key; 171 GNUNET_HashCode *key;
172 172
173 /** 173 /**
174 * Callback for each received message of interest. 174 * Callback for each received message of type get.
175 */ 175 */
176 GNUNET_DHT_MonitorCB cb; 176 GNUNET_DHT_MonitorGetCB get_cb;
177
178 /**
179 * Callback for each received message of type get response.
180 */
181 GNUNET_DHT_MonitorGetRespCB get_resp_cb;
182
183 /**
184 * Callback for each received message of type put.
185 */
186 GNUNET_DHT_MonitorPutCB put_cb;
177 187
178 /** 188 /**
179 * Closure for cb. 189 * Closure for cb.
@@ -533,63 +543,205 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
533 return GNUNET_YES; 543 return GNUNET_YES;
534} 544}
535 545
536
537/** 546/**
538 * Process a monitoring message from the service. 547 * Process a get monitor message from the service.
539 * 548 *
540 * @param handle The DHT handle. 549 * @param handle The DHT handle.
541 * @param msg Message from the service. 550 * @param msg Monitor get message from the service.
542 * 551 *
543 * @return GNUNET_OK if everything went fine, 552 * @return GNUNET_OK if everything went fine,
544 * GNUNET_SYSERR if the message is malformed. 553 * GNUNET_SYSERR if the message is malformed.
545 */ 554 */
546static int 555static int
547process_monitor_message (struct GNUNET_DHT_Handle *handle, 556process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
548 const struct GNUNET_MessageHeader *msg) 557 const struct GNUNET_DHT_MonitorGetMessage *msg)
549{ 558{
550 struct GNUNET_DHT_MonitorMessage *m;
551 struct GNUNET_DHT_MonitorHandle *h; 559 struct GNUNET_DHT_MonitorHandle *h;
552 size_t msize; 560 size_t msize;
553 561
554 if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET || 562 msize = ntohs (msg->header.size);
555 ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT) 563 if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
556 return GNUNET_SYSERR; 564 return GNUNET_SYSERR;
557 msize = ntohs (msg->size); 565
558 if (msize < sizeof (struct GNUNET_DHT_MonitorMessage)) 566 h = handle->monitor_head;
567 while (NULL != h)
568 {
569 int type_ok;
570 int key_ok;
571
572 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
573 key_ok = NULL == h->key || memcmp (h->key, &msg->key,
574 sizeof (GNUNET_HashCode)) == 0;
575 if (type_ok && key_ok && NULL != h->get_cb)
576 {
577 h->get_cb (h->cb_cls,
578 ntohl (msg->options),
579 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
580 ntohl (msg->hop_count),
581 ntohl (msg->desired_replication_level),
582 ntohl (msg->get_path_length),
583 (struct GNUNET_PeerIdentity *) &msg[1],
584 &msg->key);
585 }
586 h = h->next;
587 }
588 return GNUNET_OK;
589}
590
591
592/**
593 * Process a get response monitor message from the service.
594 *
595 * @param handle The DHT handle.
596 * @param msg Monitor get response message from the service.
597 *
598 * @return GNUNET_OK if everything went fine,
599 * GNUNET_SYSERR if the message is malformed.
600 */
601static int
602process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
603 const struct GNUNET_DHT_MonitorGetRespMessage
604 *msg)
605{
606 struct GNUNET_DHT_MonitorHandle *h;
607 size_t msize;
608
609 msize = ntohs (msg->header.size);
610 if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
559 return GNUNET_SYSERR; 611 return GNUNET_SYSERR;
560 612
561 m = (struct GNUNET_DHT_MonitorMessage *) msg;
562 h = handle->monitor_head; 613 h = handle->monitor_head;
563 while (NULL != h) 614 while (NULL != h)
564 { 615 {
565 if (h->type == ntohl(m->type) && 616 int type_ok;
566 (NULL == h->key || 617 int key_ok;
567 memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0)) 618
619 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
620 key_ok = NULL == h->key || memcmp (h->key, &msg->key,
621 sizeof (GNUNET_HashCode)) == 0;
622 if (type_ok && key_ok && NULL != h->get_resp_cb)
568 { 623 {
569 struct GNUNET_PeerIdentity *path; 624 struct GNUNET_PeerIdentity *path;
570 uint32_t getl; 625 uint32_t getl;
571 uint32_t putl; 626 uint32_t putl;
572 627
573 path = (struct GNUNET_PeerIdentity *) &m[1]; 628 path = (struct GNUNET_PeerIdentity *) &msg[1];
574 getl = ntohl (m->get_path_length); 629 getl = ntohl (msg->get_path_length);
575 putl = ntohl (m->put_path_length); 630 putl = ntohl (msg->put_path_length);
576 h->cb (h->cb_cls, ntohs(msg->type), 631 h->get_resp_cb (h->cb_cls,
577 GNUNET_TIME_absolute_ntoh(m->expiration), 632 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
578 &m->key, 633 path, getl,
579 &path[getl], putl, path, getl, 634 &path[getl], putl,
580 ntohl (m->desired_replication_level), 635 GNUNET_TIME_absolute_ntoh(msg->expiration_time),
581 ntohl (m->options), ntohl (m->type), 636 &msg->key,
582 (void *) &path[getl + putl], 637 (void *) &path[getl + putl],
583 ntohs (msg->size) - 638 msize -
584 sizeof (struct GNUNET_DHT_MonitorMessage) - 639 sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
585 sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); 640 sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
586 } 641 }
587 h = h->next; 642 h = h->next;
588 } 643 }
644 return GNUNET_OK;
645}
646
647
648/**
649 * Process a put monitor message from the service.
650 *
651 * @param handle The DHT handle.
652 * @param msg Monitor put message from the service.
653 *
654 * @return GNUNET_OK if everything went fine,
655 * GNUNET_SYSERR if the message is malformed.
656 */
657static int
658process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
659 const struct GNUNET_DHT_MonitorPutMessage *msg)
660{
661 struct GNUNET_DHT_MonitorHandle *h;
662 size_t msize;
663
664 msize = ntohs (msg->header.size);
665 if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
666 return GNUNET_SYSERR;
667
668 h = handle->monitor_head;
669 while (NULL != h)
670 {
671 int type_ok;
672 int key_ok;
673
674 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
675 key_ok = NULL == h->key || memcmp (h->key, &msg->key,
676 sizeof (GNUNET_HashCode)) == 0;
677 if (type_ok && key_ok && NULL != h->put_cb)
678 {
679 struct GNUNET_PeerIdentity *path;
680 uint32_t putl;
589 681
682 path = (struct GNUNET_PeerIdentity *) &msg[1];
683 putl = ntohl (msg->put_path_length);
684 h->put_cb (h->cb_cls,
685 ntohl (msg->options),
686 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
687 ntohl (msg->hop_count),
688 ntohl (msg->desired_replication_level),
689 putl, path,
690 GNUNET_TIME_absolute_ntoh(msg->expiration_time),
691 &msg->key,
692 (void *) &path[putl],
693 msize -
694 sizeof (struct GNUNET_DHT_MonitorPutMessage) -
695 sizeof (struct GNUNET_PeerIdentity) * putl);
696 }
697 h = h->next;
698 }
590 return GNUNET_OK; 699 return GNUNET_OK;
591} 700}
592 701
702
703/**
704 * Process a monitoring message from the service: demultiplex for proper type.
705 *
706 * @param handle The DHT handle.
707 * @param msg Message from the service.
708 *
709 * @return GNUNET_OK if everything went fine,
710 * GNUNET_SYSERR if the message is malformed.
711 */
712static int
713process_monitor_message (struct GNUNET_DHT_Handle *handle,
714 const struct GNUNET_MessageHeader *msg)
715{
716 switch (ntohs (msg->type))
717 {
718 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
719 return process_monitor_get_message(handle,
720 (struct GNUNET_DHT_MonitorGetMessage *)
721 msg);
722
723 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
724 {
725 return process_monitor_get_resp_message(
726 handle,
727 (struct GNUNET_DHT_MonitorGetRespMessage *) msg);
728 }
729 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
730 {
731 return process_monitor_put_message(handle,
732 (struct GNUNET_DHT_MonitorPutMessage *)
733 msg);
734 }
735 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
736 /* Not implemented yet */
737 GNUNET_break(0);
738 /* Fall through */
739 default:
740 GNUNET_break(0);
741 return GNUNET_SYSERR;
742 }
743}
744
593/** 745/**
594 * Handler for messages received from the DHT service 746 * Handler for messages received from the DHT service
595 * a demultiplexer which handles numerous message types 747 * a demultiplexer which handles numerous message types
@@ -930,7 +1082,9 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
930 * @param handle Handle to the DHT service. 1082 * @param handle Handle to the DHT service.
931 * @param type Type of blocks that are of interest. 1083 * @param type Type of blocks that are of interest.
932 * @param key Key of data of interest, NULL for all. 1084 * @param key Key of data of interest, NULL for all.
933 * @param cb Callback to process all monitored data. 1085 * @param get_cb Callback to process monitored get messages.
1086 * @param get_resp_cb Callback to process monitored get response messages.
1087 * @param put_cb Callback to process monitored put messages.
934 * @param cb_cls Closure for cb. 1088 * @param cb_cls Closure for cb.
935 * 1089 *
936 * @return Handle to stop monitoring. 1090 * @return Handle to stop monitoring.
@@ -939,18 +1093,21 @@ struct GNUNET_DHT_MonitorHandle *
939GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, 1093GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
940 enum GNUNET_BLOCK_Type type, 1094 enum GNUNET_BLOCK_Type type,
941 const GNUNET_HashCode *key, 1095 const GNUNET_HashCode *key,
942 GNUNET_DHT_MonitorCB cb, 1096 GNUNET_DHT_MonitorGetCB get_cb,
1097 GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1098 GNUNET_DHT_MonitorPutCB put_cb,
943 void *cb_cls) 1099 void *cb_cls)
944{ 1100{
945 struct GNUNET_DHT_MonitorHandle *h; 1101 struct GNUNET_DHT_MonitorHandle *h;
946 struct GNUNET_DHT_MonitorMessage *m; 1102 struct GNUNET_DHT_MonitorStartMessage *m;
947 struct PendingMessage *pending; 1103 struct PendingMessage *pending;
948 1104
949 h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle)); 1105 h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
950 GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h); 1106 GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
951 1107
952 GNUNET_assert (NULL != cb); 1108 h->get_cb = get_cb;
953 h->cb = cb; 1109 h->get_resp_cb = get_resp_cb;
1110 h->put_cb = put_cb;
954 h->cb_cls = cb_cls; 1111 h->cb_cls = cb_cls;
955 h->type = type; 1112 h->type = type;
956 h->dht_handle = handle; 1113 h->dht_handle = handle;
@@ -960,17 +1117,22 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
960 memcpy (h->key, key, sizeof(GNUNET_HashCode)); 1117 memcpy (h->key, key, sizeof(GNUNET_HashCode));
961 } 1118 }
962 1119
963 pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) + 1120 pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartMessage) +
964 sizeof (struct PendingMessage)); 1121 sizeof (struct PendingMessage));
965 m = (struct GNUNET_DHT_MonitorMessage *) &pending[1]; 1122 m = (struct GNUNET_DHT_MonitorStartMessage *) &pending[1];
966 pending->msg = &m->header; 1123 pending->msg = &m->header;
967 pending->handle = handle; 1124 pending->handle = handle;
968 pending->free_on_send = GNUNET_YES; 1125 pending->free_on_send = GNUNET_YES;
969 m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); 1126 m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
970 m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage)); 1127 m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartMessage));
971 m->type = htonl(type); 1128 m->type = htonl(type);
972 if (NULL != key) 1129 m->get = (NULL != get_cb);
1130 m->get_resp = (NULL != get_resp_cb);
1131 m->put = (NULL != put_cb);
1132 if (NULL != key) {
1133 m->filter_key = 1;
973 memcpy (&m->key, key, sizeof(GNUNET_HashCode)); 1134 memcpy (&m->key, key, sizeof(GNUNET_HashCode));
1135 }
974 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, 1136 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
975 pending); 1137 pending);
976 pending->in_pending_queue = GNUNET_YES; 1138 pending->in_pending_queue = GNUNET_YES;
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index 98e280094..f26d77792 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -204,6 +204,21 @@ struct ClientMonitorRecord
204 GNUNET_HashCode *key; 204 GNUNET_HashCode *key;
205 205
206 /** 206 /**
207 * Flag whether to notify about GET messages.
208 */
209 int16_t get;
210
211 /**
212 * Flag whether to notify about GET_REPONSE messages.
213 */
214 int16_t get_resp;
215
216 /**
217 * Flag whether to notify about PUT messages.
218 */
219 uint16_t put;
220
221 /**
207 * Client to notify of these requests. 222 * Client to notify of these requests.
208 */ 223 */
209 struct ClientList *client; 224 struct ClientList *client;
@@ -490,12 +505,16 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
490 peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1], 505 peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1],
491 size - 506 size -
492 sizeof (struct GNUNET_DHT_ClientPutMessage)); 507 sizeof (struct GNUNET_DHT_ClientPutMessage));
493 GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT, 508 GDS_CLIENTS_process_put (ntohl (dht_msg->options),
494 GNUNET_TIME_absolute_ntoh (dht_msg->expiration), &dht_msg->key, 509 ntohl (dht_msg->type),
495 1, GDS_NEIGHBOURS_get_id(), 0, NULL, 510 0,
496 ntohl (dht_msg->desired_replication_level), 511 ntohl (dht_msg->desired_replication_level),
497 ntohl (dht_msg->type), &(dht_msg[1].header), 512 1,
498 size - sizeof (struct GNUNET_DHT_ClientPutMessage)); 513 GDS_NEIGHBOURS_get_id(),
514 GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
515 &dht_msg->key,
516 &dht_msg[1],
517 size - sizeof (struct GNUNET_DHT_ClientPutMessage));
499 GNUNET_CONTAINER_bloomfilter_free (peer_bf); 518 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
500 GNUNET_SERVER_receive_done (client, GNUNET_OK); 519 GNUNET_SERVER_receive_done (client, GNUNET_OK);
501} 520}
@@ -554,11 +573,13 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
554 cqr->type = ntohl (get->type); 573 cqr->type = ntohl (get->type);
555 GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, 574 GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
556 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 575 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
557 GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, 576 GDS_CLIENTS_process_get (ntohl (get->options),
558 GNUNET_TIME_UNIT_FOREVER_ABS, &get->key, 577 ntohl (get->type),
559 0, NULL, 1, GDS_NEIGHBOURS_get_id(), 578 0,
560 ntohl (get->desired_replication_level), 579 ntohl (get->desired_replication_level),
561 ntohl (get->type), NULL, 0); 580 1,
581 GDS_NEIGHBOURS_get_id(),
582 &get->key);
562 /* start remote requests */ 583 /* start remote requests */
563 if (GNUNET_SCHEDULER_NO_TASK != retry_task) 584 if (GNUNET_SCHEDULER_NO_TASK != retry_task)
564 GNUNET_SCHEDULER_cancel (retry_task); 585 GNUNET_SCHEDULER_cancel (retry_task);
@@ -659,15 +680,18 @@ handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client,
659 const struct GNUNET_MessageHeader *message) 680 const struct GNUNET_MessageHeader *message)
660{ 681{
661 struct ClientMonitorRecord *r; 682 struct ClientMonitorRecord *r;
662 const struct GNUNET_DHT_MonitorMessage *msg; 683 const struct GNUNET_DHT_MonitorStartMessage *msg;
663 unsigned int i; 684 unsigned int i;
664 char *c; 685 char *c;
665 686
666 msg = (struct GNUNET_DHT_MonitorMessage *) message; 687 msg = (struct GNUNET_DHT_MonitorStartMessage *) message;
667 r = GNUNET_malloc (sizeof(struct ClientMonitorRecord)); 688 r = GNUNET_malloc (sizeof(struct ClientMonitorRecord));
668 689
669 r->client = find_active_client(client); 690 r->client = find_active_client(client);
670 r->type = ntohl(msg->type); 691 r->type = ntohl(msg->type);
692 r->get = msg->get;
693 r->get_resp = msg->get_resp;
694 r->put = msg->put;
671 c = (char *) &msg->key; 695 c = (char *) &msg->key;
672 for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++); 696 for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++);
673 if (sizeof (GNUNET_HashCode) == i) 697 if (sizeof (GNUNET_HashCode) == i)
@@ -1038,33 +1062,101 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
1038 1062
1039 1063
1040/** 1064/**
1041 * Check if some client is monitoring messages of this type and notify 1065 * Check if some client is monitoring GET messages and notify
1042 * him in that case. 1066 * them in that case.
1043 * 1067 *
1044 * @param mtype Type of the DHT message. 1068 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1045 * @param exp When will this value expire. 1069 * @param type The type of data in the request.
1046 * @param key Key of the result/request. 1070 * @param hop_count Hop count so far.
1047 * @param putl number of entries in get_path. 1071 * @param path_length number of entries in path (or 0 if not recorded).
1048 * @param put_path peers on the PUT path (or NULL if not recorded). 1072 * @param path peers on the GET path (or NULL if not recorded).
1049 * @param getl number of entries in get_path.
1050 * @param get_path Peers on reply path (or NULL if not recorded).
1051 * @param desired_replication_level Desired replication level. 1073 * @param desired_replication_level Desired replication level.
1052 * @param type Type of the result/request. 1074 * @param key Key of the requested data.
1075 */
1076void
1077GDS_CLIENTS_process_get (uint32_t options,
1078 enum GNUNET_BLOCK_Type type,
1079 uint32_t hop_count,
1080 uint32_t desired_replication_level,
1081 unsigned int path_length,
1082 const struct GNUNET_PeerIdentity *path,
1083 const GNUNET_HashCode * key)
1084{
1085 struct ClientMonitorRecord *m;
1086 struct ClientList **cl;
1087 unsigned int cl_size;
1088
1089 cl = NULL;
1090 cl_size = 0;
1091 for (m = monitor_head; NULL != m; m = m->next)
1092 {
1093 if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1094 (NULL == m->key ||
1095 memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
1096 {
1097 struct PendingMessage *pm;
1098 struct GNUNET_DHT_MonitorGetMessage *mmsg;
1099 struct GNUNET_PeerIdentity *msg_path;
1100 size_t msize;
1101 unsigned int i;
1102
1103 /* Don't send duplicates */
1104 for (i = 0; i < cl_size; i++)
1105 if (cl[i] == m->client)
1106 break;
1107 if (i < cl_size)
1108 continue;
1109 GNUNET_array_append (cl, cl_size, m->client);
1110
1111 msize = path_length * sizeof (struct GNUNET_PeerIdentity);
1112 msize += sizeof (struct GNUNET_DHT_MonitorGetMessage);
1113 msize += sizeof (struct PendingMessage);
1114 pm = (struct PendingMessage *) GNUNET_malloc (msize);
1115 mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1];
1116 pm->msg = (struct GNUNET_MessageHeader *) mmsg;
1117 mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
1118 mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1119 mmsg->options = htonl(options);
1120 mmsg->type = htonl(type);
1121 mmsg->hop_count = htonl(hop_count);
1122 mmsg->desired_replication_level = htonl(desired_replication_level);
1123 mmsg->get_path_length = htonl(path_length);
1124 memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
1125 msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1126 if (path_length > 0)
1127 memcpy (msg_path, path,
1128 path_length * sizeof (struct GNUNET_PeerIdentity));
1129 add_pending_message (m->client, pm);
1130 }
1131 }
1132 GNUNET_free_non_null (cl);
1133}
1134
1135
1136/**
1137 * Check if some client is monitoring GET RESP messages and notify
1138 * them in that case.
1139 *
1140 * @param type The type of data in the result.
1141 * @param get_path Peers on GET path (or NULL if not recorded).
1142 * @param get_path_length number of entries in get_path.
1143 * @param put_path peers on the PUT path (or NULL if not recorded).
1144 * @param put_path_length number of entries in get_path.
1145 * @param exp Expiration time of the data.
1146 * @param key Key of the data.
1053 * @param data Pointer to the result data. 1147 * @param data Pointer to the result data.
1054 * @param size Number of bytes in data. 1148 * @param size Number of bytes in data.
1055 */ 1149 */
1056void 1150void
1057GDS_CLIENTS_process_monitor (uint16_t mtype, 1151GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1058 const struct GNUNET_TIME_Absolute exp, 1152 const struct GNUNET_PeerIdentity *get_path,
1059 const GNUNET_HashCode *key, 1153 unsigned int get_path_length,
1060 uint32_t putl, 1154 const struct GNUNET_PeerIdentity *put_path,
1061 const struct GNUNET_PeerIdentity *put_path, 1155 unsigned int put_path_length,
1062 uint32_t getl, 1156 struct GNUNET_TIME_Absolute exp,
1063 const struct GNUNET_PeerIdentity *get_path, 1157 const GNUNET_HashCode * key,
1064 uint32_t desired_replication_level, 1158 const void *data,
1065 enum GNUNET_BLOCK_Type type, 1159 size_t size)
1066 const struct GNUNET_MessageHeader *data,
1067 uint16_t size)
1068{ 1160{
1069 struct ClientMonitorRecord *m; 1161 struct ClientMonitorRecord *m;
1070 struct ClientList **cl; 1162 struct ClientList **cl;
@@ -1079,7 +1171,7 @@ GDS_CLIENTS_process_monitor (uint16_t mtype,
1079 memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) 1171 memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
1080 { 1172 {
1081 struct PendingMessage *pm; 1173 struct PendingMessage *pm;
1082 struct GNUNET_DHT_MonitorMessage *mmsg; 1174 struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1083 struct GNUNET_PeerIdentity *path; 1175 struct GNUNET_PeerIdentity *path;
1084 size_t msize; 1176 size_t msize;
1085 unsigned int i; 1177 unsigned int i;
@@ -1093,29 +1185,116 @@ GDS_CLIENTS_process_monitor (uint16_t mtype,
1093 GNUNET_array_append (cl, cl_size, m->client); 1185 GNUNET_array_append (cl, cl_size, m->client);
1094 1186
1095 msize = size; 1187 msize = size;
1096 msize += (getl + putl) * sizeof (struct GNUNET_PeerIdentity); 1188 msize += (get_path_length + put_path_length)
1097 msize += sizeof (struct GNUNET_DHT_MonitorMessage); 1189 * sizeof (struct GNUNET_PeerIdentity);
1190 msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage);
1098 msize += sizeof (struct PendingMessage); 1191 msize += sizeof (struct PendingMessage);
1099 pm = (struct PendingMessage *) GNUNET_malloc (msize); 1192 pm = (struct PendingMessage *) GNUNET_malloc (msize);
1100 mmsg = (struct GNUNET_DHT_MonitorMessage *) &pm[1]; 1193 mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1];
1101 pm->msg = (struct GNUNET_MessageHeader *) mmsg; 1194 pm->msg = (struct GNUNET_MessageHeader *) mmsg;
1102 mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); 1195 mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
1103 mmsg->header.type = htons (mtype); 1196 mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1104 mmsg->expiration = GNUNET_TIME_absolute_hton(exp); 1197 mmsg->type = htonl(type);
1105 memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); 1198 mmsg->put_path_length = htonl(put_path_length);
1106 mmsg->put_path_length = htonl(putl); 1199 mmsg->get_path_length = htonl(get_path_length);
1107 mmsg->get_path_length = htonl(getl);
1108 mmsg->desired_replication_level = htonl (desired_replication_level);
1109 path = (struct GNUNET_PeerIdentity *) &mmsg[1]; 1200 path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1110 if (putl > 0) 1201 if (put_path_length > 0)
1202 {
1203 memcpy (path, put_path,
1204 put_path_length * sizeof (struct GNUNET_PeerIdentity));
1205 path = &path[put_path_length];
1206 }
1207 if (get_path_length > 0)
1208 memcpy (path, get_path,
1209 get_path_length * sizeof (struct GNUNET_PeerIdentity));
1210 mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1211 memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
1212 if (size > 0)
1213 memcpy (&path[get_path_length], data, size);
1214 add_pending_message (m->client, pm);
1215 }
1216 }
1217 GNUNET_free_non_null (cl);
1218}
1219
1220
1221/**
1222 * Check if some client is monitoring PUT messages and notify
1223 * them in that case.
1224 *
1225 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1226 * @param type The type of data in the request.
1227 * @param hop_count Hop count so far.
1228 * @param path_length number of entries in path (or 0 if not recorded).
1229 * @param path peers on the PUT path (or NULL if not recorded).
1230 * @param desired_replication_level Desired replication level.
1231 * @param exp Expiration time of the data.
1232 * @param key Key under which data is to be stored.
1233 * @param data Pointer to the data carried.
1234 * @param size Number of bytes in data.
1235 */
1236void
1237GDS_CLIENTS_process_put (uint32_t options,
1238 enum GNUNET_BLOCK_Type type,
1239 uint32_t hop_count,
1240 uint32_t desired_replication_level,
1241 unsigned int path_length,
1242 const struct GNUNET_PeerIdentity *path,
1243 struct GNUNET_TIME_Absolute exp,
1244 const GNUNET_HashCode * key,
1245 const void *data,
1246 size_t size)
1247{
1248 struct ClientMonitorRecord *m;
1249 struct ClientList **cl;
1250 unsigned int cl_size;
1251
1252 cl = NULL;
1253 cl_size = 0;
1254 for (m = monitor_head; NULL != m; m = m->next)
1255 {
1256 if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1257 (NULL == m->key ||
1258 memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
1259 {
1260 struct PendingMessage *pm;
1261 struct GNUNET_DHT_MonitorPutMessage *mmsg;
1262 struct GNUNET_PeerIdentity *msg_path;
1263 size_t msize;
1264 unsigned int i;
1265
1266 /* Don't send duplicates */
1267 for (i = 0; i < cl_size; i++)
1268 if (cl[i] == m->client)
1269 break;
1270 if (i < cl_size)
1271 continue;
1272 GNUNET_array_append (cl, cl_size, m->client);
1273
1274 msize = size;
1275 msize += path_length * sizeof (struct GNUNET_PeerIdentity);
1276 msize += sizeof (struct GNUNET_DHT_MonitorPutMessage);
1277 msize += sizeof (struct PendingMessage);
1278 pm = (struct PendingMessage *) GNUNET_malloc (msize);
1279 mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1];
1280 pm->msg = (struct GNUNET_MessageHeader *) mmsg;
1281 mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
1282 mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1283 mmsg->options = htonl(options);
1284 mmsg->type = htonl(type);
1285 mmsg->hop_count = htonl(hop_count);
1286 mmsg->desired_replication_level = htonl(desired_replication_level);
1287 mmsg->put_path_length = htonl(path_length);
1288 msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1289 if (path_length > 0)
1111 { 1290 {
1112 memcpy (path, put_path, putl * sizeof (struct GNUNET_PeerIdentity)); 1291 memcpy (msg_path, path,
1113 path = &path[putl]; 1292 path_length * sizeof (struct GNUNET_PeerIdentity));
1114 } 1293 }
1115 if (getl > 0) 1294 mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1116 memcpy (path, get_path, getl * sizeof (struct GNUNET_PeerIdentity)); 1295 memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
1117 if (size > 0) 1296 if (size > 0)
1118 memcpy (&path[getl], data, size); 1297 memcpy (&msg_path[path_length], data, size);
1119 add_pending_message (m->client, pm); 1298 add_pending_message (m->client, pm);
1120 } 1299 }
1121 } 1300 }
@@ -1140,8 +1319,8 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
1140 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, 1319 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP,
1141 sizeof (struct GNUNET_DHT_ClientGetStopMessage)}, 1320 sizeof (struct GNUNET_DHT_ClientGetStopMessage)},
1142 {&handle_dht_local_monitor, NULL, 1321 {&handle_dht_local_monitor, NULL,
1143 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, 1322 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START,
1144 sizeof (struct GNUNET_DHT_MonitorMessage)}, 1323 sizeof (struct GNUNET_DHT_MonitorStartMessage)},
1145 {NULL, NULL, 0, 0} 1324 {NULL, NULL, 0, 0}
1146 }; 1325 };
1147 forward_map = GNUNET_CONTAINER_multihashmap_create (1024); 1326 forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h
index a477456a4..9f3d2dd83 100644
--- a/src/dht/gnunet-service-dht_clients.h
+++ b/src/dht/gnunet-service-dht_clients.h
@@ -57,33 +57,77 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
57 57
58 58
59/** 59/**
60 * Check if some client is monitoring messages of this type and notify 60 * Check if some client is monitoring GET messages and notify
61 * him in that case. 61 * them in that case.
62 * 62 *
63 * @param mtype Type of the DHT message. 63 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
64 * @param exp When will this value expire. 64 * @param type The type of data in the request.
65 * @param key Key of the result/request. 65 * @param hop_count Hop count so far.
66 * @param putl number of entries in get_path. 66 * @param path_length number of entries in path (or 0 if not recorded).
67 * @param put_path peers on the PUT path (or NULL if not recorded). 67 * @param path peers on the GET path (or NULL if not recorded).
68 * @param getl number of entries in get_path.
69 * @param get_path Peers on reply path (or NULL if not recorded).
70 * @param desired_replication_level Desired replication level. 68 * @param desired_replication_level Desired replication level.
71 * @param type Type of the result/request. 69 * @param key Key of the requested data.
70 */
71void
72GDS_CLIENTS_process_get (uint32_t options,
73 enum GNUNET_BLOCK_Type type,
74 uint32_t hop_count,
75 uint32_t desired_replication_level,
76 unsigned int path_length,
77 const struct GNUNET_PeerIdentity *path,
78 const GNUNET_HashCode * key);
79
80/**
81 * Check if some client is monitoring GET RESP messages and notify
82 * them in that case.
83 *
84 * @param type The type of data in the result.
85 * @param get_path Peers on GET path (or NULL if not recorded).
86 * @param get_path_length number of entries in get_path.
87 * @param put_path peers on the PUT path (or NULL if not recorded).
88 * @param put_path_length number of entries in get_path.
89 * @param exp Expiration time of the data.
90 * @param key Key of the data.
72 * @param data Pointer to the result data. 91 * @param data Pointer to the result data.
73 * @param size Number of bytes in data. 92 * @param size Number of bytes in data.
74 */ 93 */
75void 94void
76GDS_CLIENTS_process_monitor (uint16_t mtype, 95GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
77 const struct GNUNET_TIME_Absolute exp, 96 const struct GNUNET_PeerIdentity *get_path,
78 const GNUNET_HashCode *key, 97 unsigned int get_path_length,
79 uint32_t putl, 98 const struct GNUNET_PeerIdentity *put_path,
80 const struct GNUNET_PeerIdentity *put_path, 99 unsigned int put_path_length,
81 uint32_t getl, 100 struct GNUNET_TIME_Absolute exp,
82 const struct GNUNET_PeerIdentity *get_path, 101 const GNUNET_HashCode * key,
83 uint32_t desired_replication_level, 102 const void *data,
84 enum GNUNET_BLOCK_Type type, 103 size_t size);
85 const struct GNUNET_MessageHeader *data, 104
86 uint16_t size); 105/**
106 * Check if some client is monitoring PUT messages and notify
107 * them in that case.
108 *
109 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
110 * @param type The type of data in the request.
111 * @param hop_count Hop count so far.
112 * @param path_length number of entries in path (or 0 if not recorded).
113 * @param path peers on the PUT path (or NULL if not recorded).
114 * @param desired_replication_level Desired replication level.
115 * @param exp Expiration time of the data.
116 * @param key Key under which data is to be stored.
117 * @param data Pointer to the data carried.
118 * @param size Number of bytes in data.
119 */
120void
121GDS_CLIENTS_process_put (uint32_t options,
122 enum GNUNET_BLOCK_Type type,
123 uint32_t hop_count,
124 uint32_t desired_replication_level,
125 unsigned int path_length,
126 const struct GNUNET_PeerIdentity *path,
127 struct GNUNET_TIME_Absolute exp,
128 const GNUNET_HashCode * key,
129 const void *data,
130 size_t size);
87 131
88/** 132/**
89 * Initialize client subsystem. 133 * Initialize client subsystem.
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index e7c1fbee3..cfcedcc92 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -1617,10 +1617,15 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
1617 pp, payload, payload_size); 1617 pp, payload, payload_size);
1618 } 1618 }
1619 GNUNET_CONTAINER_bloomfilter_free (bf); 1619 GNUNET_CONTAINER_bloomfilter_free (bf);
1620 GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT, 1620 GDS_CLIENTS_process_put (options,
1621 GNUNET_TIME_absolute_ntoh (put->expiration_time), &put->key, 1621 ntohl (put->type),
1622 putlen, put_path, 0, NULL, ntohl(put->desired_replication_level), 1622 ntohl (put->hop_count),
1623 ntohl (put->type), payload, payload_size); 1623 ntohl (put->desired_replication_level),
1624 putlen, put_path,
1625 GNUNET_TIME_absolute_ntoh (put->expiration_time),
1626 &put->key,
1627 payload,
1628 payload_size);
1624 return GNUNET_YES; 1629 return GNUNET_YES;
1625} 1630}
1626 1631
@@ -1827,9 +1832,12 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1827 } 1832 }
1828 1833
1829 /* FIXME Path */ 1834 /* FIXME Path */
1830 GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, 1835 GDS_CLIENTS_process_get (options,
1831 GNUNET_TIME_UNIT_FOREVER_ABS, &get->key, 0, NULL, 0, NULL, 1836 type,
1832 ntohl (get->desired_replication_level), type, NULL, 0); 1837 ntohl(get->hop_count),
1838 ntohl(get->desired_replication_level),
1839 0, NULL,
1840 &get->key);
1833 1841
1834 /* P2P forwarding */ 1842 /* P2P forwarding */
1835 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) 1843 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
@@ -1963,10 +1971,16 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1963 xget_path, data, data_size); 1971 xget_path, data, data_size);
1964 } 1972 }
1965 1973
1966 GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP, 1974 GDS_CLIENTS_process_get_resp (type,
1967 GNUNET_TIME_absolute_ntoh (prm->expiration_time), &prm->key, 1975 get_path,
1968 put_path_length, put_path, get_path_length, get_path, 1976 get_path_length,
1969 0, type, data, data_size); 1977 put_path,
1978 put_path_length,
1979 GNUNET_TIME_absolute_ntoh (
1980 prm->expiration_time),
1981 &prm->key,
1982 data,
1983 data_size);
1970 1984
1971 return GNUNET_YES; 1985 return GNUNET_YES;
1972} 1986}