Extra functions

Pymodbus: Modbus Protocol Implementation.

Released under the BSD license

class pymodbus.ExceptionResponse(function_code: int, exception_code: int = 0, slave: int = 1, transaction: int = 0)

Bases: ModbusPDU

Base class for a modbus exception PDU.

ACKNOWLEDGE = 5
GATEWAY_NO_RESPONSE = 11
GATEWAY_PATH_UNAVIABLE = 10
ILLEGAL_ADDRESS = 2
ILLEGAL_FUNCTION = 1
ILLEGAL_VALUE = 3
MEMORY_PARITY_ERROR = 8
NEGATIVE_ACKNOWLEDGE = 7
SLAVE_BUSY = 6
SLAVE_FAILURE = 4
decode(data: bytes) None

Decode a modbus exception response.

encode() bytes

Encode a modbus exception response.

rtu_frame_size: int = 5
class pymodbus.FramerType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: str, Enum

Type of Modbus frame.

ASCII = 'ascii'
RTU = 'rtu'
SOCKET = 'socket'
TLS = 'tls'
exception pymodbus.ModbusException(string)

Bases: Exception

Base modbus exception.

isError()

Error

pymodbus.pymodbus_apply_logging_config(level: str | int = 10, log_file_name: str | None = None)

Apply basic logging configuration used by default by Pymodbus maintainers.

Parameters:
  • level – (optional) set log level, if not set it is inherited.

  • log_file_name – (optional) log additional to file

Please call this function to format logging appropriately when opening issues.

Modbus Device Controller.

These are the device management handlers. They should be maintained in the server context and the various methods should be inserted in the correct locations.

class pymodbus.device.DeviceInformationFactory

Bases: object

This is a helper.

That really just hides some of the complexity of processing the device information requests (function code 0x2b 0x0e).

classmethod get(control, read_code=DeviceInformation.BASIC, object_id=0)

Get the requested device data from the system.

Parameters:
  • control – The control block to pull data from

  • read_code – The read code to process

  • object_id – The specific object_id to read

Returns:

The requested data (id, length, value)

class pymodbus.device.ModbusDeviceIdentification(info=None, info_name=None)

Bases: object

This is used to supply the device identification.

For the readDeviceIdentification function

For more information read section 6.21 of the modbus application protocol.

property MajorMinorRevision
property ModelName
property ProductCode
property ProductName
property UserApplicationName
property VendorName
property VendorUrl
summary()

Return a summary of the main items.

Returns:

An dictionary of the main items

update(value)

Update the values of this identity.

using another identify as the value

Parameters:

value – The value to copy values from

class pymodbus.device.ModbusPlusStatistics

Bases: object

This is used to maintain the current modbus plus statistics count.

As of right now this is simply a stub to complete the modbus implementation. For more information, see the modbus implementation guide page 87.

encode()

Return a summary of the modbus plus statistics.

Returns:

54 16-bit words representing the status

reset()

Clear all of the modbus plus statistics.

summary()

Return a summary of the modbus plus statistics.

Returns:

54 16-bit words representing the status

Modbus Remote Events.

An event byte returned by the Get Communications Event Log function can be any one of four types. The type is defined by bit 7 (the high-order bit) in each byte. It may be further defined by bit 6.

class pymodbus.events.CommunicationRestartEvent

Bases: ModbusEvent

Restart remote device Initiated Communication.

The remote device stores this type of event byte when its communications port is restarted. The remote device can be restarted by the Diagnostics function (code 08), with sub-function Restart Communications Option (code 00 01).

That function also places the remote device into a “Continue on Error” or “Stop on Error” mode. If the remote device is placed into “Continue on Error” mode, the event byte is added to the existing event log. If the remote device is placed into “Stop on Error” mode, the byte is added to the log and the rest of the log is cleared to zeros.

The event is defined by a content of zero.

decode(event)

Decode the event message to its status bits.

Parameters:

event – The event to decode

Raises:

ParameterException

encode()

Encode the status bits to an event message.

Returns:

The encoded event message

value = 0
class pymodbus.events.EnteredListenModeEvent

Bases: ModbusEvent

Enter Remote device Listen Only Mode.

The remote device stores this type of event byte when it enters the Listen Only Mode. The event is defined by a content of 04 hex.

decode(event)

Decode the event message to its status bits.

Parameters:

event – The event to decode

Raises:

ParameterException

encode()

Encode the status bits to an event message.

Returns:

The encoded event message

value = 4
class pymodbus.events.ModbusEvent

Bases: ABC

Define modbus events.

abstract decode(event)

Decode the event message to its status bits.

Parameters:

event – The event to decode

abstract encode() bytes

Encode the status bits to an event message.

class pymodbus.events.RemoteReceiveEvent(overrun=False, listen=False, broadcast=False)

Bases: ModbusEvent

Remote device MODBUS Receive Event.

The remote device stores this type of event byte when a query message is received. It is stored before the remote device processes the message. This event is defined by bit 7 set to logic “1”. The other bits will be set to a logic “1” if the corresponding condition is TRUE. The bit layout is:

Bit Contents
----------------------------------
0   Not Used
2   Not Used
3   Not Used
4   Character Overrun
5   Currently in Listen Only Mode
6   Broadcast Receive
7   1
decode(event: bytes) None

Decode the event message to its status bits.

Parameters:

event – The event to decode

encode() bytes

Encode the status bits to an event message.

Returns:

The encoded event message

class pymodbus.events.RemoteSendEvent(read=False, slave_abort=False, slave_busy=False, slave_nak=False, write_timeout=False, listen=False)

Bases: ModbusEvent

Remote device MODBUS Send Event.

The remote device stores this type of event byte when it finishes processing a request message. It is stored if the remote device returned a normal or exception response, or no response.

This event is defined by bit 7 set to a logic “0”, with bit 6 set to a “1”. The other bits will be set to a logic “1” if the corresponding condition is TRUE. The bit layout is:

Bit Contents
-----------------------------------------------------------
0   Read Exception Sent (Exception Codes 1-3)
1   Slave Abort Exception Sent (Exception Code 4)
2   Slave Busy Exception Sent (Exception Codes 5-6)
3   Slave Program NAK Exception Sent (Exception Code 7)
4   Write Timeout Error Occurred
5   Currently in Listen Only Mode
6   1
7   0
decode(event)

Decode the event message to its status bits.

Parameters:

event – The event to decode

encode()

Encode the status bits to an event message.

Returns:

The encoded event message

Pymodbus Exceptions.

Custom exceptions to be used in the Modbus code.

exception pymodbus.exceptions.ConnectionException(string='')

Bases: ModbusException

Error resulting from a bad connection.

exception pymodbus.exceptions.InvalidMessageReceivedException(string='')

Bases: ModbusException

Error resulting from invalid response received or decoded.

exception pymodbus.exceptions.MessageRegisterException(string='')

Bases: ModbusException

Error resulting from failing to register a custom message request/response.

exception pymodbus.exceptions.ModbusIOException(string='', function_code=None)

Bases: ModbusException

Error resulting from data i/o.

exception pymodbus.exceptions.NoSuchSlaveException(string='')

Bases: ModbusException

Error resulting from making a request to a slave that does not exist.

exception pymodbus.exceptions.NotImplementedException(string='')

Bases: ModbusException

Error resulting from not implemented function.

exception pymodbus.exceptions.ParameterException(string='')

Bases: ModbusException

Error resulting from invalid parameter.

Modbus Payload Builders.

A collection of utilities for building and decoding modbus messages payloads.

class pymodbus.payload.BinaryPayloadBuilder(payload=None, byteorder=Endian.LITTLE, wordorder=Endian.BIG, repack=False)

Bases: object

A utility that helps build payload messages to be written with the various modbus messages.

It really is just a simple wrapper around the struct module, however it saves time looking up the format strings. What follows is a simple example:

builder = BinaryPayloadBuilder(byteorder=Endian.Little)
builder.add_8bit_uint(1)
builder.add_16bit_uint(2)
payload = builder.build()
add_16bit_float(value: float) None

Add a 16 bit float to the buffer.

Parameters:

value – The value to add to the buffer

add_16bit_int(value: int) None

Add a 16 bit signed int to the buffer.

Parameters:

value – The value to add to the buffer

add_16bit_uint(value: int) None

Add a 16 bit unsigned int to the buffer.

Parameters:

value – The value to add to the buffer

add_32bit_float(value: float) None

Add a 32 bit float to the buffer.

Parameters:

value – The value to add to the buffer

add_32bit_int(value: int) None

Add a 32 bit signed int to the buffer.

Parameters:

value – The value to add to the buffer

add_32bit_uint(value: int) None

Add a 32 bit unsigned int to the buffer.

Parameters:

value – The value to add to the buffer

add_64bit_float(value: float) None

Add a 64 bit float(double) to the buffer.

Parameters:

value – The value to add to the buffer

add_64bit_int(value: int) None

Add a 64 bit signed int to the buffer.

Parameters:

value – The value to add to the buffer

add_64bit_uint(value: int) None

Add a 64 bit unsigned int to the buffer.

Parameters:

value – The value to add to the buffer

add_8bit_int(value: int) None

Add a 8 bit signed int to the buffer.

Parameters:

value – The value to add to the buffer

add_8bit_uint(value: int) None

Add a 8 bit unsigned int to the buffer.

Parameters:

value – The value to add to the buffer

add_bits(values: list[bool]) None

Add a collection of bits to be encoded.

If these are less than a multiple of eight, they will be left padded with 0 bits to make it so.

Parameters:

values – The value to add to the buffer

add_string(value: str) None

Add a string to the buffer.

Parameters:

value – The value to add to the buffer

build() list[bytes]

Return the payload buffer as a list.

This list is two bytes per element and can thus be treated as a list of registers.

Returns:

The payload buffer as a list

classmethod deprecate()

Log warning.

encode() bytes

Get the payload buffer encoded in bytes.

reset() None

Reset the payload buffer.

to_coils() list[bool]

Convert the payload buffer into a coil layout that can be used as a context block.

Returns:

The coil layout to use as a block

to_registers()

Convert the payload buffer to register layout that can be used as a context block.

Returns:

The register layout to use as a block

class pymodbus.payload.BinaryPayloadDecoder(payload, byteorder=Endian.LITTLE, wordorder=Endian.BIG)

Bases: object

A utility that helps decode payload messages from a modbus response message.

It really is just a simple wrapper around the struct module, however it saves time looking up the format strings. What follows is a simple example:

decoder = BinaryPayloadDecoder(payload)
first   = decoder.decode_8bit_uint()
second  = decoder.decode_16bit_uint()
classmethod bit_chunks(coils, size=8)

Return bit chunks.

decode_16bit_float()

Decode a 16 bit float from the buffer.

decode_16bit_int()

Decode a 16 bit signed int from the buffer.

decode_16bit_uint()

Decode a 16 bit unsigned int from the buffer.

decode_32bit_float()

Decode a 32 bit float from the buffer.

decode_32bit_int()

Decode a 32 bit signed int from the buffer.

decode_32bit_uint()

Decode a 32 bit unsigned int from the buffer.

decode_64bit_float()

Decode a 64 bit float(double) from the buffer.

decode_64bit_int()

Decode a 64 bit signed int from the buffer.

decode_64bit_uint()

Decode a 64 bit unsigned int from the buffer.

decode_8bit_int()

Decode a 8 bit signed int from the buffer.

decode_8bit_uint()

Decode a 8 bit unsigned int from the buffer.

decode_bits(package_len=1)

Decode a byte worth of bits from the buffer.

decode_string(size=1)

Decode a string from the buffer.

Parameters:

size – The size of the string to decode

classmethod deprecate()

Log warning.

classmethod fromCoils(coils, byteorder=Endian.LITTLE, _wordorder=Endian.BIG)

Initialize a payload decoder with the result of reading of coils.

classmethod fromRegisters(registers, byteorder=Endian.LITTLE, wordorder=Endian.BIG)

Initialize a payload decoder.

With the result of reading a collection of registers from a modbus device.

The registers are treated as a list of 2 byte values. We have to do this because of how the data has already been decoded by the rest of the library.

Parameters:
  • registers – The register results to initialize with

  • byteorder – The Byte order of each word

  • wordorder – The endianness of the word (when wordcount is >= 2)

Returns:

An initialized PayloadDecoder

Raises:

ParameterException

reset()

Reset the decoder pointer back to the start.

skip_bytes(nbytes)

Skip n bytes in the buffer.

Parameters:

nbytes – The number of bytes to skip

Transaction.

class pymodbus.transaction.TransactionManager(params: CommParams, framer: FramerBase, retries: int, is_server: bool, trace_packet: Callable[[bool, bytes], bytes] | None, trace_pdu: Callable[[bool, ModbusPDU], ModbusPDU] | None, trace_connect: Callable[[bool], None] | None, sync_client=None)

Bases: ModbusProtocol

Transaction manager.

This is the central class of the library, providing a separation between API and communication: - clients/servers calls the manager to execute requests/responses - transport/framer/pdu is by the manager to communicate with the devices

