aboutsummaryrefslogtreecommitdiff
path: root/src/dht/dht_api.c
diff options
context:
space:
mode:
authorBart Polot <bart@net.in.tum.de>2012-04-20 17:18:14 +0000
committerBart Polot <bart@net.in.tum.de>2012-04-20 17:18:14 +0000
commit90c9abc573f95de334a1f61faa089e79046d2f11 (patch)
tree89b596897cf1a17ea9c048748ec063b063f8d634 /src/dht/dht_api.c
parent6c889a1786be40c0d023e8971411bc327af352c6 (diff)
downloadgnunet-90c9abc573f95de334a1f61faa089e79046d2f11.tar.gz
gnunet-90c9abc573f95de334a1f61faa089e79046d2f11.zip
- Rewritten DHT monitoring
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r--src/dht/dht_api.c238
1 files changed, 200 insertions, 38 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 1030b7999..3f850b3e7 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -171,9 +171,19 @@ struct GNUNET_DHT_MonitorHandle
171 GNUNET_HashCode *key; 171 GNUNET_HashCode *key;
172 172
173 /** 173 /**
174 * Callback for each received message of interest. 174 * Callback for each received message of type get.
175 */ 175 */
176 GNUNET_DHT_MonitorCB cb; 176 GNUNET_DHT_MonitorGetCB get_cb;
177
178 /**
179 * Callback for each received message of type get response.
180 */
181 GNUNET_DHT_MonitorGetRespCB get_resp_cb;
182
183 /**
184 * Callback for each received message of type put.
185 */
186 GNUNET_DHT_MonitorPutCB put_cb;
177 187
178 /** 188 /**
179 * Closure for cb. 189 * Closure for cb.
@@ -533,63 +543,205 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
533 return GNUNET_YES; 543 return GNUNET_YES;
534} 544}
535 545
536
537/** 546/**
538 * Process a monitoring message from the service. 547 * Process a get monitor message from the service.
539 * 548 *
540 * @param handle The DHT handle. 549 * @param handle The DHT handle.
541 * @param msg Message from the service. 550 * @param msg Monitor get message from the service.
542 * 551 *
543 * @return GNUNET_OK if everything went fine, 552 * @return GNUNET_OK if everything went fine,
544 * GNUNET_SYSERR if the message is malformed. 553 * GNUNET_SYSERR if the message is malformed.
545 */ 554 */
546static int 555static int
547process_monitor_message (struct GNUNET_DHT_Handle *handle, 556process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
548 const struct GNUNET_MessageHeader *msg) 557 const struct GNUNET_DHT_MonitorGetMessage *msg)
549{ 558{
550 struct GNUNET_DHT_MonitorMessage *m;
551 struct GNUNET_DHT_MonitorHandle *h; 559 struct GNUNET_DHT_MonitorHandle *h;
552 size_t msize; 560 size_t msize;
553 561
554 if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET || 562 msize = ntohs (msg->header.size);
555 ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT) 563 if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
556 return GNUNET_SYSERR; 564 return GNUNET_SYSERR;
557 msize = ntohs (msg->size); 565
558 if (msize < sizeof (struct GNUNET_DHT_MonitorMessage)) 566 h = handle->monitor_head;
567 while (NULL != h)
568 {
569 int type_ok;
570 int key_ok;
571
572 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
573 key_ok = NULL == h->key || memcmp (h->key, &msg->key,
574 sizeof (GNUNET_HashCode)) == 0;
575 if (type_ok && key_ok && NULL != h->get_cb)
576 {
577 h->get_cb (h->cb_cls,
578 ntohl (msg->options),
579 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
580 ntohl (msg->hop_count),
581 ntohl (msg->desired_replication_level),
582 ntohl (msg->get_path_length),
583 (struct GNUNET_PeerIdentity *) &msg[1],
584 &msg->key);
585 }
586 h = h->next;
587 }
588 return GNUNET_OK;
589}
590
591
592/**
593 * Process a get response monitor message from the service.
594 *
595 * @param handle The DHT handle.
596 * @param msg Monitor get response message from the service.
597 *
598 * @return GNUNET_OK if everything went fine,
599 * GNUNET_SYSERR if the message is malformed.
600 */
601static int
602process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
603 const struct GNUNET_DHT_MonitorGetRespMessage
604 *msg)
605{
606 struct GNUNET_DHT_MonitorHandle *h;
607 size_t msize;
608
609 msize = ntohs (msg->header.size);
610 if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
559 return GNUNET_SYSERR; 611 return GNUNET_SYSERR;
560 612
561 m = (struct GNUNET_DHT_MonitorMessage *) msg;
562 h = handle->monitor_head; 613 h = handle->monitor_head;
563 while (NULL != h) 614 while (NULL != h)
564 { 615 {
565 if (h->type == ntohl(m->type) && 616 int type_ok;
566 (NULL == h->key || 617 int key_ok;
567 memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0)) 618
619 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
620 key_ok = NULL == h->key || memcmp (h->key, &msg->key,
621 sizeof (GNUNET_HashCode)) == 0;
622 if (type_ok && key_ok && NULL != h->get_resp_cb)
568 { 623 {
569 struct GNUNET_PeerIdentity *path; 624 struct GNUNET_PeerIdentity *path;
570 uint32_t getl; 625 uint32_t getl;
571 uint32_t putl; 626 uint32_t putl;
572 627
573 path = (struct GNUNET_PeerIdentity *) &m[1]; 628 path = (struct GNUNET_PeerIdentity *) &msg[1];
574 getl = ntohl (m->get_path_length); 629 getl = ntohl (msg->get_path_length);
575 putl = ntohl (m->put_path_length); 630 putl = ntohl (msg->put_path_length);
576 h->cb (h->cb_cls, ntohs(msg->type), 631 h->get_resp_cb (h->cb_cls,
577 GNUNET_TIME_absolute_ntoh(m->expiration), 632 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
578 &m->key, 633 path, getl,
579 &path[getl], putl, path, getl, 634 &path[getl], putl,
580 ntohl (m->desired_replication_level), 635 GNUNET_TIME_absolute_ntoh(msg->expiration_time),
581 ntohl (m->options), ntohl (m->type), 636 &msg->key,
582 (void *) &path[getl + putl], 637 (void *) &path[getl + putl],
583 ntohs (msg->size) - 638 msize -
584 sizeof (struct GNUNET_DHT_MonitorMessage) - 639 sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
585 sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); 640 sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
586 } 641 }
587 h = h->next; 642 h = h->next;
588 } 643 }
644 return GNUNET_OK;
645}
646
647
648/**
649 * Process a put monitor message from the service.
650 *
651 * @param handle The DHT handle.
652 * @param msg Monitor put message from the service.
653 *
654 * @return GNUNET_OK if everything went fine,
655 * GNUNET_SYSERR if the message is malformed.
656 */
657static int
658process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
659 const struct GNUNET_DHT_MonitorPutMessage *msg)
660{
661 struct GNUNET_DHT_MonitorHandle *h;
662 size_t msize;
663
664 msize = ntohs (msg->header.size);
665 if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
666 return GNUNET_SYSERR;
667
668 h = handle->monitor_head;
669 while (NULL != h)
670 {
671 int type_ok;
672 int key_ok;
673
674 type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
675 key_ok = NULL == h->key || memcmp (h->key, &msg->key,
676 sizeof (GNUNET_HashCode)) == 0;
677 if (type_ok && key_ok && NULL != h->put_cb)
678 {
679 struct GNUNET_PeerIdentity *path;
680 uint32_t putl;
589 681
682 path = (struct GNUNET_PeerIdentity *) &msg[1];
683 putl = ntohl (msg->put_path_length);
684 h->put_cb (h->cb_cls,
685 ntohl (msg->options),
686 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
687 ntohl (msg->hop_count),
688 ntohl (msg->desired_replication_level),
689 putl, path,
690 GNUNET_TIME_absolute_ntoh(msg->expiration_time),
691 &msg->key,
692 (void *) &path[putl],
693 msize -
694 sizeof (struct GNUNET_DHT_MonitorPutMessage) -
695 sizeof (struct GNUNET_PeerIdentity) * putl);
696 }
697 h = h->next;
698 }
590 return GNUNET_OK; 699 return GNUNET_OK;
591} 700}
592 701
702
703/**
704 * Process a monitoring message from the service: demultiplex for proper type.
705 *
706 * @param handle The DHT handle.
707 * @param msg Message from the service.
708 *
709 * @return GNUNET_OK if everything went fine,
710 * GNUNET_SYSERR if the message is malformed.
711 */
712static int
713process_monitor_message (struct GNUNET_DHT_Handle *handle,
714 const struct GNUNET_MessageHeader *msg)
715{
716 switch (ntohs (msg->type))
717 {
718 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
719 return process_monitor_get_message(handle,
720 (struct GNUNET_DHT_MonitorGetMessage *)
721 msg);
722
723 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
724 {
725 return process_monitor_get_resp_message(
726 handle,
727 (struct GNUNET_DHT_MonitorGetRespMessage *) msg);
728 }
729 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
730 {
731 return process_monitor_put_message(handle,
732 (struct GNUNET_DHT_MonitorPutMessage *)
733 msg);
734 }
735 case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
736 /* Not implemented yet */
737 GNUNET_break(0);
738 /* Fall through */
739 default:
740 GNUNET_break(0);
741 return GNUNET_SYSERR;
742 }
743}
744
593/** 745/**
594 * Handler for messages received from the DHT service 746 * Handler for messages received from the DHT service
595 * a demultiplexer which handles numerous message types 747 * a demultiplexer which handles numerous message types
@@ -930,7 +1082,9 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
930 * @param handle Handle to the DHT service. 1082 * @param handle Handle to the DHT service.
931 * @param type Type of blocks that are of interest. 1083 * @param type Type of blocks that are of interest.
932 * @param key Key of data of interest, NULL for all. 1084 * @param key Key of data of interest, NULL for all.
933 * @param cb Callback to process all monitored data. 1085 * @param get_cb Callback to process monitored get messages.
1086 * @param get_resp_cb Callback to process monitored get response messages.
1087 * @param put_cb Callback to process monitored put messages.
934 * @param cb_cls Closure for cb. 1088 * @param cb_cls Closure for cb.
935 * 1089 *
936 * @return Handle to stop monitoring. 1090 * @return Handle to stop monitoring.
@@ -939,18 +1093,21 @@ struct GNUNET_DHT_MonitorHandle *
939GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, 1093GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
940 enum GNUNET_BLOCK_Type type, 1094 enum GNUNET_BLOCK_Type type,
941 const GNUNET_HashCode *key, 1095 const GNUNET_HashCode *key,
942 GNUNET_DHT_MonitorCB cb, 1096 GNUNET_DHT_MonitorGetCB get_cb,
1097 GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1098 GNUNET_DHT_MonitorPutCB put_cb,
943 void *cb_cls) 1099 void *cb_cls)
944{ 1100{
945 struct GNUNET_DHT_MonitorHandle *h; 1101 struct GNUNET_DHT_MonitorHandle *h;
946 struct GNUNET_DHT_MonitorMessage *m; 1102 struct GNUNET_DHT_MonitorStartMessage *m;
947 struct PendingMessage *pending; 1103 struct PendingMessage *pending;
948 1104
949 h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle)); 1105 h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
950 GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h); 1106 GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
951 1107
952 GNUNET_assert (NULL != cb); 1108 h->get_cb = get_cb;
953 h->cb = cb; 1109 h->get_resp_cb = get_resp_cb;
1110 h->put_cb = put_cb;
954 h->cb_cls = cb_cls; 1111 h->cb_cls = cb_cls;
955 h->type = type; 1112 h->type = type;
956 h->dht_handle = handle; 1113 h->dht_handle = handle;
@@ -960,17 +1117,22 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
960 memcpy (h->key, key, sizeof(GNUNET_HashCode)); 1117 memcpy (h->key, key, sizeof(GNUNET_HashCode));
961 } 1118 }
962 1119
963 pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) + 1120 pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartMessage) +
964 sizeof (struct PendingMessage)); 1121 sizeof (struct PendingMessage));
965 m = (struct GNUNET_DHT_MonitorMessage *) &pending[1]; 1122 m = (struct GNUNET_DHT_MonitorStartMessage *) &pending[1];
966 pending->msg = &m->header; 1123 pending->msg = &m->header;
967 pending->handle = handle; 1124 pending->handle = handle;
968 pending->free_on_send = GNUNET_YES; 1125 pending->free_on_send = GNUNET_YES;
969 m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); 1126 m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
970 m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage)); 1127 m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartMessage));
971 m->type = htonl(type); 1128 m->type = htonl(type);
972 if (NULL != key) 1129 m->get = (NULL != get_cb);
1130 m->get_resp = (NULL != get_resp_cb);
1131 m->put = (NULL != put_cb);
1132 if (NULL != key) {
1133 m->filter_key = 1;
973 memcpy (&m->key, key, sizeof(GNUNET_HashCode)); 1134 memcpy (&m->key, key, sizeof(GNUNET_HashCode));
1135 }
974 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, 1136 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
975 pending); 1137 pending);
976 pending->in_pending_queue = GNUNET_YES; 1138 pending->in_pending_queue = GNUNET_YES;