SDNSubscriber

Warning

The SDNSubscriber DataSource is only available in CODAC Core System distributions.

The SDNSubscriber DataSource can be used to receive application data over the ITER SDN network.

Typical use cases include subscribing to data from a remote application. This data can be used for monitoring or control purposes. In the latter case, the data is used to synchronise the RealTimeThread.

In this section, the LinuxTimer will be replaced by a SDNSubscriber and the RealTimeThread will execute every time an SDN topic arrives.

The publisher will be the application developed as part of the exercise in the previous section, which streams application data and statistics over SDN. The configuration file for the sender is ../Configurations/MassSpring/RTApp-MassSpring-28-Sender.cfg.

The SDNSubscriber configuration is similar to the UDPReceiver, with the main difference being that the topic name is set from the variable TUTORIAL_TOPIC_NAME, which is replaced by the Makefile.cfg. The ExecutionMode is set to RealTimeThread, which means that the pace of the RealTimeThread will be determined by the arrival of SDN topics.

Given that some application GAMs need to use a relative time reference, the SDNSubscriber also offers its Header as a signal to be further processed by other GAMs.

SDNSubscriber configuration.
 1        +SDNSubscriber = {
 2            Class = SDN::SDNSubscriber
 3            Interface = lo
 4            Topic = TUTORIAL_TOPIC_NAME_0
 5            ExecutionMode = RealTimeThread
 6            Signals = {
 7                Header = {
 8                    Type = uint8
 9                    NumberOfElements = 48
10                }
11                ReferencePosition = {
12                    Type = float64
13                }
14            }
15        }        

The GAMSDNSubscriber is an excellent example of how an IOGAM can be used to expand and cast signals. This strategy is used to expand the binary definition of the SDN header into individual signals that can be used by other GAMs.

GAMTimer replaced with GAMSDNSubscriber. The Frequency parameter still needs to be set to a non-zero value, but the actual pace of the RealTimeThread will be provided by the arrival of SDN packets.
 1        +GAMSDNSubscriber = {
 2            Class = IOGAM
 3            InputSignals = {
 4                Header = {
 5                    Type = uint8
 6                    NumberOfElements = 48
 7                    DataSource =  SDNSubscriber
 8                }
 9                ReferencePosition = {
10                    DataSource = SDNSubscriber
11                    Type = float64
12                    Frequency = 100
13                }
14            }
15            OutputSignals = {
16                UID = {
17                    DataSource = DDB1
18                    Type = uint8
19                    NumberOfElements = 4
20                }
21                Version = {
22                    DataSource = DDB1
23                    Type = uint8
24                    NumberOfElements = 4
25                }
26                Size = {
27                    DataSource = DDB1
28                    Type = uint32
29                }
30                TopicUID = {
31                    DataSource = DDB1
32                    Type = uint32
33                }
34                TopicVersion = {
35                    DataSource = DDB1
36                    Type = uint32
37                }
38                TopicSize = {
39                    DataSource = DDB1
40                    Type = uint32
41                }
42                TopicCounter = {
43                    DataSource = DDB1
44                    Type = uint64
45                }
46                SendTime = {
47                    DataSource = DDB1
48                    Type = uint64
49                }
50                RecvTime = {
51                    DataSource = DDB1
52                    Type = uint64
53                }
54                ReferencePosition = {
55                    DataSource = DDB1
56                    Type = float64
57                }
58            }
59        }

Given that some GAMs require a relative time reference, a MathExpressionGAM is used to compute and maintain a relative time based on the timestamp received in the SDN header. Since this signal is of type uint64, and because the MathExpressionGAM supports neither conditional statements nor the mod operator, a fairly complex expression is needed to compute the relative time in microseconds, as shown in the following code snippet. This provides a good example of why a custom GAM is often preferable for complex signal processing tasks, as discussed later in the tutorial.

