dist/tests/if_lib: Update to riot_pal

This removes all non-application based driver/devices and replaces with riot_pal.
riot_pal (riot protocol abstraction layer) can be installed with pip install riot_pal.
The purpose is ti simplify and modularize the interfaces and tests.
All tests using the if_lib interface are updated too.
This commit is contained in:
MrKevinWeiss 2018-10-08 14:16:04 +02:00
parent bbae45cc87
commit 5b83137fd1
12 changed files with 31 additions and 604 deletions

View File

@ -0,0 +1,3 @@
from .philip_if import PhilipIf
__all__ = ['PhilipIf']

View File

@ -1,75 +0,0 @@
# Copyright (C) 2018 Kevin Weiss <kevin.weiss@haw-hamburg.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
"""@package PyToAPI
This module provice the base interface for a device to a driver.
"""
import logging
from . import driver_manager
class BaseDevice:
"""Instance for devices to connect and untilize drivers."""
def __init__(self, *args, **kwargs):
logging.warning("deprecated, moving library to RIOT-OS/if_lib")
self._driver = driver_manager.driver_from_config(*args, **kwargs)
def close(self):
"""Closes the device connection."""
self._driver.close()
def _read(self):
"""Reads data from the driver."""
return self._driver.read()
def _write(self, data):
"""Writes data to the driver."""
return self._driver.write(data)
def is_connected_to_board(self):
"""Dummy - confirm if a connection is for target board."""
logging.warning("Check if board is connected: dummy should be"
" implmeneted in subclasses")
raise NotImplementedError()
@classmethod
def from_autodetect(cls, *args, **dev_config):
"""Connects to a range of possible configurations."""
configs = driver_manager.available_configs(*args, **dev_config)
logging.debug("Configs: %r", configs)
for config in configs:
for retry in range(0, 2):
logging.debug("Autodetect attempt: %d", retry)
conn = cls(**config)
try:
if conn.is_connected_to_board():
return conn
except Exception as err:
logging.debug("Cannot connect: %r", err)
conn.close()
raise ValueError("Could not locate board, check if board is"
"connected or is_connected_to_board is correct")
@classmethod
def copy_driver(cls, device):
"""Copies an the driver instance so multiple devices can use one driver."""
logging.debug("Cloning Driver: %r", device._driver)
return cls(dev_type='driver', driver=device._driver)
def main():
"""Tests basic usage of the class
Used for unit testing, information should be confirm with DEBUG info.
"""
logging.getLogger().setLevel(logging.DEBUG)
BaseDevice()
if __name__ == "__main__":
main()

View File

@ -1,42 +0,0 @@
# Copyright (C) 2018 Kevin Weiss <kevin.weiss@haw-hamburg.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
"""@package PyToAPI
This module assigns the drivers to the devices.
"""
import logging
from .serial_driver import SerialDriver
from .riot_driver import RiotDriver
def driver_from_config(dev_type='serial', *args, **kwargs):
"""Returns driver instance given configuration"""
if dev_type == 'serial':
return SerialDriver(*args, **kwargs)
elif dev_type == 'riot':
return RiotDriver(*args, **kwargs)
elif dev_type == 'driver':
return kwargs['driver']
raise NotImplementedError()
def available_configs(dev_type='serial', *args, **kwargs):
"""Returns possible configurations to attempt to connect to."""
if dev_type == 'serial':
return SerialDriver.get_configs(*args, **kwargs)
elif dev_type == 'riot':
return RiotDriver.get_configs(*args, **kwargs)
raise NotImplementedError()
def main():
"""Tests basic usage of the class"""
logging.getLogger().setLevel(logging.DEBUG)
logging.debug(available_configs())
logging.debug(driver_from_config())
if __name__ == "__main__":
main()

View File

