UDPReceiver
The UDPReceiver DataSource can be used to receive application data over UDP.
Typical use cases include subscribing to data from a remote application. This data can be used for monitoring or control purposes. In the latter case, the data is used to synchronise the RealTimeThread.
In this section, the LinuxTimer will be replaced by a UDPReceiver and the RealTimeThread will execute every time a UDP packet arrives. This way, the ReferencePosition signal will be synchronised with the arrival of UDP packets, which are generated by the sender application.
The sender is the application developed in the previous section, which streams application statistics and the ReferencePosition over UDP. The configuration file for the sender is ../Configurations/MassSpring/RTApp-MassSpring-24-Sender.cfg.
TUTORIAL_UDP_PORT_1, which is replaced by Makefile.cfg and must match the port number used by the sender application. 1 +UDPReceiver = {
2 Class = UDP::UDPReceiver
3 Port = TUTORIAL_UDP_PORT_1
4 Signals = {
5 Counter = {
6 Type = uint32
7 }
8 Time = {
9 Type = uint32
10 }
11 ReferencePosition = {
12 Type = float64
13 }
14 }
15 }
GAMTimer replaced with GAMUDPReceiver. The Frequency parameter still needs to be set to a non-zero value, but the actual pace of the RealTimeThread will be determined by the arrival of UDP packets. 1 +GAMUDPReceiver = {
2 Class = IOGAM
3 InputSignals = {
4 Counter = {
5 DataSource = UDPReceiver
6 Type = uint32
7 }
8 Time = {
9 Frequency = 100
10 DataSource = UDPReceiver
11 Type = uint32
12 }
13 ReferencePosition = {
14 DataSource = UDPReceiver
15 Type = float64
16 }
17 }
18 OutputSignals = {
19 Counter = {
20 DataSource = DDB1
21 Type = uint32
22 }
23 Time = {
24 DataSource = DDB1
25 Type = uint32
26 }
27 ReferencePosition = {
28 DataSource = DDB1
29 Type = float64
30 }
31 }
32 }
Running the application
Start the sender application with:
make -C ../Configurations/MassSpring/ -f Makefile.cfg
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-24-Sender_Gen.cfg -l RealTimeLoader -s State1
Once the application is running, inspect the screen output and verify that the application is running without issues. The log should show entries similar to the following:
$ [Warning - Threads.cpp:181]: Failed to change the thread priority (likely due to insufficient permissions)
$ [Information - RealTimeLoader.cpp:111]: Started application in state State1
$ [Information - MARTeApp.cpp:135]: Application starting
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:0
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:1000000
...
Open another terminal and start the receiver application with:
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-24-Receiver_Gen.cfg -l RealTimeLoader -s State1
Once the application is running, inspect the screen output and verify that the application is running without issues. The log should show entries similar to the following:
$ [Warning - Threads.cpp:181]: Failed to change the thread priority (likely due to insufficient permissions)
$ [Information - RealTimeLoader.cpp:111]: Started application in state State1
$ [Information - MARTeApp.cpp:135]: Application starting
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:14000000
...
Note
The actual time values will depend on the arrival of UDP packets, which in turn depends on the execution of the sender application. If the receiver application is started before the sender, the first Time value will not be zero, but will correspond to the Time of the first received UDP packet.
Exercises
Ex. 1: Detecting stalled data
In the example above, the receiver application waits indefinitely for UDP packets to arrive. In some cases, it may be desirable to detect if the data stream has stalled (e.g. due to a network issue or because the sender application has stopped). This can be achieved by using the Timeout parameter of the UDPReceiver DataSource, which specifies a timeout in milliseconds after which the UDPReceiver will return a failure.
Warning
A DataSource failure propagates to the interfacing GAM and then to the GAMScheduler.
This means that all subsequent GAMs in the execution list will not be executed.
Consequently, if a GAM needs to gracefully handle a failure, it must be executed before the GAM that will fail (which means it will use data from the previous cycle).
Edit the file
../Configurations/MassSpring/RTApp-MassSpring-25-Receiver.cfgand modify the DataSourceUDPWriterReferenceto have aTimeoutof1.0 s.Add a
MathExpressionGAMto detect stalled data based on theTimesignal and name itUDPStalled.Add the
UDPStalledsignal to theGAMWriterStatsGAM and to theUDPWriterStatsDataSource.Add the new
MathExpressionGAMto the execution list before theGAMUDPReceiver, and also move theGAMWriterStatsbefore theGAMUDPReceiver.Run
make -C ../Configurations/MassSpring/ -f Makefile.cfgto generate the configuration file with the correct UDP port numbers.
make -C ../Configurations/MassSpring/ -f Makefile.cfg
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-25-Receiver_Gen.cfg -l RealTimeLoader -s State1
TUTORIAL_UDP_PORT_4=`awk '/\+UDPWriterStats/,/}/ {if ($1=="Port") print $3}' ../Configurations/MassSpring/RTApp-MassSpring-25-Receiver_Gen.cfg`;echo "TUTORIAL_UDP_PORT_4=$TUTORIAL_UDP_PORT_4"
python ../Test/Integrated/udp_monitor.py -p $TUTORIAL_UDP_PORT_4 -s 3
Stop the sender application and verify on the python output that the receiver application detects the stalled data stream.
Solution
The solution is to modify the configuration file ../Configurations/MassSpring/RTApp-MassSpring-25-Receiver.cfg and to add a MathExpressionGAM.
Time signal. 1 +GAMMathStalled = {
2 Class = MathExpressionGAM
3 Expression = "
4 UDPStalled = (uint8)((Time - LastTime) == (uint32)0);
5 LastTime = Time;"
6 InputSignals = {
7 LastTime= {
8 DataSource = DDB1
9 Type = uint32
10 }
11 Time = {
12 DataSource = DDB1
13 Type = uint32
14 }
15 }
16 OutputSignals = {
17 LastTime= {
18 DataSource = DDB1
19 Type = uint32
20 }
21 UDPStalled = {
22 DataSource = DDB1
23 Type = uint8
24 }
25 }
26 }
Add the UDPStalled signal to the GAMWriterStats GAM and to the UDPWriterStats DataSource.
GAMWriterStats with the UDPStalled signal. 1 +GAMWriterStats = {
2 Class = IOGAM
3 InputSignals = {
4 UDPStalled = {
5 DataSource = DDB1
6 Type = uint8
7 }
8 Thread1CycleTime = {
9 OutputSignals = {
10 UDPStalled = {
11 DataSource = UDPWriterStats
12 Type = uint8
13 }
14 Thread1CycleTime = {
UDPWriterStats with the UDPStalled signal. 1 +UDPWriterStats = {
2 Class = UDP::UDPSender
3 Address = "127.0.0.1"
4 Port = TUTORIAL_UDP_PORT_4
5 ExecutionMode = RealTimeThread
6 Signals = {
7 UDPStalled = {
8 Type = uint8
9 }
10 Thread1CycleTime = {
Add the new MathExpressionGAM to the execution list before the GAMUDPReceiver, and also move the GAMWriterStats before the GAMUDPReceiver.
1 +States = {
2 Class = ReferenceContainer
3 +State1 = {
4 Class = RealTimeState
5 +Threads = {
6 Class = ReferenceContainer
7 +Thread1 = {
8 Class = RealTimeThread
9 CPUs = 0x1
10 Functions = {GAMPerfMonitor GAMMathStalled GAMWriterStats GAMUDPReceiver GAMDisturbanceWaveform GAMMathDisturbance GAMFilterDisturbance GAMController GAMSpringMass GAMMathModel GAMMathPositionErr GAMStats GAMPatchForceDims GAMForceStats GAMHist GAMMathExpr GAMConversion GAMMathTrigger GAMMathTriggerSecond GAMDisplay GAMWriter}
11 }
12 }
13 }
14 }