Best Python code snippet using pyscreenshot_python
core.py
Source:core.py
1from amaranth import Cat2from amaranth import Mux3from amaranth import Signal4from amaranth import Module5from amaranth import Memory6from amaranth import Elaboratable7from amaranth.build import Platform8from amaranth_soc.wishbone.bus import Interface9from typing import List10from altair.gateware.core.isa import Funct311from altair.gateware.core.isa import ExceptionCause12from altair.gateware.core.csr import CSRFile13from altair.gateware.core.lsu import LoadStoreUnit14from altair.gateware.core.decoder import DecoderUnit15from altair.gateware.core.exception import ExceptionUnit16from altair.gateware.core.divider import Divider17from altair.gateware.core.multiplier import Multiplier18from altair.gateware.debug.trigger import TriggerModule19class Core(Elaboratable):20 def __init__(self,21 # Reset22 reset_address: int = 0x8000_0000,23 # ISA24 enable_rv32m: bool = False,25 enable_rv32a: bool = False,26 enable_extra_csr: bool = False,27 enable_user_mode: bool = False,28 # Trigger29 enable_triggers: bool = False,30 ntriggers: int = 4,31 # Debug32 debug_enable: bool = False,33 # Identification34 hartid: int = 035 ) -> None:36 # ----------------------------------------------------------------------37 # configuration38 self.reset_address = reset_address39 self.enable_rv32m = enable_rv32m40 self.enable_rv32a = enable_rv32a41 self.enable_extra_csr = enable_extra_csr42 self.enable_user_mode = enable_user_mode43 self.enable_trigger = enable_triggers44 self.trigger_ntriggers = ntriggers45 self.debug_enable = debug_enable46 features = ['err', 'lock'] if enable_rv32a else ['err']47 # Instantiate units48 self._lsu = LoadStoreUnit(features=features)49 self._decoder = DecoderUnit(self.enable_rv32m, self.enable_rv32a)50 self._csr = CSRFile()51 self._exceptunit = ExceptionUnit(csrf=self._csr,52 hartid=hartid,53 enable_rv32m=self.enable_rv32m,54 enable_extra_csr=self.enable_extra_csr,55 enable_user_mode=self.enable_user_mode,56 reset_address=self.reset_address)57 gprf = Memory(width=32, 
depth=32)58 self._gprf_rp1 = gprf.read_port(transparent=False)59 self._gprf_rp2 = gprf.read_port(transparent=False)60 self._gprf_wp = gprf.write_port()61 if self.enable_rv32m:62 self._multiplier = Multiplier()63 self._divider = Divider()64 if self.enable_trigger:65 self._trigger = TriggerModule(privmode=self._exceptunit.m_privmode,66 ntriggers=self.trigger_ntriggers,67 csrf=self._csr,68 enable_user_mode=self.enable_user_mode)69 # IO70 self.wbport = Interface(addr_width=30, data_width=32, granularity=8, features=features, name='wbport')71 self.external_interrupt = Signal() # input72 self.timer_interrupt = Signal() # input73 self.software_interrupt = Signal() # input74 def port_list(self) -> List:75 mport = [getattr(self.wbport, name) for name, _, _ in self.wbport.layout]76 return [77 *mport,78 self.external_interrupt,79 self.timer_interrupt,80 self.software_interrupt81 ]82 def str2value(self, string: str):83 val = 084 for idx, x in enumerate(string[::-1]):85 val += ord(x) << (idx << 3)86 return val87 def elaborate(self, platform: Platform) -> Module:88 m = Module()89 # ----------------------------------------------------------------------90 # signals91 debug_state = Signal(8 * 10)92 pc = Signal(32, reset=self.reset_address)93 pc4 = Signal(32)94 b_taken = Signal()95 mult_result = Signal(32)96 mult_ack = Signal()97 div_result = Signal(32)98 div_ack = Signal()99 instruction = Signal(32)100 alu_a = Signal(32)101 alu_b = Signal(32)102 cmp_b = Signal(32)103 add_out = Signal(32)104 logic_out = Signal(32)105 shift_out = Signal(32)106 csr_out = Signal(32)107 ld_out = Signal(32)108 is_eq = Signal()109 is_lt = Signal()110 is_ltu = Signal()111 ltx_cmp_out = Signal()112 jb_error = Signal()113 multdiv = Signal()114 csr_src = Signal(32)115 csr_wdata = Signal(32)116 # ----------------------------------------------------------------------117 # Register units118 m.submodules.lsu = self._lsu119 m.submodules.decoder = self._decoder120 m.submodules.csr = self._csr121 
m.submodules.exception = self._exceptunit122 m.submodules += self._gprf_rp1, self._gprf_rp2, self._gprf_wp123 # optional units: register and connect124 if self.enable_rv32m:125 m.submodules.multiplier = self._multiplier126 m.submodules.divider = self._divider127 m.d.comb += [128 self._multiplier.op.eq(self._decoder.funct3),129 self._multiplier.dat1.eq(self._gprf_rp1.data),130 self._multiplier.dat2.eq(self._gprf_rp2.data),131 self._multiplier.valid.eq(multdiv & self._decoder.is_mul),132 mult_result.eq(self._multiplier.result),133 mult_ack.eq(self._multiplier.ready),134 self._divider.op.eq(self._decoder.funct3),135 self._divider.dat1.eq(self._gprf_rp1.data),136 self._divider.dat2.eq(self._gprf_rp2.data),137 self._divider.valid.eq(multdiv & self._decoder.is_div),138 div_result.eq(self._divider.result),139 div_ack.eq(self._divider.ready)140 ]141 else:142 m.d.comb += [143 mult_result.eq(0xdead0000),144 div_result.eq(0x0000beef),145 mult_ack.eq(0),146 div_ack.eq(0)147 ]148 if self.enable_trigger:149 m.submodules.trigger = self._trigger150 m.d.comb += [151 self._trigger.x_pc.eq(pc),152 self._trigger.x_bus_addr.eq(add_out)153 ]154 # Memory port155 m.d.comb += self._lsu.mport.connect(self.wbport)156 # ALU A157 with m.If(self._decoder.inst_lui):158 m.d.comb += alu_a.eq(0)159 with m.Elif(self._decoder.inst_auipc | self._decoder.inst_jal | self._decoder.is_b):160 m.d.comb += alu_a.eq(pc)161 with m.Else():162 m.d.comb += alu_a.eq(self._gprf_rp1.data)163 # ALU B164 with m.If(self._decoder.inst_lui | self._decoder.inst_auipc | self._decoder.is_j | self._decoder.is_b | self._decoder.is_ld | self._decoder.is_st | self._decoder.is_imm):165 m.d.comb += alu_b.eq(self._decoder.immediate)166 with m.Elif(self._decoder.inst_sub):167 m.d.comb += alu_b.eq(~self._gprf_rp2.data)168 if self.enable_rv32a:169 with m.Elif(self._decoder.is_amo | self._decoder.is_lrsc):170 m.d.comb += alu_b.eq(0)171 with m.Else():172 m.d.comb += alu_b.eq(self._gprf_rp2.data)173 # CMP174 with 
m.If(self._decoder.inst_slti | self._decoder.inst_sltiu):175 m.d.comb += cmp_b.eq(self._decoder.immediate)176 with m.Else():177 m.d.comb += cmp_b.eq(self._gprf_rp2.data)178 # ALU179 m.d.sync += add_out.eq(alu_a + alu_b + self._decoder.inst_sub)180 # logic181 with m.If(self._decoder.inst_and | self._decoder.inst_andi):182 m.d.sync += logic_out.eq(alu_a & alu_b)183 with m.Elif(self._decoder.inst_or | self._decoder.inst_ori):184 m.d.sync += logic_out.eq(alu_a | alu_b)185 with m.Else():186 m.d.sync += logic_out.eq(alu_a ^ alu_b)187 # compare188 m.d.sync += [189 is_eq.eq(self._gprf_rp1.data == cmp_b),190 is_lt.eq(self._gprf_rp1.data.as_signed() < cmp_b.as_signed()),191 is_ltu.eq(self._gprf_rp1.data < cmp_b)192 ]193 m.d.comb += ltx_cmp_out.eq((is_lt & (self._decoder.inst_slt | self._decoder.inst_slti)) |194 (is_ltu & (self._decoder.inst_sltu | self._decoder.inst_sltiu)))195 # shift196 with m.If(self._decoder.inst_sll | self._decoder.inst_slli):197 m.d.sync += shift_out.eq(alu_a << alu_b[0:5])198 with m.Elif(self._decoder.inst_srl | self._decoder.inst_srli):199 m.d.sync += shift_out.eq(alu_a >> alu_b[0:5])200 with m.Else():201 m.d.sync += shift_out.eq(alu_a.as_signed() >> alu_b[0:5])202 # JMP/Branch203 beq = is_eq & self._decoder.inst_beq204 bne = ~is_eq & self._decoder.inst_bne205 blt = is_lt & self._decoder.inst_blt206 bge = ~is_lt & self._decoder.inst_bge207 bltu = is_ltu & self._decoder.inst_bltu208 bgeu = ~is_ltu & self._decoder.inst_bgeu209 m.d.comb += [210 b_taken.eq(beq | bne | blt | bge | bltu | bgeu),211 jb_error.eq((self._decoder.is_j | b_taken) & add_out[1]) # check for misalignment212 ]213 # CSR port214 m.d.comb += self._csr.privmode.eq(self._exceptunit.m_privmode)215 with m.If(self._decoder.funct3[2]):216 m.d.sync += csr_src.eq(self._decoder.gpr_rs1_q)217 with m.Else():218 m.d.sync += csr_src.eq(self._gprf_rp1.data)219 with m.If(self._decoder.funct3[:2] == 0b01): # write220 m.d.comb += csr_wdata.eq(csr_src)221 with m.Elif(self._decoder.funct3[:2] == 0b10): # 
set222 m.d.comb += csr_wdata.eq(self._csr.port.dat_r | csr_src)223 with m.Else(): # clear224 m.d.comb += csr_wdata.eq(self._csr.port.dat_r & ~csr_src)225 # Default: do not read the RF226 m.d.comb += [227 self._gprf_rp1.en.eq(0),228 self._gprf_rp2.en.eq(0)229 ]230 # Decoder231 m.d.comb += self._decoder.privmode.eq(self._exceptunit.m_privmode)232 # Interrupts233 m.d.comb += [234 self._exceptunit.external_interrupt.eq(self.external_interrupt),235 self._exceptunit.software_interrupt.eq(self.software_interrupt),236 self._exceptunit.timer_interrupt.eq(self.timer_interrupt)237 ]238 m.d.comb += self._exceptunit.m_pc.eq(pc)239 # Atomic Memory Operations240 if self.enable_rv32a:241 amo_rdata = Signal(32)242 amo_wdata = Signal(32)243 amo_strobe = Signal()244 amo_done = Signal()245 amo_write = Signal()246 with m.FSM(name='amo'):247 with m.State('load'):248 m.d.comb += amo_strobe.eq(1)249 with m.If(self._lsu.ready & self._decoder.is_amo):250 m.d.sync += amo_rdata.eq(self._lsu.load_data)251 m.next = 'modify'252 with m.State('modify'):253 m.d.comb += amo_strobe.eq(0)254 with m.If(self._decoder.inst_amoadd):255 m.d.sync += amo_wdata.eq(amo_rdata + self._gprf_rp2.data)256 with m.Elif(self._decoder.inst_amoand):257 m.d.sync += amo_wdata.eq(amo_rdata & self._gprf_rp2.data)258 with m.Elif(self._decoder.inst_amomax):259 m.d.sync += amo_wdata.eq(Mux(amo_rdata.as_signed() > self._gprf_rp2.data.as_signed(), amo_rdata, self._gprf_rp2.data))260 with m.Elif(self._decoder.inst_amomaxu):261 m.d.sync += amo_wdata.eq(Mux(amo_rdata > self._gprf_rp2.data, amo_rdata, self._gprf_rp2.data))262 with m.Elif(self._decoder.inst_amomin):263 m.d.sync += amo_wdata.eq(Mux(amo_rdata.as_signed() > self._gprf_rp2.data.as_signed(), self._gprf_rp2.data, amo_rdata))264 with m.Elif(self._decoder.inst_amominu):265 m.d.sync += amo_wdata.eq(Mux(amo_rdata > self._gprf_rp2.data, self._gprf_rp2.data, amo_rdata))266 with m.Elif(self._decoder.inst_amoswap):267 m.d.sync += amo_wdata.eq(self._gprf_rp2.data)268 with 
m.Elif(self._decoder.inst_amoxor):269 m.d.sync += amo_wdata.eq(amo_rdata ^ self._gprf_rp2.data)270 with m.Elif(self._decoder.inst_amoor):271 m.d.sync += amo_wdata.eq(amo_rdata | self._gprf_rp2.data)272 m.next = 'store'273 with m.State('store'):274 m.d.comb += [275 amo_strobe.eq(1),276 amo_write.eq(1)277 ]278 with m.If(self._lsu.ready):279 m.d.comb += amo_done.eq(1)280 m.next = 'load'281 with m.Elif(self._lsu.error):282 m.next = 'load'283 # ----------------------------------------------------------------------284 # Main FSM285 with m.FSM(name='main'):286 with m.State('RESET'):287 m.d.comb += debug_state.eq(self.str2value('RESET'))288 m.next = 'FETCH'289 with m.State('FETCH'):290 m.d.comb += debug_state.eq(self.str2value('FETCH'))291 # connect LSU292 m.d.comb += [293 self._lsu.address.eq(pc),294 self._lsu.store_data.eq(0xdead_c0de),295 self._lsu.write.eq(0),296 self._lsu.cycle.eq(1),297 self._lsu.strobe.eq(1),298 self._lsu.op.eq(Funct3.W)299 ]300 # pre-decoding301 m.d.comb += [302 self._decoder.instruction_f.eq(self._lsu.load_data), # start decoding303 self._decoder.enable.eq(self._lsu.ready),304 self._gprf_rp1.addr.eq(self._decoder.gpr_rs1),305 self._gprf_rp1.en.eq(1),306 self._gprf_rp2.addr.eq(self._decoder.gpr_rs2),307 self._gprf_rp2.en.eq(1)308 ]309 m.d.sync += instruction.eq(self._lsu.load_data) # latch the instruction310 with m.If(self._lsu.ready):311 m.next = 'EXECUTE'312 with m.Elif(self._lsu.error | self._lsu.misaligned):313 m.d.sync += [314 self._exceptunit.enable.eq(1),315 self._exceptunit.edata.eq(pc),316 self._exceptunit.ecode.eq(ExceptionCause.E_INST_ADDR_MISALIGNED),317 self._exceptunit.m_exception.eq(1)318 ]319 with m.If(self._lsu.error):320 m.d.sync += self._exceptunit.ecode.eq(ExceptionCause.E_INST_ACCESS_FAULT)321 m.next = 'TRAP'322 with m.State('EXECUTE'):323 m.d.comb += debug_state.eq(self.str2value('EXECUTE'))324 if self.enable_trigger:325 m.d.comb += self._trigger.x_valid.eq(1)326 with m.If(self._exceptunit.m_interrupt):327 m.d.sync += [328 
self._exceptunit.enable.eq(1),329 self._exceptunit.edata.eq(instruction)330 ]331 m.next = 'TRAP'332 if self.enable_trigger:333 with m.Elif(self._trigger.trap):334 m.d.sync += [335 self._exceptunit.enable.eq(1),336 self._exceptunit.edata.eq(pc),337 self._exceptunit.ecode.eq(ExceptionCause.E_BREAKPOINT),338 self._exceptunit.m_exception.eq(1)339 ]340 m.next = 'TRAP'341 with m.Else():342 with m.If(self._decoder.is_shift | self._decoder.use_alu):343 m.next = 'COMMIT'344 with m.Elif(self._decoder.is_mul | self._decoder.is_div):345 m.d.comb += multdiv.eq(1)346 with m.If(mult_ack | div_ack):347 m.next = 'COMMIT'348 with m.Elif(self._decoder.inst_fence | self._decoder.inst_fencei | self._decoder.inst_wfi):349 m.d.sync += pc.eq(pc4)350 if self.enable_extra_csr:351 m.d.comb += self._exceptunit.w_retire.eq(1)352 m.next = 'FETCH'353 with m.Elif(self._decoder.is_ld | self._decoder.is_st | self._decoder.is_lrsc):354 m.next = 'MEMLS/LRSC'355 if self.enable_rv32a:356 with m.Elif(self._decoder.is_amo):357 m.next = 'AMO'358 with m.Elif(self._decoder.is_csr):359 m.next = 'CSR'360 with m.Else():361 m.d.sync += [362 self._exceptunit.enable.eq(1),363 self._exceptunit.edata.eq(instruction),364 self._exceptunit.ecode.eq(ExceptionCause.E_ILLEGAL_INST),365 self._exceptunit.m_exception.eq(~self._decoder.inst_mret),366 self._exceptunit.m_mret.eq(self._decoder.inst_mret)367 ]368 with m.If(self._decoder.inst_xcall):369 m.d.sync += self._exceptunit.ecode.eq(ExceptionCause.E_ECALL_FROM_M) # check priviledge mode...370 with m.Elif(self._decoder.inst_xbreak):371 m.d.sync += [372 self._exceptunit.edata.eq(pc),373 self._exceptunit.ecode.eq(ExceptionCause.E_BREAKPOINT)374 ]375 m.next = 'TRAP'376 with m.State('MEMLS/LRSC'):377 m.d.comb += debug_state.eq(self.str2value('MEMLS/LRSC'))378 is_ld = self._decoder.is_ld | self._decoder.inst_lr379 is_st = self._decoder.is_st | self._decoder.inst_sc380 valid = 1381 if self.enable_trigger:382 valid = valid & ~self._trigger.trap383 m.d.comb += [384 
self._trigger.x_valid.eq(1),385 self._trigger.x_load.eq(is_ld),386 self._trigger.x_store.eq(is_st),387 ]388 # connect LSU389 m.d.comb += [390 self._lsu.address.eq(add_out),391 self._lsu.store_data.eq(self._gprf_rp2.data),392 self._lsu.write.eq(is_st),393 self._lsu.cycle.eq(valid),394 self._lsu.strobe.eq(valid),395 self._lsu.op.eq(self._decoder.funct3)396 ]397 if self.enable_rv32a:398 m.d.comb += self._lsu.lrsc.eq(self._decoder.is_lrsc)399 # Next state and extra logic400 ready = self._lsu.ready401 with m.If(ready):402 m.d.sync += ld_out.eq(self._lsu.load_data)403 m.next = 'COMMIT'404 with m.Elif(self._lsu.error | self._lsu.misaligned):405 m.d.sync += [406 self._exceptunit.enable.eq(1),407 self._exceptunit.edata.eq(add_out),408 self._exceptunit.m_exception.eq(1)409 ]410 with m.If(is_ld & self._lsu.error):411 m.d.sync += self._exceptunit.ecode.eq(ExceptionCause.E_LOAD_ACCESS_FAULT)412 with m.If(is_ld & self._lsu.misaligned):413 m.d.sync += self._exceptunit.ecode.eq(ExceptionCause.E_LOAD_ADDR_MISALIGNED)414 with m.If(is_st & self._lsu.error):415 m.d.sync += self._exceptunit.ecode.eq(ExceptionCause.E_STORE_AMO_ACCESS_FAULT)416 with m.If(is_st & self._lsu.misaligned):417 m.d.sync += self._exceptunit.ecode.eq(ExceptionCause.E_STORE_AMO_ADDR_MISALIGNED)418 m.next = 'TRAP'419 if self.enable_trigger:420 with m.If(self._trigger.trap):421 m.d.sync += [422 self._exceptunit.enable.eq(1),423 self._exceptunit.edata.eq(pc),424 self._exceptunit.ecode.eq(ExceptionCause.E_BREAKPOINT),425 self._exceptunit.m_exception.eq(1)426 ]427 m.next = 'TRAP'428 if self.enable_rv32a:429 with m.State('AMO'):430 m.d.comb += debug_state.eq(self.str2value('AMO'))431 # connect LSU432 m.d.comb += [433 self._lsu.address.eq(add_out),434 self._lsu.store_data.eq(amo_wdata),435 self._lsu.write.eq(amo_write),436 self._lsu.cycle.eq(1),437 self._lsu.strobe.eq(amo_strobe),438 self._lsu.op.eq(self._decoder.funct3)439 ]440 with m.If(amo_done):441 m.d.sync += ld_out.eq(amo_rdata)442 m.next = 'COMMIT'443 with 
m.Elif(self._lsu.error | self._lsu.misaligned):444 m.d.sync += [445 self._exceptunit.enable.eq(1),446 self._exceptunit.edata.eq(add_out),447 self._exceptunit.m_exception.eq(1)448 ]449 with m.If(self._lsu.error):450 m.d.sync += self._exceptunit.ecode.eq(ExceptionCause.E_STORE_AMO_ACCESS_FAULT)451 with m.If(self._lsu.misaligned):452 m.d.sync += self._exceptunit.ecode.eq(ExceptionCause.E_STORE_AMO_ADDR_MISALIGNED)453 m.next = 'TRAP'454 with m.State('CSR'):455 m.d.comb += debug_state.eq(self.str2value('CSR'))456 # CSR457 m.d.comb += [458 self._csr.port.addr.eq(self._decoder.csr_addr),459 self._csr.port.dat_w.eq(csr_wdata),460 self._csr.port.we.eq(self._decoder.csr_we),461 self._csr.port.valid.eq(1)462 ]463 with m.If(self._csr.port.ready):464 m.d.sync += csr_out.eq(self._csr.port.dat_r)465 m.next = 'COMMIT'466 with m.If(self._csr.invalid & self._decoder.is_csr):467 m.d.sync += [468 self._exceptunit.enable.eq(1),469 self._exceptunit.edata.eq(instruction),470 self._exceptunit.ecode.eq(ExceptionCause.E_ILLEGAL_INST),471 self._exceptunit.m_exception.eq(1)472 ]473 m.next = 'TRAP'474 with m.State('COMMIT'):475 m.d.comb += debug_state.eq(self.str2value('COMMIT'))476 with m.If(self._decoder.is_j | b_taken):477 with m.If(~jb_error):478 m.d.sync += pc.eq(Cat(0, add_out[1:]))479 with m.Else():480 m.d.sync += pc.eq(pc4)481 with m.If(self._decoder.gpr_rd.any()):482 m.d.comb += [483 self._gprf_wp.addr.eq(self._decoder.gpr_rd),484 self._gprf_wp.en.eq(self._decoder.is_j | self._decoder.is_ld | self._decoder.is_csr | self._decoder.is_logic |485 self._decoder.is_cmp | self._decoder.is_shift | self._decoder.is_add | self._decoder.is_mul |486 self._decoder.is_div | self._decoder.is_amo | self._decoder.is_lrsc)487 ]488 # BFMux489 with m.If(self._decoder.is_j):490 m.d.comb += self._gprf_wp.data.eq(pc4)491 with m.Elif(self._decoder.is_ld):492 m.d.comb += self._gprf_wp.data.eq(ld_out)493 with m.Elif(self._decoder.is_csr):494 m.d.comb += self._gprf_wp.data.eq(csr_out)495 with 
m.Elif(self._decoder.is_logic):496 m.d.comb += self._gprf_wp.data.eq(logic_out)497 with m.Elif(self._decoder.is_cmp):498 m.d.comb += self._gprf_wp.data.eq(ltx_cmp_out)499 with m.Elif(self._decoder.is_shift):500 m.d.comb += self._gprf_wp.data.eq(shift_out)501 with m.Elif(self._decoder.is_mul):502 m.d.comb += self._gprf_wp.data.eq(mult_result)503 with m.Elif(self._decoder.is_div):504 m.d.comb += self._gprf_wp.data.eq(div_result)505 if self.enable_rv32a:506 with m.Elif(self._decoder.is_amo):507 m.d.comb += self._gprf_wp.data.eq(amo_rdata)508 with m.Elif(self._decoder.is_lrsc):509 m.d.comb += self._gprf_wp.data.eq(ld_out)510 with m.Else():511 m.d.comb += self._gprf_wp.data.eq(add_out)512 m.next = 'FETCH'513 with m.If(jb_error):514 m.d.comb += self._gprf_wp.en.eq(0)515 m.d.sync += [516 self._exceptunit.enable.eq(1),517 self._exceptunit.edata.eq(Cat(0, add_out[1:])),518 self._exceptunit.ecode.eq(ExceptionCause.E_INST_ADDR_MISALIGNED),519 self._exceptunit.m_exception.eq(1)520 ]521 m.next = 'TRAP'522 if self.enable_extra_csr:523 with m.Else():524 m.d.comb += self._exceptunit.w_retire.eq(1)525 with m.State('TRAP'):526 m.d.comb += debug_state.eq(self.str2value('TRAP'))527 with m.If(self._decoder.inst_mret):528 m.d.sync += pc.eq(self._exceptunit.mepc)529 with m.Else():530 m.d.sync += pc.eq(self._exceptunit.mtvec)531 m.d.sync += [532 self._exceptunit.enable.eq(0),533 self._exceptunit.m_mret.eq(0),534 self._exceptunit.m_exception.eq(0)535 ]536 if self.enable_extra_csr:537 m.d.comb += self._exceptunit.w_retire.eq(1)538 m.next = 'FETCH'539 # ----------------------------------------------------------------------540 # New PC541 m.d.comb += pc4.eq(pc + 4)...
decoder.pyi
Source:decoder.pyi
from collections.abc import Callable
from typing import Any
from typing_extensions import TypeAlias

