Make ptest more classy

This commit is contained in:
Roger A. Light 2024-11-21 11:06:57 +00:00
parent 4ab829125d
commit 86a6ef0e8a
8 changed files with 95 additions and 82 deletions

View file

@ -11,4 +11,5 @@ tests = [
(2, './ctrl-dynsec.py'),
]
ptest.run_tests(tests)
test = ptest.PTest()
test.run_tests(tests)

View file

@ -9,4 +9,5 @@ tests = []
for test_file in pathlib.Path('.').glob('db-dump-*.py'):
tests.append((1, test_file.resolve()))
ptest.run_tests(tests)
test = ptest.PTest()
test.run_tests(tests)

View file

@ -9,4 +9,5 @@ tests = []
for test_file in pathlib.Path('.').glob('passwd-*.py'):
tests.append((1, test_file.resolve()))
ptest.run_tests(tests)
test = ptest.PTest()
test.run_tests(tests)

View file

@ -9,4 +9,5 @@ tests = []
for test_file in pathlib.Path('.').glob('signal-*.py'):
tests.append((1, test_file.resolve()))
ptest.run_tests(tests)
test = ptest.PTest()
test.run_tests(tests)

View file

@ -289,4 +289,5 @@ tests = [
(1, './21-proxy-v2-ssl-require-tls-success.py'),
]
ptest.run_tests(tests)
test = ptest.PTest()
test.run_tests(tests)

View file

@ -47,4 +47,5 @@ tests = [
(2, './04-rr-qos1-ws.py'),
]
ptest.run_tests(tests)
test = ptest.PTest()
test.run_tests(tests)

View file

@ -63,4 +63,5 @@ tests = [
]
ptest.run_tests(tests)
test = ptest.PTest()
test.run_tests(tests)

View file

@ -4,7 +4,7 @@ import subprocess
import time
import sys
class PTest():
class PTestCase():
def __init__(self, testdef):
self.ports = testdef[0]
self.cmd = str(testdef[1])
@ -34,87 +34,93 @@ class PTest():
print(f"{self.runtime:0.3f}s : \033[{col}m{cmd}\033[0m")
def next_test(tests, ports):
if len(tests) == 0 or len(ports) == 0:
return
class PTest():
def __init__(self, minport=1888, max_running=20):
self.minport = minport
self.max_running = 20
self.tests = []
test = tests.pop()
test.mosq_port = []
def next_test(self, tests, ports):
if len(tests) == 0 or len(ports) == 0:
return
if len(ports) < test.ports:
tests.insert(0, test)
return None
else:
test = tests.pop()
test.mosq_port = []
for i in range(0, test.ports):
proc_port = ports.pop()
test.mosq_port.append(proc_port)
if len(ports) < test.ports:
tests.insert(0, test)
return None
else:
test.start()
return test
for i in range(0, test.ports):
proc_port = ports.pop()
test.mosq_port.append(proc_port)
test.start()
return test
def run_tests(test_list, minport=1888, max_running=20):
ports = list(range(minport, minport+max_running+1))
start_time = time.time()
passed = 0
retried = 0
failed = 0
def run_tests(self, test_list):
ports = list(range(self.minport, self.minport+self.max_running+1))
start_time = time.time()
passed = 0
retried = 0
failed = 0
tests = []
for t in test_list:
tests.append(PTest(t))
tests = []
for t in test_list:
tests.append(PTestCase(t))
failed_tests = []
running_tests = []
retry_tests = []
while len(tests) > 0 or len(running_tests) > 0 or len(retry_tests) > 0:
if len(running_tests) <= max_running:
t = next_test(tests, ports)
if t is None:
time.sleep(0.1)
else:
running_tests.append(t)
if len(running_tests) == 0 and len(tests) == 0 and len(retry_tests) > 0:
# Only retry tests after everything else has finished to reduce load
tests = retry_tests
retry_tests = []
for t in running_tests:
t.proc.poll()
if t.proc.returncode is not None:
t.runtime = time.time() - t.start_time
running_tests.remove(t)
for portret in t.mosq_port:
ports.append(portret)
t.proc.terminate()
t.proc.wait()
if t.proc.returncode == 1 and t.attempts < 5:
t.print_result(33)
retried += 1
t.attempts += 1
t.proc = None
t.mosq_port = None
retry_tests.append(t)
continue
if t.proc.returncode == 1:
t.print_result(31)
failed = failed + 1
failed_tests.append(t.cmd)
print(f"{t.cmd}:")
(stdo, stde) = t.proc.communicate()
failed_tests = []
running_tests = []
retry_tests = []
while len(tests) > 0 or len(running_tests) > 0 or len(retry_tests) > 0:
if len(running_tests) <= self.max_running:
t = self.next_test(tests, ports)
if t is None:
time.sleep(0.1)
else:
passed = passed + 1
t.print_result(32)
running_tests.append(t)
print("Passed: %d\nRetried: %d\nFailed: %d\nTotal: %d\nTotal time: %0.2f" % (passed, retried, failed, passed+failed, time.time()-start_time))
if failed > 0:
print("Failing tests:")
failed_tests.sort()
for f in failed_tests:
print(f)
sys.exit(1)
if len(running_tests) == 0 and len(tests) == 0 and len(retry_tests) > 0:
# Only retry tests after everything else has finished to reduce load
tests = retry_tests
retry_tests = []
for t in running_tests:
t.proc.poll()
if t.proc.returncode is not None:
t.runtime = time.time() - t.start_time
running_tests.remove(t)
for portret in t.mosq_port:
ports.append(portret)
t.proc.terminate()
t.proc.wait()
if t.proc.returncode == 1 and t.attempts < 5:
t.print_result(33)
retried += 1
t.attempts += 1
t.proc = None
t.mosq_port = None
retry_tests.append(t)
continue
if t.proc.returncode == 1:
t.print_result(31)
failed = failed + 1
failed_tests.append(t.cmd)
print(f"{t.cmd}:")
(stdo, stde) = t.proc.communicate()
else:
passed = passed + 1
t.print_result(32)
print("Passed: %d\nRetried: %d\nFailed: %d\nTotal: %d\nTotal time: %0.2f" % (passed, retried, failed, passed+failed, time.time()-start_time))
if failed > 0:
print("Failing tests:")
failed_tests.sort()
for f in failed_tests:
print(f)
sys.exit(1)