@ -1,87 +0,0 @@
# Copyright (C) 2018 Kevin Weiss <kevin.weiss@haw-hamburg.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
"""@package PyToAPI
This module handles parsing of information from RIOT shell base tests.
"""
import logging
from .base_device import BaseDevice
class DutShell(BaseDevice):
"""Parses commands and resposes from the shell."""
COMMAND = 'Command: '
SUCCESS = 'Success: '
ERROR = 'Error: '
TIMEOUT = 'Timeout: '
RESULT_SUCCESS = 'Success'
RESULT_ERROR = 'Error'
RESULT_TIMEOUT = 'Timeout'
@staticmethod
def _try_parse_data(data):
if ('[' in data) and (']' in data):
parsed_data = []
data = data[data.find("[")+1:data.find("]")]
data_list = data.split(', ')
for value in data_list:
try:
parsed_data.append(int(value, 0))
except ValueError:
parsed_data.append(value)
logging.debug(parsed_data)
return parsed_data
return None
def send_cmd(self, send_cmd):
"""Returns a dictionary with information from the event.
msg - The message from the response, only used for information.
cmd - The command sent, used to track what has occured.
data - Parsed information of the data requested.
result - Either success, error or timeout.
"""
self._write(send_cmd)
response = self._read()
cmd_info = {'cmd': send_cmd, 'data': None}
while response != '':
if self.COMMAND in response:
cmd_info['msg'] = response.replace(self.COMMAND, '')
cmd_info['cmd'] = cmd_info['msg'].replace('\n', '')
if self.SUCCESS in response:
clean_msg = response.replace(self.SUCCESS, '')
cmd_info['msg'] = clean_msg.replace('\n', '')
cmd_info['result'] = self.RESULT_SUCCESS
cmd_info['data'] = self._try_parse_data(cmd_info['msg'])
break
if self.ERROR in response:
clean_msg = response.replace(self.ERROR, '')
cmd_info['msg'] = clean_msg.replace('\n', '')
cmd_info['result'] = self.RESULT_ERROR
break
response = self._read()
if response == '':
cmd_info['result'] = self.RESULT_TIMEOUT
logging.debug(self.RESULT_TIMEOUT)
return cmd_info
def test_node():
"""Simple test to ensure commuication with the node."""
b_if = DutShell()
b_if.send_cmd('i2c_get_id')
def main():
"""Tests TestShellIf class"""
logging.getLogger().setLevel(logging.DEBUG)
test_node()
if __name__ == "__main__":
main()

View File

