EXAMPLE: Connect 4DV-SIM to an External Python Application with ProtocolBinary
This example demonstrates how to receive and send messages using the 4DV-SIM simulator, via its ProtocolBinary and the TCP medium.
Binary Message Structure
Header (12 bytes)
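Each packet sent from the simulator interface starts with a 12-byte header: a 4-byte unsigned integer giving the size, followed by an 8-byte double holding the TimeStamp. The script below unpacks it with the format '<Id' and treats the size as covering the 8-byte TimeStamp plus the payload. The interface then sends a set of built-in fields that follow the same structure as the payload messages described below.
These fields are located just after the TimeStamp and before the message body: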
Day
Frequency
Hour
Latency
Minute
Month
Second
Temperature
TimeSync
Year
Payload
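The payload consists of several messages. As decoded by the script below, each message has the following little-endian structure: a null-terminated name, a 4-byte unsigned integer giving the number of values, a 2-byte unsigned integer giving the data type, then the values themselves.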
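As a sketch of that structure, inferred from the parse_message function shown later on this page (null-terminated name, uint32 count, uint16 type code, then the values, all little-endian), the snippet below packs one double message and reads it back. The helper name encode_double_message is hypothetical and exists only for illustration.

```python
import struct

def encode_double_message(name, value):
    """Hypothetical helper: packs one double message in the layout parse_message reads."""
    body = name.encode() + b'\x00'        # null-terminated field name
    body += struct.pack('<I', 1)          # number of values (uint32)
    body += struct.pack('<H', 0x200)      # type code (uint16), 0x200 = double
    body += struct.pack('<d', value)      # the value itself (8 bytes)
    return body

msg = encode_double_message('io0', 71.74)

# Read the fields back in the same order
end = msg.find(b'\x00')
name = msg[:end].decode()
number, dtype = struct.unpack_from('<IH', msg, end + 1)
(value,) = struct.unpack_from('<d', msg, end + 1 + 6)

print(len(msg), name, number, hex(dtype), value)  # 18 io0 1 0x200 71.74
```

The 18 bytes break down as 4 for the name and its terminator, 4 for the count, 2 for the type code and 8 for the double.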
4DV-EDITOR
Add the interface to the logical view, then make the connections as shown below.
In this example, we send a ramp signal that the Python script can read and use to return a value to the simulator (temperature). Using the Debug block, we can visualise the temperature value.
Media: TCP
Protocol: ProtocolBinary
Connections
Connect the output 's' of the Ramp block to the 'io0' input of the ProtocolBinary block: Ramp.s → TCP.io0
INTERFACE Parameters
The simulator is configured to transmit data to IP address 127.0.0.1 on port 9000.
Ramp Parameters
The Slope parameter is set to 2 to make the ramp progression easier to see. With a slope of 2 per second, the ramp value should theoretically be twice the TimeStamp.
Python Script
The script below implements a TCP server that:
Receives binary messages from the simulator
Decodes them (according to the described structure)
Displays their content
Extracts values if needed
Sends a simulated value (io0) back to the simulator via the binary protocol
Dependencies
No external libraries are required. The script uses only standard Python modules:
import socket
import struct
import sys

Binary Type Mapping
This dictionary is used to decode the data types used in the 4DV-SIM binary protocol.
# ============================================
# Binary Protocol Type Mapping (4DV-SIM)
# ============================================
BASE_TYPE_MAP = {
    0x2: ('q', 8),     # int64
    0x4: ('i', 4),     # int32
    0x8: ('h', 2),     # int16
    0x10: ('b', 1),    # int8
    0x20: ('Q', 8),    # uint64
    0x40: ('I', 4),    # uint32
    0x80: ('H', 2),    # uint16
    0x100: ('B', 1),   # uint8
    0x200: ('d', 8),   # double
    0x400: ('f', 4),   # float
}
BUFFER_FLAG = 0x1  # Indicates if the data is an array

Utility Functions
Read a Null-Terminated String (\0)
Reads a string terminated by a null byte (\x00) from a binary stream:
# ============================================
# Utility Functions
# ============================================
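def read_null_terminated_string(data, offset):
    """Reads a null-terminated string from the byte buffer."""
    end = data.find(b'\x00', offset)
    if end == -1:
        # find() returns -1 when no terminator exists; fail instead of slicing wrongly
        raise ValueError("Missing null terminator in message name")
    return data[offset:end].decode(), end + 1

Decode a Binary Message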
Decodes a message from the binary stream. Returns a dictionary containing:
name: field name
number: number of values
type: data type (hex)
values: tuple of values
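As an illustration, the Temperature and TimeSync entries from the terminal log at the end of this page would come back in the shape below. The variable names are chosen for this example only.

```python
# Shape of the dictionaries returned by parse_message, using values from the log
temperature_msg = {
    'name': 'Temperature',
    'number': 1,
    'type': '0x200',                  # double
    'values': (26.761331491894538,),
}
timesync_msg = {
    'name': 'TimeSync',
    'number': 4,
    'type': '0x11',                   # int8 (0x10) with the buffer flag (0x1) set
    'values': (116, 114, 117, 101),
}

# 'values' is a tuple even when it holds a single value
first_temperature = temperature_msg['values'][0]
print(first_temperature)
print(len(timesync_msg['values']) == timesync_msg['number'])
```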
def parse_message(data, offset):
    """Parses a single message from the binary payload."""
    name, offset = read_null_terminated_string(data, offset)
    # Number of values (uint32, little-endian)
    number = struct.unpack_from('<I', data, offset)[0]
    offset += 4
    # Type code (uint16, little-endian)
    dtype = struct.unpack_from('<H', data, offset)[0]
    offset += 2
    # Split the type code into the buffer flag and the base type
    is_buffer = dtype & BUFFER_FLAG
    base_type = dtype & ~BUFFER_FLAG
    if base_type not in BASE_TYPE_MAP:
        raise ValueError(f"Unknown base type: {hex(base_type)}")
    fmt, size = BASE_TYPE_MAP[base_type]
    # A non-buffer message carries exactly one value
    count = number if is_buffer else 1
    total_size = count * size
    values = struct.unpack_from(f'<{count}{fmt}', data, offset)
    offset += total_size
    return {
        'name': name,
        'number': count,
        'type': hex(dtype),
        'values': values
    }, offset

Send a Message to the Client
Builds and sends a binary message containing a double value (type 0x200) named io0.
def send_message(conn, data):
    """Sends a response message carrying one double value using ProtocolBinary."""
    name = b'io0\x00'  # null-terminated field name
    number = 1         # one value
    dtype = 0x200      # double
    value = struct.pack('<d', data)
    message = name + struct.pack('<I', number) + struct.pack('<H', dtype) + value
    # The size field counts the 8-byte timestamp plus the message
    total_size = len(message) + 8
    # Note: this example reuses the value itself as the header timestamp
    header = struct.pack('<I', total_size) + struct.pack('<d', data)
    conn.sendall(header + message)

Processing Received Messages
This function:
Receives a 12-byte header (size + timestamp)
Reads the binary payload
Decodes each message using parse_message
Displays the extracted values
Replies with a message containing the temperature
The extracted messages are stored in a dictionary named extracted for later use.
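The header handling in the steps above can be tried on an in-memory packet, without a socket. This minimal sketch assumes, as the script does, that the size field counts the 8-byte timestamp plus the payload. The helper names frame_packet and split_packet are hypothetical.

```python
import struct

HEADER_FMT = '<Id'                          # uint32 size + double timestamp, little-endian
HEADER_SIZE = struct.calcsize(HEADER_FMT)   # 12 bytes: '<' disables alignment padding

def frame_packet(timestamp, payload):
    """Hypothetical helper: prefixes a payload with the 12-byte header."""
    return struct.pack(HEADER_FMT, 8 + len(payload), timestamp) + payload

def split_packet(packet):
    """Hypothetical helper: returns (timestamp, payload) from one complete packet."""
    size, timestamp = struct.unpack_from(HEADER_FMT, packet, 0)
    payload = packet[HEADER_SIZE:HEADER_SIZE + size - 8]
    return timestamp, payload

packet = frame_packet(36.871, b'\x01\x02\x03')
timestamp, payload = split_packet(packet)
print(HEADER_SIZE, len(packet), timestamp, payload)  # 12 15 36.871 b'\x01\x02\x03'
```

Over a real socket, recv may return fewer bytes than requested, so the header and the payload should each be accumulated until complete before they are unpacked.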
# ============================================
# Message Handling
# ============================================
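def get_value(extracted, name, default=None):
    """Returns the first value stored under name, or default if absent."""
    val = extracted.get(name)
    if isinstance(val, tuple) and len(val) > 0:
        return val[0]
    return default

def recv_exact(conn, n):
    """Reads exactly n bytes; returns None if the peer closes first."""
    buf = b''
    while len(buf) < n:
        chunk = conn.recv(n - len(buf))
        if not chunk:
            return None
        buf += chunk
    return buf

def handle_client(conn, addr):
    """Handles a client connection: receives, parses, and responds."""
    print(f"Connected by {addr}")
    try:
        while True:
            # TCP is a byte stream, so a single recv() may return a partial header
            header = recv_exact(conn, 12)
            if header is None:
                break
            size, timestamp = struct.unpack('<Id', header)
            # size counts the 8-byte timestamp plus the payload
            payload = recv_exact(conn, size - 8)
            if payload is None:
                break
            offset = 0
            print(f"\nTimestamp: {timestamp:.6f}s")
            # Dictionary to store extracted values
            extracted = {}
            while offset < len(payload):
                msg, offset = parse_message(payload, offset)
                print(f" {msg['name']} ({msg['number']} values, type {msg['type']}): {msg['values']}")
                extracted[msg['name']] = msg['values']
            # Example: extract data
            temp = get_value(extracted, 'Temperature', default=None)  # e.g. 26.76
            # Reply only when a temperature was received; packing None would fail
            if temp is not None:
                send_message(conn, temp)
    except (ConnectionResetError, BrokenPipeError):
        print(f"Client {addr} disconnected.")
    finally:
        conn.close()

Starting the Server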
Creates a TCP server that listens for incoming connections and calls handle_client.
# ============================================
# Server Setup
# ============================================
def start_server(host='127.0.0.1', port=9000):
    """Starts the TCP server and listens for incoming connections."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen()
        print(f"Listening on {host}:{port}")
        try:
            while True:
                conn, addr = s.accept()
                handle_client(conn, addr)
        except KeyboardInterrupt:
            print("\nServer shutting down gracefully.")
            sys.exit(0)

# ============================================
# Entry Point
# ============================================
if __name__ == "__main__":
    start_server()

Terminal Log
Here is an example of data received and decoded by this script:
Timestamp: 36.871000
Day (1 values, type 0x4): (10,)
Frequency (1 values, type 0x200): (0.0,)
Hour (1 values, type 0x4): (12,)
Latency (1 values, type 0x200): (0.0,)
Minute (1 values, type 0x4): (0,)
Month (1 values, type 0x4): (6,)
Second (1 values, type 0x200): (3.779,)
Temperature (1 values, type 0x200): (26.761331491894538,)
TimeSync (4 values, type 0x11): (116, 114, 117, 101)
Year (1 values, type 0x4): (2025,)
io0 (1 values, type 0x200): (71.74000000000001,)
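In the log above, TimeSync has type 0x11, which is the int8 code 0x10 combined with the buffer flag 0x1, and its four values are ASCII character codes. The sketch below splits the type code and turns the buffer into text. Treating an int8 buffer as ASCII text is an assumption based on this single log entry, and describe_type is a hypothetical helper.

```python
BUFFER_FLAG = 0x1   # same flag value as in the script above

def describe_type(dtype):
    """Hypothetical helper: splits a type code into (base_type, is_buffer)."""
    return dtype & ~BUFFER_FLAG, bool(dtype & BUFFER_FLAG)

base_type, is_buffer = describe_type(0x11)
print(hex(base_type), is_buffer)        # 0x10 True

values = (116, 114, 117, 101)           # TimeSync values from the log
text = bytes(values).decode('ascii')    # assumes the int8 buffer holds ASCII text
print(text)                             # true
```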