from google.protobuf.descriptor import Descriptor, FieldDescriptor
from google.protobuf.message import Message

# Shape shared by every decoder callable declared in this stub: five
# positional arguments (str, int, int, Message, dict keyed by
# FieldDescriptor) and an int result.
# NOTE(review): presumably (buffer, pos, end, message, field_dict) -> new
# position -- confirm against protobuf's runtime decoder module.
_Decoder: TypeAlias = Callable[
    [str, int, int, Message, dict[FieldDescriptor, Any]],
    int,
]

# Callable that takes a Message and returns a Message; passed to the decoder
# factories below as ``new_default``.
_NewDefault: TypeAlias = Callable[[Message], Message]


# Left unannotated in this stub (parameters and return type).
def ReadTag(buffer, pos): ...


# Ready-made decoders for scalar field types; each one has the _Decoder shape.
Int32Decoder: _Decoder
Int64Decoder: _Decoder
UInt32Decoder: _Decoder
UInt64Decoder: _Decoder
SInt32Decoder: _Decoder
SInt64Decoder: _Decoder
Fixed32Decoder: _Decoder
Fixed64Decoder: _Decoder
SFixed32Decoder: _Decoder
SFixed64Decoder: _Decoder
FloatDecoder: _Decoder
DoubleDecoder: _Decoder
BoolDecoder: _Decoder