MathExpressionGAM to compute relative time in microseconds (uint32).
 1        +GAMMathRelativeTime = {
 2            Class = MathExpressionGAM
 3            Expression = "
 4                OneSecondNano = (uint64)1000000000;
 5                TenMillisecondsNano = (uint64)10000000;
 6                TenMillisecondsMicro = (uint64)10000;
 7                TimeDeltaErrNano = (uint64)1000000; //Force the TimeDeltaUInt64 to be > 10 ms
 8                TimeDeltaUInt64 = (RecvTime - LastRecvTime) + TimeDeltaErrNano; //Compute the relative time in nanoseconds and make sure that it is always > 10 ms
 9                TimeDeltaUInt64Seconds = ((TimeDeltaUInt64) / OneSecondNano); //Get the seconds
10                TimeDeltaUInt64Mod = (TimeDeltaUInt64 - TimeDeltaUInt64Seconds * OneSecondNano); //Compute the mod, keeping only the seconds
11                TimeDeltaUInt64Microseconds = (TimeDeltaUInt64Mod / (uint64)TenMillisecondsNano) * (uint64)TenMillisecondsMicro; //Convert to microseconds
12                LastRecvTime = RecvTime; //Note that the first received packet will have a very number and thus the first delta above will be very large > uint32
13                TimeUInt64 = TimeUInt64 + (uint64)(TimeDeltaUInt64Microseconds);
14                Time = (uint32)(TimeUInt64);"
15            InputSignals = {

Running the application

Start the sender application with:

make -C ../Configurations/MassSpring/ -f Makefile.cfg
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-28-Publisher_Gen.cfg -l RealTimeLoader -s State1

Once the application is running, inspect the screen output and verify that the application is running without any issues. The log should show entries similar to the following:

$ [Warning - Threads.cpp:181]: Failed to change the thread priority (likely due to insufficient permissions)
$ [Information - RealTimeLoader.cpp:111]: Started application in state State1
$ [Information - MARTeApp.cpp:135]: Application starting
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:0
$ [Information - SDNLoggerCallback.cpp:95]: [ccs::mcast] MessengerImpl::Receive - Received message from '127.0.0.1:10002'
$ [Information - SDNLoggerCallback.cpp:95]: [sdn::disc] MessengerImpl::Receive - Message ...
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:1000000
...

Open another terminal and start the receiver application with:

./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-28-Subscriber_Gen.cfg -l RealTimeLoader -s State1

Once the application is running, inspect the screen output and verify that the application is running without any issues. The log should show entries similar to the following:

$ [Warning - Threads.cpp:181]: Failed to change the thread priority (likely due to insufficient permissions)
$ [Information - RealTimeLoader.cpp:111]: Started application in state State1
$ [Information - MARTeApp.cpp:135]: Application starting
$ [Information - SDNLoggerCallback.cpp:95]: [ccs::mcast] MessengerImpl::Receive - Received message from '127.0.0.1:10002'
$ [Information - SDNLoggerCallback.cpp:95]: [sdn::disc] MessengerImpl::Receive - Message ...
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:1000000
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:2000000
...

Note

The actual time values will depend on the arrival of SDN packets, which in turn depends on the execution of the sender application. If the receiver application is started before the sender, the first Time value will not be zero, but will instead correspond to the Time from the first received SDN packet.

The CCS tool sdn-print will be used to monitor the SDN traffic and verify that the application is streaming data over the SDN network.

Open another console and generate the required .xml configuration files for sdn-print.

../Test/Integrated/GenerateSDNTopicFiles.sh

Export the required environment variables for the sdn-print tool and run it.

export SDN_TOPIC_PATH=../Test/Integrated
sdn-print -i lo -t mass-spring-0 -c -1

Notice that both MARTe2 and the sdn-print tool receive from the SDN topic source using multicast.

On another console, check the statistics from the publisher application:

export SDN_TOPIC_PATH=../Test/Integrated
sdn-print -i lo -t mass-spring-1 -c -1

And on another console, check the statistics from the subscriber application:

export SDN_TOPIC_PATH=../Test/Integrated
sdn-print -i lo -t mass-spring-2 -c -1

Exercises

Ex. 1: Detecting stalled data

In the example above, the application waits with the default SDNSubscriber timeout for packets to arrive. In some cases, it may be desirable to detect if the data stream has stalled (e.g. due to a network issue or because the sender application has stopped).

Warning

A DataSource failure propagates to the interfacing GAM and from there to the GAMScheduler.

This means that all subsequent GAMs in the execution list will not be executed.

Consequently, if a GAM needs to gracefully handle a failure, it must be executed before the GAM that will fail (which means that it will use data from the previous cycle).

  1. Edit the file ../Configurations/MassSpring/RTApp-MassSpring-29-Subscriber.cfg and add a MathExpressionGAM to detect stalled data based on the TopicCounter signal and name it SDNStalled.

  2. Add the SDNStalled signal to the GAMWriterStats GAM and to the SDNPublisherStats DataSource.

  3. Add the new MathExpressionGAM to the execution list before the GAMSDNSubscriber and move the GAMWriterStats also before the GAMSDNSubscriber.

  4. Run make -C ../Configurations/MassSpring/ -f Makefile.cfg to generate the configuration file with the correct topic names.

make -C ../Configurations/MassSpring/ -f Makefile.cfg
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-29-Subscriber_Gen.cfg -l RealTimeLoader -s State1
../Test/Integrated/GenerateSDNTopicFiles.sh
export SDN_TOPIC_PATH=../Test/Integrated
sdn-print -i lo -t mass-spring-3 -c -1
  1. Stop the publisher application and verify that the receiver detects the stalled data stream and that the SDNStalled signal is set to 1.

Solution

The solution is to modify the configuration file ../Configurations/MassSpring/RTApp-MassSpring-29-Subscriber.cfg and to add a MathExpressionGAM.

MathExpressionGAM to detect stalled data based on the TopicCounter signal.
 1        +GAMMathStalled = {
 2            Class = MathExpressionGAM
 3            Expression = "
 4                SDNStalled = (uint8)((TopicCounter - LastTopicCounter) == (uint32)0);
 5                LastTopicCounter = TopicCounter;"
 6            InputSignals = {
 7                LastTopicCounter = {
 8                    DataSource = DDB1
 9                    Type = uint64
10                }
11                TopicCounter = {
12                    DataSource = DDB1
13                    Type = uint64
14                }
15            }
16            OutputSignals = {
17                LastTopicCounter = {
18                    DataSource = DDB1
19                    Type = uint64
20                }
21                SDNStalled = {
22                    DataSource = DDB1
23                    Type = uint8
24                }
25            }
26        }

Add the SDNStalled signal to the GAMWriterStats GAM and to the SDNPublisherStats DataSource.

Updated GAMWriterStats with the SDNStalled signal.
 1        +GAMWriterStats = {
 2            Class = IOGAM
 3            InputSignals = {
 4                SDNStalled = {
 5                    DataSource = DDB1
 6                    Type = uint8
 7                }
 8                Thread1CycleTime = {
 9            OutputSignals = {
10                SDNStalled = {
11                    DataSource = SDNPublisherStats
12                    Type = uint8
13                }
14                Thread1CycleTime = {
Updated SDNPublisherStats with the SDNStalled signal.
1        +SDNPublisherStats = {
2            Class = SDN::SDNPublisher
3            Interface = lo
4            Topic = TUTORIAL_TOPIC_NAME_3
5            Signals = {
6                SDNStalled = {
7                    Type = uint8
8                }
9                Thread1CycleTime = {

Add the new MathExpressionGAM to the execution list before the GAMSDNSubscriber and move the GAMWriterStats also before the GAMSDNSubscriber.

Updated execution list.
 1    +States = {
 2        Class = ReferenceContainer
 3        +State1 = {
 4            Class = RealTimeState
 5            +Threads = {
 6                Class = ReferenceContainer
 7                +Thread1 = {
 8                    Class = RealTimeThread
 9                    CPUs = 0x1
10                    Functions = {GAMPerfMonitor GAMMathStalled GAMWriterStats GAMSDNSubscriber GAMMathRelativeTime GAMDisturbanceWaveform GAMMathDisturbance GAMFilterDisturbance GAMController GAMSpringMass GAMMathModel GAMMathPositionErr GAMStats GAMPatchForceDims GAMForceStats GAMHist GAMMathExpr GAMConversion GAMMathTrigger GAMMathTriggerSecond GAMDisplay GAMWriter}
11                }
12            }
13        }        
14    }