# Prologic GPIB-LAN simulator

from datetime import datetime
import socket

NETFINDER_SERVER_PORT = 3040

# Command / reply identifiers (nf_header.command accepts the range 0..16).
NF_IDENTIFY = 0
NF_IDENTIFY_REPLY = 1
NF_ASSIGNMENT = 2
NF_ASSIGNMENT_REPLY = 3
NF_FLASH_ERASE = 4
NF_FLASH_ERASE_REPLY = 5
NF_BLOCK_SIZE = 6
NF_BLOCK_SIZE_REPLY = 7
NF_BLOCK_WRITE = 8
NF_BLOCK_WRITE_REPLY = 9
NF_VERIFY = 10
NF_VERIFY_REPLY = 11
NF_REBOOT = 12
NF_SET_ETHERNET_ADDRESS = 13
NF_SET_ETHERNET_ADDRESS_REPLY = 14
NF_TEST = 15
NF_TEST_REPLY = 16

NF_SUCCESS = 0
NF_CRC_MISMATCH = 1
NF_INVALID_MEMORY_TYPE = 2
NF_INVALID_SIZE = 3
NF_INVALID_IP_TYPE = 4

# First byte of every header; nf_header rejects anything else.
NF_MAGIC = 0x5A

NF_IP_DYNAMIC = 0
NF_IP_STATIC = 1

NF_ALERT_OK = 0x00
NF_ALERT_WARN = 0x01
NF_ALERT_ERROR = 0xFF

NF_MODE_BOOTLOADER = 0
NF_MODE_APPLICATION = 1

NF_MEMORY_FLASH = 0
NF_MEMORY_EEPROM = 1

NF_REBOOT_CALL_BOOTLOADER = 0
NF_REBOOT_RESET = 1

# struct-style format strings for the header and each message body.
HEADER_FMT = "!2cH6s2x"
IDENTIFY_FMT = HEADER_FMT
IDENTIFY_REPLY_FMT = "!H6c4s4s4s4s4s4s32s"
ASSIGNMENT_FMT = "!3xc4s4s4s32x"
ASSIGNMENT_REPLY_FMT = "!c3x"
FLASH_ERASE_FMT = HEADER_FMT
FLASH_ERASE_REPLY_FMT = HEADER_FMT
BLOCK_SIZE_FMT = HEADER_FMT
BLOCK_SIZE_REPLY_FMT = "!H2x"
BLOCK_WRITE_FMT = "!cxHI"
BLOCK_WRITE_REPLY_FMT = "!c3x"
VERIFY_FMT = HEADER_FMT
VERIFY_REPLY_FMT = "!c3x"
REBOOT_FMT = "!c3x"
SET_ETHERNET_ADDRESS_FMT = "!6s2x"
SET_ETHERNET_ADDRESS_REPLY_FMT = HEADER_FMT
TEST_FMT = HEADER_FMT
TEST_REPLY_FMT = "!32s"

MAX_ATTEMPTS = 10
MAX_TIMEOUT = 0.5

# Size in bytes of a header: magic(1) + command(1) + seq(2) + target(6) + pad(2).
NF_HEADER_LEN = 12


class nf_header():
    """
    Wrapper for netfind header packets

    The header is held as a 12 byte bytearray laid out as:
        byte  0     - NF_MAGIC
        byte  1     - command
        bytes 2-3   - sequence number (big-endian unsigned short)
        bytes 4-9   - target ethernet address
        bytes 10-11 - two NULL pad bytes
    """

    def __init__(self, hdr=None):
        """
        Build a header from raw bytes, or a default header when hdr is None.

        The default header has command 255, sequence 0xFFFF and an
        all-0xFF target address.

        Raises:
            TypeError: hdr is not a bytes object.
            ValueError: hdr is not 12 bytes long, does not start with
                NF_MAGIC, or does not end in two NULL pad bytes.
        """
        if hdr is None:
            # create a default header
            hdr = bytes([NF_MAGIC, 255, 255, 255, 255, 255,
                         255, 255, 255, 255, 0, 0])
        if not isinstance(hdr, bytes):
            raise TypeError("hdr must be bytes object")
        # The original compared len(hdr) against the *type* bytes, which is
        # never equal, so every header was rejected.
        if len(hdr) != NF_HEADER_LEN:
            raise ValueError("hdr must be 12 bytes long")
        if hdr[0] != NF_MAGIC:
            raise ValueError("First byte of hdr must be NF_MAGIC byte")
        # Either pad byte being non-NULL makes the header invalid.
        if hdr[10] != 0 or hdr[11] != 0:
            raise ValueError("Last two bytes of hdr should be NULL")
        self.__header = bytearray(hdr)

    @property
    def id(self):
        """First header byte (NF_MAGIC for any successfully built header)."""
        return self.__header[0]

    @property
    def command(self):
        """Command byte of the header, as an int."""
        return self.__header[1]

    @command.setter
    def command(self, cmd):
        """
        Set the command byte from an int in 0..16 or a single-byte bytes.

        Raises:
            TypeError: cmd is neither bytes nor int.
            ValueError: cmd is not exactly one byte, or is outside 0..16.
        """
        if not isinstance(cmd, (bytes, int)):
            raise TypeError("cmd must be bytes or int")
        if isinstance(cmd, bytes):
            if len(cmd) != 1:
                raise ValueError("cmd of type bytes must be 1 byte")
            cmd = ord(cmd)  # convert bytes to int
        if cmd < 0 or cmd > 16:
            raise ValueError("cmd must be between 0 and 16")
        self.__header[1] = cmd

    @property
    def seq(self):
        """Sequence number decoded from header bytes 2-3 (big-endian)."""
        return int.from_bytes(self.__header[2:4], "big")

    @seq.setter
    def seq(self, seq):
        """
        Set the sequence number from an int in 1..65535 or 2 big-endian bytes.

        Raises:
            TypeError: seq is neither bytes nor int.
            ValueError: seq is not exactly two bytes, or is outside 1..65535.
        """
        if not isinstance(seq, (bytes, int)):
            raise TypeError("Seq must be int or bytes object")
        if isinstance(seq, bytes):
            if len(seq) != 2:
                raise ValueError("Seq of type bytes must be 2 bytes")
            seq = int.from_bytes(seq, "big")
        if seq < 1 or seq > 65535:
            raise ValueError("Seq must be greater than 0 and less than 65536")
        # by this point seq should be a valid int between 1 and 65535
        self.__header[2:4] = seq.to_bytes(length=2, byteorder="big",
                                          signed=False)

    @property
    def target(self):
        """Target ethernet address: header bytes 4-9, as a bytearray."""
        return self.__header[4:10]

    @target.setter
    def target(self, addr):
        """
        Set the 6 byte target ethernet address.

        Raises:
            TypeError: addr is not a bytes object.
            ValueError: addr is not exactly 6 bytes long.
        """
        if not isinstance(addr, bytes):
            raise TypeError("addr must be bytes object")
        if len(addr) != 6:
            raise ValueError("addr must be 6 bytes")
        self.__header[4:10] = addr

    @property
    def pad(self):
        """Trailing pad bytes (header bytes 10 onward), as a bytearray."""
        return self.__header[10:]

    @property
    def bytes(self):
        """Immutable copy of the whole 12 byte header."""
        return bytes(self.__header)


