#!@PYTHON@
#    This file is part of GNUnet.
#    (C) 2010, 2018 Christian Grothoff (and other contributing authors)
#
#    GNUnet is free software: you can redistribute it and/or modify it
#    under the terms of the GNU Affero General Public License as published
#    by the Free Software Foundation, either version 3 of the License,
#    or (at your option) any later version.
#
#    GNUnet is distributed in the hope that it will be useful, but
#    WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#    Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#    SPDX-License-Identifier: AGPL3.0-or-later
#
#
#
# This test starts 3 peers (NATed client, bootstrap server, client without
# NAT) and expects them to bootstrap and form a connected clique.
#
# Conditions for successful exit:
# All three peers report 2 connected peers in transport, core, topology,
# fs and dht.

import sys
import signal
import os
import subprocess
import re
import shutil
import time
from gnunet_testing import Peer
from gnunet_testing import Test
from gnunet_testing import Check
from gnunet_testing import Condition
from gnunet_testing import *

if os.name == "nt":
    tmp = os.getenv("TEMP")
else:
    tmp = "/tmp"

# definitions

testname = "test_integration_clique"
verbose = True
check_timeout = 180

# Shared test state.  It is (re)assigned by run(); it is initialised here so
# that the interrupt/cleanup paths never hit an undefined name when the test
# is aborted before run() has created the objects.
success = False
test = None
server = None
client = None
client_nat = None


def cleanup_onerror(function, path, excinfo):
    """Error callback for shutil.rmtree().

    If `path` has already disappeared the error is ignored.  If `path` is
    not writable, it is made user-writable and the failed operation
    `function` is retried.  Any other error is re-raised.
    """
    import stat
    if not os.path.exists(path):
        pass
    elif not os.access(path, os.W_OK):
        # Is the error an access error ?
        os.chmod(path, stat.S_IWUSR)
        function(path)
    else:
        # shutil.rmtree() invokes this callback from inside its `except`
        # block, so a bare raise re-raises the error being handled.
        raise


def _log(message):
    """Print `message` through the Test object if it exists, else stdout."""
    if test is not None:
        test.p(message)
    else:
        print(message)


def _remove_tree(path, retries=10):
    """Remove directory `path`, retrying up to `retries` times.

    Waits one second between attempts and logs a message if the directory
    still exists after the last attempt.
    """
    _log("Removing " + path)
    while os.path.exists(path) and retries > 0:
        shutil.rmtree(path, False, cleanup_onerror)
        retries -= 1
        if os.path.exists(path):
            # Only wait when another attempt (or the final check) is needed.
            time.sleep(1)
    if os.path.exists(path):
        _log("Failed to remove " + path)


def cleanup():
    """Delete the working directories of the three peers below `tmp`."""
    for dirname in ("c_bootstrap_server", "c_no_nat_client", "c_nat_client"):
        _remove_tree(os.path.join(tmp, dirname))


def success_cont(check):
    """Continuation run by the check on success: record the result."""
    global success
    success = True
    print('Connected clique successfully')


def fail_cont(check):
    """Continuation run by the check on failure: record and report it."""
    global success
    success = False
    check.evaluate(True)
    print('Failed to connect clique')


def check_connect():
    """Block until every peer reports 2 connected peers, or time out.

    For each of the three peers a statistics condition
    '# peers connected' == 2 is registered for the transport, core,
    topology, dht and fs subsystems.  The check runs for at most
    `check_timeout` and then calls success_cont or fail_cont.
    """
    check = Check(test)
    for peer in (client, client_nat, server):
        for subsystem in ('transport', 'core', 'topology', 'dht', 'fs'):
            check.add(StatisticsCondition(peer, subsystem,
                                          '# peers connected', 2))
    check.run_blocking(check_timeout, success_cont, fail_cont)


def _stop_peers():
    """Stop every peer object that has been created so far."""
    for peer in (server, client, client_nat):
        if peer is not None:
            peer.stop()


#
# Test execution
#


def SigHandler(signum=None, frame=None):
    """Signal handler: stop all peers, clean up and exit.

    The exit status is 0 if the test had already succeeded, 1 otherwise.
    NOTE(review): this handler is not registered with signal.signal()
    anywhere in this file.
    """
    print('Test was aborted!')
    _stop_peers()
    cleanup()
    # sys.exit() treats a bool as an int, so passing `success` directly
    # would invert the exit status.
    sys.exit(0 if success else 1)


def _abort_startup(message):
    """Report a peer start failure, stop all peers and clean up.

    Always returns False so that run() can return the result directly.
    """
    print(message)
    _stop_peers()
    cleanup()
    return False


def run():
    """Start the three peers, run the connectivity check and tear down.

    Returns True if the clique was established, False otherwise (including
    when one of the peers could not be started).  The outcome is also
    stored in the module-level `success` flag.
    """
    global success
    global test
    global server
    global client
    global client_nat

    success = False
    server = None
    client = None
    client_nat = None
    test = Test(testname, verbose)
    cleanup()

    server = Peer(test, './confs/c_bootstrap_server.conf')
    if server.start() != True:
        return _abort_startup('Failed to start server')

    # Server has to settle down
    time.sleep(5)

    client = Peer(test, './confs/c_no_nat_client.conf')
    if client.start() != True:
        return _abort_startup('Failed to start client')

    # Client has to settle down
    time.sleep(5)

    client_nat = Peer(test, './confs/c_nat_client.conf')
    if client_nat.start() != True:
        return _abort_startup('Failed to start client_nat')

    if client.started and client_nat.started and server.started:
        test.p('Peers started, running check')
        check_connect()

    _stop_peers()
    cleanup()

    if not success:
        print('Test failed')
        return False
    return True


try:
    run()
except (KeyboardInterrupt, SystemExit):
    print('Test interrupted')
    # Peers that were never created are still None; skip those.
    _stop_peers()
    cleanup()

if not success:
    sys.exit(1)
else:
    sys.exit(0)