@ -1,212 +0,0 @@
# Copyright (C) 2018 Kevin Weiss <kevin.weiss@haw-hamburg.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
"""@package PyToAPI
This module handles functions for a low level shell interface.
"""
import logging
import errno
import os
from .base_device import BaseDevice
class LLShell(BaseDevice):
"""Handles basic functions and commands for memory map interface."""
READ_REG_CMD = "rr"
WRITE_REG_CMD = "wr"
EXECUTE_CMD = "ex"
RESET_CMD = "mcu_rst"
SUCCESS = '0'
RESULT_SUCCESS = 'Success'
RESULT_ERROR = 'Error'
RESULT_TIMEOUT = 'Timeout'
@staticmethod
def _try_parse_data(data):
parsed_data = None
if len(data) > 1:
# response contains data
try:
if len(data[1]) - 2 <= 8:
parsed_data = int(data[1], 0)
else:
d_len = len(data[1]) - 1
parsed_data = bytearray.fromhex(data[1][2:d_len])
except ValueError:
parsed_data = data[1:]
return parsed_data
@staticmethod
def _error_msg(data):
s_errcode = errno.errorcode[data]
s_errmsg = os.strerror(data)
return "{}-{} [{}]".format(s_errcode, s_errmsg, data)
def _populate_cmd_info(self, data):
cmd_info = {}
try:
if data[0] == self.SUCCESS:
cmd_info['data'] = self._try_parse_data(data)
cmd_info['msg '] = "EOK-command success [0]"
cmd_info['result'] = self.RESULT_SUCCESS
logging.debug(self.RESULT_SUCCESS)
else:
# put error code in data
cmd_info['data'] = int(data[0], 0)
cmd_info['msg'] = self._error_msg(cmd_info['data'])
cmd_info['result'] = self.RESULT_ERROR
logging.debug(self.RESULT_ERROR)
logging.debug(cmd_info['msg'])
except Exception as exc:
cmd_info['msg'] = "Unknown Error {}".format(exc)
cmd_info['data'] = data[0]
cmd_info['result'] = self.RESULT_ERROR
logging.debug(self.RESULT_ERROR)
logging.debug(exc)
return cmd_info
def send_cmd(self, send_cmd):
"""Returns a dictionary with information from the event.
msg - The message from the response, only used for information.
cmd - The command sent, used to track what has occured.
data - Parsed information of the data requested.
result - Either success, error or timeout.
"""
self._write(send_cmd)
data = self._read()
cmd_info = {'cmd': send_cmd}
if data == "":
cmd_info['msg'] = "Timeout occured"
cmd_info['data'] = None
cmd_info['result'] = self.RESULT_TIMEOUT
logging.debug(self.RESULT_TIMEOUT)
else:
data = data.replace('\n', '')
data = data.split(',')
cmd_info.update(self._populate_cmd_info(data))
return cmd_info
def read_bytes(self, index, size=1):
"""Reads bytes in the register map."""
logging.debug("FXN: read_bytes(%r,%r)", index, size)
cmd = '{} {} {}'.format(self.READ_REG_CMD, index, size)
return self.send_cmd(cmd)
def write_bytes(self, index, data, size=4):
"""Writes bytes in the register map."""
logging.debug("FXN: write_bytes(%r,%r)", index, data)
cmd = "{} {}".format(self.WRITE_REG_CMD, index)
if isinstance(data, list):
for i in range(0, len(data)):
if len(data) - i - 1 < len(data):
cmd += ' {}'.format(data[len(data) - i - 1])
else:
cmd += ' 0'
else:
for i in range(0, size):
cmd += ' {}'.format((data >> ((i) * 8)) & 0xFF)
return self.send_cmd(cmd)
def read_bits(self, index, offset, bit_amount):
"""Read specific bits in the register map."""
logging.debug("FXN: read_bits(%r, %r, %r)", index, offset, bit_amount)
bytes_to_read = int((bit_amount - 1 + offset)/8 + 1)
bit_mask = (2 ** bit_amount) - 1
cmd_info = self.read_bytes(index, bytes_to_read)
if cmd_info['result'] == self.RESULT_SUCCESS:
cmd_info['cmd'] += ', read_bits {} {} {}'.format(index, offset,
bit_amount)
cmd_info['data'] = cmd_info['data'] >> offset
cmd_info['data'] = cmd_info['data'] & bit_mask
logging.debug("Bits: %r", cmd_info['data'])
return cmd_info
def write_bits(self, index, offset, bit_amount, data):
"""Modifies specific bits in the register map."""
cmd_sent = ""
logging.debug("FXN: write_bits"
"(%r, %r, %r, %r)", index, offset, bit_amount, data)
bytes_to_read = int((bit_amount - 1 + offset)/8 + 1)
cmd_info = self.read_bytes(index, bytes_to_read)
if cmd_info['result'] != self.RESULT_SUCCESS:
return cmd_info
cmd_sent += cmd_info['cmd']
bit_mask = int((2 ** bit_amount) - 1)
bit_mask = bit_mask << offset
cmd_info['data'] = cmd_info['data'] & (~bit_mask)
data = cmd_info['data'] | ((data << offset) & bit_mask)
cmd_info = self.write_bytes(index, data, bytes_to_read)
cmd_sent += cmd_info['cmd']
if cmd_info['result'] == self.RESULT_SUCCESS:
cmd_sent += ',write_bits {} {} {} {}'.format(index, offset,
bit_amount, data)
cmd_info['cmd'] = cmd_sent
return cmd_info
def execute_changes(self):
"""Executes device configuration changes."""
logging.debug("FXN: execute_changes")
return self.send_cmd(self.EXECUTE_CMD)
def reset_mcu(self):
"""Resets the device."""
logging.debug("FXN: reset_mcu")
return self.send_cmd(self.RESET_CMD)
def test_bpt():
"""Tests if basic functions work on the BPT memory map"""
b_if = LLShell()
b_if.reset_mcu()
b_if.execute_changes()
index = 152
b_if.read_bytes(index)
b_if.write_bytes(index, 0x0a0b0c0d)
b_if.read_bytes(index, 4)
b_if.write_bytes(index, 0x01, 1)
b_if.read_bytes(index, 4)
b_if.write_bytes(index, [9, 8, 7, 6, 5, 4, 3, 2])
b_if.read_bytes(index, 8)
b_if.write_bytes(index, 0)
b_if.read_bytes(index, 1)
b_if.write_bits(index, 0, 1, 1)
b_if.read_bytes(index, 1)
b_if.read_bits(index, 0, 1)
b_if.write_bits(index, 0, 2, 2)
b_if.read_bits(index, 0, 2)
b_if.write_bits(index, 1, 3, 6)
b_if.read_bits(index, 1, 3)
b_if.write_bits(index, 0, 8, 0xa5)
b_if.read_bits(index, 0, 8)
b_if.write_bits(index, 1, 7, 0x7F)
b_if.read_bits(index, 1, 7)
b_if.write_bits(index, 0, 9, 0x142)
b_if.read_bits(index, 0, 9)
b_if.write_bits(index, 1, 1, 1)
b_if.read_bits(index, 0, 3)
b_if.write_bits(index, 2, 1, 1)
b_if.read_bits(index, 0, 3)
b_if.reset_mcu()
def main():
"""Tests DeviceShellIf class"""
logging.getLogger().setLevel(logging.DEBUG)
test_bpt()
if __name__ == "__main__":
main()

