How to use the start_test method in Lemoncheesecake

Best Python code snippet using lemoncheesecake

test_offload.py

Source: test_offload.py (GitHub)

copy

Full Screen

# ... The excerpt begins part-way through a function whose header lies outside
# this view.  Its visible tail is kept here as comments because it cannot
# stand alone:
#         return
#     print("FAIL: " + msg)
#     log("FAIL: " + msg, "", level=1)
#     os.sys.exit(1)


def start_test(msg):
    """Announce a test: log `msg` at level 1, call log_level_inc(), print it."""
    log(msg, "", level=1)
    log_level_inc()
    print(msg)


def cmd(cmd, shell=True, include_stderr=False, background=False, fail=True):
    """
    Run a command in subprocess and return tuple of (retval, stdout);
    optionally return stderr as well as third value.

    When `background` is true the Popen object is returned immediately
    instead; pass it to cmd_result() later to collect the output.
    """
    proc = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    if background:
        msg = "%s START: %s" % (log_get_sec(1),
                                datetime.now().strftime("%H:%M:%S.%f"))
        log("BKG " + proc.args, msg)
        return proc

    return cmd_result(proc, include_stderr=include_stderr, fail=fail)


def cmd_result(proc, include_stderr=False, fail=False):
    """
    Wait for `proc`, log its return code and output, and return
    (retcode, stdout) or, with `include_stderr`, (retcode, stdout, stderr).

    Raises Exception when the return code is non-zero and `fail` is true.
    """
    stdout, stderr = proc.communicate()
    stdout = stdout.decode("utf-8")
    stderr = stderr.decode("utf-8")
    proc.stdout.close()
    proc.stderr.close()

    # Start stderr on its own line in the log and drop one trailing newline.
    stderr = "\n" + stderr
    if stderr[-1] == "\n":
        stderr = stderr[:-1]

    sec = log_get_sec(1)
    log("CMD " + proc.args,
        "RETCODE: %d\n%s STDOUT:\n%s%s STDERR:%s\n%s END: %s" %
        (proc.returncode, sec, stdout, sec, stderr,
         sec, datetime.now().strftime("%H:%M:%S.%f")))

    if proc.returncode != 0 and fail:
        if len(stderr) > 0 and stderr[-1] == "\n":
            stderr = stderr[:-1]
        raise Exception("Command failed: %s\n%s" % (proc.args, stderr))

    if include_stderr:
        return proc.returncode, stdout, stderr
    else:
        return proc.returncode, stdout


def rm(f):
    """Delete file `f` with `rm -f` and drop it from the `files` list."""
    cmd("rm -f %s" % (f))
    if f in files:
        files.remove(f)


def tool(name, args, flags, JSON=True, ns="", fail=True, include_stderr=False):
    """
    Run command-line tool `name` with `args`, optionally inside netns `ns`.

    With `JSON` true, flags["json"] is prepended to the arguments and any
    non-empty stdout is parsed with json.loads(); otherwise stdout is
    returned as text.  Returns (ret, out) or (ret, out, stderr).
    """
    params = ""
    if JSON:
        params += "%s " % (flags["json"])

    if ns != "":
        ns = "ip netns exec %s " % (ns)

    if include_stderr:
        ret, stdout, stderr = cmd(ns + name + " " + params + args,
                                  fail=fail, include_stderr=True)
    else:
        ret, stdout = cmd(ns + name + " " + params + args,
                          fail=fail, include_stderr=False)

    if JSON and len(stdout.strip()) != 0:
        out = json.loads(stdout)
    else:
        out = stdout

    if include_stderr:
        return ret, out, stderr
    else:
        return ret, out


def bpftool(args, JSON=True, ns="", fail=True, include_stderr=False):
    """Run bpftool through tool(), using `-p` as its JSON flag."""
    return tool("bpftool", args, {"json":"-p"}, JSON=JSON, ns=ns,
                fail=fail, include_stderr=include_stderr)


def bpftool_prog_list(expected=None, ns=""):
    """
    Return the programs listed by `bpftool prog show`, minus `base_progs`.

    If `expected` is given and the count differs, fail() is called.
    """
    _, progs = bpftool("prog show", JSON=True, ns=ns, fail=True)
    # Remove the base progs
    for p in base_progs:
        if p in progs:
            progs.remove(p)
    if expected is not None:
        if len(progs) != expected:
            fail(True, "%d BPF programs loaded, expected %d" %
                 (len(progs), expected))
    return progs


def bpftool_map_list(expected=None, ns=""):
    """
    Return the maps listed by `bpftool map show`, minus `base_maps`.

    If `expected` is given and the count differs, fail() is called.
    """
    _, maps = bpftool("map show", JSON=True, ns=ns, fail=True)
    # Remove the base maps
    for m in base_maps:
        if m in maps:
            maps.remove(m)
    if expected is not None:
        if len(maps) != expected:
            fail(True, "%d BPF maps loaded, expected %d" %
                 (len(maps), expected))
    return maps


def bpftool_prog_list_wait(expected=0, n_retry=20):
    """Poll every 50 ms, up to `n_retry` times, until `expected` programs
    are listed; raise Exception on timeout."""
    for i in range(n_retry):
        nprogs = len(bpftool_prog_list())
        if nprogs == expected:
            return
        time.sleep(0.05)
    raise Exception("Time out waiting for program counts to stabilize want %d, have %d" % (expected, nprogs))


def bpftool_map_list_wait(expected=0, n_retry=20):
    """Poll every 50 ms, up to `n_retry` times, until `expected` maps are
    listed; raise Exception on timeout."""
    for i in range(n_retry):
        nmaps = len(bpftool_map_list())
        if nmaps == expected:
            return
        time.sleep(0.05)
    raise Exception("Time out waiting for map counts to stabilize want %d, have %d" % (expected, nmaps))


def bpftool_prog_load(sample, file_name, maps=None, prog_type="xdp", dev=None,
                      fail=True, include_stderr=False):
    """
    Run `bpftool prog load` for `sample` (looked up under bpf_test_dir),
    pinning it at `file_name`.

    `maps` is an optional sequence of strings, each emitted after a `map`
    keyword.  On success (return code 0) `file_name` is recorded in `files`.
    Returns whatever bpftool() returns.
    """
    # None instead of a shared mutable [] default; an empty list behaves the same.
    if maps is None:
        maps = []
    args = "prog load %s %s" % (os.path.join(bpf_test_dir, sample), file_name)
    if prog_type is not None:
        args += " type " + prog_type
    if dev is not None:
        args += " dev " + dev
    if len(maps):
        args += " map " + " map ".join(maps)

    res = bpftool(args, fail=fail, include_stderr=include_stderr)
    if res[0] == 0:
        files.append(file_name)
    return res


def ip(args, force=False, JSON=True, ns="", fail=True, include_stderr=False):
    """Run `ip` through tool() (JSON flag `-j`); `force` prepends `-force`."""
    if force:
        args = "-force " + args
    return tool("ip", args, {"json":"-j"}, JSON=JSON, ns=ns,
                fail=fail, include_stderr=include_stderr)


def tc(args, JSON=True, ns="", fail=True, include_stderr=False):
    """Run `tc` through tool(), using `-p` as its JSON flag."""
    return tool("tc", args, {"json":"-p"}, JSON=JSON, ns=ns,
                fail=fail, include_stderr=include_stderr)


def ethtool(dev, opt, args, fail=True):
    """Run `ethtool <opt> <dev["ifname"]> <args>` via cmd()."""
    return cmd("ethtool %s %s %s" % (opt, dev["ifname"], args), fail=fail)


def bpf_obj(name, sec=".text", path=bpf_test_dir,):
    """Build an `obj <path/name> sec <sec>` argument string."""
    return "obj %s sec %s" % (os.path.join(path, name), sec)


def bpf_pinned(name):
    """Build a `pinned <name>` argument string."""
    return "pinned %s" % (name)


def bpf_bytecode(bytecode):
    """Build a `bytecode "<bytecode>"` argument string."""
    return "bytecode \"%s\"" % (bytecode)


def mknetns(n_retry=10):
    """
    Create a network namespace with a random 8-letter name.

    Tries up to `n_retry` names; on success the name is appended to `netns`
    and returned.  Returns None if every attempt failed.
    """
    for i in range(n_retry):
        # `_` keeps the inner comprehension from reusing the retry counter name.
        name = ''.join([random.choice(string.ascii_letters) for _ in range(8)])
        ret, _ = ip("netns add %s" % (name), fail=False)
        if ret == 0:
            netns.append(name)
            return name
    return None


def int2str(fmt, val):
    """Pack `val` with struct format `fmt` and return its bytes as
    space-separated decimal numbers."""
    return " ".join(str(int(b)) for b in struct.pack(fmt, val))


def str2int(strtab):
    """
    Convert a sequence of 4 or 8 hexadecimal byte strings to an integer.

    The bytes are unpacked with struct format "I" (4 items) or "Q" (8 items);
    any other length raises Exception.
    """
    inttab = []
    for i in strtab:
        inttab.append(int(i, 16))
    ba = bytearray(inttab)
    if len(strtab) == 4:
        fmt = "I"
    elif len(strtab) == 8:
        fmt = "Q"
    else:
        raise Exception("String array of len %d can't be unpacked to an int" %
                        (len(strtab)))
    return struct.unpack(fmt, ba)[0]


class DebugfsDir:
    """
    Class for accessing DebugFS directories as a dictionary.

    Files map to their stripped contents; sub-directories map to nested
    DebugfsDir objects.  The directory is read once, at construction.
    """

    def __init__(self, path):
        self.path = path
        self._dict = self._debugfs_dir_read(path)

    def __len__(self):
        return len(self._dict)

    def __getitem__(self, key):
        # Integer keys index into the entries by position.
        if isinstance(key, int):
            key = list(self._dict.keys())[key]
        return self._dict[key]

    def __setitem__(self, key, value):
        """Write `value` to file `key`, then cache what reads back."""
        log("DebugFS set %s = %s" % (key, value), "")
        log_level_inc()

        cmd("echo '%s' > %s/%s" % (value, self.path, key))
        log_level_dec()

        _, out = cmd('cat %s/%s' % (self.path, key))
        self._dict[key] = out.strip()

    def _debugfs_dir_read(self, path):
        """Return a dict describing `path`, recursing into sub-directories.

        Raises Exception for an entry that is neither file nor directory.
        """
        dfs = {}

        log("DebugFS state for %s" % (path), "")
        log_level_inc(add=2)

        _, out = cmd('ls ' + path)
        for f in out.split():
            p = os.path.join(path, f)
            if os.path.isfile(p):
                _, out = cmd('cat %s/%s' % (path, f))
                dfs[f] = out.strip()
            elif os.path.isdir(p):
                dfs[f] = DebugfsDir(p)
            else:
                raise Exception("%s is neither file nor directory" % (p))

        log_level_dec()
        log("DebugFS state", dfs)
        log_level_dec()

        return dfs


