Update for Python3

This commit is contained in:
S Groesz 2020-11-10 01:47:47 +00:00
parent fbfb32f8b1
commit 85aaf1385e
7 changed files with 936 additions and 722 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
__pycache__
*.bak
*.swp
*.egg-info
*.bak2

View File

@ -58,7 +58,7 @@ class Interface:
while (self.running):
tty_in = os.read(self.m, readLen)
for tty_out in self.handleInput(tty_in):
os.write(self.m, tty_out)
os.write(self.m, tty_out.encode())
##%% handleInput(tty_in)
@ -67,7 +67,7 @@ class Interface:
## address. That device must handle the command via dev.handleCMD(cmd, args)
##
def handleInput(self, tty_in):
tty_in = tty_in.strip()
tty_in = tty_in.strip().decode()
if tty_in != "" and self.addr in self.devices:
dev = self.devices[self.addr]

View File

@ -0,0 +1,36 @@
# Use those functions to enumerate all interfaces available on the system using Python.
# found on <http://code.activestate.com/recipes/439093/#c1>
import socket
import fcntl
import struct
import array
def all_interfaces():
max_possible = 128 # arbitrary. raise if needed.
bytes = max_possible * 32
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
names = array.array('B', '\0' * bytes)
outbytes = struct.unpack('iL', fcntl.ioctl(
s.fileno(),
0x8912, # SIOCGIFCONF
struct.pack('iL', bytes, names.buffer_info()[0])
))[0]
namestr = names.tostring()
lst = []
for i in range(0, outbytes, 40):
name = namestr[i:i+16].split('\0', 1)[0]
ip = namestr[i+20:i+24]
lst.append((name, ip))
return lst
def format_ip(addr):
return str(ord(addr[0])) + '.' + \
str(ord(addr[1])) + '.' + \
str(ord(addr[2])) + '.' + \
str(ord(addr[3]))
ifs = all_interfaces()
for i in ifs:
print "%12s %s" % (i[0], format_ip(i[1]))

View File

@ -51,6 +51,7 @@ IOCTL_SIOCGIFCONF = 0x8912
def enumIpUnix():
import fcntl
# struct_size 40 if system is 64 bit
struct_size = 40 if (sys.maxsize > 2**32) else 32
inbytes = 128 * struct_size # Maximum 128 interfaces
@ -68,3 +69,4 @@ def enumIpUnix():
struct_size)]
return [ip for ip in iplist if ip != '127.0.0.1']

View File