# Decoder factories. Each returns a _Decoder; the first three additionally
# accept an optional ``clear_if_default`` flag.
def EnumDecoder(
    field_number: int,
    is_repeated: bool,
    is_packed: bool,
    key: FieldDescriptor,
    new_default: _NewDefault,
    clear_if_default: bool = ...,
) -> _Decoder: ...


def StringDecoder(
    field_number: int,
    is_repeated: bool,
    is_packed: bool,
    key: FieldDescriptor,
    new_default: _NewDefault,
    clear_if_default: bool = ...,
) -> _Decoder: ...


def BytesDecoder(
    field_number: int,
    is_repeated: bool,
    is_packed: bool,
    key: FieldDescriptor,
    new_default: _NewDefault,
    clear_if_default: bool = ...,
) -> _Decoder: ...


def GroupDecoder(
    field_number: int,
    is_repeated: bool,
    is_packed: bool,
    key: FieldDescriptor,
    new_default: _NewDefault,
) -> _Decoder: ...


def MessageDecoder(
    field_number: int,
    is_repeated: bool,
    is_packed: bool,
    key: FieldDescriptor,
    new_default: _NewDefault,
) -> _Decoder: ...


MESSAGE_SET_ITEM_TAG: bytes


def MessageSetItemDecoder(descriptor: Descriptor) -> _Decoder: ...


