aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-05-07 12:15:58 +0000
committerGabor X Toth <*@tg-x.net>2015-05-07 12:15:58 +0000
commit4725d59b468f1f30ba2910992333ca157682ce29 (patch)
tree23715ee20879c94a3363e28ea184370a4a71e44d /src
parenta5edf8ac9f03a368c87ea6163994d4ac3d62af06 (diff)
downloadgnunet-4725d59b468f1f30ba2910992333ca157682ce29.tar.gz
gnunet-4725d59b468f1f30ba2910992333ca157682ce29.zip
psyc/social: request history & state from psycstore; more documentation, tests, cleanup
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_protocols.h2
-rw-r--r--src/include/gnunet_psyc_service.h247
-rw-r--r--src/include/gnunet_psyc_util_lib.h28
-rw-r--r--src/include/gnunet_psycstore_service.h35
-rw-r--r--src/include/gnunet_social_service.h141
-rw-r--r--src/psyc/gnunet-service-psyc.c355
-rw-r--r--src/psyc/psyc.h36
-rw-r--r--src/psyc/psyc_api.c532
-rw-r--r--src/psyc/psyc_util_lib.c38
-rw-r--r--src/psyc/test_psyc.c352
-rw-r--r--src/psycstore/gnunet-service-psycstore.c28
-rw-r--r--src/psycstore/plugin_psycstore_sqlite.c4
-rw-r--r--src/psycstore/psycstore_api.c53
-rw-r--r--src/psycstore/test_psycstore.c44
-rw-r--r--src/social/gnunet-service-social.c195
-rw-r--r--src/social/social_api.c531
-rw-r--r--src/social/test_social.c118
17 files changed, 1946 insertions, 793 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index cdbc46f7f..2c73eadd6 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2222,7 +2222,7 @@ extern "C"
2222 2222
2223/* 700 */ 2223/* 700 */
2224 2224
2225/** C->S: client requests channel history from PSYCstore. */ 2225/** C->S: request channel history replay from PSYCstore. */
2226#define GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY 701 2226#define GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY 701
2227 2227
2228/** S->C: result for a channel history request */ 2228/** S->C: result for a channel history request */
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h
index 5239acb16..c7692f074 100644
--- a/src/include/gnunet_psyc_service.h
+++ b/src/include/gnunet_psyc_service.h
@@ -376,6 +376,106 @@ struct GNUNET_PSYC_JoinDecisionMessage
376 /* Followed by struct GNUNET_MessageHeader join_response */ 376 /* Followed by struct GNUNET_MessageHeader join_response */
377}; 377};
378 378
379
380enum GNUNET_PSYC_HistoryReplayFlags
381{
382 /**
383 * Replay locally available messages.
384 */
385 GNUNET_PSYC_HISTORY_REPLAY_LOCAL = 0,
386
387 /**
388 * Replay messages from remote peers if not found locally.
389 */
390 GNUNET_PSYC_HISTORY_REPLAY_REMOTE = 1,
391};
392
393
394struct GNUNET_PSYC_HistoryRequestMessage
395{
396 /**
397 * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_HISTORY_REPLAY
398 */
399 struct GNUNET_MessageHeader header;
400
401 /**
402 * @see enum GNUNET_PSYC_HistoryReplayFlags
403 */
404 uint32_t flags GNUNET_PACKED;
405
406 /**
407 * ID for this operation.
408 */
409 uint64_t op_id GNUNET_PACKED;
410
411 uint64_t start_message_id GNUNET_PACKED;
412
413 uint64_t end_message_id GNUNET_PACKED;
414
415 uint64_t message_limit GNUNET_PACKED;
416
417 /* Followed by NUL-terminated method name prefix. */
418};
419
420
421struct GNUNET_PSYC_StateRequestMessage
422{
423 /**
424 * Types:
425 * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_GET
426 * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_GET_PREFIX
427 */
428 struct GNUNET_MessageHeader header;
429
430 uint32_t reserved GNUNET_PACKED;
431
432 /**
433 * ID for this operation.
434 */
435 uint64_t op_id GNUNET_PACKED;
436
437 /* Followed by NUL-terminated name. */
438};
439
440
441/**** service -> library ****/
442
443
444/**
445 * Answer from service to client about last operation.
446 */
447struct GNUNET_PSYC_OperationResultMessage
448{
449 /**
450 * Types:
451 * - GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE
452 * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_RESULT
453 */
454 struct GNUNET_MessageHeader header;
455
456 uint32_t reserved GNUNET_PACKED;
457
458 /**
459 * Operation ID.
460 */
461 uint64_t op_id GNUNET_PACKED;
462
463 /**
464 * Status code for the operation.
465 */
466 uint64_t result_code GNUNET_PACKED;
467
468 /* Followed by:
469 * - on error: NUL-terminated error message
470 * - on success: one of the following message types
471 *
472 * For a STATE_RESULT, one of:
473 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER
474 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT
475 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END
476 */
477};
478
379GNUNET_NETWORK_STRUCT_END 479GNUNET_NETWORK_STRUCT_END
380 480
381 481
@@ -907,23 +1007,6 @@ struct GNUNET_PSYC_Channel;
907 1007
908 1008
909/** 1009/**
910 * Function called with the result of an asynchronous operation.
911 *
912 * @param cls
913 * Closure.
914 * @param result
915 * Result of the operation.
916 * Usually one of #GNUNET_OK, #GNUNET_YES, #GNUNET_NO, or #GNUNET_SYSERR.
917 * @param err_msg
918 * Error message.
919 */
920typedef void
921(*GNUNET_PSYC_ResultCallback) (void *cls,
922 int64_t result,
923 const char *err_msg);
924
925
926/**
927 * Convert a channel @a master to a @e channel handle to access the @e channel 1010 * Convert a channel @a master to a @e channel handle to access the @e channel
928 * APIs. 1011 * APIs.
929 * 1012 *
@@ -960,17 +1043,28 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave);
960 * correctly; not doing so correctly will result in either denying other slaves 1043 * correctly; not doing so correctly will result in either denying other slaves
961 * access or offering access to channel data to non-members. 1044 * access or offering access to channel data to non-members.
962 * 1045 *
963 * @param channel Channel handle. 1046 * @param channel
964 * @param slave_key Identity of channel slave to add. 1047 * Channel handle.
965 * @param announced_at ID of the message that announced the membership change. 1048 * @param slave_key
966 * @param effective_since Addition of slave is in effect since this message ID. 1049 * Identity of channel slave to add.
1050 * @param announced_at
1051 * ID of the message that announced the membership change.
1052 * @param effective_since
1053 * Addition of slave is in effect since this message ID.
1054 * @param result_cb
1055 * Function to call with the result of the operation.
1056 * The @e result_code argument is #GNUNET_OK on success, or
1057 * #GNUNET_SYSERR on error. In case of an error, the @e data argument
1058 * can contain an optional error message.
1059 * @param cls
1060 * Closure for @a result_cb.
967 */ 1061 */
968void 1062void
969GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, 1063GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
970 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1064 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
971 uint64_t announced_at, 1065 uint64_t announced_at,
972 uint64_t effective_since, 1066 uint64_t effective_since,
973 GNUNET_PSYC_ResultCallback result_cb, 1067 GNUNET_ResultCallback result_cb,
974 void *cls); 1068 void *cls);
975 1069
976 1070
@@ -991,33 +1085,33 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
991 * denying members access or offering access to channel data to 1085 * denying members access or offering access to channel data to
992 * non-members. 1086 * non-members.
993 * 1087 *
994 * @param channel Channel handle. 1088 * @param channel
995 * @param slave_key Identity of channel slave to remove. 1089 * Channel handle.
996 * @param announced_at ID of the message that announced the membership change. 1090 * @param slave_key
1091 * Identity of channel slave to remove.
1092 * @param announced_at
1093 * ID of the message that announced the membership change.
1094 * @param result_cb
1095 * Function to call with the result of the operation.
1096 * The @e result_code argument is #GNUNET_OK on success, or
1097 * #GNUNET_SYSERR on error. In case of an error, the @e data argument
1098 * can contain an optional error message.
1099 * @param cls
1100 * Closure for @a result_cb.
997 */ 1101 */
998void 1102void
999GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, 1103GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
1000 const struct GNUNET_CRYPTO_EcdsaPublicKey 1104 const struct GNUNET_CRYPTO_EcdsaPublicKey
1001 *slave_key, 1105 *slave_key,
1002 uint64_t announced_at, 1106 uint64_t announced_at,
1003 GNUNET_PSYC_ResultCallback result_cb, 1107 GNUNET_ResultCallback result_cb,
1004 void *cls); 1108 void *cls);
1005 1109
1006 1110
1007/** 1111/**
1008 * Function called to inform a member about stored state values for a channel. 1112 * History request handle.
1009 *
1010 * @param cls Closure.
1011 * @param name Name of the state variable. A NULL value indicates that there
1012 * are no more state variables to be returned.
1013 * @param value Value of the state variable.
1014 * @param value_size Number of bytes in @a value.
1015 */ 1113 */
1016typedef void 1114struct GNUNET_PSYC_HistoryRequest;
1017(*GNUNET_PSYC_StateVarCallback) (void *cls,
1018 const char *name,
1019 const void *value,
1020 size_t value_size);
1021 1115
1022 1116
1023/** 1117/**
@@ -1032,22 +1126,28 @@ typedef void
1032 * Earliest interesting point in history. 1126 * Earliest interesting point in history.
1033 * @param end_message_id 1127 * @param end_message_id
1034 * Last (inclusive) interesting point in history. 1128 * Last (inclusive) interesting point in history.
1035 * @param finish_cb 1129 * @param method_prefix
1036 * Function to call when the requested history has been fully replayed 1130 * Retrieve only messages with a matching method prefix.
1037 * (counting message IDs might not suffice, as some messages might be 1131 * @param flags
1038 * secret and thus the listener would not know the story is finished 1132 * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
1039 * without being told explicitly)o once this function has been called, the 1133 * @param result_cb
1040 * client must not call GNUNET_PSYC_channel_history_replay_cancel() anymore. 1134 * Function to call when the requested history has been fully replayed.
1135 * Once this function has been called, the client must not call
1136 * GNUNET_PSYC_channel_history_replay_cancel() anymore.
1041 * @param cls 1137 * @param cls
1042 * Closure for the callbacks. 1138 * Closure for the callbacks.
1043 * 1139 *
1044 * @return Handle to cancel history replay operation. 1140 * @return Handle to cancel history replay operation.
1045 */ 1141 */
1046void 1142struct GNUNET_PSYC_HistoryRequest *
1047GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *channel, 1143GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *channel,
1048 uint64_t start_message_id, 1144 uint64_t start_message_id,
1049 uint64_t end_message_id, 1145 uint64_t end_message_id,
1050 GNUNET_PSYC_ResultCallback finish_cb, 1146 const char *method_prefix,
1147 uint32_t flags,
1148 GNUNET_PSYC_MessageCallback message_cb,
1149 GNUNET_PSYC_MessagePartCallback message_part_cb,
1150 GNUNET_ResultCallback result_cb,
1051 void *cls); 1151 void *cls);
1052 1152
1053 1153
@@ -1061,6 +1161,8 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *channel,
1061 * Which channel should be replayed? 1161 * Which channel should be replayed?
1062 * @param message_limit 1162 * @param message_limit
1063 * Maximum number of messages to replay. 1163 * Maximum number of messages to replay.
1164 * @param flags
1165 * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
1064 * @param finish_cb 1166 * @param finish_cb
1065 * Function to call when the requested history has been fully replayed 1167 * Function to call when the requested history has been fully replayed
1066 * (counting message IDs might not suffice, as some messages might be 1168 * (counting message IDs might not suffice, as some messages might be
@@ -1072,13 +1174,44 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *channel,
1072 * 1174 *
1073 * @return Handle to cancel history replay operation. 1175 * @return Handle to cancel history replay operation.
1074 */ 1176 */
1075void 1177struct GNUNET_PSYC_HistoryRequest *
1076GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *channel, 1178GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *channel,
1077 uint64_t message_limit, 1179 uint64_t message_limit,
1078 GNUNET_PSYC_ResultCallback finish_cb, 1180 const char *method_prefix,
1181 uint32_t flags,
1182 GNUNET_PSYC_MessageCallback message_cb,
1183 GNUNET_PSYC_MessagePartCallback message_part_cb,
1184 GNUNET_ResultCallback result_cb,
1079 void *cls); 1185 void *cls);
1080 1186
1081 1187
1188void
1189GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel,
1190 struct GNUNET_PSYC_HistoryRequest *hr);
1191
1192
1193/**
1194 * Function called to inform a member about stored state values for a channel.
1195 *
1196 * @param cls Closure.
1197 * @param name Name of the state variable. A NULL value indicates that there
1198 * are no more state variables to be returned.
1199 * @param value Value of the state variable.
1200 * @param value_size Number of bytes in @a value.
1201 */
1202typedef void
1203(*GNUNET_PSYC_StateVarCallback) (void *cls,
1204 const char *name,
1205 const void *value,
1206 size_t value_size);
1207
1208
1209/**
1210 * State request handle.
1211 */
1212struct GNUNET_PSYC_StateRequest;
1213
1214
1082/** 1215/**
1083 * Retrieve the best matching channel state variable. 1216 * Retrieve the best matching channel state variable.
1084 * 1217 *
@@ -1100,11 +1233,11 @@ GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *channel,
1100 * @param cls 1233 * @param cls
1101 * Closure for the callbacks. 1234 * Closure for the callbacks.
1102 */ 1235 */
1103void 1236struct GNUNET_PSYC_StateRequest *
1104GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel, 1237GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
1105 const char *full_name, 1238 const char *full_name,
1106 GNUNET_PSYC_StateVarCallback var_cb, 1239 GNUNET_PSYC_StateVarCallback var_cb,
1107 GNUNET_PSYC_ResultCallback result_cb, 1240 GNUNET_ResultCallback result_cb,
1108 void *cls); 1241 void *cls);
1109 1242
1110 1243
@@ -1131,13 +1264,23 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
1131 * @param cls 1264 * @param cls
1132 * Closure for the callbacks. 1265 * Closure for the callbacks.
1133 */ 1266 */
1134void 1267struct GNUNET_PSYC_StateRequest *
1135GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel, 1268GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel,
1136 const char *name_prefix, 1269 const char *name_prefix,
1137 GNUNET_PSYC_StateVarCallback var_cb, 1270 GNUNET_PSYC_StateVarCallback var_cb,
1138 GNUNET_PSYC_ResultCallback result_cb, 1271 GNUNET_ResultCallback result_cb,
1139 void *cls); 1272 void *cls);
1140 1273
1274/**
1275 * Cancel a state request operation.
1276 *
1277 * @param sr
1278 * Handle for the operation to cancel.
1279 */
1280void
1281GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr);
1282
1283
1141 1284
1142#if 0 /* keep Emacsens' auto-indent happy */ 1285#if 0 /* keep Emacsens' auto-indent happy */
1143{ 1286{
diff --git a/src/include/gnunet_psyc_util_lib.h b/src/include/gnunet_psyc_util_lib.h
index 488edc03b..3ec9fe1bf 100644
--- a/src/include/gnunet_psyc_util_lib.h
+++ b/src/include/gnunet_psyc_util_lib.h
@@ -115,16 +115,24 @@ GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit);
115/** 115/**
116 * Transmit a message. 116 * Transmit a message.
117 * 117 *
118 * @param tmit Transmission handle. 118 * @param tmit
119 * @param method_name Which method should be invoked. 119 * Transmission handle.
120 * @param env Environment for the message. 120 * @param method_name
121 * Should stay available until the first call to notify_data. 121 * Which method should be invoked.
122 * Can be NULL if there are no modifiers or @a notify_mod is provided instead. 122 * @param env
123 * @param notify_mod Function to call to obtain modifiers. 123 * Environment for the message.
124 * Can be NULL if there are no modifiers or @a env is provided instead. 124 * Should stay available until the first call to notify_data.
125 * @param notify_data Function to call to obtain fragments of the data. 125 * Can be NULL if there are no modifiers or @a notify_mod is
126 * @param notify_cls Closure for @a notify_mod and @a notify_data. 126 * provided instead.
127 * @param flags Flags for the message being transmitted. 127 * @param notify_mod
128 * Function to call to obtain modifiers.
129 * Can be NULL if there are no modifiers or @a env is provided instead.
130 * @param notify_data
131 * Function to call to obtain fragments of the data.
132 * @param notify_cls
133 * Closure for @a notify_mod and @a notify_data.
134 * @param flags
135 * Flags for the message being transmitted.
128 * 136 *
129 * @return #GNUNET_OK if the transmission was started. 137 * @return #GNUNET_OK if the transmission was started.
130 * #GNUNET_SYSERR if another transmission is already going on. 138 * #GNUNET_SYSERR if another transmission is already going on.
diff --git a/src/include/gnunet_psycstore_service.h b/src/include/gnunet_psycstore_service.h
index 2cb155f69..40acdae61 100644
--- a/src/include/gnunet_psycstore_service.h
+++ b/src/include/gnunet_psycstore_service.h
@@ -107,15 +107,20 @@ struct GNUNET_PSYCSTORE_OperationHandle;
107/** 107/**
108 * Function called with the result of an asynchronous operation. 108 * Function called with the result of an asynchronous operation.
109 * 109 *
110 * @param cls
111 * Closure.
110 * @param result 112 * @param result
111 * #GNUNET_YES on success or if the peer was a member, 113 * Result of the operation.
112 * #GNUNET_NO if the peer was not a member, 114 * @param err_msg
113 * #GNUNET_SYSERR on error, 115 * Error message, or NULL if there's no error.
116 * @param err_msg_size
117 * Size of @a err_msg
114 */ 118 */
115typedef void 119typedef void
116(*GNUNET_PSYCSTORE_ResultCallback) (void *cls, 120(*GNUNET_PSYCSTORE_ResultCallback) (void *cls,
117 int64_t result, 121 int64_t result,
118 const char *err_msg); 122 const char *err_msg,
123 uint16_t err_msg_size);
119 124
120 125
121/** 126/**
@@ -318,15 +323,15 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
318 * @param channel_key 323 * @param channel_key
319 * The channel we are interested in. 324 * The channel we are interested in.
320 * @param slave_key 325 * @param slave_key
321 * The slave requesting the message. If not NULL, a membership test is 326 * The slave requesting the message.
322 * performed first and the message is only returned if the slave has 327 * If not NULL, a membership test is performed first
323 * access to it. 328 * and the message is only returned if the slave has access to it.
324 * @param first_message_id 329 * @param first_message_id
325 * First message ID to retrieve. 330 * First message ID to retrieve.
326 * Use 0 to get the latest message.
327 * @param last_message_id 331 * @param last_message_id
328 * Last consecutive message ID to retrieve. 332 * Last consecutive message ID to retrieve.
329 * Use 0 to get the latest message. 333 * @param method_prefix
334 * Retrieve only messages with a matching method prefix.
330 * @param fragment_cb 335 * @param fragment_cb
331 * Callback to call with the retrieved fragments. 336 * Callback to call with the retrieved fragments.
332 * @param result_cb 337 * @param result_cb
@@ -342,6 +347,7 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
342 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 347 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
343 uint64_t first_message_id, 348 uint64_t first_message_id,
344 uint64_t last_message_id, 349 uint64_t last_message_id,
350 const char *method_prefix,
345 GNUNET_PSYCSTORE_FragmentCallback fragment_cb, 351 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
346 GNUNET_PSYCSTORE_ResultCallback result_cb, 352 GNUNET_PSYCSTORE_ResultCallback result_cb,
347 void *cls); 353 void *cls);
@@ -355,14 +361,16 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
355 * @param channel_key 361 * @param channel_key
356 * The channel we are interested in. 362 * The channel we are interested in.
357 * @param slave_key 363 * @param slave_key
358 * The slave requesting the message. If not NULL, a membership test is 364 * The slave requesting the message.
359 * performed first and the message is only returned if the slave has 365 * If not NULL, a membership test is performed first
360 * access to it. 366 * and the message is only returned if the slave has access to it.
361 * @param message_limit 367 * @param message_limit
362 * Maximum number of messages to retrieve. 368 * Maximum number of messages to retrieve.
369 * @param method_prefix
370 * Retrieve only messages with a matching method prefix.
363 * @param fragment_cb 371 * @param fragment_cb
364 * Callback to call with the retrieved fragments. 372 * Callback to call with the retrieved fragments.
365 * @param rcb 373 * @param result_cb
366 * Callback to call with the result of the operation. 374 * Callback to call with the result of the operation.
367 * @param cls 375 * @param cls
368 * Closure for the callbacks. 376 * Closure for the callbacks.
@@ -374,6 +382,7 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
374 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 382 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
375 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 383 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
376 uint64_t message_limit, 384 uint64_t message_limit,
385 const char *method_prefix,
377 GNUNET_PSYCSTORE_FragmentCallback fragment_cb, 386 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
378 GNUNET_PSYCSTORE_ResultCallback rcb, 387 GNUNET_PSYCSTORE_ResultCallback rcb,
379 void *cls); 388 void *cls);
diff --git a/src/include/gnunet_social_service.h b/src/include/gnunet_social_service.h
index fcfc8a359..a17446cf5 100644
--- a/src/include/gnunet_social_service.h
+++ b/src/include/gnunet_social_service.h
@@ -749,9 +749,45 @@ GNUNET_SOCIAL_guest_get_place (struct GNUNET_SOCIAL_Guest *guest);
749 749
750 750
751/** 751/**
752 * A history lesson. 752 * A history request.
753 */ 753 */
754struct GNUNET_SOCIAL_HistoryLesson; 754struct GNUNET_SOCIAL_HistoryRequest;
755
756
757/**
758 * Learn about the history of a place.
759 *
760 * Messages are returned through the @a slicer function
761 * and have the #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
762 *
763 * @param place
764 * Place we want to learn more about.
765 * @param start_message_id
766 * First historic message we are interested in.
767 * @param end_message_id
768 * Last historic message we are interested in (inclusive).
769 * @param method_prefix
770 * Only retrieve messages with this method prefix.
771 * @param flags
772 * OR'ed GNUNET_PSYC_HistoryReplayFlags
773 * @param slicer
774 * Slicer to use for retrieved messages.
775 * Can be the same as the slicer of the place.
776 * @param result_cb
777 * Function called after all messages retrieved.
778 * NULL if not needed.
779 * @param cls Closure for @a result_cb.
780 */
781struct GNUNET_SOCIAL_HistoryRequest *
782GNUNET_SOCIAL_place_history_replay (struct GNUNET_SOCIAL_Place *plc,
783 uint64_t start_message_id,
784 uint64_t end_message_id,
785 const char *method_prefix,
786 uint32_t flags,
787 struct GNUNET_SOCIAL_Slicer *slicer,
788 GNUNET_ResultCallback result_cb,
789 void *cls);
790
755 791
756/** 792/**
757 * Learn about the history of a place. 793 * Learn about the history of a place.
@@ -762,36 +798,32 @@ struct GNUNET_SOCIAL_HistoryLesson;
762 * 798 *
763 * To get the latest message, use 0 for both the start and end message ID. 799 * To get the latest message, use 0 for both the start and end message ID.
764 * 800 *
765 * @param place Place we want to learn more about. 801 * @param place
766 * @param start_message_id First historic message we are interested in. 802 * Place we want to learn more about.
767 * @param end_message_id Last historic message we are interested in (inclusive). 803 * @param message_limit
768 * @param slicer Slicer to use to process history. Can be the same as the 804 * Maximum number of historic messages we are interested in.
769 * slicer of the place, as the HISTORIC flag allows distinguishing 805 * @param result_cb
770 * old messages from fresh ones. 806 * Function called after all messages retrieved.
771 * @param finish_cb Function called after the last message in the history lesson 807 * NULL if not needed.
772 * is passed through the @a slicer. NULL if not needed. 808 * @param cls Closure for @a result_cb.
773 * @param finish_cb_cls Closure for @a finish_cb.
774 * @return Handle to abort history lesson, never NULL (multiple lessons
775 * at the same time are allowed).
776 */ 809 */
777struct GNUNET_SOCIAL_HistoryLesson * 810struct GNUNET_SOCIAL_HistoryRequest *
778GNUNET_SOCIAL_place_get_history (struct GNUNET_SOCIAL_Place *place, 811GNUNET_SOCIAL_place_history_replay_latest (struct GNUNET_SOCIAL_Place *plc,
779 uint64_t start_message_id, 812 uint64_t message_limit,
780 uint64_t end_message_id, 813 const char *method_prefix,
781 const struct GNUNET_SOCIAL_Slicer *slicer, 814 uint32_t flags,
782 void (*finish_cb)(void *), 815 struct GNUNET_SOCIAL_Slicer *slicer,
783 void *finish_cb_cls); 816 GNUNET_ResultCallback result_cb,
784 817 void *cls);
785 818
786/** 819/**
787 * Stop processing messages from the history lesson. 820 * Cancel learning about the history of a place.
788 *
789 * Must not be called after the finish callback of the history lesson is called.
790 * 821 *
791 * @param hist History lesson to cancel. 822 * @param hist
823 * History lesson to cancel.
792 */ 824 */
793void 825void
794GNUNET_SOCIAL_place_get_history_cancel (struct GNUNET_SOCIAL_HistoryLesson *hist); 826GNUNET_SOCIAL_place_history_replay_cancel (struct GNUNET_SOCIAL_HistoryRequest *hist);
795 827
796 828
797struct GNUNET_SOCIAL_WatchHandle; 829struct GNUNET_SOCIAL_WatchHandle;
@@ -803,7 +835,7 @@ struct GNUNET_SOCIAL_WatchHandle;
803 * Place to watch. 835 * Place to watch.
804 * @param object_filter 836 * @param object_filter
805 * Object prefix to match. 837 * Object prefix to match.
806 * @param state_var_cb 838 * @param var_cb
807 * Function to call when an object/state var changes. 839 * Function to call when an object/state var changes.
808 * @param cls 840 * @param cls
809 * Closure for callback. 841 * Closure for callback.
@@ -813,7 +845,7 @@ struct GNUNET_SOCIAL_WatchHandle;
813struct GNUNET_SOCIAL_WatchHandle * 845struct GNUNET_SOCIAL_WatchHandle *
814GNUNET_SOCIAL_place_watch (struct GNUNET_SOCIAL_Place *place, 846GNUNET_SOCIAL_place_watch (struct GNUNET_SOCIAL_Place *place,
815 const char *object_filter, 847 const char *object_filter,
816 GNUNET_PSYC_StateVarCallback state_var_cb, 848 GNUNET_PSYC_StateVarCallback var_cb,
817 void *cls); 849 void *cls);
818 850
819 851
@@ -830,13 +862,35 @@ struct GNUNET_SOCIAL_LookHandle;
830 862
831 863
832/** 864/**
833 * Look at objects in the place with a matching name prefix. 865 * Look at a particular object in the place.
866 *
867 * The best matching object is returned (its name might be less specific than
868 * what was requested).
869 *
870 * @param place
871 * The place to look the object at.
872 * @param full_name
873 * Full name of the object.
874 * @param value_size
875 * Set to the size of the returned value.
876 *
877 * @return NULL if there is no such object at this place.
878 */
879struct GNUNET_SOCIAL_LookHandle *
880GNUNET_SOCIAL_place_look_at (struct GNUNET_SOCIAL_Place *plc,
881 const char *full_name,
882 GNUNET_PSYC_StateVarCallback var_cb,
883 GNUNET_ResultCallback result_cb,
884 void *cls);
885
886/**
887 * Look for objects in the place with a matching name prefix.
834 * 888 *
835 * @param place 889 * @param place
836 * The place to look its objects at. 890 * The place to look its objects at.
837 * @param name_prefix 891 * @param name_prefix
838 * Look at objects with names beginning with this value. 892 * Look at objects with names beginning with this value.
839 * @param state_var_cb 893 * @param var_cb
840 * Function to call for each object found. 894 * Function to call for each object found.
841 * @param cls 895 * @param cls
842 * Closure for callback function. 896 * Closure for callback function.
@@ -844,10 +898,11 @@ struct GNUNET_SOCIAL_LookHandle;
844 * @return Handle that can be used to stop looking at objects. 898 * @return Handle that can be used to stop looking at objects.
845 */ 899 */
846struct GNUNET_SOCIAL_LookHandle * 900struct GNUNET_SOCIAL_LookHandle *
847GNUNET_SOCIAL_place_look (struct GNUNET_SOCIAL_Place *place, 901GNUNET_SOCIAL_place_look_for (struct GNUNET_SOCIAL_Place *plc,
848 const char *name_prefix, 902 const char *name_prefix,
849 GNUNET_PSYC_StateVarCallback state_var_cb, 903 GNUNET_PSYC_StateVarCallback var_cb,
850 void *cls); 904 GNUNET_ResultCallback result_cb,
905 void *cls);
851 906
852 907
853/** 908/**
@@ -859,24 +914,6 @@ void
859GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *lh); 914GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *lh);
860 915
861 916
862
863/**
864 * Look at a particular object in the place.
865 *
866 * The best matching object is returned (its name might be less specific than
867 * what was requested).
868 *
869 * @param place The place to look the object at.
870 * @param full_name Full name of the object.
871 * @param value_size Set to the size of the returned value.
872 * @return NULL if there is no such object at this place.
873 */
874const void *
875GNUNET_SOCIAL_place_look_at (struct GNUNET_SOCIAL_Place *place,
876 const char *full_name,
877 size_t *value_size);
878
879
880#if 0 /* keep Emacsens' auto-indent happy */ 917#if 0 /* keep Emacsens' auto-indent happy */
881{ 918{
882#endif 919#endif
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 5ebbe6444..2bc128c4f 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -181,11 +181,24 @@ struct FragmentQueue
181/** 181/**
182 * List of connected clients. 182 * List of connected clients.
183 */ 183 */
184struct ClientListItem 184struct Client
185{ 185{
186 struct ClientListItem *prev; 186 struct Client *prev;
187 struct ClientListItem *next; 187 struct Client *next;
188
189 struct GNUNET_SERVER_Client *client;
190};
191
192
193struct Operation
194{
195 struct Operation *prev;
196 struct Operation *next;
197
188 struct GNUNET_SERVER_Client *client; 198 struct GNUNET_SERVER_Client *client;
199 struct Channel *chn;
200 uint64_t op_id;
201 uint32_t flags;
189}; 202};
190 203
191 204
@@ -194,8 +207,11 @@ struct ClientListItem
194 */ 207 */
195struct Channel 208struct Channel
196{ 209{
197 struct ClientListItem *clients_head; 210 struct Client *clients_head;
198 struct ClientListItem *clients_tail; 211 struct Client *clients_tail;
212
213 struct Operation *op_head;
214 struct Operation *op_tail;
199 215
200 struct TransmitMessage *tmit_head; 216 struct TransmitMessage *tmit_head;
201 struct TransmitMessage *tmit_tail; 217 struct TransmitMessage *tmit_tail;
@@ -397,14 +413,6 @@ struct Slave
397}; 413};
398 414
399 415
400struct OperationClosure
401{
402 struct GNUNET_SERVER_Client *client;
403 struct Channel *chn;
404 uint64_t op_id;
405};
406
407
408static void 416static void
409transmit_message (struct Channel *chn); 417transmit_message (struct Channel *chn);
410 418
@@ -435,6 +443,28 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
435} 443}
436 444
437 445
446static struct Operation *
447op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
448 uint64_t op_id, uint32_t flags)
449{
450 struct Operation *op = GNUNET_malloc (sizeof (*op));
451 op->client = client;
452 op->chn = chn;
453 op->op_id = op_id;
454 op->flags = flags;
455 GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
456 return op;
457}
458
459
460static void
461op_remove (struct Channel *chn, struct Operation *op)
462{
463 GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op);
464 GNUNET_free (op);
465}
466
467
438/** 468/**
439 * Clean up master data structures after a client disconnected. 469 * Clean up master data structures after a client disconnected.
440 */ 470 */
@@ -541,7 +571,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
541 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", 571 chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
542 GNUNET_h2s (&chn->pub_key_hash)); 572 GNUNET_h2s (&chn->pub_key_hash));
543 573
544 struct ClientListItem *cli = chn->clients_head; 574 struct Client *cli = chn->clients_head;
545 while (NULL != cli) 575 while (NULL != cli)
546 { 576 {
547 if (cli->client == client) 577 if (cli->client == client)
@@ -553,6 +583,17 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
553 cli = cli->next; 583 cli = cli->next;
554 } 584 }
555 585
586 struct Operation *op = chn->op_head;
587 while (NULL != op)
588 {
589 if (op->client == client)
590 {
591 op->client = NULL;
592 break;
593 }
594 op = op->next;
595 }
596
556 if (NULL == chn->clients_head) 597 if (NULL == chn->clients_head)
557 { /* Last client disconnected. */ 598 { /* Last client disconnected. */
558 if (NULL != chn->tmit_head) 599 if (NULL != chn->tmit_head)
@@ -574,10 +615,10 @@ static void
574client_send_msg (const struct Channel *chn, 615client_send_msg (const struct Channel *chn,
575 const struct GNUNET_MessageHeader *msg) 616 const struct GNUNET_MessageHeader *msg)
576{ 617{
577 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
578 "%p Sending message to clients.\n", chn); 619 "%p Sending message to clients.\n", chn);
579 620
580 struct ClientListItem *cli = chn->clients_head; 621 struct Client *cli = chn->clients_head;
581 while (NULL != cli) 622 while (NULL != cli)
582 { 623 {
583 GNUNET_SERVER_notification_context_add (nc, cli->client); 624 GNUNET_SERVER_notification_context_add (nc, cli->client);
@@ -596,33 +637,29 @@ client_send_msg (const struct Channel *chn,
596 * Code to transmit. 637 * Code to transmit.
597 * @param op_id 638 * @param op_id
598 * Operation ID in network byte order. 639 * Operation ID in network byte order.
599 * @param err_msg 640 * @param data
600 * Error message to include (or NULL for none). 641 * Data payload or NULL.
642 * @param data_size
643 * Size of @a data.
601 */ 644 */
602static void 645static void
603client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id, 646client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
604 int64_t result_code, const char *err_msg) 647 int64_t result_code, const void *data, uint16_t data_size)
605{ 648{
606 struct OperationResult *res; 649 struct GNUNET_OperationResultMessage *res;
607 size_t err_size = 0;
608 650
609 if (NULL != err_msg) 651 res = GNUNET_malloc (sizeof (*res) + data_size);
610 err_size = strnlen (err_msg,
611 GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1;
612 res = GNUNET_malloc (sizeof (struct OperationResult) + err_size);
613 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE); 652 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
614 res->header.size = htons (sizeof (struct OperationResult) + err_size); 653 res->header.size = htons (sizeof (*res) + data_size);
615 res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1); 654 res->result_code = GNUNET_htonll_signed (result_code);
616 res->op_id = op_id; 655 res->op_id = op_id;
617 if (0 < err_size) 656 if (0 < data_size)
618 { 657 memcpy (&res[1], data, data_size);
619 memcpy (&res[1], err_msg, err_size); 658
620 ((char *) &res[1])[err_size - 1] = '\0';
621 }
622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 659 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
623 "%p Sending result to client for operation #%" PRIu64 ": " 660 "%p Sending result to client for operation #%" PRIu64 ": "
624 "%" PRId64 " (%s)\n", 661 "%" PRId64 " (size: %u)\n",
625 client, GNUNET_ntohll (op_id), result_code, err_msg); 662 client, GNUNET_ntohll (op_id), result_code, data_size);
626 663
627 GNUNET_SERVER_notification_context_add (nc, client); 664 GNUNET_SERVER_notification_context_add (nc, client);
628 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header, 665 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
@@ -647,7 +684,8 @@ struct JoinMemTestClosure
647 * Membership test result callback used for join requests. 684 * Membership test result callback used for join requests.
648 */ 685 */
649static void 686static void
650join_mem_test_cb (void *cls, int64_t result, const char *err_msg) 687join_mem_test_cb (void *cls, int64_t result,
688 const char *err_msg, uint16_t err_msg_size)
651{ 689{
652 struct JoinMemTestClosure *jcls = cls; 690 struct JoinMemTestClosure *jcls = cls;
653 691
@@ -663,6 +701,12 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
663 } 701 }
664 else 702 else
665 { 703 {
704 if (GNUNET_SYSERR == result)
705 {
706 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
707 "Could not perform membership test (%.*s)\n",
708 err_msg_size, err_msg);
709 }
666 // FIXME: add relays 710 // FIXME: add relays
667 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL); 711 GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
668 } 712 }
@@ -759,12 +803,13 @@ mcast_recv_join_decision (void *cls, int is_admitted,
759 * Received result of GNUNET_PSYCSTORE_membership_test() 803 * Received result of GNUNET_PSYCSTORE_membership_test()
760 */ 804 */
761static void 805static void
762store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg) 806store_recv_membership_test_result (void *cls, int64_t result,
807 const char *err_msg, uint16_t err_msg_size)
763{ 808{
764 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls; 809 struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
765 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 810 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766 "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n", 811 "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%.*s)\n",
767 mth, result, err_msg); 812 mth, result, err_msg_size, err_msg);
768 813
769 GNUNET_MULTICAST_membership_test_result (mth, result); 814 GNUNET_MULTICAST_membership_test_result (mth, result);
770} 815}
@@ -805,12 +850,13 @@ store_recv_fragment_replay (void *cls,
805 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. 850 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
806 */ 851 */
807static void 852static void
808store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg) 853store_recv_fragment_replay_result (void *cls, int64_t result,
854 const char *err_msg, uint16_t err_msg_size)
809{ 855{
810 struct GNUNET_MULTICAST_ReplayHandle *rh = cls; 856 struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
811 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 857 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n", 858 "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
813 rh, result, err_msg); 859 rh, result, err_msg_size, err_msg);
814 860
815 switch (result) 861 switch (result)
816 { 862 {
@@ -867,7 +913,7 @@ mcast_recv_replay_message (void *cls,
867{ 913{
868 struct Channel *chn = cls; 914 struct Channel *chn = cls;
869 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key, 915 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
870 message_id, message_id, 916 message_id, message_id, NULL,
871 &store_recv_fragment_replay, 917 &store_recv_fragment_replay,
872 &store_recv_fragment_replay_result, rh); 918 &store_recv_fragment_replay_result, rh);
873} 919}
@@ -911,6 +957,42 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
911 957
912 958
913/** 959/**
960 * Initialize PSYC message header.
961 */
962static inline void
963psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
964 const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
965{
966 uint16_t size = ntohs (mmsg->header.size);
967 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
968
969 pmsg->header.size = htons (psize);
970 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
971 pmsg->message_id = mmsg->message_id;
972 pmsg->fragment_offset = mmsg->fragment_offset;
973 pmsg->flags = htonl (flags);
974
975 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
976}
977
978
979/**
980 * Create a new PSYC message from a multicast message for sending it to clients.
981 */
982static inline struct GNUNET_PSYC_MessageHeader *
983psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t flags)
984{
985 struct GNUNET_PSYC_MessageHeader *pmsg;
986 uint16_t size = ntohs (mmsg->header.size);
987 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
988
989 pmsg = GNUNET_malloc (psize);
990 psyc_msg_init (pmsg, mmsg, flags);
991 return pmsg;
992}
993
994
995/**
914 * Send multicast message to all clients connected to the channel. 996 * Send multicast message to all clients connected to the channel.
915 */ 997 */
916static void 998static void
@@ -918,24 +1000,13 @@ client_send_mcast_msg (struct Channel *chn,
918 const struct GNUNET_MULTICAST_MessageHeader *mmsg, 1000 const struct GNUNET_MULTICAST_MessageHeader *mmsg,
919 uint32_t flags) 1001 uint32_t flags)
920{ 1002{
921 struct GNUNET_PSYC_MessageHeader *pmsg;
922 uint16_t size = ntohs (mmsg->header.size);
923 uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
924
925 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1003 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926 "%p Sending multicast message to client. " 1004 "%p Sending multicast message to client. "
927 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", 1005 "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
928 chn, GNUNET_ntohll (mmsg->fragment_id), 1006 chn, GNUNET_ntohll (mmsg->fragment_id),
929 GNUNET_ntohll (mmsg->message_id)); 1007 GNUNET_ntohll (mmsg->message_id));
930 1008
931 pmsg = GNUNET_malloc (psize); 1009 struct GNUNET_PSYC_MessageHeader *pmsg = psyc_msg_new (mmsg, flags);
932 pmsg->header.size = htons (psize);
933 pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
934 pmsg->message_id = mmsg->message_id;
935 pmsg->fragment_offset = mmsg->fragment_offset;
936 pmsg->flags = htonl (flags);
937
938 memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
939 client_send_msg (chn, &pmsg->header); 1010 client_send_msg (chn, &pmsg->header);
940 GNUNET_free (pmsg); 1011 GNUNET_free (pmsg);
941} 1012}
@@ -1327,12 +1398,13 @@ message_queue_drop (struct Channel *chn)
1327 * Received result of GNUNET_PSYCSTORE_fragment_store(). 1398 * Received result of GNUNET_PSYCSTORE_fragment_store().
1328 */ 1399 */
1329static void 1400static void
1330store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg) 1401store_recv_fragment_store_result (void *cls, int64_t result,
1402 const char *err_msg, uint16_t err_msg_size)
1331{ 1403{
1332 struct Channel *chn = cls; 1404 struct Channel *chn = cls;
1333 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1405 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1334 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n", 1406 "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%.*s)\n",
1335 chn, result, err_msg); 1407 chn, result, err_msg_size, err_msg);
1336} 1408}
1337 1409
1338 1410
@@ -1430,7 +1502,7 @@ store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
1430 struct GNUNET_PSYC_CountersResultMessage res; 1502 struct GNUNET_PSYC_CountersResultMessage res;
1431 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1503 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1432 res.header.size = htons (sizeof (res)); 1504 res.header.size = htons (sizeof (res));
1433 res.result_code = htonl (result - INT32_MIN); 1505 res.result_code = GNUNET_htonl_signed (result);
1434 res.max_message_id = GNUNET_htonll (max_message_id); 1506 res.max_message_id = GNUNET_htonll (max_message_id);
1435 1507
1436 if (GNUNET_OK == result || GNUNET_NO == result) 1508 if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1476,7 +1548,7 @@ store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
1476 struct GNUNET_PSYC_CountersResultMessage res; 1548 struct GNUNET_PSYC_CountersResultMessage res;
1477 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1549 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1478 res.header.size = htons (sizeof (res)); 1550 res.header.size = htons (sizeof (res));
1479 res.result_code = htonl (result - INT32_MIN); 1551 res.result_code = GNUNET_htonl_signed (result);
1480 res.max_message_id = GNUNET_htonll (max_message_id); 1552 res.max_message_id = GNUNET_htonll (max_message_id);
1481 1553
1482 if (GNUNET_OK == result || GNUNET_NO == result) 1554 if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1566,7 +1638,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1566 struct GNUNET_PSYC_CountersResultMessage res; 1638 struct GNUNET_PSYC_CountersResultMessage res;
1567 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); 1639 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
1568 res.header.size = htons (sizeof (res)); 1640 res.header.size = htons (sizeof (res));
1569 res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN); 1641 res.result_code = GNUNET_htonl_signed (GNUNET_OK);
1570 res.max_message_id = GNUNET_htonll (mst->max_message_id); 1642 res.max_message_id = GNUNET_htonll (mst->max_message_id);
1571 1643
1572 GNUNET_SERVER_notification_context_add (nc, client); 1644 GNUNET_SERVER_notification_context_add (nc, client);
@@ -1578,7 +1650,7 @@ client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
1578 "%p Client connected as master to channel %s.\n", 1650 "%p Client connected as master to channel %s.\n",
1579 mst, GNUNET_h2s (&chn->pub_key_hash)); 1651 mst, GNUNET_h2s (&chn->pub_key_hash));
1580 1652
1581 struct ClientListItem *cli = GNUNET_new (struct ClientListItem); 1653 struct Client *cli = GNUNET_new (struct Client);
1582 cli->client = client; 1654 cli->client = client;
1583 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); 1655 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1584 1656
@@ -1677,7 +1749,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1677 struct GNUNET_PSYC_CountersResultMessage res; 1749 struct GNUNET_PSYC_CountersResultMessage res;
1678 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); 1750 res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
1679 res.header.size = htons (sizeof (res)); 1751 res.header.size = htons (sizeof (res));
1680 res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN); 1752 res.result_code = GNUNET_htonl_signed (GNUNET_OK);
1681 res.max_message_id = GNUNET_htonll (chn->max_message_id); 1753 res.max_message_id = GNUNET_htonll (chn->max_message_id);
1682 1754
1683 GNUNET_SERVER_notification_context_add (nc, client); 1755 GNUNET_SERVER_notification_context_add (nc, client);
@@ -1716,7 +1788,7 @@ client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
1716 "%p Client connected as slave to channel %s.\n", 1788 "%p Client connected as slave to channel %s.\n",
1717 slv, GNUNET_h2s (&chn->pub_key_hash)); 1789 slv, GNUNET_h2s (&chn->pub_key_hash));
1718 1790
1719 struct ClientListItem *cli = GNUNET_new (struct ClientListItem); 1791 struct Client *cli = GNUNET_new (struct Client);
1720 cli->client = client; 1792 cli->client = client;
1721 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); 1793 GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
1722 1794
@@ -2119,14 +2191,15 @@ struct MembershipStoreClosure
2119 * Received result of GNUNET_PSYCSTORE_membership_store() 2191 * Received result of GNUNET_PSYCSTORE_membership_store()
2120 */ 2192 */
2121static void 2193static void
2122store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg) 2194store_recv_membership_store_result (void *cls, int64_t result,
2195 const char *err_msg, uint16_t err_msg_size)
2123{ 2196{
2124 struct MembershipStoreClosure *mcls = cls; 2197 struct MembershipStoreClosure *mcls = cls;
2125 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2126 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n", 2199 "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
2127 mcls->chn, result, err_msg); 2200 mcls->chn, result, err_msg_size, err_msg);
2128 2201
2129 client_send_result (mcls->client, mcls->op_id, result, err_msg); 2202 client_send_result (mcls->client, mcls->op_id, result, err_msg, err_msg_size);
2130} 2203}
2131 2204
2132 2205
@@ -2165,36 +2238,73 @@ client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
2165} 2238}
2166 2239
2167 2240
2241/**
2242 * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
2243 * in response to a history request from a client.
2244 */
2168static int 2245static int
2169store_recv_fragment_history (void *cls, 2246store_recv_fragment_history (void *cls,
2170 struct GNUNET_MULTICAST_MessageHeader *msg, 2247 struct GNUNET_MULTICAST_MessageHeader *mmsg,
2171 enum GNUNET_PSYCSTORE_MessageFlags flags) 2248 enum GNUNET_PSYCSTORE_MessageFlags flags)
2172{ 2249{
2173 struct OperationClosure *opcls = cls; 2250 struct Operation *op = cls;
2174 struct Channel *chn = opcls->chn; 2251 if (NULL == op->client)
2175 client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC); 2252 { /* Requesting client already disconnected. */
2253 return GNUNET_NO;
2254 }
2255 struct Channel *chn = op->chn;
2256
2257 struct GNUNET_PSYC_MessageHeader *pmsg;
2258 uint16_t msize = ntohs (mmsg->header.size);
2259 uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
2260
2261 struct GNUNET_OperationResultMessage *
2262 res = GNUNET_malloc (sizeof (*res) + psize);
2263 res->header.size = htons (sizeof (*res) + psize);
2264 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
2265 res->op_id = op->op_id;
2266 res->result_code = GNUNET_htonll_signed (GNUNET_OK);
2267
2268 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
2269 psyc_msg_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
2270 memcpy (&res[1], pmsg, psize);
2271
2272 /** @todo FIXME: send only to requesting client */
2273 client_send_msg (chn, &res->header);
2176 return GNUNET_YES; 2274 return GNUNET_YES;
2177} 2275}
2178 2276
2179 2277
2180/** 2278/**
2181 * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay. 2279 * Received the result of GNUNET_PSYCSTORE_fragment_get(),
2280 * in response to a history request from a client.
2182 */ 2281 */
2183static void 2282static void
2184store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg) 2283store_recv_fragment_history_result (void *cls, int64_t result,
2284 const char *err_msg, uint16_t err_msg_size)
2185{ 2285{
2186 struct OperationClosure *opcls = cls; 2286 struct Operation *op = cls;
2287 if (NULL == op->client)
2288 { /* Requesting client already disconnected. */
2289 return;
2290 }
2291
2187 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2292 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2188 "%p History replay #%" PRIu64 ": " 2293 "%p History replay #%" PRIu64 ": "
2189 "PSYCSTORE returned %" PRId64 " (%s)\n", 2294 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2190 opcls->chn, opcls->op_id, result, err_msg); 2295 op->chn, op->op_id, result, err_msg_size, err_msg);
2296
2297 if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
2298 {
2299 /** @todo Multicast replay request for messages not found locally. */
2300 }
2191 2301
2192 client_send_result (opcls->client, opcls->op_id, result, err_msg); 2302 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2193} 2303}
2194 2304
2195 2305
2196/** 2306/**
2197 * Client requests channel history from PSYCstore. 2307 * Client requests channel history.
2198 */ 2308 */
2199static void 2309static void
2200client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client, 2310client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
@@ -2204,26 +2314,39 @@ client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
2204 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); 2314 chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
2205 GNUNET_assert (NULL != chn); 2315 GNUNET_assert (NULL != chn);
2206 2316
2207 const struct HistoryRequest * 2317 const struct GNUNET_PSYC_HistoryRequestMessage *
2208 req = (const struct HistoryRequest *) msg; 2318 req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
2319 uint16_t size = ntohs (msg->size);
2320 const char *method_prefix = (const char *) &req[1];
2209 2321
2210 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); 2322 if (size < sizeof (*req) + 1
2211 opcls->client = client; 2323 || '\0' != method_prefix[size - sizeof (*req) - 1])
2212 opcls->chn = chn; 2324 {
2213 opcls->op_id = req->op_id; 2325 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2326 "%p History replay #%" PRIu64 ": "
2327 "invalid method prefix. size: %u < %u?\n",
2328 chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
2329 GNUNET_break (0);
2330 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2331 return;
2332 }
2333
2334 struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
2214 2335
2215 if (0 == req->message_limit) 2336 if (0 == req->message_limit)
2216 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL, 2337 GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
2217 GNUNET_ntohll (req->start_message_id), 2338 GNUNET_ntohll (req->start_message_id),
2218 GNUNET_ntohll (req->end_message_id), 2339 GNUNET_ntohll (req->end_message_id),
2340 method_prefix,
2219 &store_recv_fragment_history, 2341 &store_recv_fragment_history,
2220 &store_recv_fragment_history_result, opcls); 2342 &store_recv_fragment_history_result, op);
2221 else 2343 else
2222 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL, 2344 GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
2223 GNUNET_ntohll (req->message_limit), 2345 GNUNET_ntohll (req->message_limit),
2346 method_prefix,
2224 &store_recv_fragment_history, 2347 &store_recv_fragment_history,
2225 &store_recv_fragment_history_result, 2348 &store_recv_fragment_history_result,
2226 opcls); 2349 op);
2227 2350
2228 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2351 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2229} 2352}
@@ -2236,19 +2359,19 @@ static int
2236store_recv_state_var (void *cls, const char *name, 2359store_recv_state_var (void *cls, const char *name,
2237 const void *value, size_t value_size) 2360 const void *value, size_t value_size)
2238{ 2361{
2239 struct OperationClosure *opcls = cls; 2362 struct Operation *op = cls;
2240 struct OperationResult *op; 2363 struct GNUNET_OperationResultMessage *res;
2241 2364
2242 if (NULL != name) 2365 if (NULL != name)
2243 { 2366 {
2244 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1; 2367 uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
2245 struct GNUNET_PSYC_MessageModifier *mod; 2368 struct GNUNET_PSYC_MessageModifier *mod;
2246 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size); 2369 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2247 op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size); 2370 res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
2248 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); 2371 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2249 op->op_id = opcls->op_id; 2372 res->op_id = op->op_id;
2250 2373
2251 mod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; 2374 mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
2252 mod->header.size = htons (sizeof (*mod) + name_size + value_size); 2375 mod->header.size = htons (sizeof (*mod) + name_size + value_size);
2253 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); 2376 mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
2254 mod->name_size = htons (name_size); 2377 mod->name_size = htons (name_size);
@@ -2260,19 +2383,20 @@ store_recv_state_var (void *cls, const char *name,
2260 else 2383 else
2261 { 2384 {
2262 struct GNUNET_MessageHeader *mod; 2385 struct GNUNET_MessageHeader *mod;
2263 op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size); 2386 res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
2264 op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size); 2387 res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
2265 op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT); 2388 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
2266 op->op_id = opcls->op_id; 2389 res->op_id = op->op_id;
2267 2390
2268 mod = (struct GNUNET_MessageHeader *) &op[1]; 2391 mod = (struct GNUNET_MessageHeader *) &res[1];
2269 mod->size = htons (sizeof (*mod) + value_size); 2392 mod->size = htons (sizeof (*mod) + value_size);
2270 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); 2393 mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
2271 memcpy (&mod[1], value, value_size); 2394 memcpy (&mod[1], value, value_size);
2272 } 2395 }
2273 2396
2274 GNUNET_SERVER_notification_context_add (nc, opcls->client); 2397 // FIXME: client might have been disconnected
2275 GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header, 2398 GNUNET_SERVER_notification_context_add (nc, op->client);
2399 GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
2276 GNUNET_NO); 2400 GNUNET_NO);
2277 GNUNET_free (op); 2401 GNUNET_free (op);
2278 return GNUNET_YES; 2402 return GNUNET_YES;
@@ -2284,15 +2408,17 @@ store_recv_state_var (void *cls, const char *name,
2284 * or GNUNET_PSYCSTORE_state_get_prefix() 2408 * or GNUNET_PSYCSTORE_state_get_prefix()
2285 */ 2409 */
2286static void 2410static void
2287store_recv_state_result (void *cls, int64_t result, const char *err_msg) 2411store_recv_state_result (void *cls, int64_t result,
2412 const char *err_msg, uint16_t err_msg_size)
2288{ 2413{
2289 struct OperationClosure *opcls = cls; 2414 struct Operation *op = cls;
2290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2415 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2291 "%p History replay #%" PRIu64 ": " 2416 "%p History replay #%" PRIu64 ": "
2292 "PSYCSTORE returned %" PRId64 " (%s)\n", 2417 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
2293 opcls->chn, opcls->op_id, result, err_msg); 2418 op->chn, op->op_id, result, err_msg_size, err_msg);
2294 2419
2295 client_send_result (opcls->client, opcls->op_id, result, err_msg); 2420 // FIXME: client might have been disconnected
2421 client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
2296} 2422}
2297 2423
2298 2424
@@ -2314,18 +2440,15 @@ client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
2314 const char *name = (const char *) &req[1]; 2440 const char *name = (const char *) &req[1];
2315 if (0 == name_size || '\0' != name[name_size - 1]) 2441 if (0 == name_size || '\0' != name[name_size - 1])
2316 { 2442 {
2443 GNUNET_break (0);
2317 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2444 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2318 return; 2445 return;
2319 } 2446 }
2320 2447
2321 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); 2448 struct Operation *op = op_add (chn, client, req->op_id, 0);
2322 opcls->client = client;
2323 opcls->chn = chn;
2324 opcls->op_id = req->op_id;
2325
2326 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name, 2449 GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
2327 &store_recv_state_var, 2450 &store_recv_state_var,
2328 &store_recv_state_result, opcls); 2451 &store_recv_state_result, op);
2329 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2452 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2330} 2453}
2331 2454
@@ -2348,20 +2471,16 @@ client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
2348 const char *name = (const char *) &req[1]; 2471 const char *name = (const char *) &req[1];
2349 if (0 == name_size || '\0' != name[name_size - 1]) 2472 if (0 == name_size || '\0' != name[name_size - 1])
2350 { 2473 {
2474 GNUNET_break (0);
2351 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 2475 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2352 return; 2476 return;
2353 } 2477 }
2354 2478
2355 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls)); 2479 struct Operation *op = op_add (chn, client, req->op_id, 0);
2356 opcls->client = client;
2357 opcls->chn = chn;
2358 opcls->op_id = req->op_id;
2359
2360 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name, 2480 GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
2361 &store_recv_state_var, 2481 &store_recv_state_var,
2362 &store_recv_state_result, opcls); 2482 &store_recv_state_result, op);
2363 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2483 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2364
2365} 2484}
2366 2485
2367 2486
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 4bc92532f..e85e14c0e 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -171,42 +171,6 @@ struct StateRequest
171/**** service -> library ****/ 171/**** service -> library ****/
172 172
173 173
174/**
175 * Answer from service to client about last operation.
176 */
177struct OperationResult
178{
179 /**
180 * Types:
181 * - GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE
182 * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_RESULT
183 * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_RESULT
184 */
185 struct GNUNET_MessageHeader header;
186
187 uint32_t reserved GNUNET_PACKED;
188
189 /**
190 * Operation ID.
191 */
192 uint64_t op_id GNUNET_PACKED;
193
194 /**
195 * Status code for the operation.
196 */
197 uint64_t result_code GNUNET_PACKED;
198
199 /* Followed by:
200 * - on error: NUL-terminated error message
201 * - on success: one of the following message types
202 *
203 * For a STATE_RESULT, one of:
204 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER
205 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT
206 * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END
207 */
208};
209
210GNUNET_NETWORK_STRUCT_END 174GNUNET_NETWORK_STRUCT_END
211 175
212#endif 176#endif
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index ce994b272..7839aaf9e 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -43,33 +43,6 @@
43#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) 43#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
44 44
45 45
46struct OperationListItem
47{
48 struct OperationListItem *prev;
49 struct OperationListItem *next;
50
51 /**
52 * Operation ID.
53 */
54 uint64_t op_id;
55
56 /**
57 * Continuation to invoke with the result of an operation.
58 */
59 GNUNET_PSYC_ResultCallback result_cb;
60
61 /**
62 * State variable result callback.
63 */
64 GNUNET_PSYC_StateVarCallback state_var_cb;
65
66 /**
67 * Closure for the callbacks.
68 */
69 void *cls;
70};
71
72
73/** 46/**
74 * Handle to access PSYC channel operations for both the master and slaves. 47 * Handle to access PSYC channel operations for both the master and slaves.
75 */ 48 */
@@ -111,21 +84,6 @@ struct GNUNET_PSYC_Channel
111 void *disconnect_cls; 84 void *disconnect_cls;
112 85
113 /** 86 /**
114 * First operation in the linked list.
115 */
116 struct OperationListItem *op_head;
117
118 /**
119 * Last operation in the linked list.
120 */
121 struct OperationListItem *op_tail;
122
123 /**
124 * Last operation ID used.
125 */
126 uint64_t last_op_id;
127
128 /**
129 * Are we polling for incoming messages right now? 87 * Are we polling for incoming messages right now?
130 */ 88 */
131 uint8_t in_receive; 89 uint8_t in_receive;
@@ -204,83 +162,62 @@ struct GNUNET_PSYC_SlaveTransmitHandle
204}; 162};
205 163
206 164
207/** 165struct GNUNET_PSYC_HistoryRequest
208 * Get a fresh operation ID to distinguish between PSYCstore requests.
209 *
210 * @param h Handle to the PSYCstore service.
211 * @return next operation id to use
212 */
213static uint64_t
214op_get_next_id (struct GNUNET_PSYC_Channel *chn)
215{
216 return ++chn->last_op_id;
217}
218
219
220/**
221 * Find operation by ID.
222 *
223 * @return Operation, or NULL if none found.
224 */
225static struct OperationListItem *
226op_find_by_id (struct GNUNET_PSYC_Channel *chn, uint64_t op_id)
227{ 166{
228 struct OperationListItem *op = chn->op_head; 167 /**
229 while (NULL != op) 168 * Channel.
230 { 169 */
231 if (op->op_id == op_id) 170 struct GNUNET_PSYC_Channel *chn;
232 return op;
233 op = op->next;
234 }
235 return NULL;
236}
237 171
172 /**
173 * Operation ID.
174 */
175 uint64_t op_id;
238 176
239static uint64_t 177 /**
240op_add (struct GNUNET_PSYC_Channel *chn, GNUNET_PSYC_ResultCallback result_cb, 178 * Message handler.
241 void *cls) 179 */
242{ 180 struct GNUNET_PSYC_ReceiveHandle *recv;
243 if (NULL == result_cb)
244 return 0;
245 181
246 struct OperationListItem *op = GNUNET_malloc (sizeof (*op)); 182 /**
247 op->op_id = op_get_next_id (chn); 183 * Function to call when the operation finished.
248 op->result_cb = result_cb; 184 */
249 op->cls = cls; 185 GNUNET_ResultCallback result_cb;
250 GNUNET_CONTAINER_DLL_insert_tail (chn->op_head, chn->op_tail, op);
251 186
252 LOG (GNUNET_ERROR_TYPE_DEBUG, 187 /**
253 "%p Added operation #%" PRIu64 "\n", chn, op->op_id); 188 * Closure for @a result_cb.
254 return op->op_id; 189 */
255} 190 void *cls;
191};
256 192
257 193
258static int 194struct GNUNET_PSYC_StateRequest
259op_result (struct GNUNET_PSYC_Channel *chn, uint64_t op_id,
260 int64_t result_code, const char *err_msg)
261{ 195{
262 LOG (GNUNET_ERROR_TYPE_DEBUG, 196 /**
263 "%p Received result for operation #%" PRIu64 ": %" PRId64 " (%s)\n", 197 * Channel.
264 chn, op_id, result_code, err_msg); 198 */
265 if (0 == op_id) 199 struct GNUNET_PSYC_Channel *chn;
266 return GNUNET_NO;
267 200
268 struct OperationListItem *op = op_find_by_id (chn, op_id); 201 /**
269 if (NULL == op) 202 * Operation ID.
270 { 203 */
271 LOG (GNUNET_ERROR_TYPE_WARNING, 204 uint64_t op_id;
272 "Could not find operation #%" PRIu64 "\n", op_id);
273 return GNUNET_NO;
274 }
275 205
276 GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op); 206 /**
207 * State variable result callback.
208 */
209 GNUNET_PSYC_StateVarCallback var_cb;
277 210
278 if (NULL != op->result_cb) 211 /**
279 op->result_cb (op->cls, result_code, err_msg); 212 * Function to call when the operation finished.
213 */
214 GNUNET_ResultCallback result_cb;
280 215
281 GNUNET_free (op); 216 /**
282 return GNUNET_YES; 217 * Closure for @a result_cb.
283} 218 */
219 void *cls;
220};
284 221
285 222
286static void 223static void
@@ -313,22 +250,97 @@ channel_recv_result (void *cls,
313 struct GNUNET_PSYC_Channel * 250 struct GNUNET_PSYC_Channel *
314 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 251 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
315 252
253 const struct GNUNET_OperationResultMessage *
254 res = (const struct GNUNET_OperationResultMessage *) msg;
255
316 uint16_t size = ntohs (msg->size); 256 uint16_t size = ntohs (msg->size);
317 const struct OperationResult *res = (const struct OperationResult *) msg; 257 if (size < sizeof (*res))
318 const char *err_msg = NULL; 258 { /* Error, message too small. */
259 GNUNET_break (0);
260 return;
261 }
319 262
320 if (sizeof (struct OperationResult) < size) 263 uint16_t data_size = size - sizeof (*res);
321 { 264 const char *data = (0 < data_size) ? (void *) &res[1] : NULL;
322 err_msg = (const char *) &res[1]; 265 GNUNET_CLIENT_MANAGER_op_result (chn->client, GNUNET_ntohll (res->op_id),
323 if ('\0' != err_msg[size - sizeof (struct OperationResult) - 1]) 266 GNUNET_ntohll_signed (res->result_code),
324 { 267 data, data_size);
325 GNUNET_break (0); 268}
326 err_msg = NULL; 269
327 } 270
271static void
272op_recv_history_result (void *cls, int64_t result,
273 const void *data, uint16_t data_size)
274{
275 LOG (GNUNET_ERROR_TYPE_DEBUG,
276 "Received history replay result: %" PRId64 ".\n", result);
277
278 struct GNUNET_PSYC_HistoryRequest *hist = cls;
279
280 if (NULL != hist->result_cb)
281 hist->result_cb (hist->cls, result, data, data_size);
282
283 GNUNET_PSYC_receive_destroy (hist->recv);
284 GNUNET_free (hist);
285}
286
287
288static void
289op_recv_state_result (void *cls, int64_t result,
290 const void *data, uint16_t data_size)
291{
292 LOG (GNUNET_ERROR_TYPE_DEBUG,
293 "Received state request result: %" PRId64 ".\n", result);
294
295 struct GNUNET_PSYC_StateRequest *sr = cls;
296
297 if (NULL != sr->result_cb)
298 sr->result_cb (sr->cls, result, data, data_size);
299
300 GNUNET_free (sr);
301}
302
303
304static void
305channel_recv_history_result (void *cls,
306 struct GNUNET_CLIENT_MANAGER_Connection *client,
307 const struct GNUNET_MessageHeader *msg)
308{
309 struct GNUNET_PSYC_Channel *
310 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
311
312 const struct GNUNET_OperationResultMessage *
313 res = (const struct GNUNET_OperationResultMessage *) msg;
314 struct GNUNET_PSYC_MessageHeader *
315 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
316
317 LOG (GNUNET_ERROR_TYPE_DEBUG,
318 "%p Received historic fragment for message #%" PRIu64 ".\n",
319 chn, GNUNET_ntohll (pmsg->message_id));
320
321 GNUNET_ResultCallback result_cb = NULL;
322 struct GNUNET_PSYC_HistoryRequest *hist = NULL;
323
324 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client,
325 GNUNET_ntohll (res->op_id),
326 &result_cb, (void *) &hist))
327 { /* Operation not found. */
328 LOG (GNUNET_ERROR_TYPE_WARNING,
329 "%p Replay operation not found for historic fragment of message #%"
330 PRIu64 ".\n",
331 chn, GNUNET_ntohll (pmsg->message_id));
332 return;
328 } 333 }
329 334
330 op_result (chn, GNUNET_ntohll (res->op_id), 335 uint16_t size = ntohs (msg->size);
331 GNUNET_ntohll (res->result_code) + INT64_MIN, err_msg); 336 if (size < sizeof (*res) + sizeof (*pmsg))
337 { /* Error, message too small. */
338 GNUNET_break (0);
339 return;
340 }
341
342 GNUNET_PSYC_receive_message (hist->recv,
343 (const struct GNUNET_PSYC_MessageHeader *) pmsg);
332} 344}
333 345
334 346
@@ -340,12 +352,21 @@ channel_recv_state_result (void *cls,
340 struct GNUNET_PSYC_Channel * 352 struct GNUNET_PSYC_Channel *
341 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); 353 chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
342 354
343 const struct OperationResult *res = (const struct OperationResult *) msg; 355 const struct GNUNET_OperationResultMessage *
344 struct OperationListItem *op = op_find_by_id (chn, GNUNET_ntohll (res->op_id)); 356 res = (const struct GNUNET_OperationResultMessage *) msg;
345 if (NULL == op || NULL == op->state_var_cb) 357
358 GNUNET_ResultCallback result_cb = NULL;
359 struct GNUNET_PSYC_StateRequest *sr = NULL;
360
361 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client,
362 GNUNET_ntohll (res->op_id),
363 &result_cb, (void *) &sr))
364 { /* Operation not found. */
346 return; 365 return;
366 }
347 367
348 const struct GNUNET_MessageHeader *modc = (struct GNUNET_MessageHeader *) &op[1]; 368 const struct GNUNET_MessageHeader *
369 modc = (struct GNUNET_MessageHeader *) &res[1];
349 uint16_t modc_size = ntohs (modc->size); 370 uint16_t modc_size = ntohs (modc->size);
350 if (ntohs (msg->size) - sizeof (*msg) != modc_size) 371 if (ntohs (msg->size) - sizeof (*msg) != modc_size)
351 { 372 {
@@ -366,13 +387,13 @@ channel_recv_state_result (void *cls,
366 GNUNET_break (0); 387 GNUNET_break (0);
367 return; 388 return;
368 } 389 }
369 op->state_var_cb (op->cls, name, name + name_size, ntohs (mod->value_size)); 390 sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size));
370 break; 391 break;
371 } 392 }
372 393
373 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: 394 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
374 op->state_var_cb (op->cls, NULL, (const char *) &modc[1], 395 sr->var_cb (sr->cls, NULL, (const char *) &modc[1],
375 modc_size - sizeof (*modc)); 396 modc_size - sizeof (*modc));
376 break; 397 break;
377 } 398 }
378} 399}
@@ -412,11 +433,12 @@ master_recv_start_ack (void *cls,
412 433
413 struct GNUNET_PSYC_CountersResultMessage * 434 struct GNUNET_PSYC_CountersResultMessage *
414 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; 435 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
415 int32_t result = ntohl (cres->result_code) + INT32_MIN; 436 int32_t result = GNUNET_ntohl_signed (cres->result_code);
416 if (GNUNET_OK != result && GNUNET_NO != result) 437 if (GNUNET_OK != result && GNUNET_NO != result)
417 { 438 {
418 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master.\n"); 439 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master: %ld\n", result);
419 GNUNET_break (0); 440 GNUNET_break (0);
441 /* FIXME: disconnect */
420 } 442 }
421 if (NULL != mst->start_cb) 443 if (NULL != mst->start_cb)
422 mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); 444 mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
@@ -464,11 +486,12 @@ slave_recv_join_ack (void *cls,
464 sizeof (struct GNUNET_PSYC_Channel)); 486 sizeof (struct GNUNET_PSYC_Channel));
465 struct GNUNET_PSYC_CountersResultMessage * 487 struct GNUNET_PSYC_CountersResultMessage *
466 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg; 488 cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
467 int32_t result = ntohl (cres->result_code) + INT32_MIN; 489 int32_t result = GNUNET_ntohl_signed (cres->result_code);
468 if (GNUNET_YES != result && GNUNET_NO != result) 490 if (GNUNET_YES != result && GNUNET_NO != result)
469 { 491 {
470 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n"); 492 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n");
471 GNUNET_break (0); 493 GNUNET_break (0);
494 /* FIXME: disconnect */
472 } 495 }
473 if (NULL != slv->connect_cb) 496 if (NULL != slv->connect_cb)
474 slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id)); 497 slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
@@ -513,13 +536,17 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] =
513 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, 536 GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
514 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES }, 537 sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES },
515 538
539 { &channel_recv_history_result, NULL,
540 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
541 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
542
516 { &channel_recv_state_result, NULL, 543 { &channel_recv_state_result, NULL,
517 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, 544 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
518 sizeof (struct OperationResult), GNUNET_YES }, 545 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
519 546
520 { &channel_recv_result, NULL, 547 { &channel_recv_result, NULL,
521 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, 548 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
522 sizeof (struct OperationResult), GNUNET_YES }, 549 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
523 550
524 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 551 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
525 552
@@ -545,13 +572,17 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] =
545 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 572 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
546 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, 573 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES },
547 574
575 { &channel_recv_history_result, NULL,
576 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
577 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
578
548 { &channel_recv_state_result, NULL, 579 { &channel_recv_state_result, NULL,
549 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT, 580 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
550 sizeof (struct OperationResult), GNUNET_YES }, 581 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
551 582
552 { &channel_recv_result, NULL, 583 { &channel_recv_result, NULL,
553 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE, 584 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
554 sizeof (struct OperationResult), GNUNET_YES }, 585 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
555 586
556 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 587 { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
557 588
@@ -1011,17 +1042,28 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv)
1011 * correctly; not doing so correctly will result in either denying other slaves 1042 * correctly; not doing so correctly will result in either denying other slaves
1012 * access or offering access to channel data to non-members. 1043 * access or offering access to channel data to non-members.
1013 * 1044 *
1014 * @param channel Channel handle. 1045 * @param chn
1015 * @param slave_key Identity of channel slave to add. 1046 * Channel handle.
1016 * @param announced_at ID of the message that announced the membership change. 1047 * @param slave_key
1017 * @param effective_since Addition of slave is in effect since this message ID. 1048 * Identity of channel slave to add.
1049 * @param announced_at
1050 * ID of the message that announced the membership change.
1051 * @param effective_since
1052 * Addition of slave is in effect since this message ID.
1053 * @param result_cb
1054 * Function to call with the result of the operation.
1055 * The @e result_code argument is #GNUNET_OK on success, or
1056 * #GNUNET_SYSERR on error. In case of an error, the @e data argument
1057 * can contain an optional error message.
1058 * @param cls
1059 * Closure for @a result_cb.
1018 */ 1060 */
1019void 1061void
1020GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, 1062GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1021 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1063 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1022 uint64_t announced_at, 1064 uint64_t announced_at,
1023 uint64_t effective_since, 1065 uint64_t effective_since,
1024 GNUNET_PSYC_ResultCallback result_cb, 1066 GNUNET_ResultCallback result_cb,
1025 void *cls) 1067 void *cls)
1026{ 1068{
1027 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1069 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
@@ -1031,7 +1073,8 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1031 req->announced_at = GNUNET_htonll (announced_at); 1073 req->announced_at = GNUNET_htonll (announced_at);
1032 req->effective_since = GNUNET_htonll (effective_since); 1074 req->effective_since = GNUNET_htonll (effective_since);
1033 req->did_join = GNUNET_YES; 1075 req->did_join = GNUNET_YES;
1034 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); 1076 req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client,
1077 result_cb, cls));
1035 1078
1036 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1079 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1037} 1080}
@@ -1054,15 +1097,25 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
1054 * denying members access or offering access to channel data to 1097 * denying members access or offering access to channel data to
1055 * non-members. 1098 * non-members.
1056 * 1099 *
1057 * @param channel Channel handle. 1100 * @param chn
1058 * @param slave_key Identity of channel slave to remove. 1101 * Channel handle.
1059 * @param announced_at ID of the message that announced the membership change. 1102 * @param slave_key
1103 * Identity of channel slave to remove.
1104 * @param announced_at
1105 * ID of the message that announced the membership change.
1106 * @param result_cb
1107 * Function to call with the result of the operation.
1108 * The @e result_code argument is #GNUNET_OK on success, or
1109 * #GNUNET_SYSERR on error. In case of an error, the @e data argument
1110 * can contain an optional error message.
1111 * @param cls
1112 * Closure for @a result_cb.
1060 */ 1113 */
1061void 1114void
1062GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, 1115GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1063 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1116 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1064 uint64_t announced_at, 1117 uint64_t announced_at,
1065 GNUNET_PSYC_ResultCallback result_cb, 1118 GNUNET_ResultCallback result_cb,
1066 void *cls) 1119 void *cls)
1067{ 1120{
1068 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req)); 1121 struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
@@ -1071,17 +1124,62 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1071 req->slave_key = *slave_key; 1124 req->slave_key = *slave_key;
1072 req->announced_at = GNUNET_htonll (announced_at); 1125 req->announced_at = GNUNET_htonll (announced_at);
1073 req->did_join = GNUNET_NO; 1126 req->did_join = GNUNET_NO;
1074 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); 1127 req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client,
1128 result_cb, cls));
1129
1130 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1131}
1132
1133
1134static struct GNUNET_PSYC_HistoryRequest *
1135channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1136 uint64_t start_message_id,
1137 uint64_t end_message_id,
1138 uint64_t message_limit,
1139 const char *method_prefix,
1140 uint32_t flags,
1141 GNUNET_PSYC_MessageCallback message_cb,
1142 GNUNET_PSYC_MessagePartCallback message_part_cb,
1143 GNUNET_ResultCallback result_cb,
1144 void *cls)
1145{
1146 struct GNUNET_PSYC_HistoryRequestMessage *req;
1147 struct GNUNET_PSYC_HistoryRequest *hist = GNUNET_malloc (sizeof (*hist));
1148 hist->chn = chn;
1149 hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
1150 hist->result_cb = result_cb;
1151 hist->cls = cls;
1152 hist->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client,
1153 &op_recv_history_result, hist);
1154
1155 GNUNET_assert (NULL != method_prefix);
1156 uint16_t method_size = strnlen (method_prefix,
1157 GNUNET_SERVER_MAX_MESSAGE_SIZE
1158 - sizeof (*req)) + 1;
1159 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1160 req = GNUNET_malloc (sizeof (*req) + method_size);
1161 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
1162 req->header.size = htons (sizeof (*req) + method_size);
1163 req->start_message_id = GNUNET_htonll (start_message_id);
1164 req->end_message_id = GNUNET_htonll (end_message_id);
1165 req->message_limit = GNUNET_htonll (message_limit);
1166 req->flags = htonl (flags);
1167 req->op_id = GNUNET_htonll (hist->op_id);
1168 memcpy (&req[1], method_prefix, method_size);
1075 1169
1076 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1170 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1171 return hist;
1077} 1172}
1078 1173
1079 1174
1080/** 1175/**
1081 * Request to replay a part of the message history of the channel. 1176 * Request to replay a part of the message history of the channel.
1082 * 1177 *
1083 * Historic messages (but NOT the state at the time) will be replayed (given to 1178 * Historic messages (but NOT the state at the time) will be replayed and given
1084 * the normal method handlers) if available and if access is permitted. 1179 * to the normal method handlers with a #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
1180 *
1181 * Messages are retrieved from the local PSYCstore if available,
1182 * otherwise requested from the network.
1085 * 1183 *
1086 * @param channel 1184 * @param channel
1087 * Which channel should be replayed? 1185 * Which channel should be replayed?
@@ -1089,8 +1187,10 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1089 * Earliest interesting point in history. 1187 * Earliest interesting point in history.
1090 * @param end_message_id 1188 * @param end_message_id
1091 * Last (inclusive) interesting point in history. 1189 * Last (inclusive) interesting point in history.
1092 * FIXME: @param method_prefix 1190 * @param method_prefix
1093 * Retrieve only messages with a matching method prefix. 1191 * Retrieve only messages with a matching method prefix.
1192 * @param flags
1193 * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
1094 * @param result_cb 1194 * @param result_cb
1095 * Function to call when the requested history has been fully replayed. 1195 * Function to call when the requested history has been fully replayed.
1096 * @param cls 1196 * @param cls
@@ -1098,22 +1198,20 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
1098 * 1198 *
1099 * @return Handle to cancel history replay operation. 1199 * @return Handle to cancel history replay operation.
1100 */ 1200 */
1101void 1201struct GNUNET_PSYC_HistoryRequest *
1102GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn, 1202GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1103 uint64_t start_message_id, 1203 uint64_t start_message_id,
1104 uint64_t end_message_id, 1204 uint64_t end_message_id,
1105 /* FIXME: const char *method_prefix, */ 1205 const char *method_prefix,
1106 GNUNET_PSYC_ResultCallback result_cb, 1206 uint32_t flags,
1207 GNUNET_PSYC_MessageCallback message_cb,
1208 GNUNET_PSYC_MessagePartCallback message_part_cb,
1209 GNUNET_ResultCallback result_cb,
1107 void *cls) 1210 void *cls)
1108{ 1211{
1109 struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); 1212 return channel_history_replay (chn, start_message_id, end_message_id, 0,
1110 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); 1213 method_prefix, flags,
1111 req->header.size = htons (sizeof (*req)); 1214 message_cb, message_part_cb, result_cb, cls);
1112 req->start_message_id = GNUNET_htonll (start_message_id);
1113 req->end_message_id = GNUNET_htonll (end_message_id);
1114 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1115
1116 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1117} 1215}
1118 1216
1119 1217
@@ -1127,8 +1225,11 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1127 * Which channel should be replayed? 1225 * Which channel should be replayed?
1128 * @param message_limit 1226 * @param message_limit
1129 * Maximum number of messages to replay. 1227 * Maximum number of messages to replay.
1130 * FIXME: @param method_prefix 1228 * @param method_prefix
1131 * Retrieve only messages with a matching method prefix. 1229 * Retrieve only messages with a matching method prefix.
1230 * Use NULL or "" to retrieve all.
1231 * @param flags
1232 * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
1132 * @param result_cb 1233 * @param result_cb
1133 * Function to call when the requested history has been fully replayed. 1234 * Function to call when the requested history has been fully replayed.
1134 * @param cls 1235 * @param cls
@@ -1136,20 +1237,78 @@ GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
1136 * 1237 *
1137 * @return Handle to cancel history replay operation. 1238 * @return Handle to cancel history replay operation.
1138 */ 1239 */
1139void 1240struct GNUNET_PSYC_HistoryRequest *
1140GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn, 1241GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn,
1141 uint64_t message_limit, 1242 uint64_t message_limit,
1142 /* FIXME: const char *method_prefix, */ 1243 const char *method_prefix,
1143 GNUNET_PSYC_ResultCallback result_cb, 1244 uint32_t flags,
1245 GNUNET_PSYC_MessageCallback message_cb,
1246 GNUNET_PSYC_MessagePartCallback message_part_cb,
1247 GNUNET_ResultCallback result_cb,
1144 void *cls) 1248 void *cls)
1145{ 1249{
1146 struct HistoryRequest *req = GNUNET_malloc (sizeof (*req)); 1250 return channel_history_replay (chn, 0, 0, message_limit, method_prefix, flags,
1147 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY); 1251 message_cb, message_part_cb, result_cb, cls);
1148 req->header.size = htons (sizeof (*req)); 1252}
1149 req->message_limit = GNUNET_htonll (message_limit); 1253
1150 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls)); 1254
1255void
1256GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel,
1257 struct GNUNET_PSYC_HistoryRequest *hist)
1258{
1259 GNUNET_PSYC_receive_destroy (hist->recv);
1260 GNUNET_CLIENT_MANAGER_op_cancel (hist->chn->client, hist->op_id);
1261 GNUNET_free (hist);
1262}
1263
1264
1265/**
1266 * Retrieve the best matching channel state variable.
1267 *
1268 * If the requested variable name is not present in the state, the nearest
1269 * less-specific name is matched; for example, requesting "_a_b" will match "_a"
1270 * if "_a_b" does not exist.
1271 *
1272 * @param channel
1273 * Channel handle.
1274 * @param full_name
1275 * Full name of the requested variable.
1276 * The actual variable returned might have a shorter name.
1277 * @param var_cb
1278 * Function called once when a matching state variable is found.
1279 * Not called if there's no matching state variable.
1280 * @param result_cb
1281 * Function called after the operation finished.
1282 * (i.e. all state variables have been returned via @a state_cb)
1283 * @param cls
1284 * Closure for the callbacks.
1285 */
1286static struct GNUNET_PSYC_StateRequest *
1287channel_state_get (struct GNUNET_PSYC_Channel *chn,
1288 uint16_t type, const char *name,
1289 GNUNET_PSYC_StateVarCallback var_cb,
1290 GNUNET_ResultCallback result_cb, void *cls)
1291{
1292 struct StateRequest *req;
1293 struct GNUNET_PSYC_StateRequest *sr = GNUNET_malloc (sizeof (*sr));
1294 sr->chn = chn;
1295 sr->var_cb = var_cb;
1296 sr->result_cb = result_cb;
1297 sr->cls = cls;
1298 sr->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client,
1299 &op_recv_state_result, sr);
1300
1301 GNUNET_assert (NULL != name);
1302 size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE
1303 - sizeof (*req)) + 1;
1304 req = GNUNET_malloc (sizeof (*req) + name_size);
1305 req->header.type = htons (type);
1306 req->header.size = htons (sizeof (*req) + name_size);
1307 req->op_id = GNUNET_htonll (sr->op_id);
1308 memcpy (&req[1], name, name_size);
1151 1309
1152 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1310 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1311 return sr;
1153} 1312}
1154 1313
1155 1314
@@ -1174,21 +1333,16 @@ GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn,
1174 * @param cls 1333 * @param cls
1175 * Closure for the callbacks. 1334 * Closure for the callbacks.
1176 */ 1335 */
1177void 1336struct GNUNET_PSYC_StateRequest *
1178GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn, 1337GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn,
1179 const char *full_name, 1338 const char *full_name,
1180 GNUNET_PSYC_StateVarCallback var_cb, 1339 GNUNET_PSYC_StateVarCallback var_cb,
1181 GNUNET_PSYC_ResultCallback result_cb, 1340 GNUNET_ResultCallback result_cb,
1182 void *cls) 1341 void *cls)
1183{ 1342{
1184 size_t name_size = strlen (full_name) + 1; 1343 return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
1185 struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); 1344 full_name, var_cb, result_cb, cls);
1186 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET);
1187 req->header.size = htons (sizeof (*req) + name_size);
1188 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1189 memcpy (&req[1], full_name, name_size);
1190 1345
1191 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
1192} 1346}
1193 1347
1194 1348
@@ -1215,21 +1369,29 @@ GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn,
1215 * @param cls 1369 * @param cls
1216 * Closure for the callbacks. 1370 * Closure for the callbacks.
1217 */ 1371 */
1218void 1372struct GNUNET_PSYC_StateRequest *
1219GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn, 1373GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn,
1220 const char *name_prefix, 1374 const char *name_prefix,
1221 GNUNET_PSYC_StateVarCallback var_cb, 1375 GNUNET_PSYC_StateVarCallback var_cb,
1222 GNUNET_PSYC_ResultCallback result_cb, 1376 GNUNET_ResultCallback result_cb,
1223 void *cls) 1377 void *cls)
1224{ 1378{
1225 size_t name_size = strlen (name_prefix) + 1; 1379 return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
1226 struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size); 1380 name_prefix, var_cb, result_cb, cls);
1227 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET); 1381}
1228 req->header.size = htons (sizeof (*req) + name_size);
1229 req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
1230 memcpy (&req[1], name_prefix, name_size);
1231 1382
1232 GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header); 1383
1384/**
1385 * Cancel a state request operation.
1386 *
1387 * @param sr
1388 * Handle for the operation to cancel.
1389 */
1390void
1391GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr)
1392{
1393 GNUNET_CLIENT_MANAGER_op_cancel (sr->chn->client, sr->op_id);
1394 GNUNET_free (sr);
1233} 1395}
1234 1396
1235/* end of psyc_api.c */ 1397/* end of psyc_api.c */
diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c
index 961922ce4..ebbc2dad8 100644
--- a/src/psyc/psyc_util_lib.c
+++ b/src/psyc/psyc_util_lib.c
@@ -326,9 +326,13 @@ GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
326 * The message part is added to the current message buffer. 326 * The message part is added to the current message buffer.
327 * When this buffer is full, it is added to the transmission queue. 327 * When this buffer is full, it is added to the transmission queue.
328 * 328 *
329 * @param tmit Transmission handle. 329 * @param tmit
330 * @param msg Message part, or NULL. 330 * Transmission handle.
331 * @param end End of message? #GNUNET_YES or #GNUNET_NO. 331 * @param msg
332 * Message part, or NULL.
333 * @param end
334 * End of message?
335 * #GNUNET_YES or #GNUNET_NO.
332 */ 336 */
333static void 337static void
334transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, 338transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
@@ -632,16 +636,24 @@ transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
632/** 636/**
633 * Transmit a message. 637 * Transmit a message.
634 * 638 *
635 * @param tmit Transmission handle. 639 * @param tmit
636 * @param method_name Which method should be invoked. 640 * Transmission handle.
637 * @param env Environment for the message. 641 * @param method_name
638 * Should stay available until the first call to notify_data. 642 * Which method should be invoked.
639 * Can be NULL if there are no modifiers or @a notify_mod is provided instead. 643 * @param env
640 * @param notify_mod Function to call to obtain modifiers. 644 * Environment for the message.
641 * Can be NULL if there are no modifiers or @a env is provided instead. 645 * Should stay available until the first call to notify_data.
642 * @param notify_data Function to call to obtain fragments of the data. 646 * Can be NULL if there are no modifiers or @a notify_mod is
643 * @param notify_cls Closure for @a notify_mod and @a notify_data. 647 * provided instead.
644 * @param flags Flags for the message being transmitted. 648 * @param notify_mod
649 * Function to call to obtain modifiers.
650 * Can be NULL if there are no modifiers or @a env is provided instead.
651 * @param notify_data
652 * Function to call to obtain fragments of the data.
653 * @param notify_cls
654 * Closure for @a notify_mod and @a notify_data.
655 * @param flags
656 * Flags for the message being transmitted.
645 * 657 *
646 * @return #GNUNET_OK if the transmission was started. 658 * @return #GNUNET_OK if the transmission was started.
647 * #GNUNET_SYSERR if another transmission is already going on. 659 * #GNUNET_SYSERR if another transmission is already going on.
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 7160c13c6..ba31d9329 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -82,7 +82,7 @@ struct TransmitClosure
82 82
83struct TransmitClosure *tmit; 83struct TransmitClosure *tmit;
84 84
85uint8_t join_req_count; 85uint8_t join_req_count, end_count;
86 86
87enum 87enum
88{ 88{
@@ -105,6 +105,9 @@ enum
105void 105void
106master_transmit (); 106master_transmit ();
107 107
108void
109master_history_replay_latest ();
110
108 111
109void master_stopped (void *cls) 112void master_stopped (void *cls)
110{ 113{
@@ -198,6 +201,134 @@ end ()
198 201
199 202
200void 203void
204master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
205 const struct GNUNET_PSYC_MessageHeader *msg)
206{
207 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
208 "Test #%d: Master got PSYC message fragment of size %u "
209 "belonging to message ID %" PRIu64 " with flags %x\n",
210 test, ntohs (msg->header.size), message_id, flags);
211 // FIXME
212}
213
214
215void
216master_message_part_cb (void *cls, uint64_t message_id,
217 uint64_t data_offset, uint32_t flags,
218 const struct GNUNET_MessageHeader *msg)
219{
220 if (NULL == msg)
221 {
222 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
223 "Error while receiving message %" PRIu64 "\n", message_id);
224 return;
225 }
226
227 uint16_t type = ntohs (msg->type);
228 uint16_t size = ntohs (msg->size);
229
230 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
231 "Test #%d: Master got message part of type %u and size %u "
232 "belonging to message ID %" PRIu64 " with flags %x\n",
233 test, type, size, message_id, flags);
234
235 switch (test)
236 {
237 case TEST_SLAVE_TRANSMIT:
238 if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
239 {
240 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
241 "Unexpected request flags: %x" PRIu32 "\n", flags);
242 GNUNET_assert (0);
243 return;
244 }
245 // FIXME: check rest of message
246
247 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
248 master_transmit ();
249 break;
250
251 case TEST_MASTER_TRANSMIT:
252 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
253 master_history_replay_latest ();
254 break;
255
256 case TEST_MASTER_HISTORY_REPLAY:
257 case TEST_MASTER_HISTORY_REPLAY_LATEST:
258 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
259 {
260 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
261 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
262 test, flags);
263 GNUNET_assert (0);
264 return;
265 }
266 break;
267
268 default:
269 GNUNET_assert (0);
270 }
271}
272
273
274void
275slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
276 const struct GNUNET_PSYC_MessageHeader *msg)
277{
278 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
279 "Test #%d: Slave got PSYC message fragment of size %u "
280 "belonging to message ID %" PRIu64 " with flags %x\n",
281 test, ntohs (msg->header.size), message_id, flags);
282 // FIXME
283}
284
285
286void
287slave_message_part_cb (void *cls, uint64_t message_id,
288 uint64_t data_offset, uint32_t flags,
289 const struct GNUNET_MessageHeader *msg)
290{
291 if (NULL == msg)
292 {
293 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
294 "Error while receiving message " PRIu64 "\n", message_id);
295 return;
296 }
297
298 uint16_t type = ntohs (msg->type);
299 uint16_t size = ntohs (msg->size);
300
301 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
302 "Test #%d: Slave got message part of type %u and size %u "
303 "belonging to message ID %" PRIu64 " with flags %x\n",
304 test, type, size, message_id, flags);
305
306 switch (test)
307 {
308 case TEST_MASTER_TRANSMIT:
309 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
310 master_history_replay_latest ();
311 break;
312
313 case TEST_SLAVE_HISTORY_REPLAY:
314 case TEST_SLAVE_HISTORY_REPLAY_LATEST:
315 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
316 {
317 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
318 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
319 flags);
320 GNUNET_assert (0);
321 return;
322 }
323 break;
324
325 default:
326 GNUNET_assert (0);
327 }
328}
329
330
331void
201state_get_var (void *cls, const char *name, const void *value, size_t value_size) 332state_get_var (void *cls, const char *name, const void *value, size_t value_size)
202{ 333{
203 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 334 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -208,10 +339,12 @@ state_get_var (void *cls, const char *name, const void *value, size_t value_size
208/*** Slave state_get_prefix() ***/ 339/*** Slave state_get_prefix() ***/
209 340
210void 341void
211slave_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) 342slave_state_get_prefix_result (void *cls, int64_t result,
343 const void *err_msg, uint16_t err_msg_size)
212{ 344{
213 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 345 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
214 "slave_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); 346 "slave_state_get_prefix:\t%" PRId64 " (%.s)\n",
347 result, err_msg_size, err_msg);
215 // FIXME: GNUNET_assert (2 == result); 348 // FIXME: GNUNET_assert (2 == result);
216 end (); 349 end ();
217} 350}
@@ -230,7 +363,8 @@ slave_state_get_prefix ()
230 363
231 364
232void 365void
233master_state_get_prefix_result (void *cls, int64_t result, const char *err_msg) 366master_state_get_prefix_result (void *cls, int64_t result,
367 const void *err_msg, uint16_t err_msg_size)
234{ 368{
235 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 369 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
236 "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg); 370 "master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg);
@@ -252,10 +386,12 @@ master_state_get_prefix ()
252 386
253 387
254void 388void
255slave_state_get_result (void *cls, int64_t result, const char *err_msg) 389slave_state_get_result (void *cls, int64_t result,
390 const void *err_msg, uint16_t err_msg_size)
256{ 391{
257 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 392 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
258 "slave_state_get:\t%" PRId64 " (%s)\n", result, err_msg); 393 "slave_state_get:\t%" PRId64 " (%.*s)\n",
394 result, err_msg_size, err_msg);
259 // FIXME: GNUNET_assert (2 == result); 395 // FIXME: GNUNET_assert (2 == result);
260 master_state_get_prefix (); 396 master_state_get_prefix ();
261} 397}
@@ -274,10 +410,12 @@ slave_state_get ()
274 410
275 411
276void 412void
277master_state_get_result (void *cls, int64_t result, const char *err_msg) 413master_state_get_result (void *cls, int64_t result,
414 const void *err_msg, uint16_t err_msg_size)
278{ 415{
279 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 416 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
280 "master_state_get:\t%" PRId64 " (%s)\n", result, err_msg); 417 "master_state_get:\t%" PRId64 " (%.*s)\n",
418 result, err_msg_size, err_msg);
281 // FIXME: GNUNET_assert (1 == result); 419 // FIXME: GNUNET_assert (1 == result);
282 slave_state_get (); 420 slave_state_get ();
283} 421}
@@ -295,10 +433,12 @@ master_state_get ()
295/*** Slave history_replay() ***/ 433/*** Slave history_replay() ***/
296 434
297void 435void
298slave_history_replay_result (void *cls, int64_t result, const char *err_msg) 436slave_history_replay_result (void *cls, int64_t result,
437 const void *err_msg, uint16_t err_msg_size)
299{ 438{
300 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 439 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
301 "slave_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); 440 "slave_history_replay:\t%" PRId64 " (%.*s)\n",
441 result, err_msg_size, err_msg);
302 GNUNET_assert (9 == result); 442 GNUNET_assert (9 == result);
303 443
304 master_state_get (); 444 master_state_get ();
@@ -309,9 +449,11 @@ void
309slave_history_replay () 449slave_history_replay ()
310{ 450{
311 test = TEST_SLAVE_HISTORY_REPLAY; 451 test = TEST_SLAVE_HISTORY_REPLAY;
312 GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, 452 GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "",
313 &slave_history_replay_result, 453 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
314 NULL); 454 &slave_message_cb,
455 &slave_message_part_cb,
456 &slave_history_replay_result, NULL);
315} 457}
316 458
317 459
@@ -319,10 +461,12 @@ slave_history_replay ()
319 461
320 462
321void 463void
322master_history_replay_result (void *cls, int64_t result, const char *err_msg) 464master_history_replay_result (void *cls, int64_t result,
465 const void *err_msg, uint16_t err_msg_size)
323{ 466{
324 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 467 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
325 "master_history_replay:\t%" PRId64 " (%s)\n", result, err_msg); 468 "master_history_replay:\t%" PRId64 " (%.*s)\n",
469 result, err_msg_size, err_msg);
326 GNUNET_assert (9 == result); 470 GNUNET_assert (9 == result);
327 471
328 slave_history_replay (); 472 slave_history_replay ();
@@ -333,9 +477,11 @@ void
333master_history_replay () 477master_history_replay ()
334{ 478{
335 test = TEST_MASTER_HISTORY_REPLAY; 479 test = TEST_MASTER_HISTORY_REPLAY;
336 GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, 480 GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "",
337 &master_history_replay_result, 481 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
338 NULL); 482 &master_message_cb,
483 &master_message_part_cb,
484 &master_history_replay_result, NULL);
339} 485}
340 486
341 487
@@ -343,10 +489,12 @@ master_history_replay ()
343 489
344 490
345void 491void
346slave_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) 492slave_history_replay_latest_result (void *cls, int64_t result,
493 const void *err_msg, uint16_t err_msg_size)
347{ 494{
348 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 495 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
349 "slave_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); 496 "slave_history_replay_latest:\t%" PRId64 " (%.*s)\n",
497 result, err_msg_size, err_msg);
350 GNUNET_assert (9 == result); 498 GNUNET_assert (9 == result);
351 499
352 master_history_replay (); 500 master_history_replay ();
@@ -357,7 +505,10 @@ void
357slave_history_replay_latest () 505slave_history_replay_latest ()
358{ 506{
359 test = TEST_SLAVE_HISTORY_REPLAY_LATEST; 507 test = TEST_SLAVE_HISTORY_REPLAY_LATEST;
360 GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, 508 GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, "",
509 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
510 &slave_message_cb,
511 &slave_message_part_cb,
361 &slave_history_replay_latest_result, 512 &slave_history_replay_latest_result,
362 NULL); 513 NULL);
363} 514}
@@ -367,10 +518,12 @@ slave_history_replay_latest ()
367 518
368 519
369void 520void
370master_history_replay_latest_result (void *cls, int64_t result, const char *err_msg) 521master_history_replay_latest_result (void *cls, int64_t result,
522 const void *err_msg, uint16_t err_msg_size)
371{ 523{
372 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 524 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
373 "master_history_replay_latest:\t%" PRId64 " (%s)\n", result, err_msg); 525 "master_history_replay_latest:\t%" PRId64 " (%.*s)\n",
526 result, err_msg_size, err_msg);
374 GNUNET_assert (9 == result); 527 GNUNET_assert (9 == result);
375 528
376 slave_history_replay_latest (); 529 slave_history_replay_latest ();
@@ -381,139 +534,16 @@ void
381master_history_replay_latest () 534master_history_replay_latest ()
382{ 535{
383 test = TEST_MASTER_HISTORY_REPLAY_LATEST; 536 test = TEST_MASTER_HISTORY_REPLAY_LATEST;
384 GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, 537 GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, "",
538 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
539 &master_message_cb,
540 &master_message_part_cb,
385 &master_history_replay_latest_result, 541 &master_history_replay_latest_result,
386 NULL); 542 NULL);
387} 543}
388 544
389 545
390void 546void
391master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
392 const struct GNUNET_PSYC_MessageHeader *msg)
393{
394 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
395 "Test #%d: Master got PSYC message fragment of size %u "
396 "belonging to message ID %" PRIu64 " with flags %x\n",
397 test, ntohs (msg->header.size), message_id, flags);
398 // FIXME
399}
400
401
402void
403master_message_part_cb (void *cls, uint64_t message_id,
404 uint64_t data_offset, uint32_t flags,
405 const struct GNUNET_MessageHeader *msg)
406{
407 if (NULL == msg)
408 {
409 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
410 "Error while receiving message %" PRIu64 "\n", message_id);
411 return;
412 }
413
414 uint16_t type = ntohs (msg->type);
415 uint16_t size = ntohs (msg->size);
416
417 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
418 "Test #%d: Master got message part of type %u and size %u "
419 "belonging to message ID %" PRIu64 " with flags %x\n",
420 test, type, size, message_id, flags);
421
422 switch (test)
423 {
424 case TEST_SLAVE_TRANSMIT:
425 if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
426 {
427 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
428 "Unexpected request flags: %x" PRIu32 "\n", flags);
429 GNUNET_assert (0);
430 return;
431 }
432 // FIXME: check rest of message
433
434 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
435 master_transmit ();
436 break;
437
438 case TEST_MASTER_TRANSMIT:
439 break;
440
441 case TEST_MASTER_HISTORY_REPLAY:
442 case TEST_MASTER_HISTORY_REPLAY_LATEST:
443 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
444 {
445 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
446 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
447 flags);
448 GNUNET_assert (0);
449 return;
450 }
451 break;
452
453 default:
454 GNUNET_assert (0);
455 }
456}
457
458
459void
460slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
461 const struct GNUNET_PSYC_MessageHeader *msg)
462{
463 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
464 "Test #%d: Slave got PSYC message fragment of size %u "
465 "belonging to message ID %" PRIu64 " with flags %x\n",
466 test, ntohs (msg->header.size), message_id, flags);
467 // FIXME
468}
469
470
471void
472slave_message_part_cb (void *cls, uint64_t message_id,
473 uint64_t data_offset, uint32_t flags,
474 const struct GNUNET_MessageHeader *msg)
475{
476 if (NULL == msg)
477 {
478 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
479 "Error while receiving message " PRIu64 "\n", message_id);
480 return;
481 }
482
483 uint16_t type = ntohs (msg->type);
484 uint16_t size = ntohs (msg->size);
485
486 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
487 "Test #%d: Slave got message part of type %u and size %u "
488 "belonging to message ID %" PRIu64 " with flags %x\n",
489 test, type, size, message_id, flags);
490
491 switch (test)
492 {
493 case TEST_MASTER_TRANSMIT:
494 if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
495 master_history_replay_latest ();
496 break;
497
498 case TEST_SLAVE_HISTORY_REPLAY:
499 case TEST_SLAVE_HISTORY_REPLAY_LATEST:
500 if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
501 {
502 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
503 "Test #%d: Unexpected flags for historic message: %x" PRIu32 "\n",
504 flags);
505 GNUNET_assert (0);
506 return;
507 }
508 break;
509
510 default:
511 GNUNET_assert (0);
512 }
513}
514
515
516void
517transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 547transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
518{ 548{
519 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); 549 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
@@ -665,27 +695,31 @@ slave_transmit ()
665 tmit->data[0] = "slave test"; 695 tmit->data[0] = "slave test";
666 tmit->data_count = 1; 696 tmit->data_count = 1;
667 tmit->slv_tmit 697 tmit->slv_tmit
668 = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod, 698 = GNUNET_PSYC_slave_transmit (slv, "_request_test", &tmit_notify_mod,
669 tmit_notify_data, tmit, 699 &tmit_notify_data, tmit,
670 GNUNET_PSYC_SLAVE_TRANSMIT_NONE); 700 GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
671} 701}
672 702
673 703
674void 704void
675slave_remove_cb (void *cls, int64_t result, const char *err_msg) 705slave_remove_cb (void *cls, int64_t result,
706 const void *err_msg, uint16_t err_msg_size)
676{ 707{
677 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 708 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
678 "slave_remove:\t%" PRId64 " (%s)\n", result, err_msg); 709 "slave_remove:\t%" PRId64 " (%.*s)\n",
710 result, err_msg_size, err_msg);
679 711
680 slave_transmit (); 712 slave_transmit ();
681} 713}
682 714
683 715
684void 716void
685slave_add_cb (void *cls, int64_t result, const char *err_msg) 717slave_add_cb (void *cls, int64_t result,
718 const void *err_msg, uint16_t err_msg_size)
686{ 719{
687 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 720 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
688 "slave_add:\t%" PRId64 " (%s)\n", result, err_msg); 721 "slave_add:\t%" PRId64 " (%.*s)\n",
722 result, err_msg_size, err_msg);
689 723
690 struct GNUNET_PSYC_Channel *chn = cls; 724 struct GNUNET_PSYC_Channel *chn = cls;
691 GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, 725 GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
@@ -775,6 +809,8 @@ master_transmit ()
775{ 809{
776 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n"); 810 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
777 test = TEST_MASTER_TRANSMIT; 811 test = TEST_MASTER_TRANSMIT;
812 end_count = 0;
813
778 uint32_t i, j; 814 uint32_t i, j;
779 815
780 char *name_max = "_test_max"; 816 char *name_max = "_test_max";
@@ -816,8 +852,8 @@ master_transmit ()
816 tmit->data_delay[1] = 3; 852 tmit->data_delay[1] = 3;
817 tmit->data_count = 4; 853 tmit->data_count = 4;
818 tmit->mst_tmit 854 tmit->mst_tmit
819 = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod, 855 = GNUNET_PSYC_master_transmit (mst, "_notice_test", &tmit_notify_mod,
820 tmit_notify_data, tmit, 856 &tmit_notify_data, tmit,
821 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); 857 GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
822} 858}
823 859
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c
index 7954de991..1025da8c5 100644
--- a/src/psycstore/gnunet-service-psycstore.c
+++ b/src/psycstore/gnunet-service-psycstore.c
@@ -109,7 +109,7 @@ send_result_code (struct GNUNET_SERVER_Client *client, uint64_t op_id,
109 109
110 if (NULL != err_msg) 110 if (NULL != err_msg)
111 err_size = strnlen (err_msg, 111 err_size = strnlen (err_msg,
112 GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1; 112 GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res) - 1) + 1;
113 res = GNUNET_malloc (sizeof (struct OperationResult) + err_size); 113 res = GNUNET_malloc (sizeof (struct OperationResult) + err_size);
114 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE); 114 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE);
115 res->header.size = htons (sizeof (struct OperationResult) + err_size); 115 res->header.size = htons (sizeof (struct OperationResult) + err_size);
@@ -222,7 +222,7 @@ send_state_var (void *cls, const char *name,
222 struct StateResult *res; 222 struct StateResult *res;
223 size_t name_size = strlen (name) + 1; 223 size_t name_size = strlen (name) + 1;
224 224
225 /* FIXME: split up value into 64k chunks */ 225 /** @todo FIXME: split up value into 64k chunks */
226 226
227 res = GNUNET_malloc (sizeof (struct StateResult) + name_size + value_size); 227 res = GNUNET_malloc (sizeof (struct StateResult) + name_size + value_size);
228 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE); 228 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE);
@@ -333,7 +333,7 @@ handle_fragment_get (void *cls,
333 first_fragment_id, last_fragment_id, 333 first_fragment_id, last_fragment_id,
334 &ret_frags, &send_fragment, &sc); 334 &ret_frags, &send_fragment, &sc);
335 else 335 else
336 ret = db->fragment_get_latest (db->cls, &req->channel_key, limit, 336 ret = db->fragment_get_latest (db->cls, &req->channel_key, limit,
337 &ret_frags, &send_fragment, &sc); 337 &ret_frags, &send_fragment, &sc);
338 338
339 switch (ret) 339 switch (ret)
@@ -373,6 +373,20 @@ handle_message_get (void *cls,
373{ 373{
374 const struct MessageGetRequest * 374 const struct MessageGetRequest *
375 req = (const struct MessageGetRequest *) msg; 375 req = (const struct MessageGetRequest *) msg;
376 uint16_t size = ntohs (msg->size);
377 const char *method_prefix = (const char *) &req[1];
378
379 if (size < sizeof (*req) + 1
380 || '\0' != method_prefix[size - sizeof (*req) - 1])
381 {
382 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
383 "Message get: invalid method prefix. size: %u < %u?\n",
384 size, sizeof (*req) + 1);
385 GNUNET_break (0);
386 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
387 return;
388 }
389
376 struct SendClosure 390 struct SendClosure
377 sc = { .op_id = req->op_id, .client = client, 391 sc = { .op_id = req->op_id, .client = client,
378 .channel_key = req->channel_key, .slave_key = req->slave_key, 392 .channel_key = req->channel_key, .slave_key = req->slave_key,
@@ -384,6 +398,7 @@ handle_message_get (void *cls,
384 uint64_t last_message_id = GNUNET_ntohll (req->last_message_id); 398 uint64_t last_message_id = GNUNET_ntohll (req->last_message_id);
385 uint64_t limit = GNUNET_ntohll (req->message_limit); 399 uint64_t limit = GNUNET_ntohll (req->message_limit);
386 400
401 /** @todo method_prefix */
387 if (0 == limit) 402 if (0 == limit)
388 ret = db->message_get (db->cls, &req->channel_key, 403 ret = db->message_get (db->cls, &req->channel_key,
389 first_message_id, last_message_id, 404 first_message_id, last_message_id,
@@ -478,7 +493,7 @@ handle_counters_get (void *cls,
478} 493}
479 494
480 495
481/* FIXME: stop processing further state modify messages after an error */ 496/** @todo FIXME: stop processing further state modify messages after an error */
482static void 497static void
483handle_state_modify (void *cls, 498handle_state_modify (void *cls,
484 struct GNUNET_SERVER_Client *client, 499 struct GNUNET_SERVER_Client *client,
@@ -551,7 +566,7 @@ handle_state_modify (void *cls,
551} 566}
552 567
553 568
554/* FIXME: stop processing further state sync messages after an error */ 569/** @todo FIXME: stop processing further state sync messages after an error */
555static void 570static void
556handle_state_sync (void *cls, 571handle_state_sync (void *cls,
557 struct GNUNET_SERVER_Client *client, 572 struct GNUNET_SERVER_Client *client,
@@ -761,8 +776,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
761 sizeof (struct FragmentGetRequest) }, 776 sizeof (struct FragmentGetRequest) },
762 777
763 { &handle_message_get, NULL, 778 { &handle_message_get, NULL,
764 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET, 779 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET, 0 },
765 sizeof (struct MessageGetRequest) },
766 780
767 { &handle_message_get_fragment, NULL, 781 { &handle_message_get_fragment, NULL,
768 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT, 782 GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT,
diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c
index 04bec6809..542c4bfc9 100644
--- a/src/psycstore/plugin_psycstore_sqlite.c
+++ b/src/psycstore/plugin_psycstore_sqlite.c
@@ -376,6 +376,7 @@ database_setup (struct Plugin *plugin)
376 "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id " 376 "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
377 "ON membership (channel_id, slave_id);"); 377 "ON membership (channel_id, slave_id);");
378 378
379 /** @todo messages table: add method_name column */
379 sql_exec (plugin->dbh, 380 sql_exec (plugin->dbh,
380 "CREATE TABLE IF NOT EXISTS messages (\n" 381 "CREATE TABLE IF NOT EXISTS messages (\n"
381 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n" 382 " channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
@@ -468,6 +469,7 @@ database_setup (struct Plugin *plugin)
468 " AND ? <= fragment_id AND fragment_id <= ?;", 469 " AND ? <= fragment_id AND fragment_id <= ?;",
469 &plugin->select_fragments); 470 &plugin->select_fragments);
470 471
472 /** @todo select_messages: add method_prefix filter */
471 sql_prepare (plugin->dbh, 473 sql_prepare (plugin->dbh,
472 "SELECT hop_counter, signature, purpose, fragment_id,\n" 474 "SELECT hop_counter, signature, purpose, fragment_id,\n"
473 " fragment_offset, message_id, group_generation,\n" 475 " fragment_offset, message_id, group_generation,\n"
@@ -489,6 +491,7 @@ database_setup (struct Plugin *plugin)
489 "ORDER BY fragment_id;", 491 "ORDER BY fragment_id;",
490 &plugin->select_latest_fragments); 492 &plugin->select_latest_fragments);
491 493
494 /** @todo select_latest_messages: add method_prefix filter */
492 sql_prepare (plugin->dbh, 495 sql_prepare (plugin->dbh,
493 "SELECT hop_counter, signature, purpose, fragment_id,\n" 496 "SELECT hop_counter, signature, purpose, fragment_id,\n"
494 " fragment_offset, message_id, group_generation,\n" 497 " fragment_offset, message_id, group_generation,\n"
@@ -499,6 +502,7 @@ database_setup (struct Plugin *plugin)
499 " (SELECT message_id\n" 502 " (SELECT message_id\n"
500 " FROM messages\n" 503 " FROM messages\n"
501 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" 504 " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
505 " GROUP BY message_id\n"
502 " ORDER BY message_id\n" 506 " ORDER BY message_id\n"
503 " DESC LIMIT ?)\n" 507 " DESC LIMIT ?)\n"
504 "ORDER BY fragment_id;", 508 "ORDER BY fragment_id;",
diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c
index 0178e9ce6..c319b2e1b 100644
--- a/src/psycstore/psycstore_api.c
+++ b/src/psycstore/psycstore_api.c
@@ -283,7 +283,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
283 return; 283 return;
284 } 284 }
285 if (size == sizeof (struct OperationResult)) 285 if (size == sizeof (struct OperationResult))
286 str = NULL; 286 str = "";
287 287
288 op = find_op_by_id (h, GNUNET_ntohll (opres->op_id)); 288 op = find_op_by_id (h, GNUNET_ntohll (opres->op_id));
289 if (NULL == op) 289 if (NULL == op)
@@ -321,7 +321,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
321 } 321 }
322 } 322 }
323 if (NULL != op->res_cb) 323 if (NULL != op->res_cb)
324 op->res_cb (op->cls, result_code, str); 324 op->res_cb (op->cls, result_code, str, size - sizeof (*opres));
325 GNUNET_free (op); 325 GNUNET_free (op);
326 } 326 }
327 break; 327 break;
@@ -965,18 +965,19 @@ GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
965 * @param channel_key 965 * @param channel_key
966 * The channel we are interested in. 966 * The channel we are interested in.
967 * @param slave_key 967 * @param slave_key
968 * The slave requesting the message. If not NULL, a membership test is 968 * The slave requesting the message.
969 * performed first and the message is only returned if the slave has 969 * If not NULL, a membership test is performed first
970 * access to it. 970 * and the message is only returned if the slave has access to it.
971 * @param first_message_id 971 * @param first_message_id
972 * First message ID to retrieve. 972 * First message ID to retrieve.
973 * Use 0 to get the latest message.
974 * @param last_message_id 973 * @param last_message_id
975 * Last consecutive message ID to retrieve. 974 * Last consecutive message ID to retrieve.
976 * Use 0 to get the latest message. 975 * @param method_prefix
976 * Retrieve only messages with a matching method prefix.
977 * @todo Implement method_prefix query.
977 * @param fragment_cb 978 * @param fragment_cb
978 * Callback to call with the retrieved fragments. 979 * Callback to call with the retrieved fragments.
979 * @param rcb 980 * @param result_cb
980 * Callback to call with the result of the operation. 981 * Callback to call with the result of the operation.
981 * @param cls 982 * @param cls
982 * Closure for the callbacks. 983 * Closure for the callbacks.
@@ -989,11 +990,18 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
989 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 990 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
990 uint64_t first_message_id, 991 uint64_t first_message_id,
991 uint64_t last_message_id, 992 uint64_t last_message_id,
993 const char *method_prefix,
992 GNUNET_PSYCSTORE_FragmentCallback fragment_cb, 994 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
993 GNUNET_PSYCSTORE_ResultCallback rcb, 995 GNUNET_PSYCSTORE_ResultCallback rcb,
994 void *cls) 996 void *cls)
995{ 997{
996 struct MessageGetRequest *req; 998 struct MessageGetRequest *req;
999 if (NULL == method_prefix)
1000 method_prefix = "";
1001 uint16_t method_size = strnlen (method_prefix,
1002 GNUNET_SERVER_MAX_MESSAGE_SIZE
1003 - sizeof (*req)) + 1;
1004
997 struct GNUNET_PSYCSTORE_OperationHandle * 1005 struct GNUNET_PSYCSTORE_OperationHandle *
998 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 1006 op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
999 op->h = h; 1007 op->h = h;
@@ -1004,7 +1012,7 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
1004 req = (struct MessageGetRequest *) &op[1]; 1012 req = (struct MessageGetRequest *) &op[1];
1005 op->msg = (struct GNUNET_MessageHeader *) req; 1013 op->msg = (struct GNUNET_MessageHeader *) req;
1006 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); 1014 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1007 req->header.size = htons (sizeof (*req)); 1015 req->header.size = htons (sizeof (*req) + method_size);
1008 req->channel_key = *channel_key; 1016 req->channel_key = *channel_key;
1009 req->first_message_id = GNUNET_htonll (first_message_id); 1017 req->first_message_id = GNUNET_htonll (first_message_id);
1010 req->last_message_id = GNUNET_htonll (last_message_id); 1018 req->last_message_id = GNUNET_htonll (last_message_id);
@@ -1013,6 +1021,8 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
1013 req->slave_key = *slave_key; 1021 req->slave_key = *slave_key;
1014 req->do_membership_test = GNUNET_YES; 1022 req->do_membership_test = GNUNET_YES;
1015 } 1023 }
1024 memcpy (&req[1], method_prefix, method_size);
1025 ((char *) &req[1])[method_size - 1] = '\0';
1016 1026
1017 op->op_id = get_next_op_id (h); 1027 op->op_id = get_next_op_id (h);
1018 req->op_id = GNUNET_htonll (op->op_id); 1028 req->op_id = GNUNET_htonll (op->op_id);
@@ -1032,14 +1042,17 @@ GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
1032 * @param channel_key 1042 * @param channel_key
1033 * The channel we are interested in. 1043 * The channel we are interested in.
1034 * @param slave_key 1044 * @param slave_key
1035 * The slave requesting the message. If not NULL, a membership test is 1045 * The slave requesting the message.
1036 * performed first and the message is only returned if the slave has 1046 * If not NULL, a membership test is performed first
1037 * access to it. 1047 * and the message is only returned if the slave has access to it.
1038 * @param message_limit 1048 * @param message_limit
1039 * Maximum number of messages to retrieve. 1049 * Maximum number of messages to retrieve.
1050 * @param method_prefix
1051 * Retrieve only messages with a matching method prefix.
1052 * @todo Implement method_prefix query.
1040 * @param fragment_cb 1053 * @param fragment_cb
1041 * Callback to call with the retrieved fragments. 1054 * Callback to call with the retrieved fragments.
1042 * @param rcb 1055 * @param result_cb
1043 * Callback to call with the result of the operation. 1056 * Callback to call with the result of the operation.
1044 * @param cls 1057 * @param cls
1045 * Closure for the callbacks. 1058 * Closure for the callbacks.
@@ -1051,13 +1064,22 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1051 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, 1064 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
1052 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, 1065 const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
1053 uint64_t message_limit, 1066 uint64_t message_limit,
1067 const char *method_prefix,
1054 GNUNET_PSYCSTORE_FragmentCallback fragment_cb, 1068 GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
1055 GNUNET_PSYCSTORE_ResultCallback rcb, 1069 GNUNET_PSYCSTORE_ResultCallback rcb,
1056 void *cls) 1070 void *cls)
1057{ 1071{
1058 struct MessageGetRequest *req; 1072 struct MessageGetRequest *req;
1073
1074 if (NULL == method_prefix)
1075 method_prefix = "";
1076 uint16_t method_size = strnlen (method_prefix,
1077 GNUNET_SERVER_MAX_MESSAGE_SIZE
1078 - sizeof (*req)) + 1;
1079 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1080
1059 struct GNUNET_PSYCSTORE_OperationHandle * 1081 struct GNUNET_PSYCSTORE_OperationHandle *
1060 op = GNUNET_malloc (sizeof (*op) + sizeof (*req)); 1082 op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + method_size);
1061 op->h = h; 1083 op->h = h;
1062 op->data_cb = (DataCallback) fragment_cb; 1084 op->data_cb = (DataCallback) fragment_cb;
1063 op->res_cb = rcb; 1085 op->res_cb = rcb;
@@ -1066,7 +1088,7 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1066 req = (struct MessageGetRequest *) &op[1]; 1088 req = (struct MessageGetRequest *) &op[1];
1067 op->msg = (struct GNUNET_MessageHeader *) req; 1089 op->msg = (struct GNUNET_MessageHeader *) req;
1068 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); 1090 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
1069 req->header.size = htons (sizeof (*req)); 1091 req->header.size = htons (sizeof (*req) + method_size);
1070 req->channel_key = *channel_key; 1092 req->channel_key = *channel_key;
1071 req->message_limit = GNUNET_ntohll (message_limit); 1093 req->message_limit = GNUNET_ntohll (message_limit);
1072 if (NULL != slave_key) 1094 if (NULL != slave_key)
@@ -1077,6 +1099,7 @@ GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h,
1077 1099
1078 op->op_id = get_next_op_id (h); 1100 op->op_id = get_next_op_id (h);
1079 req->op_id = GNUNET_htonll (op->op_id); 1101 req->op_id = GNUNET_htonll (op->op_id);
1102 memcpy (&req[1], method_prefix, method_size);
1080 1103
1081 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op); 1104 GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
1082 transmit_next (h); 1105 transmit_next (h);
diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c
index 5b1b8a335..021c457b9 100644
--- a/src/psycstore/test_psycstore.c
+++ b/src/psycstore/test_psycstore.c
@@ -154,7 +154,8 @@ end ()
154 154
155 155
156void 156void
157state_reset_result (void *cls, int64_t result, const char *err_msg) 157state_reset_result (void *cls, int64_t result,
158 const char *err_msg, uint16_t err_msg_size)
158{ 159{
159 op = NULL; 160 op = NULL;
160 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_reset_result:\t%d\n", result); 161 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_reset_result:\t%d\n", result);
@@ -195,7 +196,8 @@ state_result (void *cls, const char *name, const void *value, size_t value_size)
195 196
196 197
197void 198void
198state_get_prefix_result (void *cls, int64_t result, const char *err_msg) 199state_get_prefix_result (void *cls, int64_t result,
200 const char *err_msg, uint16_t err_msg_size)
199{ 201{
200 struct StateClosure *scls = cls; 202 struct StateClosure *scls = cls;
201 op = NULL; 203 op = NULL;
@@ -208,7 +210,8 @@ state_get_prefix_result (void *cls, int64_t result, const char *err_msg)
208 210
209 211
210void 212void
211state_get_result (void *cls, int64_t result, const char *err_msg) 213state_get_result (void *cls, int64_t result,
214 const char *err_msg, uint16_t err_msg_size)
212{ 215{
213 op = NULL; 216 op = NULL;
214 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_get_result:\t%d\n", result); 217 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_get_result:\t%d\n", result);
@@ -260,7 +263,8 @@ counters_result (void *cls, int status, uint64_t max_fragment_id,
260 263
261 264
262void 265void
263state_modify_result (void *cls, int64_t result, const char *err_msg) 266state_modify_result (void *cls, int64_t result,
267 const char *err_msg, uint16_t err_msg_size)
264{ 268{
265 op = NULL; 269 op = NULL;
266 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_modify_result:\t%d\n", result); 270 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_modify_result:\t%d\n", result);
@@ -272,7 +276,8 @@ state_modify_result (void *cls, int64_t result, const char *err_msg)
272 276
273 277
274void 278void
275state_sync_result (void *cls, int64_t result, const char *err_msg) 279state_sync_result (void *cls, int64_t result,
280 const char *err_msg, uint16_t err_msg_size)
276{ 281{
277 struct FragmentClosure *fcls = cls; 282 struct FragmentClosure *fcls = cls;
278 op = NULL; 283 op = NULL;
@@ -328,7 +333,8 @@ fragment_result (void *cls,
328 333
329 334
330void 335void
331message_get_latest_result (void *cls, int64_t result, const char *err_msg) 336message_get_latest_result (void *cls, int64_t result,
337 const char *err_msg, uint16_t err_msg_size)
332{ 338{
333 struct FragmentClosure *fcls = cls; 339 struct FragmentClosure *fcls = cls;
334 op = NULL; 340 op = NULL;
@@ -355,7 +361,8 @@ message_get_latest_result (void *cls, int64_t result, const char *err_msg)
355 361
356 362
357void 363void
358message_get_result (void *cls, int64_t result, const char *err_msg) 364message_get_result (void *cls, int64_t result,
365 const char *err_msg, uint16_t err_msg_size)
359{ 366{
360 struct FragmentClosure *fcls = cls; 367 struct FragmentClosure *fcls = cls;
361 op = NULL; 368 op = NULL;
@@ -365,13 +372,14 @@ message_get_result (void *cls, int64_t result, const char *err_msg)
365 fcls->n = 0; 372 fcls->n = 0;
366 fcls->n_expected = 3; 373 fcls->n_expected = 3;
367 op = GNUNET_PSYCSTORE_message_get_latest (h, &channel_pub_key, &slave_pub_key, 374 op = GNUNET_PSYCSTORE_message_get_latest (h, &channel_pub_key, &slave_pub_key,
368 1, &fragment_result, 375 1, "", &fragment_result,
369 &message_get_latest_result, fcls); 376 &message_get_latest_result, fcls);
370} 377}
371 378
372 379
373void 380void
374message_get_fragment_result (void *cls, int64_t result, const char *err_msg) 381message_get_fragment_result (void *cls, int64_t result,
382 const char *err_msg, uint16_t err_msg_size)
375{ 383{
376 struct FragmentClosure *fcls = cls; 384 struct FragmentClosure *fcls = cls;
377 op = NULL; 385 op = NULL;
@@ -382,14 +390,15 @@ message_get_fragment_result (void *cls, int64_t result, const char *err_msg)
382 fcls->n_expected = 3; 390 fcls->n_expected = 3;
383 uint64_t message_id = GNUNET_ntohll (fcls->msg[0]->message_id); 391 uint64_t message_id = GNUNET_ntohll (fcls->msg[0]->message_id);
384 op = GNUNET_PSYCSTORE_message_get (h, &channel_pub_key, &slave_pub_key, 392 op = GNUNET_PSYCSTORE_message_get (h, &channel_pub_key, &slave_pub_key,
385 message_id, message_id, 393 message_id, message_id, "",
386 &fragment_result, 394 &fragment_result,
387 &message_get_result, fcls); 395 &message_get_result, fcls);
388} 396}
389 397
390 398
391void 399void
392fragment_get_latest_result (void *cls, int64_t result, const char *err_msg) 400fragment_get_latest_result (void *cls, int64_t result,
401 const char *err_msg, uint16_t err_msg_size)
393{ 402{
394 struct FragmentClosure *fcls = cls; 403 struct FragmentClosure *fcls = cls;
395 op = NULL; 404 op = NULL;
@@ -407,7 +416,8 @@ fragment_get_latest_result (void *cls, int64_t result, const char *err_msg)
407 416
408 417
409void 418void
410fragment_get_result (void *cls, int64_t result, const char *err_msg) 419fragment_get_result (void *cls, int64_t result,
420 const char *err_msg, uint16_t err_msg_size)
411{ 421{
412 struct FragmentClosure *fcls = cls; 422 struct FragmentClosure *fcls = cls;
413 op = NULL; 423 op = NULL;
@@ -424,7 +434,8 @@ fragment_get_result (void *cls, int64_t result, const char *err_msg)
424 434
425 435
426void 436void
427fragment_store_result (void *cls, int64_t result, const char *err_msg) 437fragment_store_result (void *cls, int64_t result,
438 const char *err_msg, uint16_t err_msg_size)
428{ 439{
429 op = NULL; 440 op = NULL;
430 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_store:\t%d\n", result); 441 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_store:\t%d\n", result);
@@ -495,7 +506,8 @@ fragment_store ()
495 506
496 507
497void 508void
498membership_test_result (void *cls, int64_t result, const char *err_msg) 509membership_test_result (void *cls, int64_t result,
510 const char *err_msg, uint16_t err_msg_size)
499{ 511{
500 op = NULL; 512 op = NULL;
501 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_test:\t%d\n", result); 513 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_test:\t%d\n", result);
@@ -506,7 +518,8 @@ membership_test_result (void *cls, int64_t result, const char *err_msg)
506 518
507 519
508void 520void
509membership_store_result (void *cls, int64_t result, const char *err_msg) 521membership_store_result (void *cls, int64_t result,
522 const char *err_msg, uint16_t err_msg_size)
510{ 523{
511 op = NULL; 524 op = NULL;
512 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_store:\t%d\n", result); 525 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_store:\t%d\n", result);
@@ -517,6 +530,7 @@ membership_store_result (void *cls, int64_t result, const char *err_msg)
517 &membership_test_result, NULL); 530 &membership_test_result, NULL);
518} 531}
519 532
533
520/** 534/**
521 * Main function of the test, run from scheduler. 535 * Main function of the test, run from scheduler.
522 * 536 *
diff --git a/src/social/gnunet-service-social.c b/src/social/gnunet-service-social.c
index 9d6a8ff8f..cc71e0702 100644
--- a/src/social/gnunet-service-social.c
+++ b/src/social/gnunet-service-social.c
@@ -137,6 +137,8 @@ struct Place
137 struct MessageTransmitQueue *tmit_msgs_head; 137 struct MessageTransmitQueue *tmit_msgs_head;
138 struct MessageTransmitQueue *tmit_msgs_tail; 138 struct MessageTransmitQueue *tmit_msgs_tail;
139 139
140 struct GNUNET_PSYC_Channel *channel;
141
140 /** 142 /**
141 * Public key of the channel. 143 * Public key of the channel.
142 */ 144 */
@@ -288,6 +290,15 @@ struct Client
288}; 290};
289 291
290 292
293struct OperationClosure
294{
295 struct GNUNET_SERVER_Client *client;
296 struct Place *plc;
297 uint64_t op_id;
298 uint32_t flags;
299};
300
301
291static int 302static int
292psyc_transmit_message (struct Place *plc); 303psyc_transmit_message (struct Place *plc);
293 304
@@ -450,7 +461,7 @@ static void
450client_send_msg (const struct Place *plc, 461client_send_msg (const struct Place *plc,
451 const struct GNUNET_MessageHeader *msg) 462 const struct GNUNET_MessageHeader *msg)
452{ 463{
453 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 464 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
454 "%p Sending message to clients.\n", plc); 465 "%p Sending message to clients.\n", plc);
455 466
456 struct ClientListItem *cli = plc->clients_head; 467 struct ClientListItem *cli = plc->clients_head;
@@ -464,6 +475,46 @@ client_send_msg (const struct Place *plc,
464 475
465 476
466/** 477/**
478 * Send a result code back to the client.
479 *
480 * @param client
481 * Client that should receive the result code.
482 * @param result_code
483 * Code to transmit.
484 * @param op_id
485 * Operation ID in network byte order.
486 * @param data
487 * Data payload or NULL.
488 * @param data_size
489 * Size of @a data.
490 */
491static void
492client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
493 int64_t result_code, const void *data, uint16_t data_size)
494{
495 struct GNUNET_OperationResultMessage *res;
496
497 res = GNUNET_malloc (sizeof (*res) + data_size);
498 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
499 res->header.size = htons (sizeof (*res) + data_size);
500 res->result_code = GNUNET_htonll_signed (result_code);
501 res->op_id = op_id;
502 if (0 < data_size)
503 memcpy (&res[1], data, data_size);
504
505 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
506 "%p Sending result to client for operation #%" PRIu64 ": "
507 "%" PRId64 " (size: %u)\n",
508 client, GNUNET_ntohll (op_id), result_code, data_size);
509
510 GNUNET_SERVER_notification_context_add (nc, client);
511 GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
512 GNUNET_NO);
513 GNUNET_free (res);
514}
515
516
517/**
467 * Called after a PSYC master is started. 518 * Called after a PSYC master is started.
468 */ 519 */
469static void 520static void
@@ -603,6 +654,7 @@ client_recv_host_enter (void *cls, struct GNUNET_SERVER_Client *client,
603 &psyc_master_started, 654 &psyc_master_started,
604 &psyc_recv_join_request, 655 &psyc_recv_join_request,
605 &psyc_recv_message, NULL, hst); 656 &psyc_recv_message, NULL, hst);
657 hst->plc.channel = GNUNET_PSYC_master_get_channel (hst->master);
606 } 658 }
607 else 659 else
608 { 660 {
@@ -720,6 +772,7 @@ client_recv_guest_enter (void *cls, struct GNUNET_SERVER_Client *client,
720 &gst->origin, gst->relay_count, gst->relays, 772 &gst->origin, gst->relay_count, gst->relays,
721 &psyc_recv_message, NULL, &psyc_slave_connected, 773 &psyc_recv_message, NULL, &psyc_slave_connected,
722 &psyc_recv_join_dcsn, gst, join_msg); 774 &psyc_recv_join_dcsn, gst, join_msg);
775 gst->plc.channel = GNUNET_PSYC_slave_get_channel (gst->slave);
723 } 776 }
724 else 777 else
725 { 778 {
@@ -1483,6 +1536,132 @@ client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
1483 1536
1484 1537
1485/** 1538/**
1539 * A historic message result arrived from PSYC.
1540 */
1541static void
1542psyc_recv_history_message (void *cls,
1543 uint64_t message_id,
1544 uint32_t flags,
1545 const struct GNUNET_PSYC_MessageHeader *msg)
1546{
1547 struct OperationClosure *opcls = cls;
1548 struct Place *plc = opcls->plc;
1549
1550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1551 "%p Received historic message #%" PRId64 " (flags: %x)\n",
1552 plc, message_id, flags);
1553
1554 uint16_t size = ntohs (msg->header.size);
1555
1556 struct GNUNET_OperationResultMessage *
1557 res = GNUNET_malloc (sizeof (*res) + size);
1558 res->header.size = htons (sizeof (*res) + size);
1559 res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
1560 res->op_id = opcls->op_id;
1561 res->result_code = GNUNET_htonll_signed (GNUNET_OK);
1562
1563 memcpy (&res[1], msg, size);
1564
1565 /** @todo FIXME: send only to requesting client */
1566 client_send_msg (plc, &res->header);
1567}
1568
1569
1570static void
1571psyc_recv_history_result (void *cls, int64_t result,
1572 const void *err_msg, uint16_t err_msg_size)
1573{
1574 struct OperationClosure *opcls = cls;
1575 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1576 "%p History replay #%" PRIu64 ": "
1577 "PSYCSTORE returned %" PRId64 " (%.*s)\n",
1578 opcls->plc, GNUNET_ntohll (opcls->op_id), result, err_msg_size, err_msg);
1579
1580 // FIXME: place might have been destroyed
1581 client_send_result (opcls->client, opcls->op_id, result, err_msg, err_msg_size);
1582}
1583
1584
1585/**
1586 * Client requests channel history.
1587 */
1588static void
1589client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
1590 const struct GNUNET_MessageHeader *msg)
1591{
1592 struct Client *
1593 ctx = GNUNET_SERVER_client_get_user_context (client, struct Client);
1594 GNUNET_assert (NULL != ctx);
1595 struct Place *plc = ctx->plc;
1596
1597 const struct GNUNET_PSYC_HistoryRequestMessage *
1598 req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
1599 uint16_t size = ntohs (msg->size);
1600 const char *method_prefix = (const char *) &req[1];
1601
1602 if (size < sizeof (*req) + 1
1603 || '\0' != method_prefix[size - sizeof (*req) - 1])
1604 {
1605 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1606 "%p History replay #%" PRIu64 ": "
1607 "invalid method prefix. size: %u < %u?\n",
1608 plc, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
1609 GNUNET_break (0);
1610 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1611 return;
1612 }
1613
1614 struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
1615 opcls->client = client;
1616 opcls->plc = plc;
1617 opcls->op_id = req->op_id;
1618 opcls->flags = ntohl (req->flags);
1619
1620 if (0 == req->message_limit)
1621 GNUNET_PSYC_channel_history_replay (plc->channel,
1622 GNUNET_ntohll (req->start_message_id),
1623 GNUNET_ntohll (req->end_message_id),
1624 method_prefix, opcls->flags,
1625 &psyc_recv_history_message, NULL,
1626 &psyc_recv_history_result, opcls);
1627 else
1628 GNUNET_PSYC_channel_history_replay_latest (plc->channel,
1629 GNUNET_ntohll (req->message_limit),
1630 method_prefix, opcls->flags,
1631 &psyc_recv_history_message, NULL,
1632 &psyc_recv_history_result, opcls);
1633
1634 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1635}
1636
1637
1638static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1639 { &client_recv_host_enter, NULL,
1640 GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
1641
1642 { &client_recv_guest_enter, NULL,
1643 GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
1644
1645 { &client_recv_join_decision, NULL,
1646 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
1647
1648 { &client_recv_psyc_message, NULL,
1649 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
1650
1651 { &client_recv_history_replay, NULL,
1652 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
1653#if FIXME
1654 { &client_recv_state_get, NULL,
1655 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
1656
1657 { &client_recv_state_get_prefix, NULL,
1658 GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
1659#endif
1660 { NULL, NULL, 0, 0 }
1661};
1662
1663
1664/**
1486 * Initialize the PSYC service. 1665 * Initialize the PSYC service.
1487 * 1666 *
1488 * @param cls Closure. 1667 * @param cls Closure.
@@ -1493,20 +1672,6 @@ static void
1493run (void *cls, struct GNUNET_SERVER_Handle *server, 1672run (void *cls, struct GNUNET_SERVER_Handle *server,
1494 const struct GNUNET_CONFIGURATION_Handle *c) 1673 const struct GNUNET_CONFIGURATION_Handle *c)
1495{ 1674{
1496 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1497 { &client_recv_host_enter, NULL,
1498 GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
1499
1500 { &client_recv_guest_enter, NULL,
1501 GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
1502
1503 { &client_recv_join_decision, NULL,
1504 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
1505
1506 { &client_recv_psyc_message, NULL,
1507 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }
1508 };
1509
1510 cfg = c; 1675 cfg = c;
1511 stats = GNUNET_STATISTICS_create ("social", cfg); 1676 stats = GNUNET_STATISTICS_create ("social", cfg);
1512 hosts = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1677 hosts = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
diff --git a/src/social/social_api.c b/src/social/social_api.c
index afe85cbdc..559fa388e 100644
--- a/src/social/social_api.c
+++ b/src/social/social_api.c
@@ -47,6 +47,7 @@ static struct GNUNET_GNS_Handle *gns;
47static struct GNUNET_NAMESTORE_Handle *namestore; 47static struct GNUNET_NAMESTORE_Handle *namestore;
48static struct GNUNET_PeerIdentity this_peer; 48static struct GNUNET_PeerIdentity this_peer;
49 49
50
50/** 51/**
51 * Handle for a place where social interactions happen. 52 * Handle for a place where social interactions happen.
52 */ 53 */
@@ -239,33 +240,79 @@ struct GNUNET_SOCIAL_Announcement
239}; 240};
240 241
241 242
242struct GNUNET_SOCIAL_WatchHandle 243/**
244 * A talk request.
245 */
246struct GNUNET_SOCIAL_TalkRequest
243{ 247{
244 248
245}; 249};
246 250
247 251
248struct GNUNET_SOCIAL_LookHandle 252struct GNUNET_SOCIAL_WatchHandle
249{ 253{
250 254
251}; 255};
252 256
253 257
254/** 258/**
255 * A talk request. 259 * A history lesson.
256 */ 260 */
257struct GNUNET_SOCIAL_TalkRequest 261struct GNUNET_SOCIAL_HistoryRequest
258{ 262{
263 /**
264 * Place.
265 */
266 struct GNUNET_SOCIAL_Place *plc;
259 267
268 /**
269 * Operation ID.
270 */
271 uint64_t op_id;
272
273 /**
274 * Message handler.
275 */
276 struct GNUNET_PSYC_ReceiveHandle *recv;
277
278 /**
279 * Function to call when the operation finished.
280 */
281 GNUNET_ResultCallback result_cb;
282
283 /**
284 * Closure for @a result_cb.
285 */
286 void *cls;
260}; 287};
261 288
262 289
263/** 290struct GNUNET_SOCIAL_LookHandle
264 * A history lesson.
265 */
266struct GNUNET_SOCIAL_HistoryLesson
267{ 291{
292 /**
293 * Place.
294 */
295 struct GNUNET_SOCIAL_Place *plc;
296
297 /**
298 * Operation ID.
299 */
300 uint64_t op_id;
301
302 /**
303 * State variable result callback.
304 */
305 GNUNET_PSYC_StateVarCallback var_cb;
306
307 /**
308 * Function to call when the operation finished.
309 */
310 GNUNET_ResultCallback result_cb;
268 311
312 /**
313 * Closure for @a result_cb.
314 */
315 void *cls;
269}; 316};
270 317
271 318
@@ -418,7 +465,7 @@ slicer_message (void *cls, uint64_t message_id, uint64_t fragment_offset,
418 GNUNET_assert (message_id == slicer->message_id); 465 GNUNET_assert (message_id == slicer->message_id);
419 } 466 }
420 467
421 LOG (GNUNET_ERROR_TYPE_WARNING, 468 LOG (GNUNET_ERROR_TYPE_DEBUG,
422 "Slicer received message of type %u and size %u, " 469 "Slicer received message of type %u and size %u, "
423 "with ID %" PRIu64 " and method %s\n", 470 "with ID %" PRIu64 " and method %s\n",
424 ptype, ntohs (msg->size), message_id, slicer->method_name); 471 ptype, ntohs (msg->size), message_id, slicer->method_name);
@@ -594,6 +641,165 @@ place_send_connect_msg (struct GNUNET_SOCIAL_Place *plc)
594 641
595 642
596static void 643static void
644place_recv_result (void *cls,
645 struct GNUNET_CLIENT_MANAGER_Connection *client,
646 const struct GNUNET_MessageHeader *msg)
647{
648 struct GNUNET_SOCIAL_Place *
649 plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc));
650
651 const struct GNUNET_OperationResultMessage *
652 res = (const struct GNUNET_OperationResultMessage *) msg;
653
654 uint16_t size = ntohs (msg->size);
655 if (size < sizeof (*res))
656 { /* Error, message too small. */
657 GNUNET_break (0);
658 return;
659 }
660
661 uint16_t data_size = size - sizeof (*res);
662 const char *data = (0 < data_size) ? (const char *) &res[1] : NULL;
663 GNUNET_CLIENT_MANAGER_op_result (plc->client, GNUNET_ntohll (res->op_id),
664 GNUNET_ntohll_signed (res->result_code),
665 data, data_size);
666}
667
668
669static void
670op_recv_history_result (void *cls, int64_t result,
671 const void *err_msg, uint16_t err_msg_size)
672{
673 LOG (GNUNET_ERROR_TYPE_DEBUG,
674 "Received history replay result: %" PRId64 ".\n", result);
675
676 struct GNUNET_SOCIAL_HistoryRequest *hist = cls;
677
678 if (NULL != hist->result_cb)
679 hist->result_cb (hist->cls, result, err_msg, err_msg_size);
680
681 GNUNET_PSYC_receive_destroy (hist->recv);
682 GNUNET_free (hist);
683}
684
685
686static void
687op_recv_state_result (void *cls, int64_t result,
688 const void *err_msg, uint16_t err_msg_size)
689{
690 LOG (GNUNET_ERROR_TYPE_DEBUG,
691 "Received state request result: %" PRId64 ".\n", result);
692
693 struct GNUNET_SOCIAL_LookHandle *look = cls;
694
695 if (NULL != look->result_cb)
696 look->result_cb (look->cls, result, err_msg, err_msg_size);
697
698 GNUNET_free (look);
699}
700
701
702static void
703place_recv_history_result (void *cls,
704 struct GNUNET_CLIENT_MANAGER_Connection *client,
705 const struct GNUNET_MessageHeader *msg)
706{
707 struct GNUNET_SOCIAL_Place *
708 plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc));
709
710 const struct GNUNET_OperationResultMessage *
711 res = (const struct GNUNET_OperationResultMessage *) msg;
712 struct GNUNET_PSYC_MessageHeader *
713 pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
714
715 LOG (GNUNET_ERROR_TYPE_DEBUG,
716 "%p Received historic fragment for message #%" PRIu64 ".\n",
717 plc, GNUNET_ntohll (pmsg->message_id));
718
719 GNUNET_ResultCallback result_cb = NULL;
720 struct GNUNET_SOCIAL_HistoryRequest *hist = NULL;
721
722 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (plc->client,
723 GNUNET_ntohll (res->op_id),
724 &result_cb, (void *) &hist))
725 { /* Operation not found. */
726 LOG (GNUNET_ERROR_TYPE_WARNING,
727 "%p Replay operation not found for historic fragment of message #%"
728 PRIu64 ".\n",
729 plc, GNUNET_ntohll (pmsg->message_id));
730 return;
731 }
732
733 uint16_t size = ntohs (msg->size);
734 if (size < sizeof (*res) + sizeof (*pmsg))
735 { /* Error, message too small. */
736 GNUNET_break (0);
737 return;
738 }
739
740 GNUNET_PSYC_receive_message (hist->recv,
741 (const struct GNUNET_PSYC_MessageHeader *) pmsg);
742}
743
744
745static void
746place_recv_state_result (void *cls,
747 struct GNUNET_CLIENT_MANAGER_Connection *client,
748 const struct GNUNET_MessageHeader *msg)
749{
750 struct GNUNET_SOCIAL_Place *
751 plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc));
752
753 const struct GNUNET_OperationResultMessage *
754 res = (const struct GNUNET_OperationResultMessage *) msg;
755
756#if FIXME
757 GNUNET_ResultCallback result_cb = NULL;
758 struct GNUNET_PSYC_StateRequest *sr = NULL;
759
760 if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (plc->client,
761 GNUNET_ntohll (res->op_id),
762 &result_cb, (void *) &sr))
763 { /* Operation not found. */
764 return;
765 }
766
767 const struct GNUNET_MessageHeader *
768 modc = (struct GNUNET_MessageHeader *) &res[1];
769 uint16_t modc_size = ntohs (modc->size);
770 if (ntohs (msg->size) - sizeof (*msg) != modc_size)
771 {
772 GNUNET_break (0);
773 return;
774 }
775 switch (ntohs (modc->type))
776 {
777 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
778 {
779 const struct GNUNET_PSYC_MessageModifier *
780 mod = (const struct GNUNET_PSYC_MessageModifier *) modc;
781
782 const char *name = (const char *) &mod[1];
783 uint16_t name_size = ntohs (mod->name_size);
784 if ('\0' != name[name_size - 1])
785 {
786 GNUNET_break (0);
787 return;
788 }
789 sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size));
790 break;
791 }
792
793 case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
794 sr->var_cb (sr->cls, NULL, (const char *) &modc[1],
795 modc_size - sizeof (*modc));
796 break;
797 }
798#endif
799}
800
801
802static void
597place_recv_message_ack (void *cls, 803place_recv_message_ack (void *cls,
598 struct GNUNET_CLIENT_MANAGER_Connection *client, 804 struct GNUNET_CLIENT_MANAGER_Connection *client,
599 const struct GNUNET_MessageHeader *msg) 805 const struct GNUNET_MessageHeader *msg)
@@ -752,6 +958,18 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler host_handlers[] =
752 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, 958 GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
753 sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, 959 sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
754 960
961 { &place_recv_history_result, NULL,
962 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
963 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
964
965 { &place_recv_state_result, NULL,
966 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
967 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
968
969 { &place_recv_result, NULL,
970 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
971 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
972
755 { &place_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 973 { &place_recv_disconnect, NULL, 0, 0, GNUNET_NO },
756 974
757 { NULL, NULL, 0, 0, GNUNET_NO } 975 { NULL, NULL, 0, 0, GNUNET_NO }
@@ -780,6 +998,18 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler guest_handlers[] =
780 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 998 GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
781 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES }, 999 sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES },
782 1000
1001 { &place_recv_history_result, NULL,
1002 GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
1003 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
1004
1005 { &place_recv_state_result, NULL,
1006 GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
1007 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
1008
1009 { &place_recv_result, NULL,
1010 GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
1011 sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
1012
783 { &place_recv_disconnect, NULL, 0, 0, GNUNET_NO }, 1013 { &place_recv_disconnect, NULL, 0, 0, GNUNET_NO },
784 1014
785 { NULL, NULL, 0, 0, GNUNET_NO } 1015 { NULL, NULL, 0, 0, GNUNET_NO }
@@ -1546,67 +1776,13 @@ GNUNET_SOCIAL_guest_get_place (struct GNUNET_SOCIAL_Guest *gst)
1546 1776
1547 1777
1548/** 1778/**
1549 * A history lesson.
1550 */
1551struct GNUNET_SOCIAL_HistoryLesson;
1552
1553/**
1554 * Learn about the history of a place.
1555 *
1556 * Sends messages through the slicer function of the place where
1557 * start_message_id <= message_id <= end_message_id.
1558 * The messages will have the #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
1559 *
1560 * To get the latest message, use 0 for both the start and end message ID.
1561 *
1562 * @param place Place we want to learn more about.
1563 * @param start_message_id First historic message we are interested in.
1564 * @param end_message_id Last historic message we are interested in (inclusive).
1565 * @param slicer Slicer to use to process history. Can be the same as the
1566 * slicer of the place, as the HISTORIC flag allows distinguishing
1567 * old messages from fresh ones.
1568 * @param finish_cb Function called after the last message in the history lesson
1569 * is passed through the @a slicer. NULL if not needed.
1570 * @param finish_cb_cls Closure for @a finish_cb.
1571 * @return Handle to abort history lesson, never NULL (multiple lessons
1572 * at the same time are allowed).
1573 */
1574struct GNUNET_SOCIAL_HistoryLesson *
1575GNUNET_SOCIAL_place_get_history (struct GNUNET_SOCIAL_Place *place,
1576 uint64_t start_message_id,
1577 uint64_t end_message_id,
1578 const struct GNUNET_SOCIAL_Slicer *slicer,
1579 void (*finish_cb)(void *),
1580 void *finish_cb_cls)
1581{
1582 return NULL;
1583}
1584
1585
1586/**
1587 * Stop processing messages from the history lesson.
1588 *
1589 * Must not be called after the finish callback of the history lesson is called.
1590 *
1591 * @param hist History lesson to cancel.
1592 */
1593void
1594GNUNET_SOCIAL_place_get_history_cancel (struct GNUNET_SOCIAL_HistoryLesson *hist)
1595{
1596
1597}
1598
1599
1600struct GNUNET_SOCIAL_WatchHandle;
1601
1602/**
1603 * Watch a place for changed objects. 1779 * Watch a place for changed objects.
1604 * 1780 *
1605 * @param place 1781 * @param place
1606 * Place to watch. 1782 * Place to watch.
1607 * @param object_filter 1783 * @param object_filter
1608 * Object prefix to match. 1784 * Object prefix to match.
1609 * @param state_var_cb 1785 * @param var_cb
1610 * Function to call when an object/state var changes. 1786 * Function to call when an object/state var changes.
1611 * @param cls 1787 * @param cls
1612 * Closure for callback. 1788 * Closure for callback.
@@ -1616,7 +1792,7 @@ struct GNUNET_SOCIAL_WatchHandle;
1616struct GNUNET_SOCIAL_WatchHandle * 1792struct GNUNET_SOCIAL_WatchHandle *
1617GNUNET_SOCIAL_place_watch (struct GNUNET_SOCIAL_Place *place, 1793GNUNET_SOCIAL_place_watch (struct GNUNET_SOCIAL_Place *place,
1618 const char *object_filter, 1794 const char *object_filter,
1619 GNUNET_PSYC_StateVarCallback state_var_cb, 1795 GNUNET_PSYC_StateVarCallback var_cb,
1620 void *cls) 1796 void *cls)
1621{ 1797{
1622 return NULL; 1798 return NULL;
@@ -1635,45 +1811,167 @@ GNUNET_SOCIAL_place_watch_cancel (struct GNUNET_SOCIAL_WatchHandle *wh)
1635} 1811}
1636 1812
1637 1813
1638struct GNUNET_SOCIAL_LookHandle; 1814static struct GNUNET_SOCIAL_HistoryRequest *
1815place_history_replay (struct GNUNET_SOCIAL_Place *plc,
1816 uint64_t start_message_id,
1817 uint64_t end_message_id,
1818 uint64_t message_limit,
1819 const char *method_prefix,
1820 uint32_t flags,
1821 struct GNUNET_SOCIAL_Slicer *slicer,
1822 GNUNET_ResultCallback result_cb,
1823 void *cls)
1824{
1825 struct GNUNET_PSYC_HistoryRequestMessage *req;
1826 struct GNUNET_SOCIAL_HistoryRequest *hist = GNUNET_malloc (sizeof (*hist));
1827 hist->plc = plc;
1828 hist->recv = GNUNET_PSYC_receive_create (NULL, &slicer_message, slicer);
1829 hist->result_cb = result_cb;
1830 hist->cls = cls;
1831 hist->op_id = GNUNET_CLIENT_MANAGER_op_add (plc->client,
1832 &op_recv_history_result, hist);
1833
1834 GNUNET_assert (NULL != method_prefix);
1835 uint16_t method_size = strnlen (method_prefix,
1836 GNUNET_SERVER_MAX_MESSAGE_SIZE
1837 - sizeof (*req)) + 1;
1838 GNUNET_assert ('\0' == method_prefix[method_size - 1]);
1839 req = GNUNET_malloc (sizeof (*req) + method_size);
1840 req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
1841 req->header.size = htons (sizeof (*req) + method_size);
1842 req->start_message_id = GNUNET_htonll (start_message_id);
1843 req->end_message_id = GNUNET_htonll (end_message_id);
1844 req->message_limit = GNUNET_htonll (message_limit);
1845 req->flags = htonl (flags);
1846 req->op_id = GNUNET_htonll (hist->op_id);
1847 memcpy (&req[1], method_prefix, method_size);
1848
1849 GNUNET_CLIENT_MANAGER_transmit (plc->client, &req->header);
1850 return hist;
1851}
1639 1852
1640 1853
1641/** 1854/**
1642 * Look at objects in the place with a matching name prefix. 1855 * Learn about the history of a place.
1856 *
1857 * Messages are returned through the @a slicer function
1858 * and have the #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
1643 * 1859 *
1644 * @param place 1860 * @param place
1645 * The place to look its objects at. 1861 * Place we want to learn more about.
1646 * @param name_prefix 1862 * @param start_message_id
1647 * Look at objects with names beginning with this value. 1863 * First historic message we are interested in.
1648 * @param state_var_cb 1864 * @param end_message_id
1649 * Function to call for each object found. 1865 * Last historic message we are interested in (inclusive).
1650 * @param cls 1866 * @param method_prefix
1651 * Closure for callback function. 1867 * Only retrieve messages with this method prefix.
1868 * @param flags
1869 * OR'ed GNUNET_PSYC_HistoryReplayFlags
1870 * @param slicer
1871 * Slicer to use for retrieved messages.
1872 * Can be the same as the slicer of the place.
1873 * @param result_cb
1874 * Function called after all messages retrieved.
1875 * NULL if not needed.
1876 * @param cls Closure for @a result_cb.
1877 */
1878struct GNUNET_SOCIAL_HistoryRequest *
1879GNUNET_SOCIAL_place_history_replay (struct GNUNET_SOCIAL_Place *plc,
1880 uint64_t start_message_id,
1881 uint64_t end_message_id,
1882 const char *method_prefix,
1883 uint32_t flags,
1884 struct GNUNET_SOCIAL_Slicer *slicer,
1885 GNUNET_ResultCallback result_cb,
1886 void *cls)
1887{
1888 return place_history_replay (plc, start_message_id, end_message_id, 0,
1889 method_prefix, flags, slicer, result_cb, cls);
1890}
1891
1892
1893/**
1894 * Learn about the history of a place.
1652 * 1895 *
1653 * @return Handle that can be used to stop looking at objects. 1896 * Sends messages through the slicer function of the place where
1897 * start_message_id <= message_id <= end_message_id.
1898 * The messages will have the #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
1899 *
1900 * To get the latest message, use 0 for both the start and end message ID.
1901 *
1902 * @param place
1903 * Place we want to learn more about.
1904 * @param message_limit
1905 * Maximum number of historic messages we are interested in.
1906 * @param method_prefix
1907 * Only retrieve messages with this method prefix.
1908 * @param flags
1909 * OR'ed GNUNET_PSYC_HistoryReplayFlags
1910 * @param result_cb
1911 * Function called after all messages retrieved.
1912 * NULL if not needed.
1913 * @param cls Closure for @a result_cb.
1654 */ 1914 */
1655struct GNUNET_SOCIAL_LookHandle * 1915struct GNUNET_SOCIAL_HistoryRequest *
1656GNUNET_SOCIAL_place_look (struct GNUNET_SOCIAL_Place *place, 1916GNUNET_SOCIAL_place_history_replay_latest (struct GNUNET_SOCIAL_Place *plc,
1657 const char *name_prefix, 1917 uint64_t message_limit,
1658 GNUNET_PSYC_StateVarCallback state_var_cb, 1918 const char *method_prefix,
1659 void *cls) 1919 uint32_t flags,
1660{ 1920 struct GNUNET_SOCIAL_Slicer *slicer,
1661 return NULL; 1921 GNUNET_ResultCallback result_cb,
1922 void *cls)
1923{
1924 return place_history_replay (plc, 0, 0, message_limit, method_prefix, flags,
1925 slicer, result_cb, cls);
1662} 1926}
1663 1927
1664 1928
1665/** 1929/**
1666 * Stop looking at objects. 1930 * Cancel learning about the history of a place.
1667 * 1931 *
1668 * @param lh Look handle to stop. 1932 * @param hist
1933 * History lesson to cancel.
1669 */ 1934 */
1670void 1935void
1671GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *lh) 1936GNUNET_SOCIAL_place_history_replay_cancel (struct GNUNET_SOCIAL_HistoryRequest *hist)
1672{ 1937{
1673 1938 GNUNET_PSYC_receive_destroy (hist->recv);
1939 GNUNET_CLIENT_MANAGER_op_cancel (hist->plc->client, hist->op_id);
1940 GNUNET_free (hist);
1674} 1941}
1675 1942
1676 1943
1944/**
1945 * Request matching state variables.
1946 */
1947static struct GNUNET_SOCIAL_LookHandle *
1948place_state_get (struct GNUNET_SOCIAL_Place *plc,
1949 uint16_t type, const char *name,
1950 GNUNET_PSYC_StateVarCallback var_cb,
1951 GNUNET_ResultCallback result_cb, void *cls)
1952{
1953 struct GNUNET_PSYC_StateRequestMessage *req;
1954 struct GNUNET_SOCIAL_LookHandle *look = GNUNET_malloc (sizeof (*look));
1955 look->plc = plc;
1956 look->var_cb = var_cb;
1957 look->result_cb = result_cb;
1958 look->cls = cls;
1959 look->op_id = GNUNET_CLIENT_MANAGER_op_add (plc->client,
1960 &op_recv_state_result, look);
1961
1962 GNUNET_assert (NULL != name);
1963 size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE
1964 - sizeof (*req)) + 1;
1965 req = GNUNET_malloc (sizeof (*req) + name_size);
1966 req->header.type = htons (type);
1967 req->header.size = htons (sizeof (*req) + name_size);
1968 req->op_id = GNUNET_htonll (look->op_id);
1969 memcpy (&req[1], name, name_size);
1970
1971 GNUNET_CLIENT_MANAGER_transmit (plc->client, &req->header);
1972 return look;
1973}
1974
1677 1975
1678/** 1976/**
1679 * Look at a particular object in the place. 1977 * Look at a particular object in the place.
@@ -1681,17 +1979,64 @@ GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *lh)
1681 * The best matching object is returned (its name might be less specific than 1979 * The best matching object is returned (its name might be less specific than
1682 * what was requested). 1980 * what was requested).
1683 * 1981 *
1684 * @param place The place to look the object at. 1982 * @param place
1685 * @param full_name Full name of the object. 1983 * The place to look the object at.
1686 * @param value_size Set to the size of the returned value. 1984 * @param full_name
1985 * Full name of the object.
1986 * @param value_size
1987 * Set to the size of the returned value.
1988 *
1687 * @return NULL if there is no such object at this place. 1989 * @return NULL if there is no such object at this place.
1688 */ 1990 */
1689const void * 1991struct GNUNET_SOCIAL_LookHandle *
1690GNUNET_SOCIAL_place_look_at (struct GNUNET_SOCIAL_Place *place, 1992GNUNET_SOCIAL_place_look_at (struct GNUNET_SOCIAL_Place *plc,
1691 const char *full_name, 1993 const char *full_name,
1692 size_t *value_size) 1994 GNUNET_PSYC_StateVarCallback var_cb,
1995 GNUNET_ResultCallback result_cb,
1996 void *cls)
1693{ 1997{
1694 return NULL; 1998 return place_state_get (plc, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
1999 full_name, var_cb, result_cb, cls);
2000}
2001
2002
2003/**
2004 * Look for objects in the place with a matching name prefix.
2005 *
2006 * @param place
2007 * The place to look its objects at.
2008 * @param name_prefix
2009 * Look at objects with names beginning with this value.
2010 * @param var_cb
2011 * Function to call for each object found.
2012 * @param cls
2013 * Closure for callback function.
2014 *
2015 * @return Handle that can be used to stop looking at objects.
2016 */
2017struct GNUNET_SOCIAL_LookHandle *
2018GNUNET_SOCIAL_place_look_for (struct GNUNET_SOCIAL_Place *plc,
2019 const char *name_prefix,
2020 GNUNET_PSYC_StateVarCallback var_cb,
2021 GNUNET_ResultCallback result_cb,
2022 void *cls)
2023{
2024 return place_state_get (plc, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
2025 name_prefix, var_cb, result_cb, cls);
2026}
2027
2028
2029/**
2030 * Cancel a state request operation.
2031 *
2032 * @param sr
2033 * Handle for the operation to cancel.
2034 */
2035void
2036GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *look)
2037{
2038 GNUNET_CLIENT_MANAGER_op_cancel (look->plc->client, look->op_id);
2039 GNUNET_free (look);
1695} 2040}
1696 2041
1697 2042
diff --git a/src/social/test_social.c b/src/social/test_social.c
index a1bf2940c..b18c9596c 100644
--- a/src/social/test_social.c
+++ b/src/social/test_social.c
@@ -75,6 +75,9 @@ struct GNUNET_SOCIAL_Slicer *guest_slicer;
75struct GNUNET_SOCIAL_Host *hst; 75struct GNUNET_SOCIAL_Host *hst;
76struct GNUNET_SOCIAL_Guest *gst; 76struct GNUNET_SOCIAL_Guest *gst;
77 77
78struct GNUNET_SOCIAL_Place *hst_plc;
79struct GNUNET_SOCIAL_Place *gst_plc;
80
78struct GuestEnterMessage 81struct GuestEnterMessage
79{ 82{
80 struct GNUNET_PSYC_Message *msg; 83 struct GNUNET_PSYC_Message *msg;
@@ -99,6 +102,8 @@ struct TransmitClosure
99uint8_t join_req_count; 102uint8_t join_req_count;
100struct GNUNET_PSYC_Message *join_resp; 103struct GNUNET_PSYC_Message *join_resp;
101 104
105uint32_t counter;
106
102enum 107enum
103{ 108{
104 TEST_NONE = 0, 109 TEST_NONE = 0,
@@ -106,11 +111,15 @@ enum
106 TEST_GUEST_RECV_ENTRY_DCSN_REFUSE = 2, 111 TEST_GUEST_RECV_ENTRY_DCSN_REFUSE = 2,
107 TEST_HOST_ANSWER_DOOR_ADMIT = 3, 112 TEST_HOST_ANSWER_DOOR_ADMIT = 3,
108 TEST_GUEST_RECV_ENTRY_DCSN_ADMIT = 4, 113 TEST_GUEST_RECV_ENTRY_DCSN_ADMIT = 4,
109 TEST_HOST_ANNOUNCE = 5, 114 TEST_HOST_ANNOUNCE = 5,
110 TEST_HOST_ANNOUNCE_END = 6, 115 TEST_HOST_ANNOUNCE_END = 6,
111 TEST_GUEST_TALK = 7, 116 TEST_HOST_ANNOUNCE2 = 7,
112 TEST_GUEST_LEAVE = 8, 117 TEST_HOST_ANNOUNCE2_END = 8,
113 TEST_HOST_LEAVE = 9, 118 TEST_GUEST_TALK = 9,
119 TEST_GUEST_HISTORY_REPLAY = 10,
120 TEST_GUEST_HISTORY_REPLAY_LATEST = 11,
121 TEST_GUEST_LEAVE = 12,
122 TEST_HOST_LEAVE = 13,
114} test; 123} test;
115 124
116 125
@@ -148,11 +157,13 @@ cleanup ()
148 { 157 {
149 GNUNET_SOCIAL_guest_leave (gst, GNUNET_NO, NULL, NULL); 158 GNUNET_SOCIAL_guest_leave (gst, GNUNET_NO, NULL, NULL);
150 gst = NULL; 159 gst = NULL;
160 gst_plc = NULL;
151 } 161 }
152 if (NULL != hst) 162 if (NULL != hst)
153 { 163 {
154 GNUNET_SOCIAL_host_leave (hst, GNUNET_NO, NULL, NULL); 164 GNUNET_SOCIAL_host_leave (hst, GNUNET_NO, NULL, NULL);
155 hst = NULL; 165 hst = NULL;
166 hst_plc = NULL;
156 } 167 }
157 GNUNET_SCHEDULER_shutdown (); 168 GNUNET_SCHEDULER_shutdown ();
158} 169}
@@ -273,6 +284,7 @@ host_left ()
273 GNUNET_SOCIAL_slicer_destroy (host_slicer); 284 GNUNET_SOCIAL_slicer_destroy (host_slicer);
274 host_slicer = NULL; 285 host_slicer = NULL;
275 hst = NULL; 286 hst = NULL;
287 hst_plc = NULL;
276 288
277 end (); 289 end ();
278} 290}
@@ -316,6 +328,7 @@ guest_left (void *cls)
316 GNUNET_SOCIAL_slicer_destroy (guest_slicer); 328 GNUNET_SOCIAL_slicer_destroy (guest_slicer);
317 guest_slicer = NULL; 329 guest_slicer = NULL;
318 gst = NULL; 330 gst = NULL;
331 gst_plc = NULL;
319 332
320 GNUNET_SCHEDULER_add_now (&schedule_host_leave, NULL); 333 GNUNET_SCHEDULER_add_now (&schedule_host_leave, NULL);
321} 334}
@@ -331,6 +344,69 @@ guest_leave()
331 344
332 345
333static void 346static void
347schedule_guest_leave (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
348{
349 guest_leave ();
350}
351
352
353static void
354guest_recv_history_replay_latest_result (void *cls, int64_t result,
355 const void *data, uint16_t data_size)
356{
357 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
358 "Test #%u: Guest received latest history replay result: %" PRId64 "\n"
359 "%.*s\n",
360 test, result, data_size, data);
361 GNUNET_assert (2 == counter); /* message count */
362 GNUNET_assert (7 == result); /* fragment count */
363
364 GNUNET_SCHEDULER_add_now (&schedule_guest_leave, NULL);
365}
366
367
368static void
369guest_history_replay_latest ()
370{
371 test = TEST_GUEST_HISTORY_REPLAY_LATEST;
372 counter = 0;
373 GNUNET_SOCIAL_place_history_replay_latest (gst_plc, 3, "",
374 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
375 guest_slicer,
376 &guest_recv_history_replay_latest_result,
377 NULL);
378}
379
380
381static void
382guest_recv_history_replay_result (void *cls, int64_t result,
383 const void *data, uint16_t data_size)
384{
385 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
386 "Test #%u: Guest received history replay result: %" PRId64 "\n"
387 "%.*s\n",
388 test, result, data_size, data);
389 GNUNET_assert (2 == counter); /* message count */
390 GNUNET_assert (7 == result); /* fragment count */
391
392 guest_history_replay_latest ();
393}
394
395
396static void
397guest_history_replay ()
398{
399 test = TEST_GUEST_HISTORY_REPLAY;
400 counter = 0;
401 GNUNET_SOCIAL_place_history_replay (gst_plc, 1, 3, "",
402 GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
403 guest_slicer,
404 &guest_recv_history_replay_result,
405 NULL);
406}
407
408
409static void
334guest_recv_method (void *cls, 410guest_recv_method (void *cls,
335 const struct GNUNET_PSYC_MessageMethod *meth, 411 const struct GNUNET_PSYC_MessageMethod *meth,
336 uint64_t message_id, 412 uint64_t message_id,
@@ -340,8 +416,8 @@ guest_recv_method (void *cls,
340{ 416{
341 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 417 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
342 "Test #%u: Guest received method for message ID %" PRIu64 ":\n" 418 "Test #%u: Guest received method for message ID %" PRIu64 ":\n"
343 "%s\n", 419 "%s (flags: %x)\n",
344 test, message_id, method_name); 420 test, message_id, method_name, flags);
345 /* FIXME: check message */ 421 /* FIXME: check message */
346} 422}
347 423
@@ -395,9 +471,22 @@ guest_recv_eom (void *cls,
395 break; 471 break;
396 472
397 case TEST_HOST_ANNOUNCE_END: 473 case TEST_HOST_ANNOUNCE_END:
474 host_announce2 ();
475 break;
476
477 case TEST_HOST_ANNOUNCE2:
478 test = TEST_HOST_ANNOUNCE2_END;
479 break;
480
481 case TEST_HOST_ANNOUNCE2_END:
398 guest_talk (); 482 guest_talk ();
399 break; 483 break;
400 484
485 case TEST_GUEST_HISTORY_REPLAY:
486 case TEST_GUEST_HISTORY_REPLAY_LATEST:
487 counter++;
488 break;
489
401 default: 490 default:
402 GNUNET_assert (0); 491 GNUNET_assert (0);
403 } 492 }
@@ -466,15 +555,22 @@ host_recv_eom (void *cls,
466 { 555 {
467 case TEST_HOST_ANNOUNCE: 556 case TEST_HOST_ANNOUNCE:
468 test = TEST_HOST_ANNOUNCE_END; 557 test = TEST_HOST_ANNOUNCE_END;
469 //host_announce2 ();
470 break; 558 break;
471 559
472 case TEST_HOST_ANNOUNCE_END: 560 case TEST_HOST_ANNOUNCE_END:
561 host_announce2 ();
562 break;
563
564 case TEST_HOST_ANNOUNCE2:
565 test = TEST_HOST_ANNOUNCE2_END;
566 break;
567
568 case TEST_HOST_ANNOUNCE2_END:
473 guest_talk (); 569 guest_talk ();
474 break; 570 break;
475 571
476 case TEST_GUEST_TALK: 572 case TEST_GUEST_TALK:
477 guest_leave (); 573 guest_history_replay ();
478 break; 574 break;
479 575
480 default: 576 default:
@@ -535,7 +631,7 @@ host_announce ()
535static void 631static void
536host_announce2 () 632host_announce2 ()
537{ 633{
538 test = TEST_HOST_ANNOUNCE; 634 test = TEST_HOST_ANNOUNCE2;
539 635
540 tmit = (struct TransmitClosure) {}; 636 tmit = (struct TransmitClosure) {};
541 tmit.env = GNUNET_ENV_environment_create (); 637 tmit.env = GNUNET_ENV_environment_create ();
@@ -667,6 +763,7 @@ guest_enter ()
667 &this_peer, 0, NULL, emsg->msg, 763 &this_peer, 0, NULL, emsg->msg,
668 guest_slicer, &guest_recv_local_enter, 764 guest_slicer, &guest_recv_local_enter,
669 &guest_recv_entry_decision, NULL); 765 &guest_recv_entry_decision, NULL);
766 gst_plc = GNUNET_SOCIAL_guest_get_place (gst);
670} 767}
671 768
672 769
@@ -727,6 +824,7 @@ id_host_ego_cb (void *cls, const struct GNUNET_IDENTITY_Ego *ego)
727 GNUNET_PSYC_CHANNEL_PRIVATE, host_slicer, 824 GNUNET_PSYC_CHANNEL_PRIVATE, host_slicer,
728 &host_entered, &host_answer_door, 825 &host_entered, &host_answer_door,
729 &host_farewell, NULL); 826 &host_farewell, NULL);
827 hst_plc = GNUNET_SOCIAL_host_get_place (hst);
730} 828}
731 829
732 830