View File

@ -9,17 +9,16 @@ This module handles offset and sizes dictated by the memory map for the BPT.
This module is autogenerated based on the memory map.
"""
import logging
from .ll_shell import LLShell
try:
from riot_pal import LLShell
except ImportError:
raise ImportError('Cannot find riot_pal, try "pip install riot_pal"')
class BptIf(LLShell):
class PhilipIf(LLShell):
"""Getters and setters for the memory map."""
DEVICE_NUM = 0x42A5
def is_connected_to_board(self):
"""Checks if board is connected."""
return self.get_sys_device_num()["data"] == self.DEVICE_NUM
def get_sys_sn_12(self):
"""Unique ID of the device"""
return self.read_bytes(0, 12)
@ -714,22 +713,19 @@ class BptIf(LLShell):
def main():
"""Tests all functions with default values."""
logging.getLogger().setLevel(logging.DEBUG)
try:
bpt = BptIf.from_autodetect(baud=9600)
except Exception as ex:
logging.debug("Failed to autodetect BptIf: " + str(ex))
bpt = BptIf.from_autodetect()
bpt2 = BptIf.copy_driver(bpt)
bpt.execute_changes()
bpt2.execute_changes()
bpt.reset_mcu()
cmds = bpt.get_command_list()
if1 = PhilipIf()
if2 = PhilipIf.copy_driver(if1)
if1.execute_changes()
if2.execute_changes()
if1.reset_mcu()
cmds = if1.get_command_list()
logging.debug("==========================================================")
for cmd in cmds:
cmd()
logging.debug("------------------------------------------------------")
logging.debug("==========================================================")
bpt.reset_mcu()
if1.reset_mcu()
if __name__ == "__main__":

View File

@ -1,46 +0,0 @@
# Copyright (C) 2018 Kevin Weiss <kevin.weiss@haw-hamburg.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
"""@package PyToAPI
This module handles generic connection and IO to the serial driver.
"""
class RiotDriver:
"""Contains all reusable functions for connecting, sending and recieveing
data.
"""
used_devices = []
def __init__(self):
raise NotImplementedError()
def close(self):
"""Close serial connection."""
raise NotImplementedError()
def read(self):
"""Read and decode data."""
raise NotImplementedError()
def write(self, data):
"""Tries write data."""
raise NotImplementedError()
@staticmethod
def get_configs():
"""Gets available serial configurations."""
raise NotImplementedError()
def main():
"""Tests basic usage of the class"""
if __name__ == "__main__":
main()

View File

@ -1,108 +0,0 @@
# Copyright (C) 2018 Kevin Weiss <kevin.weiss@haw-hamburg.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
"""@package PyToAPI
This module handles generic connection and IO to the serial driver.
"""
import logging
import time
import serial
import serial.tools.list_ports
class SerialDriver:
"""Contains all reusable functions for connecting, sending and recieveing
data.
"""
DEFAULT_TIMEOUT = 1
DEFAULT_BAUDRATE = 115200
DEFAULT_PORT = '/dev/ttyACM0'
used_devices = []
def __init__(self, *args, **kwargs):
if 'timeout' not in kwargs:
kwargs['timeout'] = self.DEFAULT_TIMEOUT
if len(args) < 2:
if 'baudrate' not in kwargs:
kwargs['baudrate'] = self.DEFAULT_BAUDRATE
if len(args) == 0:
if 'port' not in kwargs:
kwargs['port'] = self.DEFAULT_PORT
logging.debug("Serial connection args %r -- %r", args, kwargs)
try:
self._dev = serial.Serial(*args, **kwargs)
except serial.SerialException:
self._dev = serial.serial_for_url(*args, **kwargs)
SerialDriver.used_devices.append(self._dev.port)
time.sleep(0.1)
# A time delay is needed ensure everything is flushed correctly
self._dev.reset_input_buffer()
self._dev.reset_output_buffer()
def close(self):
"""Close serial connection."""
logging.debug("Closing %s", self._dev.port)
SerialDriver.used_devices.remove(self._dev.port)
self._dev.close()
def read(self):
"""Read and decode data."""
try:
res_bytes = self._dev.readline()
response = res_bytes.decode("utf-8", errors="replace")
except Exception as exc:
response = 'ERR'
logging.debug(exc)
logging.debug("Response: %s", response.replace('\n', ''))
return response
def write(self, data):
"""Tries write data."""
logging.debug("Sending: " + data)
self._dev.write((data + '\n').encode('utf-8'))
@staticmethod
def get_configs(baudrate=DEFAULT_BAUDRATE, timeout=1):
"""Gets available serial configurations."""
portlist = serial.tools.list_ports.comports()
available_configs = []
for element in portlist:
if element.device not in SerialDriver.used_devices:
available_configs.append({'port': element.device,
'baudrate': baudrate,
'timeout': timeout})
logging.debug("Ports available: %r", available_configs)
return available_configs
def main():
"""Tests basic usage of the class
Used for unit testing, information should be confirm with DEBUG info.
"""
logging.getLogger().setLevel(logging.DEBUG)
SerialDriver.get_configs()
ddif = SerialDriver()
ddif.close()
ddif = SerialDriver(port='/dev/ttyACM0')
logging.debug("Used devices: %r", ddif.used_devices)
ddif.close()
ddif = SerialDriver(port='/dev/ttyACM0', baudrate=115200)
logging.debug("Used devices: %r", ddif.used_devices)
ddif.close()
ddif = SerialDriver(port='/dev/ttyACM0', baudrate=115200, timeout=1)
logging.debug("Used devices: %r", ddif.used_devices)
ddif.close()
logging.debug("Used devices: %r", ddif.used_devices)
if __name__ == "__main__":
main()

View File

@ -7,8 +7,10 @@
This module handles parsing of information from RIOT periph_i2c test.
"""
import logging
from if_lib.dut_shell import DutShell
try:
from riot_pal import DutShell
except ImportError:
raise ImportError('Cannot find riot_pal, try "pip install riot_pal"')
class PeriphI2CIf(DutShell):
@ -120,7 +122,7 @@ def main():
logging.getLogger().setLevel(logging.DEBUG)
try:
i2c = PeriphI2CIf.from_autodetect()
i2c = PeriphI2CIf()
cmds = i2c.get_command_list()
logging.debug("======================================================")
for cmd in cmds:

View File

@ -14,8 +14,8 @@ import argparse
import time
import logging
import periph_i2c_if
from if_lib import bpt_if
from periph_i2c_if import PeriphI2CIf
from if_lib import PhilipIf
BPT_ADDR = 85
BPT_USER_REG = 152
@ -337,15 +337,8 @@ def main():
if args.dut_baud is not None:
baud = int(args.dut_baud, 0)
if args.bpt_port is None:
bpt = bpt_if.BptIf.from_autodetect()
else:
bpt = bpt_if.BptIf(port=args.bpt_port)
if args.dut_port is None:
i2c = periph_i2c_if.PeriphI2CIf.from_autodetect(baudrate=baud)
else:
i2c = periph_i2c_if.PeriphI2CIf(port=args.dut_port, baudrate=baud)
bpt = PhilipIf(port=args.bpt_port)
i2c = PeriphI2CIf(port=args.dut_port, baudrate=baud)
print('Starting Test periph_i2c')
test_list = []

View File

@ -6,7 +6,10 @@
"""@package PyToAPI
This module handles parsing of information from RIOT periph_uart test.
"""
from if_lib.dut_shell import DutShell
try:
from riot_pal import DutShell
except ImportError:
raise ImportError('Cannot find riot_pal, try "pip install riot_pal"')
class PeriphUartIf(DutShell):

View File

@ -17,7 +17,7 @@ import random
import string
from periph_uart_if import PeriphUartIf
from if_lib import bpt_if
from if_lib import PhilipIf
def kwexpect(val1, val2, level="WARN"):
@ -250,7 +250,7 @@ def main():
raise ValueError('Invalid log level: %s' % loglevel)
logging.basicConfig(level=loglevel)
bpt = bpt_if.BptIf(port=args.bpt_port)
bpt = PhilipIf(port=args.bpt_port)
uart = PeriphUartIf(port=args.dut_port)
print('Starting Test periph_uart')