# Parameters are left unannotated in this stub.
def MapDecoder(field_descriptor, new_default, is_message_map) -> _Decoder: ...

# NOTE(review): the scraped listing ends here with a "..." marker; any
# declarations that followed in the original stub are not visible.
_api.py
Source:_api.py
import os

import attr

from _avif import ffi, lib


class AVIFError(Exception):
    """Raised by ``_succeed`` when a ``libavif`` call does not return OK."""


def _succeed(avif_result):
    """
    Raise an exception if a ``libavif`` library function failed.

    ``avif_result`` is compared against ``lib.AVIF_RESULT_OK``; any other
    value raises ``AVIFError`` whose message is the UTF-8 decoded string
    returned by ``lib.avifResultToString`` for that result.
    """
    if avif_result != lib.AVIF_RESULT_OK:
        c_string = lib.avifResultToString(avif_result)
        raise AVIFError(ffi.string(c_string).decode("utf-8"))


def _avifDecoder():
    """
    Create an FFI-managed avifDecoder.

    The handle returned by ``lib.avifDecoderCreate`` is wrapped with
    ``ffi.gc`` so that ``lib.avifDecoderDestroy`` is registered as its
    destructor.
    """
    return ffi.gc(lib.avifDecoderCreate(), lib.avifDecoderDestroy)