class NetdevSim:
    """
    Class for netdevsim netdevice and its attributes.

    Creating an instance adds a netdevsim link and registers the object in
    the `devs` list; remove() undoes both.
    """

    def __init__(self, link=None):
        self.link = link

        self.dev = self._netdevsim_create()
        devs.append(self)

        self.ns = ""

        self.dfs_dir = '/sys/kernel/debug/netdevsim/%s' % (self.dev['ifname'])
        self.sdev_dir = self.dfs_dir + '/sdev/'
        self.dfs_refresh()

    def __getitem__(self, key):
        return self.dev[key]

    def _netdevsim_create(self):
        """Add a netdevsim link and return the entry that newly appears in
        `ip link show`; raise Exception if none does."""
        link = "" if self.link is None else "link " + self.link.dev['ifname']
        _, old = ip("link show")
        # "sim%d" is passed to ip literally; only {link} is substituted here.
        ip("link add sim%d {link} type netdevsim".format(link=link))
        _, new = ip("link show")

        old_names = {d["ifname"] for d in old}
        for dev in new:
            if dev["ifname"] not in old_names:
                return dev

        raise Exception("failed to create netdevsim device")

    def remove(self):
        """Unregister from `devs` and delete the link."""
        devs.remove(self)
        ip("link del dev %s" % (self.dev["ifname"]), ns=self.ns)

    def dfs_refresh(self):
        """Re-read this device's DebugFS directory and return it."""
        self.dfs = DebugfsDir(self.dfs_dir)
        return self.dfs

    def dfs_read(self, f):
        """Return the stripped contents of DebugFS file `f` for this device."""
        path = os.path.join(self.dfs_dir, f)
        _, data = cmd('cat %s' % (path))
        return data.strip()

    def dfs_num_bound_progs(self):
        """Return the number of entries under sdev/bpf_bound_progs."""
        path = os.path.join(self.sdev_dir, "bpf_bound_progs")
        _, progs = cmd('ls %s' % (path))
        return len(progs.split())

    def dfs_get_bound_progs(self, expected):
        """Return sdev/bpf_bound_progs as a DebugfsDir; fail() if `expected`
        is not None and the entry count differs."""
        progs = DebugfsDir(os.path.join(self.sdev_dir, "bpf_bound_progs"))
        if expected is not None:
            if len(progs) != expected:
                fail(True, "%d BPF programs bound, expected %d" %
                     (len(progs), expected))
        return progs

    def wait_for_flush(self, bound=0, total=0, n_retry=20):
        """Poll every 50 ms, up to `n_retry` times, until `bound` programs
        are bound and `total` are loaded; raise Exception on timeout."""
        for i in range(n_retry):
            nbound = self.dfs_num_bound_progs()
            nprogs = len(bpftool_prog_list())
            if nbound == bound and nprogs == total:
                return
            time.sleep(0.05)
        raise Exception("Time out waiting for program counts to stabilize want %d/%d, have %d bound, %d loaded" % (bound, total, nbound, nprogs))

    def set_ns(self, ns):
        """Move the device to namespace `ns`; "" is sent to ip as "1"."""
        # NOTE(review): "1" presumably selects PID 1's (init) netns - confirm.
        name = "1" if ns == "" else ns
        ip("link set dev %s netns %s" % (self.dev["ifname"], name), ns=self.ns)
        self.ns = ns

    def set_mtu(self, mtu, fail=True):
        """Set the device MTU via `ip link set`."""
        return ip("link set dev %s mtu %d" % (self.dev["ifname"], mtu),
                  fail=fail)

    def set_xdp(self, bpf, mode, force=False, JSON=True, verbose=False,
                fail=True, include_stderr=False):
        """Attach XDP program spec `bpf` with `ip link set dev ... xdp<mode>`."""
        if verbose:
            bpf += " verbose"
        return ip("link set dev %s xdp%s %s" % (self.dev["ifname"], mode, bpf),
                  force=force, JSON=JSON,
                  fail=fail, include_stderr=include_stderr)

    def unset_xdp(self, mode, force=False, JSON=True,
                  fail=True, include_stderr=False):
        """Detach XDP with `ip link set dev ... xdp<mode> off`."""
        return ip("link set dev %s xdp%s off" % (self.dev["ifname"], mode),
                  force=force, JSON=JSON,
                  fail=fail, include_stderr=include_stderr)

    def ip_link_show(self, xdp):
        """
        Return this device's `ip link show` entry ({} if nothing is listed).

        `xdp` states whether an "xdp" key is expected in the entry; a
        mismatch is reported through fail().  Raises Exception if more than
        one object is listed.
        """
        _, link = ip("link show dev %s" % (self['ifname']))
        if len(link) > 1:
            raise Exception("Multiple objects on ip link show")
        if len(link) < 1:
            return {}
        # The previous `xdp != "xdp" in link` was a chained comparison against
        # the outer list, so it could never trigger; test the entry itself.
        reported = "xdp" in link[0]
        fail(xdp != reported,
             "XDP program not reporting in iplink (reported %s, expected %s)" %
             (reported, xdp))
        return link[0]

    def tc_add_ingress(self):
        """Add an ingress qdisc to the device."""
        tc("qdisc add dev %s ingress" % (self['ifname']))

    def tc_del_ingress(self):
        """Delete the device's ingress qdisc."""
        tc("qdisc del dev %s ingress" % (self['ifname']))

    def tc_flush_filters(self, bound=0, total=0):
        """Recreate the ingress qdisc, then wait for the program counts."""
        self.tc_del_ingress()
        self.tc_add_ingress()
        self.wait_for_flush(bound=bound, total=total)

    def tc_show_ingress(self, expected=None):
        """
        Parse `tc -s filter show dev ... ingress` text output.

        Returns one dict per line containing the word "handle", with a
        boolean for each known flag and the following word for each named
        field that is present.  fail() is called if `expected` is given and
        the filter count differs.
        """
        # No JSON support, oh well...
        flags = ["skip_sw", "skip_hw", "in_hw"]
        named = ["protocol", "pref", "chain", "handle", "id", "tag"]

        args = "-s filter show dev %s ingress" % (self['ifname'])
        _, out = tc(args, JSON=False)

        filters = []
        lines = out.split('\n')
        for line in lines:
            words = line.split()
            if "handle" not in words:
                continue
            fltr = {}
            for flag in flags:
                fltr[flag] = flag in words
            for name in named:
                try:
                    idx = words.index(name)
                    fltr[name] = words[idx + 1]
                except ValueError:
                    # Field not present on this line; leave it out.
                    pass
            filters.append(fltr)

        if expected is not None:
            fail(len(filters) != expected,
                 "%d ingress filters loaded, expected %d" %
                 (len(filters), expected))
        return filters

    def cls_filter_op(self, op, qdisc="ingress", prio=None, handle=None,
                      chain=None, cls="", params="",
                      fail=True, include_stderr=False):
        """Run `tc filter <op>` on the device with the given prio, handle,
        chain, classifier and extra parameters."""
        spec = ""
        if prio is not None:
            spec += " prio %d" % (prio)
        if handle:
            spec += " handle %s" % (handle)
        if chain is not None:
            spec += " chain %d" % (chain)

        return tc("filter {op} dev {dev} {qdisc} {spec} {cls} {params}"\
                  .format(op=op, dev=self['ifname'], qdisc=qdisc, spec=spec,
                          cls=cls, params=params),
                  fail=fail, include_stderr=include_stderr)

    def cls_bpf_add_filter(self, bpf, op="add", prio=None, handle=None,
                           chain=None, da=False, verbose=False,
                           skip_sw=False, skip_hw=False,
                           fail=True, include_stderr=False):
        """Run a cls_bpf filter operation (default `add`) for program spec
        `bpf`, appending the flags that are enabled."""
        cls = "bpf " + bpf

        params = ""
        if da:
            params += " da"
        if verbose:
            params += " verbose"
        if skip_sw:
            params += " skip_sw"
        if skip_hw:
            params += " skip_hw"

        return self.cls_filter_op(op=op, prio=prio, handle=handle, cls=cls,
                                  chain=chain, params=params,
                                  fail=fail, include_stderr=include_stderr)

    def set_ethtool_tc_offloads(self, enable, fail=True):
        """Switch `hw-tc-offload` on or off with `ethtool -K`."""
        args = "hw-tc-offload %s" % ("on" if enable else "off")
        return ethtool(self, "-K", args, fail=fail)

################################################################################


def clean_up():
    """Remove every registered device, pinned file and network namespace."""
    global files, netns, devs

    # NetdevSim.remove() deletes the object from `devs`; iterate a snapshot so
    # that no device is skipped while the list shrinks.
    for dev in list(devs):
        dev.remove()
    for f in files:
        cmd("rm -f %s" % (f))
    for ns in netns:
        cmd("ip netns delete %s" % (ns))
    files = []
    netns = []


def pin_prog(file_name, idx=0):
    """Pin the `idx`-th listed program at `file_name`; expects exactly
    idx + 1 programs.  Returns (file_name, "pinned <file_name>")."""
    progs = bpftool_prog_list(expected=(idx + 1))
    prog = progs[idx]
    bpftool("prog pin id %d %s" % (prog["id"], file_name))
    files.append(file_name)

    return file_name, bpf_pinned(file_name)


def pin_map(file_name, idx=0, expected=1):
    """Pin the `idx`-th listed map at `file_name`; expects `expected` maps.
    Returns (file_name, "pinned <file_name>")."""
    maps = bpftool_map_list(expected=expected)
    m = maps[idx]
    bpftool("map pin id %d %s" % (m["id"], file_name))
    files.append(file_name)

    return file_name, bpf_pinned(file_name)


def check_dev_info_removed(prog_file=None, map_file=None):
    """Check that no programs or maps are listed and that showing the pinned
    program and map fails with a "No such device" error."""
    bpftool_prog_list(expected=0)
    ret, err = bpftool("prog show pin %s" % (prog_file), fail=False)
    fail(ret == 0, "Showing prog with removed device did not fail")
    fail(err["error"].find("No such device") == -1,
         "Showing prog with removed device expected ENODEV, error is %s" %
         (err["error"]))

    bpftool_map_list(expected=0)
    ret, err = bpftool("map show pin %s" % (map_file), fail=False)
    fail(ret == 0, "Showing map with removed device did not fail")
    fail(err["error"].find("No such device") == -1,
         "Showing map with removed device expected ENODEV, error is %s" %
         (err["error"]))