@ -11,16 +11,18 @@ from enumip import *
#-----------------------------------------------------------------------------
def usage():
print
print "Usage: nfcli --list --eth_addr=ADDR --ip_type=TYPE --ip_addr=IP, --netmask=MASK, --gateway=GW"
print "Search and configure Prologix GPIB-ETHERNET Controllers."
print "--help : display this help"
print "--list : search for controllers"
print "--eth_addr=ADDR : configure controller with Ethernet address ADDR"
print "--ip_type=TYPE : set controller ip address type to TYPE (\"static\" or \"dhcp\")"
print "--ip_addr=IP : set controller address to IP"
print "--netmask=MASK : set controller network mask to MASK"
print "--gateway=GW : set controller default gateway to GW"
print()
print("Usage: nfcli --list --eth_addr=ADDR --ip_type=TYPE --ip_addr=IP,",
"--netmask=MASK, --gateway=GW")
print("Search and configure Prologix GPIB-ETHERNET Controllers.")
print("--help : display this help")
print("--list : search for controllers")
print("--eth_addr=ADDR : configure controller with Ethernet address ADDR")
print('--ip_type=TYPE : set controller ip address type to TYPE',
'("static" or "dhcp")')
print("--ip_addr=IP : set controller address to IP")
print("--netmask=MASK : set controller network mask to MASK")
print("--gateway=GW : set controller default gateway to GW")
#-----------------------------------------------------------------------------
@ -37,19 +39,19 @@ def ValidateNetParams(ip_str, mask_str, gw_str):
try:
ip = socket.inet_aton(ip_str)
except:
print "IP address is invalid."
print("IP address is invalid.")
return False
try:
mask = socket.inet_aton(mask_str)
except:
print "Network mask is invalid."
print("Network mask is invalid.")
return False
try:
gw = socket.inet_aton(gw_str)
except:
print "Gateway address is invalid."
print("Gateway address is invalid.")
return False
# Validate network mask
@ -59,12 +61,12 @@ def ValidateNetParams(ip_str, mask_str, gw_str):
# Exclude restricted masks
if (mask == 0) or (mask == 0xFFFFFFFF):
print "Network mask is invalid."
print("Network mask is invalid.")
return False
# Exclude non-left-contiguous masks
if (((mask + (mask & -mask)) & 0xFFFFFFFF) != 0):
print "Network mask is not contiguous."
print("Network mask is not contiguous.")
return False
@ -78,7 +80,7 @@ def ValidateNetParams(ip_str, mask_str, gw_str):
# Exclude restricted addresses
# 0.0.0.0 is valid
if ((gw != 0) and ((octet1 == 0) or (octet1 == 127) or (octet1 > 223))):
print "Gateway address is invalid."
print("Gateway address is invalid.")
return False
# Validate IP address
@ -90,17 +92,17 @@ def ValidateNetParams(ip_str, mask_str, gw_str):
# Exclude restricted addresses
if ((octet1 == 0) or (octet1 == 127) or (octet1 > 223)):
print "IP address is invalid."
print("IP address is invalid.")
return False
# Exclude subnet network address
if ((ip & ~mask) == 0):
print "IP address is invalid."
print("IP address is invalid.")
return False
# Exclude subnet broadcast address
if ((ip & ~mask) == (0xFFFFFFFF & ~mask)):
print "IP address is invalid."
print("IP address is invalid.")
return False
return True
@ -139,9 +141,16 @@ def main():
eth_addr = None
try:
opts, args = getopt.getopt(sys.argv[1:], '', ['help', 'list', 'eth_addr=', 'ip_type=', 'ip_addr=', 'netmask=', 'gateway='])
except getopt.GetoptError, err:
print str(err)
opts, args = getopt.getopt(sys.argv[1:], '',
['help',
'list',
'eth_addr=',
'ip_type=',
'ip_addr=',
'netmask=',
'gateway='])
except getopt.GetoptError as err:
print(err)
sys.exit(1)
# Check for unparsed parameters
@ -171,19 +180,19 @@ def main():
if search:
if not eth_addr is None:
print "--list and --eth_addr are not compatible."
print("--list and --eth_addr are not compatible.")
invalid_args = True
if not ip_type is None:
print "--list and --ip_type are not compatible."
print("--list and --ip_type are not compatible.")
invalid_args = True
if not ip_addr is None:
print "--list and --ip_addr are not compatible."
print("--list and --ip_addr are not compatible.")
invalid_args = True
if not netmask is None:
print "--list and --netmask are not compatible."
print("--list and --netmask are not compatible.")
invalid_args = True
if not gateway is None:
print "--list and --gateway are not compatible."
print("--list and --gateway are not compatible.")
invalid_args = True
else:
@ -191,19 +200,19 @@ def main():
eth_addr = eth_addr.strip().replace(":", "").replace("-", "")
eth_addr = eth_addr.decode('hex')
except:
print "Invalid Ethernet address."
print("Invalid Ethernet address.")
sys.exit(1)
if len(eth_addr) != 6:
print "Invalid Ethernet address."
print("Invalid Ethernet address.")
sys.exit(1)
if ip_type in ["Static", "static"]:
if ip_type.lower() in ["static"]:
ip_type = NF_IP_STATIC
elif ip_type in ["Dynamic", "dynamic", "Dhcp", "dhcp"]:
elif ip_type.lower in ["dynamic", "dhcp"]:
ip_type = NF_IP_DYNAMIC
else:
print "--ip_type must be 'static' or 'dhcp'."
print("--ip_type must be 'static' or 'dhcp'.")
sys.exit(1)
if ip_type == NF_IP_STATIC:
@ -227,17 +236,17 @@ def main():
if ip_addr is None:
ip_addr = "0.0.0.0"
else:
print "--ip_addr not allowed when --ip_type=dhcp."
print("--ip_addr not allowed when --ip_type=dhcp.")
invalid_args = True
if netmask is None:
netmask = "0.0.0.0"
else:
print "--netmask not allowed when --ip_type=dhcp."
print("--netmask not allowed when --ip_type=dhcp.")
invalid_args = True
if gateway is None:
gateway = "0.0.0.0"
else:
print "--gateway not allowed when --ip_type=dhcp."
print("--gateway not allowed when --ip_type=dhcp.")
invalid_args = True
if invalid_args:
@ -246,18 +255,18 @@ def main():
global seq
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
# sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
iplist = enumIp();
if len(iplist) == 0:
print "Host has no IP address."
print("Host has no IP address.")
sys.exit(1)
devices = {}
for ip in iplist:
print "Searching through network interface:", ip
print("Searching through network interface:", ip)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -267,8 +276,8 @@ def main():
try:
s.bind((ip, port))
except socket.error, e:
print "Bind error on send socket:", e
except socket.error as e:
print("Bind error on send socket:", e)
sys.exit(1)
port = s.getsockname()[1]
@ -281,12 +290,12 @@ def main():
try:
r.bind(('', port))
except socket.error, e:
print "Bind error on receive socket:", e
except socket.error as e:
print("Bind error on receive socket:", e)
sys.exit(1)
d = Discover(s, r)
print
print()
for k in d:
d[k]['host_ip'] = ip
@ -296,15 +305,16 @@ def main():
r.close()
if search:
print
print "Found", len(devices), "Prologix GPIB-ETHERNET Controller(s)."
print()
print("Found", len(devices), "Prologix GPIB-ETHERNET Controller(s).")
for key in devices:
PrintDetails(devices[key])
print
print()
else:
if eth_addr in devices:
print "Updating network settings of Prologix GPIB-ETHERNET Controller", FormatEthAddr(eth_addr)
print("Updating Prologix GPIB-ETHERNET Controller settings",
FormatEthAddr(eth_addr))
device = devices[eth_addr]
@ -317,8 +327,8 @@ def main():
try:
s.bind((device['host_ip'], port))
except socket.error, e:
print "Bind error on send socket:", e
except socket.error as e:
print("Bind error on send socket:", e)
sys.exit(1)
port = s.getsockname()[1]
@ -331,24 +341,28 @@ def main():
try:
r.bind(('', port))
except socket.error, e:
print "Bind error on receive socket:", e
except socket.error as e:
print("Bind error on receive socket:", e)
sys.exit(1)
result = Assignment(s, r, eth_addr, ip_type, ip_addr, netmask, gateway)
print
print()
if len(result) == 0:
print "Network settings update failed."
print("Network settings update failed.")
else:
if result['result'] == NF_SUCCESS:
print "Network settings updated successfully."
print("Network settings updated successfully.")
else:
print "Network settings update failed."
print("Network settings update failed.")
else:
print "Prologix GPIB-ETHERNET Controller", FormatEthAddr(eth_addr), "already configured for DHCP."
print("Prologix GPIB-ETHERNET Controller",
FormatEthAddr(eth_addr),
"already configured for DHCP.")
else:
print "Prologix GPIB-ETHERNET Controller", FormatEthAddr(eth_addr), "not found."
print("Prologix GPIB-ETHERNET Controller",
FormatEthAddr(eth_addr),
"not found.")

