Problemlösungen mit Python
Kommunikation uC - PC
Aufgabenstellung
Eine uC-Anwendung, die Sensorwerte ausliest, soll diese an einen PC weitersenden, wenn der uC per I/O dazu aufgefordert wird. Es muss also auf dem PC ein Programm an der seriellen Schnittstelle auf Daten vom uC warten, diese entgegennehmen und speichern.
Konkret ist also folgendes zu implementieren:
- Auf dem uC ist der Teil des uC-Programmes zu implementieren, der bei Schalten eines zu konfigurierenden DINPs Daten von Sensoren ausliest und diese gemäß eines zu definierenden Protokolls auf die serielle Schnittstelle schreibt.
- Auf dem PC ist eine Software zu implementieren, die Daten gemäß genanntem Protokoll von der Schnittstelle liest und die Nutzdaten entsprechend speichert.
Protokoll
Wir wählen folgendes Protokoll, das auf dem uC einfach zu implementieren sein sollte. Alle Daten werden binär übertragen, eine Wandlung nach ASCII, was den Aufruf von Funktionen wir sprintf() notwendig machen würde, ist also nicht nötig.
Wir werden versuchen, folgendes Protokoll zu implementieren:
0x16 SYN # SYNchronous idle
0x02 STX # Start of TeXt
0xXX LEN # Länge über alles in Bytes (also inklusive SYM und STX)
0xXX MSG # 0x01: Sensor 1; 0x02: Sensor 2; usw.
# 0x11: Taster 1 gedrückt; 0x12: Taster 2 gedrückt; usw.
0xXX Data Byte 1
:
:
:
0xXX Data Byte n
0xXX CRC Hi Byte # 2. Byte CRC-Code über Bytes von LEN bis und mit zum letzten Daten-Byte
0xXX CRC Lo Byte # 1. Byte CRC-Code über Bytes von LEN bis und mit zum letzten Daten-Byte
0x03 ETX # End Of TeXt
0x04 EOT # End Of Transmission
Entwurf
Auf dem uC wird der entsprechende Code in C implementiert werden. Die Software, die auf dem PC läuft und die Daten , die der uC sendet, entgegeben nimmt und speichert, wird in Python geschrieben. Das GUI-Binding steht noch nicht fest, in der engeren Auswagl befinden sich aber wxPython und PySide.
Um eine Vorstellung davon zu bekommen, was uns erwarten könnte, schreiben wir ein Konsolen-Programm, das wir auf dem Sender und dem Empfänger laufen lassen können. Als Empfänger dient wie auch bei der endgültigen Applikation ein PC, als Sender verwenden wir ebenfalls einen PC.
Message zum Senden
class MessageUC2PC:
"""Messages, die vom uC an den PC gesendet werden.
Wenn wir die Message b i n ä r senden, dann könnten wir folgendes Protokoll verwenden:
0x16 SYN # SYNchronous idle
0x02 STX # Start of TeXt
0xXX LEN # Länge über alles in Bytes (also inklusive SYM und STX)
0xXX MSG # 0x01: Sensor 1; 0x02: Sensor 2; usw.
# 0x11: Taster 1 gedrückt; 0x12: Taster 2 gedrückt; usw.
# m.a.W: Bedeutung der Daten, die gesendet werden.
0xXX Data Byte 1
:
:
:
0xXX Data Byte n
0xXX CRC Hi Byte # 2. Byte CRC-Code über Bytes von LEN bis und mit zum letzten Daten-Byte
0xXX CRC Lo Byte # 1. Byte CRC-Code über Bytes von LEN bis und mit zum letzten Daten-Byte
0x03 ETX # End Of TeXt
0x04 EOT # End Of Transmission
"""
_SYN = 0x16
_STX = 0x02
_ETX = 0x03
_EOT = 0x04
_MSG_SENSOR_1 = 1
_MSG_SENSOR_2 = 2
_MSG_SENSOR_3 = 3
_MSG_SENSOR_4 = 4
_MSG_SENSOR_5 = 5
_MSG_KEY_1 = 0x11
_MSG_KEY_2 = 0x12
_MSG_KEY_3 = 0x13
@staticmethod
def Data_bytes_to_int16( bytes):
"""Bytes zu einem Winkelwert zusammenbauen.
MSB links, LSB rechts, oder anders rum: Int16_to_data_bytes()[0] ist
das MSB, Int16_to_data_bytes()[-1] das LSB ("MATLAB-Reihenfolge").
"""
angle = bytes[0] << 8
angle |= bytes[1]
return angle
@staticmethod
def Int16_to_data_bytes( angle):
"""Winkel in Bytes zerlegen.
MSB links, LSB rechts, oder anders rum: Int16_to_data_bytes()[0] ist
das MSB, Int16_to_data_bytes()[-1] das LSB ("MATLAB-Reihenfolge").
"""
bytes = [(angle >> 8) & 0xFF, angle & 0xFF]
return bytes
def __init__( self, msg_nbr, data_bytes):
"""Ctor.
"""
self._msg_nbr = msg_nbr
self._data_bytes = data_bytes
self._len = 0
return
def as_displayable_binary_message( self):
"""Message zur Anzeige auf Bildschirm aufbereiten.
Da die Message Steuerzeichen enthält, kann die sendetaugliche Message
nicht direkt angezeigt werden.
"""
return map( str, self.as_sendable_binary_message())
def as_sendable_binary_message( self):
"""Message zum Senden aufbereiten.
USAGE:
serial.write( msg.as_sendable_binary_message())
"""
self._len = 2 + 1 + 1 + len( self._data_bytes) + 2 + 2
# ^ ^ ^ ^ ^ ^
# | | | | | |
# | | | | | ETX + EOT
# | | | Daten CRC 1 + CRC 2
# | | Message-Nr
# | Message-Länge
# SYN + STX
#
net_data_bytes = [ self._len
, self._msg_nbr
] \
+ self._data_bytes
msg_bytes = [ chr( self._SYN)
, chr( self._STX)
, chr( self._len)
, chr( self._msg_nbr)
] \
+ map( chr, self._data_bytes) \
+ [ chr( self._crc_hi_( net_data_bytes))
, chr( self._crc_lo_( net_data_bytes))
, chr( self._ETX)
, chr( self._EOT)
]
msg = ''.join( msg_bytes)
return msg
def _crc16_( self, data_bytes):
"""2DO, muss also noch implementiert werden. """
return 0 # 2DO
def _crc_hi_( self, data_bytes):
"""Hi-Byte des CRC. 2DO, muss also noch implementiert werden. """
return self._crc16_( data_bytes) & 0xFF
def _crc_lo_( self, data_bytes):
"""Lo-Byte des CRC. """
return self._crc16_( data_bytes) & 0xFF
def data( self):
"""Zugriff auf ausgesuchte Daten der Message: Nutzdaten.
"""
return self.Data_bytes_to_int16( self._data_bytes)
def msg_nbr( self):
"""Zugriff auf ausgesuchte Daten der Message: Nummer der Message.
"""
return self._msg_nbr
Debugging
Die folgende Methode erlaubt es uns, ganz einfach den Namen der Methode zu ermitteln, in der sie aufgerufen wird.
def WhoAmI( self=None, pretty=0):
"""Debugging: Returns the name of the function or method this function is called in.
PARAMS:
self:
Instance of class of calling method.
Used to document the class the method belongs to.
"""
if self is not None:
name = "%s::%s" % (self.__class__.__name__, sys._getframe(1).f_code.co_name)
else:
name = sys._getframe(1).f_code.co_name
if pretty:
name += "()"
return name
Sende-Teil
def send_12345():
"""Funktion zum Sende (Aufruf bei 's').
Diese Funktion stellt den Teil dar, der auf dem uC zu schreiben ist, in
C vermutlich.
"""
com_nbr = 0
e = None
# Ein Message-Objekt erzeugen, das an Nutzdaten einfach einen
# Integer = 12345 hält.
#
msg = MessageUC2PC( MessageUC2PC._MSG_SENSOR_1, MessageUC2PC.Int16_to_data_bytes( 12345))
for com_nbr in (0, 1, 2):
try:
# Schnittstelle öffnen
#
com = serial.Serial( "/dev/ttyUSB%d" % com_nbr)
# Message auf Schnittstelle schreiben
#
com.write( msg.as_sendable_binary_message())
return
except (serial.serialutil.portNotOpenError, serial.serialutil.SerialException), e:
pass
finally:
# Schnittstelle wieder schließen. 'finally' wird auf jeden Fall
# aufgerufen, auch bei Auftreten einer Exception. So stellen
# wir sicher, dass die Schnittstelle sicher geschlossen wird.
#
com.close()
# Wenn eine Exception aufgetreten ist, werfen wir sie erneut (wir
# hatten sie ja abgefangen per try-except-finally), damit
# das Hauptprogramm entsprechend reagieren kann.
#
if e:
raise e
return
# Hier beginnt das eigentliche Hauptprogramm.
#
# Wir stzen zuerst das Menü auf, das wir zur Auswahl anbieten wollen.
#
menu_line_tuples = { "x": ("Exit", sys.exit)
, "r": ("Receive endlessly (exit by CTRL-C)", receive)
, "s": ("Senden des Wertes 12345", send_12345)
}
max_len_menu_line_text = 0
while True:
# Menü ausgeben
#
print
choices = menu_line_tuples.keys()
choices.sort()
for choice in choices:
text = menu_line_tuples[choice][0]
menu_line_text = "%s...%s" % (choice, text)
max_len_menu_line_text = max( max_len_menu_line_text, len( menu_line_text))
print menu_line_text
print "-" * max_len_menu_line_text
# Auf User warten
choice = raw_input( "> ")
# Machen, was der User gewählt hat
#
try:
text, function = menu_line_tuples[choice]
print "Your choice has been '%s: %s'" % (choice, text)
time.sleep( 0.5)
# function kann jetzt receive oder send_12345 sein, je nachdem,
# was der User gedrückt hat.
function()
except KeyError:
pass
except (serial.serialutil.portNotOpenError, serial.serialutil.SerialException), e:
# Wenn send_12345() oder receive() eine Probleme hatte, dann geben
# wir den Grund hier aus (= die Exception, die erneut geworfen
# worden ist - Sie erinnern sich?)
#
print e
time.sleep( 0.5)
return
Empfangsteil
def receive():
"""Funktion wird gerufen, wenn User ein 'r' drückt.
Dieser Teil stellt den Empfangsteil auf dem PC dar und kann auch in
MATLAB realisiert werden, oder in Java, oder C, oder...
Diese Funktion ist innerhalb der Funktion Main() (Hauptprogramm) definiert.
Das muss natürlich nicht so sein, man hätte sie genauso gut außerhalb
def'en können.
"""
com_nbr = 0
e = None
for com_nbr in (0, 1, 2):
try:
# Schnittstelle öffnen. Wir nehmen die erste, die keine Exception mehr verursacht,
#
com = serial.Serial( "/dev/ttyUSB%d" % com_nbr, timeout=0.5)
# Jetzt lassen wir die State-Machine fürs Empfangen laufen und
# zwar so lange, bis sie per rsm.is_finished() anzeigt, dass
# sie ein Datenpaket empfangen hat. Übergabe-Parameter für die
# SM ist der Port, an dem sie auf Daten warten muss.
#
rsm = ReceiverSM( com)
while not rsm.is_finished():
rsm.run()
# Wir geben hier mal die Daten und nur die Daten aus. Wenn
# man uns ein 12345 geschickt hat, dann muss uns msg.data()
# das auch liefern. M.a.W.: msg.data() stückelt die einzelnen
# Daten-Bytes wieder zu einem Integer zusammen.
#
msg = rsm.msg()
print "Winkel, den uC uns gesendet hat = %d. " % msg.data()
return
except (serial.serialutil.portNotOpenError, serial.serialutil.SerialException), e:
print e
finally:
# Schnittstelle wieder schließen. 'finally' wird auf jeden Fall
# aufgerufen, auch bei Auftreten einer Exception. So stellen
# wir sicher, dass die Schnittstelle sicher geschlossen wird.
#
com.close()
if e:
raise e
return
Gesamtprogramm
Das gesamte Programm ist also
# -*- coding: utf8
import sys
import serial
import time
def WhoAmI( self=None, pretty=0):
"""Debugging: Returns the name of the function or method this function is called in.
PARAMS:
self:
Instance of class of calling method.
Used to document the class the method belongs to.
"""
if self is not None:
name = "%s::%s" % (self.__class__.__name__, sys._getframe(1).f_code.co_name)
else:
name = sys._getframe(1).f_code.co_name
if pretty:
name += "()"
return name
class MessageUC2PC:
"""Messages, die vom uC an den PC gesendet werden.
Wenn wir die Message b i n ä r senden, dann könnten wir folgendes Protokoll verwenden:
0x16 SYN # SYNchronous idle
0x02 STX # Start of TeXt
0xXX LEN # Länge über alles in Bytes (also inklusive SYM und STX)
0xXX MSG # 0x01: Sensor 1; 0x02: Sensor 2; usw.
# 0x11: Taster 1 gedrückt; 0x12: Taster 2 gedrückt; usw.
# m.a.W: Bedeutung der Daten, die gesendet werden.
0xXX Data Byte 1
:
:
:
0xXX Data Byte n
0xXX CRC Hi Byte # 2. Byte CRC-Code über Bytes von LEN bis und mit zum letzten Daten-Byte
0xXX CRC Lo Byte # 1. Byte CRC-Code über Bytes von LEN bis und mit zum letzten Daten-Byte
0x03 ETX # End Of TeXt
0x04 EOT # End Of Transmission
"""
_SYN = 0x16
_STX = 0x02
_ETX = 0x03
_EOT = 0x04
_MSG_SENSOR_1 = 1
_MSG_SENSOR_2 = 2
_MSG_SENSOR_3 = 3
_MSG_SENSOR_4 = 4
_MSG_SENSOR_5 = 5
_MSG_KEY_1 = 0x11
_MSG_KEY_2 = 0x12
_MSG_KEY_3 = 0x13
@staticmethod
def Data_bytes_to_int16( bytes):
"""Bytes zu einem Winkelwert zusammenbauen.
MSB links, LSB rechts, oder anders rum: Int16_to_data_bytes()[0] ist
das MSB, Int16_to_data_bytes()[-1] das LSB ("MATLAB-Reihenfolge").
"""
angle = bytes[0] << 8
angle |= bytes[1]
return angle
@staticmethod
def Int16_to_data_bytes( angle):
"""Winkel in Bytes zerlegen.
MSB links, LSB rechts, oder anders rum: Int16_to_data_bytes()[0] ist
das MSB, Int16_to_data_bytes()[-1] das LSB ("MATLAB-Reihenfolge").
"""
bytes = [(angle >> 8) & 0xFF, angle & 0xFF]
return bytes
def __init__( self, msg_nbr, data_bytes):
"""Ctor.
"""
self._msg_nbr = msg_nbr
self._data_bytes = data_bytes
self._len = 0
return
def as_displayable_binary_message( self):
"""Message zur Anzeige auf Bildschirm aufbereiten.
Da die Message Steuerzeichen enthält, kann die sendetaugliche Message
nicht direkt angezeigt werden.
"""
return map( str, self.as_sendable_binary_message())
def as_sendable_binary_message( self):
"""Message zum Senden aufbereiten.
USAGE:
serial.write( msg.as_sendable_binary_message())
"""
self._len = 2 + 1 + 1 + len( self._data_bytes) + 2 + 2
# ^ ^ ^ ^ ^ ^
# | | | | | |
# | | | | | ETX + EOT
# | | | Daten CRC 1 + CRC 2
# | | Message-Nr
# | Message-Länge
# SYN + STX
#
net_data_bytes = [ self._len
, self._msg_nbr
] \
+ self._data_bytes
msg_bytes = [ chr( self._SYN)
, chr( self._STX)
, chr( self._len)
, chr( self._msg_nbr)
] \
+ map( chr, self._data_bytes) \
+ [ chr( self._crc_hi_( net_data_bytes))
, chr( self._crc_lo_( net_data_bytes))
, chr( self._ETX)
, chr( self._EOT)
]
msg = ''.join( msg_bytes)
return msg
def _crc16_( self, data_bytes):
"""2DO, muss also noch implementiert werden. """
return 0 # 2DO
def _crc_hi_( self, data_bytes):
"""Hi-Byte des CRC. 2DO, muss also noch implementiert werden. """
return self._crc16_( data_bytes) & 0xFF
def _crc_lo_( self, data_bytes):
"""Lo-Byte des CRC. """
return self._crc16_( data_bytes) & 0xFF
def data( self):
"""Zugriff auf ausgesuchte Daten der Message: Nutzdaten.
"""
return self.Data_bytes_to_int16( self._data_bytes)
def msg_nbr( self):
"""Zugriff auf ausgesuchte Daten der Message: Nummer der Message.
"""
return self._msg_nbr
class ReceiverSM:
"""State Machine, die auf dem Empfänger (PC) läuft.
"""
def __init__( self, com_port):
"""Ctor.
"""
self._com_port = com_port
self._state_method_ = self._S__IDLE_
self._message_len = 0
self._message_bytes = []
self._message_byte = None
self._message = None
self._data_bytes = []
self._is_finished = False
return
def is_finished( self):
"""True, sobald ein gültiges Datenpaket erkannt worden ist.
"""
return self._is_finished
def msg( self):
"""Message-Objekt, das das empfangene Datenpaket repräsentiert.
"""
return self._message
def reset( self):
"""SM zurücksetzen für neuerlichen versuch, ein gültiges Datenpaket zu empfangen (z.B. nach Auftreten eines Timeouts).
"""
self._message_bytes = []
self._data_bytes = []
self._is_finished = False
self._state_method_ = self._S__IDLE_
return
def run( self):
"""Haupt-Methode.
USAGE:
while not sm.is_finished():
sm.run()
msg = sm.message()
"""
self._is_finished = False
while not self._is_finished:
byte = self._com_port.read()
# Vom geöffneten COM-Port lesen
if not byte:
# Timeout!
self.reset()
# Alles zurück an den Start für neuerlichen Versuch
continue
self._state_method_( ord( byte))
# Aktuellen State mit empfangenen Daten versorgen.
# Der State entscheidet dann, wie's weitergeht.
return
def _S__IDLE_( self, byte):
"""Warten auf SYN.
"""
print WhoAmI( self)
if byte == MessageUC2PC._SYN:
self.reset()
self._message_bytes = [byte]
self._state_method_ = self._S__GOT_SYN__AWAIT_STX_
# Weiter mit Warten auf STX
return
def _S__GOT_SYN__AWAIT_STX_( self, byte):
"""Warten auf STX.
"""
print WhoAmI( self)
if byte == MessageUC2PC._STX:
self._message_bytes.append( byte)
self._state_method_ = self._S__GOT_STX__AWAIT_LEN_
# Weiter mit Warten auf LEN
else:
self._state_method_ = self._S__IDLE_
# Falsches Byte an dieser Stelle, also Abbruch
return
def _S__GOT_STX__AWAIT_LEN_( self, byte):
"""Warten auf LEN.
"""
print WhoAmI( self)
self._message_len = byte
self._message_bytes.append( byte)
self._state_method_ = self._S__GOT_LEN__AWAIT_MSG_
# Weiter mit Warten auf MSG
return
def _S__GOT_LEN__AWAIT_MSG_( self, byte):
"""Warten auf MSG.
"""
print WhoAmI( self)
self._message_byte = byte
self._message_bytes.append( byte)
self._data_bytes = []
self._state_method_ = self._S__GOT_MSG__AWAIT_DATA_
# Weiter mit Warten auf Daten
return
def _S__GOT_MSG__AWAIT_DATA_( self, byte):
"""Daten empfangen.
"""
print WhoAmI( self)
self._message_bytes.append( byte)
self._data_bytes.append( byte)
if len( self._message_bytes) == self._message_len - 4:
self._state_method_ = self._S__GOT_DATA__AWAIT_CRC_HI_
# Alle Daten empfangen, weiter mit Warten auf CRC
return
def _S__GOT_DATA__AWAIT_CRC_HI_( self, byte):
"""Warten auf CRC.
"""
print WhoAmI( self)
self._message_bytes.append( byte)
self._state_method_ = self._S__GOT_CRC_HI__AWAIT_CRC_LO_
return
def _S__GOT_CRC_HI__AWAIT_CRC_LO_( self, byte):
"""Warten auf CRC.
"""
print WhoAmI( self)
self._message_bytes.append( byte)
self._state_method_ = self._S__GOT_CRC_LO__AWAIT_ETX_
# Weiter mit Warten auf ETX
return
def _S__GOT_CRC_LO__AWAIT_ETX_( self, byte):
"""Warten auf ETX.
"""
print WhoAmI( self)
if byte == MessageUC2PC._ETX:
self._message_bytes.append( byte)
self._state_method_ = self._S__GOT_ETX__AWAIT_EOT_
# Weiter mit Warten auf EOT
else:
self._state_method_ = self._S__IDLE_
# Falsches Byte an dieser Stelle, also Abbruch
return
def _S__GOT_ETX__AWAIT_EOT_( self, byte):
"""Warten auf EOT.
"""
print WhoAmI( self)
if byte == MessageUC2PC._EOT:
self._message_bytes.append( byte)
self._message = MessageUC2PC( self._message_byte, self._data_bytes)
print map( str, self._message.as_displayable_binary_message())
self._is_finished = True
self._state_method_ = self._S__IDLE_
else:
self._state_method_ = self._S__IDLE_
# Falsches Byte an dieser Stelle, also Abbruch
return
def Main():
"""Hauptptogramm.
"""
def send_12345():
"""Funktion zum Sende (Aufruf bei 's').
Diese Funktion stellt den Teil dar, der auf dem uC zu schreiben ist, in
C vermutlich.
"""
com_nbr = 0
e = None
# Ein Message-Objekt erzeugen, das an Nutzdaten einfach einen
# Integer = 12345 hält.
#
msg = MessageUC2PC( MessageUC2PC._MSG_SENSOR_1, MessageUC2PC.Int16_to_data_bytes( 12345))
for com_nbr in (0, 1, 2):
try:
# Schnittstelle öffnen
#
com = serial.Serial( "/dev/ttyUSB%d" % com_nbr)
# Message auf Schnittstelle schreiben
#
com.write( msg.as_sendable_binary_message())
return
except (serial.serialutil.portNotOpenError, serial.serialutil.SerialException), e:
pass
finally:
# Schnittstelle wieder schließen. 'finally' wird auf jeden Fall
# aufgerufen, auch bei Auftreten einer Exception. So stellen
# wir sicher, dass die Schnittstelle sicher geschlossen wird.
#
com.close()
# Wenn eine Exception aufgetreten ist, werfen wir sie erneut (wir
# hatten sie ja abgefangen per try-except-finally), damit
# das Hauptprogramm entsprechend reagieren kann.
#
if e:
raise e
return
# Hier beginnt das eigentliche Hauptprogramm.
#
# Wir stzen zuerst das Menü auf, das wir zur Auswahl anbieten wollen.
#
menu_line_tuples = { "x": ("Exit", sys.exit)
, "r": ("Receive endlessly (exit by CTRL-C)", receive)
, "s": ("Senden des Wertes 12345", send_12345)
}
max_len_menu_line_text = 0
while True:
# Menü ausgeben
#
print
choices = menu_line_tuples.keys()
choices.sort()
for choice in choices:
text = menu_line_tuples[choice][0]
menu_line_text = "%s...%s" % (choice, text)
max_len_menu_line_text = max( max_len_menu_line_text, len( menu_line_text))
print menu_line_text
print "-" * max_len_menu_line_text
# Auf User warten
choice = raw_input( "> ")
# Machen, was der User gewählt hat
#
try:
text, function = menu_line_tuples[choice]
print "Your choice has been '%s: %s'" % (choice, text)
time.sleep( 0.5)
# function kann jetzt receive oder send_12345 sein, je nachdem,
# was der User gedrückt hat.
function()
except KeyError:
pass
except (serial.serialutil.portNotOpenError, serial.serialutil.SerialException), e:
# Wenn send_12345() oder receive() eine Probleme hatte, dann geben
# wir den Grund hier aus (= die Exception, die erneut geworfen
# worden ist - Sie erinnern sich?)
#
print e
time.sleep( 0.5)
return
def receive():
"""Funktion wird gerufen, wenn User ein 'r' drückt.
Dieser Teil stellt den Empfangsteil auf dem PC dar und kann auch in
MATLAB realisiert werden, oder in Java, oder C, oder...
Diese Funktion ist innerhalb der Funktion Main() (Hauptprogramm) definiert.
Das muss natürlich nicht so sein, man hätte sie genauso gut außerhalb
def'en können.
"""
com_nbr = 0
e = None
for com_nbr in (0, 1, 2):
try:
# Schnittstelle öffnen. Wir nehmen die erste, die keine Exception mehr verursacht,
#
com = serial.Serial( "/dev/ttyUSB%d" % com_nbr, timeout=0.5)
# Jetzt lassen wir die State-Machine fürs Empfangen laufen und
# zwar so lange, bis sie per rsm.is_finished() anzeigt, dass
# sie ein Datenpaket empfangen hat. Übergabe-Parameter für die
# SM ist der Port, an dem sie auf Daten warten muss.
#
rsm = ReceiverSM( com)
while not rsm.is_finished():
rsm.run()
# Wir geben hier mal die Daten und nur die Daten aus. Wenn
# man uns ein 12345 geschickt hat, dann muss uns msg.data()
# das auch liefern. M.a.W.: msg.data() stückelt die einzelnen
# Daten-Bytes wieder zu einem Integer zusammen.
#
msg = rsm.msg()
print "Winkel, den uC uns gesendet hat = %d. " % msg.data()
return
except (serial.serialutil.portNotOpenError, serial.serialutil.SerialException), e:
print e
finally:
# Schnittstelle wieder schließen. 'finally' wird auf jeden Fall
# aufgerufen, auch bei Auftreten einer Exception. So stellen
# wir sicher, dass die Schnittstelle sicher geschlossen wird.
#
com.close()
if e:
raise e
return
if __name__ == "__main__":
Main()
raw_input( "Press any key to exit...")
Zugriff auf Hardware von SIGMATEK
Folgende Methode ermöglicht den lesenden Zugriff auf eine Server-Variable auf einem CPC der Fa. SIGMATEK GmbH & Co KG. Die Adresse der Variable auf dem CPC wird gespeichert, was den Zugriff erheblich beschleunigt.
def cpc_DINT_svr( self, obj_name, svr_name):
"""Reads a server variable of type DINT from a SIGMATEK CPC.
.. note::
Object addresses are remembered to speed up consecutive accesses!
"""
# Get at the objects address
addr = C.byref
value = None
_svr_address = C.c_ulong( 0)
_value = C.c_long( 0)
path_name = "%s.%s" % (obj_name, svr_name)
try:
_obj_address = self._obj_address_by_obj_name[path_name]
# Try to remember the address of this
# server variable.
success = True
except KeyError:
# Couldn't remember server variable.
_obj_name = C.create_string_buffer( path_name)
_obj_address = C.c_ulong( 0)
_mode = C.c_ulong( 0)
_class_name = C.create_string_buffer( "", 255)
success = self._dll is not None and self._dll.LslGetObject( _obj_name, addr( _obj_address), addr( _mode), _class_name)
# Get the address of this server variable.
# 10 ms runtime needed for this!
if success:
self._obj_address_by_obj_name[path_name] = _obj_address
# Remember the address of this server
# variable.
# Read-access the object
if success:
if _obj_address.value:
_svr_address = C.c_long( _obj_address.value)
success = self._dll.LslReadFromSvr( _svr_address, addr( _value))
# 10 ms runtime needed for this!
if success:
value = int( _value.value)
return value
Folgende Methode ermöglicht den schreibenden Zugriff auf eine Server-Variable auf einem CPC der Fa. SIGMATEK GmbH & Co KG. Die Adresse der Variable auf dem CPC wird gespeichert, was den Zugriff erheblich beschleunigt.
def cpc_DINT_svr__set( self, obj_name, svr_name, value):
"""Writes a server variable of type DINT to a SIGMATEK CPC.
.. note::
Object addresses are remembered to speed up consecutive accesses!
"""
# Get at the objects address
addr = C.byref
_svr_address = C.c_ulong( 0)
_value = C.c_long( value)
path_name = "%s.%s" % (obj_name, svr_name)
try:
_obj_address = self._obj_address_by_obj_name[path_name]
# Try to remember the address of this
# server variable.
success = True
except KeyError:
# Couldn't remember server variable.
_obj_name = C.create_string_buffer( path_name)
_obj_address = C.c_ulong( 0)
_mode = C.c_ulong( 0)
_class_name = C.create_string_buffer( "", 255)
success = self._dll is not None and self._dll.LslGetObject( _obj_name, addr( _obj_address), addr( _mode), _class_name)
# Get the address of this server variable.
# 10 ms runtime needed for this!
if success:
self._obj_address_by_obj_name[path_name] = _obj_address
# Remember the address of this server
# variable.
# Write-access the object
if success:
if _obj_address.value:
_svr_address = C.c_long( _obj_address.value)
success = self._dll.LslWriteToSvr( _svr_address, _value)
# 10 ms runtime needed for this!
return success
Der Zugriff auf Client-Variablen erfolgt entsprechend.