Extra functions
Pymodbus: Modbus Protocol Implementation.
Released under the BSD license
- class pymodbus.ExceptionResponse(function_code: int, exception_code: int = 0, slave: int = 1, transaction: int = 0)
Bases:
ModbusPDU
Base class for a modbus exception PDU.
- ACKNOWLEDGE = 5
- GATEWAY_NO_RESPONSE = 11
- GATEWAY_PATH_UNAVIABLE = 10
- ILLEGAL_ADDRESS = 2
- ILLEGAL_FUNCTION = 1
- ILLEGAL_VALUE = 3
- MEMORY_PARITY_ERROR = 8
- NEGATIVE_ACKNOWLEDGE = 7
- SLAVE_BUSY = 6
- SLAVE_FAILURE = 4
- decode(data: bytes) None
Decode a modbus exception response.
- encode() bytes
Encode a modbus exception response.
- rtu_frame_size: int = 5
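The wire layout of a Modbus exception PDU can be sketched with the standard library alone: the function code is echoed back with its high bit set, followed by a one-byte exception code. The helper names below are hypothetical and are not part of pymodbus. The sketch shows the PDU as the Modbus protocol defines it, which is not necessarily what `encode()` returns on its own.

```python
import struct

ILLEGAL_ADDRESS = 2  # same value as ExceptionResponse.ILLEGAL_ADDRESS above


def build_exception_pdu(function_code: int, exception_code: int) -> bytes:
    """Return the function code with bit 7 set, followed by the exception code."""
    return struct.pack(">BB", function_code | 0x80, exception_code)


def parse_exception_pdu(pdu: bytes) -> tuple[int, int]:
    """Return (original function code, exception code) from an exception PDU."""
    function_code, exception_code = struct.unpack(">BB", pdu[:2])
    return function_code & 0x7F, exception_code


pdu = build_exception_pdu(3, ILLEGAL_ADDRESS)
```

Adding a one-byte device address and a two-byte CRC to this two-byte PDU gives five bytes, which is consistent with the `rtu_frame_size` listed above.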
- class pymodbus.FramerType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)
Bases:
str, Enum
Type of Modbus frame.
- ASCII = 'ascii'
- RTU = 'rtu'
- SOCKET = 'socket'
- TLS = 'tls'
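Because the bases are `str` and `Enum`, members can be looked up by value and compared directly with plain strings. The class below is a hypothetical stand-in that mirrors the members listed above so the behaviour can be tried without pymodbus installed.

```python
from enum import Enum


class FramerKind(str, Enum):
    """Hypothetical stand-in mirroring the FramerType members listed above."""

    ASCII = "ascii"
    RTU = "rtu"
    SOCKET = "socket"
    TLS = "tls"


# Look a member up by its string value, for example one read from a config file.
selected = FramerKind("rtu")
```

A string-valued enum like this is convenient when the framer type comes from command-line arguments or configuration text.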
- exception pymodbus.ModbusException(string)
Bases:
Exception
Base modbus exception.
- isError()
Error
- pymodbus.pymodbus_apply_logging_config(level: str | int = 10, log_file_name: str | None = None)
Apply basic logging configuration used by default by Pymodbus maintainers.
- Parameters:
level – (optional) log level to set; if not set, it is inherited.
log_file_name – (optional) file to log to in addition to the default output
Please call this function to format logging appropriately when opening issues.
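The default `level` of 10 corresponds to `logging.DEBUG` in the standard library. As a rough illustration of what such a configuration helper might do, here is a minimal sketch; the function name and logger name are hypothetical, and the real function may format and route messages differently.

```python
import logging


def apply_basic_logging(level=logging.DEBUG, log_file_name=None):
    """Hypothetical stand-in: set a level and optionally add a file handler."""
    log = logging.getLogger("example_modbus")
    log.setLevel(level)
    if log_file_name:
        # Only touch the file system when a file name is given.
        log.addHandler(logging.FileHandler(log_file_name))
    return log


log = apply_basic_logging()
```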
Modbus Device Controller.
These are the device management handlers. They should be maintained in the server context and the various methods should be inserted in the correct locations.
- class pymodbus.device.DeviceInformationFactory
Bases:
object
This is a helper that hides some of the complexity of processing the device information requests (function code 0x2b 0x0e).
- classmethod get(control, read_code=DeviceInformation.BASIC, object_id=0)
Get the requested device data from the system.
- Parameters:
control – The control block to pull data from
read_code – The read code to process
object_id – The specific object_id to read
- Returns:
The requested data (id, length, value)
- class pymodbus.device.ModbusDeviceIdentification(info=None, info_name=None)
Bases:
object
This is used to supply the device identification for the readDeviceIdentification function.
For more information read section 6.21 of the modbus application protocol.
- property MajorMinorRevision
- property ModelName
- property ProductCode
- property ProductName
- property UserApplicationName
- property VendorName
- property VendorUrl
- summary()
Return a summary of the main items.
- Returns:
A dictionary of the main items
- update(value)
Update the values of this identity using another identity as the source.
- Parameters:
value – The value to copy values from
- class pymodbus.device.ModbusPlusStatistics
Bases:
object
This is used to maintain the current modbus plus statistics count.
As of right now this is simply a stub to complete the modbus implementation. For more information, see the modbus implementation guide page 87.
- encode()
Return a summary of the modbus plus statistics.
- Returns:
54 16-bit words representing the status
- reset()
Clear all of the modbus plus statistics.
- summary()
Return a summary of the modbus plus statistics.
- Returns:
54 16-bit words representing the status
Modbus Remote Events.
An event byte returned by the Get Communications Event Log function can be any one of four types. The type is defined by bit 7 (the high-order bit) in each byte. It may be further defined by bit 6.
- class pymodbus.events.CommunicationRestartEvent
Bases:
ModbusEvent
Restart remote device Initiated Communication.
The remote device stores this type of event byte when its communications port is restarted. The remote device can be restarted by the Diagnostics function (code 08), with sub-function Restart Communications Option (code 00 01).
That function also places the remote device into a “Continue on Error” or “Stop on Error” mode. If the remote device is placed into “Continue on Error” mode, the event byte is added to the existing event log. If the remote device is placed into “Stop on Error” mode, the byte is added to the log and the rest of the log is cleared to zeros.
The event is defined by a content of zero.
- decode(event)
Decode the event message to its status bits.
- Parameters:
event – The event to decode
- encode()
Encode the status bits to an event message.
- Returns:
The encoded event message
- value = 0
- class pymodbus.events.EnteredListenModeEvent
Bases:
ModbusEvent
Enter Remote device Listen Only Mode.
The remote device stores this type of event byte when it enters the Listen Only Mode. The event is defined by a content of 04 hex.
- decode(event)
Decode the event message to its status bits.
- Parameters:
event – The event to decode
- encode()
Encode the status bits to an event message.
- Returns:
The encoded event message
- value = 4
- class pymodbus.events.ModbusEvent
Bases:
ABC
Define modbus events.
- abstract decode(event)
Decode the event message to its status bits.
- Parameters:
event – The event to decode
- abstract encode() bytes
Encode the status bits to an event message.
- class pymodbus.events.RemoteReceiveEvent(overrun=False, listen=False, broadcast=False)
Bases:
ModbusEvent
Remote device MODBUS Receive Event.
The remote device stores this type of event byte when a query message is received. It is stored before the remote device processes the message. This event is defined by bit 7 set to logic “1”. The other bits will be set to a logic “1” if the corresponding condition is TRUE. The bit layout is:
Bit Contents
----------------------------------
0   Not Used
2   Not Used
3   Not Used
4   Character Overrun
5   Currently in Listen Only Mode
6   Broadcast Receive
7   1
- decode(event: bytes) None
Decode the event message to its status bits.
- Parameters:
event – The event to decode
- encode() bytes
Encode the status bits to an event message.
- Returns:
The encoded event message
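The receive event bit layout above can be sketched as plain bit operations. The function names are hypothetical; only the bit positions are taken from the layout in this section.

```python
def encode_receive_event(overrun=False, listen=False, broadcast=False) -> bytes:
    """Pack the three flags into one event byte using the bit layout above."""
    value = 0x80  # bit 7 is always 1 for a receive event
    if overrun:
        value |= 1 << 4  # bit 4: character overrun
    if listen:
        value |= 1 << 5  # bit 5: currently in listen only mode
    if broadcast:
        value |= 1 << 6  # bit 6: broadcast receive
    return bytes([value])


def decode_receive_event(event: bytes) -> dict:
    """Unpack the first byte of an event back into its three flags."""
    value = event[0]
    return {
        "overrun": bool(value & (1 << 4)),
        "listen": bool(value & (1 << 5)),
        "broadcast": bool(value & (1 << 6)),
    }


event = encode_receive_event(overrun=True, broadcast=True)
```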
- class pymodbus.events.RemoteSendEvent(read=False, slave_abort=False, slave_busy=False, slave_nak=False, write_timeout=False, listen=False)
Bases:
ModbusEvent
Remote device MODBUS Send Event.
The remote device stores this type of event byte when it finishes processing a request message. It is stored if the remote device returned a normal or exception response, or no response.
This event is defined by bit 7 set to a logic “0”, with bit 6 set to a “1”. The other bits will be set to a logic “1” if the corresponding condition is TRUE. The bit layout is:
Bit Contents
-----------------------------------------------------------
0   Read Exception Sent (Exception Codes 1-3)
1   Slave Abort Exception Sent (Exception Code 4)
2   Slave Busy Exception Sent (Exception Codes 5-6)
3   Slave Program NAK Exception Sent (Exception Code 7)
4   Write Timeout Error Occurred
5   Currently in Listen Only Mode
6   1
7   0
- decode(event)
Decode the event message to its status bits.
- Parameters:
event – The event to decode
- encode()
Encode the status bits to an event message.
- Returns:
The encoded event message
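As a minimal sketch, the send event layout and the four event types described in this module can be expressed with a bit table and a small classifier. All names below are hypothetical; the bit positions and the fixed values 0x00 and 0x04 come from the descriptions above.

```python
# Flag name to bit position, following the send event layout above.
SEND_EVENT_BITS = {
    "read": 0,
    "slave_abort": 1,
    "slave_busy": 2,
    "slave_nak": 3,
    "write_timeout": 4,
    "listen": 5,
}


def encode_send_event(**flags: bool) -> bytes:
    """Pack the named flags into one byte with bit 6 set and bit 7 clear."""
    value = 0x40
    for name, bit in SEND_EVENT_BITS.items():
        if flags.get(name, False):
            value |= 1 << bit
    return bytes([value])


def classify_event(value: int) -> str:
    """Tell the four event types apart by bit 7, then bit 6, then content."""
    if value & 0x80:
        return "receive"
    if value & 0x40:
        return "send"
    if value == 0x04:
        return "entered listen mode"
    if value == 0x00:
        return "communication restart"
    return "unknown"


event = encode_send_event(read=True, listen=True)
```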
Pymodbus Exceptions.
Custom exceptions to be used in the Modbus code.
- exception pymodbus.exceptions.ConnectionException(string='')
Bases:
ModbusException
Error resulting from a bad connection.
- exception pymodbus.exceptions.InvalidMessageReceivedException(string='')
Bases:
ModbusException
Error resulting from invalid response received or decoded.
- exception pymodbus.exceptions.MessageRegisterException(string='')
Bases:
ModbusException
Error resulting from failing to register a custom message request/response.
- exception pymodbus.exceptions.ModbusIOException(string='', function_code=None)
Bases:
ModbusException
Error resulting from data i/o.
- exception pymodbus.exceptions.NoSuchSlaveException(string='')
Bases:
ModbusException
Error resulting from making a request to a slave that does not exist.
- exception pymodbus.exceptions.NotImplementedException(string='')
Bases:
ModbusException
Error resulting from not implemented function.
- exception pymodbus.exceptions.ParameterException(string='')
Bases:
ModbusException
Error resulting from invalid parameter.
Modbus Payload Builders.
A collection of utilities for building and decoding modbus messages payloads.
- class pymodbus.payload.BinaryPayloadBuilder(payload=None, byteorder=Endian.LITTLE, wordorder=Endian.BIG, repack=False)
Bases:
object
A utility that helps build payload messages to be written with the various modbus messages.
It really is just a simple wrapper around the struct module, however it saves time looking up the format strings. What follows is a simple example:
    from pymodbus.constants import Endian
    from pymodbus.payload import BinaryPayloadBuilder

    builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE)
    builder.add_8bit_uint(1)
    builder.add_16bit_uint(2)
    payload = builder.build()
- add_16bit_float(value: float) None
Add a 16 bit float to the buffer.
- Parameters:
value – The value to add to the buffer
- add_16bit_int(value: int) None
Add a 16 bit signed int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_16bit_uint(value: int) None
Add a 16 bit unsigned int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_32bit_float(value: float) None
Add a 32 bit float to the buffer.
- Parameters:
value – The value to add to the buffer
- add_32bit_int(value: int) None
Add a 32 bit signed int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_32bit_uint(value: int) None
Add a 32 bit unsigned int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_64bit_float(value: float) None
Add a 64 bit float(double) to the buffer.
- Parameters:
value – The value to add to the buffer
- add_64bit_int(value: int) None
Add a 64 bit signed int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_64bit_uint(value: int) None
Add a 64 bit unsigned int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_8bit_int(value: int) None
Add an 8 bit signed int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_8bit_uint(value: int) None
Add an 8 bit unsigned int to the buffer.
- Parameters:
value – The value to add to the buffer
- add_bits(values: list[bool]) None
Add a collection of bits to be encoded.
If the number of bits is not a multiple of eight, the collection is left padded with 0 bits to reach one.
- Parameters:
values – The value to add to the buffer
- add_string(value: str) None
Add a string to the buffer.
- Parameters:
value – The value to add to the buffer
- build() list[bytes]
Return the payload buffer as a list.
This list is two bytes per element and can thus be treated as a list of registers.
- Returns:
The payload buffer as a list
- classmethod deprecate()
Log warning.
- encode() bytes
Get the payload buffer encoded in bytes.
- reset() None
Reset the payload buffer.
- to_coils() list[bool]
Convert the payload buffer into a coil layout that can be used as a context block.
- Returns:
The coil layout to use as a block
- to_registers()
Convert the payload buffer to register layout that can be used as a context block.
- Returns:
The register layout to use as a block
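Since the builder is described as a thin wrapper around the struct module, the same idea can be sketched with struct directly. The function name is hypothetical, and the choice to pad odd-length data with a zero byte and to read each register as big-endian is an assumption of this sketch, not a statement about how the builder behaves.

```python
import struct


def pack_registers(fmt: str, *values) -> list[int]:
    """Pack values with a struct format and split the bytes into 16-bit registers."""
    raw = struct.pack(fmt, *values)
    if len(raw) % 2:
        raw += b"\x00"  # assumption: pad to a whole register with a zero byte
    return [int.from_bytes(raw[i:i + 2], "big") for i in range(0, len(raw), 2)]


# Two 8-bit values followed by one 16-bit value, all big-endian.
registers = pack_registers(">BBH", 1, 0, 2)

# A 32-bit float occupies two registers.
float_registers = pack_registers(">f", 1.0)
```

The builder methods listed above save looking up format characters such as `B`, `H`, and `f` by hand.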
- class pymodbus.payload.BinaryPayloadDecoder(payload, byteorder=Endian.LITTLE, wordorder=Endian.BIG)
Bases:
object
A utility that helps decode payload messages from a modbus response message.
It really is just a simple wrapper around the struct module, however it saves time looking up the format strings. What follows is a simple example:
    decoder = BinaryPayloadDecoder(payload)
    first = decoder.decode_8bit_uint()
    second = decoder.decode_16bit_uint()
- classmethod bit_chunks(coils, size=8)
Return bit chunks.
- decode_16bit_float()
Decode a 16 bit float from the buffer.
- decode_16bit_int()
Decode a 16 bit signed int from the buffer.
- decode_16bit_uint()
Decode a 16 bit unsigned int from the buffer.
- decode_32bit_float()
Decode a 32 bit float from the buffer.
- decode_32bit_int()
Decode a 32 bit signed int from the buffer.
- decode_32bit_uint()
Decode a 32 bit unsigned int from the buffer.
- decode_64bit_float()
Decode a 64 bit float(double) from the buffer.
- decode_64bit_int()
Decode a 64 bit signed int from the buffer.
- decode_64bit_uint()
Decode a 64 bit unsigned int from the buffer.
- decode_8bit_int()
Decode an 8 bit signed int from the buffer.
- decode_8bit_uint()
Decode an 8 bit unsigned int from the buffer.
- decode_bits(package_len=1)
Decode a byte worth of bits from the buffer.
- decode_string(size=1)
Decode a string from the buffer.
- Parameters:
size – The size of the string to decode
- classmethod deprecate()
Log warning.
- classmethod fromCoils(coils, byteorder=Endian.LITTLE, _wordorder=Endian.BIG)
Initialize a payload decoder with the result of reading of coils.
- classmethod fromRegisters(registers, byteorder=Endian.LITTLE, wordorder=Endian.BIG)
Initialize a payload decoder with the result of reading a collection of registers from a modbus device.
The registers are treated as a list of 2 byte values. We have to do this because of how the data has already been decoded by the rest of the library.
- Parameters:
registers – The register results to initialize with
byteorder – The Byte order of each word
wordorder – The endianness of the word (when wordcount is >= 2)
- Returns:
An initialized PayloadDecoder
- reset()
Reset the decoder pointer back to the start.
- skip_bytes(nbytes)
Skip n bytes in the buffer.
- Parameters:
nbytes – The number of bytes to skip
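To illustrate how register word order affects decoding, here is a minimal struct-based sketch that turns two 16-bit registers into a 32-bit float. The function name and its `word_big_endian` flag are hypothetical and only stand in for the `wordorder` idea described above; each register is assumed to hold its two bytes in big-endian order.

```python
import struct


def registers_to_float32(registers: list[int], word_big_endian: bool = True) -> float:
    """Combine two 16-bit registers into one IEEE 754 single-precision float."""
    # Reverse the register order when the low word arrives first.
    words = registers if word_big_endian else list(reversed(registers))
    raw = b"".join(word.to_bytes(2, "big") for word in words)
    return struct.unpack(">f", raw)[0]


value = registers_to_float32([0x3F80, 0x0000])
swapped = registers_to_float32([0x0000, 0x3F80], word_big_endian=False)
```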
Transaction.
- class pymodbus.transaction.TransactionManager(params: CommParams, framer: FramerBase, retries: int, is_server: bool, trace_packet: Callable[[bool, bytes], bytes] | None, trace_pdu: Callable[[bool, ModbusPDU], ModbusPDU] | None, trace_connect: Callable[[bool], None] | None, sync_client=None)
Bases:
ModbusProtocol
Transaction manager.
This is the central class of the library, providing a separation between API and communication:
- clients/servers call the manager to execute requests/responses
- transport/framer/pdu is used by the manager to communicate with the devices
Transaction manager handles:
- Execution of requests (client), with retries and locking
- Sending of responses (server), with retries
- Connection management (on top of what transport offers)
- No response (temporarily) from a device
Transaction manager offers:
- a simple execute interface for requests (client)
- a simple send interface for responses (server)
- external trace methods tracing outgoing/incoming packets/PDUs (byte stream)
- callback_connected() None
Call when connection is successful.
- callback_data(data: bytes, addr: tuple | None = None) int
Handle received data.
- callback_disconnected(exc: Exception | None) None
Call when connection is lost.
- callback_new_connection()
Call when listener receives a new connection request.
- dummy_trace_connect(connect: bool) None
Do dummy trace.
- dummy_trace_packet(sending: bool, data: bytes) bytes
Do dummy trace.
- dummy_trace_pdu(sending: bool, pdu: ModbusPDU) ModbusPDU
Do dummy trace.
- async execute(no_response_expected: bool, request: ModbusPDU) ModbusPDU
Execute requests asynchronously.
REMARK: this method is identical to sync_execute, apart from the lock and try/except. Any changes in either method MUST be mirrored!
- getNextTID() int
Retrieve the next transaction identifier.
- pdu_send(pdu: ModbusPDU, addr: tuple | None = None) None
Build byte stream and send.
- sync_execute(no_response_expected: bool, request: ModbusPDU) ModbusPDU
Execute requests synchronously.
REMARK: this method is identical to execute, apart from the lock and sync_receive. Any changes in either method MUST be mirrored!
- sync_get_response(dev_id) ModbusPDU
Receive until PDU is correct or timeout.
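The "execution of requests with retries" idea can be sketched generically as a loop around a single send attempt. Everything here is hypothetical: `send_once` stands in for whatever sends a request and waits for a reply, and treating `retries` as attempts made after the first one is an assumption of this sketch rather than documented behaviour.

```python
def execute_with_retries(send_once, retries: int):
    """Call send_once() until it returns a response or the attempts run out.

    send_once is a hypothetical callable that returns a response,
    or None when no reply arrived in time.
    """
    for attempt in range(retries + 1):
        response = send_once()
        if response is not None:
            return response, attempt
    raise TimeoutError(f"no response after {retries + 1} attempts")


# Simulate a device that stays silent twice, then answers.
replies = iter([None, None, b"\x03\x02\x00\x01"])
response, attempt = execute_with_retries(lambda: next(replies), retries=3)
```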
Modbus Utilities.
A collection of utilities for packing data, unpacking data, computing checksums, and decoding checksums.
- pymodbus.utilities.dict_property(store, index)
Create class properties from a dictionary.
Basically this allows you to remove a lot of possible boilerplate code.
- Parameters:
store – The store to pull from
index – The index into the store to close over
- Returns:
An initialized property set
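A property factory of this kind can be sketched as a function that closes over an index and returns a `property`. The names below are hypothetical, and the sketch takes the store as an attribute name, which may differ from how `dict_property` accepts its `store` argument.

```python
def make_dict_property(store_name: str, index):
    """Hypothetical property factory: read and write obj.<store_name>[index]."""

    def getter(self):
        return getattr(self, store_name)[index]

    def setter(self, value):
        getattr(self, store_name)[index] = value

    return property(getter, setter)


class Identity:
    """Illustrative class with two dictionary-backed properties."""

    VendorName = make_dict_property("_data", 0)
    ProductCode = make_dict_property("_data", 1)

    def __init__(self):
        self._data = {0: "", 1: ""}


ident = Identity()
ident.VendorName = "Example Vendor"
```

Each property costs one line in the class body, which is the boilerplate saving the description above refers to.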
- pymodbus.utilities.hexlify_packets(packet)
Return hex representation of bytestring received.
- Parameters:
packet – The bytestring to represent
- Returns:
The hex representation of the packet
- pymodbus.utilities.pack_bitstring(bits: list[bool]) bytes
Create a bytestring out of a list of bits.
- Parameters:
bits – A list of bits
example:
    bits = [False, True, False, True]
    result = pack_bitstring(bits)
- pymodbus.utilities.unpack_bitstring(data: bytes) list[bool]
Create bit list out of a bytestring.
- Parameters:
data – The modbus data packet to decode
example:
    data = b"bytes to decode"
    result = unpack_bitstring(data)
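For readers who want to see the bit packing itself, here is a minimal standard-library sketch. The function names are hypothetical, and the sketch assumes the least-significant-bit-first order that Modbus uses for coil data; it is not a copy of the library's implementation.

```python
def pack_bits(bits: list[bool]) -> bytes:
    """Pack booleans into bytes, least significant bit first."""
    out = bytearray((len(bits) + 7) // 8)
    for i, bit in enumerate(bits):
        if bit:
            out[i // 8] |= 1 << (i % 8)
    return bytes(out)


def unpack_bits(data: bytes) -> list[bool]:
    """Expand every byte into eight booleans, least significant bit first."""
    return [bool((byte >> i) & 1) for byte in data for i in range(8)]


packed = pack_bits([False, True, False, True])
unpacked = unpack_bits(packed)
```

Unpacking always yields a multiple of eight values, so the caller has to trim the result to the number of bits it actually asked for.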
PDU classes
Modbus Request/Response Decoders.
- class pymodbus.pdu.decoders.DecodePDU(is_server: bool)
Bases:
object
Decode pdu requests/responses (server/client).
- decode(frame: bytes) ModbusPDU | None
Decode a frame.
- lookupPduClass(data: bytes) type[ModbusPDU] | None
Use function_code to determine the class of the PDU.
- register(custom_class: type[ModbusPDU]) None
Register a function and sub function class with the decoder.
Bit Reading Request/Response messages.
- class pymodbus.pdu.bit_message.ReadCoilsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
ReadCoilsRequest.
- decode(data: bytes) None
Decode a request pdu.
- encode() bytes
Encode a request pdu.
- function_code: int = 1
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Byte Count (1 byte) + Quantity of Coils / 8 (N bytes); if the remainder is not 0, then N = N + 1
- rtu_frame_size: int = 8
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run request against a datastore.
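The response size rule for a read coils request works out as follows. The helper names are hypothetical; the request layout (function code, 2-byte address, 2-byte quantity) follows the Modbus protocol.

```python
import struct


def build_read_coils_request(address: int, count: int) -> bytes:
    """Function code 1, then a 2-byte start address and a 2-byte quantity."""
    return struct.pack(">BHH", 1, address, count)


def read_coils_response_pdu_size(count: int) -> int:
    """Function code (1) + byte count (1) + ceil(count / 8) data bytes."""
    data_bytes = (count + 7) // 8  # round up when count is not a multiple of 8
    return 1 + 1 + data_bytes


request = build_read_coils_request(0x0013, 0x0025)
```

The request PDU is five bytes long; with a one-byte device address and a two-byte CRC that gives the `rtu_frame_size` of 8 listed above.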
- class pymodbus.pdu.bit_message.ReadCoilsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
ReadCoilsResponse.
- decode(data: bytes) None
Decode response pdu.
- encode() bytes
Encode response pdu.
- function_code: int = 1
- rtu_byte_count_pos: int = 2
- class pymodbus.pdu.bit_message.ReadDiscreteInputsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ReadCoilsRequest
ReadDiscreteInputsRequest.
- function_code: int = 2
- class pymodbus.pdu.bit_message.ReadDiscreteInputsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ReadCoilsResponse
ReadDiscreteInputsResponse.
- function_code: int = 2
- class pymodbus.pdu.bit_message.WriteMultipleCoilsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
WriteMultipleCoilsRequest.
- decode(data: bytes) None
Decode a write coils request.
- encode() bytes
Encode write coils request.
- function_code: int = 15
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Output Address (2 bytes) + Quantity of Outputs (2 bytes)
- rtu_byte_count_pos: int = 6
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a request against a datastore.
- class pymodbus.pdu.bit_message.WriteMultipleCoilsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
WriteMultipleCoilsResponse.
- decode(data: bytes) None
Decode a write coils response.
- encode() bytes
Encode write coils response.
- function_code: int = 15
- rtu_frame_size: int = 8
- class pymodbus.pdu.bit_message.WriteSingleCoilRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
WriteSingleCoilResponse
WriteSingleCoilRequest.
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Output Address (2 bytes) + Output Value (2 bytes)
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a request against a datastore.
- class pymodbus.pdu.bit_message.WriteSingleCoilResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
WriteSingleCoilResponse.
- decode(data: bytes) None
Decode a write coil request.
- encode() bytes
Encode write coil request.
- function_code: int = 5
- rtu_frame_size: int = 8
Diagnostic Record Read/Write.
- class pymodbus.pdu.diag_message.ChangeAsciiInputDelimiterRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ChangeAsciiInputDelimiterRequest.
- sub_function_code: int = 3
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ChangeAsciiInputDelimiterResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ChangeAsciiInputDelimiterResponse.
- sub_function_code: int = 3
- class pymodbus.pdu.diag_message.ClearCountersRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ClearCountersRequest.
- sub_function_code: int = 10
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ClearCountersResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ClearCountersResponse.
- sub_function_code: int = 10
- class pymodbus.pdu.diag_message.ClearOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ClearOverrunCountRequest.
- sub_function_code: int = 20
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ClearOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ClearOverrunCountResponse.
- sub_function_code: int = 20
- class pymodbus.pdu.diag_message.DiagnosticBase(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
DiagnosticBase.
- decode(data: bytes) None
Decode a diagnostic request.
- encode() bytes
Encode a diagnostic response.
- function_code: int = 8
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Sub function code (2 bytes) + Data (2 * N bytes)
- rtu_frame_size: int = 8
- sub_function_code: int = 9999
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Implement dummy.
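A diagnostic PDU with a single data word can be sketched with struct as shown here. The function name is hypothetical and the sketch covers only the one-word case; sub-function 0 (Return Query Data) is used as the example because it appears in the class list of this module.

```python
import struct


def build_diagnostic_pdu(sub_function_code: int, data: int = 0) -> bytes:
    """Function code 8, a 2-byte sub-function code, and one 2-byte data word."""
    return struct.pack(">BHH", 8, sub_function_code, data)


# Sub-function 0: Return Query Data, with an arbitrary test word.
pdu = build_diagnostic_pdu(0, 0xA537)
```

With one data word the PDU is five bytes long, so a one-byte device address and a two-byte CRC bring the RTU frame to the 8 bytes given by `rtu_frame_size`.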
- class pymodbus.pdu.diag_message.ForceListenOnlyModeRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ForceListenOnlyModeRequest.
- sub_function_code: int = 4
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ForceListenOnlyModeResponse(dev_id=1, transaction_id=0)
Bases:
DiagnosticBase
ForceListenOnlyModeResponse.
This does not send a response
- sub_function_code: int = 4
- class pymodbus.pdu.diag_message.GetClearModbusPlusRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
GetClearModbusPlusRequest.
- encode()
Encode a diagnostic response.
- get_response_pdu_size()
Return size of the response.
Func_code (1 byte) + Sub function code (2 bytes) + Operation (2 bytes) + Data (108 bytes)
- sub_function_code: int = 21
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.GetClearModbusPlusResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
GetClearModbusPlusResponse.
- sub_function_code: int = 21
- class pymodbus.pdu.diag_message.RestartCommunicationsOptionRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
RestartCommunicationsOptionRequest.
- sub_function_code: int = 1
- class pymodbus.pdu.diag_message.RestartCommunicationsOptionResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
RestartCommunicationsOptionResponse.
- sub_function_code: int = 1
- class pymodbus.pdu.diag_message.ReturnBusCommunicationErrorCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnBusCommunicationErrorCountRequest.
- sub_function_code: int = 12
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnBusCommunicationErrorCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnBusCommunicationErrorCountResponse.
- sub_function_code: int = 12
- class pymodbus.pdu.diag_message.ReturnBusExceptionErrorCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnBusExceptionErrorCountRequest.
- sub_function_code: int = 13
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnBusExceptionErrorCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnBusExceptionErrorCountResponse.
- sub_function_code: int = 13
- class pymodbus.pdu.diag_message.ReturnBusMessageCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnBusMessageCountRequest.
- sub_function_code: int = 11
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnBusMessageCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnBusMessageCountResponse.
- sub_function_code: int = 11
- class pymodbus.pdu.diag_message.ReturnDiagnosticRegisterRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnDiagnosticRegisterRequest.
- sub_function_code: int = 2
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnDiagnosticRegisterResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnDiagnosticRegisterResponse.
- sub_function_code: int = 2
- class pymodbus.pdu.diag_message.ReturnIopOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnIopOverrunCountRequest.
- sub_function_code: int = 19
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnIopOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnIopOverrunCountResponse.
- sub_function_code: int = 19
- class pymodbus.pdu.diag_message.ReturnQueryDataRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnQueryDataRequest.
- sub_function_code: int = 0
- class pymodbus.pdu.diag_message.ReturnQueryDataResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnQueryDataResponse.
- sub_function_code: int = 0
- class pymodbus.pdu.diag_message.ReturnSlaveBusCharacterOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnSlaveBusCharacterOverrunCountRequest.
- sub_function_code: int = 18
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnSlaveBusCharacterOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnSlaveBusCharacterOverrunCountResponse.
- sub_function_code: int = 18
- class pymodbus.pdu.diag_message.ReturnSlaveBusyCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnSlaveBusyCountRequest.
- sub_function_code: int = 17
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnSlaveBusyCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnSlaveBusyCountResponse.
- sub_function_code: int = 17
- class pymodbus.pdu.diag_message.ReturnSlaveMessageCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnSlaveMessageCountRequest.
- sub_function_code: int = 14
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnSlaveMessageCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnSlaveMessageCountResponse.
- sub_function_code: int = 14
- class pymodbus.pdu.diag_message.ReturnSlaveNAKCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnSlaveNAKCountRequest.
- sub_function_code: int = 16
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnSlaveNAKCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnSlaveNAKCountResponse.
- sub_function_code: int = 16
- class pymodbus.pdu.diag_message.ReturnSlaveNoResponseCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnSlaveNoResponseCountRequest.
- sub_function_code: int = 15
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the diagnostic request on the given device.
- class pymodbus.pdu.diag_message.ReturnSlaveNoResponseCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
DiagnosticBase
ReturnSlaveNoResponseCountResponse.
- sub_function_code: int = 15
File Record Read/Write Messages.
- class pymodbus.pdu.file_message.FileRecord(file_number: int = 0, record_number: int = 0, record_data: bytes = b'', record_length: int = 0)
Bases:
object
Represents a file record and its relevant data.
- file_number: int = 0
- record_data: bytes = b''
- record_length: int = 0
- record_number: int = 0
- class pymodbus.pdu.file_message.ReadFifoQueueRequest(address: int = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
ReadFifoQueueRequest.
- decode(data: bytes) None
Decode the incoming request.
- encode() bytes
Encode the request packet.
- function_code: int = 24
- rtu_frame_size: int = 6
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a read FIFO queue request against the store.
- class pymodbus.pdu.file_message.ReadFifoQueueResponse(values: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
ReadFifoQueueResponse.
- classmethod calculateRtuFrameSize(buffer: bytes) int
Calculate the size of the message.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 24
- class pymodbus.pdu.file_message.ReadFileRecordRequest(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
ReadFileRecordRequest.
- decode(data: bytes) None
Decode the incoming request.
- encode() bytes
Encode the request packet.
- function_code: int = 20
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Quantity of records (7 bytes each).
- rtu_byte_count_pos: int = 2
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a read file record request against the store.
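The 7-byte figure matches the size of one sub-request in a Read File Record request. The sketch below shows that layout with the standard library only. It assumes the sub-request format from the Modbus specification: reference type 0x06, then file number, record number and record length as 16-bit big-endian values. Both helper names are hypothetical and not part of pymodbus.

```python
import struct

READ_FILE_RECORD = 20   # function_code listed above
REFERENCE_TYPE = 0x06   # assumption: fixed reference type from the Modbus specification


def encode_read_sub_request(file_number: int, record_number: int, record_length: int) -> bytes:
    """Pack one sub-request: 1 + 2 + 2 + 2 = 7 bytes."""
    return struct.pack(">BHHH", REFERENCE_TYPE, file_number, record_number, record_length)


def build_read_file_record_pdu(records: list[tuple[int, int, int]]) -> bytes:
    """Prefix the concatenated sub-requests with function code and byte count."""
    body = b"".join(encode_read_sub_request(*record) for record in records)
    return struct.pack(">BB", READ_FILE_RECORD, len(body)) + body


# Two sub-requests: (file 4, record 1, length 2) and (file 3, record 9, length 2).
pdu = build_read_file_record_pdu([(4, 1, 2), (3, 9, 2)])
```

In an RTU frame the device id comes before the PDU, so the byte count lands at index 2, which is consistent with rtu_byte_count_pos = 2.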
- class pymodbus.pdu.file_message.ReadFileRecordResponse(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
ReadFileRecordResponse.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 20
- rtu_byte_count_pos: int = 2
- class pymodbus.pdu.file_message.WriteFileRecordRequest(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
WriteFileRecordRequest.
- decode(data: bytes) None
Decode the incoming request.
- encode() bytes
Encode the request packet.
- function_code: int = 21
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Quantity of records (7 bytes each).
- rtu_byte_count_pos: int = 2
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run the write file record request against the context.
- class pymodbus.pdu.file_message.WriteFileRecordResponse(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
The normal response is an echo of the request.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 21
- rtu_byte_count_pos: int = 2
Encapsulated Interface (MEI) Transport Messages.
- class pymodbus.pdu.mei_message.ReadDeviceInformationRequest(read_code: int | None = None, object_id: int = 0, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
ReadDeviceInformationRequest.
- decode(data: bytes) None
Decode data part of the message.
- encode() bytes
Encode the request packet.
- function_code: int = 43
- rtu_frame_size: int = 7
- sub_function_code: int = 14
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a read device information request against the store.
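To make the function_code, sub_function_code and rtu_frame_size values above concrete, here is a minimal sketch of the request PDU built with the standard library only. It assumes the layout from the Modbus specification (function code, MEI type, read device ID code, object id, one byte each) and that read code 0x01 selects basic device identification. `build_read_device_information_pdu` is a hypothetical helper and not part of pymodbus.

```python
import struct

FUNCTION_CODE = 43      # 0x2B, function_code listed above
SUB_FUNCTION_CODE = 14  # 0x0E, sub_function_code listed above


def build_read_device_information_pdu(read_code: int = 0x01, object_id: int = 0x00) -> bytes:
    """Pack the four single-byte fields of the request PDU."""
    return struct.pack(">BBBB", FUNCTION_CODE, SUB_FUNCTION_CODE, read_code, object_id)


request_pdu = build_read_device_information_pdu()

# RTU frame = device id (1 byte) + PDU + CRC (2 bytes).
rtu_frame_length = 1 + len(request_pdu) + 2
```

The computed frame length agrees with rtu_frame_size = 7.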
- class pymodbus.pdu.mei_message.ReadDeviceInformationResponse(read_code: int | None = None, information: dict | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
ReadDeviceInformationResponse.
- classmethod calculateRtuFrameSize(buffer: bytes) int
Calculate the size of the message.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 43
- sub_function_code: int = 14
Diagnostic record read/write.
- class pymodbus.pdu.other_message.GetCommEventCounterRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
GetCommEventCounterRequest.
- decode(_data: bytes) None
Decode data part of the message.
- encode() bytes
Encode the message.
- function_code: int = 11
- rtu_frame_size: int = 4
- async update_datastore(_context) ModbusPDU
Run a get comm event counter request against the store.
- class pymodbus.pdu.other_message.GetCommEventCounterResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
GetCommEventCounterResponse.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 11
- rtu_frame_size: int = 8
- class pymodbus.pdu.other_message.GetCommEventLogRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
GetCommEventLogRequest.
- decode(_data: bytes) None
Decode data part of the message.
- encode() bytes
Encode the message.
- function_code: int = 12
- rtu_frame_size: int = 4
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a get comm event log request against the store.
- class pymodbus.pdu.other_message.GetCommEventLogResponse(status: bool = True, message_count: int = 0, event_count: int = 0, events: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
GetCommEventLogResponse.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 12
- rtu_byte_count_pos: int = 2
- class pymodbus.pdu.other_message.ReadExceptionStatusRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
ReadExceptionStatusRequest.
- decode(_data: bytes) None
Decode data part of the message.
- encode() bytes
Encode the message.
- function_code: int = 7
- rtu_frame_size: int = 4
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a read exception status request against the store.
- class pymodbus.pdu.other_message.ReadExceptionStatusResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
ReadExceptionStatusResponse.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 7
- rtu_frame_size: int = 5
- class pymodbus.pdu.other_message.ReportSlaveIdRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
ReportSlaveIdRequest.
- decode(_data: bytes) None
Decode data part of the message.
- encode() bytes
Encode the message.
- function_code: int = 17
- rtu_frame_size: int = 4
- async update_datastore(_context: ModbusSlaveContext) ModbusPDU
Run a report slave id request against the store.
- class pymodbus.pdu.other_message.ReportSlaveIdResponse(identifier: bytes = b'\x00', status: bool = True, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
ReportSlaveIdResponse.
- decode(data: bytes) None
Decode the response.
Since the identifier is device dependent, we just return the raw value that a user can decode to whatever it should be.
- encode() bytes
Encode the response.
- function_code: int = 17
- rtu_byte_count_pos: int = 2
Contains base classes for modbus request/response/error packets.
- class pymodbus.pdu.pdu.ExceptionResponse(function_code: int, exception_code: int = 0, slave: int = 1, transaction: int = 0)
Bases:
ModbusPDU
Base class for a modbus exception PDU.
- ACKNOWLEDGE = 5
- GATEWAY_NO_RESPONSE = 11
- GATEWAY_PATH_UNAVIABLE = 10
- ILLEGAL_ADDRESS = 2
- ILLEGAL_FUNCTION = 1
- ILLEGAL_VALUE = 3
- MEMORY_PARITY_ERROR = 8
- NEGATIVE_ACKNOWLEDGE = 7
- SLAVE_BUSY = 6
- SLAVE_FAILURE = 4
- address: int
- bits: list[bool]
- count: int
- decode(data: bytes) None
Decode a modbus exception response.
- dev_id: int
- encode() bytes
Encode a modbus exception response.
- fut: Future
- registers: list[int]
- rtu_frame_size: int = 5
- status: int
- transaction_id: int
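The exception codes above travel in a two-byte PDU. The sketch below illustrates that encoding with the standard library only. It assumes the convention from the Modbus specification that an exception response echoes the request function code with the high bit (0x80) set, followed by the exception code. The helper names are hypothetical and not part of pymodbus.

```python
import struct

ILLEGAL_ADDRESS = 2  # value of ExceptionResponse.ILLEGAL_ADDRESS listed above


def build_exception_pdu(function_code: int, exception_code: int) -> bytes:
    """Pack the function code with the error bit set, then the exception code."""
    return struct.pack(">BB", function_code | 0x80, exception_code)


def is_exception_pdu(pdu: bytes) -> bool:
    """Report whether the first PDU byte has the error bit set."""
    return bool(pdu[0] & 0x80)


# A device rejecting a read holding registers request (function code 3).
exception_pdu = build_exception_pdu(3, ILLEGAL_ADDRESS)

# RTU frame = device id (1 byte) + PDU (2 bytes) + CRC (2 bytes).
rtu_frame_length = 1 + len(exception_pdu) + 2
```

The computed frame length agrees with rtu_frame_size = 5.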
- class pymodbus.pdu.pdu.ModbusPDU(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
object
Base class for all Modbus messages.
- classmethod calculateRtuFrameSize(data: bytes) int
Calculate the size of a PDU.
- abstract decode(data: bytes) None
Decode data part of the message.
- abstract encode() bytes
Encode the message.
- function_code: int = 0
- get_response_pdu_size() int
Calculate response pdu size.
- isError() bool
Check whether the PDU is an error response.
- rtu_byte_count_pos: int = 0
- rtu_frame_size: int = 0
- sub_function_code: int = -1
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run request against a datastore.
- validateAddress(address: int = -1) None
Validate API supplied address.
- validateCount(max_count: int, count: int = -1) None
Validate API supplied count.
Register Reading Request/Response.
- class pymodbus.pdu.register_message.MaskWriteRegisterRequest(address=0, and_mask=65535, or_mask=0, dev_id=1, transaction_id=0)
Bases:
ModbusPDU
MaskWriteRegisterRequest.
- decode(data: bytes) None
Decode the incoming request.
- encode() bytes
Encode the request packet.
- function_code: int = 22
- rtu_frame_size: int = 10
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a mask write register request against the store.
- class pymodbus.pdu.register_message.MaskWriteRegisterResponse(address=0, and_mask=65535, or_mask=0, dev_id=1, transaction_id=0)
Bases:
ModbusPDU
MaskWriteRegisterResponse.
- decode(data: bytes) None
Decode the response.
- encode() bytes
Encode the response.
- function_code: int = 22
- rtu_frame_size: int = 10
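The and_mask and or_mask parameters combine with the current register value according to the rule in the Modbus specification: result = (current AND and_mask) OR (or_mask AND (NOT and_mask)). The sketch below is a plain-Python illustration of that rule, not the pymodbus implementation; `apply_mask_write` is a hypothetical helper.

```python
def apply_mask_write(current: int, and_mask: int = 0xFFFF, or_mask: int = 0x0000) -> int:
    """Apply the Modbus mask write rule to a 16-bit register value."""
    result = (current & and_mask) | (or_mask & ~and_mask)
    return result & 0xFFFF  # keep the result within 16 bits


# Worked example: current 0x0012, and_mask 0x00F2, or_mask 0x0025.
masked = apply_mask_write(0x0012, and_mask=0x00F2, or_mask=0x0025)

# With the default masks (65535 and 0) the register is left unchanged.
unchanged = apply_mask_write(0x1234)
```

Bits set in and_mask are preserved from the current value; bits cleared in and_mask are taken from or_mask.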
- class pymodbus.pdu.register_message.ReadHoldingRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
ReadHoldingRegistersRequest.
- decode(data: bytes) None
Decode a register request packet.
- encode() bytes
Encode the request packet.
- function_code: int = 3
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Byte Count(1 byte) + 2 * Quantity of registers (== byte count).
- rtu_frame_size: int = 8
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a read holding request against a datastore.
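The response size formula above can be checked with a short calculation. The sketch below uses the standard library only and assumes the request layout from the Modbus specification (function code, 16-bit starting address, 16-bit register count). The helper names are hypothetical and not part of pymodbus.

```python
import struct

READ_HOLDING_REGISTERS = 3  # function_code listed above


def encode_read_holding_request(address: int, count: int) -> bytes:
    """Pack function code, starting address and register count."""
    return struct.pack(">BHH", READ_HOLDING_REGISTERS, address, count)


def expected_response_pdu_size(count: int) -> int:
    """Function code (1 byte) + byte count (1 byte) + 2 bytes per register."""
    return 1 + 1 + 2 * count


request_pdu = encode_read_holding_request(address=0x006B, count=3)
response_size = expected_response_pdu_size(3)

# RTU request frame = device id (1 byte) + PDU (5 bytes) + CRC (2 bytes).
rtu_request_length = 1 + len(request_pdu) + 2
```

The RTU request length agrees with rtu_frame_size = 8.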
- class pymodbus.pdu.register_message.ReadHoldingRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
ReadHoldingRegistersResponse.
- decode(data: bytes) None
Decode a register response packet.
- encode() bytes
Encode the response packet.
- function_code: int = 3
- rtu_byte_count_pos: int = 2
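As a companion to decode(), the sketch below unpacks the register values from a raw read holding registers response PDU using the standard library only. It assumes the layout from the Modbus specification (function code, byte count, then 16-bit big-endian registers); `decode_registers_response` is a hypothetical helper and not the pymodbus implementation.

```python
import struct


def decode_registers_response(pdu: bytes) -> list[int]:
    """Return the 16-bit register values carried in a response PDU."""
    _function_code, byte_count = struct.unpack_from(">BB", pdu)
    payload = pdu[2:2 + byte_count]
    if len(payload) != byte_count or byte_count % 2:
        raise ValueError("truncated or malformed register payload")
    return list(struct.unpack(f">{byte_count // 2}H", payload))


# Function code 3, byte count 6, then three registers.
registers = decode_registers_response(bytes.fromhex("0306022b00000064"))
```

In an RTU frame the device id comes before the PDU, so the byte count sits at index 2, consistent with rtu_byte_count_pos = 2.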
- class pymodbus.pdu.register_message.ReadInputRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ReadHoldingRegistersRequest
ReadInputRegistersRequest.
- function_code: int = 4
- class pymodbus.pdu.register_message.ReadInputRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ReadHoldingRegistersResponse
ReadInputRegistersResponse.
- function_code: int = 4
- class pymodbus.pdu.register_message.ReadWriteMultipleRegistersRequest(read_address: int = 0, read_count: int = 0, write_address: int = 0, write_registers: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)
Bases:
ModbusPDU
ReadWriteMultipleRegistersRequest.
- decode(data: bytes) None
Decode the register request packet.
- encode() bytes
Encode the request packet.
- function_code: int = 23
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Byte Count (1 byte) + 2 * Quantity of registers to read (n bytes).
- rtu_byte_count_pos: int = 10
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a read/write multiple registers request against a datastore.
- class pymodbus.pdu.register_message.ReadWriteMultipleRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ReadHoldingRegistersResponse
ReadWriteMultipleRegistersResponse.
- function_code: int = 23
- class pymodbus.pdu.register_message.WriteMultipleRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
WriteMultipleRegistersRequest.
- decode(data: bytes) None
Decode a write multiple registers request packet.
- encode() bytes
Encode a write multiple registers request packet.
- function_code: int = 16
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Starting Address (2 byte) + Quantity of Registers (2 Bytes)
- rtu_byte_count_pos: int = 6
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a write multiple registers request against a datastore.
- class pymodbus.pdu.register_message.WriteMultipleRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
WriteMultipleRegistersResponse.
- decode(data: bytes) None
Decode a write multiple registers response packet.
- encode() bytes
Encode a write multiple registers response packet.
- function_code: int = 16
- rtu_frame_size: int = 8
- class pymodbus.pdu.register_message.WriteSingleRegisterRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
WriteSingleRegisterResponse
WriteSingleRegisterRequest.
- get_response_pdu_size() int
Get response pdu size.
Func_code (1 byte) + Register Address(2 byte) + Register Value (2 bytes)
- async update_datastore(context: ModbusSlaveContext) ModbusPDU
Run a write single register request against a datastore.
- class pymodbus.pdu.register_message.WriteSingleRegisterResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)
Bases:
ModbusPDU
WriteSingleRegisterResponse.
- decode(data: bytes) None
Decode a write single register packet.
- encode() bytes
Encode a write single register packet.
- function_code: int = 6
- rtu_frame_size: int = 8