#!@PYTHONEXE@ # # This testcase simply checks that the DHT command-line tools work. # It launches a single peer, stores a value "testdata" under "testkey", # and then gives the system 50 ms to fetch it. # # This could fail if # - command line tool interfaces fail # - DHT plugins for storage are not installed / working # - block plugins for verification (the test plugin) is not installed # # The code does NOT depend on DHT routing or any actual P2P functionality. # import os import sys import shutil import re import subprocess import time import tempfile os.environ["PATH"] = "@bindirectory@" + ":" + os.environ["PATH"] if os.name == "nt": tmp = os.getenv("TEMP") else: tmp = "/tmp" if os.name == 'nt': get = './gnunet-dht-get.exe' put = './gnunet-dht-put.exe' arm = 'gnunet-arm.exe' else: get = './gnunet-dht-get' put = './gnunet-dht-put' arm = 'gnunet-arm' cfgfile = 'test_dht_api_peer1.conf' run_get = [get, '-c', cfgfile] run_put = [put, '-c', cfgfile] run_arm = [arm, '-c', cfgfile] debug = os.getenv('DEBUG') if debug: run_arm += [debug.split(' ')] def cleanup(exitcode): sys.exit(exitcode) def sub_run(args, want_stdo=True, want_stde=False, nofail=False): if want_stdo: stdo = subprocess.PIPE else: stdo = None if want_stde: stde = subprocess.PIPE else: stde = None p = subprocess.Popen(args, stdout=stdo, stderr=stde) stdo, stde = p.communicate() if not nofail: if p.returncode != 0: sys.exit(p.returncode) return (p.returncode, stdo, stde) def fail(result): print(result) r_arm(['-e'], want_stdo=False) cleanup(1) def r_something(to_run, extra_args, failer=None, normal=True, **kw): rc, stdo, stde = sub_run(to_run + extra_args, nofail=True, **kw) if failer is not None: failer(to_run + extra_args, rc, stdo, stde, normal) return (rc, stdo, stde) def r_arm(extra_args, **kw): return r_something(run_arm, extra_args, **kw) def r_get(extra_args, **kw): return r_something(run_get, extra_args, **kw) def r_put(extra_args, **kw): return r_something(run_put, extra_args, **kw) def end_arm_failer(command, rc, stdo, stde, normal): if normal: if rc != 0: fail( "FAIL: error running {}\nCommand output was:\n{}\n{}".format( command, stdo, stde ) ) else: if rc == 0: fail( "FAIL: expected error while running {}\nCommand output was:\n{}\n{}" .format(command, stdo, stde) ) def print_only_failer(command, rc, stdo, stde, normal): if normal: if rc != 0: print( "FAIL: error running {}\nCommand output was:\n{}\n{}".format( command, stdo, stde ) ) cleanup(1) else: if rc == 0: print( "FAIL: expected error while running {}\nCommand output was:\n{}\n{}" .format(command, stdo, stde) ) cleanup(1) print("TEST: Starting ARM...", end='') r_arm(['-s'], failer=end_arm_failer, want_stdo=False, want_stde=False) print("PASS") time.sleep(1) print("TEST: Testing put...", end='') r_put(['-k', 'testkey', '-d', 'testdata', '-t', '8'], failer=end_arm_failer) print("PASS") time.sleep(1) print("TEST: Testing get...", end='') rc, stdo, stde = r_get(['-k', 'testkey', '-T', '50 ms', '-t', '8'], want_stdo=True, failer=end_arm_failer) stdo = stdo.decode('utf-8').replace('\r', '').splitlines() expect = "Result 0, type 8:\ntestdata".splitlines() if len(stdo) != 2 or len(expect ) != 2 or stdo[0] != expect[0] or stdo[1] != expect[1]: fail("output `{}' differs from expected `{}'".format(stdo, expect)) print("PASS") r_arm(['-e', '-d'], failer=print_only_failer)