Transaction manager handles: - Execution of requests (client), with retries and locking - Sending of responses (server), with retries - Connection management (on top of what transport offers) - No response (temporarily) from a device

Transaction manager offers: - a simple execute interface for requests (client) - a simple send interface for responses (server) - external trace methods tracing outgoing/incoming packets/PDUs (byte stream)

callback_connected() None

Call when connection is succcesfull.

callback_data(data: bytes, addr: tuple | None = None) int

Handle received data.

callback_disconnected(exc: Exception | None) None

Call when connection is lost.

callback_new_connection()

Call when listener receive new connection request.

dummy_trace_connect(connect: bool) None

Do dummy trace.

dummy_trace_packet(sending: bool, data: bytes) bytes

Do dummy trace.

dummy_trace_pdu(sending: bool, pdu: ModbusPDU) ModbusPDU

Do dummy trace.

async execute(no_response_expected: bool, request: ModbusPDU) ModbusPDU

Execute requests asynchronously.

REMARK: this method is identical to sync_execute, apart from the lock and try/except.

any changes in either method MUST be mirrored !!!

getNextTID() int

Retrieve the next transaction identifier.

pdu_send(pdu: ModbusPDU, addr: tuple | None = None) None

Build byte stream and send.

sync_execute(no_response_expected: bool, request: ModbusPDU) ModbusPDU

Execute requests asynchronously.

REMARK: this method is identical to execute, apart from the lock and sync_receive.

any changes in either method MUST be mirrored !!!

sync_get_response(dev_id) ModbusPDU

Receive until PDU is correct or timeout.

Modbus Utilities.

A collection of utilities for packing data, unpacking data computing checksums, and decode checksums.

pymodbus.utilities.dict_property(store, index)

Create class properties from a dictionary.

Basically this allows you to remove a lot of possible boilerplate code.

Parameters:
  • store – The store store to pull from

  • index – The index into the store to close over

Returns:

An initialized property set

pymodbus.utilities.hexlify_packets(packet)

Return hex representation of bytestring received.

Parameters:

packet

Returns:

pymodbus.utilities.pack_bitstring(bits: list[bool]) bytes

Create a bytestring out of a list of bits.

Parameters:

bits – A list of bits

example:

bits   = [False, True, False, True]
result = pack_bitstring(bits)
pymodbus.utilities.unpack_bitstring(data: bytes) list[bool]

Create bit list out of a bytestring.

Parameters:

data – The modbus data packet to decode

example:

bytes  = "bytes to decode"
result = unpack_bitstring(bytes)

PDU classes

Modbus Request/Response Decoders.

class pymodbus.pdu.decoders.DecodePDU(is_server: bool)

Bases: object

Decode pdu requests/responses (server/client).

decode(frame: bytes) ModbusPDU | None

Decode a frame.

lookupPduClass(data: bytes) type[ModbusPDU] | None

Use function_code to determine the class of the PDU.

register(custom_class: type[ModbusPDU]) None

Register a function and sub function class with the decoder.

Bit Reading Request/Response messages.

class pymodbus.pdu.bit_message.ReadCoilsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadCoilsRequest.

decode(data: bytes) None

Decode a request pdu.

encode() bytes

Encode a request pdu.

function_code: int = 1
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Byte Count(1 byte) + Quantity of Coils (n Bytes)/8, if the remainder is different of 0 then N = N+1

rtu_frame_size: int = 8
async update_datastore(context: ModbusSlaveContext) ModbusPDU

Run request against a datastore.

class pymodbus.pdu.bit_message.ReadCoilsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadCoilsResponse.

decode(data: bytes) None

Decode response pdu.

encode() bytes

Encode response pdu.

function_code: int = 1
rtu_byte_count_pos: int = 2
class pymodbus.pdu.bit_message.ReadDiscreteInputsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ReadCoilsRequest

ReadDiscreteInputsRequest.

function_code: int = 2
class pymodbus.pdu.bit_message.ReadDiscreteInputsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ReadCoilsResponse

ReadDiscreteInputsResponse.

function_code: int = 2
class pymodbus.pdu.bit_message.WriteMultipleCoilsRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteMultipleCoilsRequest.

decode(data: bytes) None

Decode a write coils request.

encode() bytes

Encode write coils request.

function_code: int = 15
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Output Address (2 byte) + Quantity of Outputs (2 Bytes) :return:

rtu_byte_count_pos: int = 6
async update_datastore(context: ModbusSlaveContext) ModbusPDU

Run a request against a datastore.

class pymodbus.pdu.bit_message.WriteMultipleCoilsResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteMultipleCoilsResponse.

decode(data: bytes) None

Decode a write coils response.

encode() bytes

Encode write coils response.

function_code: int = 15
rtu_frame_size: int = 8
class pymodbus.pdu.bit_message.WriteSingleCoilRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: WriteSingleCoilResponse

WriteSingleCoilRequest.

get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Output Address (2 byte) + Output Value (2 Bytes)

async update_datastore(context: ModbusSlaveContext) ModbusPDU

Run a request against a datastore.

class pymodbus.pdu.bit_message.WriteSingleCoilResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteSingleCoilResponse.

decode(data: bytes) None

Decode a write coil request.

encode() bytes

Encode write coil request.

function_code: int = 5
rtu_frame_size: int = 8

Diagnostic Record Read/Write.

class pymodbus.pdu.diag_message.ChangeAsciiInputDelimiterRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ChangeAsciiInputDelimiterRequest.

sub_function_code: int = 3
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ChangeAsciiInputDelimiterResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ChangeAsciiInputDelimiterResponse.

sub_function_code: int = 3
class pymodbus.pdu.diag_message.ClearCountersRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ClearCountersRequest.

sub_function_code: int = 10
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ClearCountersResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ClearCountersResponse.

sub_function_code: int = 10
class pymodbus.pdu.diag_message.ClearOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ClearOverrunCountRequest.

sub_function_code: int = 20
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ClearOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ClearOverrunCountResponse.