class gpib_eth():
    """
    Prologix GPIB Ethernet Simulator Version 1.0

    Emulates HW Version 1.2; SW Version 1.6.6.0; BOOT Version ?;
    """

    def __init__(self):
        """Initialise the simulated device with its default settings."""
        self.up_time = datetime.now()
        self.ip_addr = bytes([192, 168, 0, 128])
        self.mode = NF_MODE_APPLICATION  # [BOOTLOADER, APPLICATION]
        self.alert = NF_ALERT_OK
        self.ip_type = NF_IP_DYNAMIC  # [DYNAMIC, STATIC]
        self.eth_addr = b'\xF0' * 6  # MAC address 6 bytes
        self.ip_netmask = bytes([255, 255, 255, 0])
        self.ip_gateway = bytes([192, 168, 0, 1])
        self.app_version = bytes([1, 6, 6, 0])
        self.bootloader_version = bytes([1, 0, 0, 0])  # ??
        self.hw_version = bytes([1, 2, 0, 0])
        self.name = "Virtual Prologic GPIB-Eth Device"  # 32 char string

    def identify(self, seq):
        """
        Build the reply to an identify request (not implemented yet).

        Currently a stub that ignores seq and returns None.
        """
        # packet header (HEADER_FMT):
        #   1 byte - NF_MAGIC
        #   1 byte - NF_IDENTIFY_REPLY
        #   2 byte - seq (unsigned short)
        #   6 byte - eth_addr
        #   2 byte - 2x Null (pad bytes)
        #
        # identify body (IDENTIFY_REPLY_FMT):
        #   1 byte - Uptime days
        #   1 byte - Uptime hours
        #   1 byte - Uptime minutes
        #   1 byte - Uptime seconds
        #   1 byte - self.mode
        #   1 byte - self.alert
        #   1 byte - self.ip_type
        #   4 byte - self.ip_addr
        #   4 byte - self.ip_gateway
        #   4 byte - self.app_version
        #   4 byte - self.bootloader_version
        #   4 byte - self.hw_version
        #  32 byte - self.name
        #
        # NOTE(review): IDENTIFY_REPLY_FMT starts with a 2 byte "H" and has
        # six 4 byte fields, while this list shows a 1 byte days field and
        # five 4 byte fields (self.ip_netmask is not listed) - confirm the
        # layout against the protocol before implementing.
        pass  # To Do

    def send(self, msg):
        """ Send data (msg) """

    def monitor_broadcast(self):
        """Wait for one packet on NETFINDER_SERVER_PORT on all interfaces.

        Returns whatever receive() returns for that packet.
        """
        broadcast_msg = self.receive('', NETFINDER_SERVER_PORT)
        return broadcast_msg

    def receive(self, addr, port, buffer=1024):
        """
        Receive data (msg)

        Binds a UDP socket to (addr, port), blocks until one datagram
        arrives, and closes the socket again.

        Args:
            addr: local address to bind to ('' for all interfaces).
            port: local UDP port to bind to.
            buffer: maximum number of bytes to read from the datagram.

        Returns:
            The received datagram as bytes when it begins with a valid
            12 byte header, otherwise None.
        """
        # Identify Command (IDENTIFY_FMT):
        #   1 byte - NF_MAGIC
        #   1 byte - NF_IDENTIFY
        #   2 byte - sequence ID (unsigned short)
        #   6 byte - Eth addr ((b'\xFF' * 6) for broadcast)
        #   2 byte - Null byte x2 (pad bytes) [EOM ?]
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as rcv_sock:
            rcv_sock.bind((addr, port))
            # recvfrom returns (data, sender_address); only the data is used.
            msg, _sender = rcv_sock.recvfrom(buffer)

        if len(msg) < NF_HEADER_LEN or msg[0] != NF_MAGIC:
            return None
        try:
            # Header format: 2 char (bytes) - NF_MAGIC + Command
            # Constructing the wrapper validates the remaining header rules.
            nf_header(msg[0:NF_HEADER_LEN])
        except ValueError:
            return None
        return msg