RealTimeThreadSynchronisation
The RealTimeThreadSynchronisation DataSource allows synchronising multiple RealTimeThreads.
Typical use cases include (see also the section on Synchronising multiple threads):
Sharing the workload between multiple threads.
Synchronising threads with different execution frequencies (e.g. a thread executing at 1 kHz and another thread executing at 10 kHz, where the 1 kHz thread needs to be synchronised with the 10 kHz thread).
In this example, one thread is used to receive data from a UDP stream and to synchronise the execution of the RealTimeThread with the arrival of UDP packets. This data will then be offered to a separate thread, which will be responsible for processing the data at 100 Hz and storing it in a file.
CPUs could have been allocated to different cores. 1 +States = {
2 Class = ReferenceContainer
3 +State1 = {
4 Class = RealTimeState
5 +Threads = {
6 Class = ReferenceContainer
7 +ThreadReceiver = {
8 Class = RealTimeThread
9 CPUs = 0x1
10 Functions = {GAMPerfMonitorReceiver GAMUDPReceiver GAMThreadSyncOut}
11 }
12 +ThreadProcess = {
13 Class = RealTimeThread
14 CPUs = 0x1
15 Functions = {GAMPerfMonitorProcess GAMThreadSyncIn GAMDownsample GAMDisturbanceWaveform GAMMathDisturbance GAMFilterDisturbance GAMController GAMSpringMass GAMMathModel GAMMathPositionErr GAMStats GAMPatchForceDims GAMForceStats GAMHist GAMMathExpr GAMConversion GAMMathTrigger GAMMathTriggerSecond GAMDisplay GAMWriter}
16 }
17 }
18 }
19 }
RealTimeThreadSynchronisation expects one thread to write data to it and one or more threads to read data from it.1 +RTThreadSynch = {
2 Class = RealTimeThreadSynchronisation
3 Timeout = 5000 //Timeout in ms to wait for the thread to cycle.
4 }
GAMThreadSyncOut writes data into RTThreadSynch. 1 +GAMThreadSyncOut = {
2 Class = IOGAM
3 InputSignals = {
4 Counter = {
5 DataSource = DDBReceiver
6 Type = uint32
7 }
8 Time = {
9 DataSource = DDBReceiver
10 Type = uint32
11 }
12 ReferencePosition = {
13 DataSource = DDBReceiver
14 Type = float64
15 }
16 ThreadReceiverCycleTime = {
17 DataSource = DDBReceiver
18 Type = uint32
19 }
20 }
21 OutputSignals = {
22 Counter = {
23 DataSource = RTThreadSynch
24 Type = uint32
25 }
26 Time = {
27 DataSource = RTThreadSynch
28 Type = uint32
29 }
30 ReferencePosition = {
31 DataSource = RTThreadSynch
32 Type = float64
33 }
34 ThreadReceiverCycleTime = {
35 DataSource = RTThreadSynch
36 Type = uint32
37 }
38 }
39 }
GAMThreadSyncIn reads data from RTThreadSynch. Note that by requesting Samples=10 the GAM will read 10 samples from RTThreadSynch and will only execute when 10 samples are available, thus allowing data to be processed at a lower frequency (e.g. 100 Hz). The 10 collected samples will then be offered as an array of 10 elements. 1 +GAMThreadSyncIn = {
2 Class = IOGAM
3 InputSignals = {
4 Counter = {
5 DataSource = RTThreadSynch
6 Type = uint32
7 Samples = 10
8 }
9 Time = {
10 DataSource = RTThreadSynch
11 Type = uint32
12 Samples = 10
13 }
14 ReferencePosition = {
15 DataSource = RTThreadSynch
16 Type = float64
17 Samples = 10
18 }
19 ThreadReceiverCycleTime = {
20 DataSource = RTThreadSynch
21 Type = uint32
22 Samples = 10
23 }
24 }
25 OutputSignals = {
26 CounterN10 = {
27 DataSource = DDBProcess
28 Type = uint32
29 NumberOfElements = 10
30 }
31 TimeN10 = {
32 DataSource = DDBProcess
33 Type = uint32
34 NumberOfElements = 10
35 }
36 ReferencePositionN10 = {
37 DataSource = DDBProcess
38 Type = float64
39 NumberOfElements = 10
40 }
41 ThreadReceiverCycleTimeN10 = {
42 DataSource = DDBProcess
43 Type = uint32
44 NumberOfElements = 10
45 }
46 }
1 +GAMDownsample = {
2 Class = IOGAM
3 InputSignals = {
4 CounterN10 = {
5 DataSource = DDBProcess
6 Type = uint32
7 NumberOfElements = 10
8 Ranges = {{0 0}}
9 }
10 TimeN10 = {
11 DataSource = DDBProcess
12 Type = uint32
13 NumberOfElements = 10
14 Ranges = {{0 0}}
15 }
16 ReferencePositionN10 = {
17 DataSource = DDBProcess
18 Type = float64
19 NumberOfElements = 10
20 Ranges = {{0 0}}
21 }
22 ThreadReceiverCycleTimeN10 = {
23 DataSource = DDBProcess
24 Type = uint32
25 NumberOfElements = 10
26 Ranges = {{0 0}}
27 }
28 }
29 OutputSignals = {
30 Counter = {
31 DataSource = DDBProcess
32 Type = uint32
33 }
34 Time = {
35 DataSource = DDBProcess
36 Type = uint32
37 }
38 ReferencePosition = {
39 DataSource = DDBProcess
40 Type = float64
41 }
42 ThreadReceiverCycleTime = {
43 DataSource = DDBProcess
44 Type = uint32
45 }
46 }
47 }
Running the application
Start the receiver application with:
make -C ../Configurations/MassSpring/ -f Makefile.cfg
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-47-Receiver_Gen.cfg -l RealTimeLoader -s State1
Once the application is running, inspect the screen output and verify that it is running without any issues. The log should show entries similar to the following:
$ [Warning - Threads.cpp:181]: Failed to change the thread priority (likely due to insufficient permissions)
$ [Information - RealTimeLoader.cpp:111]: Started application in state State1
$ [Information - MARTeApp.cpp:135]: Application starting
Note
The RTThreadSynch is configured with a timeout of 5 seconds, which means that if the sender application is not started within 5 seconds, the receiver application will log a warning message and will continue to wait for data to arrive.
$ [Warning - GAMSchedulerI.cpp:429]: BrokerI GAMThreadSyncIn.InputBroker.RealTimeThreadSynchBroker failed, owner function: GAMThreadSyncIn, owner DataSource: RTThreadSynch
$ [FatalError - GAMScheduler.cpp:233]: Failed to ExecuteSingleCycle().
$ [Warning - GAMSchedulerI.cpp:429]: BrokerI GAMThreadSyncI
Open another terminal and start the sender application (which will be sending at 1 kHz) with:
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-47-Sender_Gen.cfg -l RealTimeLoader -s State1
Once the application is running, inspect the screen output and verify that it is running without any issues. The log should show entries similar to the following:
$ [Warning - Threads.cpp:181]: Failed to change the thread priority (likely due to insufficient permissions)
$ [Information - RealTimeLoader.cpp:111]: Started application in state State1
$ [Information - MARTeApp.cpp:135]: Application starting
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:0
$ [Information - LoggerBroker.cpp:152]: Time [0:0]:1000000
...
Wait a few seconds and inspect the collected data:
python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-47.csv -s ReferencePosition Position
python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-47.csv -s ThreadReceiverCycleTime ThreadProcessCycleTime
Check that the cycle times are 1000 microseconds for the receiver thread and 10000 microseconds for the processing thread, which confirms that the synchronisation is working correctly.
Exercises
Ex. 1: Running two threads with different models
The objective of this exercise is to synchronise two threads with the same data producer. They will both run at the same frequency (100 Hz) but will implement different models for the mass-spring system (as shown in previous examples).
This use case could be used to compare the performance of two different models and implement a 1oo2 mechanism to switch between the two models in case of a failure.
Edit the file
../Configurations/MassSpring/RTApp-MassSpring-48-Receiver.cfgand add an IOGAM namedGAMThreadSyncIn2. This GAM shall read the signalsCounter,Time,ReferencePositionandThreadReceiverCycleTimefrom theRTThreadSynchDataSource, with 10 samples each, and offer them to the rest of the RealTimeThread as arrays of 10 elements.Add another thread to the
RealTimeStateand name it+ThreadProcess2. Make sure that the following GAMs are executed in the correct order:GAMPerfMonitorProcess2,GAMThreadSyncIn2,GAMDownsample2,GAMController2,GAMMathModel,GAMMathTrigger2andGAMWriter2.Run
make -C ../Configurations/MassSpring/ -f Makefile.cfgto generate the configuration file with the correct UDP port numbers.
make -C ../Configurations/MassSpring/ -f Makefile.cfg
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-48-Receiver_Gen.cfg -l RealTimeLoader -s State1
Open another terminal and start the sender application (which will be sending at 1 kHz) with:
./MARTeApp.sh -f ../Configurations/MassSpring/RTApp-MassSpring-47-Sender_Gen.cfg -l RealTimeLoader -s State1
Wait a few seconds and inspect the collected data:
python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-48-1.csv -s ReferencePosition Position
python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-48-1.csv -s ThreadReceiverCycleTime ThreadProcess1CycleTime
python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-48-2.csv -s ReferencePosition Position
python ../Test/Integrated/plot_mass_spring_csv.py -f ../Test/Integrated/MassSpring-48-2.csv -s ThreadReceiverCycleTime ThreadProcess2CycleTime
Solution
The solution is to modify the configuration file ../Configurations/MassSpring/RTApp-MassSpring-48-Receiver.cfg and add the IOGAM as described above.
1 +GAMThreadSyncIn2 = {
2 Class = IOGAM
3 InputSignals = {
4 Counter = {
5 DataSource = RTThreadSynch
6 Type = uint32
7 Samples = 10
8 }
9 Time = {
10 DataSource = RTThreadSynch
11 Type = uint32
12 Samples = 10
13 }
14 ReferencePosition = {
15 DataSource = RTThreadSynch
16 Type = float64
17 Samples = 10
18 }
19 ThreadReceiverCycleTime = {
20 DataSource = RTThreadSynch
21 Type = uint32
22 Samples = 10
23 }
24 }
25 OutputSignals = {
26 CounterN10 = {
27 DataSource = DDBProcess2
28 Type = uint32
29 NumberOfElements = 10
30 }
31 TimeN10 = {
32 DataSource = DDBProcess2
33 Type = uint32
34 NumberOfElements = 10
35 }
36 ReferencePositionN10 = {
37 DataSource = DDBProcess2
38 Type = float64
39 NumberOfElements = 10
40 }
41 ThreadReceiverCycleTimeN10 = {
42 DataSource = DDBProcess2
43 Type = uint32
44 NumberOfElements = 10
45 }
46 }
47 }
ThreadProcess2 is then added to the RealTimeState and the GAMs are added in the correct order.
1 +States = {
2 Class = ReferenceContainer
3 +State1 = {
4 Class = RealTimeState
5 +Threads = {
6 Class = ReferenceContainer
7 +ThreadReceiver = {
8 Class = RealTimeThread
9 CPUs = 0x1
10 Functions = {GAMPerfMonitorReceiver GAMUDPReceiver GAMThreadSyncOut}
11 }
12 +ThreadProcess1 = {
13 Class = RealTimeThread
14 CPUs = 0x1
15 Functions = {GAMPerfMonitorProcess1 GAMThreadSyncIn1 GAMDownsample1 GAMController1 GAMSpringMass GAMMathTrigger1 GAMWriter1}
16 }
17 +ThreadProcess2 = {
18 Class = RealTimeThread
19 CPUs = 0x1
20 Functions = {GAMPerfMonitorProcess2 GAMThreadSyncIn2 GAMDownsample2 GAMController2 GAMMathModel GAMMathTrigger2 GAMWriter2}
21 }
22 }
23 }
24 }