sub_function_code: int = 20
class pymodbus.pdu.diag_message.DiagnosticBase(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

DiagnosticBase.

decode(data: bytes) None

Decode a diagnostic request.

encode() bytes

Encode a diagnostic response.

function_code: int = 8
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Sub function code (2 byte) + Data (2 * N bytes)

rtu_frame_size: int = 8
sub_function_code: int = 9999
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

Implement dummy.

class pymodbus.pdu.diag_message.ForceListenOnlyModeRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ForceListenOnlyModeRequest.

sub_function_code: int = 4
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ForceListenOnlyModeResponse(dev_id=1, transaction_id=0)

Bases: DiagnosticBase

ForceListenOnlyModeResponse.

This does not send a response

sub_function_code: int = 4
class pymodbus.pdu.diag_message.GetClearModbusPlusRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

GetClearModbusPlusRequest.

encode()

Encode a diagnostic response.

get_response_pdu_size()

Return size of the respaonse.

Func_code (1 byte) + Sub function code (2 byte) + Operation (2 byte) + Data (108 bytes)

sub_function_code: int = 21
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.GetClearModbusPlusResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

GetClearModbusPlusResponse.

sub_function_code: int = 21
class pymodbus.pdu.diag_message.RestartCommunicationsOptionRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

RestartCommunicationsOptionRequest.

sub_function_code: int = 1
class pymodbus.pdu.diag_message.RestartCommunicationsOptionResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

RestartCommunicationsOptionResponse.

sub_function_code: int = 1
class pymodbus.pdu.diag_message.ReturnBusCommunicationErrorCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusCommunicationErrorCountRequest.

sub_function_code: int = 12
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ReturnBusCommunicationErrorCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusCommunicationErrorCountResponse.

sub_function_code: int = 12
class pymodbus.pdu.diag_message.ReturnBusExceptionErrorCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusExceptionErrorCountRequest.

sub_function_code: int = 13
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ReturnBusExceptionErrorCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusExceptionErrorCountResponse.

sub_function_code: int = 13
class pymodbus.pdu.diag_message.ReturnBusMessageCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusMessageCountRequest.

sub_function_code: int = 11
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ReturnBusMessageCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnBusMessageCountResponse.

sub_function_code: int = 11
class pymodbus.pdu.diag_message.ReturnDiagnosticRegisterRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDiagnosticRegisterRequest.

sub_function_code: int = 2
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ReturnDiagnosticRegisterResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnDiagnosticRegisterResponse.

sub_function_code: int = 2
class pymodbus.pdu.diag_message.ReturnIopOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnIopOverrunCountRequest.

sub_function_code: int = 19
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ReturnIopOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnIopOverrunCountResponse.

sub_function_code: int = 19
class pymodbus.pdu.diag_message.ReturnQueryDataRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnQueryDataRequest.

sub_function_code: int = 0
class pymodbus.pdu.diag_message.ReturnQueryDataResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnQueryDataResponse.

sub_function_code: int = 0
class pymodbus.pdu.diag_message.ReturnSlaveBusCharacterOverrunCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnSlaveBusCharacterOverrunCountRequest.

sub_function_code: int = 18
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ReturnSlaveBusCharacterOverrunCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnSlaveBusCharacterOverrunCountResponse.

sub_function_code: int = 18
class pymodbus.pdu.diag_message.ReturnSlaveBusyCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnSlaveBusyCountRequest.

sub_function_code: int = 17
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ReturnSlaveBusyCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnSlaveBusyCountResponse.

sub_function_code: int = 17
class pymodbus.pdu.diag_message.ReturnSlaveMessageCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnSlaveMessageCountRequest.

sub_function_code: int = 14
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ReturnSlaveMessageCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnSlaveMessageCountResponse.

sub_function_code: int = 14
class pymodbus.pdu.diag_message.ReturnSlaveNAKCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnSlaveNAKCountRequest.

sub_function_code: int = 16
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ReturnSlaveNAKCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnSlaveNAKCountResponse.

sub_function_code: int = 16
class pymodbus.pdu.diag_message.ReturnSlaveNoResponseCountRequest(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnSlaveNoResponseCountRequest.

sub_function_code: int = 15
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

update_datastore the diagnostic request on the given device.

class pymodbus.pdu.diag_message.ReturnSlaveNoResponseCountResponse(message: bytes | int | list | tuple | None = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: DiagnosticBase

ReturnSlaveNoResponseCountResponse.

sub_function_code: int = 15

File Record Read/Write Messages.

class pymodbus.pdu.file_message.FileRecord(file_number: int = 0, record_number: int = 0, record_data: bytes = b'', record_length: int = 0)

Bases: object

Represents a file record and its relevant data.

file_number: int = 0
record_data: bytes = b''
record_length: int = 0
record_number: int = 0
class pymodbus.pdu.file_message.ReadFifoQueueRequest(address: int = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadFifoQueueRequest.

decode(data: bytes) None

Decode the incoming request.

encode() bytes

Encode the request packet.

function_code: int = 24
rtu_frame_size: int = 6
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

Run a read exception status request against the store.

class pymodbus.pdu.file_message.ReadFifoQueueResponse(values: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadFifoQueueResponse.

classmethod calculateRtuFrameSize(buffer: bytes) int

Calculate the size of the message.

decode(data: bytes) None

Decode a the response.

encode() bytes

Encode the response.

function_code: int = 24
class pymodbus.pdu.file_message.ReadFileRecordRequest(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadFileRecordRequest.

decode(data: bytes) None

Decode the incoming request.

encode() bytes

Encode the request packet.

function_code: int = 20
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Quantity of record (each 7 bytes),

rtu_byte_count_pos: int = 2
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

Run a read exception status request against the store.

class pymodbus.pdu.file_message.ReadFileRecordResponse(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadFileRecordResponse.

decode(data: bytes) None

Decode the response.

encode() bytes

Encode the response.

function_code: int = 20
rtu_byte_count_pos: int = 2
class pymodbus.pdu.file_message.WriteFileRecordRequest(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

WriteFileRecordRequest.

decode(data: bytes) None

Decode the incoming request.

encode() bytes

Encode the request packet.

function_code: int = 21
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Quantity of record (each 7 bytes),

rtu_byte_count_pos: int = 2
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

Run the write file record request against the context.

class pymodbus.pdu.file_message.WriteFileRecordResponse(records: list[FileRecord] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

The normal response is an echo of the request.

decode(data: bytes) None

Decode the incoming request.

encode() bytes

Encode the response.

function_code: int = 21
rtu_byte_count_pos: int = 2

Encapsulated Interface (MEI) Transport Messages.

class pymodbus.pdu.mei_message.ReadDeviceInformationRequest(read_code: int | None = None, object_id: int = 0, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadDeviceInformationRequest.

decode(data: bytes) None

Decode data part of the message.

encode() bytes

Encode the request packet.

function_code: int = 43
rtu_frame_size: int = 7
sub_function_code: int = 14
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

Run a read exception status request against the store.

class pymodbus.pdu.mei_message.ReadDeviceInformationResponse(read_code: int | None = None, information: dict | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadDeviceInformationResponse.

classmethod calculateRtuFrameSize(buffer: bytes) int

Calculate the size of the message.

decode(data: bytes) None

Decode a the response.

encode() bytes

Encode the response.

function_code: int = 43
sub_function_code: int = 14

Diagnostic record read/write.

class pymodbus.pdu.other_message.GetCommEventCounterRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

GetCommEventCounterRequest.

decode(_data: bytes) None

Decode data part of the message.

encode() bytes

Encode the message.

function_code: int = 11
rtu_frame_size: int = 4
async update_datastore(_context) ModbusPDU

Run a read exception status request against the store.

class pymodbus.pdu.other_message.GetCommEventCounterResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

GetCommEventCounterRequest.

decode(data: bytes) None

Decode a the response.

encode() bytes

Encode the response.

function_code: int = 11
rtu_frame_size: int = 8
class pymodbus.pdu.other_message.GetCommEventLogRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

GetCommEventLogRequest.

decode(_data: bytes) None

Decode data part of the message.

encode() bytes

Encode the message.

function_code: int = 12
rtu_frame_size: int = 4
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

Run a read exception status request against the store.

class pymodbus.pdu.other_message.GetCommEventLogResponse(status: bool = True, message_count: int = 0, event_count: int = 0, events: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

GetCommEventLogRequest.

decode(data: bytes) None

Decode a the response.

encode() bytes

Encode the response.

function_code: int = 12
rtu_byte_count_pos: int = 2
class pymodbus.pdu.other_message.ReadExceptionStatusRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadExceptionStatusRequest.

decode(_data: bytes) None

Decode data part of the message.

encode() bytes

Encode the message.

function_code: int = 7
rtu_frame_size: int = 4
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

Run a read exception status request against the store.

class pymodbus.pdu.other_message.ReadExceptionStatusResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadExceptionStatusResponse.

decode(data: bytes) None

Decode a the response.

encode() bytes

Encode the response.

function_code: int = 7
rtu_frame_size: int = 5
class pymodbus.pdu.other_message.ReportSlaveIdRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReportSlaveIdRequest.

decode(_data: bytes) None

Decode data part of the message.

encode() bytes

Encode the message.

function_code: int = 17
rtu_frame_size: int = 4
async update_datastore(_context: ModbusSlaveContext) ModbusPDU

Run a report slave id request against the store.

class pymodbus.pdu.other_message.ReportSlaveIdResponse(identifier: bytes = b'\x00', status: bool = True, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReportSlaveIdRequeste.

decode(data: bytes) None

Decode a the response.

Since the identifier is device dependent, we just return the raw value that a user can decode to whatever it should be.

encode() bytes

Encode the response.

function_code: int = 17
rtu_byte_count_pos: int = 2

Contains base classes for modbus request/response/error packets.

class pymodbus.pdu.pdu.ExceptionResponse(function_code: int, exception_code: int = 0, slave: int = 1, transaction: int = 0)

Bases: ModbusPDU

Base class for a modbus exception PDU.

ACKNOWLEDGE = 5
GATEWAY_NO_RESPONSE = 11
GATEWAY_PATH_UNAVIABLE = 10
ILLEGAL_ADDRESS = 2
ILLEGAL_FUNCTION = 1
ILLEGAL_VALUE = 3
MEMORY_PARITY_ERROR = 8
NEGATIVE_ACKNOWLEDGE = 7
SLAVE_BUSY = 6
SLAVE_FAILURE = 4
address: int
bits: list[bool]
count: int
decode(data: bytes) None

Decode a modbus exception response.

dev_id: int
encode() bytes

Encode a modbus exception response.

fut: Future
registers: list[int]
rtu_frame_size: int = 5
status: int
transaction_id: int
class pymodbus.pdu.pdu.ModbusPDU(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: object

Base class for all Modbus messages.

classmethod calculateRtuFrameSize(data: bytes) int

Calculate the size of a PDU.

abstract decode(data: bytes) None

Decode data part of the message.

abstract encode() bytes

Encode the message.

function_code: int = 0
get_response_pdu_size() int

Calculate response pdu size.

isError() bool

Check if the error is a success or failure.

rtu_byte_count_pos: int = 0
rtu_frame_size: int = 0
sub_function_code: int = -1
async update_datastore(context: ModbusSlaveContext) ModbusPDU

Run request against a datastore.

validateAddress(address: int = -1) None

Validate API supplied address.

validateCount(max_count: int, count: int = -1) None

Validate API supplied count.

Register Reading Request/Response.

class pymodbus.pdu.register_message.MaskWriteRegisterRequest(address=0, and_mask=65535, or_mask=0, dev_id=1, transaction_id=0)

Bases: ModbusPDU

MaskWriteRegisterRequest.

decode(data: bytes) None

Decode the incoming request.

encode() bytes

Encode the request packet.

function_code: int = 22
rtu_frame_size: int = 10
async update_datastore(context: ModbusSlaveContext) ModbusPDU

Run a mask write register request against the store.

class pymodbus.pdu.register_message.MaskWriteRegisterResponse(address=0, and_mask=65535, or_mask=0, dev_id=1, transaction_id=0)

Bases: ModbusPDU

MaskWriteRegisterResponse.

decode(data: bytes) None

Decode a the response.

encode() bytes

Encode the response.

function_code: int = 22
rtu_frame_size: int = 10
class pymodbus.pdu.register_message.ReadHoldingRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadHoldingRegistersRequest.

decode(data: bytes) None

Decode a register request packet.

encode() bytes

Encode the request packet.

function_code: int = 3
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Byte Count(1 byte) + 2 * Quantity of registers (== byte count).

rtu_frame_size: int = 8
async update_datastore(context: ModbusSlaveContext) ModbusPDU

Run a read holding request against a datastore.

class pymodbus.pdu.register_message.ReadHoldingRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

ReadHoldingRegistersResponse.

decode(data: bytes) None

Decode a register response packet.

encode() bytes

Encode the response packet.

function_code: int = 3
rtu_byte_count_pos: int = 2
class pymodbus.pdu.register_message.ReadInputRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ReadHoldingRegistersRequest

ReadInputRegistersRequest.

function_code: int = 4
class pymodbus.pdu.register_message.ReadInputRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ReadHoldingRegistersResponse

ReadInputRegistersResponse.

function_code: int = 4
class pymodbus.pdu.register_message.ReadWriteMultipleRegistersRequest(read_address: int = 0, read_count: int = 0, write_address: int = 0, write_registers: list[int] | None = None, dev_id: int = 1, transaction_id: int = 0)

Bases: ModbusPDU

ReadWriteMultipleRegistersRequest.

decode(data: bytes) None

Decode the register request packet.

encode() bytes

Encode the request packet.

function_code: int = 23
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Byte Count(1 byte) + 2 * Quantity of Coils (n Bytes)

rtu_byte_count_pos: int = 10
async update_datastore(context: ModbusSlaveContext) ModbusPDU

Run a write single register request against a datastore.

class pymodbus.pdu.register_message.ReadWriteMultipleRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ReadHoldingRegistersResponse

ReadWriteMultipleRegistersResponse.

function_code: int = 23
class pymodbus.pdu.register_message.WriteMultipleRegistersRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteMultipleRegistersRequest.

decode(data: bytes) None

Decode a write single register packet packet request.

encode() bytes

Encode a write single register packet packet request.

function_code: int = 16
get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Starting Address (2 byte) + Quantity of Registers (2 Bytes)

rtu_byte_count_pos: int = 6
async update_datastore(context: ModbusSlaveContext) ModbusPDU

Run a write single register request against a datastore.

class pymodbus.pdu.register_message.WriteMultipleRegistersResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteMultipleRegistersResponse.

decode(data: bytes) None

Decode a write single register packet packet request.

encode() bytes

Encode a write single register packet packet request.

function_code: int = 16
rtu_frame_size: int = 8
class pymodbus.pdu.register_message.WriteSingleRegisterRequest(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: WriteSingleRegisterResponse

WriteSingleRegisterRequest.

get_response_pdu_size() int

Get response pdu size.

Func_code (1 byte) + Register Address(2 byte) + Register Value (2 bytes)

async update_datastore(context: ModbusSlaveContext) ModbusPDU

Run a write single register request against a datastore.

class pymodbus.pdu.register_message.WriteSingleRegisterResponse(dev_id: int = 0, transaction_id: int = 0, address: int = 0, count: int = 0, bits: list[bool] | None = None, registers: list[int] | None = None, status: int = 1)

Bases: ModbusPDU

WriteSingleRegisterResponse.

decode(data: bytes) None

Decode a write single register packet packet request.

encode() bytes

Encode a write single register packet packet request.

function_code: int = 6
rtu_frame_size: int = 8