EMIPLIB

miprtpsynchronizer.h

Go to the documentation of this file.
00001 /*
00002     
00003   This file is a part of EMIPLIB, the EDM Media over IP Library.
00004   
00005   Copyright (C) 2006-2011  Hasselt University - Expertise Centre for
00006                       Digital Media (EDM) (http://www.edm.uhasselt.be)
00007 
00008   This library is free software; you can redistribute it and/or
00009   modify it under the terms of the GNU Lesser General Public
00010   License as published by the Free Software Foundation; either
00011   version 2.1 of the License, or (at your option) any later version.
00012 
00013   This library is distributed in the hope that it will be useful,
00014   but WITHOUT ANY WARRANTY; without even the implied warranty of
00015   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00016   Lesser General Public License for more details.
00017 
00018   You should have received a copy of the GNU Lesser General Public
00019   License along with this library; if not, write to the Free Software
00020   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  
00021   USA
00022 
00023 */
00024 
00029 #ifndef MIPRTPSYNCHRONIZER_H
00030 
00031 #define MIPRTPSYNCHRONIZER_H
00032 
00033 #include "mipconfig.h"
00034 #include "miperrorbase.h"
00035 #include "miptime.h"
00036 #include <jthread/jmutex.h>
00037 #include <list>
00038 #include <map>
00039 #include <string.h>
00040 
00041 //#include <iostream>
00042 
00049 class EMIPLIB_IMPORTEXPORT MIPRTPSynchronizer : public MIPErrorBase
00050 {
00051 public:
00052         MIPRTPSynchronizer();
00053         ~MIPRTPSynchronizer();
00054 
00060         void lock()                                                                             { m_mutex.Lock(); }
00061 
00063         void unlock()                                                                           { m_mutex.Unlock(); }
00064 
00066         void clear();
00067         
00071         void setTolerance(MIPTime t)                                                            { m_tolerance = t; }
00072         
00073         bool registerStream(const uint8_t *pCName, size_t cnameLength, real_t timestampUnit, int64_t *streamID);
00074         bool unregisterStream(int64_t streamID);
00075         bool setStreamInfo(int64_t streamID, MIPTime SRwallclock, uint32_t SRtimestamp, uint32_t curTimestamp,
00076                            MIPTime outputStreamOffset, MIPTime totalComponentDelay);
00077         MIPTime calculateSynchronizationOffset(int64_t streamID);
00078 private:
/**
 * Non-owning (pointer, length) view of a CNAME byte string that also acts
 * as the ordering functor for containers keyed on it (it is used both as
 * key and as comparator of m_cnameTable).
 *
 * The destructor does not release the buffer, so whoever created the view
 * stays responsible for the memory behind getCName().
 */
class CNameInfo
{
public:
        /**
         * Wraps the given buffer without copying it.
         * The defaults (null pointer, zero length) describe an empty name and
         * make the class default-constructible, which std::map requires of
         * its comparator type.
         */
        CNameInfo(uint8_t *pCName = 0, size_t cnameLength = 0) : m_pCName(pCName), m_cnameLength(cnameLength) { }
        ~CNameInfo()                                            { }

        /** Returns the wrapped buffer pointer (may be null for an empty name). */
        uint8_t *getCName() const                               { return m_pCName; }

        /** Returns the number of bytes in the wrapped buffer. */
        size_t getCNameLength() const                           { return m_cnameLength; }

        /**
         * Strict weak ordering: shorter names sort first, names of equal
         * length are ordered by their bytes (memcmp).
         *
         * Returns true when c1 sorts strictly before c2.
         */
        bool operator()(const CNameInfo &c1, const CNameInfo &c2) const
        {
                const size_t len1 = c1.getCNameLength();
                const size_t len2 = c2.getCNameLength();

                if (len1 != len2)
                        return len1 < len2;

                // Two empty names are equivalent. Returning here also keeps
                // memcmp from being called with the null pointers that
                // default-constructed instances carry.
                if (len1 == 0)
                        return false;

                return memcmp(c1.getCName(), c2.getCName(), len1) < 0;
        }
private:
        uint8_t *m_pCName;
        size_t m_cnameLength;
};
00101         
00102         class StreamGroup;
00103         
00104         class StreamInfo
00105         {
00106         public:
00107                 StreamInfo(StreamGroup *pGroup, real_t tsUnit)                                  { m_pGroup = pGroup; m_infoSet = false; m_tsUnit = tsUnit; }
00108                 ~StreamInfo()                                                                   { }
00109                 StreamGroup *getGroup()                                                         { return m_pGroup; }
00110                 bool isInfoSet() const                                                          { return m_infoSet; }
00111                 void setInfo(MIPTime SRwallclock, uint32_t SRtimestamp, uint32_t curTimestamp,
00112                              MIPTime outputStreamOffset, MIPTime totalComponentDelay)           
00113                 {
00114                         m_infoSet = true;
00115                         m_lastInfoUpdateTime = MIPTime::getCurrentTime();
00116                         m_SRwallclock = SRwallclock;
00117                         m_SRtimestamp = SRtimestamp;
00118                         m_outputStreamOffset = outputStreamOffset;
00119                         m_totalComponentDelay = totalComponentDelay;
00120                         m_lastTimestamp = curTimestamp;
00121                 }
00122                 MIPTime getLastUpdateTime() const                                               { return m_lastInfoUpdateTime; }
00123                 uint32_t getLastTimestamp() const                                               { return m_lastTimestamp; }
00124                 MIPTime getSRWallclockTime() const                                              { return m_SRwallclock; }
00125                 uint32_t getSRTimestamp() const                                                 { return m_SRtimestamp; }
00126                 MIPTime getOutputStreamOffset() const                                           { return m_outputStreamOffset; }
00127                 MIPTime getTotalComponentDelay() const                                          { return m_totalComponentDelay; }
00128                 real_t getTimestampUnit() const                                                 { return m_tsUnit; }
00129 
00130                 MIPTime getSynchronizationOffset() const                                        { return m_syncOffset; }
00131                 
00132                 void calculateRemoteWallclockTime(MIPTime refTime)
00133                 {
00134                         MIPTime lastTimeDiff = refTime;
00135                         lastTimeDiff -= m_lastInfoUpdateTime;
00136 
00137 //                      std::cout << std::endl;
00138 //                      std::cout << "lastTimeDiff: " << lastTimeDiff.getString() << std::endl;
00139 
00140                         int32_t tsDiff;
00141                         
00142                         if ((m_lastTimestamp - m_SRtimestamp) < 0x80000000)
00143                                 tsDiff = (int32_t)(m_lastTimestamp - m_SRtimestamp);
00144                         else
00145                                 tsDiff = -(int32_t)(m_SRtimestamp - m_lastTimestamp);
00146 
00147                         MIPTime tsTimeDiff = MIPTime(((real_t)tsDiff)*m_tsUnit);
00148 
00149 //                      std::cout << "tsTimeDiff: " << tsTimeDiff.getString() << std::endl;
00150 
00151                         MIPTime wallclock = m_SRwallclock;
00152 
00153 //                      std::cout << "m_SRwallclock: " << wallclock.getString() << std::endl;
00154                         
00155                         wallclock += tsTimeDiff;
00156                         wallclock += lastTimeDiff;
00157 
00158                         // 'wallclock' would be our result if there would be no component delay, output stream delay etc
00159                         // Because of this, a sample played at 'refTime' will actually be older (an earlier timestamp)
00160 
00161 //                      std::cout << "outputStreamOffset: " << m_outputStreamOffset.getString() << std::endl;
00162 //                      std::cout << "totalComponentDelay: " << m_totalComponentDelay.getString() << std::endl;
00163 //                      std::cout << "syncOffset: " << m_syncOffset.getString() << std::endl;
00164                         
00165                         wallclock -= m_outputStreamOffset;
00166                         wallclock -= m_totalComponentDelay;
00167                         wallclock -= m_syncOffset;
00168 
00169 //                      std::cout << "Remote wallclock: " << wallclock.getString() << std::endl << std::endl;
00170                         
00171                         m_remoteWallclockTime = wallclock;
00172                 }
00173                 MIPTime getRemoteWallclockTime() const                                          { return m_remoteWallclockTime; }
00174                 void setOffsetAdjustment(MIPTime a)                                             { m_adjustment = a; }
00175                 void acceptAdjustment()                                                         { m_syncOffset += m_adjustment; }
00176                 void adjustOffset(MIPTime t)                                                    { m_syncOffset -= t; }
00177         private:
00178                 StreamGroup *m_pGroup;
00179                 bool m_infoSet;
00180                 MIPTime m_lastInfoUpdateTime;
00181                 MIPTime m_SRwallclock;
00182                 MIPTime m_outputStreamOffset;
00183                 MIPTime m_totalComponentDelay;
00184                 uint32_t m_SRtimestamp, m_lastTimestamp;
00185                 real_t m_tsUnit;
00186 
00187                 MIPTime m_syncOffset;
00188                 MIPTime m_adjustment;
00189                 MIPTime m_remoteWallclockTime;
00190         };
00191 
00192         class StreamGroup
00193         {
00194         public:
00195                 StreamGroup()                                                                   { }
00196                 ~StreamGroup()                                                                  { }
00197                 std::list<StreamInfo *> &streams()                                              { return m_streams; }
00198                 void setStreamsChanged(bool f)                                                  { m_streamsChanged = f; }
00199                 bool didStreamsChange() const                                                   { return m_streamsChanged; }
00200                 MIPTime getLastCalculationTime() const                                          { return m_lastCalcTime; }
00201                 MIPTime calculateOffsets()
00202                 {
00203                         MIPTime referenceTime = MIPTime::getCurrentTime();
00204                         m_lastCalcTime = referenceTime;
00205                         
00206                         std::list<StreamInfo *>::iterator it;
00207                         MIPTime maxRemoteWallclockTime, minRemoteWallclockTime; 
00208                         bool extSet = false;
00209                         int i = 0;
00210                         
00211                         for (it = m_streams.begin() ; it != m_streams.end() ; it++, i++)
00212                         {
00213                                 if ((*it)->isInfoSet())
00214                                 {
00215                                         //std::cout << "Calculating " << i << std::endl;
00216                                         (*it)->calculateRemoteWallclockTime(referenceTime);
00217                                         
00218                                         MIPTime t = (*it)->getRemoteWallclockTime();
00219 
00220                                         if (!extSet)
00221                                         {
00222                                                 extSet = true;
00223                                                 maxRemoteWallclockTime = t;
00224                                                 minRemoteWallclockTime = t;
00225                                         }
00226                                         else
00227                                         {
00228                                                 if (t > maxRemoteWallclockTime)
00229                                                         maxRemoteWallclockTime = t;
00230                                                 if (t < minRemoteWallclockTime)
00231                                                         minRemoteWallclockTime = t;
00232                                         }
00233                                 }
00234                 //              else
00235                 //                      std::cout << "Not calculating " << i << std::endl;
00236                         }
00237 
00238                         MIPTime diff = maxRemoteWallclockTime;
00239                         diff -= minRemoteWallclockTime;
00240 
00241                         for (it = m_streams.begin() ; it != m_streams.end() ; it++)
00242                         {
00243                                 if ((*it)->isInfoSet())
00244                                 {
00245                                         MIPTime adjustment = (*it)->getRemoteWallclockTime();
00246                                         adjustment -= minRemoteWallclockTime;
00247                                         (*it)->setOffsetAdjustment(adjustment);
00248                                 }
00249                         }
00250                         
00251                         return diff;
00252                 }
00253                 
00254                 void acceptNewOffsets()
00255                 {
00256                         std::list<StreamInfo *>::iterator it;
00257                         MIPTime minOffset;
00258                         bool extSet = false;
00259 
00260                         for (it = m_streams.begin() ; it != m_streams.end() ; it++)
00261                         {
00262                                 if ((*it)->isInfoSet())
00263                                 {
00264                                         (*it)->acceptAdjustment();
00265                                         MIPTime t2 = (*it)->getSynchronizationOffset();
00266                                         
00267                                         if (!extSet)
00268                                         {
00269                                                 extSet = true;
00270                                                 minOffset = t2;
00271                                         }
00272                                         else
00273                                         {
00274                                                 if (t2 < minOffset)
00275                                                         minOffset = t2;
00276                                         }
00277                                 }
00278                         }
00279 
00280                         for (it = m_streams.begin() ; it != m_streams.end() ; it++)
00281                         {
00282                                 if ((*it)->isInfoSet())
00283                                         (*it)->adjustOffset(minOffset);
00284                         }
00285                 }
00286         private:
00287                 std::list<StreamInfo *> m_streams;
00288                 bool m_streamsChanged;
00289                 MIPTime m_lastCalcTime;
00290         };
00291 
00292         std::map<CNameInfo, StreamGroup *, CNameInfo> m_cnameTable;
00293         std::map<int64_t, StreamInfo *> m_streamIDTable;
00294         
00295         MIPTime m_tolerance;
00296         int64_t m_nextStreamID;
00297         jthread::JMutex m_mutex;
00298 };
00299 
00300 #endif // MIPRTPSYNCHRONIZER_H
00301