@attr.s
class Decoder:
    """
    An AVIF decoder.

    Wraps a single ``avifDecoder`` handle (created by ``_avifDecoder`` unless
    one is supplied) and exposes parsing and frame-stepping helpers that
    raise ``AVIFError`` on any non-OK ``libavif`` result.
    """

    _decoder = attr.ib(factory=_avifDecoder)

    def parse_path(self, path):
        """
        Point the decoder at the file ``path``, parse it, and return the
        decoder's ``image`` attribute.

        ``path`` is converted with ``os.fsencode`` before being handed to
        ``libavif``.
        """
        _succeed(lib.avifDecoderSetIOFile(self._decoder, os.fsencode(path)))
        _succeed(lib.avifDecoderParse(self._decoder))
        return self._decoder.image

    def parse_data(self, data):
        """
        Point the decoder at the in-memory buffer ``data``, parse it, and
        return the decoder's ``image`` attribute.
        """
        # Pin the buffer on the instance so it lives at least as long as the
        # decoder keeps being used (next_images / nth_image run after this
        # method has returned and the caller may have dropped its reference).
        # NOTE(review): this relies on avifDecoderSetIOMemory referencing the
        # caller's memory rather than copying it -- confirm against avif.h.
        self._io_data = data
        _succeed(lib.avifDecoderSetIOMemory(self._decoder, data, len(data)))
        _succeed(lib.avifDecoderParse(self._decoder))
        return self._decoder.image

    def next_images(self):
        """
        Generate the decoder's ``image`` attribute once per successfully
        decoded frame.

        Stops when ``lib.avifDecoderNextImage`` reports
        ``AVIF_RESULT_NO_IMAGES_REMAINING``; any other non-OK result raises
        ``AVIFError``.
        """
        # NOTE(review): every yielded value is read from the same
        # ``self._decoder.image`` attribute; presumably libavif reuses that
        # image between frames -- confirm before holding on to earlier ones.
        while True:
            res = lib.avifDecoderNextImage(self._decoder)
            if res == lib.AVIF_RESULT_NO_IMAGES_REMAINING:
                break
            _succeed(res)
            yield self._decoder.image

    def nth_image(self, n):
        """
        Ask the decoder for image number ``n``, raising ``AVIFError`` if
        ``lib.avifDecoderNthImage`` does not return OK.
        """
        _succeed(lib.avifDecoderNthImage(self._decoder, n))
        # NOTE(review): the scraped listing ends here with a "..." marker;
        # whatever followed in the original file (for example a return
        # statement) is not visible and has not been reconstructed.
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!