View File

@ -1,3 +1,5 @@
# Python >= 3.6
import struct
import random
import socket
@ -76,19 +78,19 @@ MAX_TIMEOUT = 0.5
#-----------------------------------------------------------------------------
def MkHeader(id, seq, eth_addr):
def MkHeader(header, seq, eth_addr):
return struct.pack(
HEADER_FMT,
chr(NF_MAGIC),
chr(id),
bytes([NF_MAGIC]),
bytes([header]),
seq,
eth_addr
);
)
#-----------------------------------------------------------------------------
def MkIdentify(seq):
return MkHeader(NF_IDENTIFY, seq, '\xFF\xFF\xFF\xFF\xFF\xFF')
return MkHeader(NF_IDENTIFY, seq, bytes([0xFF]) * 6)
#-----------------------------------------------------------------------------
@ -137,7 +139,7 @@ def MkReboot(seq, eth_addr, reboottype):
struct.pack(
REBOOT_FMT,
chr(reboottype)
);
)
#-----------------------------------------------------------------------------
@ -151,7 +153,7 @@ def MkSetEthernetAddress(seq, eth_addr, new_eth_addr):
struct.pack(
SET_ETHERNET_ADDRESS_FMT,
new_eth_addr
);
)
#-----------------------------------------------------------------------------
@ -159,7 +161,7 @@ def UnMkHeader(msg):
params = struct.unpack(
HEADER_FMT,
msg
);
)
d = {}
d['magic'] = ord(params[0])
@ -178,7 +180,7 @@ def UnMkIdentifyReply(msg):
params = struct.unpack(
IDENTIFY_REPLY_FMT,
msg[hdrlen:]
);
)
d['uptime_days'] = params[0]
d['uptime_hrs'] = ord(params[1])
@ -206,7 +208,7 @@ def UnMkAssignmentReply(msg):
params = struct.unpack(
ASSIGNMENT_REPLY_FMT,
msg[hdrlen:]
);
)
d['result'] = ord(params[0])
return d
@ -226,7 +228,7 @@ def UnMkBlockSizeReply(msg):
params = struct.unpack(
BLOCK_SIZE_REPLY_FMT,
msg[hdrlen:]
);
)
d['size'] = params[0]
return d
@ -241,7 +243,7 @@ def UnMkBlockWriteReply(msg):
params = struct.unpack(
BLOCK_WRITE_REPLY_FMT,
msg[hdrlen:]
);
)
d['result'] = ord(params[0])
return d
@ -256,7 +258,7 @@ def UnMkVerifyReply(msg):
params = struct.unpack(
VERIFY_REPLY_FMT,
msg[hdrlen:]
);
)
d['result'] = ord(params[0])
return d
@ -271,7 +273,7 @@ def UnMkTestReply(msg):
params = struct.unpack(
TEST_REPLY_FMT,
msg[hdrlen:]
);
)
result = ''
for i in params[0]:
@ -293,11 +295,11 @@ def SendMsg(s, msg):
try:
s.sendto(msg, ('<broadcast>', NETFINDER_SERVER_PORT))
except socket.error, e:
print e
return False;
except socket.error as e:
print(e)
return False
return True;
return True
#-----------------------------------------------------------------------------
def RecvMsg(s):
@ -627,22 +629,40 @@ def FormatEthAddr(a):
#-----------------------------------------------------------------------------
def PrintDetails(d):
print
print "Ethernet Address:", FormatEthAddr(d['eth_addr'])
print "Hardware:", socket.inet_ntoa(d['hw_ver']), "Bootloader:", socket.inet_ntoa(d['boot_ver']), "Application:", socket.inet_ntoa(d['app_ver'])
print "Uptime:", d['uptime_days'], 'days', d['uptime_hrs'], 'hours', d['uptime_min'], 'minutes', d['uptime_secs'], 'seconds'
print()
print("Ethernet Address:", FormatEthAddr(d['eth_addr']))
print("Hardware:",
socket.inet_ntoa(d['hw_ver']),
"Bootloader:",
socket.inet_ntoa(d['boot_ver']),
"Application:",
socket.inet_ntoa(d['app_ver']))
print("Uptime:",
d['uptime_days'],
'days',
d['uptime_hrs'],
'hours',
d['uptime_min'],
'minutes',
d['uptime_secs'],
'seconds')
if d['ip_type'] == NF_IP_STATIC:
print "Static IP"
print("Static IP")
elif d['ip_type'] == NF_IP_DYNAMIC:
print "Dynamic IP"
print("Dynamic IP")
else:
print "Unknown IP type"
print "IP Address:", socket.inet_ntoa(d['ip_addr']), "Mask:", socket.inet_ntoa(d['ip_netmask']), "Gateway:", socket.inet_ntoa(d['ip_gw'])
print "Mode:",
print("Unknown IP type")
print("IP Address:",
socket.inet_ntoa(d['ip_addr']),
"Mask:",
socket.inet_ntoa(d['ip_netmask']),
"Gateway:",
socket.inet_ntoa(d['ip_gw']))
print("Mode:", end = " ")
if d['mode'] == NF_MODE_BOOTLOADER:
print 'Bootloader'
print('Bootloader')
elif d['mode'] == NF_MODE_APPLICATION:
print 'Application'
print('Application')
else:
print 'Unknown'
print('Unknown')

