aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-01-26 13:29:45 +0000
committerChristian Grothoff <christian@grothoff.org>2010-01-26 13:29:45 +0000
commitc2bd55c1e68522d685c1ac539949918ad612ce16 (patch)
treead2eedff4af3e6f900631437482943e314287254 /src/fs
parent68907af3eb39fb03936a226f340d469af10d5014 (diff)
downloadgnunet-c2bd55c1e68522d685c1ac539949918ad612ce16.tar.gz
gnunet-c2bd55c1e68522d685c1ac539949918ad612ce16.zip
stuff
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/gnunet-service-fs.c343
1 files changed, 194 insertions, 149 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 0339567f3..57b6dd421 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -50,8 +50,6 @@
50 50
51#define DEBUG_FS GNUNET_NO 51#define DEBUG_FS GNUNET_NO
52 52
53
54
55/** 53/**
56 * Signature of a function that is called whenever a datastore 54 * Signature of a function that is called whenever a datastore
57 * request can be processed (or an entry put on the queue times out). 55 * request can be processed (or an entry put on the queue times out).
@@ -398,6 +396,13 @@ struct PendingRequest
398 struct GNUNET_CORE_InformationRequestContext *irc; 396 struct GNUNET_CORE_InformationRequestContext *irc;
399 397
400 /** 398 /**
399 * Handle for an active request for transmission to this peer, or
400 * NULL. Only used for replies that we are trying to send to a peer
401 * that we are not yet connected to.
402 */
403 struct GNUNET_CORE_TransmitHandle *cth;
404
405 /**
401 * Replies that we have received but were unable to forward yet 406 * Replies that we have received but were unable to forward yet
402 * (typically non-null only if we have a pending transmission 407 * (typically non-null only if we have a pending transmission
403 * request with the client or the respective peer). 408 * request with the client or the respective peer).
@@ -405,15 +410,6 @@ struct PendingRequest
405 struct PendingMessage *replies_pending; 410 struct PendingMessage *replies_pending;
406 411
407 /** 412 /**
408 * Pending transmission request with the core service for the target
409 * peer (for processing of 'replies_pending') or Handle for a
410 * pending query-request for P2P-transmission with the core service.
411 * If non-NULL, this request must be cancelled should this struct be
412 * destroyed!
413 */
414 struct GNUNET_CORE_TransmitHandle *cth;
415
416 /**
417 * Pending transmission request for the target client (for processing of 413 * Pending transmission request for the target client (for processing of
418 * 'replies_pending'). 414 * 'replies_pending').
419 */ 415 */
@@ -558,8 +554,8 @@ struct ClientRequestList
558 554
559 555
560/** 556/**
561 * Linked list of all clients that we are 557 * Linked list of all clients that we are currently processing
562 * currently processing requests for. 558 * requests for.
563 */ 559 */
564struct ClientList 560struct ClientList
565{ 561{
@@ -658,7 +654,7 @@ struct ConnectedPeer
658 * Handle for an active request for transmission to this 654 * Handle for an active request for transmission to this
659 * peer, or NULL. 655 * peer, or NULL.
660 */ 656 */
661 struct GNUNET_CORE_PeerRequestHandle *prh; 657 struct GNUNET_CORE_TransmitHandle *cth;
662 658
663 /** 659 /**
664 * Messages (replies, queries, content migration) we would like to 660 * Messages (replies, queries, content migration) we would like to
@@ -779,9 +775,6 @@ static uint64_t max_pending_requests = 32;
779 775
780 776
781 777
782
783
784
785/** 778/**
786 * Run the next DS request in our 779 * Run the next DS request in our
787 * queue, we're done with the current one. 780 * queue, we're done with the current one.
@@ -950,8 +943,7 @@ transmit_local_result (void *cls,
950 943
951 944
952/** 945/**
953 * Mingle hash with the mingle_number to 946 * Mingle hash with the mingle_number to produce different bits.
954 * produce different bits.
955 */ 947 */
956static void 948static void
957mingle_hash (const GNUNET_HashCode * in, 949mingle_hash (const GNUNET_HashCode * in,
@@ -1114,12 +1106,10 @@ get_processing_delay ()
1114 1106
1115 1107
1116/** 1108/**
1117 * Task that is run for each request with the 1109 * Task that is run for each request with the goal of forwarding the
1118 * goal of forwarding the associated query to 1110 * associated query to other peers. The task should re-schedule
1119 * other peers. The task should re-schedule 1111 * itself to be re-run once the TTL has expired. (or at a later time
1120 * itself to be re-run once the TTL has expired. 1112 * if more peers should be queried earlier).
1121 * (or at a later time if more peers should
1122 * be queried earlier).
1123 * 1113 *
1124 * @param cls the requests "struct PendingRequest*" 1114 * @param cls the requests "struct PendingRequest*"
1125 * @param tc task context (unused) 1115 * @param tc task context (unused)
@@ -1130,9 +1120,9 @@ forward_request_task (void *cls,
1130 1120
1131 1121
1132/** 1122/**
1133 * We've selected a peer for forwarding of a query. 1123 * We've selected a peer for forwarding of a query. Construct the
1134 * Construct the message and then re-schedule the 1124 * message and then re-schedule the task to forward again to (other)
1135 * task to forward again to (other) peers. 1125 * peers.
1136 * 1126 *
1137 * @param cls closure 1127 * @param cls closure
1138 * @param size number of bytes available in buf 1128 * @param size number of bytes available in buf
@@ -1144,29 +1134,102 @@ transmit_request_cb (void *cls,
1144 size_t size, 1134 size_t size,
1145 void *buf) 1135 void *buf)
1146{ 1136{
1137 struct ConnectedPeer *cp = cls;
1138 char *cbuf = buf;
1139 struct GNUNET_PeerIdentity target;
1140 struct PendingMessage *pr;
1141 size_t tot;
1142
1143 cp->cth = NULL;
1144 tot = 0;
1145 while ( (NULL != (pr = cp->pending_messages)) &&
1146 (pr->msize < size - tot) )
1147 {
1148 memcpy (&cbuf[tot],
1149 &pr[1],
1150 pr->msize);
1151 tot += pr->msize;
1152 cp->pending_messages = pr->next;
1153 GNUNET_free (pr);
1154 }
1155 if (NULL != pr)
1156 {
1157 GNUNET_PEER_resolve (cp->pid,
1158 &target);
1159 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1160 pr->priority,
1161 GNUNET_TIME_UNIT_FOREVER_REL,
1162 &target,
1163 pr->msize,
1164 &transmit_request_cb,
1165 cp);
1166 }
1167 return tot;
1168}
1169
1170
1171/**
1172 * Function called after we've tried to reserve a certain amount of
1173 * bandwidth for a reply. Check if we succeeded and if so send our
1174 * query.
1175 *
1176 * @param cls the requests "struct PendingRequest*"
1177 * @param peer identifies the peer
1178 * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1179 * @param bpm_out set to the current bandwidth limit (sending) for this peer
1180 * @param amount set to the amount that was actually reserved or unreserved
1181 * @param preference current traffic preference for the given peer
1182 */
1183static void
1184target_reservation_cb (void *cls,
1185 const struct
1186 GNUNET_PeerIdentity * peer,
1187 unsigned int bpm_in,
1188 unsigned int bpm_out,
1189 int amount,
1190 uint64_t preference)
1191{
1147 struct PendingRequest *pr = cls; 1192 struct PendingRequest *pr = cls;
1193 struct ConnectedPeer *cp;
1194 struct PendingMessage *pm;
1195 struct PendingMessage *pos;
1196 struct PendingMessage *prev;
1148 struct GetMessage *gm; 1197 struct GetMessage *gm;
1149 GNUNET_HashCode *ext; 1198 GNUNET_HashCode *ext;
1150 char *bfdata; 1199 char *bfdata;
1151 size_t msize; 1200 size_t msize;
1152 unsigned int k; 1201 unsigned int k;
1153 1202
1154 pr->cth = NULL; 1203 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1155 /* (1) check for timeout */ 1204 get_processing_delay (), // FIXME: longer?
1156 if (NULL == buf) 1205 &forward_request_task,
1206 pr);
1207 pr->irc = NULL;
1208 GNUNET_assert (peer != NULL);
1209 if (amount != DBLOCK_SIZE)
1157 { 1210 {
1158 /* timeout, try another peer immediately again */ 1211 /* FIXME: call stats... */
1159 pr->task = GNUNET_SCHEDULER_add_with_priority (sched, 1212 return; /* this target round failed */
1160 GNUNET_SCHEDULER_PRIORITY_IDLE, 1213 }
1161 &forward_request_task, 1214 // (2) transmit, update ttl/priority
1162 pr); 1215 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1163 return 0; 1216 &peer->hashPubKey);
1217 if (cp == NULL)
1218 {
1219 /* Peer must have just left; try again immediately */
1220 pr->task = GNUNET_SCHEDULER_add_now (sched,
1221 &forward_request_task,
1222 pr);
1223 return;
1164 } 1224 }
1165 /* (2) build query message */ 1225 /* build message and insert message into priority queue */
1166 k = 0; // FIXME: count hash codes! 1226 k = 0; // FIXME: count hash codes!
1167 msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode); 1227 msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1168 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 1228 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1169 gm = (struct GetMessage*) buf; 1229 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1230 pm->msize = msize;
1231 pm->priority = 0; // FIXME: calculate priority properly!
1232 gm = (struct GetMessage*) &pm[1];
1170 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); 1233 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1171 gm->header.size = htons (msize); 1234 gm->header.size = htons (msize);
1172 gm->type = htonl (pr->type); 1235 gm->type = htonl (pr->type);
@@ -1177,91 +1240,52 @@ transmit_request_cb (void *cls,
1177 gm->hash_bitmap = htonl (42); 1240 gm->hash_bitmap = htonl (42);
1178 gm->query = pr->query; 1241 gm->query = pr->query;
1179 ext = (GNUNET_HashCode*) &gm[1]; 1242 ext = (GNUNET_HashCode*) &gm[1];
1243
1180 // FIXME: setup "ext[0]..[k-1]" 1244 // FIXME: setup "ext[0]..[k-1]"
1181 bfdata = (char *) &ext[k]; 1245 bfdata = (char *) &ext[k];
1182 if (pr->bf != NULL) 1246 if (pr->bf != NULL)
1183 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, 1247 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1184 bfdata, 1248 bfdata,
1185 pr->bf_size); 1249 pr->bf_size);
1186
1187 /* (3) schedule job to do it again (or another peer, etc.) */
1188 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1189 get_processing_delay (), // FIXME!
1190 &forward_request_task,
1191 pr);
1192 1250
1193 return msize;
1194}
1195 1251
1196 1252 prev = NULL;
1197/** 1253 pos = cp->pending_messages;
1198 * Function called after we've tried to reserve 1254 while ( (pos != NULL) &&
1199 * a certain amount of bandwidth for a reply. 1255 (pm->priority < pos->priority) )
1200 * Check if we succeeded and if so send our query.
1201 *
1202 * @param cls the requests "struct PendingRequest*"
1203 * @param peer identifies the peer
1204 * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1205 * @param bpm_out set to the current bandwidth limit (sending) for this peer
1206 * @param amount set to the amount that was actually reserved or unreserved
1207 * @param preference current traffic preference for the given peer
1208 */
1209static void
1210target_reservation_cb (void *cls,
1211 const struct
1212 GNUNET_PeerIdentity * peer,
1213 unsigned int bpm_in,
1214 unsigned int bpm_out,
1215 int amount,
1216 uint64_t preference)
1217{
1218 struct PendingRequest *pr = cls;
1219 uint32_t priority;
1220 uint16_t size;
1221 struct GNUNET_TIME_Relative maxdelay;
1222
1223 pr->irc = NULL;
1224 GNUNET_assert (peer != NULL);
1225 if ( (amount != DBLOCK_SIZE) ||
1226 (pr->cth != NULL) )
1227 { 1256 {
1228 /* try again later; FIXME: we may need to un-reserve "amount"? */ 1257 prev = pos;
1229 pr->task = GNUNET_SCHEDULER_add_delayed (sched, 1258 pos = pos->next;
1230 get_processing_delay (), // FIXME: longer?
1231 &forward_request_task,
1232 pr);
1233 return;
1234 } 1259 }
1235 // (2) transmit, update ttl/priority 1260 if (prev == NULL)
1236 // FIXME: calculate priority, maxdelay, size properly! 1261 cp->pending_messages = pm;
1237 priority = 0; 1262 else
1238 size = 60000; 1263 prev->next = pm;
1239 maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT; 1264 pm->next = pos;
1240 pr->cth = GNUNET_CORE_notify_transmit_ready (core, 1265 if (cp->cth == NULL)
1241 priority, 1266 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1242 maxdelay, 1267 cp->pending_messages->priority,
1243 peer, 1268 GNUNET_TIME_UNIT_FOREVER_REL,
1244 size, 1269 peer,
1245 &transmit_request_cb, 1270 msize,
1246 pr); 1271 &transmit_request_cb,
1247 if (pr->cth == NULL) 1272 cp);
1273 if (cp->cth == NULL)
1248 { 1274 {
1249 /* try again later */ 1275 /* technically, this should not be a 'break'; but
1250 pr->task = GNUNET_SCHEDULER_add_delayed (sched, 1276 we don't handle this (rare) case yet, so let's warn
1251 get_processing_delay (), // FIXME: longer? 1277 about it... */
1252 &forward_request_task, 1278 GNUNET_break (0);
1253 pr); 1279 // FIXME: now what?
1254 } 1280 }
1255} 1281}
1256 1282
1257 1283
1258/** 1284/**
1259 * Task that is run for each request with the 1285 * Task that is run for each request with the goal of forwarding the
1260 * goal of forwarding the associated query to 1286 * associated query to other peers. The task should re-schedule
1261 * other peers. The task should re-schedule 1287 * itself to be re-run once the TTL has expired. (or at a later time
1262 * itself to be re-run once the TTL has expired. 1288 * if more peers should be queried earlier).
1263 * (or at a later time if more peers should
1264 * be queried earlier).
1265 * 1289 *
1266 * @param cls the requests "struct PendingRequest*" 1290 * @param cls the requests "struct PendingRequest*"
1267 * @param tc task context (unused) 1291 * @param tc task context (unused)
@@ -1274,15 +1298,6 @@ forward_request_task (void *cls,
1274 struct PeerSelectionContext psc; 1298 struct PeerSelectionContext psc;
1275 1299
1276 pr->task = GNUNET_SCHEDULER_NO_TASK; 1300 pr->task = GNUNET_SCHEDULER_NO_TASK;
1277 if (pr->cth != NULL)
1278 {
1279 /* we're busy transmitting a result, wait a bit */
1280 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1281 get_processing_delay (),
1282 &forward_request_task,
1283 pr);
1284 return;
1285 }
1286 /* (1) select target */ 1301 /* (1) select target */
1287 psc.pr = pr; 1302 psc.pr = pr;
1288 psc.target_score = DBL_MIN; 1303 psc.target_score = DBL_MIN;
@@ -1301,13 +1316,13 @@ forward_request_task (void *cls,
1301 /* (2) reserve reply bandwidth */ 1316 /* (2) reserve reply bandwidth */
1302 GNUNET_assert (NULL == pr->irc); 1317 GNUNET_assert (NULL == pr->irc);
1303 pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, 1318 pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
1304 &psc.target, 1319 &psc.target,
1305 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 1320 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1306 -1, 1321 -1,
1307 DBLOCK_SIZE, // FIXME: make dependent on type? 1322 DBLOCK_SIZE, // FIXME: make dependent on type?
1308 0, 1323 0,
1309 &target_reservation_cb, 1324 &target_reservation_cb,
1310 pr); 1325 pr);
1311} 1326}
1312 1327
1313 1328
@@ -1668,8 +1683,6 @@ destroy_pending_request (struct PendingRequest *pr)
1668 } 1683 }
1669 if (GNUNET_SCHEDULER_NO_TASK != pr->task) 1684 if (GNUNET_SCHEDULER_NO_TASK != pr->task)
1670 GNUNET_SCHEDULER_cancel (sched, pr->task); 1685 GNUNET_SCHEDULER_cancel (sched, pr->task);
1671 if (NULL != pr->cth)
1672 GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
1673 if (NULL != pr->bf) 1686 if (NULL != pr->bf)
1674 GNUNET_CONTAINER_bloomfilter_free (pr->bf); 1687 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1675 if (NULL != pr->th) 1688 if (NULL != pr->th)
@@ -1679,6 +1692,8 @@ destroy_pending_request (struct PendingRequest *pr)
1679 pr->replies_pending = reply->next; 1692 pr->replies_pending = reply->next;
1680 GNUNET_free (reply); 1693 GNUNET_free (reply);
1681 } 1694 }
1695 if (NULL != pr->cth)
1696 GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
1682 GNUNET_PEER_change_rc (pr->source_pid, -1); 1697 GNUNET_PEER_change_rc (pr->source_pid, -1);
1683 GNUNET_PEER_change_rc (pr->target_pid, -1); 1698 GNUNET_PEER_change_rc (pr->target_pid, -1);
1684 GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); 1699 GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
@@ -1862,12 +1877,23 @@ peer_disconnect_handler (void *cls,
1862 GNUNET_PeerIdentity * peer) 1877 GNUNET_PeerIdentity * peer)
1863{ 1878{
1864 struct ConnectedPeer *cp; 1879 struct ConnectedPeer *cp;
1880 struct PendingMessage *pm;
1865 1881
1866 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, 1882 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1867 &peer->hashPubKey); 1883 &peer->hashPubKey);
1868 GNUNET_PEER_change_rc (cp->pid, -1); 1884 if (cp != NULL)
1869 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); 1885 {
1870 GNUNET_free (cp); 1886 GNUNET_PEER_change_rc (cp->pid, -1);
1887 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1888 if (NULL != cp->cth)
1889 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1890 while (NULL != (pm = cp->pending_messages))
1891 {
1892 cp->pending_messages = pm->next;
1893 GNUNET_free (pm);
1894 }
1895 GNUNET_free (cp);
1896 }
1871 GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer, 1897 GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
1872 &peer->hashPubKey, 1898 &peer->hashPubKey,
1873 &destroy_request, 1899 &destroy_request,
@@ -1876,9 +1902,8 @@ peer_disconnect_handler (void *cls,
1876 1902
1877 1903
1878/** 1904/**
1879 * We're processing a GET request from 1905 * We're processing a GET request from another peer and have decided
1880 * another peer and have decided to forward 1906 * to forward it to other peers.
1881 * it to other peers.
1882 * 1907 *
1883 * @param cls our "struct ProcessGetContext *" 1908 * @param cls our "struct ProcessGetContext *"
1884 * @param tc unused 1909 * @param tc unused
@@ -2407,7 +2432,7 @@ transmit_result (void *cls,
2407 2432
2408 2433
2409/** 2434/**
2410 * Iterator over pending requests. 2435 * We have received a reply; handle it!
2411 * 2436 *
2412 * @param cls response (struct ProcessReplyClosure) 2437 * @param cls response (struct ProcessReplyClosure)
2413 * @param key our query 2438 * @param key our query
@@ -2425,6 +2450,7 @@ process_reply (void *cls,
2425 struct PendingMessage *reply; 2450 struct PendingMessage *reply;
2426 struct PutMessage *pm; 2451 struct PutMessage *pm;
2427 struct ContentMessage *cm; 2452 struct ContentMessage *cm;
2453 struct ConnectedPeer *cp;
2428 GNUNET_HashCode chash; 2454 GNUNET_HashCode chash;
2429 GNUNET_HashCode mhash; 2455 GNUNET_HashCode mhash;
2430 struct GNUNET_PeerIdentity target; 2456 struct GNUNET_PeerIdentity target;
@@ -2472,19 +2498,20 @@ process_reply (void *cls,
2472 } 2498 }
2473 if (pr->client == NULL) 2499 if (pr->client == NULL)
2474 { 2500 {
2501 GNUNET_PEER_resolve (pr->source_pid,
2502 &target);
2503 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2504 &target.hashPubKey);
2475 msize = sizeof (struct ContentMessage) + prq->size; 2505 msize = sizeof (struct ContentMessage) + prq->size;
2476 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); 2506 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2477 reply->msize = msize; 2507 reply->msize = msize;
2508 reply->priority = (uint32_t) -1; /* send replies first! */
2478 cm = (struct ContentMessage*) &reply[1]; 2509 cm = (struct ContentMessage*) &reply[1];
2479 cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); 2510 cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2480 cm->header.size = htons (msize); 2511 cm->header.size = htons (msize);
2481 cm->type = htonl (prq->type); 2512 cm->type = htonl (prq->type);
2482 cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); 2513 cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2483 reply->next = pr->replies_pending;
2484 pr->replies_pending = reply;
2485 memcpy (&reply[1], prq->data, prq->size); 2514 memcpy (&reply[1], prq->data, prq->size);
2486 if (pr->cth != NULL)
2487 return GNUNET_YES;
2488 max_delay = GNUNET_TIME_UNIT_FOREVER_REL; 2515 max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2489 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests) 2516 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
2490 { 2517 {
@@ -2494,18 +2521,37 @@ process_reply (void *cls,
2494 max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time, 2521 max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
2495 eer->start_time); 2522 eer->start_time);
2496 } 2523 }
2497 GNUNET_PEER_resolve (pr->source_pid, 2524
2498 &target); 2525 if (cp == NULL)
2499 pr->cth = GNUNET_CORE_notify_transmit_ready (core, 2526 {
2500 prio, 2527 /* FIXME: bound queue size! */
2501 max_delay, 2528 reply->next = pr->replies_pending;
2502 &target, 2529 pr->replies_pending = reply;
2503 msize, 2530 if (pr->cth == NULL)
2504 &transmit_result, 2531 {
2505 pr); 2532 /* implicitly tries to connect */
2506 if (NULL == pr->cth) 2533 pr->cth = GNUNET_CORE_notify_transmit_ready (core,
2534 prio,
2535 max_delay,
2536 &target,
2537 msize,
2538 &transmit_result,
2539 pr);
2540 }
2541 }
2542 else
2507 { 2543 {
2508 // FIXME: now what? discard? 2544 /* insert replies always at the head */
2545 reply->next = cp->pending_messages;
2546 cp->pending_messages = reply;
2547 if (cp->cth == NULL)
2548 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2549 reply->priority,
2550 GNUNET_TIME_UNIT_FOREVER_REL,
2551 &target,
2552 msize,
2553 &transmit_request_cb,
2554 cp);
2509 } 2555 }
2510 } 2556 }
2511 else 2557 else
@@ -2746,7 +2792,6 @@ run (void *cls,
2746 sched = s; 2792 sched = s;
2747 cfg = c; 2793 cfg = c;
2748 2794
2749
2750 requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config 2795 requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2751 requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config 2796 requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2752 connected_peers = GNUNET_CONTAINER_multihashmap_create (64); 2797 connected_peers = GNUNET_CONTAINER_multihashmap_create (64);