RealTimeThreadSynchronisation

The RealTimeThreadSynchronisation DataSource allows synchronising multiple RealTimeThreads.

Typical use cases include (see also the section on Synchronising multiple threads):

  • Sharing the workload between multiple threads.

  • Synchronising threads with different execution frequencies (e.g. a thread executing at 1 kHz and another thread executing at 10 kHz, where the 1 kHz thread needs to be synchronised with the 10 kHz thread).

In this example, one thread is used to receive data from a UDP stream and to synchronise the execution of the RealTimeThread with the arrival of UDP packets. This data will then be offered to a separate thread, which will be responsible for processing the data at 100 Hz and storing it in a file.

Thread configuration. Note that the two threads could have been allocated to different CPU cores.
    +States = {
        Class = ReferenceContainer
        +State1 = {
            Class = RealTimeState
            +Threads = {
                Class = ReferenceContainer
                +ThreadReceiver = {
                    Class = RealTimeThread
                    CPUs = 0x1
                    Functions = {GAMPerfMonitorReceiver GAMUDPReceiver GAMThreadSyncOut}
                }
                +ThreadProcess = {
                    Class = RealTimeThread
                    CPUs = 0x1
                    Functions = {GAMPerfMonitorProcess GAMThreadSyncIn GAMDownsample GAMDisturbanceWaveform GAMMathDisturbance GAMFilterDisturbance GAMController GAMSpringMass GAMMathModel GAMMathPositionErr GAMStats GAMPatchForceDims GAMForceStats GAMHist GAMMathExpr GAMConversion GAMMathTrigger GAMMathTriggerSecond GAMDisplay GAMWriter}
                }
            }
        }
    }
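
As an aside on the CPUs field: assuming it is interpreted as a processor affinity bitmask (each set bit selects one core), the value 0x1 places both threads on the first core. The short Python sketch below is not part of MARTe2, and the helper name cores_from_mask is hypothetical; it only illustrates how such a mask maps to core indices, and therefore which value would move a thread to a different core.

```python
def cores_from_mask(mask):
    """Return the zero-based core indices selected by an affinity bitmask."""
    cores = []
    index = 0
    while mask:
        if mask & 1:          # lowest bit set: this core is selected
            cores.append(index)
        mask >>= 1            # move on to the next core
        index += 1
    return cores


if __name__ == "__main__":
    # 0x1 is the value used above; 0x2 and 0x3 are illustrative alternatives.
    for value in (0x1, 0x2, 0x3):
        print(hex(value), "->", cores_from_mask(value))
```

Under this assumption, setting CPUs = 0x2 on one of the two threads would place it on the second core.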
The RealTimeThreadSynchronisation expects one thread to write data to it and one or more threads to read data from it.
        +RTThreadSynch = {
            Class = RealTimeThreadSynchronisation
            Timeout = 5000 // Timeout in ms to wait for the thread to cycle.
        }
The GAMThreadSyncOut writes data into RTThreadSynch.
        +GAMThreadSyncOut = {
            Class = IOGAM
            InputSignals = {
                Counter = {
                    DataSource = DDBReceiver
                    Type = uint32
                }
                Time = {
                    DataSource = DDBReceiver
                    Type = uint32
                }
                ReferencePosition = {
                    DataSource = DDBReceiver
                    Type = float64
                }
                ThreadReceiverCycleTime = {
                    DataSource = DDBReceiver
                    Type = uint32
                }
            }
            OutputSignals = {
                Counter = {
                    DataSource = RTThreadSynch
                    Type = uint32
                }
                Time = {
                    DataSource = RTThreadSynch
                    Type = uint32
                }
                ReferencePosition = {
                    DataSource = RTThreadSynch
                    Type = float64
                }
                ThreadReceiverCycleTime = {
                    DataSource = RTThreadSynch
                    Type = uint32
                }
            }
        }
The GAMThreadSyncIn reads data from RTThreadSynch. Note that by requesting Samples=10 the GAM will read 10 samples from RTThreadSynch and will only execute when 10 samples are available, thus allowing data to be processed at a lower frequency (e.g. 100 Hz). The 10 collected samples will then be offered as an array of 10 elements.
        +GAMThreadSyncIn = {
            Class = IOGAM
            InputSignals = {
                Counter = {
                    DataSource = RTThreadSynch
                    Type = uint32
                    Samples = 10
                }
                Time = {
                    DataSource = RTThreadSynch
                    Type = uint32
                    Samples = 10
                }
                ReferencePosition = {
                    DataSource = RTThreadSynch
                    Type = float64
                    Samples = 10
                }
                ThreadReceiverCycleTime = {
                    DataSource = RTThreadSynch
                    Type = uint32
                    Samples = 10
                }
            }
            OutputSignals = {
                CounterN10 = {
                    DataSource = DDBProcess
                    Type = uint32
                    NumberOfElements = 10
                }
                TimeN10 = {
                    DataSource = DDBProcess
                    Type = uint32
                    NumberOfElements = 10
                }
                ReferencePositionN10 = {
                    DataSource = DDBProcess
                    Type = float64
                    NumberOfElements = 10
                }
                ThreadReceiverCycleTimeN10 = {
                    DataSource = DDBProcess
                    Type = uint32
                    NumberOfElements = 10
                }
            }
        }
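
The effect of Samples = 10 can be pictured with a small Python model. This is only a sketch of the behaviour described above, not MARTe2 code: a queue.Queue stands in for RTThreadSynch, the function names are hypothetical, and only a single reader is modelled (a real RTThreadSynch can serve several reading threads).

```python
import queue
import threading

SAMPLES = 10  # mirrors Samples = 10 in GAMThreadSyncIn


def producer(sync, n_cycles):
    """Fast thread: writes one sample per cycle (1 kHz in the example)."""
    for counter in range(n_cycles):
        sync.put(counter)


def consumer(sync, n_blocks, out):
    """Slow thread: one execution per SAMPLES samples."""
    for _ in range(n_blocks):
        # Blocks until 10 samples have arrived, then offers them as one array.
        block = [sync.get() for _ in range(SAMPLES)]
        out.append(block)


def run(n_cycles=30):
    """Run both threads and return the blocks seen by the slow thread."""
    sync = queue.Queue()
    blocks = []
    reader = threading.Thread(target=consumer,
                              args=(sync, n_cycles // SAMPLES, blocks))
    writer = threading.Thread(target=producer, args=(sync, n_cycles))
    reader.start()
    writer.start()
    writer.join()
    reader.join()
    return blocks


if __name__ == "__main__":
    for block in run():
        print(block)
```

With 30 producer cycles the slow thread executes three times, each time receiving an array of 10 consecutive samples, which is why it runs at one tenth of the producer frequency.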
The data needs to be downsampled. Ideally, it should have been decimated with a proper filter, but for simplicity in this example the data is just downsampled by taking one sample out of every 10 (the first element of each array, selected with Ranges = {{0 0}}).
        +GAMDownsample = {
            Class = IOGAM
            InputSignals = {
                CounterN10 = {
                    DataSource = DDBProcess
                    Type = uint32
                    NumberOfElements = 10
                    Ranges = {{0 0}}
                }
                TimeN10 = {
                    DataSource = DDBProcess
                    Type = uint32
                    NumberOfElements = 10
                    Ranges = {{0 0}}
                }
                ReferencePositionN10 = {
                    DataSource = DDBProcess
                    Type = float64
                    NumberOfElements = 10
                    Ranges = {{0 0}}
                }
                ThreadReceiverCycleTimeN10 = {
                    DataSource = DDBProcess
                    Type = uint32
                    NumberOfElements = 10
                    Ranges = {{0 0}}
                }
            }
            OutputSignals = {
                Counter = {
                    DataSource = DDBProcess
                    Type = uint32
                }
                Time = {
                    DataSource = DDBProcess
                    Type = uint32
                }
                ReferencePosition = {
                    DataSource = DDBProcess
                    Type = float64
                }
                ThreadReceiverCycleTime = {
                    DataSource = DDBProcess
                    Type = uint32
                }
            }
        }
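
To make the difference between plain downsampling and filtering concrete, the following Python sketch compares the two on one 10-element block. It is an illustration only: take_first mirrors what Ranges = {{0 0}} selects, while block_mean is a hypothetical boxcar average used here as the simplest possible stand-in for a filter, not a properly designed decimation filter.

```python
def take_first(block):
    """Keep only element 0 of the block, as GAMDownsample does."""
    return block[0]


def block_mean(block):
    """Boxcar alternative: average the whole block before reducing the rate."""
    return sum(block) / len(block)


if __name__ == "__main__":
    # Illustrative ramp, not measured data.
    block = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
    print("first sample:", take_first(block))
    print("block mean:  ", block_mean(block))
```

Taking the first sample discards the other nine values, whereas the average uses all of them and attenuates fast variations within the block.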

Running the application

Start the receiver application with:

make -C ../Configurations/MassSpring/ -f Makefile.cfg
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-47-Receiver_Gen.cfg -l RealTimeLoader -s State1

Once the application is running, inspect the screen output and verify that it is running without any issues. The log should show entries similar to the following:

$ [Warning - Threads.cpp:181]: Failed to change the thread priority (likely due to insufficient permissions)
$ [Information - RealTimeLoader.cpp:111]: Started application in state State1
$ [Information - MARTeApp.cpp:135]: Application starting

Note

The RTThreadSynch is configured with a timeout of 5 seconds (Timeout = 5000). If the sender application is not started within 5 seconds, the receiver application logs messages similar to the following and continues to wait for data to arrive.

$ [Warning - GAMSchedulerI.cpp:429]: BrokerI GAMThreadSyncIn.InputBroker.RealTimeThreadSynchBroker failed, owner function: GAMThreadSyncIn, owner DataSource: RTThreadSynch
$ [FatalError - GAMScheduler.cpp:233]: Failed to ExecuteSingleCycle().
$ [Warning - GAMSchedulerI.cpp:429]: BrokerI GAMThreadSyncI
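
The wait, report and keep waiting pattern described in this note can be sketched in Python as follows. This is a model of the behaviour only, not the MARTe2 implementation: queue.Queue stands in for RTThreadSynch, the function name is hypothetical, and max_attempts exists solely so that the sketch terminates (the receiver application itself keeps waiting).

```python
import queue


def wait_for_sample(sync, timeout_s, max_attempts):
    """Wait for one sample; on every timeout report a warning and retry.

    Returns (sample, number_of_timeouts). The sample is None when
    max_attempts timeouts occurred without any data arriving.
    """
    timeouts = 0
    for _ in range(max_attempts):
        try:
            return sync.get(timeout=timeout_s), timeouts
        except queue.Empty:
            timeouts += 1
            print("warning: no data within %.3f s, still waiting" % timeout_s)
    return None, timeouts


if __name__ == "__main__":
    sync = queue.Queue()
    # No sender yet: two short timeouts are reported.
    print(wait_for_sample(sync, 0.01, 2))
    # Once data is available it is returned immediately.
    sync.put(7)
    print(wait_for_sample(sync, 0.01, 2))
```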

Open another terminal and start the sender application (which will be sending at 1 kHz) with:

./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-47-Sender_Gen.cfg -l RealTimeLoader -s State1

Once the application is running, inspect the screen output and verify that it is running without any issues. The log should show entries similar to the following:

$ [Warning - Threads.cpp:181]: Failed to change the thread priority (likely due to insufficient permissions)
$ [Information - RealTimeLoader.cpp:111]: Started application in state State1
$ [Information - MARTeApp.cpp:135]: Application starting
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:0
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:1000000
...

Wait a few seconds and inspect the collected data:

python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-47.csv -s ReferencePosition Position
python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-47.csv -s ThreadReceiverCycleTime ThreadProcessCycleTime

Check that the cycle times are 1000 microseconds for the receiver thread and 10000 microseconds for the processing thread, which confirms that the synchronisation is working correctly.
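
The expected values follow directly from the frequencies: a 1 kHz sender gives a 1000 microsecond receiver cycle, and collecting 10 samples per execution gives a 10000 microsecond (100 Hz) processing cycle. The Python sketch below reproduces this arithmetic and adds a small tolerance check. The function names and the tolerance are hypothetical, and the sample values are illustrative, not measured data.

```python
def cycle_time_us(frequency_hz):
    """Cycle time in microseconds for a given frequency in Hz."""
    return 1_000_000 / frequency_hz


def all_close(values, expected, tolerance):
    """True if every value lies within +/- tolerance of the expected value."""
    return all(abs(v - expected) <= tolerance for v in values)


if __name__ == "__main__":
    receiver_us = cycle_time_us(1000)   # sender frequency of 1 kHz
    process_us = receiver_us * 10       # Samples = 10 per execution
    print(receiver_us, process_us)
    # Illustrative cycle times with an assumed tolerance of 50 microseconds.
    print(all_close([998, 1003, 1001], receiver_us, 50))
```

The same check could be applied to the ThreadReceiverCycleTime and ThreadProcessCycleTime columns once they have been read from the collected file.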

Exercises

Ex. 1: Running two threads with different models

The objective of this exercise is to synchronise two threads with the same data producer. They will both run at the same frequency (100 Hz) but will implement different models for the mass-spring system (as shown in previous examples).

This setup could be used to compare the performance of two different models and to implement a 1oo2 (one-out-of-two) mechanism that switches between the two models in case of a failure.

  1. Edit the file ../Configurations/MassSpring/RTApp-MassSpring-48-Receiver.cfg and add an IOGAM named GAMThreadSyncIn2. This GAM shall read the signals Counter, Time, ReferencePosition and ThreadReceiverCycleTime from the RTThreadSynch DataSource, with 10 samples each, and offer them to the rest of the RealTimeThread as arrays of 10 elements.

  2. Add another thread to the RealTimeState and name it +ThreadProcess2. Make sure that the following GAMs are executed in the correct order: GAMPerfMonitorProcess2, GAMThreadSyncIn2, GAMDownsample2, GAMController2, GAMMathModel, GAMMathTrigger2 and GAMWriter2.

  3. Run make -C ../Configurations/MassSpring/ -f Makefile.cfg to generate the configuration file with the correct UDP port numbers, then start the receiver application:

make -C ../Configurations/MassSpring/ -f Makefile.cfg
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-48-Receiver_Gen.cfg -l RealTimeLoader -s State1

  4. Open another terminal and start the sender application (which will be sending at 1 kHz) with:

./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-47-Sender_Gen.cfg -l RealTimeLoader -s State1

Wait a few seconds and inspect the collected data:

python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-48-1.csv -s ReferencePosition Position
python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-48-1.csv -s ThreadReceiverCycleTime ThreadProcess1CycleTime
python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-48-2.csv -s ReferencePosition Position
python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-48-2.csv -s ThreadReceiverCycleTime ThreadProcess2CycleTime

Solution

The solution is to modify the configuration file ../Configurations/MassSpring/RTApp-MassSpring-48-Receiver.cfg and add the IOGAM as described above.

GAM to synchronise the second thread.
        +GAMThreadSyncIn2 = {
            Class = IOGAM
            InputSignals = {
                Counter = {
                    DataSource = RTThreadSynch
                    Type = uint32
                    Samples = 10
                }
                Time = {
                    DataSource = RTThreadSynch
                    Type = uint32
                    Samples = 10
                }
                ReferencePosition = {
                    DataSource = RTThreadSynch
                    Type = float64
                    Samples = 10
                }
                ThreadReceiverCycleTime = {
                    DataSource = RTThreadSynch
                    Type = uint32
                    Samples = 10
                }
            }
            OutputSignals = {
                CounterN10 = {
                    DataSource = DDBProcess2
                    Type = uint32
                    NumberOfElements = 10
                }
                TimeN10 = {
                    DataSource = DDBProcess2
                    Type = uint32
                    NumberOfElements = 10
                }
                ReferencePositionN10 = {
                    DataSource = DDBProcess2
                    Type = float64
                    NumberOfElements = 10
                }
                ThreadReceiverCycleTimeN10 = {
                    DataSource = DDBProcess2
                    Type = uint32
                    NumberOfElements = 10
                }
            }
        }

ThreadProcess2 is then added to the RealTimeState and the GAMs are added in the correct order.

Updated execution list.
    +States = {
        Class = ReferenceContainer
        +State1 = {
            Class = RealTimeState
            +Threads = {
                Class = ReferenceContainer
                +ThreadReceiver = {
                    Class = RealTimeThread
                    CPUs = 0x1
                    Functions = {GAMPerfMonitorReceiver GAMUDPReceiver GAMThreadSyncOut}
                }
                +ThreadProcess1 = {
                    Class = RealTimeThread
                    CPUs = 0x1
                    Functions = {GAMPerfMonitorProcess1 GAMThreadSyncIn1 GAMDownsample1 GAMController1 GAMSpringMass GAMMathTrigger1 GAMWriter1}
                }
                +ThreadProcess2 = {
                    Class = RealTimeThread
                    CPUs = 0x1
                    Functions = {GAMPerfMonitorProcess2 GAMThreadSyncIn2 GAMDownsample2 GAMController2 GAMMathModel GAMMathTrigger2 GAMWriter2}
                }
            }
        }
    }