137
vendortools/prologic_sim.py Normal file
View File

@ -0,0 +1,137 @@
# Prologic GPIB-LAN simulator
from datetime import datetime
import socket
NETFINDER_SERVER_PORT = 3040
NF_IDENTIFY = 0
NF_IDENTIFY_REPLY = 1
NF_ASSIGNMENT = 2
NF_ASSIGNMENT_REPLY = 3
NF_FLASH_ERASE = 4
NF_FLASH_ERASE_REPLY = 5
NF_BLOCK_SIZE = 6
NF_BLOCK_SIZE_REPLY = 7
NF_BLOCK_WRITE = 8
NF_BLOCK_WRITE_REPLY = 9
NF_VERIFY = 10
NF_VERIFY_REPLY = 11
NF_REBOOT = 12
NF_SET_ETHERNET_ADDRESS = 13
NF_SET_ETHERNET_ADDRESS_REPLY = 14
NF_TEST = 15
NF_TEST_REPLY = 16
NF_SUCCESS = 0
NF_CRC_MISMATCH = 1
NF_INVALID_MEMORY_TYPE = 2
NF_INVALID_SIZE = 3
NF_INVALID_IP_TYPE = 4
NF_MAGIC = 0x5A
NF_IP_DYNAMIC = 0
NF_IP_STATIC = 1
NF_ALERT_OK = 0x00
NF_ALERT_WARN = 0x01
NF_ALERT_ERROR = 0xFF
NF_MODE_BOOTLOADER = 0
NF_MODE_APPLICATION = 1
NF_MEMORY_FLASH = 0
NF_MEMORY_EEPROM = 1
NF_REBOOT_CALL_BOOTLOADER = 0
NF_REBOOT_RESET = 1
HEADER_FMT = "!2cH6s2x"
IDENTIFY_FMT = HEADER_FMT
IDENTIFY_REPLY_FMT = "!H6c4s4s4s4s4s4s32s"
ASSIGNMENT_FMT = "!3xc4s4s4s32x"
ASSIGNMENT_REPLY_FMT = "!c3x"
FLASH_ERASE_FMT = HEADER_FMT
FLASH_ERASE_REPLY_FMT = HEADER_FMT
BLOCK_SIZE_FMT = HEADER_FMT
BLOCK_SIZE_REPLY_FMT = "!H2x"
BLOCK_WRITE_FMT = "!cxHI"
BLOCK_WRITE_REPLY_FMT = "!c3x"
VERIFY_FMT = HEADER_FMT
VERIFY_REPLY_FMT = "!c3x"
REBOOT_FMT = "!c3x"
SET_ETHERNET_ADDRESS_FMT = "!6s2x"
SET_ETHERNET_ADDRESS_REPLY_FMT = HEADER_FMT
TEST_FMT = HEADER_FMT
TEST_REPLY_FMT = "!32s"
MAX_ATTEMPTS = 10
MAX_TIMEOUT = 0.5
class gpib_eth():
"""
Prologix GPIB Ethernet Simulator Version 1.0
Emulates HW Version 1.2; SW Version 1.6.6.0; BOOT Version ?;
"""
def __init__(self):
self.up_time = datetime.now()
self.ip_addr = bytes([192, 168, 0, 128])
self.mode = NF_MODE_APPLICATION # [BOOTLOADER, APPLICATION]
self.alert = NF_ALERT_OK
self.ip_type = NF_IP_DYNAMIC # [DYNAMIC, STATIC]
self.eth_addr = b'\xF0' * 6 # MAC address 6 bytes
self.ip_netmask = bytes([255, 255, 255, 0])
self.ip_gateway = bytes([192, 168, 0, 1])
self.app_version = bytes([1, 6, 6, 0])
self.bootloader_version = bytes([1, 0, 0, 0]) # ??
self.hw_version = bytes([1, 2, 0, 0])
self.name = "Virtual Prologic GPIB-Eth Device"# 32 char string
def identify(self, seq):
# packet header (HEADER_FMT):
# 1 byte - NF_MAGIC
# 1 byte - NF_IDENTIFY_REPLY
# 2 byte - seq (unsigned short)
# 6 byte - eth_addr
# 2 byte - 2x Null (pad bytes)
#
# identify body (IDENTIFY_REPLY_FMT):
# 1 byte - Uptime days
# 1 byte - Uptime hours
# 1 byte - Uptime minutes
# 1 byte - Uptime seconds
# 1 byte - self.mode
# 1 byte - self.alert
# 1 byte - self.ip_type
# 4 byte - self.ip_addr
# 4 byte - self.ip_gateway
# 4 byte - self.app_version
# 4 byte - self.boootloader_version
# 4 byte - self.hw_version
# 32 byte - self.name
pass # To Do
def send(self, msg):
"""
Send data (msg)
"""
def receive(self):
"""
Receive data (msg)
"""
# Identify Command (IDENTIFY_FMT):
# 1 byte - NF_MAGIC
# 1 byte - NF_IDENTIFY
# 2 byte - sequence ID (unsigned short)
# 6 byte - Eth addr ((b'\xFF' * 6) for broadcast)
# 2 byte - Null byte x2 (pad bytes) [EOM ?]
rcv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rcv_sock.bind(('<broadcast>', NETFINDER_SERVER_PORT))
msg = rcv_sock.recvfrom(1024)
return msg