From 85aaf1385e2ad45b655733b4c61dec49fba2af8c Mon Sep 17 00:00:00 2001 From: S Groesz Date: Tue, 10 Nov 2020 01:47:47 +0000 Subject: [PATCH] Update for Python3 --- .gitignore | 5 + pty-gpib-emulator/python/PTY_Interface.py | 6 +- .../python/enumerate_interfaces.py | 36 + vendortools/enumip.py | 2 + vendortools/nfcli.py | 156 +- vendortools/nfutil.py | 1316 +++++++++-------- vendortools/prologic_sim.py | 137 ++ 7 files changed, 936 insertions(+), 722 deletions(-) create mode 100644 .gitignore create mode 100644 pty-gpib-emulator/python/enumerate_interfaces.py create mode 100644 vendortools/prologic_sim.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ad6b940 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__ +*.bak +*.swp +*.egg-info +*.bak2 diff --git a/pty-gpib-emulator/python/PTY_Interface.py b/pty-gpib-emulator/python/PTY_Interface.py index 9d4764d..e998d7b 100644 --- a/pty-gpib-emulator/python/PTY_Interface.py +++ b/pty-gpib-emulator/python/PTY_Interface.py @@ -58,7 +58,7 @@ class Interface: while (self.running): tty_in = os.read(self.m, readLen) for tty_out in self.handleInput(tty_in): - os.write(self.m, tty_out) + os.write(self.m, tty_out.encode()) ##%% handleInput(tty_in) @@ -67,7 +67,7 @@ class Interface: ## address. That device must handle the command via dev.handleCMD(cmd, args) ## def handleInput(self, tty_in): - tty_in = tty_in.strip() + tty_in = tty_in.strip().decode() if tty_in != "" and self.addr in self.devices: dev = self.devices[self.addr] @@ -83,5 +83,5 @@ class Interface: ## def addDevice(self, dev, addr): self.devices[addr] = dev - + diff --git a/pty-gpib-emulator/python/enumerate_interfaces.py b/pty-gpib-emulator/python/enumerate_interfaces.py new file mode 100644 index 0000000..3bb80c1 --- /dev/null +++ b/pty-gpib-emulator/python/enumerate_interfaces.py @@ -0,0 +1,36 @@ +# Use those functions to enumerate all interfaces available on the system using Python. +# found on + +import socket +import fcntl +import struct +import array + +def all_interfaces(): + max_possible = 128 # arbitrary. raise if needed. + bytes = max_possible * 32 + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + names = array.array('B', '\0' * bytes) + outbytes = struct.unpack('iL', fcntl.ioctl( + s.fileno(), + 0x8912, # SIOCGIFCONF + struct.pack('iL', bytes, names.buffer_info()[0]) + ))[0] + namestr = names.tostring() + lst = [] + for i in range(0, outbytes, 40): + name = namestr[i:i+16].split('\0', 1)[0] + ip = namestr[i+20:i+24] + lst.append((name, ip)) + return lst + +def format_ip(addr): + return str(ord(addr[0])) + '.' + \ + str(ord(addr[1])) + '.' + \ + str(ord(addr[2])) + '.' + \ + str(ord(addr[3])) + + +ifs = all_interfaces() +for i in ifs: + print "%12s %s" % (i[0], format_ip(i[1])) diff --git a/vendortools/enumip.py b/vendortools/enumip.py index b5919eb..3bd7910 100755 --- a/vendortools/enumip.py +++ b/vendortools/enumip.py @@ -51,6 +51,7 @@ IOCTL_SIOCGIFCONF = 0x8912 def enumIpUnix(): import fcntl + # struct_size 40 if system is 64 bit struct_size = 40 if (sys.maxsize > 2**32) else 32 inbytes = 128 * struct_size # Maximum 128 interfaces @@ -68,3 +69,4 @@ def enumIpUnix(): struct_size)] return [ip for ip in iplist if ip != '127.0.0.1'] + diff --git a/vendortools/nfcli.py b/vendortools/nfcli.py index 11c5550..b6c825e 100755 --- a/vendortools/nfcli.py +++ b/vendortools/nfcli.py @@ -11,16 +11,18 @@ from enumip import * #----------------------------------------------------------------------------- def usage(): - print - print "Usage: nfcli --list --eth_addr=ADDR --ip_type=TYPE --ip_addr=IP, --netmask=MASK, --gateway=GW" - print "Search and configure Prologix GPIB-ETHERNET Controllers." - print "--help : display this help" - print "--list : search for controllers" - print "--eth_addr=ADDR : configure controller with Ethernet address ADDR" - print "--ip_type=TYPE : set controller ip address type to TYPE (\"static\" or \"dhcp\")" - print "--ip_addr=IP : set controller address to IP" - print "--netmask=MASK : set controller network mask to MASK" - print "--gateway=GW : set controller default gateway to GW" + print() + print("Usage: nfcli --list --eth_addr=ADDR --ip_type=TYPE --ip_addr=IP,", + "--netmask=MASK, --gateway=GW") + print("Search and configure Prologix GPIB-ETHERNET Controllers.") + print("--help : display this help") + print("--list : search for controllers") + print("--eth_addr=ADDR : configure controller with Ethernet address ADDR") + print('--ip_type=TYPE : set controller ip address type to TYPE', + '("static" or "dhcp")') + print("--ip_addr=IP : set controller address to IP") + print("--netmask=MASK : set controller network mask to MASK") + print("--gateway=GW : set controller default gateway to GW") #----------------------------------------------------------------------------- @@ -37,19 +39,19 @@ def ValidateNetParams(ip_str, mask_str, gw_str): try: ip = socket.inet_aton(ip_str) except: - print "IP address is invalid." + print("IP address is invalid.") return False try: mask = socket.inet_aton(mask_str) except: - print "Network mask is invalid." + print("Network mask is invalid.") return False try: gw = socket.inet_aton(gw_str) except: - print "Gateway address is invalid." + print("Gateway address is invalid.") return False # Validate network mask @@ -59,12 +61,12 @@ def ValidateNetParams(ip_str, mask_str, gw_str): # Exclude restricted masks if (mask == 0) or (mask == 0xFFFFFFFF): - print "Network mask is invalid." + print("Network mask is invalid.") return False # Exclude non-left-contiguous masks if (((mask + (mask & -mask)) & 0xFFFFFFFF) != 0): - print "Network mask is not contiguous." + print("Network mask is not contiguous.") return False @@ -78,7 +80,7 @@ def ValidateNetParams(ip_str, mask_str, gw_str): # Exclude restricted addresses # 0.0.0.0 is valid if ((gw != 0) and ((octet1 == 0) or (octet1 == 127) or (octet1 > 223))): - print "Gateway address is invalid." + print("Gateway address is invalid.") return False # Validate IP address @@ -90,17 +92,17 @@ def ValidateNetParams(ip_str, mask_str, gw_str): # Exclude restricted addresses if ((octet1 == 0) or (octet1 == 127) or (octet1 > 223)): - print "IP address is invalid." + print("IP address is invalid.") return False # Exclude subnet network address if ((ip & ~mask) == 0): - print "IP address is invalid." + print("IP address is invalid.") return False # Exclude subnet broadcast address if ((ip & ~mask) == (0xFFFFFFFF & ~mask)): - print "IP address is invalid." + print("IP address is invalid.") return False return True @@ -139,9 +141,16 @@ def main(): eth_addr = None try: - opts, args = getopt.getopt(sys.argv[1:], '', ['help', 'list', 'eth_addr=', 'ip_type=', 'ip_addr=', 'netmask=', 'gateway=']) - except getopt.GetoptError, err: - print str(err) + opts, args = getopt.getopt(sys.argv[1:], '', + ['help', + 'list', + 'eth_addr=', + 'ip_type=', + 'ip_addr=', + 'netmask=', + 'gateway=']) + except getopt.GetoptError as err: + print(err) sys.exit(1) # Check for unparsed parameters @@ -150,7 +159,7 @@ def main(): sys.exit(1) for o, a in opts: - if o == "--help": + if o == "--help": showhelp = True elif o == "--list": search = True @@ -168,22 +177,22 @@ def main(): if (len(opts) == 0) or (showhelp): usage() sys.exit(1) - + if search: if not eth_addr is None: - print "--list and --eth_addr are not compatible." + print("--list and --eth_addr are not compatible.") invalid_args = True if not ip_type is None: - print "--list and --ip_type are not compatible." + print("--list and --ip_type are not compatible.") invalid_args = True if not ip_addr is None: - print "--list and --ip_addr are not compatible." + print("--list and --ip_addr are not compatible.") invalid_args = True if not netmask is None: - print "--list and --netmask are not compatible." + print("--list and --netmask are not compatible.") invalid_args = True if not gateway is None: - print "--list and --gateway are not compatible." + print("--list and --gateway are not compatible.") invalid_args = True else: @@ -191,19 +200,19 @@ def main(): eth_addr = eth_addr.strip().replace(":", "").replace("-", "") eth_addr = eth_addr.decode('hex') except: - print "Invalid Ethernet address." + print("Invalid Ethernet address.") sys.exit(1) if len(eth_addr) != 6: - print "Invalid Ethernet address." + print("Invalid Ethernet address.") sys.exit(1) - if ip_type in ["Static", "static"]: + if ip_type.lower() in ["static"]: ip_type = NF_IP_STATIC - elif ip_type in ["Dynamic", "dynamic", "Dhcp", "dhcp"]: + elif ip_type.lower in ["dynamic", "dhcp"]: ip_type = NF_IP_DYNAMIC else: - print "--ip_type must be 'static' or 'dhcp'." + print("--ip_type must be 'static' or 'dhcp'.") sys.exit(1) if ip_type == NF_IP_STATIC: @@ -227,17 +236,17 @@ def main(): if ip_addr is None: ip_addr = "0.0.0.0" else: - print "--ip_addr not allowed when --ip_type=dhcp." + print("--ip_addr not allowed when --ip_type=dhcp.") invalid_args = True if netmask is None: netmask = "0.0.0.0" else: - print "--netmask not allowed when --ip_type=dhcp." + print("--netmask not allowed when --ip_type=dhcp.") invalid_args = True if gateway is None: gateway = "0.0.0.0" else: - print "--gateway not allowed when --ip_type=dhcp." + print("--gateway not allowed when --ip_type=dhcp.") invalid_args = True if invalid_args: @@ -246,33 +255,33 @@ def main(): global seq - sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) - + # sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) + iplist = enumIp(); if len(iplist) == 0: - print "Host has no IP address." + print("Host has no IP address.") sys.exit(1) - + devices = {} - - for ip in iplist: - print "Searching through network interface:", ip - + + for ip in iplist: + print("Searching through network interface:", ip) + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) port = 0 - + try: s.bind((ip, port)) - except socket.error, e: - print "Bind error on send socket:", e + except socket.error as e: + print("Bind error on send socket:", e) sys.exit(1) - port = s.getsockname()[1] - + port = s.getsockname()[1] + r = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) r.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -281,12 +290,12 @@ def main(): try: r.bind(('', port)) - except socket.error, e: - print "Bind error on receive socket:", e + except socket.error as e: + print("Bind error on receive socket:", e) sys.exit(1) - + d = Discover(s, r) - print + print() for k in d: d[k]['host_ip'] = ip @@ -296,15 +305,16 @@ def main(): r.close() if search: - print - print "Found", len(devices), "Prologix GPIB-ETHERNET Controller(s)." + print() + print("Found", len(devices), "Prologix GPIB-ETHERNET Controller(s).") for key in devices: PrintDetails(devices[key]) - print + print() else: if eth_addr in devices: - print "Updating network settings of Prologix GPIB-ETHERNET Controller", FormatEthAddr(eth_addr) + print("Updating Prologix GPIB-ETHERNET Controller settings", + FormatEthAddr(eth_addr)) device = devices[eth_addr] @@ -317,11 +327,11 @@ def main(): try: s.bind((device['host_ip'], port)) - except socket.error, e: - print "Bind error on send socket:", e + except socket.error as e: + print("Bind error on send socket:", e) sys.exit(1) - port = s.getsockname()[1] + port = s.getsockname()[1] r = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) r.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -331,27 +341,31 @@ def main(): try: r.bind(('', port)) - except socket.error, e: - print "Bind error on receive socket:", e + except socket.error as e: + print("Bind error on receive socket:", e) sys.exit(1) result = Assignment(s, r, eth_addr, ip_type, ip_addr, netmask, gateway) - print + print() if len(result) == 0: - print "Network settings update failed." + print("Network settings update failed.") else: if result['result'] == NF_SUCCESS: - print "Network settings updated successfully." + print("Network settings updated successfully.") else: - print "Network settings update failed." + print("Network settings update failed.") else: - print "Prologix GPIB-ETHERNET Controller", FormatEthAddr(eth_addr), "already configured for DHCP." + print("Prologix GPIB-ETHERNET Controller", + FormatEthAddr(eth_addr), + "already configured for DHCP.") else: - print "Prologix GPIB-ETHERNET Controller", FormatEthAddr(eth_addr), "not found." - + print("Prologix GPIB-ETHERNET Controller", + FormatEthAddr(eth_addr), + "not found.") + + - #----------------------------------------------------------------------------- if __name__ == "__main__": main() diff --git a/vendortools/nfutil.py b/vendortools/nfutil.py index 37101bf..d654346 100755 --- a/vendortools/nfutil.py +++ b/vendortools/nfutil.py @@ -1,648 +1,668 @@ -import struct -import random -import socket -import array -import time -import sys -import socket - - -NETFINDER_SERVER_PORT = 3040 - -NF_IDENTIFY = 0 -NF_IDENTIFY_REPLY = 1 -NF_ASSIGNMENT = 2 -NF_ASSIGNMENT_REPLY = 3 -NF_FLASH_ERASE = 4 -NF_FLASH_ERASE_REPLY = 5 -NF_BLOCK_SIZE = 6 -NF_BLOCK_SIZE_REPLY = 7 -NF_BLOCK_WRITE = 8 -NF_BLOCK_WRITE_REPLY = 9 -NF_VERIFY = 10 -NF_VERIFY_REPLY = 11 -NF_REBOOT = 12 -NF_SET_ETHERNET_ADDRESS = 13 -NF_SET_ETHERNET_ADDRESS_REPLY = 14 -NF_TEST = 15 -NF_TEST_REPLY = 16 - -NF_SUCCESS = 0 -NF_CRC_MISMATCH = 1 -NF_INVALID_MEMORY_TYPE = 2 -NF_INVALID_SIZE = 3 -NF_INVALID_IP_TYPE = 4 - -NF_MAGIC = 0x5A - -NF_IP_DYNAMIC = 0 -NF_IP_STATIC = 1 - -NF_ALERT_OK = 0x00 -NF_ALERT_WARN = 0x01 -NF_ALERT_ERROR = 0xFF - -NF_MODE_BOOTLOADER = 0 -NF_MODE_APPLICATION = 1 - -NF_MEMORY_FLASH = 0 -NF_MEMORY_EEPROM = 1 - -NF_REBOOT_CALL_BOOTLOADER = 0 -NF_REBOOT_RESET = 1 - - -HEADER_FMT = "!2cH6s2x" -IDENTIFY_FMT = HEADER_FMT -IDENTIFY_REPLY_FMT = "!H6c4s4s4s4s4s4s32s" -ASSIGNMENT_FMT = "!3xc4s4s4s32x" -ASSIGNMENT_REPLY_FMT = "!c3x" -FLASH_ERASE_FMT = HEADER_FMT -FLASH_ERASE_REPLY_FMT = HEADER_FMT -BLOCK_SIZE_FMT = HEADER_FMT -BLOCK_SIZE_REPLY_FMT = "!H2x" -BLOCK_WRITE_FMT = "!cxHI" -BLOCK_WRITE_REPLY_FMT = "!c3x" -VERIFY_FMT = HEADER_FMT -VERIFY_REPLY_FMT = "!c3x" -REBOOT_FMT = "!c3x" -SET_ETHERNET_ADDRESS_FMT = "!6s2x" -SET_ETHERNET_ADDRESS_REPLY_FMT = HEADER_FMT -TEST_FMT = HEADER_FMT -TEST_REPLY_FMT = "!32s" - -MAX_ATTEMPTS = 10 -MAX_TIMEOUT = 0.5 - - -#----------------------------------------------------------------------------- -def MkHeader(id, seq, eth_addr): - return struct.pack( - HEADER_FMT, - chr(NF_MAGIC), - chr(id), - seq, - eth_addr - ); - - -#----------------------------------------------------------------------------- -def MkIdentify(seq): - return MkHeader(NF_IDENTIFY, seq, '\xFF\xFF\xFF\xFF\xFF\xFF') - - -#----------------------------------------------------------------------------- -def MkAssignment(seq, eth_addr, ip_type, ip_addr, netmask, gateway): - - return MkHeader(NF_ASSIGNMENT, seq, eth_addr) + \ - struct.pack( - ASSIGNMENT_FMT, - chr(ip_type), - socket.inet_aton(ip_addr), - socket.inet_aton(netmask), - socket.inet_aton(gateway) - ) - - -#----------------------------------------------------------------------------- -def MkFlashErase(seq, eth_addr): - return MkHeader(NF_FLASH_ERASE, seq, eth_addr) - - -#----------------------------------------------------------------------------- -def MkBlockSize(seq, eth_addr): - return MkHeader(NF_BLOCK_SIZE, seq, eth_addr) - - -#----------------------------------------------------------------------------- -def MkBlockWrite(seq, eth_addr, memtype, addr, data): - return MkHeader(NF_BLOCK_WRITE, seq, eth_addr) + \ - struct.pack( - BLOCK_WRITE_FMT, - chr(memtype), - len(data), - addr, - ) + \ - data - - -#----------------------------------------------------------------------------- -def MkVerify(seq, eth_addr): - return MkHeader(NF_VERIFY, seq, eth_addr) - - -#----------------------------------------------------------------------------- -def MkReboot(seq, eth_addr, reboottype): - return MkHeader(NF_REBOOT, seq, eth_addr) + \ - struct.pack( - REBOOT_FMT, - chr(reboottype) - ); - - -#----------------------------------------------------------------------------- -def MkTest(seq, eth_addr): - return MkHeader(NF_TEST, seq, eth_addr) - - -#----------------------------------------------------------------------------- -def MkSetEthernetAddress(seq, eth_addr, new_eth_addr): - return MkHeader(NF_SET_ETHERNET_ADDRESS, seq, eth_addr) + \ - struct.pack( - SET_ETHERNET_ADDRESS_FMT, - new_eth_addr - ); - - -#----------------------------------------------------------------------------- -def UnMkHeader(msg): - params = struct.unpack( - HEADER_FMT, - msg - ); - - d = {} - d['magic'] = ord(params[0]) - d['id'] = ord(params[1]) - d['sequence'] = params[2] - d['eth_addr'] = params[3] - return d - - -#----------------------------------------------------------------------------- -def UnMkIdentifyReply(msg): - hdrlen = struct.calcsize(HEADER_FMT) - - d = UnMkHeader(msg[0:hdrlen]) - - params = struct.unpack( - IDENTIFY_REPLY_FMT, - msg[hdrlen:] - ); - - d['uptime_days'] = params[0] - d['uptime_hrs'] = ord(params[1]) - d['uptime_min'] = ord(params[2]) - d['uptime_secs'] = ord(params[3]) - d['mode'] = ord(params[4]) - d['alert'] = ord(params[5]) - d['ip_type'] = ord(params[6]) - d['ip_addr'] = params[7] - d['ip_netmask'] = params[8] - d['ip_gw'] = params[9] - d['app_ver'] = params[10] - d['boot_ver'] = params[11] - d['hw_ver'] = params[12] - d['name'] = params[13] - return d - - -#----------------------------------------------------------------------------- -def UnMkAssignmentReply(msg): - hdrlen = struct.calcsize(HEADER_FMT) - - d = UnMkHeader(msg[0:hdrlen]) - - params = struct.unpack( - ASSIGNMENT_REPLY_FMT, - msg[hdrlen:] - ); - - d['result'] = ord(params[0]) - return d - - -#----------------------------------------------------------------------------- -def UnMkFlashEraseReply(msg): - return UnMkHeader(msg) - - -#----------------------------------------------------------------------------- -def UnMkBlockSizeReply(msg): - hdrlen = struct.calcsize(HEADER_FMT) - - d = UnMkHeader(msg[0:hdrlen]) - - params = struct.unpack( - BLOCK_SIZE_REPLY_FMT, - msg[hdrlen:] - ); - - d['size'] = params[0] - return d - - -#----------------------------------------------------------------------------- -def UnMkBlockWriteReply(msg): - hdrlen = struct.calcsize(HEADER_FMT) - - d = UnMkHeader(msg[0:hdrlen]) - - params = struct.unpack( - BLOCK_WRITE_REPLY_FMT, - msg[hdrlen:] - ); - - d['result'] = ord(params[0]) - return d - - -#----------------------------------------------------------------------------- -def UnMkVerifyReply(msg): - hdrlen = struct.calcsize(HEADER_FMT) - - d = UnMkHeader(msg[0:hdrlen]) - - params = struct.unpack( - VERIFY_REPLY_FMT, - msg[hdrlen:] - ); - - d['result'] = ord(params[0]) - return d - - -#----------------------------------------------------------------------------- -def UnMkTestReply(msg): - hdrlen = struct.calcsize(HEADER_FMT) - - d = UnMkHeader(msg[0:hdrlen]) - - params = struct.unpack( - TEST_REPLY_FMT, - msg[hdrlen:] - ); - - result = '' - for i in params[0]: - if ord(i) == 0: - break - result = result + i - - d['result'] = result - return d - - -#----------------------------------------------------------------------------- -def UnMkSetEthernetAddressReply(msg): - return UnMkHeader(msg) - - -#----------------------------------------------------------------------------- -def SendMsg(s, msg): - - try: - s.sendto(msg, ('', NETFINDER_SERVER_PORT)) - except socket.error, e: - print e - return False; - - return True; - -#----------------------------------------------------------------------------- -def RecvMsg(s): - try: - return s.recv(256) - except socket.error: # ignore socket errors - return '' - -#----------------------------------------------------------------------------- -def Discover(s, r): - - devices = {} - - attempts = 2 - while attempts > 0: - attempts = attempts - 1 - - seq = random.randint(1, 65535) - msg = MkIdentify(seq) - - if (SendMsg(s, msg)): - exp = time.time() + MAX_TIMEOUT - while time.time() < exp: - sys.stdout.write('.') - reply = RecvMsg(r) - if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(IDENTIFY_REPLY_FMT): - continue - - d = UnMkIdentifyReply(reply) - - if d['magic'] != NF_MAGIC: - continue - if d['id'] != NF_IDENTIFY_REPLY: - continue - if d['sequence'] != seq: - continue - - devices[d['eth_addr']] = d - - return devices - - -#----------------------------------------------------------------------------- -def Identify(s, r, eth_addr): - - attempts = 2 - while attempts > 0: - attempts = attempts - 1 - - seq = random.randint(1, 65535) - msg = MkIdentify(seq) - - if (SendMsg(s, msg)): - exp = time.time() + 2 # Longer timeout - - while time.time() < exp: - sys.stdout.write('.') - reply = RecvMsg(r) - if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(IDENTIFY_REPLY_FMT): - continue - - d = UnMkIdentifyReply(reply) - - if d['magic'] != NF_MAGIC: - continue - if d['id'] != NF_IDENTIFY_REPLY: - continue - if d['sequence'] != seq: - continue - if d['eth_addr'] != eth_addr: - continue - - return d - - return {} - - -#----------------------------------------------------------------------------- -def Assignment(s, r, eth_addr, ip_type, ip_addr, netmask, gateway): - - attempts = MAX_ATTEMPTS - while attempts > 0: - attempts = attempts - 1 - - seq = random.randint(1, 65535) - msg = MkAssignment(seq, eth_addr, ip_type, ip_addr, netmask, gateway) - - if (SendMsg(s, msg)): - exp = time.time() + MAX_TIMEOUT - - while time.time() < exp: - sys.stdout.write('.') - reply = RecvMsg(r) - - if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(ASSIGNMENT_REPLY_FMT): - continue - - d = UnMkAssignmentReply(reply) - - if d['magic'] != NF_MAGIC: - continue - if d['id'] != NF_ASSIGNMENT_REPLY: - continue - if d['sequence'] != seq: - continue - if d['eth_addr'] != eth_addr: - continue - - return d - - return {} - - -#----------------------------------------------------------------------------- -def FlashErase(s, r, eth_addr): - - attempts = MAX_ATTEMPTS - while attempts > 0: - attempts = attempts - 1 - - seq = random.randint(1, 65535) - msg = MkFlashErase(seq, eth_addr) - - if (SendMsg(s, msg)): - exp = time.time() + 10 # Flash erase could take a while - - while time.time() < exp: - sys.stdout.write('.') - reply = RecvMsg(r) - if len(reply) != struct.calcsize(HEADER_FMT): - continue - - d = UnMkFlashEraseReply(reply) - - if d['magic'] != NF_MAGIC: - continue - if d['id'] != NF_FLASH_ERASE_REPLY: - continue - if d['sequence'] != seq: - continue - if d['eth_addr'] != eth_addr: - continue - - return d - - return {} - - -#----------------------------------------------------------------------------- -def BlockSize(s, r, eth_addr): - - attempts = MAX_ATTEMPTS - while attempts > 0: - attempts = attempts - 1 - - seq = random.randint(1, 65535) - msg = MkBlockSize(seq, eth_addr) - - if (SendMsg(s, msg)): - exp = time.time() + MAX_TIMEOUT - - while time.time() < exp: - sys.stdout.write('.') - reply = RecvMsg(r) - if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(BLOCK_SIZE_REPLY_FMT): - continue - - d = UnMkBlockSizeReply(reply) - - if d['magic'] != NF_MAGIC: - continue - if d['id'] != NF_BLOCK_SIZE_REPLY: - continue - if d['sequence'] != seq: - continue - if d['eth_addr'] != eth_addr: - continue - - return d - - return {} - - -#----------------------------------------------------------------------------- -def BlockWrite(s, r, eth_addr, memtype, addr, data): - - attempts = MAX_ATTEMPTS - while attempts > 0: - attempts = attempts - 1 - - seq = random.randint(1, 65535) - msg = MkBlockWrite(seq, eth_addr, memtype, addr, data) - - if (SendMsg(s, msg)): - exp = time.time() + MAX_TIMEOUT - - while time.time() < exp: - #sys.stdout.write('.'), - reply = RecvMsg(r) - if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(BLOCK_WRITE_REPLY_FMT): - continue - - d = UnMkBlockWriteReply(reply) - - if d['magic'] != NF_MAGIC: - continue - if d['id'] != NF_BLOCK_WRITE_REPLY: - continue - if d['sequence'] != seq: - continue - if d['eth_addr'] != eth_addr: - continue - - return d - - return {} - - -#----------------------------------------------------------------------------- -def Verify(s, r, eth_addr): - - attempts = MAX_ATTEMPTS - while attempts > 0: - attempts = attempts - 1 - - seq = random.randint(1, 65535) - msg = MkVerify(seq, eth_addr) - - if (SendMsg(s, msg)): - exp = time.time() + MAX_TIMEOUT - - while time.time() < exp: - sys.stdout.write('.') - reply = RecvMsg(r) - if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(VERIFY_REPLY_FMT): - continue - - d = UnMkVerifyReply(reply) - - if d['magic'] != NF_MAGIC: - continue - if d['id'] != NF_VERIFY_REPLY: - continue - if d['sequence'] != seq: - continue - if d['eth_addr'] != eth_addr: - continue - - return d - - return {} - - -#----------------------------------------------------------------------------- -def Reboot(s, eth_addr, reboottype): - seq = random.randint(1, 65535) - msg = MkReboot(seq, eth_addr, reboottype) - - if (SendMsg(s, msg)): - return - - -#----------------------------------------------------------------------------- -def Test(s, r, eth_addr): - seq = random.randint(1, 65535) - msg = MkTest(seq, eth_addr) - - if (SendMsg(s, msg)): - exp = time.time() + MAX_TIMEOUT - - while time.time() < exp: - sys.stdout.write('.') - reply = RecvMsg(r) - if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(TEST_REPLY_FMT): - continue - - d = UnMkTestReply(reply) - - if d['magic'] != NF_MAGIC: - continue - if d['id'] != NF_TEST_REPLY: - continue - if d['sequence'] != seq: - continue - if d['eth_addr'] != eth_addr: - continue - - return d - - return {} - - -#----------------------------------------------------------------------------- -def SetEthernetAddress(s, r, eth_addr, new_eth_addr): - seq = random.randint(1, 65535) - msg = MkSetEthernetAddress(seq, eth_addr, new_eth_addr) - - if (SendMsg(s, msg)): - exp = time.time() + MAX_TIMEOUT - - while time.time() < exp: - sys.stdout.write('.') - reply = RecvMsg(r) - if len(reply) != struct.calcsize(HEADER_FMT): - continue - - d = UnMkSetEthernetAddressReply(reply) - - if d['magic'] != NF_MAGIC: - continue - if d['id'] != NF_SET_ETHERNET_ADDRESS_REPLY: - continue - if d['sequence'] != seq: - continue - if d['eth_addr'] != eth_addr: - continue - - return d - - return {} - -#----------------------------------------------------------------------------- -def FormatEthAddr(a): - return "%02X-%02X-%02X-%02X-%02X-%02X" % (ord(a[0]), ord(a[1]), ord(a[2]), ord(a[3]), ord(a[4]), ord(a[5])) - - -#----------------------------------------------------------------------------- -def PrintDetails(d): - - print - print "Ethernet Address:", FormatEthAddr(d['eth_addr']) - print "Hardware:", socket.inet_ntoa(d['hw_ver']), "Bootloader:", socket.inet_ntoa(d['boot_ver']), "Application:", socket.inet_ntoa(d['app_ver']) - print "Uptime:", d['uptime_days'], 'days', d['uptime_hrs'], 'hours', d['uptime_min'], 'minutes', d['uptime_secs'], 'seconds' - if d['ip_type'] == NF_IP_STATIC: - print "Static IP" - elif d['ip_type'] == NF_IP_DYNAMIC: - print "Dynamic IP" - else: - print "Unknown IP type" - print "IP Address:", socket.inet_ntoa(d['ip_addr']), "Mask:", socket.inet_ntoa(d['ip_netmask']), "Gateway:", socket.inet_ntoa(d['ip_gw']) - print "Mode:", - if d['mode'] == NF_MODE_BOOTLOADER: - print 'Bootloader' - elif d['mode'] == NF_MODE_APPLICATION: - print 'Application' - else: - print 'Unknown' - +# Python >= 3.6 + +import struct +import random +import socket +import array +import time +import sys +import socket + + +NETFINDER_SERVER_PORT = 3040 + +NF_IDENTIFY = 0 +NF_IDENTIFY_REPLY = 1 +NF_ASSIGNMENT = 2 +NF_ASSIGNMENT_REPLY = 3 +NF_FLASH_ERASE = 4 +NF_FLASH_ERASE_REPLY = 5 +NF_BLOCK_SIZE = 6 +NF_BLOCK_SIZE_REPLY = 7 +NF_BLOCK_WRITE = 8 +NF_BLOCK_WRITE_REPLY = 9 +NF_VERIFY = 10 +NF_VERIFY_REPLY = 11 +NF_REBOOT = 12 +NF_SET_ETHERNET_ADDRESS = 13 +NF_SET_ETHERNET_ADDRESS_REPLY = 14 +NF_TEST = 15 +NF_TEST_REPLY = 16 + +NF_SUCCESS = 0 +NF_CRC_MISMATCH = 1 +NF_INVALID_MEMORY_TYPE = 2 +NF_INVALID_SIZE = 3 +NF_INVALID_IP_TYPE = 4 + +NF_MAGIC = 0x5A + +NF_IP_DYNAMIC = 0 +NF_IP_STATIC = 1 + +NF_ALERT_OK = 0x00 +NF_ALERT_WARN = 0x01 +NF_ALERT_ERROR = 0xFF + +NF_MODE_BOOTLOADER = 0 +NF_MODE_APPLICATION = 1 + +NF_MEMORY_FLASH = 0 +NF_MEMORY_EEPROM = 1 + +NF_REBOOT_CALL_BOOTLOADER = 0 +NF_REBOOT_RESET = 1 + + +HEADER_FMT = "!2cH6s2x" +IDENTIFY_FMT = HEADER_FMT +IDENTIFY_REPLY_FMT = "!H6c4s4s4s4s4s4s32s" +ASSIGNMENT_FMT = "!3xc4s4s4s32x" +ASSIGNMENT_REPLY_FMT = "!c3x" +FLASH_ERASE_FMT = HEADER_FMT +FLASH_ERASE_REPLY_FMT = HEADER_FMT +BLOCK_SIZE_FMT = HEADER_FMT +BLOCK_SIZE_REPLY_FMT = "!H2x" +BLOCK_WRITE_FMT = "!cxHI" +BLOCK_WRITE_REPLY_FMT = "!c3x" +VERIFY_FMT = HEADER_FMT +VERIFY_REPLY_FMT = "!c3x" +REBOOT_FMT = "!c3x" +SET_ETHERNET_ADDRESS_FMT = "!6s2x" +SET_ETHERNET_ADDRESS_REPLY_FMT = HEADER_FMT +TEST_FMT = HEADER_FMT +TEST_REPLY_FMT = "!32s" + +MAX_ATTEMPTS = 10 +MAX_TIMEOUT = 0.5 + + +#----------------------------------------------------------------------------- +def MkHeader(header, seq, eth_addr): + return struct.pack( + HEADER_FMT, + bytes([NF_MAGIC]), + bytes([header]), + seq, + eth_addr + ) + + +#----------------------------------------------------------------------------- +def MkIdentify(seq): + return MkHeader(NF_IDENTIFY, seq, bytes([0xFF]) * 6) + + +#----------------------------------------------------------------------------- +def MkAssignment(seq, eth_addr, ip_type, ip_addr, netmask, gateway): + + return MkHeader(NF_ASSIGNMENT, seq, eth_addr) + \ + struct.pack( + ASSIGNMENT_FMT, + chr(ip_type), + socket.inet_aton(ip_addr), + socket.inet_aton(netmask), + socket.inet_aton(gateway) + ) + + +#----------------------------------------------------------------------------- +def MkFlashErase(seq, eth_addr): + return MkHeader(NF_FLASH_ERASE, seq, eth_addr) + + +#----------------------------------------------------------------------------- +def MkBlockSize(seq, eth_addr): + return MkHeader(NF_BLOCK_SIZE, seq, eth_addr) + + +#----------------------------------------------------------------------------- +def MkBlockWrite(seq, eth_addr, memtype, addr, data): + return MkHeader(NF_BLOCK_WRITE, seq, eth_addr) + \ + struct.pack( + BLOCK_WRITE_FMT, + chr(memtype), + len(data), + addr, + ) + \ + data + + +#----------------------------------------------------------------------------- +def MkVerify(seq, eth_addr): + return MkHeader(NF_VERIFY, seq, eth_addr) + + +#----------------------------------------------------------------------------- +def MkReboot(seq, eth_addr, reboottype): + return MkHeader(NF_REBOOT, seq, eth_addr) + \ + struct.pack( + REBOOT_FMT, + chr(reboottype) + ) + + +#----------------------------------------------------------------------------- +def MkTest(seq, eth_addr): + return MkHeader(NF_TEST, seq, eth_addr) + + +#----------------------------------------------------------------------------- +def MkSetEthernetAddress(seq, eth_addr, new_eth_addr): + return MkHeader(NF_SET_ETHERNET_ADDRESS, seq, eth_addr) + \ + struct.pack( + SET_ETHERNET_ADDRESS_FMT, + new_eth_addr + ) + + +#----------------------------------------------------------------------------- +def UnMkHeader(msg): + params = struct.unpack( + HEADER_FMT, + msg + ) + + d = {} + d['magic'] = ord(params[0]) + d['id'] = ord(params[1]) + d['sequence'] = params[2] + d['eth_addr'] = params[3] + return d + + +#----------------------------------------------------------------------------- +def UnMkIdentifyReply(msg): + hdrlen = struct.calcsize(HEADER_FMT) + + d = UnMkHeader(msg[0:hdrlen]) + + params = struct.unpack( + IDENTIFY_REPLY_FMT, + msg[hdrlen:] + ) + + d['uptime_days'] = params[0] + d['uptime_hrs'] = ord(params[1]) + d['uptime_min'] = ord(params[2]) + d['uptime_secs'] = ord(params[3]) + d['mode'] = ord(params[4]) + d['alert'] = ord(params[5]) + d['ip_type'] = ord(params[6]) + d['ip_addr'] = params[7] + d['ip_netmask'] = params[8] + d['ip_gw'] = params[9] + d['app_ver'] = params[10] + d['boot_ver'] = params[11] + d['hw_ver'] = params[12] + d['name'] = params[13] + return d + + +#----------------------------------------------------------------------------- +def UnMkAssignmentReply(msg): + hdrlen = struct.calcsize(HEADER_FMT) + + d = UnMkHeader(msg[0:hdrlen]) + + params = struct.unpack( + ASSIGNMENT_REPLY_FMT, + msg[hdrlen:] + ) + + d['result'] = ord(params[0]) + return d + + +#----------------------------------------------------------------------------- +def UnMkFlashEraseReply(msg): + return UnMkHeader(msg) + + +#----------------------------------------------------------------------------- +def UnMkBlockSizeReply(msg): + hdrlen = struct.calcsize(HEADER_FMT) + + d = UnMkHeader(msg[0:hdrlen]) + + params = struct.unpack( + BLOCK_SIZE_REPLY_FMT, + msg[hdrlen:] + ) + + d['size'] = params[0] + return d + + +#----------------------------------------------------------------------------- +def UnMkBlockWriteReply(msg): + hdrlen = struct.calcsize(HEADER_FMT) + + d = UnMkHeader(msg[0:hdrlen]) + + params = struct.unpack( + BLOCK_WRITE_REPLY_FMT, + msg[hdrlen:] + ) + + d['result'] = ord(params[0]) + return d + + +#----------------------------------------------------------------------------- +def UnMkVerifyReply(msg): + hdrlen = struct.calcsize(HEADER_FMT) + + d = UnMkHeader(msg[0:hdrlen]) + + params = struct.unpack( + VERIFY_REPLY_FMT, + msg[hdrlen:] + ) + + d['result'] = ord(params[0]) + return d + + +#----------------------------------------------------------------------------- +def UnMkTestReply(msg): + hdrlen = struct.calcsize(HEADER_FMT) + + d = UnMkHeader(msg[0:hdrlen]) + + params = struct.unpack( + TEST_REPLY_FMT, + msg[hdrlen:] + ) + + result = '' + for i in params[0]: + if ord(i) == 0: + break + result = result + i + + d['result'] = result + return d + + +#----------------------------------------------------------------------------- +def UnMkSetEthernetAddressReply(msg): + return UnMkHeader(msg) + + +#----------------------------------------------------------------------------- +def SendMsg(s, msg): + + try: + s.sendto(msg, ('', NETFINDER_SERVER_PORT)) + except socket.error as e: + print(e) + return False + + return True + +#----------------------------------------------------------------------------- +def RecvMsg(s): + try: + return s.recv(256) + except socket.error: # ignore socket errors + return '' + +#----------------------------------------------------------------------------- +def Discover(s, r): + + devices = {} + + attempts = 2 + while attempts > 0: + attempts = attempts - 1 + + seq = random.randint(1, 65535) + msg = MkIdentify(seq) + + if (SendMsg(s, msg)): + exp = time.time() + MAX_TIMEOUT + while time.time() < exp: + sys.stdout.write('.') + reply = RecvMsg(r) + if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(IDENTIFY_REPLY_FMT): + continue + + d = UnMkIdentifyReply(reply) + + if d['magic'] != NF_MAGIC: + continue + if d['id'] != NF_IDENTIFY_REPLY: + continue + if d['sequence'] != seq: + continue + + devices[d['eth_addr']] = d + + return devices + + +#----------------------------------------------------------------------------- +def Identify(s, r, eth_addr): + + attempts = 2 + while attempts > 0: + attempts = attempts - 1 + + seq = random.randint(1, 65535) + msg = MkIdentify(seq) + + if (SendMsg(s, msg)): + exp = time.time() + 2 # Longer timeout + + while time.time() < exp: + sys.stdout.write('.') + reply = RecvMsg(r) + if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(IDENTIFY_REPLY_FMT): + continue + + d = UnMkIdentifyReply(reply) + + if d['magic'] != NF_MAGIC: + continue + if d['id'] != NF_IDENTIFY_REPLY: + continue + if d['sequence'] != seq: + continue + if d['eth_addr'] != eth_addr: + continue + + return d + + return {} + + +#----------------------------------------------------------------------------- +def Assignment(s, r, eth_addr, ip_type, ip_addr, netmask, gateway): + + attempts = MAX_ATTEMPTS + while attempts > 0: + attempts = attempts - 1 + + seq = random.randint(1, 65535) + msg = MkAssignment(seq, eth_addr, ip_type, ip_addr, netmask, gateway) + + if (SendMsg(s, msg)): + exp = time.time() + MAX_TIMEOUT + + while time.time() < exp: + sys.stdout.write('.') + reply = RecvMsg(r) + + if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(ASSIGNMENT_REPLY_FMT): + continue + + d = UnMkAssignmentReply(reply) + + if d['magic'] != NF_MAGIC: + continue + if d['id'] != NF_ASSIGNMENT_REPLY: + continue + if d['sequence'] != seq: + continue + if d['eth_addr'] != eth_addr: + continue + + return d + + return {} + + +#----------------------------------------------------------------------------- +def FlashErase(s, r, eth_addr): + + attempts = MAX_ATTEMPTS + while attempts > 0: + attempts = attempts - 1 + + seq = random.randint(1, 65535) + msg = MkFlashErase(seq, eth_addr) + + if (SendMsg(s, msg)): + exp = time.time() + 10 # Flash erase could take a while + + while time.time() < exp: + sys.stdout.write('.') + reply = RecvMsg(r) + if len(reply) != struct.calcsize(HEADER_FMT): + continue + + d = UnMkFlashEraseReply(reply) + + if d['magic'] != NF_MAGIC: + continue + if d['id'] != NF_FLASH_ERASE_REPLY: + continue + if d['sequence'] != seq: + continue + if d['eth_addr'] != eth_addr: + continue + + return d + + return {} + + +#----------------------------------------------------------------------------- +def BlockSize(s, r, eth_addr): + + attempts = MAX_ATTEMPTS + while attempts > 0: + attempts = attempts - 1 + + seq = random.randint(1, 65535) + msg = MkBlockSize(seq, eth_addr) + + if (SendMsg(s, msg)): + exp = time.time() + MAX_TIMEOUT + + while time.time() < exp: + sys.stdout.write('.') + reply = RecvMsg(r) + if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(BLOCK_SIZE_REPLY_FMT): + continue + + d = UnMkBlockSizeReply(reply) + + if d['magic'] != NF_MAGIC: + continue + if d['id'] != NF_BLOCK_SIZE_REPLY: + continue + if d['sequence'] != seq: + continue + if d['eth_addr'] != eth_addr: + continue + + return d + + return {} + + +#----------------------------------------------------------------------------- +def BlockWrite(s, r, eth_addr, memtype, addr, data): + + attempts = MAX_ATTEMPTS + while attempts > 0: + attempts = attempts - 1 + + seq = random.randint(1, 65535) + msg = MkBlockWrite(seq, eth_addr, memtype, addr, data) + + if (SendMsg(s, msg)): + exp = time.time() + MAX_TIMEOUT + + while time.time() < exp: + #sys.stdout.write('.'), + reply = RecvMsg(r) + if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(BLOCK_WRITE_REPLY_FMT): + continue + + d = UnMkBlockWriteReply(reply) + + if d['magic'] != NF_MAGIC: + continue + if d['id'] != NF_BLOCK_WRITE_REPLY: + continue + if d['sequence'] != seq: + continue + if d['eth_addr'] != eth_addr: + continue + + return d + + return {} + + +#----------------------------------------------------------------------------- +def Verify(s, r, eth_addr): + + attempts = MAX_ATTEMPTS + while attempts > 0: + attempts = attempts - 1 + + seq = random.randint(1, 65535) + msg = MkVerify(seq, eth_addr) + + if (SendMsg(s, msg)): + exp = time.time() + MAX_TIMEOUT + + while time.time() < exp: + sys.stdout.write('.') + reply = RecvMsg(r) + if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(VERIFY_REPLY_FMT): + continue + + d = UnMkVerifyReply(reply) + + if d['magic'] != NF_MAGIC: + continue + if d['id'] != NF_VERIFY_REPLY: + continue + if d['sequence'] != seq: + continue + if d['eth_addr'] != eth_addr: + continue + + return d + + return {} + + +#----------------------------------------------------------------------------- +def Reboot(s, eth_addr, reboottype): + seq = random.randint(1, 65535) + msg = MkReboot(seq, eth_addr, reboottype) + + if (SendMsg(s, msg)): + return + + +#----------------------------------------------------------------------------- +def Test(s, r, eth_addr): + seq = random.randint(1, 65535) + msg = MkTest(seq, eth_addr) + + if (SendMsg(s, msg)): + exp = time.time() + MAX_TIMEOUT + + while time.time() < exp: + sys.stdout.write('.') + reply = RecvMsg(r) + if len(reply) != struct.calcsize(HEADER_FMT) + struct.calcsize(TEST_REPLY_FMT): + continue + + d = UnMkTestReply(reply) + + if d['magic'] != NF_MAGIC: + continue + if d['id'] != NF_TEST_REPLY: + continue + if d['sequence'] != seq: + continue + if d['eth_addr'] != eth_addr: + continue + + return d + + return {} + + +#----------------------------------------------------------------------------- +def SetEthernetAddress(s, r, eth_addr, new_eth_addr): + seq = random.randint(1, 65535) + msg = MkSetEthernetAddress(seq, eth_addr, new_eth_addr) + + if (SendMsg(s, msg)): + exp = time.time() + MAX_TIMEOUT + + while time.time() < exp: + sys.stdout.write('.') + reply = RecvMsg(r) + if len(reply) != struct.calcsize(HEADER_FMT): + continue + + d = UnMkSetEthernetAddressReply(reply) + + if d['magic'] != NF_MAGIC: + continue + if d['id'] != NF_SET_ETHERNET_ADDRESS_REPLY: + continue + if d['sequence'] != seq: + continue + if d['eth_addr'] != eth_addr: + continue + + return d + + return {} + +#----------------------------------------------------------------------------- +def FormatEthAddr(a): + return "%02X-%02X-%02X-%02X-%02X-%02X" % (ord(a[0]), ord(a[1]), ord(a[2]), ord(a[3]), ord(a[4]), ord(a[5])) + + +#----------------------------------------------------------------------------- +def PrintDetails(d): + + print() + print("Ethernet Address:", FormatEthAddr(d['eth_addr'])) + print("Hardware:", + socket.inet_ntoa(d['hw_ver']), + "Bootloader:", + socket.inet_ntoa(d['boot_ver']), + "Application:", + socket.inet_ntoa(d['app_ver'])) + print("Uptime:", + d['uptime_days'], + 'days', + d['uptime_hrs'], + 'hours', + d['uptime_min'], + 'minutes', + d['uptime_secs'], + 'seconds') + if d['ip_type'] == NF_IP_STATIC: + print("Static IP") + elif d['ip_type'] == NF_IP_DYNAMIC: + print("Dynamic IP") + else: + print("Unknown IP type") + print("IP Address:", + socket.inet_ntoa(d['ip_addr']), + "Mask:", + socket.inet_ntoa(d['ip_netmask']), + "Gateway:", + socket.inet_ntoa(d['ip_gw'])) + print("Mode:", end = " ") + if d['mode'] == NF_MODE_BOOTLOADER: + print('Bootloader') + elif d['mode'] == NF_MODE_APPLICATION: + print('Application') + else: + print('Unknown') + diff --git a/vendortools/prologic_sim.py b/vendortools/prologic_sim.py new file mode 100644 index 0000000..7d81a2a --- /dev/null +++ b/vendortools/prologic_sim.py @@ -0,0 +1,137 @@ +# Prologic GPIB-LAN simulator +from datetime import datetime +import socket + +NETFINDER_SERVER_PORT = 3040 + +NF_IDENTIFY = 0 +NF_IDENTIFY_REPLY = 1 +NF_ASSIGNMENT = 2 +NF_ASSIGNMENT_REPLY = 3 +NF_FLASH_ERASE = 4 +NF_FLASH_ERASE_REPLY = 5 +NF_BLOCK_SIZE = 6 +NF_BLOCK_SIZE_REPLY = 7 +NF_BLOCK_WRITE = 8 +NF_BLOCK_WRITE_REPLY = 9 +NF_VERIFY = 10 +NF_VERIFY_REPLY = 11 +NF_REBOOT = 12 +NF_SET_ETHERNET_ADDRESS = 13 +NF_SET_ETHERNET_ADDRESS_REPLY = 14 +NF_TEST = 15 +NF_TEST_REPLY = 16 + +NF_SUCCESS = 0 +NF_CRC_MISMATCH = 1 +NF_INVALID_MEMORY_TYPE = 2 +NF_INVALID_SIZE = 3 +NF_INVALID_IP_TYPE = 4 + +NF_MAGIC = 0x5A + +NF_IP_DYNAMIC = 0 +NF_IP_STATIC = 1 + +NF_ALERT_OK = 0x00 +NF_ALERT_WARN = 0x01 +NF_ALERT_ERROR = 0xFF + +NF_MODE_BOOTLOADER = 0 +NF_MODE_APPLICATION = 1 + +NF_MEMORY_FLASH = 0 +NF_MEMORY_EEPROM = 1 + +NF_REBOOT_CALL_BOOTLOADER = 0 +NF_REBOOT_RESET = 1 + + +HEADER_FMT = "!2cH6s2x" +IDENTIFY_FMT = HEADER_FMT +IDENTIFY_REPLY_FMT = "!H6c4s4s4s4s4s4s32s" +ASSIGNMENT_FMT = "!3xc4s4s4s32x" +ASSIGNMENT_REPLY_FMT = "!c3x" +FLASH_ERASE_FMT = HEADER_FMT +FLASH_ERASE_REPLY_FMT = HEADER_FMT +BLOCK_SIZE_FMT = HEADER_FMT +BLOCK_SIZE_REPLY_FMT = "!H2x" +BLOCK_WRITE_FMT = "!cxHI" +BLOCK_WRITE_REPLY_FMT = "!c3x" +VERIFY_FMT = HEADER_FMT +VERIFY_REPLY_FMT = "!c3x" +REBOOT_FMT = "!c3x" +SET_ETHERNET_ADDRESS_FMT = "!6s2x" +SET_ETHERNET_ADDRESS_REPLY_FMT = HEADER_FMT +TEST_FMT = HEADER_FMT +TEST_REPLY_FMT = "!32s" + +MAX_ATTEMPTS = 10 +MAX_TIMEOUT = 0.5 + + +class gpib_eth(): + """ + Prologix GPIB Ethernet Simulator Version 1.0 + Emulates HW Version 1.2; SW Version 1.6.6.0; BOOT Version ?; + """ + def __init__(self): + self.up_time = datetime.now() + self.ip_addr = bytes([192, 168, 0, 128]) + self.mode = NF_MODE_APPLICATION # [BOOTLOADER, APPLICATION] + self.alert = NF_ALERT_OK + self.ip_type = NF_IP_DYNAMIC # [DYNAMIC, STATIC] + self.eth_addr = b'\xF0' * 6 # MAC address 6 bytes + self.ip_netmask = bytes([255, 255, 255, 0]) + self.ip_gateway = bytes([192, 168, 0, 1]) + self.app_version = bytes([1, 6, 6, 0]) + self.bootloader_version = bytes([1, 0, 0, 0]) # ?? + self.hw_version = bytes([1, 2, 0, 0]) + self.name = "Virtual Prologic GPIB-Eth Device"# 32 char string + + def identify(self, seq): + # packet header (HEADER_FMT): + # 1 byte - NF_MAGIC + # 1 byte - NF_IDENTIFY_REPLY + # 2 byte - seq (unsigned short) + # 6 byte - eth_addr + # 2 byte - 2x Null (pad bytes) + # + # identify body (IDENTIFY_REPLY_FMT): + # 1 byte - Uptime days + # 1 byte - Uptime hours + # 1 byte - Uptime minutes + # 1 byte - Uptime seconds + # 1 byte - self.mode + # 1 byte - self.alert + # 1 byte - self.ip_type + # 4 byte - self.ip_addr + # 4 byte - self.ip_gateway + # 4 byte - self.app_version + # 4 byte - self.boootloader_version + # 4 byte - self.hw_version + # 32 byte - self.name + pass # To Do + + def send(self, msg): + """ + Send data (msg) + """ + + def receive(self): + """ + Receive data (msg) + """ + # Identify Command (IDENTIFY_FMT): + # 1 byte - NF_MAGIC + # 1 byte - NF_IDENTIFY + # 2 byte - sequence ID (unsigned short) + # 6 byte - Eth addr ((b'\xFF' * 6) for broadcast) + # 2 byte - Null byte x2 (pad bytes) [EOM ?] + rcv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + rcv_sock.bind(('', NETFINDER_SERVER_PORT)) + msg = rcv_sock.recvfrom(1024) + return msg + + +