SDNSubscriber
Warning
The SDNSubscriber DataSource is only available in CODAC Core System distributions.
The SDNSubscriber DataSource can be used to receive application data over the ITER SDN network.
Typical use cases include subscribing to data from a remote application. This data can be used for monitoring or control purposes. In the latter case, the data is used to synchronise the RealTimeThread.
In this section, the LinuxTimer will be replaced by a SDNSubscriber and the RealTimeThread will execute every time an SDN topic arrives.
The publisher will be the application developed as part of the exercise in the previous section, which streams application data and statistics over SDN. The configuration file for the sender is ../Configurations/MassSpring/RTApp-MassSpring-28-Sender.cfg.
The SDNSubscriber configuration is similar to the UDPReceiver, with the main difference being that the topic name is set from the variable TUTORIAL_TOPIC_NAME, which is replaced by the Makefile.cfg. The ExecutionMode is set to RealTimeThread, which means that the pace of the RealTimeThread will be determined by the arrival of SDN topics.
Given that some application GAMs need to use a relative time reference, the SDNSubscriber also offers its Header as a signal to be further processed by other GAMs.
1 +SDNSubscriber = {
2 Class = SDN::SDNSubscriber
3 Interface = lo
4 Topic = TUTORIAL_TOPIC_NAME_0
5 ExecutionMode = RealTimeThread
6 Signals = {
7 Header = {
8 Type = uint8
9 NumberOfElements = 48
10 }
11 ReferencePosition = {
12 Type = float64
13 }
14 }
15 }
The GAMSDNSubscriber is an excellent example of how an IOGAM can be used to expand and cast signals. This strategy is used to expand the binary definition of the SDN header into individual signals that can be used by other GAMs.
GAMTimer replaced with GAMSDNSubscriber. The Frequency parameter still needs to be set to a non-zero value, but the actual pace of the RealTimeThread will be provided by the arrival of SDN packets. 1 +GAMSDNSubscriber = {
2 Class = IOGAM
3 InputSignals = {
4 Header = {
5 Type = uint8
6 NumberOfElements = 48
7 DataSource = SDNSubscriber
8 }
9 ReferencePosition = {
10 DataSource = SDNSubscriber
11 Type = float64
12 Frequency = 100
13 }
14 }
15 OutputSignals = {
16 UID = {
17 DataSource = DDB1
18 Type = uint8
19 NumberOfElements = 4
20 }
21 Version = {
22 DataSource = DDB1
23 Type = uint8
24 NumberOfElements = 4
25 }
26 Size = {
27 DataSource = DDB1
28 Type = uint32
29 }
30 TopicUID = {
31 DataSource = DDB1
32 Type = uint32
33 }
34 TopicVersion = {
35 DataSource = DDB1
36 Type = uint32
37 }
38 TopicSize = {
39 DataSource = DDB1
40 Type = uint32
41 }
42 TopicCounter = {
43 DataSource = DDB1
44 Type = uint64
45 }
46 SendTime = {
47 DataSource = DDB1
48 Type = uint64
49 }
50 RecvTime = {
51 DataSource = DDB1
52 Type = uint64
53 }
54 ReferencePosition = {
55 DataSource = DDB1
56 Type = float64
57 }
58 }
59 }
Given that some GAMs require a relative time reference, a MathExpressionGAM is used to compute and maintain a relative time based on the timestamp received in the SDN header. Since this signal is of type uint64, and because the MathExpressionGAM supports neither conditional statements nor the mod operator, a fairly complex expression is needed to compute the relative time in microseconds, as shown in the following code snippet. This provides a good example of why a custom GAM is often preferable for complex signal processing tasks, as discussed later in the tutorial.
MathExpressionGAM to compute relative time in microseconds (uint32). 1 +GAMMathRelativeTime = {
2 Class = MathExpressionGAM
3 Expression = "
4 OneSecondNano = (uint64)1000000000;
5 TenMillisecondsNano = (uint64)10000000;
6 TenMillisecondsMicro = (uint64)10000;
7 TimeDeltaErrNano = (uint64)1000000; //Force the TimeDeltaUInt64 to be > 10 ms
8 TimeDeltaUInt64 = (RecvTime - LastRecvTime) + TimeDeltaErrNano; //Compute the relative time in nanoseconds and make sure that it is always > 10 ms
9 TimeDeltaUInt64Seconds = ((TimeDeltaUInt64) / OneSecondNano); //Get the seconds
10 TimeDeltaUInt64Mod = (TimeDeltaUInt64 - TimeDeltaUInt64Seconds * OneSecondNano); //Compute the mod, keeping only the seconds
11 TimeDeltaUInt64Microseconds = (TimeDeltaUInt64Mod / (uint64)TenMillisecondsNano) * (uint64)TenMillisecondsMicro; //Convert to microseconds
12 LastRecvTime = RecvTime; //Note that the first received packet will have a very number and thus the first delta above will be very large > uint32
13 TimeUInt64 = TimeUInt64 + (uint64)(TimeDeltaUInt64Microseconds);
14 Time = (uint32)(TimeUInt64);"
15 InputSignals = {
Running the application
Start the sender application with:
make -C ../Configurations/MassSpring/ -f Makefile.cfg
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-28-Publisher_Gen.cfg -l RealTimeLoader -s State1
Once the application is running, inspect the screen output and verify that the application is running without any issues. The log should show entries similar to the following:
$ [Warning - Threads.cpp:181]: Failed to change the thread priority (likely due to insufficient permissions)
$ [Information - RealTimeLoader.cpp:111]: Started application in state State1
$ [Information - MARTeApp.cpp:135]: Application starting
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:0
$ [Information - SDNLoggerCallback.cpp:95]: [ccs::mcast] MessengerImpl::Receive - Received message from '127.0.0.1:10002'
$ [Information - SDNLoggerCallback.cpp:95]: [sdn::disc] MessengerImpl::Receive - Message ...
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:1000000
...
Open another terminal and start the receiver application with:
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-28-Subscriber_Gen.cfg -l RealTimeLoader -s State1
Once the application is running, inspect the screen output and verify that the application is running without any issues. The log should show entries similar to the following:
$ [Warning - Threads.cpp:181]: Failed to change the thread priority (likely due to insufficient permissions)
$ [Information - RealTimeLoader.cpp:111]: Started application in state State1
$ [Information - MARTeApp.cpp:135]: Application starting
$ [Information - SDNLoggerCallback.cpp:95]: [ccs::mcast] MessengerImpl::Receive - Received message from '127.0.0.1:10002'
$ [Information - SDNLoggerCallback.cpp:95]: [sdn::disc] MessengerImpl::Receive - Message ...
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:1000000
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:2000000
...
Note
The actual time values will depend on the arrival of SDN packets, which in turn depends on the execution of the sender application. If the receiver application is started before the sender, the first Time value will not be zero, but will instead correspond to the Time from the first received SDN packet.
The CCS tool sdn-print will be used to monitor the SDN traffic and verify that the application is streaming data over the SDN network.
Open another console and generate the required .xml configuration files for sdn-print.
../Test/Integrated/GenerateSDNTopicFiles.sh
Export the required environment variables for the sdn-print tool and run it.
export SDN_TOPIC_PATH=../Test/Integrated
sdn-print -i lo -t mass-spring-0 -c -1
Notice that both MARTe2 and the sdn-print tool receive from the SDN topic source using multicast.
On another console, check the statistics from the publisher application:
export SDN_TOPIC_PATH=../Test/Integrated
sdn-print -i lo -t mass-spring-1 -c -1
And on another console, check the statistics from the subscriber application:
export SDN_TOPIC_PATH=../Test/Integrated
sdn-print -i lo -t mass-spring-2 -c -1
Exercises
Ex. 1: Detecting stalled data
In the example above, the application waits with the default SDNSubscriber timeout for packets to arrive. In some cases, it may be desirable to detect if the data stream has stalled (e.g. due to a network issue or because the sender application has stopped).
Warning
A DataSource failure propagates to the interfacing GAM and from there to the GAMScheduler.
This means that all subsequent GAMs in the execution list will not be executed.
Consequently, if a GAM needs to gracefully handle a failure, it must be executed before the GAM that will fail (which means that it will use data from the previous cycle).
Edit the file
../Configurations/MassSpring/RTApp-MassSpring-29-Subscriber.cfgand add aMathExpressionGAMto detect stalled data based on theTopicCountersignal and name itSDNStalled.Add the
SDNStalledsignal to theGAMWriterStatsGAM and to theSDNPublisherStatsDataSource.Add the new
MathExpressionGAMto the execution list before theGAMSDNSubscriberand move theGAMWriterStatsalso before theGAMSDNSubscriber.Run
make -C ../Configurations/MassSpring/ -f Makefile.cfgto generate the configuration file with the correct topic names.
make -C ../Configurations/MassSpring/ -f Makefile.cfg
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-29-Subscriber_Gen.cfg -l RealTimeLoader -s State1
../Test/Integrated/GenerateSDNTopicFiles.sh
export SDN_TOPIC_PATH=../Test/Integrated
sdn-print -i lo -t mass-spring-3 -c -1
Stop the publisher application and verify that the receiver detects the stalled data stream and that the
SDNStalledsignal is set to 1.
Solution
The solution is to modify the configuration file ../Configurations/MassSpring/RTApp-MassSpring-29-Subscriber.cfg and to add a MathExpressionGAM.
TopicCounter signal. 1 +GAMMathStalled = {
2 Class = MathExpressionGAM
3 Expression = "
4 SDNStalled = (uint8)((TopicCounter - LastTopicCounter) == (uint32)0);
5 LastTopicCounter = TopicCounter;"
6 InputSignals = {
7 LastTopicCounter = {
8 DataSource = DDB1
9 Type = uint64
10 }
11 TopicCounter = {
12 DataSource = DDB1
13 Type = uint64
14 }
15 }
16 OutputSignals = {
17 LastTopicCounter = {
18 DataSource = DDB1
19 Type = uint64
20 }
21 SDNStalled = {
22 DataSource = DDB1
23 Type = uint8
24 }
25 }
26 }
Add the SDNStalled signal to the GAMWriterStats GAM and to the SDNPublisherStats DataSource.
GAMWriterStats with the SDNStalled signal. 1 +GAMWriterStats = {
2 Class = IOGAM
3 InputSignals = {
4 SDNStalled = {
5 DataSource = DDB1
6 Type = uint8
7 }
8 Thread1CycleTime = {
9 OutputSignals = {
10 SDNStalled = {
11 DataSource = SDNPublisherStats
12 Type = uint8
13 }
14 Thread1CycleTime = {
SDNPublisherStats with the SDNStalled signal.1 +SDNPublisherStats = {
2 Class = SDN::SDNPublisher
3 Interface = lo
4 Topic = TUTORIAL_TOPIC_NAME_3
5 Signals = {
6 SDNStalled = {
7 Type = uint8
8 }
9 Thread1CycleTime = {
Add the new MathExpressionGAM to the execution list before the GAMSDNSubscriber and move the GAMWriterStats also before the GAMSDNSubscriber.
1 +States = {
2 Class = ReferenceContainer
3 +State1 = {
4 Class = RealTimeState
5 +Threads = {
6 Class = ReferenceContainer
7 +Thread1 = {
8 Class = RealTimeThread
9 CPUs = 0x1
10 Functions = {GAMPerfMonitor GAMMathStalled GAMWriterStats GAMSDNSubscriber GAMMathRelativeTime GAMDisturbanceWaveform GAMMathDisturbance GAMFilterDisturbance GAMController GAMSpringMass GAMMathModel GAMMathPositionErr GAMStats GAMPatchForceDims GAMForceStats GAMHist GAMMathExpr GAMConversion GAMMathTrigger GAMMathTriggerSecond GAMDisplay GAMWriter}
11 }
12 }
13 }
14 }