#!@PYTHON@ # This file is part of GNUnet. # (C) 2010, 2018 Christian Grothoff (and other contributing authors) # # GNUnet is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # GNUnet is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # Testcase for gnunet-peerinfo from __future__ import print_function import os import re import subprocess import sys import shutil import time class pexpect (object): def __init__(self): super(pexpect, self).__init__() def spawn(self, stdin, arglist, *pargs, **kwargs): env = kwargs.pop('env', None) if env is None: env = os.environ.copy() # This messes up some testcases, disable log redirection env.pop('GNUNET_FORCE_LOGFILE', None) self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs) if self.proc is None: print("Failed to spawn a process {0}".format(arglist)) sys.exit(1) if stdin is not None: self.stdo, self.stde = self.proc.communicate(stdin) else: self.stdo, self.stde = self.proc.communicate() return self.proc def expect(self, s, r, flags=0): stream = self.stdo if s == 'stdout' else self.stde if isinstance(r, str): if r == "EOF": if len(stream) == 0: return True else: print("Failed to find `{1}' in {0}, which is `{2}' ({3})".format(s, r, stream, len(stream))) sys.exit(2) raise ValueError("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format(r)) m = r.search(stream.decode(), flags) if not m: print("Failed to find `{1}' in {0}, which is is `{2}'".format(s, r.pattern, stream)) sys.exit(2) stream = stream[m.end():] if s == 'stdout': self.stdo = stream else: self.stde = stream return m def read(self, s, size=-1): stream = self.stdo if s == 'stdout' else self.stde result = "" if size < 0: result = stream new_stream = "" else: result = stream[0:size] new_stream = stream[size:] if s == 'stdout': self.stdo = new_stream else: self.stde = new_stream return result