diff options
author | Christian Grothoff <christian@grothoff.org> | 2010-01-26 13:29:45 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2010-01-26 13:29:45 +0000 |
commit | c2bd55c1e68522d685c1ac539949918ad612ce16 (patch) | |
tree | ad2eedff4af3e6f900631437482943e314287254 /src/fs | |
parent | 68907af3eb39fb03936a226f340d469af10d5014 (diff) | |
download | gnunet-c2bd55c1e68522d685c1ac539949918ad612ce16.tar.gz gnunet-c2bd55c1e68522d685c1ac539949918ad612ce16.zip |
stuff
Diffstat (limited to 'src/fs')
-rw-r--r-- | src/fs/gnunet-service-fs.c | 343 |
1 files changed, 194 insertions, 149 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 0339567f3..57b6dd421 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c | |||
@@ -50,8 +50,6 @@ | |||
50 | 50 | ||
51 | #define DEBUG_FS GNUNET_NO | 51 | #define DEBUG_FS GNUNET_NO |
52 | 52 | ||
53 | |||
54 | |||
55 | /** | 53 | /** |
56 | * Signature of a function that is called whenever a datastore | 54 | * Signature of a function that is called whenever a datastore |
57 | * request can be processed (or an entry put on the queue times out). | 55 | * request can be processed (or an entry put on the queue times out). |
@@ -398,6 +396,13 @@ struct PendingRequest | |||
398 | struct GNUNET_CORE_InformationRequestContext *irc; | 396 | struct GNUNET_CORE_InformationRequestContext *irc; |
399 | 397 | ||
400 | /** | 398 | /** |
399 | * Handle for an active request for transmission to this peer, or | ||
400 | * NULL. Only used for replies that we are trying to send to a peer | ||
401 | * that we are not yet connected to. | ||
402 | */ | ||
403 | struct GNUNET_CORE_TransmitHandle *cth; | ||
404 | |||
405 | /** | ||
401 | * Replies that we have received but were unable to forward yet | 406 | * Replies that we have received but were unable to forward yet |
402 | * (typically non-null only if we have a pending transmission | 407 | * (typically non-null only if we have a pending transmission |
403 | * request with the client or the respective peer). | 408 | * request with the client or the respective peer). |
@@ -405,15 +410,6 @@ struct PendingRequest | |||
405 | struct PendingMessage *replies_pending; | 410 | struct PendingMessage *replies_pending; |
406 | 411 | ||
407 | /** | 412 | /** |
408 | * Pending transmission request with the core service for the target | ||
409 | * peer (for processing of 'replies_pending') or Handle for a | ||
410 | * pending query-request for P2P-transmission with the core service. | ||
411 | * If non-NULL, this request must be cancelled should this struct be | ||
412 | * destroyed! | ||
413 | */ | ||
414 | struct GNUNET_CORE_TransmitHandle *cth; | ||
415 | |||
416 | /** | ||
417 | * Pending transmission request for the target client (for processing of | 413 | * Pending transmission request for the target client (for processing of |
418 | * 'replies_pending'). | 414 | * 'replies_pending'). |
419 | */ | 415 | */ |
@@ -558,8 +554,8 @@ struct ClientRequestList | |||
558 | 554 | ||
559 | 555 | ||
560 | /** | 556 | /** |
561 | * Linked list of all clients that we are | 557 | * Linked list of all clients that we are currently processing |
562 | * currently processing requests for. | 558 | * requests for. |
563 | */ | 559 | */ |
564 | struct ClientList | 560 | struct ClientList |
565 | { | 561 | { |
@@ -658,7 +654,7 @@ struct ConnectedPeer | |||
658 | * Handle for an active request for transmission to this | 654 | * Handle for an active request for transmission to this |
659 | * peer, or NULL. | 655 | * peer, or NULL. |
660 | */ | 656 | */ |
661 | struct GNUNET_CORE_PeerRequestHandle *prh; | 657 | struct GNUNET_CORE_TransmitHandle *cth; |
662 | 658 | ||
663 | /** | 659 | /** |
664 | * Messages (replies, queries, content migration) we would like to | 660 | * Messages (replies, queries, content migration) we would like to |
@@ -779,9 +775,6 @@ static uint64_t max_pending_requests = 32; | |||
779 | 775 | ||
780 | 776 | ||
781 | 777 | ||
782 | |||
783 | |||
784 | |||
785 | /** | 778 | /** |
786 | * Run the next DS request in our | 779 | * Run the next DS request in our |
787 | * queue, we're done with the current one. | 780 | * queue, we're done with the current one. |
@@ -950,8 +943,7 @@ transmit_local_result (void *cls, | |||
950 | 943 | ||
951 | 944 | ||
952 | /** | 945 | /** |
953 | * Mingle hash with the mingle_number to | 946 | * Mingle hash with the mingle_number to produce different bits. |
954 | * produce different bits. | ||
955 | */ | 947 | */ |
956 | static void | 948 | static void |
957 | mingle_hash (const GNUNET_HashCode * in, | 949 | mingle_hash (const GNUNET_HashCode * in, |
@@ -1114,12 +1106,10 @@ get_processing_delay () | |||
1114 | 1106 | ||
1115 | 1107 | ||
1116 | /** | 1108 | /** |
1117 | * Task that is run for each request with the | 1109 | * Task that is run for each request with the goal of forwarding the |
1118 | * goal of forwarding the associated query to | 1110 | * associated query to other peers. The task should re-schedule |
1119 | * other peers. The task should re-schedule | 1111 | * itself to be re-run once the TTL has expired. (or at a later time |
1120 | * itself to be re-run once the TTL has expired. | 1112 | * if more peers should be queried earlier). |
1121 | * (or at a later time if more peers should | ||
1122 | * be queried earlier). | ||
1123 | * | 1113 | * |
1124 | * @param cls the requests "struct PendingRequest*" | 1114 | * @param cls the requests "struct PendingRequest*" |
1125 | * @param tc task context (unused) | 1115 | * @param tc task context (unused) |
@@ -1130,9 +1120,9 @@ forward_request_task (void *cls, | |||
1130 | 1120 | ||
1131 | 1121 | ||
1132 | /** | 1122 | /** |
1133 | * We've selected a peer for forwarding of a query. | 1123 | * We've selected a peer for forwarding of a query. Construct the |
1134 | * Construct the message and then re-schedule the | 1124 | * message and then re-schedule the task to forward again to (other) |
1135 | * task to forward again to (other) peers. | 1125 | * peers. |
1136 | * | 1126 | * |
1137 | * @param cls closure | 1127 | * @param cls closure |
1138 | * @param size number of bytes available in buf | 1128 | * @param size number of bytes available in buf |
@@ -1144,29 +1134,102 @@ transmit_request_cb (void *cls, | |||
1144 | size_t size, | 1134 | size_t size, |
1145 | void *buf) | 1135 | void *buf) |
1146 | { | 1136 | { |
1137 | struct ConnectedPeer *cp = cls; | ||
1138 | char *cbuf = buf; | ||
1139 | struct GNUNET_PeerIdentity target; | ||
1140 | struct PendingMessage *pr; | ||
1141 | size_t tot; | ||
1142 | |||
1143 | cp->cth = NULL; | ||
1144 | tot = 0; | ||
1145 | while ( (NULL != (pr = cp->pending_messages)) && | ||
1146 | (pr->msize < size - tot) ) | ||
1147 | { | ||
1148 | memcpy (&cbuf[tot], | ||
1149 | &pr[1], | ||
1150 | pr->msize); | ||
1151 | tot += pr->msize; | ||
1152 | cp->pending_messages = pr->next; | ||
1153 | GNUNET_free (pr); | ||
1154 | } | ||
1155 | if (NULL != pr) | ||
1156 | { | ||
1157 | GNUNET_PEER_resolve (cp->pid, | ||
1158 | &target); | ||
1159 | cp->cth = GNUNET_CORE_notify_transmit_ready (core, | ||
1160 | pr->priority, | ||
1161 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1162 | &target, | ||
1163 | pr->msize, | ||
1164 | &transmit_request_cb, | ||
1165 | cp); | ||
1166 | } | ||
1167 | return tot; | ||
1168 | } | ||
1169 | |||
1170 | |||
1171 | /** | ||
1172 | * Function called after we've tried to reserve a certain amount of | ||
1173 | * bandwidth for a reply. Check if we succeeded and if so send our | ||
1174 | * query. | ||
1175 | * | ||
1176 | * @param cls the requests "struct PendingRequest*" | ||
1177 | * @param peer identifies the peer | ||
1178 | * @param bpm_in set to the current bandwidth limit (receiving) for this peer | ||
1179 | * @param bpm_out set to the current bandwidth limit (sending) for this peer | ||
1180 | * @param amount set to the amount that was actually reserved or unreserved | ||
1181 | * @param preference current traffic preference for the given peer | ||
1182 | */ | ||
1183 | static void | ||
1184 | target_reservation_cb (void *cls, | ||
1185 | const struct | ||
1186 | GNUNET_PeerIdentity * peer, | ||
1187 | unsigned int bpm_in, | ||
1188 | unsigned int bpm_out, | ||
1189 | int amount, | ||
1190 | uint64_t preference) | ||
1191 | { | ||
1147 | struct PendingRequest *pr = cls; | 1192 | struct PendingRequest *pr = cls; |
1193 | struct ConnectedPeer *cp; | ||
1194 | struct PendingMessage *pm; | ||
1195 | struct PendingMessage *pos; | ||
1196 | struct PendingMessage *prev; | ||
1148 | struct GetMessage *gm; | 1197 | struct GetMessage *gm; |
1149 | GNUNET_HashCode *ext; | 1198 | GNUNET_HashCode *ext; |
1150 | char *bfdata; | 1199 | char *bfdata; |
1151 | size_t msize; | 1200 | size_t msize; |
1152 | unsigned int k; | 1201 | unsigned int k; |
1153 | 1202 | ||
1154 | pr->cth = NULL; | 1203 | pr->task = GNUNET_SCHEDULER_add_delayed (sched, |
1155 | /* (1) check for timeout */ | 1204 | get_processing_delay (), // FIXME: longer? |
1156 | if (NULL == buf) | 1205 | &forward_request_task, |
1206 | pr); | ||
1207 | pr->irc = NULL; | ||
1208 | GNUNET_assert (peer != NULL); | ||
1209 | if (amount != DBLOCK_SIZE) | ||
1157 | { | 1210 | { |
1158 | /* timeout, try another peer immediately again */ | 1211 | /* FIXME: call stats... */ |
1159 | pr->task = GNUNET_SCHEDULER_add_with_priority (sched, | 1212 | return; /* this target round failed */ |
1160 | GNUNET_SCHEDULER_PRIORITY_IDLE, | 1213 | } |
1161 | &forward_request_task, | 1214 | // (2) transmit, update ttl/priority |
1162 | pr); | 1215 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, |
1163 | return 0; | 1216 | &peer->hashPubKey); |
1217 | if (cp == NULL) | ||
1218 | { | ||
1219 | /* Peer must have just left; try again immediately */ | ||
1220 | pr->task = GNUNET_SCHEDULER_add_now (sched, | ||
1221 | &forward_request_task, | ||
1222 | pr); | ||
1223 | return; | ||
1164 | } | 1224 | } |
1165 | /* (2) build query message */ | 1225 | /* build message and insert message into priority queue */ |
1166 | k = 0; // FIXME: count hash codes! | 1226 | k = 0; // FIXME: count hash codes! |
1167 | msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode); | 1227 | msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode); |
1168 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); | 1228 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); |
1169 | gm = (struct GetMessage*) buf; | 1229 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); |
1230 | pm->msize = msize; | ||
1231 | pm->priority = 0; // FIXME: calculate priority properly! | ||
1232 | gm = (struct GetMessage*) &pm[1]; | ||
1170 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); | 1233 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); |
1171 | gm->header.size = htons (msize); | 1234 | gm->header.size = htons (msize); |
1172 | gm->type = htonl (pr->type); | 1235 | gm->type = htonl (pr->type); |
@@ -1177,91 +1240,52 @@ transmit_request_cb (void *cls, | |||
1177 | gm->hash_bitmap = htonl (42); | 1240 | gm->hash_bitmap = htonl (42); |
1178 | gm->query = pr->query; | 1241 | gm->query = pr->query; |
1179 | ext = (GNUNET_HashCode*) &gm[1]; | 1242 | ext = (GNUNET_HashCode*) &gm[1]; |
1243 | |||
1180 | // FIXME: setup "ext[0]..[k-1]" | 1244 | // FIXME: setup "ext[0]..[k-1]" |
1181 | bfdata = (char *) &ext[k]; | 1245 | bfdata = (char *) &ext[k]; |
1182 | if (pr->bf != NULL) | 1246 | if (pr->bf != NULL) |
1183 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, | 1247 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, |
1184 | bfdata, | 1248 | bfdata, |
1185 | pr->bf_size); | 1249 | pr->bf_size); |
1186 | |||
1187 | /* (3) schedule job to do it again (or another peer, etc.) */ | ||
1188 | pr->task = GNUNET_SCHEDULER_add_delayed (sched, | ||
1189 | get_processing_delay (), // FIXME! | ||
1190 | &forward_request_task, | ||
1191 | pr); | ||
1192 | 1250 | ||
1193 | return msize; | ||
1194 | } | ||
1195 | 1251 | ||
1196 | 1252 | prev = NULL; | |
1197 | /** | 1253 | pos = cp->pending_messages; |
1198 | * Function called after we've tried to reserve | 1254 | while ( (pos != NULL) && |
1199 | * a certain amount of bandwidth for a reply. | 1255 | (pm->priority < pos->priority) ) |
1200 | * Check if we succeeded and if so send our query. | ||
1201 | * | ||
1202 | * @param cls the requests "struct PendingRequest*" | ||
1203 | * @param peer identifies the peer | ||
1204 | * @param bpm_in set to the current bandwidth limit (receiving) for this peer | ||
1205 | * @param bpm_out set to the current bandwidth limit (sending) for this peer | ||
1206 | * @param amount set to the amount that was actually reserved or unreserved | ||
1207 | * @param preference current traffic preference for the given peer | ||
1208 | */ | ||
1209 | static void | ||
1210 | target_reservation_cb (void *cls, | ||
1211 | const struct | ||
1212 | GNUNET_PeerIdentity * peer, | ||
1213 | unsigned int bpm_in, | ||
1214 | unsigned int bpm_out, | ||
1215 | int amount, | ||
1216 | uint64_t preference) | ||
1217 | { | ||
1218 | struct PendingRequest *pr = cls; | ||
1219 | uint32_t priority; | ||
1220 | uint16_t size; | ||
1221 | struct GNUNET_TIME_Relative maxdelay; | ||
1222 | |||
1223 | pr->irc = NULL; | ||
1224 | GNUNET_assert (peer != NULL); | ||
1225 | if ( (amount != DBLOCK_SIZE) || | ||
1226 | (pr->cth != NULL) ) | ||
1227 | { | 1256 | { |
1228 | /* try again later; FIXME: we may need to un-reserve "amount"? */ | 1257 | prev = pos; |
1229 | pr->task = GNUNET_SCHEDULER_add_delayed (sched, | 1258 | pos = pos->next; |
1230 | get_processing_delay (), // FIXME: longer? | ||
1231 | &forward_request_task, | ||
1232 | pr); | ||
1233 | return; | ||
1234 | } | 1259 | } |
1235 | // (2) transmit, update ttl/priority | 1260 | if (prev == NULL) |
1236 | // FIXME: calculate priority, maxdelay, size properly! | 1261 | cp->pending_messages = pm; |
1237 | priority = 0; | 1262 | else |
1238 | size = 60000; | 1263 | prev->next = pm; |
1239 | maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT; | 1264 | pm->next = pos; |
1240 | pr->cth = GNUNET_CORE_notify_transmit_ready (core, | 1265 | if (cp->cth == NULL) |
1241 | priority, | 1266 | cp->cth = GNUNET_CORE_notify_transmit_ready (core, |
1242 | maxdelay, | 1267 | cp->pending_messages->priority, |
1243 | peer, | 1268 | GNUNET_TIME_UNIT_FOREVER_REL, |
1244 | size, | 1269 | peer, |
1245 | &transmit_request_cb, | 1270 | msize, |
1246 | pr); | 1271 | &transmit_request_cb, |
1247 | if (pr->cth == NULL) | 1272 | cp); |
1273 | if (cp->cth == NULL) | ||
1248 | { | 1274 | { |
1249 | /* try again later */ | 1275 | /* technically, this should not be a 'break'; but |
1250 | pr->task = GNUNET_SCHEDULER_add_delayed (sched, | 1276 | we don't handle this (rare) case yet, so let's warn |
1251 | get_processing_delay (), // FIXME: longer? | 1277 | about it... */ |
1252 | &forward_request_task, | 1278 | GNUNET_break (0); |
1253 | pr); | 1279 | // FIXME: now what? |
1254 | } | 1280 | } |
1255 | } | 1281 | } |
1256 | 1282 | ||
1257 | 1283 | ||
1258 | /** | 1284 | /** |
1259 | * Task that is run for each request with the | 1285 | * Task that is run for each request with the goal of forwarding the |
1260 | * goal of forwarding the associated query to | 1286 | * associated query to other peers. The task should re-schedule |
1261 | * other peers. The task should re-schedule | 1287 | * itself to be re-run once the TTL has expired. (or at a later time |
1262 | * itself to be re-run once the TTL has expired. | 1288 | * if more peers should be queried earlier). |
1263 | * (or at a later time if more peers should | ||
1264 | * be queried earlier). | ||
1265 | * | 1289 | * |
1266 | * @param cls the requests "struct PendingRequest*" | 1290 | * @param cls the requests "struct PendingRequest*" |
1267 | * @param tc task context (unused) | 1291 | * @param tc task context (unused) |
@@ -1274,15 +1298,6 @@ forward_request_task (void *cls, | |||
1274 | struct PeerSelectionContext psc; | 1298 | struct PeerSelectionContext psc; |
1275 | 1299 | ||
1276 | pr->task = GNUNET_SCHEDULER_NO_TASK; | 1300 | pr->task = GNUNET_SCHEDULER_NO_TASK; |
1277 | if (pr->cth != NULL) | ||
1278 | { | ||
1279 | /* we're busy transmitting a result, wait a bit */ | ||
1280 | pr->task = GNUNET_SCHEDULER_add_delayed (sched, | ||
1281 | get_processing_delay (), | ||
1282 | &forward_request_task, | ||
1283 | pr); | ||
1284 | return; | ||
1285 | } | ||
1286 | /* (1) select target */ | 1301 | /* (1) select target */ |
1287 | psc.pr = pr; | 1302 | psc.pr = pr; |
1288 | psc.target_score = DBL_MIN; | 1303 | psc.target_score = DBL_MIN; |
@@ -1301,13 +1316,13 @@ forward_request_task (void *cls, | |||
1301 | /* (2) reserve reply bandwidth */ | 1316 | /* (2) reserve reply bandwidth */ |
1302 | GNUNET_assert (NULL == pr->irc); | 1317 | GNUNET_assert (NULL == pr->irc); |
1303 | pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, | 1318 | pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, |
1304 | &psc.target, | 1319 | &psc.target, |
1305 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | 1320 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, |
1306 | -1, | 1321 | -1, |
1307 | DBLOCK_SIZE, // FIXME: make dependent on type? | 1322 | DBLOCK_SIZE, // FIXME: make dependent on type? |
1308 | 0, | 1323 | 0, |
1309 | &target_reservation_cb, | 1324 | &target_reservation_cb, |
1310 | pr); | 1325 | pr); |
1311 | } | 1326 | } |
1312 | 1327 | ||
1313 | 1328 | ||
@@ -1668,8 +1683,6 @@ destroy_pending_request (struct PendingRequest *pr) | |||
1668 | } | 1683 | } |
1669 | if (GNUNET_SCHEDULER_NO_TASK != pr->task) | 1684 | if (GNUNET_SCHEDULER_NO_TASK != pr->task) |
1670 | GNUNET_SCHEDULER_cancel (sched, pr->task); | 1685 | GNUNET_SCHEDULER_cancel (sched, pr->task); |
1671 | if (NULL != pr->cth) | ||
1672 | GNUNET_CORE_notify_transmit_ready_cancel (pr->cth); | ||
1673 | if (NULL != pr->bf) | 1686 | if (NULL != pr->bf) |
1674 | GNUNET_CONTAINER_bloomfilter_free (pr->bf); | 1687 | GNUNET_CONTAINER_bloomfilter_free (pr->bf); |
1675 | if (NULL != pr->th) | 1688 | if (NULL != pr->th) |
@@ -1679,6 +1692,8 @@ destroy_pending_request (struct PendingRequest *pr) | |||
1679 | pr->replies_pending = reply->next; | 1692 | pr->replies_pending = reply->next; |
1680 | GNUNET_free (reply); | 1693 | GNUNET_free (reply); |
1681 | } | 1694 | } |
1695 | if (NULL != pr->cth) | ||
1696 | GNUNET_CORE_notify_transmit_ready_cancel (pr->cth); | ||
1682 | GNUNET_PEER_change_rc (pr->source_pid, -1); | 1697 | GNUNET_PEER_change_rc (pr->source_pid, -1); |
1683 | GNUNET_PEER_change_rc (pr->target_pid, -1); | 1698 | GNUNET_PEER_change_rc (pr->target_pid, -1); |
1684 | GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); | 1699 | GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); |
@@ -1862,12 +1877,23 @@ peer_disconnect_handler (void *cls, | |||
1862 | GNUNET_PeerIdentity * peer) | 1877 | GNUNET_PeerIdentity * peer) |
1863 | { | 1878 | { |
1864 | struct ConnectedPeer *cp; | 1879 | struct ConnectedPeer *cp; |
1880 | struct PendingMessage *pm; | ||
1865 | 1881 | ||
1866 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | 1882 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, |
1867 | &peer->hashPubKey); | 1883 | &peer->hashPubKey); |
1868 | GNUNET_PEER_change_rc (cp->pid, -1); | 1884 | if (cp != NULL) |
1869 | GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); | 1885 | { |
1870 | GNUNET_free (cp); | 1886 | GNUNET_PEER_change_rc (cp->pid, -1); |
1887 | GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); | ||
1888 | if (NULL != cp->cth) | ||
1889 | GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); | ||
1890 | while (NULL != (pm = cp->pending_messages)) | ||
1891 | { | ||
1892 | cp->pending_messages = pm->next; | ||
1893 | GNUNET_free (pm); | ||
1894 | } | ||
1895 | GNUNET_free (cp); | ||
1896 | } | ||
1871 | GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer, | 1897 | GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer, |
1872 | &peer->hashPubKey, | 1898 | &peer->hashPubKey, |
1873 | &destroy_request, | 1899 | &destroy_request, |
@@ -1876,9 +1902,8 @@ peer_disconnect_handler (void *cls, | |||
1876 | 1902 | ||
1877 | 1903 | ||
1878 | /** | 1904 | /** |
1879 | * We're processing a GET request from | 1905 | * We're processing a GET request from another peer and have decided |
1880 | * another peer and have decided to forward | 1906 | * to forward it to other peers. |
1881 | * it to other peers. | ||
1882 | * | 1907 | * |
1883 | * @param cls our "struct ProcessGetContext *" | 1908 | * @param cls our "struct ProcessGetContext *" |
1884 | * @param tc unused | 1909 | * @param tc unused |
@@ -2407,7 +2432,7 @@ transmit_result (void *cls, | |||
2407 | 2432 | ||
2408 | 2433 | ||
2409 | /** | 2434 | /** |
2410 | * Iterator over pending requests. | 2435 | * We have received a reply; handle it! |
2411 | * | 2436 | * |
2412 | * @param cls response (struct ProcessReplyClosure) | 2437 | * @param cls response (struct ProcessReplyClosure) |
2413 | * @param key our query | 2438 | * @param key our query |
@@ -2425,6 +2450,7 @@ process_reply (void *cls, | |||
2425 | struct PendingMessage *reply; | 2450 | struct PendingMessage *reply; |
2426 | struct PutMessage *pm; | 2451 | struct PutMessage *pm; |
2427 | struct ContentMessage *cm; | 2452 | struct ContentMessage *cm; |
2453 | struct ConnectedPeer *cp; | ||
2428 | GNUNET_HashCode chash; | 2454 | GNUNET_HashCode chash; |
2429 | GNUNET_HashCode mhash; | 2455 | GNUNET_HashCode mhash; |
2430 | struct GNUNET_PeerIdentity target; | 2456 | struct GNUNET_PeerIdentity target; |
@@ -2472,19 +2498,20 @@ process_reply (void *cls, | |||
2472 | } | 2498 | } |
2473 | if (pr->client == NULL) | 2499 | if (pr->client == NULL) |
2474 | { | 2500 | { |
2501 | GNUNET_PEER_resolve (pr->source_pid, | ||
2502 | &target); | ||
2503 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
2504 | &target.hashPubKey); | ||
2475 | msize = sizeof (struct ContentMessage) + prq->size; | 2505 | msize = sizeof (struct ContentMessage) + prq->size; |
2476 | reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); | 2506 | reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); |
2477 | reply->msize = msize; | 2507 | reply->msize = msize; |
2508 | reply->priority = (uint32_t) -1; /* send replies first! */ | ||
2478 | cm = (struct ContentMessage*) &reply[1]; | 2509 | cm = (struct ContentMessage*) &reply[1]; |
2479 | cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); | 2510 | cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); |
2480 | cm->header.size = htons (msize); | 2511 | cm->header.size = htons (msize); |
2481 | cm->type = htonl (prq->type); | 2512 | cm->type = htonl (prq->type); |
2482 | cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); | 2513 | cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); |
2483 | reply->next = pr->replies_pending; | ||
2484 | pr->replies_pending = reply; | ||
2485 | memcpy (&reply[1], prq->data, prq->size); | 2514 | memcpy (&reply[1], prq->data, prq->size); |
2486 | if (pr->cth != NULL) | ||
2487 | return GNUNET_YES; | ||
2488 | max_delay = GNUNET_TIME_UNIT_FOREVER_REL; | 2515 | max_delay = GNUNET_TIME_UNIT_FOREVER_REL; |
2489 | if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests) | 2516 | if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests) |
2490 | { | 2517 | { |
@@ -2494,18 +2521,37 @@ process_reply (void *cls, | |||
2494 | max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time, | 2521 | max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time, |
2495 | eer->start_time); | 2522 | eer->start_time); |
2496 | } | 2523 | } |
2497 | GNUNET_PEER_resolve (pr->source_pid, | 2524 | |
2498 | &target); | 2525 | if (cp == NULL) |
2499 | pr->cth = GNUNET_CORE_notify_transmit_ready (core, | 2526 | { |
2500 | prio, | 2527 | /* FIXME: bound queue size! */ |
2501 | max_delay, | 2528 | reply->next = pr->replies_pending; |
2502 | &target, | 2529 | pr->replies_pending = reply; |
2503 | msize, | 2530 | if (pr->cth == NULL) |
2504 | &transmit_result, | 2531 | { |
2505 | pr); | 2532 | /* implicitly tries to connect */ |
2506 | if (NULL == pr->cth) | 2533 | pr->cth = GNUNET_CORE_notify_transmit_ready (core, |
2534 | prio, | ||
2535 | max_delay, | ||
2536 | &target, | ||
2537 | msize, | ||
2538 | &transmit_result, | ||
2539 | pr); | ||
2540 | } | ||
2541 | } | ||
2542 | else | ||
2507 | { | 2543 | { |
2508 | // FIXME: now what? discard? | 2544 | /* insert replies always at the head */ |
2545 | reply->next = cp->pending_messages; | ||
2546 | cp->pending_messages = reply; | ||
2547 | if (cp->cth == NULL) | ||
2548 | cp->cth = GNUNET_CORE_notify_transmit_ready (core, | ||
2549 | reply->priority, | ||
2550 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
2551 | &target, | ||
2552 | msize, | ||
2553 | &transmit_request_cb, | ||
2554 | cp); | ||
2509 | } | 2555 | } |
2510 | } | 2556 | } |
2511 | else | 2557 | else |
@@ -2746,7 +2792,6 @@ run (void *cls, | |||
2746 | sched = s; | 2792 | sched = s; |
2747 | cfg = c; | 2793 | cfg = c; |
2748 | 2794 | ||
2749 | |||
2750 | requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config | 2795 | requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config |
2751 | requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config | 2796 | requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config |
2752 | connected_peers = GNUNET_CONTAINER_multihashmap_create (64); | 2797 | connected_peers = GNUNET_CONTAINER_multihashmap_create (64); |