aboutsummaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/gnunet-service-tng.c41
-rw-r--r--src/transport/transport.h5
-rw-r--r--src/transport/transport_api2_communication.c14
3 files changed, 40 insertions, 20 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index c04e33624..cdaf6ff3a 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -4438,8 +4438,8 @@ queue_send_msg (struct Queue *queue,
4438 env = GNUNET_MQ_msg_extra (smt, 4438 env = GNUNET_MQ_msg_extra (smt,
4439 payload_size, 4439 payload_size,
4440 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG); 4440 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
4441 smt->qid = queue->qid; 4441 smt->qid = htonl (queue->qid);
4442 smt->mid = queue->mid_gen; 4442 smt->mid = GNUNET_htonll (queue->mid_gen);
4443 smt->receiver = n->pid; 4443 smt->receiver = n->pid;
4444 memcpy (&smt[1], payload, payload_size); 4444 memcpy (&smt[1], payload, payload_size);
4445 { 4445 {
@@ -4447,7 +4447,13 @@ queue_send_msg (struct Queue *queue,
4447 struct QueueEntry *qe; 4447 struct QueueEntry *qe;
4448 4448
4449 qe = GNUNET_new (struct QueueEntry); 4449 qe = GNUNET_new (struct QueueEntry);
4450 qe->mid = queue->mid_gen++; 4450 qe->mid = queue->mid_gen;
4451 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4452 "Create QueueEntry with MID %lu and QID %lu and prefix %s\n",
4453 qe->mid,
4454 queue->qid,
4455 queue->tc->details.communicator.address_prefix);
4456 queue->mid_gen++;
4451 qe->queue = queue; 4457 qe->queue = queue;
4452 if (NULL != pm) 4458 if (NULL != pm)
4453 { 4459 {
@@ -4460,7 +4466,7 @@ queue_send_msg (struct Queue *queue,
4460 "Retransmitting message <%llu> remove pm from qe with MID: %llu \n", 4466 "Retransmitting message <%llu> remove pm from qe with MID: %llu \n",
4461 pm->logging_uuid, 4467 pm->logging_uuid,
4462 (unsigned long long) pm->qe->mid); 4468 (unsigned long long) pm->qe->mid);
4463 // pm->qe->pm = NULL; 4469 pm->qe->pm = NULL;
4464 } 4470 }
4465 pm->qe = qe; 4471 pm->qe = qe;
4466 } 4472 }
@@ -4491,7 +4497,7 @@ queue_send_msg (struct Queue *queue,
4491 queue->idle = GNUNET_NO; 4497 queue->idle = GNUNET_NO;
4492 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 4498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4493 "Sending message MID %lu of type %u (%u) and size %lu with MQ %p\n", 4499 "Sending message MID %lu of type %u (%u) and size %lu with MQ %p\n",
4494 smt->mid, 4500 GNUNET_ntohll (smt->mid),
4495 ntohs (((const struct GNUNET_MessageHeader *) payload)->type), 4501 ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
4496 ntohs (smt->header.size), 4502 ntohs (smt->header.size),
4497 payload_size, 4503 payload_size,
@@ -10075,7 +10081,7 @@ handle_del_queue_message (void *cls,
10075 { 10081 {
10076 struct Neighbour *neighbour = queue->neighbour; 10082 struct Neighbour *neighbour = queue->neighbour;
10077 10083
10078 if ((dqm->qid != queue->qid) || 10084 if ((ntohl (dqm->qid) != queue->qid) ||
10079 (0 != GNUNET_memcmp (&dqm->receiver, &neighbour->pid))) 10085 (0 != GNUNET_memcmp (&dqm->receiver, &neighbour->pid)))
10080 continue; 10086 continue;
10081 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 10087 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -10130,13 +10136,13 @@ handle_send_message_ack (void *cls,
10130 for (struct QueueEntry *qep = queue->queue_head; NULL != qep; 10136 for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
10131 qep = qep->next) 10137 qep = qep->next)
10132 { 10138 {
10133 if (qep->mid != sma->mid) 10139 if (qep->mid != GNUNET_ntohll (sma->mid) && queue->qid != ntohl (sma->qid))
10134 continue; 10140 continue;
10135 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 10141 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
10136 "QueueEntry MID: %llu on queue QID: %llu, Ack MID: %llu\n", 10142 "QueueEntry MID: %llu on queue QID: %llu, Ack MID: %llu\n",
10137 (unsigned long long) qep->mid, 10143 (unsigned long long) qep->mid,
10138 (unsigned long long) queue->qid, 10144 (unsigned long) queue->qid,
10139 (unsigned long long) sma->mid); 10145 GNUNET_ntohll (sma->mid));
10140 qe = qep; 10146 qe = qep;
10141 if ((NULL != qe->pm) && (qe->pm->qe != qe)) 10147 if ((NULL != qe->pm) && (qe->pm->qe != qe))
10142 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 10148 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -10148,8 +10154,9 @@ handle_send_message_ack (void *cls,
10148 if (NULL == qe) 10154 if (NULL == qe)
10149 { 10155 {
10150 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 10156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
10151 "No QueueEntry found for Ack MID %llu\n", 10157 "No QueueEntry found for Ack MID %llu QID: %llu\n",
10152 (unsigned long long) sma->mid); 10158 (unsigned long long) GNUNET_ntohll (sma->mid),
10159 (unsigned long) ntohl (sma->qid));
10153 // TODO I guess this can happen, if the Ack from the peer comes before the Ack from the queue. 10160 // TODO I guess this can happen, if the Ack from the peer comes before the Ack from the queue.
10154 /* this should never happen */ 10161 /* this should never happen */
10155 /*GNUNET_break (0); 10162 /*GNUNET_break (0);
@@ -10776,7 +10783,7 @@ handle_add_queue_message (void *cls,
10776 NULL != queue; 10783 NULL != queue;
10777 queue = queue->next_client) 10784 queue = queue->next_client)
10778 { 10785 {
10779 if (queue->qid != aqm->qid) 10786 if (queue->qid != ntohl (aqm->qid))
10780 continue; 10787 continue;
10781 break; 10788 break;
10782 } 10789 }
@@ -10812,14 +10819,14 @@ handle_add_queue_message (void *cls,
10812 "New queue %s to %s available with QID %llu and q_len %lu and mtu %u\n", 10819 "New queue %s to %s available with QID %llu and q_len %lu and mtu %u\n",
10813 addr, 10820 addr,
10814 GNUNET_i2s (&aqm->receiver), 10821 GNUNET_i2s (&aqm->receiver),
10815 (unsigned long long) aqm->qid, 10822 (unsigned long) ntohl (aqm->qid),
10816 GNUNET_ntohll (aqm->q_len), 10823 GNUNET_ntohll (aqm->q_len),
10817 ntohl (aqm->mtu)); 10824 ntohl (aqm->mtu));
10818 queue = GNUNET_malloc (sizeof(struct Queue) + addr_len); 10825 queue = GNUNET_malloc (sizeof(struct Queue) + addr_len);
10819 queue->tc = tc; 10826 queue->tc = tc;
10820 queue->address = (const char *) &queue[1]; 10827 queue->address = (const char *) &queue[1];
10821 queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL; 10828 queue->pd.aged_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
10822 queue->qid = aqm->qid; 10829 queue->qid = ntohl (aqm->qid);
10823 queue->neighbour = neighbour; 10830 queue->neighbour = neighbour;
10824 if (GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED == GNUNET_ntohll (aqm->q_len)) 10831 if (GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED == GNUNET_ntohll (aqm->q_len))
10825 queue->unlimited_length = GNUNET_YES; 10832 queue->unlimited_length = GNUNET_YES;
@@ -10877,14 +10884,14 @@ handle_update_queue_message (void *cls,
10877 10884
10878 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 10885 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
10879 "Received queue update message for %u with q_len %llu and mtu %u\n", 10886 "Received queue update message for %u with q_len %llu and mtu %u\n",
10880 msg->qid, 10887 ntohl (msg->qid),
10881 (unsigned long long) GNUNET_ntohll (msg->q_len), 10888 (unsigned long long) GNUNET_ntohll (msg->q_len),
10882 ntohl (msg->mtu)); 10889 ntohl (msg->mtu));
10883 for (target_queue = tc->details.communicator.queue_head; 10890 for (target_queue = tc->details.communicator.queue_head;
10884 NULL != target_queue; 10891 NULL != target_queue;
10885 target_queue = target_queue->next_client) 10892 target_queue = target_queue->next_client)
10886 { 10893 {
10887 if (msg->qid == target_queue->qid) 10894 if (ntohl (msg->qid) == target_queue->qid)
10888 break; 10895 break;
10889 } 10896 }
10890 if (NULL == target_queue) 10897 if (NULL == target_queue)
@@ -10905,7 +10912,7 @@ handle_update_queue_message (void *cls,
10905 target_queue->unlimited_length = GNUNET_YES; 10912 target_queue->unlimited_length = GNUNET_YES;
10906 else 10913 else
10907 target_queue->unlimited_length = GNUNET_NO; 10914 target_queue->unlimited_length = GNUNET_NO;
10908 target_queue->q_capacity = GNUNET_ntohll (msg->q_len); 10915 target_queue->q_capacity += GNUNET_ntohll (msg->q_len);
10909 if (0 < target_queue->q_capacity) 10916 if (0 < target_queue->q_capacity)
10910 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO, 10917 schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
10911 target_queue, 10918 target_queue,
diff --git a/src/transport/transport.h b/src/transport/transport.h
index e060f81ba..38f1b220a 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -1022,6 +1022,11 @@ struct GNUNET_TRANSPORT_SendMessageToAck
1022 uint64_t mid GNUNET_PACKED; 1022 uint64_t mid GNUNET_PACKED;
1023 1023
1024 /** 1024 /**
1025 * Queue ID for the queue which was used to send the message.
1026 */
1027 uint32_t qid GNUNET_PACKED;
1028
1029 /**
1025 * Receiver identifier. 1030 * Receiver identifier.
1026 */ 1031 */
1027 struct GNUNET_PeerIdentity receiver; 1032 struct GNUNET_PeerIdentity receiver;
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
index 079982ca5..3811f463f 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -106,6 +106,11 @@ struct AckPending
106 * More-or-less unique ID for the message. 106 * More-or-less unique ID for the message.
107 */ 107 */
108 uint64_t mid; 108 uint64_t mid;
109
110 /**
111 * Queue ID of the queue which will be used for the message.
112 */
113 uint32_t qid;
109}; 114};
110 115
111 116
@@ -639,7 +644,8 @@ static void
639send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 644send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
640 int status, 645 int status,
641 const struct GNUNET_PeerIdentity *receiver, 646 const struct GNUNET_PeerIdentity *receiver,
642 uint64_t mid) 647 uint64_t mid,
648 uint32_t qid)
643{ 649{
644 struct GNUNET_MQ_Envelope *env; 650 struct GNUNET_MQ_Envelope *env;
645 struct GNUNET_TRANSPORT_SendMessageToAck *ack; 651 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
@@ -647,6 +653,7 @@ send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
647 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK); 653 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
648 ack->status = htonl (status); 654 ack->status = htonl (status);
649 ack->mid = mid; 655 ack->mid = mid;
656 ack->qid = qid;
650 ack->receiver = *receiver; 657 ack->receiver = *receiver;
651 GNUNET_MQ_send (ch->mq, env); 658 GNUNET_MQ_send (ch->mq, env);
652} 659}
@@ -665,7 +672,7 @@ send_ack_cb (void *cls)
665 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch; 672 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
666 673
667 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap); 674 GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
668 send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid); 675 send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid, ap->qid);
669 GNUNET_free (ap); 676 GNUNET_free (ap);
670} 677}
671 678
@@ -696,13 +703,14 @@ handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
696 /* queue is already gone, tell transport this one failed */ 703 /* queue is already gone, tell transport this one failed */
697 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 704 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
698 "Transmission failed, queue no longer exists.\n"); 705 "Transmission failed, queue no longer exists.\n");
699 send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid); 706 send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid, smt->qid);
700 return; 707 return;
701 } 708 }
702 ap = GNUNET_new (struct AckPending); 709 ap = GNUNET_new (struct AckPending);
703 ap->ch = ch; 710 ap->ch = ch;
704 ap->receiver = smt->receiver; 711 ap->receiver = smt->receiver;
705 ap->mid = smt->mid; 712 ap->mid = smt->mid;
713 ap->qid = smt->qid;
706 GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap); 714 GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap);
707 mh = (const struct GNUNET_MessageHeader *) &smt[1]; 715 mh = (const struct GNUNET_MessageHeader *) &smt[1];
708 env = GNUNET_MQ_msg_copy (mh); 716 env = GNUNET_MQ_msg_copy (mh);