def check_dev_info(other_ns, ns, prog_file=None, map_file=None, removed=False):
    progs = bpftool_prog_list(expected=1, ns=ns)
    prog = progs[0]

    fail("dev" not in prog.keys(), "Device parameters not reported")
    dev = prog["dev"]
    fail("ifindex" not in dev.keys(), "Device parameters not reported")
    fail("ns_dev" not in dev.keys(), "Device parameters not reported")
    fail("ns_inode" not in dev.keys(), "Device parameters not reported")

    if not other_ns:
        fail("ifname" not in dev.keys(), "Ifname not reported")
        # The excerpt is cut off in the middle of the next statement; the
        # visible text is kept verbatim below and nothing further is assumed:
        #     fail(dev["ifname"] !=
sim["ifname"],464 "Ifname incorrect %s vs %s" % (dev["ifname"], sim["ifname"]))465 else:466 fail("ifname" in dev.keys(), "Ifname is reported for other ns")467 maps = bpftool_map_list(expected=2, ns=ns)468 for m in maps:469 fail("dev" not in m.keys(), "Device parameters not reported")470 fail(dev != m["dev"], "Map's device different than program's")471def check_extack(output, reference, args):472 if skip_extack:473 return474 lines = output.split("\n")475 comp = len(lines) >= 2 and lines[1] == 'Error: ' + reference476 fail(not comp, "Missing or incorrect netlink extack message")477def check_extack_nsim(output, reference, args):478 check_extack(output, "netdevsim: " + reference, args)479def check_no_extack(res, needle):480 fail((res[1] + res[2]).count(needle) or (res[1] + res[2]).count("Warning:"),481 "Found '%s' in command output, leaky extack?" % (needle))482def check_verifier_log(output, reference):483 lines = output.split("\n")484 for l in reversed(lines):485 if l == reference:486 return487 fail(True, "Missing or incorrect message from netdevsim in verifier log")488def test_spurios_extack(sim, obj, skip_hw, needle):489 res = sim.cls_bpf_add_filter(obj, prio=1, handle=1, skip_hw=skip_hw,490 include_stderr=True)491 check_no_extack(res, needle)492 res = sim.cls_bpf_add_filter(obj, op="replace", prio=1, handle=1,493 skip_hw=skip_hw, include_stderr=True)494 check_no_extack(res, needle)495 res = sim.cls_filter_op(op="delete", prio=1, handle=1, cls="bpf",496 include_stderr=True)497 check_no_extack(res, needle)498# Parse command line499parser = argparse.ArgumentParser()500parser.add_argument("--log", help="output verbose log to given file")501args = parser.parse_args()502if args.log:503 logfile = open(args.log, 'w+')504 logfile.write("# -*-Org-*-")505log("Prepare...", "", level=1)506log_level_inc()507# Check permissions508skip(os.getuid() != 0, "test must be run as root")509# Check tools510ret, progs = bpftool("prog", fail=False)511skip(ret != 0, "bpftool not 
installed")512base_progs = progs513_, base_maps = bpftool("map")514# Check netdevsim515ret, out = cmd("modprobe netdevsim", fail=False)516skip(ret != 0, "netdevsim module could not be loaded")517# Check debugfs518_, out = cmd("mount")519if out.find("/sys/kernel/debug type debugfs") == -1:520 cmd("mount -t debugfs none /sys/kernel/debug")521# Check samples are compiled522samples = ["sample_ret0.o", "sample_map_ret0.o"]523for s in samples:524 ret, out = cmd("ls %s/%s" % (bpf_test_dir, s), fail=False)525 skip(ret != 0, "sample %s/%s not found, please compile it" %526 (bpf_test_dir, s))527# Check if iproute2 is built with libmnl (needed by extack support)528_, _, err = cmd("tc qdisc delete dev lo handle 0",529 fail=False, include_stderr=True)530if err.find("Error: Failed to find qdisc with specified handle.") == -1:531 print("Warning: no extack message in iproute2 output, libmnl missing?")532 log("Warning: no extack message in iproute2 output, libmnl missing?", "")533 skip_extack = True534# Check if net namespaces seem to work535ns = mknetns()536skip(ns is None, "Could not create a net namespace")537cmd("ip netns delete %s" % (ns))538netns = []539try:540 obj = bpf_obj("sample_ret0.o")541 bytecode = bpf_bytecode("1,6 0 0 4294967295,")542 start_test("Test destruction of generic XDP...")543 sim = NetdevSim()544 sim.set_xdp(obj, "generic")545 sim.remove()546 bpftool_prog_list_wait(expected=0)547 sim = NetdevSim()548 sim.tc_add_ingress()549 start_test("Test TC non-offloaded...")550 ret, _ = sim.cls_bpf_add_filter(obj, skip_hw=True, fail=False)551 fail(ret != 0, "Software TC filter did not load")552 start_test("Test TC non-offloaded isn't getting bound...")553 ret, _ = sim.cls_bpf_add_filter(obj, fail=False)554 fail(ret != 0, "Software TC filter did not load")555 sim.dfs_get_bound_progs(expected=0)556 sim.tc_flush_filters()557 start_test("Test TC offloads are off by default...")558 ret, _, err = sim.cls_bpf_add_filter(obj, skip_sw=True,559 fail=False, include_stderr=True)560 
fail(ret == 0, "TC filter loaded without enabling TC offloads")561 check_extack(err, "TC offload is disabled on net device.", args)562 sim.wait_for_flush()563 sim.set_ethtool_tc_offloads(True)564 sim.dfs["bpf_tc_non_bound_accept"] = "Y"565 start_test("Test TC offload by default...")566 ret, _ = sim.cls_bpf_add_filter(obj, fail=False)567 fail(ret != 0, "Software TC filter did not load")568 sim.dfs_get_bound_progs(expected=0)569 ingress = sim.tc_show_ingress(expected=1)570 fltr = ingress[0]571 fail(not fltr["in_hw"], "Filter not offloaded by default")572 sim.tc_flush_filters()573 start_test("Test TC cBPF bytcode tries offload by default...")574 ret, _ = sim.cls_bpf_add_filter(bytecode, fail=False)575 fail(ret != 0, "Software TC filter did not load")576 sim.dfs_get_bound_progs(expected=0)577 ingress = sim.tc_show_ingress(expected=1)578 fltr = ingress[0]579 fail(not fltr["in_hw"], "Bytecode not offloaded by default")580 sim.tc_flush_filters()581 sim.dfs["bpf_tc_non_bound_accept"] = "N"582 start_test("Test TC cBPF unbound bytecode doesn't offload...")583 ret, _, err = sim.cls_bpf_add_filter(bytecode, skip_sw=True,584 fail=False, include_stderr=True)585 fail(ret == 0, "TC bytecode loaded for offload")586 check_extack_nsim(err, "netdevsim configured to reject unbound programs.",587 args)588 sim.wait_for_flush()589 start_test("Test non-0 chain offload...")590 ret, _, err = sim.cls_bpf_add_filter(obj, chain=1, prio=1, handle=1,591 skip_sw=True,592 fail=False, include_stderr=True)593 fail(ret == 0, "Offloaded a filter to chain other than 0")594 check_extack(err, "Driver supports only offload of chain 0.", args)595 sim.tc_flush_filters()596 start_test("Test TC replace...")597 sim.cls_bpf_add_filter(obj, prio=1, handle=1)598 sim.cls_bpf_add_filter(obj, op="replace", prio=1, handle=1)599 sim.cls_filter_op(op="delete", prio=1, handle=1, cls="bpf")600 sim.cls_bpf_add_filter(obj, prio=1, handle=1, skip_sw=True)601 sim.cls_bpf_add_filter(obj, op="replace", prio=1, handle=1, 
skip_sw=True)602 sim.cls_filter_op(op="delete", prio=1, handle=1, cls="bpf")603 sim.cls_bpf_add_filter(obj, prio=1, handle=1, skip_hw=True)604 sim.cls_bpf_add_filter(obj, op="replace", prio=1, handle=1, skip_hw=True)605 sim.cls_filter_op(op="delete", prio=1, handle=1, cls="bpf")606 start_test("Test TC replace bad flags...")607 for i in range(3):608 for j in range(3):609 ret, _ = sim.cls_bpf_add_filter(obj, op="replace", prio=1, handle=1,610 skip_sw=(j == 1), skip_hw=(j == 2),611 fail=False)612 fail(bool(ret) != bool(j),613 "Software TC incorrect load in replace test, iteration %d" %614 (j))615 sim.cls_filter_op(op="delete", prio=1, handle=1, cls="bpf")616 start_test("Test spurious extack from the driver...")617 test_spurios_extack(sim, obj, False, "netdevsim")618 test_spurios_extack(sim, obj, True, "netdevsim")619 sim.set_ethtool_tc_offloads(False)620 test_spurios_extack(sim, obj, False, "TC offload is disabled")621 test_spurios_extack(sim, obj, True, "TC offload is disabled")622 sim.set_ethtool_tc_offloads(True)623 sim.tc_flush_filters()624 start_test("Test TC offloads work...")625 ret, _, err = sim.cls_bpf_add_filter(obj, verbose=True, skip_sw=True,626 fail=False, include_stderr=True)627 fail(ret != 0, "TC filter did not load with TC offloads enabled")628 check_verifier_log(err, "[netdevsim] Hello from netdevsim!")629 start_test("Test TC offload basics...")630 dfs = sim.dfs_get_bound_progs(expected=1)631 progs = bpftool_prog_list(expected=1)632 ingress = sim.tc_show_ingress(expected=1)633 dprog = dfs[0]634 prog = progs[0]635 fltr = ingress[0]636 fail(fltr["skip_hw"], "TC does reports 'skip_hw' on offloaded filter")637 fail(not fltr["in_hw"], "TC does not report 'in_hw' for offloaded filter")638 fail(not fltr["skip_sw"], "TC does not report 'skip_sw' back")639 start_test("Test TC offload is device-bound...")640 fail(str(prog["id"]) != fltr["id"], "Program IDs don't match")641 fail(prog["tag"] != fltr["tag"], "Program tags don't match")642 fail(fltr["id"] != 
dprog["id"], "Program IDs don't match")643 fail(dprog["state"] != "xlated", "Offloaded program state not translated")644 fail(dprog["loaded"] != "Y", "Offloaded program is not loaded")645 start_test("Test disabling TC offloads is rejected while filters installed...")646 ret, _ = sim.set_ethtool_tc_offloads(False, fail=False)647 fail(ret == 0, "Driver should refuse to disable TC offloads with filters installed...")648 start_test("Test qdisc removal frees things...")649 sim.tc_flush_filters()650 sim.tc_show_ingress(expected=0)651 start_test("Test disabling TC offloads is OK without filters...")652 ret, _ = sim.set_ethtool_tc_offloads(False, fail=False)653 fail(ret != 0,654 "Driver refused to disable TC offloads without filters installed...")655 sim.set_ethtool_tc_offloads(True)656 start_test("Test destroying device gets rid of TC filters...")657 sim.cls_bpf_add_filter(obj, skip_sw=True)658 sim.remove()659 bpftool_prog_list_wait(expected=0)660 sim = NetdevSim()661 sim.set_ethtool_tc_offloads(True)662 start_test("Test destroying device gets rid of XDP...")663 sim.set_xdp(obj, "offload")664 sim.remove()665 bpftool_prog_list_wait(expected=0)666 sim = NetdevSim()667 sim.set_ethtool_tc_offloads(True)668 start_test("Test XDP prog reporting...")669 sim.set_xdp(obj, "drv")670 ipl = sim.ip_link_show(xdp=True)671 progs = bpftool_prog_list(expected=1)672 fail(ipl["xdp"]["prog"]["id"] != progs[0]["id"],673 "Loaded program has wrong ID")674 start_test("Test XDP prog replace without force...")675 ret, _ = sim.set_xdp(obj, "drv", fail=False)676 fail(ret == 0, "Replaced XDP program without -force")677 sim.wait_for_flush(total=1)678 start_test("Test XDP prog replace with force...")679 ret, _ = sim.set_xdp(obj, "drv", force=True, fail=False)680 fail(ret != 0, "Could not replace XDP program with -force")681 bpftool_prog_list_wait(expected=1)682 ipl = sim.ip_link_show(xdp=True)683 progs = bpftool_prog_list(expected=1)684 fail(ipl["xdp"]["prog"]["id"] != progs[0]["id"],685 "Loaded program 
has wrong ID")686 fail("dev" in progs[0].keys(),687 "Device parameters reported for non-offloaded program")688 start_test("Test XDP prog replace with bad flags...")689 ret, _, err = sim.set_xdp(obj, "generic", force=True,690 fail=False, include_stderr=True)691 fail(ret == 0, "Replaced XDP program with a program in different mode")692 fail(err.count("File exists") != 1, "Replaced driver XDP with generic")693 ret, _, err = sim.set_xdp(obj, "", force=True,694 fail=False, include_stderr=True)695 fail(ret == 0, "Replaced XDP program with a program in different mode")696 check_extack(err, "program loaded with different flags.", args)697 start_test("Test XDP prog remove with bad flags...")698 ret, _, err = sim.unset_xdp("", force=True,699 fail=False, include_stderr=True)700 fail(ret == 0, "Removed program with a bad mode")701 check_extack(err, "program loaded with different flags.", args)702 start_test("Test MTU restrictions...")703 ret, _ = sim.set_mtu(9000, fail=False)704 fail(ret == 0,705 "Driver should refuse to increase MTU to 9000 with XDP loaded...")706 sim.unset_xdp("drv")707 bpftool_prog_list_wait(expected=0)708 sim.set_mtu(9000)709 ret, _, err = sim.set_xdp(obj, "drv", fail=False, include_stderr=True)710 fail(ret == 0, "Driver should refuse to load program with MTU of 9000...")711 check_extack_nsim(err, "MTU too large w/ XDP enabled.", args)712 sim.set_mtu(1500)713 sim.wait_for_flush()714 start_test("Test non-offload XDP attaching to HW...")715 bpftool_prog_load("sample_ret0.o", "/sys/fs/bpf/nooffload")716 nooffload = bpf_pinned("/sys/fs/bpf/nooffload")717 ret, _, err = sim.set_xdp(nooffload, "offload",718 fail=False, include_stderr=True)719 fail(ret == 0, "attached non-offloaded XDP program to HW")720 check_extack_nsim(err, "xdpoffload of non-bound program.", args)721 rm("/sys/fs/bpf/nooffload")722 start_test("Test offload XDP attaching to drv...")723 bpftool_prog_load("sample_ret0.o", "/sys/fs/bpf/offload",724 dev=sim['ifname'])725 offload = 
bpf_pinned("/sys/fs/bpf/offload")726 ret, _, err = sim.set_xdp(offload, "drv", fail=False, include_stderr=True)727 fail(ret == 0, "attached offloaded XDP program to drv")728 check_extack(err, "using device-bound program without HW_MODE flag is not supported.", args)729 rm("/sys/fs/bpf/offload")730 sim.wait_for_flush()731 start_test("Test XDP offload...")732 _, _, err = sim.set_xdp(obj, "offload", verbose=True, include_stderr=True)733 ipl = sim.ip_link_show(xdp=True)734 link_xdp = ipl["xdp"]["prog"]735 progs = bpftool_prog_list(expected=1)736 prog = progs[0]737 fail(link_xdp["id"] != prog["id"], "Loaded program has wrong ID")738 check_verifier_log(err, "[netdevsim] Hello from netdevsim!")739 start_test("Test XDP offload is device bound...")740 dfs = sim.dfs_get_bound_progs(expected=1)741 dprog = dfs[0]742 fail(prog["id"] != link_xdp["id"], "Program IDs don't match")743 fail(prog["tag"] != link_xdp["tag"], "Program tags don't match")744 fail(str(link_xdp["id"]) != dprog["id"], "Program IDs don't match")745 fail(dprog["state"] != "xlated", "Offloaded program state not translated")746 fail(dprog["loaded"] != "Y", "Offloaded program is not loaded")747 start_test("Test removing XDP program many times...")748 sim.unset_xdp("offload")749 sim.unset_xdp("offload")750 sim.unset_xdp("drv")751 sim.unset_xdp("drv")752 sim.unset_xdp("")753 sim.unset_xdp("")754 bpftool_prog_list_wait(expected=0)755 start_test("Test attempt to use a program for a wrong device...")756 sim2 = NetdevSim()757 sim2.set_xdp(obj, "offload")758 pin_file, pinned = pin_prog("/sys/fs/bpf/tmp")759 ret, _, err = sim.set_xdp(pinned, "offload",760 fail=False, include_stderr=True)761 fail(ret == 0, "Pinned program loaded for a different device accepted")762 check_extack_nsim(err, "program bound to different dev.", args)763 sim2.remove()764 ret, _, err = sim.set_xdp(pinned, "offload",765 fail=False, include_stderr=True)766 fail(ret == 0, "Pinned program loaded for a removed device accepted")767 
check_extack_nsim(err, "xdpoffload of non-bound program.", args)768 rm(pin_file)769 bpftool_prog_list_wait(expected=0)770 start_test("Test multi-attachment XDP - attach...")771 sim.set_xdp(obj, "offload")772 xdp = sim.ip_link_show(xdp=True)["xdp"]773 offloaded = sim.dfs_read("bpf_offloaded_id")774 fail("prog" not in xdp, "Base program not reported in single program mode")775 fail(len(ipl["xdp"]["attached"]) != 1,776 "Wrong attached program count with one program")777 sim.set_xdp(obj, "")778 two_xdps = sim.ip_link_show(xdp=True)["xdp"]779 offloaded2 = sim.dfs_read("bpf_offloaded_id")780 fail(two_xdps["mode"] != 4, "Bad mode reported with multiple programs")781 fail("prog" in two_xdps, "Base program reported in multi program mode")782 fail(xdp["attached"][0] not in two_xdps["attached"],783 "Offload program not reported after driver activated")784 fail(len(two_xdps["attached"]) != 2,785 "Wrong attached program count with two programs")786 fail(two_xdps["attached"][0]["prog"]["id"] ==787 two_xdps["attached"][1]["prog"]["id"],788 "offloaded and drv programs have the same id")789 fail(offloaded != offloaded2,790 "offload ID changed after loading driver program")791 start_test("Test multi-attachment XDP - replace...")792 ret, _, err = sim.set_xdp(obj, "offload", fail=False, include_stderr=True)793 fail(err.count("busy") != 1, "Replaced one of programs without -force")794 start_test("Test multi-attachment XDP - detach...")795 ret, _, err = sim.unset_xdp("drv", force=True,796 fail=False, include_stderr=True)797 fail(ret == 0, "Removed program with a bad mode")798 check_extack(err, "program loaded with different flags.", args)799 sim.unset_xdp("offload")800 xdp = sim.ip_link_show(xdp=True)["xdp"]801 offloaded = sim.dfs_read("bpf_offloaded_id")802 fail(xdp["mode"] != 1, "Bad mode reported after multiple programs")803 fail("prog" not in xdp,804 "Base program not reported after multi program mode")805 fail(xdp["attached"][0] not in two_xdps["attached"],806 "Offload program not 
reported after driver activated")807 fail(len(ipl["xdp"]["attached"]) != 1,808 "Wrong attached program count with remaining programs")809 fail(offloaded != "0", "offload ID reported with only driver program left")810 start_test("Test multi-attachment XDP - device remove...")811 sim.set_xdp(obj, "offload")812 sim.remove()813 sim = NetdevSim()814 sim.set_ethtool_tc_offloads(True)815 start_test("Test mixing of TC and XDP...")816 sim.tc_add_ingress()817 sim.set_xdp(obj, "offload")818 ret, _, err = sim.cls_bpf_add_filter(obj, skip_sw=True,819 fail=False, include_stderr=True)820 fail(ret == 0, "Loading TC when XDP active should fail")821 check_extack_nsim(err, "driver and netdev offload states mismatch.", args)822 sim.unset_xdp("offload")823 sim.wait_for_flush()824 sim.cls_bpf_add_filter(obj, skip_sw=True)825 ret, _, err = sim.set_xdp(obj, "offload", fail=False, include_stderr=True)826 fail(ret == 0, "Loading XDP when TC active should fail")827 check_extack_nsim(err, "TC program is already loaded.", args)828 start_test("Test binding TC from pinned...")829 pin_file, pinned = pin_prog("/sys/fs/bpf/tmp")830 sim.tc_flush_filters(bound=1, total=1)831 sim.cls_bpf_add_filter(pinned, da=True, skip_sw=True)832 sim.tc_flush_filters(bound=1, total=1)833 start_test("Test binding XDP from pinned...")834 sim.set_xdp(obj, "offload")835 pin_file, pinned = pin_prog("/sys/fs/bpf/tmp2", idx=1)836 sim.set_xdp(pinned, "offload", force=True)837 sim.unset_xdp("offload")838 sim.set_xdp(pinned, "offload", force=True)839 sim.unset_xdp("offload")840 start_test("Test offload of wrong type fails...")841 ret, _ = sim.cls_bpf_add_filter(pinned, da=True, skip_sw=True, fail=False)842 fail(ret == 0, "Managed to attach XDP program to TC")843 start_test("Test asking for TC offload of two filters...")844 sim.cls_bpf_add_filter(obj, da=True, skip_sw=True)845 ret, _, err = sim.cls_bpf_add_filter(obj, da=True, skip_sw=True,846 fail=False, include_stderr=True)847 fail(ret == 0, "Managed to offload two TC 
filters at the same time")848 check_extack_nsim(err, "driver and netdev offload states mismatch.", args)849 sim.tc_flush_filters(bound=2, total=2)850 start_test("Test if netdev removal waits for translation...")851 delay_msec = 500852 sim.dfs["bpf_bind_verifier_delay"] = delay_msec853 start = time.time()854 cmd_line = "tc filter add dev %s ingress bpf %s da skip_sw" % \855 (sim['ifname'], obj)856 tc_proc = cmd(cmd_line, background=True, fail=False)857 # Wait for the verifier to start858 while sim.dfs_num_bound_progs() <= 2:859 pass860 sim.remove()861 end = time.time()862 ret, _ = cmd_result(tc_proc, fail=False)863 time_diff = end - start864 log("Time", "start:\t%s\nend:\t%s\ndiff:\t%s" % (start, end, time_diff))865 fail(ret == 0, "Managed to load TC filter on a unregistering device")866 delay_sec = delay_msec * 0.001867 fail(time_diff < delay_sec, "Removal process took %s, expected %s" %868 (time_diff, delay_sec))869 # Remove all pinned files and reinstantiate the netdev870 clean_up()871 bpftool_prog_list_wait(expected=0)872 sim = NetdevSim()873 map_obj = bpf_obj("sample_map_ret0.o")874 start_test("Test loading program with maps...")875 sim.set_xdp(map_obj, "offload", JSON=False) # map fixup msg breaks JSON876 start_test("Test bpftool bound info reporting (own ns)...")877 check_dev_info(False, "")878 start_test("Test bpftool bound info reporting (other ns)...")879 ns = mknetns()880 sim.set_ns(ns)881 check_dev_info(True, "")882 start_test("Test bpftool bound info reporting (remote ns)...")883 check_dev_info(False, ns)884 start_test("Test bpftool bound info reporting (back to own ns)...")885 sim.set_ns("")886 check_dev_info(False, "")887 prog_file, _ = pin_prog("/sys/fs/bpf/tmp_prog")888 map_file, _ = pin_map("/sys/fs/bpf/tmp_map", idx=1, expected=2)889 sim.remove()890 start_test("Test bpftool bound info reporting (removed dev)...")891 check_dev_info_removed(prog_file=prog_file, map_file=map_file)892 # Remove all pinned files and reinstantiate the netdev893 
clean_up()894 bpftool_prog_list_wait(expected=0)895 sim = NetdevSim()896 start_test("Test map update (no flags)...")897 sim.set_xdp(map_obj, "offload", JSON=False) # map fixup msg breaks JSON898 maps = bpftool_map_list(expected=2)899 array = maps[0] if maps[0]["type"] == "array" else maps[1]900 htab = maps[0] if maps[0]["type"] == "hash" else maps[1]901 for m in maps:902 for i in range(2):903 bpftool("map update id %d key %s value %s" %904 (m["id"], int2str("I", i), int2str("Q", i * 3)))905 for m in maps:906 ret, _ = bpftool("map update id %d key %s value %s" %907 (m["id"], int2str("I", 3), int2str("Q", 3 * 3)),908 fail=False)909 fail(ret == 0, "added too many entries")910 start_test("Test map update (exists)...")911 for m in maps:912 for i in range(2):913 bpftool("map update id %d key %s value %s exist" %914 (m["id"], int2str("I", i), int2str("Q", i * 3)))915 for m in maps:916 ret, err = bpftool("map update id %d key %s value %s exist" %917 (m["id"], int2str("I", 3), int2str("Q", 3 * 3)),918 fail=False)919 fail(ret == 0, "updated non-existing key")920 fail(err["error"].find("No such file or directory") == -1,921 "expected ENOENT, error is '%s'" % (err["error"]))922 start_test("Test map update (noexist)...")923 for m in maps:924 for i in range(2):925 ret, err = bpftool("map update id %d key %s value %s noexist" %926 (m["id"], int2str("I", i), int2str("Q", i * 3)),927 fail=False)928 fail(ret == 0, "updated existing key")929 fail(err["error"].find("File exists") == -1,930 "expected EEXIST, error is '%s'" % (err["error"]))931 start_test("Test map dump...")932 for m in maps:933 _, entries = bpftool("map dump id %d" % (m["id"]))934 for i in range(2):935 key = str2int(entries[i]["key"])936 fail(key != i, "expected key %d, got %d" % (key, i))937 val = str2int(entries[i]["value"])938 fail(val != i * 3, "expected value %d, got %d" % (val, i * 3))939 start_test("Test map getnext...")940 for m in maps:941 _, entry = bpftool("map getnext id %d" % (m["id"]))942 key = 
str2int(entry["next_key"])943 fail(key != 0, "next key %d, expected %d" % (key, 0))944 _, entry = bpftool("map getnext id %d key %s" %945 (m["id"], int2str("I", 0)))946 key = str2int(entry["next_key"])947 fail(key != 1, "next key %d, expected %d" % (key, 1))948 ret, err = bpftool("map getnext id %d key %s" %949 (m["id"], int2str("I", 1)), fail=False)950 fail(ret == 0, "got next key past the end of map")951 fail(err["error"].find("No such file or directory") == -1,952 "expected ENOENT, error is '%s'" % (err["error"]))953 start_test("Test map delete (htab)...")954 for i in range(2):955 bpftool("map delete id %d key %s" % (htab["id"], int2str("I", i)))956 start_test("Test map delete (array)...")957 for i in range(2):958 ret, err = bpftool("map delete id %d key %s" %959 (htab["id"], int2str("I", i)), fail=False)960 fail(ret == 0, "removed entry from an array")961 fail(err["error"].find("No such file or directory") == -1,962 "expected ENOENT, error is '%s'" % (err["error"]))963 start_test("Test map remove...")964 sim.unset_xdp("offload")965 bpftool_map_list_wait(expected=0)966 sim.remove()967 sim = NetdevSim()968 sim.set_xdp(map_obj, "offload", JSON=False) # map fixup msg breaks JSON969 sim.remove()970 bpftool_map_list_wait(expected=0)971 start_test("Test map creation fail path...")972 sim = NetdevSim()973 sim.dfs["bpf_map_accept"] = "N"974 ret, _ = sim.set_xdp(map_obj, "offload", JSON=False, fail=False)975 fail(ret == 0,976 "netdevsim didn't refuse to create a map with offload disabled")977 sim.remove()978 start_test("Test multi-dev ASIC program reuse...")979 simA = NetdevSim()980 simB1 = NetdevSim()981 simB2 = NetdevSim(link=simB1)982 simB3 = NetdevSim(link=simB1)983 sims = (simA, simB1, simB2, simB3)984 simB = (simB1, simB2, simB3)985 bpftool_prog_load("sample_map_ret0.o", "/sys/fs/bpf/nsimA",986 dev=simA['ifname'])987 progA = bpf_pinned("/sys/fs/bpf/nsimA")988 bpftool_prog_load("sample_map_ret0.o", "/sys/fs/bpf/nsimB",989 dev=simB1['ifname'])990 progB = 
bpf_pinned("/sys/fs/bpf/nsimB")991 simA.set_xdp(progA, "offload", JSON=False)992 for d in simB:993 d.set_xdp(progB, "offload", JSON=False)994 start_test("Test multi-dev ASIC cross-dev replace...")995 ret, _ = simA.set_xdp(progB, "offload", force=True, JSON=False, fail=False)996 fail(ret == 0, "cross-ASIC program allowed")997 for d in simB:998 ret, _ = d.set_xdp(progA, "offload", force=True, JSON=False, fail=False)999 fail(ret == 0, "cross-ASIC program allowed")1000 start_test("Test multi-dev ASIC cross-dev install...")1001 for d in sims:1002 d.unset_xdp("offload")1003 ret, _, err = simA.set_xdp(progB, "offload", force=True, JSON=False,1004 fail=False, include_stderr=True)1005 fail(ret == 0, "cross-ASIC program allowed")1006 check_extack_nsim(err, "program bound to different dev.", args)1007 for d in simB:1008 ret, _, err = d.set_xdp(progA, "offload", force=True, JSON=False,1009 fail=False, include_stderr=True)1010 fail(ret == 0, "cross-ASIC program allowed")1011 check_extack_nsim(err, "program bound to different dev.", args)1012 start_test("Test multi-dev ASIC cross-dev map reuse...")1013 mapA = bpftool("prog show %s" % (progA))[1]["map_ids"][0]1014 mapB = bpftool("prog show %s" % (progB))[1]["map_ids"][0]1015 ret, _ = bpftool_prog_load("sample_map_ret0.o", "/sys/fs/bpf/nsimB_",1016 dev=simB3['ifname'],1017 maps=["idx 0 id %d" % (mapB)],1018 fail=False)1019 fail(ret != 0, "couldn't reuse a map on the same ASIC")1020 rm("/sys/fs/bpf/nsimB_")1021 ret, _, err = bpftool_prog_load("sample_map_ret0.o", "/sys/fs/bpf/nsimA_",1022 dev=simA['ifname'],1023 maps=["idx 0 id %d" % (mapB)],1024 fail=False, include_stderr=True)1025 fail(ret == 0, "could reuse a map on a different ASIC")1026 fail(err.count("offload device mismatch between prog and map") == 0,1027 "error message missing for cross-ASIC map")1028 ret, _, err = bpftool_prog_load("sample_map_ret0.o", "/sys/fs/bpf/nsimB_",1029 dev=simB1['ifname'],1030 maps=["idx 0 id %d" % (mapA)],1031 fail=False, 
include_stderr=True)1032 fail(ret == 0, "could reuse a map on a different ASIC")1033 fail(err.count("offload device mismatch between prog and map") == 0,1034 "error message missing for cross-ASIC map")1035 start_test("Test multi-dev ASIC cross-dev destruction...")1036 bpftool_prog_list_wait(expected=2)1037 simA.remove()1038 bpftool_prog_list_wait(expected=1)1039 ifnameB = bpftool("prog show %s" % (progB))[1]["dev"]["ifname"]1040 fail(ifnameB != simB1['ifname'], "program not bound to originial device")1041 simB1.remove()1042 bpftool_prog_list_wait(expected=1)1043 start_test("Test multi-dev ASIC cross-dev destruction - move...")1044 ifnameB = bpftool("prog show %s" % (progB))[1]["dev"]["ifname"]1045 fail(ifnameB not in (simB2['ifname'], simB3['ifname']),1046 "program not bound to remaining devices")1047 simB2.remove()1048 ifnameB = bpftool("prog show %s" % (progB))[1]["dev"]["ifname"]1049 fail(ifnameB != simB3['ifname'], "program not bound to remaining device")1050 simB3.remove()1051 bpftool_prog_list_wait(expected=0)1052 start_test("Test multi-dev ASIC cross-dev destruction - orphaned...")1053 ret, out = bpftool("prog show %s" % (progB), fail=False)1054 fail(ret == 0, "got information about orphaned program")1055 fail("error" not in out, "no error reported for get info on orphaned")1056 fail(out["error"] != "can't get prog info: No such device",1057 "wrong error for get info on orphaned")1058 print("%s: OK" % (os.path.basename(__file__)))1059finally:1060 log("Clean up...", "", level=1)1061 log_level_inc()...

Full Screen

Full Screen

validation.py

Source:validation.py Github

copy

Full Screen

# ... (the beginning of validation.py is not shown in this excerpt)


def _run_cases(check, cases):
    """Announce and run a sequence of test cases in order.

    Each case is a tuple whose first item is the label passed to
    ``vt.start_test``; the remaining items are passed positionally to
    ``check``.
    """
    for case in cases:
        vt.start_test(case[0])
        check(*case[1:])


# ---- STAGE slide_is_possible -----
def tests_slide_is_possible(test_slide):
    """Run the ``slide_is_possible`` checks.

    Stores ``test_slide`` in ``vt.STAGE_SLIDE_IS_OVER`` and then feeds every
    (line, expected) pair below to ``vt.test_slide_is_possible``.
    """
    vt.STAGE_SLIDE_IS_OVER = test_slide
    _run_cases(vt.test_slide_is_possible, (
        ('slide impossible with empty line', [0, 0, 0, 0], False),
        ('slide impossible with full line', [1, 2, 1, 2], False),
        ('slide impossible with almost full line', [3, 2, 3, 0], False),
        ('slide impossible with two tiles', [1, 3, 0, 0], False),
        ('slide impossible with one tile', [2, 0, 0, 0], False),
        ('slide possible with one tile (pos 1)', [0, 1, 0, 0], True),
        ('slide with one tile (pos 2)', [0, 0, 2, 0], True),
        ('slide with one tile (pos 3)', [0, 0, 0, 3], True),
        ('slide with two different tiles v1', [0, 2, 0, 3], True),
        ('slide with two different tiles v2', [3, 0, 2, 0], True),
        ('slide with two different tiles v3', [1, 0, 0, 3], True),
        ('slide with three different tiles v1', [1, 2, 0, 1], True),
        ('slide with three different tiles v2', [2, 0, 1, 2], True),
        ('slide with two identical consecutive tiles v1', [1, 1, 0, 0], True),
        ('slide with two identical consecutive tiles v2', [2, 1, 1, 0], True),
        ('slide with two identical consecutive tiles v3', [1, 2, 3, 3], True),
    ))


tests_slide_is_possible(False)
vt.valid_current_stage("rules.slide_is_possible")


# ---- STAGE move_dir_possible + game_over -----
def test_move_dir_possible():
    """Run the ``move_dir_possible`` checks on several boards.

    Covers the empty board, a board with a single hole (before and after
    ``vt.perm_dir(rules.UP, ...)``), and ``rules.XFULLBOARD`` (before and
    after ``vt.perm_dir(rules.DOWN, ...)``).
    """
    def check(directions, board, expected, tag):
        # The label prefix mirrors the expected outcome of the move.
        prefix = 'possible' if expected else 'impossible'
        for direction in directions:
            vt.start_test(prefix + ' move dir on '
                          + rules.DIR_NAME[direction] + tag)
            vt.test_move_dir_possible(direction, board, expected)

    check(rules.DIRECTIONS, rules.EMPTYBOARD, False, ' (empty)')

    one_hole = [
        [1, 2, 3, 4],
        [5, 6, 7, 8],
        [9, 10, 11, 12],
        [0, 14, 15, 16],
    ]
    check((rules.RIGHT, rules.UP), one_hole, False, ' (empty1)')
    check((rules.LEFT, rules.DOWN), one_hole, True, ' (empty1)')

    one_hole = vt.perm_dir(rules.UP, one_hole)
    check((rules.RIGHT, rules.UP), one_hole, True, ' (empty1)')
    check((rules.LEFT, rules.DOWN), one_hole, False, ' (empty1)')

    check((rules.UP, rules.DOWN), rules.XFULLBOARD, False, ' (XFULLBOARD)')
    check((rules.LEFT, rules.RIGHT), rules.XFULLBOARD, True, ' (XFULLBOARD)')

    permuted = vt.perm_dir(rules.DOWN, rules.XFULLBOARD)
    check((rules.UP, rules.DOWN), permuted, True, ' (XFULLBOARD)')
    check((rules.LEFT, rules.RIGHT), permuted, False, ' (XFULLBOARD)')


test_move_dir_possible()

vt.start_test('game.over')
assert rules.game_over(rules.FULLBOARD)
vt.valid_current_stage("move_dir_possible + game_over")

# ---- STAGE slide -----
# Each case: label, line before the slide, expected line after the slide.
_run_cases(vt.test_slide, (
    ('slide with zero tile', [0, 0, 0, 0], [0, 0, 0, 0]),
    ('slide with one tile (pos 0)', [4, 0, 0, 0], [4, 0, 0, 0]),
    ('slide with one tile (pos 1)', [0, 1, 0, 0], [1, 0, 0, 0]),
    ('slide with one tile (pos 2)', [0, 0, 2, 0], [2, 0, 0, 0]),
    ('slide with one tile (pos 3)', [0, 0, 0, 3], [3, 0, 0, 0]),
    ('slide with two different tiles at bottom', [1, 2, 0, 0], [1, 2, 0, 0]),
    ('slide with two different tiles v1', [0, 2, 0, 3], [2, 3, 0, 0]),
    ('slide with two different tiles v2', [3, 0, 0, 2], [3, 2, 0, 0]),
    ('slide with two identical tiles', [0, 1, 0, 1], [2, 0, 0, 0]),
    ('slide with two identical consecutive tiles',
     [0, 3, 3, 0], [4, 0, 0, 0]),
    ('slide with two identical consecutive tiles at bottom',
     [2, 2, 0, 0], [3, 0, 0, 0]),
    ('slide with 3 mixed tiles', [2, 1, 0, 2], [2, 1, 2, 0]),
    ('slide with 3 identical consecutive tiles', [4, 4, 4, 0], [5, 4, 0, 0]),
    ('slide with 3 identical non-consecutive tiles',
     [4, 4, 0, 4], [5, 4, 0, 0]),
    ('slide with 3 identical tiles ', [4, 0, 4, 4], [5, 4, 0, 0]),
    ('slide with 2 identical tiles and 1 greater',
     [4, 4, 5, 0], [5, 5, 0, 0]),
    ('slide with 1 greater and 2 identical tiles',
     [5, 4, 0, 4], [5, 5, 0, 0]),
    ('slide with 4 identical tiles', [6, 6, 6, 6], [7, 7, 0, 0]),
))
tests_slide_is_possible(True)
vt.valid_current_stage("rules.slide")

# ---- STAGE MOVE_DIR -----
# Each case: label, direction, starting board, expected board.
simple_board = [
    [0, 0, 0, 0],
    [0, 2, 0, 0],
    [0, 0, 3, 0],
    [0, 0, 0, 0],
]
_run_cases(vt.test_move_dir, (
    ('Simple rules.move_dir left', rules.LEFT, simple_board, [
        [0, 0, 0, 0],
        [2, 0, 0, 0],
        [3, 0, 0, 0],
        [0, 0, 0, 0],
    ]),
    ('Simple rules.move_dir right', rules.RIGHT, simple_board, [
        [0, 0, 0, 0],
        [0, 0, 0, 2],
        [0, 0, 0, 3],
        [0, 0, 0, 0],
    ]),
    ('Simple rules.move_dir up', rules.UP, simple_board, [
        [0, 2, 3, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ]),
    ('Simple rules.move_dir down', rules.DOWN, simple_board, [
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 2, 3, 0],
    ]),
))

complex_board = [
    [2, 4, 2, 3],
    [0, 0, 0, 0],
    [0, 0, 0, 0],
    [0, 0, 2, 0],
]
_run_cases(vt.test_move_dir, (
    ('Complex rules.move_dir left', rules.LEFT, complex_board, [
        [2, 4, 2, 3],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [2, 0, 0, 0],
    ]),
    ('Complex rules.move_dir right', rules.RIGHT, complex_board, [
        [2, 4, 2, 3],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 2],
    ]),
    ('Complex rules.move_dir down', rules.DOWN, complex_board, [
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [2, 4, 3, 3],
    ]),
    ('Complex rules.move_dir up', rules.UP, complex_board, [
        [2, 4, 3, 3],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ]),
))

complex_board = [
    [2, 2, 2, 1],
    [2, 2, 2, 0],
    [1, 2, 1, 1],
    [1, 0, 0, 0],
]
_run_cases(vt.test_move_dir, (
    ('Complex rules.move_dir left (2)', rules.LEFT, complex_board, [
        [3, 2, 1, 0],
        [3, 2, 0, 0],
        [1, 2, 2, 0],
        [1, 0, 0, 0],
    ]),
    ('Complex rules.move_dir right (2)', rules.RIGHT, complex_board, [
        [0, 2, 3, 1],
        [0, 0, 2, 3],
        [0, 1, 2, 2],
        [0, 0, 0, 1],
    ]),
    ('Complex rules.move_dir down (2)', rules.DOWN, complex_board, [
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [3, 2, 3, 0],
        [2, 3, 1, 2],
    ]),
    ('Complex rules.move_dir up (2)', rules.UP, complex_board, [
        [3, 3, 3, 2],
        [2, 2, 1, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ]),
))
test_move_dir_possible()
vt.valid_current_stage("rules.move_dir")

# ---- STAGE MEAN-SCORE -----
vt.start_test('mean_score.game_direction_first on FULLBOARD')
vt.test_game_direction_first(rules.FULLBOARD, (65536, 0))

vt.start_test('mean_score.game_direction_first on XFULLBOARD')
vt.test_game_direction_first(rules.XFULLBOARD, (64, 4))

vt.start_test('mean_score.game_direction_first on STEP0')
vt.test_game_direction_first(rules.STEP0, (512, 334))

vt.start_test('mean_score.game_direction_first torture DIR')
vt.torture_game_direction_first(
    vt.play_DOWN, players.first_tile, rules.XFULLBOARD)

vt.start_test('mean_score.game_direction_first torture TILE')
vt.torture_game_direction_first(
    players.first_direction, vt.play_FIRST, rules.XFULLBOARD)

vt.start_test('mean_score.game_direction_first torture None')
vt.torture_game_direction_first(
    vt.play_None, players.first_tile, rules.STEP0)

vt.start_test('mean_score.game_tile_first on FULLBOARD')
vt.test_game_tile_first(rules.FULLBOARD, (65536, 0))

vt.start_test('mean_score.game_tile_first on XFULLBOARD')
vt.test_game_tile_first(rules.XFULLBOARD, (32, 0))

vt.start_test('mean_score.game_tile_first on STEP0')
vt.test_game_tile_first(rules.STEP0, (512, 334))

vt.start_test('mean_score.game_tile_first on EMPTYBOARD')
vt.test_game_tile_first(rules.EMPTYBOARD, (512, 337))

myboard = [
    [1, 2, 3, 4],
    [2, 3, 4, 5],
    [3, 4, 3, 2],
    [4, 0, 5, 5],
]

vt.start_test('mean_score.game_tile_first torture DIR')
vt.torture_game_tile_first(vt.play_DOWN, players.first_tile, myboard)

vt.start_test('mean_score.game_tile_first torture TILE')
vt.torture_game_tile_first(players.first_direction, vt.play_FIRST, myboard)

vt.valid_current_stage("mean_score.games")

# ---- STAGE LEVEL -----
# Each case: label, function under test, board, expected value.
_run_cases(vt.test_invperm, (
    ('rules.level EMPTYBOARD', rules.level, rules.EMPTYBOARD, 0),
    ('rules.level basic corner 0', rules.level, [
        [1, 0, 0, 0],
        [0, 0, 1, 0],
        [0, 1, 0, 0],
        [0, 0, 0, 0],
    ], 3),
    ('rules.level basic corner 1', rules.level, [
        [0, 1, 1, 0],
        [0, 1, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ], 10),
    ('rules.level basic corner 2', rules.level, [
        [0, 0, 0, 0],
        [0, 1, 1, 0],
        [0, 1, 0, 0],
        [0, 0, 0, 0],
    ], 10),
    ('rules.level basic corner 3', rules.level, [
        [0, 0, 1, 1],
        [0, 0, 0, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ], 10),
    ('rules.level basic corner 4', rules.level, [
        [1, 1, 0, 0],
        [0, 0, 0, 0],
        [1, 0, 0, 0],
        [0, 0, 0, 0],
    ], 10),
    ('rules.level line 0', rules.level, [
        [0, 0, 0, 1],
        [0, 0, 1, 0],
        [0, 1, 0, 0],
        [1, 0, 0, 0],
    ], 4),
    ('rules.level line 1', rules.level, [
        [0, 1, 1, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ], 10),
    ('rules.level line 2', rules.level, [
        [0, 1, 1, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [1, 0, 0, 0],
    ], 11),
    ('rules.level line 3', rules.level, [
        [0, 1, 1, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 1],
    ], 11),
    ('rules.level line 4', rules.level, [
        [1, 1, 1, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 1],
    ], 19),
    ('rules.level line 5', rules.level, [
        [1, 1, 1, 1],
        [1, 0, 0, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ], 20),
    ('rules.level line 6', rules.level, [
        [1, 1, 1, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 2],
    ], 99),
    ('rules.level line 7', rules.level, [
        [2, 0, 0, 0],
        [0, 1, 1, 1],
        [0, 0, 0, 0],
        [1, 0, 0, 0],
    ], 92),
    ('rules.level line 8', rules.level, [
        [0, 0, 0, 0],
        [0, 1, 1, 0],
        [0, 1, 2, 1],
        [0, 0, 0, 0],
    ], 92),
    ('rules.level full 1', rules.level, [
        [1, 1, 1, 1],
        [1, 1, 1, 1],
        [1, 1, 1, 1],
        [1, 1, 1, 1],
    ], 72),
    ('rules.level semi-full 1', rules.level, [
        [1, 0, 1, 0],
        [1, 0, 1, 0],
        [0, 0, 1, 1],
        [0, 0, 1, 1],
    ], 36),
    ('rules.level semi-full 2', rules.level, [
        [1, 0, 1, 0],
        [0, 1, 0, 1],
        [1, 0, 1, 0],
        [0, 1, 0, 1],
    ], 8),
    ('rules.level semi-full 3', rules.level, [
        [2, 0, 1, 0],
        [2, 0, 1, 0],
        [0, 2, 2, 0],
        [0, 1, 1, 0],
    ], 902),
    ('rules.level STEP0', rules.level, rules.STEP0, 6642),
    ('rules.level XFULLBOARD', rules.level, rules.XFULLBOARD, 432619463),
    ('rules.level FINAL1', rules.level, rules.FINAL1,
     3476604868046168890712585943456),
))
vt.valid_current_stage("rules.level")
# -------------------------...

Full Screen

Full Screen

sarimax_script.py

Source:sarimax_script.py Github

copy

Full Screen

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import itertools
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from datetime import datetime, timedelta
from dateutil.relativedelta import *
import statsmodels.tsa.api as smt
import seaborn as sns
from sklearn.metrics import mean_squared_error
import pickle
from script import *


def create_train_test(data, start_train, end_train, start_test, end_test, test_length=24):
    """Split ``data`` into a training frame and a test frame by label slices.

    Args:
        data: DataFrame sliced with ``.loc`` using the given bounds.
        start_train, end_train: bounds of the training slice.
        start_test, end_test: bounds of the test slice. ``start_test`` must
            be a string in ``'%Y-%m-%d %H:%M:%S'`` format.
        test_length: number of empty hourly rows appended to the training
            frame, starting at ``start_test``.

    Returns:
        ``(df_train, df_test)``. ``df_train`` is the training slice followed
        by ``test_length`` all-NaN rows, one per hour.
    """
    df_train = data.loc[start_train:end_train, :]
    df_test = data.loc[start_test:end_test, :]
    start = datetime.strptime(start_test, '%Y-%m-%d %H:%M:%S')
    # One placeholder row per hour to be forecast.
    date_list = [start + relativedelta(hours=x) for x in range(test_length)]
    future = pd.DataFrame(index=date_list, columns=df_train.columns)
    df_train = pd.concat([df_train, future])
    return df_train, df_test


def add_exog(data, weather, start_time, end_time):
    """Return ``data`` joined with the exogenous weather/calendar columns.

    Adds dummy columns for ``weather.precip_type`` (renamed to ``Rain``,
    ``Sleet``, ``Snow`` when present), ``Day_of_Week``, ``Weekend`` (computed
    row-wise by ``is_weekend``), and ``Temperature``, ``Humidity`` and
    ``Precip_Intensity`` taken from ``weather.loc[start_time:end_time]``.
    Missing values in the three weather columns are filled with the column
    mean.
    """
    # Dummy variables for precipitation type.
    precip = pd.get_dummies(weather.precip_type)
    data = data.join(precip)
    data['Day_of_Week'] = data.index.dayofweek
    data['Weekend'] = data.apply(is_weekend, axis=1)
    data['Temperature'] = weather.loc[start_time:end_time, 'temperature']
    data['Humidity'] = weather.loc[start_time:end_time, 'humidity']
    data['Precip_Intensity'] = weather.loc[start_time:end_time, 'precip_intensity']
    data.rename(columns={'rain': 'Rain', 'sleet': 'Sleet', 'snow': 'Snow'}, inplace=True)
    # Fill missing weather values with the column mean.
    for column in ('Temperature', 'Humidity', 'Precip_Intensity'):
        data[column] = data[column].fillna(np.mean(data[column]))
    return data


def find_dates(building_id, length=1, total_length=30, final_date=None):
    """Return the train/test boundary timestamps for a building, as strings.

    ``find_egauge_dates(building_id, total_length)`` supplies the start of
    the training window and the end of the test window. The training window
    ends ``length`` days before the end of the test window, and the test
    window starts one hour after that. ``final_date`` is accepted but not
    used.

    Returns:
        ``(start_train, end_train, start_test, end_test)``, each converted
        with ``str``.
    """
    start_train, end_test = find_egauge_dates(building_id, total_length)
    end_train = end_test - timedelta(days=length)
    start_test = end_train + timedelta(hours=1)
    return str(start_train), str(end_train), str(start_test), str(end_test)


def fit_exog_arima(data, weather, building_id, length=1, total_length=30, test_length=24):
    """Fit a SARIMAX model of ``Hourly_Usage`` with exogenous regressors.

    The orders are fixed at ``(1, 0, 1)`` x ``(0, 1, 1, 24)``;
    ``gridsearch_arima`` can be used to select them instead.

    Returns:
        ``(df_exog, results)``: the frame produced by ``add_exog`` and the
        fitted model results.
    """
    start_train, end_train, start_test, end_test = find_dates(
        building_id, length=length, total_length=total_length)
    df_train, _ = create_train_test(
        data, start_train, end_train, start_test, end_test, test_length)
    df_exog = add_exog(df_train, weather, start_train, end_test)
    exogenous = df_exog.loc[
        start_train:, ['Weekend', 'Temperature', 'Humidity', 'car1']].astype(float)
    endogenous = df_exog.loc[:, 'Hourly_Usage'].astype(float)
    arima_model = sm.tsa.statespace.SARIMAX(endog=endogenous,
                                            exog=exogenous,
                                            trend=None,
                                            order=(1, 0, 1),
                                            seasonal_order=(0, 1, 1, 24),
                                            enforce_stationarity=False,
                                            enforce_invertibility=False)
    results = arima_model.fit()
    return df_exog, results


def plot_exog_arima(data, data_exog, model, building_id, length=1, total_length=30, test_length=24):
    """Forecast the test window with ``model``, plot it, and return the errors.

    Returns:
        ``(mse, rmse, df_exog_train)`` where ``df_exog_train`` is the
        training frame built from ``data_exog`` with the ``forecast`` column
        added by ``add_forecast``.
    """
    start_train, end_train, start_test, end_test = find_dates(
        building_id, length=length, total_length=total_length)
    _, df_test = create_train_test(
        data, start_train, end_train, start_test, end_test, test_length=test_length)
    df_exog_train, _ = create_train_test(
        data_exog, start_train, end_train, start_test, end_test, test_length=test_length)
    mse, rmse = add_forecast(model, df_test, df_exog_train, start_test, end_test)
    plot_forecast(df_exog_train, 500)
    return mse, rmse, df_exog_train


def gridsearch_arima(y, exog=None):
    """Grid-search SARIMAX orders and return the combination with lowest AIC.

    Every ``(p, d, q)`` in ``{0, 1}^3`` is combined with every seasonal
    ``(P, D, Q, 24)`` in ``{0, 1}^3``. Combinations whose construction or fit
    raises are skipped.

    Returns:
        ``[order, seasonal_order, aic]`` for the best fit. If no combination
        could be fitted the result is ``[0, 0, inf]``.
    """
    p = d = q = range(0, 2)
    pdq = list(itertools.product(p, d, q))
    seasonal_pdq = [(x[0], x[1], x[2], 24) for x in pdq]
    # Start from +inf so that any successfully fitted model is accepted,
    # however large its AIC.
    low_aic = [0, 0, float('inf')]
    for param in pdq:
        for param_seasonal in seasonal_pdq:
            try:
                model = sm.tsa.statespace.SARIMAX(y,
                                                  exog=exog,
                                                  order=param,
                                                  seasonal_order=param_seasonal,
                                                  enforce_stationarity=False,
                                                  enforce_invertibility=False)
                results = model.fit()
            except Exception:
                # Best effort: skip combinations that cannot be fitted, but
                # let KeyboardInterrupt/SystemExit propagate.
                continue
            if results.aic < low_aic[2]:
                low_aic = [param, param_seasonal, results.aic]
    return low_aic


def add_forecast(model, test, train, start_time, end_time):
    """Add the model forecast to ``train`` and score it against ``test``.

    ``train`` is modified in place: it gains a ``forecast`` column, and its
    ``Hourly_Usage`` values in ``start_time:end_time`` are overwritten with
    the values from ``test``.

    Returns:
        ``(mse, rmse)`` of the forecast over ``start_time:end_time``.
    """
    train['forecast'] = model.predict(start=start_time, end=end_time)
    y_true = test.loc[start_time:end_time, 'Hourly_Usage']
    y_pred = train.loc[start_time:end_time, 'forecast']
    train.loc[start_time:end_time, 'Hourly_Usage'] = test.loc[start_time:end_time, 'Hourly_Usage']
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    return mse, rmse


def plot_forecast(data, datapoints):
    """Plot ``Hourly_Usage`` (skipping the first ``datapoints`` rows) and ``forecast``."""
    plt.figure(figsize=(16, 8))
    # Label both series so that plt.legend() has entries to show.
    plt.plot(data['Hourly_Usage'][datapoints:], label='Hourly_Usage')
    plt.plot(data['forecast'], label='forecast')
    plt.legend()


def mean_car_charge(data, start, end):
    """Return a dict mapping each ``Time_Index`` value to its mean ``car1``.

    ``start`` and ``end`` are accepted but not used.
    """
    return {
        index: np.mean(data[data.Time_Index == index].car1)
        for index in data.Time_Index.unique()
    }


def create_exog_endo(data, weather, building_id, length=1, total_length=30):
    """Build the training frame plus exogenous and endogenous series.

    ``car1`` in the test window is filled with the mean charge for the same
    weekday-name + hour key, as computed by ``mean_car_charge`` from ``data``.

    Returns:
        ``(df_train, exogenous, endogenous)``.
    """
    start_train, end_train, start_test, end_test = find_dates(building_id, length, total_length)
    df_train, _ = create_train_test(
        data, start_train, end_train, start_test, end_test, 24 * length)
    car_charge = mean_car_charge(data, start_train, end_train)
    # DatetimeIndex.weekday_name was removed in pandas 1.0; day_name()
    # yields the same English day names.
    df_train['Time_Index'] = df_train.index.day_name() + df_train.index.hour.astype(str)
    df_train['Temperature'] = weather.loc[start_train:end_test, 'temperature']
    df_train['Humidity'] = weather.loc[start_train:end_test, 'humidity']
    for time in df_train.loc[start_test:end_test, :].index:
        df_train.loc[time, 'car1'] = car_charge[df_train.loc[time, 'Time_Index']]
    # Fill missing weather values with the column mean.
    for column in ('Temperature', 'Humidity'):
        df_train[column] = df_train[column].fillna(np.mean(df_train[column]))
    exogenous = df_train.loc[start_train:, ['Temperature', 'Humidity', 'car1']].astype(float)
    endogenous = df_train.loc[:, 'Hourly_Usage'].astype(float)
    return df_train, exogenous, endogenous


def fit_exog_arima_new(exogenous, endogenous):
    """Fit a SARIMAX model using the orders chosen by ``gridsearch_arima``."""
    low_aic = gridsearch_arima(endogenous, exogenous)
    arima_model = sm.tsa.statespace.SARIMAX(endog=endogenous,
                                            exog=exogenous,
                                            trend=None,
                                            order=low_aic[0],
                                            seasonal_order=low_aic[1],
                                            enforce_stationarity=False,
                                            enforce_invertibility=False)
    arima_exog_results = arima_model.fit()
    # ... (the listing is truncated here; the rest of the function is not shown)

Full Screen

Full Screen

test_api.py

Source:test_api.py Github

copy

Full Screen

...36 )37(options, args) = parser.parse_args()38server_url = options.server39data_dir = options.datadir40def start_test(x): print x41def testAddImage():42 print server_url43 print data_dir44 server = xmlrpclib.ServerProxy(server_url);45 assert server.createDb(1) == True46 start_test('add imgs')47 assert server.addImg(1, 7,data_dir+"DSC00007.JPG") == 148 assert server.addImg(1, 6,data_dir+"DSC00006.JPG") == 149 assert server.addImg(1, 14,data_dir+"DSC00014.JPG") == 150 assert server.addImg(1, 17,data_dir+"DSC00017.JPG") == 151 start_test('img count')52 assert server.getDbImgCount(1) == 453 start_test('image is on db')54 assert server.isImgOnDb(1,7) == True55 start_test('save db')56 assert server.saveAllDbs() > 057 start_test('reset db')58 assert server.resetDb(1) == 159 assert server.getDbImgCount(1) == 060 start_test('load db')61 assert server.loadAllDbs() > 062 assert server.getDbImgCount(1) == 463 assert server.isImgOnDb(1,7) == 164 assert server.isImgOnDb(1,733) == 065 start_test('remove img')66 assert server.removeImg(1,7) == 167 assert server.removeImg(1,73232) == 068 assert server.getDbImgCount(1) == 369 assert server.isImgOnDb(1,7) == 070 assert server.getDbImgIdList(1) == [6,14,17]71 start_test('list database spaces')72 assert 1 in server.getDbList()73 start_test('add more random images')74 fnames = [data_dir+"DSC00007.JPG",75 data_dir+"DSC00006.JPG",76 data_dir+"DSC00014.JPG",77 data_dir+"DSC00017.JPG"78 ]79 import random80 for i in range(20,60):81 assert server.addImg(1, i, random.choice(fnames)) == 182 start_test('add keywords')83 assert server.addKeywordImg(1,142,3) == False84 assert server.addKeywordImg(1,14,1) == True85 assert server.addKeywordImg(1,14,2) == True86 assert server.addKeywordImg(1,14,3) == True87 assert server.addKeywordImg(1,14,4) == True88 assert server.addKeywordImg(1,17,3) == True89 assert server.addKeywordImg(1,21,3) == True90 assert server.addKeywordImg(1,22,5) == True91 start_test('get keywords')92 assert 
server.getKeywordsImg(1,14) == [1,2,3,4]93 assert server.getKeywordsImg(1,17) == [3]94 assert server.getKeywordsImg(1,21) == [3]95 assert server.getKeywordsImg(1,20) == []96 start_test('remove keywords')97 assert server.removeAllKeywordImg(1,17) == True98 assert server.getKeywordsImg(1,17) == []99 start_test('save db')100 assert server.saveAllDbs() > 0101 start_test('reset db')102 assert server.resetDb(1) == 1103 assert server.getDbImgCount(1) == 0104 start_test('load db')105 assert server.loadAllDbs() > 0106 assert server.getDbImgCount(1) == 43107 start_test('get keywords')108 assert server.getKeywordsImg(1,14) == [1,2,3,4]109 start_test('query by a keyword')110 # 3: 14, 17, 21111 # 4: 14112 # 5: 22113 res = server.getAllImgsByKeywords(1, 30, 1, '3')114 assert 14 in res115 assert 17 in res116 assert 21 in res117 res = server.getAllImgsByKeywords(1, 30, 0, '3,4')118 assert 14 in res119 assert 17 in res120 assert 21 in res121 res = server.getAllImgsByKeywords(1, 30, 0, '3,4,5')122 assert 14 in res123 assert 17 in res124 assert 21 in res125 assert 22 in res126 res = server.getAllImgsByKeywords(1, 30, 1, '5')127 assert 22 in res128 res = server.getAllImgsByKeywords(1, 30, 1, '3,4')129 assert 14 in res130 start_test('query similarity')131 assert len(server.queryImgID(1,6, 3)) == 4132 start_test('query similarity by a keyword')133 #def queryImgIDKeywords(dbId, imgId, numres, kwJoinType, keywords):134 res = server.queryImgIDKeywords(1,6, 3,0,'3,4')135 resids = [r[0] for r in res]136 assert 17 in resids137 # start_test('mostPopularKeywords')138 #139 # assert server.addKeywordImg(1,50,1) == True140 # assert server.addKeywordImg(1,50,2) == True141 # assert server.addKeywordImg(1,50,3) == True142 # assert server.addKeywordImg(1,51,1) == True143 # assert server.addKeywordImg(1,51,2) == True144 # assert server.addKeywordImg(1,51,3) == True145 # assert server.addKeywordImg(1,52,3) == True146 #147 # # dbId, imgs, excludedKwds, count, mode148 # res = server.mostPopularKeywords(1, 
'50,51,52', '1', 3, 0)149 # resmap = {}150 # for i in range(len(res)/2):151 # resmap[res[i*2]] = res[i*2+1]...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Lemoncheesecake automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful