aboutsummaryrefslogtreecommitdiff
path: root/src/rps/rps_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/rps/rps_api.c')
-rw-r--r--src/rps/rps_api.c108
1 files changed, 56 insertions, 52 deletions
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 420323c4b..e879c0f7e 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -207,14 +207,13 @@ new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
207 struct GNUNET_RPS_StreamRequestHandle *srh; 207 struct GNUNET_RPS_StreamRequestHandle *srh;
208 208
209 srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); 209 srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
210
211 srh->rps_handle = rps_handle; 210 srh->rps_handle = rps_handle;
212 srh->ready_cb = ready_cb; 211 srh->ready_cb = ready_cb;
213 srh->ready_cb_cls = cls; 212 srh->ready_cb_cls = cls;
214 GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, 213 GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
215 rps_handle->stream_requests_tail, 214 rps_handle->stream_requests_tail,
216 srh); 215 srh);
217 216
218 return srh; 217 return srh;
219} 218}
220 219
@@ -223,24 +222,21 @@ new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
223 * @brief Remove the given stream request from the list of requests and memory 222 * @brief Remove the given stream request from the list of requests and memory
224 * 223 *
225 * @param srh The request to be removed 224 * @param srh The request to be removed
226 * @param srh_head Head of the DLL to remove request from
227 * @param srh_tail Tail of the DLL to remove request from
228 */ 225 */
229static void 226static void
230remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh, 227remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh)
231 struct GNUNET_RPS_StreamRequestHandle *srh_head,
232 struct GNUNET_RPS_StreamRequestHandle *srh_tail)
233{ 228{
229 struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle;
230
234 GNUNET_assert (NULL != srh); 231 GNUNET_assert (NULL != srh);
235 if (NULL != srh->callback_task) 232 if (NULL != srh->callback_task)
236 { 233 {
237 GNUNET_SCHEDULER_cancel (srh->callback_task); 234 GNUNET_SCHEDULER_cancel (srh->callback_task);
238 srh->callback_task = NULL; 235 srh->callback_task = NULL;
239 } 236 }
240 GNUNET_CONTAINER_DLL_remove (srh_head, 237 GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
241 srh_tail, 238 rps_handle->stream_requests_tail,
242 srh); 239 srh);
243
244 GNUNET_free (srh); 240 GNUNET_free (srh);
245} 241}
246 242
@@ -254,7 +250,7 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh,
254 * @param num_peers The number of peers that have been returned 250 * @param num_peers The number of peers that have been returned
255 * @param cls The #GNUNET_RPS_Request_Handle 251 * @param cls The #GNUNET_RPS_Request_Handle
256 */ 252 */
257void 253static void
258peers_ready_cb (const struct GNUNET_PeerIdentity *peers, 254peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
259 uint32_t num_peers, 255 uint32_t num_peers,
260 void *cls) 256 void *cls)
@@ -280,7 +276,7 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
280 * @param num_peers The number of peer that have been returned 276 * @param num_peers The number of peer that have been returned
281 * @param peers The array of @a num_peers that have been returned 277 * @param peers The array of @a num_peers that have been returned
282 */ 278 */
283void 279static void
284collect_peers_cb (void *cls, 280collect_peers_cb (void *cls,
285 uint64_t num_peers, 281 uint64_t num_peers,
286 const struct GNUNET_PeerIdentity *peers) 282 const struct GNUNET_PeerIdentity *peers)
@@ -450,10 +446,9 @@ GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
450 struct GNUNET_RPS_Handle *rps_handle; 446 struct GNUNET_RPS_Handle *rps_handle;
451 447
452 rps_handle = srh->rps_handle; 448 rps_handle = srh->rps_handle;
453 remove_stream_request (srh, 449 remove_stream_request (srh);
454 rps_handle->stream_requests_head, 450 if (NULL == rps_handle->stream_requests_head)
455 rps_handle->stream_requests_tail); 451 cancel_stream (rps_handle);
456 if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle);
457} 452}
458 453
459 454
@@ -524,28 +519,27 @@ handle_stream_input (void *cls,
524 //peers = (struct GNUNET_PeerIdentity *) &msg[1]; 519 //peers = (struct GNUNET_PeerIdentity *) &msg[1];
525 num_peers = ntohl (msg->num_peers); 520 num_peers = ntohl (msg->num_peers);
526 srh_callback_num_peers = num_peers; 521 srh_callback_num_peers = num_peers;
527 if (NULL != srh_callback_peers) GNUNET_free (srh_callback_peers); 522 GNUNET_free_non_null (srh_callback_peers);
528 srh_callback_peers = 523 srh_callback_peers = GNUNET_new_array (num_peers,
529 GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); 524 struct GNUNET_PeerIdentity);
530 GNUNET_memcpy (srh_callback_peers, 525 GNUNET_memcpy (srh_callback_peers,
531 &msg[1], 526 &msg[1],
532 num_peers * sizeof (struct GNUNET_PeerIdentity)); 527 num_peers * sizeof (struct GNUNET_PeerIdentity));
533 LOG (GNUNET_ERROR_TYPE_DEBUG, 528 LOG (GNUNET_ERROR_TYPE_DEBUG,
534 "Received %" PRIu64 " peer(s) from stream input.\n", 529 "Received %" PRIu64 " peer(s) from stream input.\n",
535 num_peers); 530 num_peers);
536 srh_iter = h->stream_requests_head; 531 for (srh_iter = h->stream_requests_head;
537 while (NULL != srh_iter) 532 NULL != srh_iter;
533 srh_iter = srh_next)
538 { 534 {
539 LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); 535 LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
540 /* Store next pointer - srh might be removed/freed in callback */ 536 /* Store next pointer - srh might be removed/freed in callback */
541 srh_next = srh_iter->next; 537 srh_next = srh_iter->next;
542 if (NULL != srh_iter->callback_task) 538 if (NULL != srh_iter->callback_task)
543 {
544 GNUNET_SCHEDULER_cancel (srh_iter->callback_task); 539 GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
545 }
546 srh_iter->callback_task = 540 srh_iter->callback_task =
547 GNUNET_SCHEDULER_add_now (srh_callback_scheduled, srh_iter); 541 GNUNET_SCHEDULER_add_now (&srh_callback_scheduled,
548 srh_iter = srh_next; 542 srh_iter);
549 } 543 }
550 544
551 if (NULL == h->stream_requests_head) 545 if (NULL == h->stream_requests_head)
@@ -581,6 +575,7 @@ mq_error_handler (void *cls,
581 1: READ,\n\ 575 1: READ,\n\
582 2: WRITE,\n\ 576 2: WRITE,\n\
583 4: TIMEOUT\n", 577 4: TIMEOUT\n",
578 // TODO: write GNUNET_MQ_strerror (error)
584 error); 579 error);
585 reconnect (h); 580 reconnect (h);
586 /* Resend all pending request as the service destroyed its knowledge 581 /* Resend all pending request as the service destroyed its knowledge
@@ -592,17 +587,20 @@ mq_error_handler (void *cls,
592 * @brief Create the hash value from the share value that defines the sub 587 * @brief Create the hash value from the share value that defines the sub
593 * (-group) 588 * (-group)
594 * 589 *
595 * @param share_val Share value - strings longer than 508 (512 - 4) will be 590 * @param share_val Share value
596 * truncated. 591 * @param hash[out] Pointer to the location in which the hash will be stored.
597 * @param hash Pointer to the location in which the hash will be stored.
598 */ 592 */
599static void 593static void
600hash_from_share_val (const char *share_val, struct GNUNET_HashCode *hash) 594hash_from_share_val (const char *share_val,
595 struct GNUNET_HashCode *hash)
601{ 596{
602 char hash_port_string[512] = "rps"; 597 GNUNET_CRYPTO_kdf (hash,
603 598 sizeof (struct GNUNET_HashCode),
604 (void) strncat (hash_port_string, share_val, 508); 599 "rps",
605 GNUNET_CRYPTO_hash (hash_port_string, strlen (hash_port_string), hash); 600 strlen ("rps"),
601 share_val,
602 strlen (share_val),
603 NULL, 0);
606} 604}
607 605
608 606
@@ -757,12 +755,10 @@ GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
757 struct GNUNET_MQ_Envelope *ev; 755 struct GNUNET_MQ_Envelope *ev;
758 struct GNUNET_RPS_CS_SeedMessage *msg; 756 struct GNUNET_RPS_CS_SeedMessage *msg;
759 757
760 unsigned int i;
761
762 LOG (GNUNET_ERROR_TYPE_DEBUG, 758 LOG (GNUNET_ERROR_TYPE_DEBUG,
763 "Client wants to seed %" PRIu32 " peers:\n", 759 "Client wants to seed %" PRIu32 " peers:\n",
764 n); 760 n);
765 for (i = 0 ; i < n ; i++) 761 for (unsigned int i = 0 ; i < n ; i++)
766 LOG (GNUNET_ERROR_TYPE_DEBUG, 762 LOG (GNUNET_ERROR_TYPE_DEBUG,
767 "%u. peer: %s\n", 763 "%u. peer: %s\n",
768 i, 764 i,
@@ -780,12 +776,15 @@ GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
780 776
781 while (GNUNET_MAX_MESSAGE_SIZE < size_needed) 777 while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
782 { 778 {
783 ev = GNUNET_MQ_msg_extra (msg, num_peers_max * sizeof (struct GNUNET_PeerIdentity), 779 ev = GNUNET_MQ_msg_extra (msg,
784 GNUNET_MESSAGE_TYPE_RPS_CS_SEED); 780 num_peers_max * sizeof (struct GNUNET_PeerIdentity),
781 GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
785 msg->num_peers = htonl (num_peers_max); 782 msg->num_peers = htonl (num_peers_max);
786 GNUNET_memcpy (&msg[1], tmp_peer_pointer, num_peers_max * sizeof (struct GNUNET_PeerIdentity)); 783 GNUNET_memcpy (&msg[1],
787 GNUNET_MQ_send (h->mq, ev); 784 tmp_peer_pointer,
788 785 num_peers_max * sizeof (struct GNUNET_PeerIdentity));
786 GNUNET_MQ_send (h->mq,
787 ev);
789 n -= num_peers_max; 788 n -= num_peers_max;
790 size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) + 789 size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
791 n * sizeof (struct GNUNET_PeerIdentity); 790 n * sizeof (struct GNUNET_PeerIdentity);
@@ -793,12 +792,15 @@ GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
793 tmp_peer_pointer = &ids[num_peers_max]; 792 tmp_peer_pointer = &ids[num_peers_max];
794 } 793 }
795 794
796 ev = GNUNET_MQ_msg_extra (msg, n * sizeof (struct GNUNET_PeerIdentity), 795 ev = GNUNET_MQ_msg_extra (msg,
796 n * sizeof (struct GNUNET_PeerIdentity),
797 GNUNET_MESSAGE_TYPE_RPS_CS_SEED); 797 GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
798 msg->num_peers = htonl (n); 798 msg->num_peers = htonl (n);
799 GNUNET_memcpy (&msg[1], tmp_peer_pointer, n * sizeof (struct GNUNET_PeerIdentity)); 799 GNUNET_memcpy (&msg[1],
800 800 tmp_peer_pointer,
801 GNUNET_MQ_send (h->mq, ev); 801 n * sizeof (struct GNUNET_PeerIdentity));
802 GNUNET_MQ_send (h->mq,
803 ev);
802} 804}
803 805
804 806
@@ -886,7 +888,9 @@ GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
886 if ( (2 == type) || 888 if ( (2 == type) ||
887 (3 == type) ) 889 (3 == type) )
888 msg->attacked_peer = *target_peer; 890 msg->attacked_peer = *target_peer;
889 GNUNET_memcpy (&msg[1], tmp_peer_pointer, num_peers * sizeof (struct GNUNET_PeerIdentity)); 891 GNUNET_memcpy (&msg[1],
892 tmp_peer_pointer,
893 num_peers * sizeof (struct GNUNET_PeerIdentity));
890 894
891 GNUNET_MQ_send (h->mq, ev); 895 GNUNET_MQ_send (h->mq, ev);
892} 896}
@@ -928,16 +932,16 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
928{ 932{
929 if (NULL != h->stream_requests_head) 933 if (NULL != h->stream_requests_head)
930 { 934 {
931 struct GNUNET_RPS_StreamRequestHandle *srh_iter; 935 struct GNUNET_RPS_StreamRequestHandle *srh_next;
932 936
933 LOG (GNUNET_ERROR_TYPE_WARNING, 937 LOG (GNUNET_ERROR_TYPE_WARNING,
934 "Still waiting for replies\n"); 938 "Still waiting for replies\n");
935 srh_iter = h->stream_requests_head; 939 for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
936 while (NULL != srh_iter) 940 NULL != srh_iter;
941 srh_iter = srh_next)
937 { 942 {
938 struct GNUNET_RPS_StreamRequestHandle *srh_tmp = srh_iter; 943 srh_next = srh_iter->next;
939 srh_iter = srh_iter->next; 944 GNUNET_RPS_stream_cancel (srh_iter);
940 GNUNET_RPS_stream_cancel (srh_tmp);
941 } 945 }
942 } 946 }
943 if (NULL != srh_callback_peers) 947 if (NULL != srh_callback_peers)