diff options
author | Sanjit Bhat <[email protected]> | 2023-10-26 06:44:48 -0400 |
---|---|---|
committer | Sanjit Bhat <[email protected]> | 2023-10-26 06:44:48 -0400 |
commit | cfae93475dfb4cb5cfe264f4c029136e1447c262 (patch) | |
tree | 699903e093e3a23caf7ce3899e7c80e48511f900 /gradelib.py | |
parent | 1ed40716eb54e371df9d1814b9129666b3fe4f09 (diff) | |
download | xv6-labs-cfae93475dfb4cb5cfe264f4c029136e1447c262.tar.gz xv6-labs-cfae93475dfb4cb5cfe264f4c029136e1447c262.tar.bz2 xv6-labs-cfae93475dfb4cb5cfe264f4c029136e1447c262.zip |
net add missing files
Diffstat (limited to 'gradelib.py')
-rw-r--r-- | gradelib.py | 628 |
1 files changed, 628 insertions, 0 deletions
diff --git a/gradelib.py b/gradelib.py new file mode 100644 index 0000000..f0d4934 --- /dev/null +++ b/gradelib.py @@ -0,0 +1,628 @@ +from __future__ import print_function + +import sys, os, re, time, socket, select, subprocess, errno, shutil, random, string, json +from subprocess import check_call, Popen +from optparse import OptionParser + +__all__ = [] + +################################################################## +# Test structure +# + +__all__ += ["test", "end_part", "run_tests", "get_current_test"] + +TESTS = [] +TOTAL = POSSIBLE = 0 +PART_TOTAL = PART_POSSIBLE = 0 +CURRENT_TEST = None +GRADES = {} + +def test(points, title=None, parent=None): + """Decorator for declaring test functions. If title is None, the + title of the test will be derived from the function name by + stripping the leading "test_" and replacing underscores with + spaces.""" + + def register_test(fn, title=title): + if not title: + assert fn.__name__.startswith("test_") + title = fn.__name__[5:].replace("_", " ") + if parent: + title = " " + title + + def run_test(): + global TOTAL, POSSIBLE, CURRENT_TEST, GRADES + + # Handle test dependencies + if run_test.complete: + return run_test.ok + run_test.complete = True + parent_failed = False + if parent: + parent_failed = not parent() + + # Run the test + fail = None + start = time.time() + CURRENT_TEST = run_test + sys.stdout.write("== Test %s == " % title) + if parent: + sys.stdout.write("\n") + sys.stdout.flush() + try: + if parent_failed: + raise AssertionError('Parent failed: %s' % parent.__name__) + fn() + except AssertionError as e: + fail = str(e) + + # Display and handle test result + POSSIBLE += points + if points: + print("%s: %s" % (title, \ + (color("red", "FAIL") if fail else color("green", "OK"))), end=' ') + if time.time() - start > 0.1: + print("(%.1fs)" % (time.time() - start), end=' ') + print() + if fail: + print(" %s" % fail.replace("\n", "\n ")) + else: + TOTAL += points + if points: + GRADES[title] = 0 if fail else points + + for callback in run_test.on_finish: + callback(fail) + CURRENT_TEST = None + + run_test.ok = not fail + return run_test.ok + + # Record test metadata on the test wrapper function + run_test.__name__ = fn.__name__ + run_test.title = title + run_test.complete = False + run_test.ok = False + run_test.on_finish = [] + TESTS.append(run_test) + return run_test + return register_test + +def end_part(name): + def show_part(): + global PART_TOTAL, PART_POSSIBLE + print("Part %s score: %d/%d" % \ + (name, TOTAL - PART_TOTAL, POSSIBLE - PART_POSSIBLE)) + print() + PART_TOTAL, PART_POSSIBLE = TOTAL, POSSIBLE + show_part.title = "" + TESTS.append(show_part) + +def write_results(): + global options + if not options.results: + return + try: + with open(options.results, "w") as f: + f.write(json.dumps(GRADES)) + except OSError as e: + print("Provided a bad results path. Error:", e) + +def run_tests(): + """Set up for testing and run the registered test functions.""" + + # Handle command line + global options + parser = OptionParser(usage="usage: %prog [-v] [filters...]") + parser.add_option("-v", "--verbose", action="store_true", + help="print commands") + parser.add_option("--color", choices=["never", "always", "auto"], + default="auto", help="never, always, or auto") + parser.add_option("--results", help="results file path") + (options, args) = parser.parse_args() + + # Start with a full build to catch build errors + make() + + # Clean the file system if there is one + reset_fs() + + # Run tests + limit = list(map(str.lower, args)) + try: + for test in TESTS: + if not limit or any(l in test.title.lower() for l in limit): + test() + if not limit: + write_results() + print("Score: %d/%d" % (TOTAL, POSSIBLE)) + except KeyboardInterrupt: + pass + if TOTAL < POSSIBLE: + sys.exit(1) + +def get_current_test(): + if not CURRENT_TEST: + raise RuntimeError("No test is running") + return CURRENT_TEST + +################################################################## +# Assertions +# + +__all__ += ["assert_equal", "assert_lines_match"] + +def assert_equal(got, expect, msg=""): + if got == expect: + return + if msg: + msg += "\n" + raise AssertionError("%sgot:\n %s\nexpected:\n %s" % + (msg, str(got).replace("\n", "\n "), + str(expect).replace("\n", "\n "))) + +def assert_lines_match(text, *regexps, **kw): + """Assert that all of regexps match some line in text. If a 'no' + keyword argument is given, it must be a list of regexps that must + *not* match any line in text.""" + + def assert_lines_match_kw(no=[]): + return no + no = assert_lines_match_kw(**kw) + + # Check text against regexps + lines = text.splitlines() + good = set() + bad = set() + for i, line in enumerate(lines): + if any(re.match(r, line) for r in regexps): + good.add(i) + regexps = [r for r in regexps if not re.match(r, line)] + if any(re.match(r, line) for r in no): + bad.add(i) + + if not regexps and not bad: + return + + # We failed; construct an informative failure message + show = set() + for lineno in good.union(bad): + for offset in range(-2, 3): + show.add(lineno + offset) + if regexps: + show.update(n for n in range(len(lines) - 5, len(lines))) + + msg = [] + last = -1 + for lineno in sorted(show): + if 0 <= lineno < len(lines): + if lineno != last + 1: + msg.append("...") + last = lineno + msg.append("%s %s" % (color("red", "BAD ") if lineno in bad else + color("green", "GOOD") if lineno in good + else " ", + lines[lineno])) + if last != len(lines) - 1: + msg.append("...") + if bad: + msg.append("unexpected lines in output") + for r in regexps: + msg.append(color("red", "MISSING") + " '%s'" % r) + raise AssertionError("\n".join(msg)) + +################################################################## +# Utilities +# + +__all__ += ["make", "maybe_unlink", "reset_fs", "color", "random_str", "check_time", "check_answers"] + +MAKE_TIMESTAMP = 0 + +def pre_make(): + """Delay prior to running make to ensure file mtimes change.""" + while int(time.time()) == MAKE_TIMESTAMP: + time.sleep(0.1) + +def post_make(): + """Record the time after make completes so that the next run of + make can be delayed if needed.""" + global MAKE_TIMESTAMP + MAKE_TIMESTAMP = int(time.time()) + +def make(*target): + pre_make() + if Popen(("make",) + target).wait(): + sys.exit(1) + post_make() + +def show_command(cmd): + from pipes import quote + print("\n$", " ".join(map(quote, cmd))) + +def maybe_unlink(*paths): + for path in paths: + try: + os.unlink(path) + except EnvironmentError as e: + if e.errno != errno.ENOENT: + raise + +COLORS = {"default": "\033[0m", "red": "\033[31m", "green": "\033[32m"} + +def color(name, text): + if options.color == "always" or (options.color == "auto" and os.isatty(1)): + return COLORS[name] + text + COLORS["default"] + return text + +def reset_fs(): + if os.path.exists("obj/fs/clean-fs.img"): + shutil.copyfile("obj/fs/clean-fs.img", "obj/fs/fs.img") + +def random_str(n=8): + letters = string.ascii_letters + string.digits + return ''.join(random.choice(letters) for _ in range(n)) + +def check_time(): + try: + print("") + with open('time.txt') as f: + d = f.read().strip() + if not re.match(r'^\d+$', d): + raise AssertionError('time.txt does not contain a single integer (number of hours spent on the lab)') + except IOError: + raise AssertionError('Cannot read time.txt') + +def check_answers(file, n=10): + try: + print("") + with open(file) as f: + d = f.read().strip() + if len(d) < n: + raise AssertionError('%s does not seem to contain enough text' % file) + except IOError: + raise AssertionError('Cannot read %s' % file) + + +################################################################## +# Controllers +# + +__all__ += ["QEMU", "GDBClient"] + +class QEMU(object): + _GDBPORT = None + + def __init__(self, *make_args): + # Check that QEMU is not currently running + try: + GDBClient(self.get_gdb_port(), timeout=0).close() + except socket.error: + pass + else: + print("""\ +GDB stub found on port %d. +QEMU appears to already be running. Please exit it if possible or use +'killall qemu' or 'killall qemu.real'.""" % self.get_gdb_port(), file=sys.stderr) + sys.exit(1) + + if options.verbose: + show_command(("make",) + make_args) + cmd = ("make", "-s", "--no-print-directory") + make_args + self.proc = Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdin=subprocess.PIPE) + # Accumulated output as a string + self.output = "" + # Accumulated output as a bytearray + self.outbytes = bytearray() + self.on_output = [] + + @staticmethod + def get_gdb_port(): + if QEMU._GDBPORT is None: + p = Popen(["make", "-s", "--no-print-directory", "print-gdbport"], + stdout=subprocess.PIPE) + (out, _) = p.communicate() + if p.returncode: + raise RuntimeError( + "Failed to get gdbport: make exited with %d" % + p.returncode) + QEMU._GDBPORT = int(out) + return QEMU._GDBPORT + + def fileno(self): + if self.proc: + return self.proc.stdout.fileno() + + def handle_read(self): + buf = os.read(self.proc.stdout.fileno(), 4096) + self.outbytes.extend(buf) + self.output = self.outbytes.decode("utf-8", "replace") + for callback in self.on_output: + callback(buf) + if buf == b"": + self.wait() + return + + def write(self, buf): + if isinstance(buf, str): + buf = buf.encode('utf-8') + self.proc.stdin.write(buf) + self.proc.stdin.flush() + + def wait(self): + if self.proc: + self.proc.wait() + self.proc = None + + def kill(self): + if self.proc: + self.proc.terminate() + +class GDBClient(object): + def __init__(self, port, timeout=15): + start = time.time() + while True: + self.sock = socket.socket() + try: + self.sock.settimeout(1) + self.sock.connect(("localhost", port)) + break + except socket.error: + if time.time() >= start + timeout: + raise + self.__buf = "" + + def fileno(self): + if self.sock: + return self.sock.fileno() + + def handle_read(self): + try: + data = self.sock.recv(4096).decode("ascii", "replace") + except socket.error: + data = "" + if data == "": + self.sock.close() + self.sock = None + return + self.__buf += data + + while True: + m = re.search(r"\$([^#]*)#[0-9a-zA-Z]{2}", self.__buf) + if not m: + break + pkt = m.group(1) + self.__buf = self.__buf[m.end():] + + if pkt.startswith("T05"): + # Breakpoint + raise TerminateTest + + def __send(self, cmd): + packet = "$%s#%02x" % (cmd, sum(map(ord, cmd)) % 256) + self.sock.sendall(packet.encode("ascii")) + + def __send_break(self): + self.sock.sendall(b"\x03") + + def close(self): + if self.sock: + self.sock.close() + self.sock = None + + def cont(self): + self.__send("c") + + def breakpoint(self, addr): + self.__send("Z1,%x,1" % addr) + + +################################################################## +# QEMU test runner +# + +__all__ += ["TerminateTest", "Runner"] + +class TerminateTest(Exception): + pass + +class Runner(): + def __init__(self, *default_monitors): + self.__default_monitors = default_monitors + + def run_qemu(self, *monitors, **kw): + """Run a QEMU-based test. monitors should functions that will + be called with this Runner instance once QEMU and GDB are + started. Typically, they should register callbacks that throw + TerminateTest when stop events occur. The target_base + argument gives the make target to run. The make_args argument + should be a list of additional arguments to pass to make. The + timeout argument bounds how long to run before returning.""" + + def run_qemu_kw(target_base="qemu", make_args=[], timeout=30): + return target_base, make_args, timeout + target_base, make_args, timeout = run_qemu_kw(**kw) + + # Start QEMU + pre_make() + self.qemu = QEMU(target_base + "-gdb", *make_args) + self.gdb = None + + try: + # Wait for QEMU to start or make to fail. This will set + # self.gdb if QEMU starts. + self.qemu.on_output = [self.__monitor_start] + self.__react([self.qemu], timeout=90) + self.qemu.on_output = [] + if self.gdb is None: + print("Failed to connect to QEMU; output:") + print(self.qemu.output) + sys.exit(1) + post_make() + + # QEMU and GDB are up + self.reactors = [self.qemu, self.gdb] + + # Start monitoring + for m in self.__default_monitors + monitors: + m(self) + + # Run and react + self.gdb.cont() + self.__react(self.reactors, timeout) + finally: + # Shutdown QEMU + try: + if self.gdb is None: + sys.exit(1) + self.qemu.kill() + self.__react(self.reactors, 5) + self.gdb.close() + self.qemu.wait() + except: + print("""\ +Failed to shutdown QEMU. You might need to 'killall qemu' or +'killall qemu.real'. +""") + raise + + def __monitor_start(self, output): + if b"\n" in output: + try: + self.gdb = GDBClient(self.qemu.get_gdb_port(), timeout=2) + raise TerminateTest + except socket.error: + pass + if not len(output): + raise TerminateTest + + def __react(self, reactors, timeout): + deadline = time.time() + timeout + try: + while True: + timeleft = deadline - time.time() + if timeleft < 0: + sys.stdout.write("Timeout! ") + sys.stdout.flush() + return + + rset = [r for r in reactors if r.fileno() is not None] + if not rset: + return + + rset, _, _ = select.select(rset, [], [], timeleft) + for reactor in rset: + reactor.handle_read() + except TerminateTest: + pass + + def user_test(self, binary, *monitors, **kw): + """Run a user test using the specified binary. Monitors and + keyword arguments are as for run_qemu. This runs on a disk + snapshot unless the keyword argument 'snapshot' is False.""" + + maybe_unlink("obj/kern/init.o", "obj/kern/kernel") + if kw.pop("snapshot", True): + kw.setdefault("make_args", []).append("QEMUEXTRA+=-snapshot") + self.run_qemu(target_base="run-%s" % binary, *monitors, **kw) + + def match(self, *args, **kwargs): + """Shortcut to call assert_lines_match on the most recent QEMU + output.""" + + assert_lines_match(self.qemu.output, *args, **kwargs) + +################################################################## +# Monitors +# + +__all__ += ["save", "stop_breakpoint", "call_on_line", "stop_on_line", "shell_script"] + +def save(path): + """Return a monitor that writes QEMU's output to path. If the + test fails, copy the output to path.test-name.""" + + def setup_save(runner): + f.seek(0) + f.truncate() + runner.qemu.on_output.append(f.write) + get_current_test().on_finish.append(save_on_finish) + + def save_on_finish(fail): + f.flush() + save_path = path + "." + get_current_test().__name__[5:] + if fail: + shutil.copyfile(path, save_path) + print(" QEMU output saved to %s" % save_path) + elif os.path.exists(save_path): + os.unlink(save_path) + print(" (Old %s failure log removed)" % save_path) + + f = open(path, "wb") + return setup_save + +def stop_breakpoint(addr): + """Returns a monitor that stops when addr is reached. addr may be + a number or the name of a symbol.""" + + def setup_breakpoint(runner): + if isinstance(addr, str): + addrs = [int(sym[:16], 16) for sym in open("kernel/kernel.sym") + if sym[17:].strip() == addr] + assert len(addrs), "Symbol %s not found" % addr + runner.gdb.breakpoint(addrs[0]) + else: + runner.gdb.breakpoint(addr) + return setup_breakpoint + +def call_on_line(regexp, callback): + """Returns a monitor that calls 'callback' when QEMU prints a line + matching 'regexp'.""" + + def setup_call_on_line(runner): + buf = bytearray() + def handle_output(output): + buf.extend(output) + while b"\n" in buf: + line, buf[:] = buf.split(b"\n", 1) + line = line.decode("utf-8", "replace") + if re.match(regexp, line): + callback(line) + runner.qemu.on_output.append(handle_output) + return setup_call_on_line + +def stop_on_line(regexp): + """Returns a monitor that stops when QEMU prints a line matching + 'regexp'.""" + + def stop(line): + raise TerminateTest + return call_on_line(regexp, stop) + +def shell_script(script, terminate_match=None): + """Returns a monitor that plays the script, and stops when the script is + done executing.""" + + def setup_call_on_line(runner): + class context: + n = 0 + buf = bytearray() + def handle_output(output): + context.buf.extend(output) + if terminate_match is not None: + if re.match(terminate_match, context.buf.decode('utf-8', 'replace')): + raise TerminateTest + if b'$ ' in context.buf: + context.buf = bytearray() + if context.n < len(script): + runner.qemu.write(script[context.n]) + runner.qemu.write('\n') + context.n += 1 + else: + if terminate_match is None: + raise TerminateTest + runner.qemu.on_output.append(handle_output) + return setup_call_on_line |