Browse Source

change modulecomm. change datatime aquire,use std::chrono, latency is down to < 300us.

yuchuli 3 years ago
parent
commit
de72d8acc2

+ 6 - 2
src/common/ivservice/main.cpp

@@ -83,10 +83,14 @@ void threadclient()
 
     nsendsize = nsendsize * 1;
     if(nsendsize >= 1000000)break;
-    std::cout<<"Send Time : "<<QDateTime::currentMSecsSinceEpoch()<<std::endl;
+    qint64 nsendtime = std::chrono::system_clock::now().time_since_epoch().count();
+
     if(iv::service::Client::HAVE_RES ==  xclient.SendRequest(pstr_req,nsendsize,pstr_res,nressize))
     {
-        std::cout<<"Res Time: "<<QDateTime::currentMSecsSinceEpoch()<<std::endl;
+        qint64 nrecvtime = std::chrono::system_clock::now().time_since_epoch().count();
+        qint64 nms = nrecvtime/1000000;
+        qint64 nns = nrecvtime - nms *1000000;
+        std::cout<<"Res Time: "<<nms<<" . "<<nns<<"  diff:"<<(nrecvtime - nsendtime)<<"ns"<<std::endl;
         std::cout<<"   nres: "<<nressize<<std::endl;
     }
     else

+ 11 - 6
src/common/modulecomm/shm/procsm.cpp

@@ -12,6 +12,8 @@
 #include <QFile>
 #include <QDir>
 
+#include "procsm_if.h"
+
 
 //#define RESET   "\033[0m"
 //#define BLACK   "\033[30m"      /* Black */
@@ -648,11 +650,12 @@ WRITEMSG:
     if(nPac == 0)
     {
         memcpy(pD,str,nSize);
-        dt = QDateTime::currentDateTime();
+//        dt = QDateTime::currentDateTime();
     //    phh->mdt = dt;
-        phh->SetDate(dt);
+//        phh->SetDate(dt);
    //     memcpy(&phh->mdt,&dt,sizeof(QDateTime));
    //     phh->mdt = QDateTime::currentDateTime();
+        phh->sendtime = std::chrono::system_clock::now().time_since_epoch().count();
         phh->mindex = mpinfo->mNext;
         phh->mnPos = 0;
         phh->mnLen = nSize;
@@ -672,9 +675,9 @@ WRITEMSG:
             unsigned int nPos = (phh+nPac-1)->mnPos + (phh+nPac-1)->mnLen;
      //       qDebug("write pos = %d",nPos);
             memcpy(pD+nPos,str,nSize);
-            dt = QDateTime::currentDateTime();
-            (phh+nPac)->SetDate(dt);
- //           memcpy(&(phh+nPac)->mdt,&dt,sizeof(QDateTime));
+//            dt = QDateTime::currentDateTime();
+//            (phh+nPac)->SetDate(dt);
+            (phh+nPac)->sendtime = std::chrono::system_clock::now().time_since_epoch().count();
  //           (phh+nPac)->mdt = QDateTime::currentDateTime();
             (phh+nPac)->mindex = mpinfo->mNext;
             (phh+nPac)->mnPos = nPos;
@@ -794,7 +797,9 @@ int procsm::readmsg(unsigned int index, char *str, unsigned int nMaxSize,unsigne
                    memcpy(str,pD + (phh+nPos)->mnPos,(phh+nPos)->mnLen);
           //         qDebug("read pos = %d",(phh+nPos)->mnPos);
                    nRtn = (phh+nPos)->mnLen;
-                   (phh+nPos)->GetDate(pdt);
+  //                 pdt->setDate(QDate(2030,1,1));
+                   pdt->setMSecsSinceEpoch((phh+nPos)->sendtime/1000000);
+//                   (phh+nPos)->GetDate(pdt);
            //        memcpy(pdt,&((phh+nPos)->mdt),sizeof(QDateTime));
                 }
             }

+ 22 - 20
src/common/modulecomm/shm/procsm.h

@@ -43,35 +43,36 @@ public:
 class procsm_head
 {
 public:
-    unsigned short mYear;
-    unsigned char mMonth;
-    unsigned char mDay;
-    unsigned char mHour;
-    unsigned char mMinute;
-    unsigned char mSec;
-    unsigned short mMSec;
+    qint64 sendtime; //ns from epoch.
+//    unsigned short mYear;
+//    unsigned char mMonth;
+//    unsigned char mDay;
+//    unsigned char mHour;
+//    unsigned char mMinute;
+//    unsigned char mSec;
+//    unsigned short mMSec;
     unsigned int mindex;
     unsigned int mnPos;
     unsigned int mnLen;
 public:
     void SetDate(QDateTime dt)
     {
-        mYear = dt.date().year();
-        mMonth = dt.date().month();
-        mDay = dt.date().day();
-        mHour = dt.time().hour();
-        mMinute = dt.time().minute();
-        mSec = dt.time().second();
-        mMSec = dt.time().msec();
+//        mYear = dt.date().year();
+//        mMonth = dt.date().month();
+//        mDay = dt.date().day();
+//        mHour = dt.time().hour();
+//        mMinute = dt.time().minute();
+//        mSec = dt.time().second();
+//        mMSec = dt.time().msec();
     }
     void GetDate(QDateTime * pdt)
     {
-        QDate dt;
-        dt.setDate(mYear,mMonth,mDay);
-        QTime time;
-        time.setHMS(mHour,mMinute,mSec,mMSec);
-        pdt->setDate(dt);
-        pdt->setTime(time);
+//        QDate dt;
+//        dt.setDate(mYear,mMonth,mDay);
+//        QTime time;
+//        time.setHMS(mHour,mMinute,mSec,mMSec);
+//        pdt->setDate(dt);
+//        pdt->setTime(time);
 
     }
 };
@@ -105,6 +106,7 @@ private:
 
     char * mstrtem;
 
+
 public:
     const static int ModeRead = 1;
     const static int ModeWrite = 0;

+ 64 - 9
src/common/modulecomm/shm/procsm_if.cpp

@@ -5,8 +5,9 @@
 
 
 
-procsm_if_readthread::procsm_if_readthread(procsm *pPSM,SMCallBack pCall,const char * strsmname)
+procsm_if_readthread::procsm_if_readthread(procsm *pPSM,SMCallBack pCall,const char * strsmname,void * pparent)
 {
+    mpparent = pparent;
     mpPSM = pPSM;
     mpCall = pCall;
     strncpy(mstrsmname,strsmname,255);
@@ -22,8 +23,9 @@ procsm_if_readthread::procsm_if_readthread(procsm *pPSM,SMCallBack pCall,const c
 #endif
 }
 
-procsm_if_readthread::procsm_if_readthread(procsm *pPSM,ModuleFun xFun,const char * strsmname)
+procsm_if_readthread::procsm_if_readthread(procsm *pPSM,ModuleFun xFun,const char * strsmname,void * pparent)
 {
+    mpparent = pparent;
     mpPSM = pPSM;
     mFun = xFun;
     strncpy(mstrsmname,strsmname,255);
@@ -134,9 +136,15 @@ void procsm_if_readthread::run()
             msleep(1);
 #endif
 #else
-            mWaitMutex.lock();
-            mwc.wait(&mWaitMutex,100);
-            mWaitMutex.unlock();
+            procsm_if * mpif = (procsm_if * )mpparent;
+            if(mpif->WaitNotify()<0)
+            {
+                std::cout<<"Wait Notify Error. Please Check. "<<std::endl;
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            }
+//            mWaitMutex.lock();
+//            mwc.wait(&mWaitMutex,100);
+//            mWaitMutex.unlock();
 #endif
         }
         else
@@ -309,13 +317,13 @@ int procsm_if::listenmsg(SMCallBack pCall)
 //    return 0;
 //#endif
     if(mnType == procsm::ModeWrite)return -1; //listening.
-    mpReadThread = new procsm_if_readthread(mpPSM,pCall,mstrsmname);
+    mpReadThread = new procsm_if_readthread(mpPSM,pCall,mstrsmname,this);
 //    mpReadThread->setPriority(QThread::TimeCriticalPriority);
 //    mpReadThread->start();
     mpReadThread->start(QThread::HighestPriority);
 
 #ifdef USE_GROUPUDP
-    mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
+//    mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
 
 #endif
 //    mnType = 1;
@@ -333,13 +341,13 @@ int procsm_if::listenmsg(ModuleFun xFun)
 //    return 0;
 //#endif
     if(mnType == procsm::ModeWrite)return -1; //listening.
-    mpReadThread = new procsm_if_readthread(mpPSM,xFun,mstrsmname);
+    mpReadThread = new procsm_if_readthread(mpPSM,xFun,mstrsmname,this);
 //    mpReadThread->setPriority(QThread::TimeCriticalPriority);
 //    mpReadThread->start();
     mpReadThread->start(QThread::HighestPriority);
 
 #ifdef USE_GROUPUDP
-    mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
+//    mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
 
 #endif
 //    mnType = 1;
@@ -441,7 +449,54 @@ void procsm_if::InitSock()
 
 }
 
+/**
+ * @brief procsm_if::WaitNotify
+ * @param timeout_ms
+ * @return if have notify return 1,otherwise,return 0
+ */
+int procsm_if::WaitNotify(int timeout_ms)
+{
+    int nrtn = 0;
+    struct pollfd fds;
+    fds.fd = listen_fd_;
+    fds.events = POLLIN;
+    int ready_num = poll(&fds, 1, timeout_ms);
+    if (ready_num > 0) {
+        char buf[32] = {0};  // larger than ReadableInfo::kSize
+        ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
+        if (nbytes == -1) {
+            std::cout << "fail to recvfrom, " << strerror(errno);
+            return -1;
+        }
+        if(nbytes == 8)
+        {
+//                std::cout<<"recv key."<<std::endl;
+            qint64 * pkey = (qint64 *)buf;
+            if(*pkey == mKey)
+            {
+                return 1;
+//                mpReadThread->WakeRead();
+                //Waaa;
+            }
+            else
+            {
+//                    std::cout<<"key error"<<std::endl;
+            }
+        }
+//          return info->DeserializeFrom(buf, nbytes);
+    } else if (ready_num == 0) {
+        return 0;
+    } else {
+        nrtn = -2;
+        if (errno == EINTR) {
+            std::cout << "poll was interrupted."<<std::endl;
+        } else {
+            std::cout << "fail to poll, " << strerror(errno);
+        }
+    }
 
+    return nrtn;
+}
 
 void procsm_if::ThreadGroupRecv()
 {

+ 6 - 2
src/common/modulecomm/shm/procsm_if.h

@@ -43,8 +43,8 @@ public:
 #ifdef USELCM
 
 #endif
-    procsm_if_readthread(procsm * pPSM,SMCallBack pCall,const char * strsmname);
-    procsm_if_readthread(procsm * pPSM,ModuleFun xFun,const char * strsmname);
+    procsm_if_readthread(procsm * pPSM,SMCallBack pCall,const char * strsmname,void * pparent);
+    procsm_if_readthread(procsm * pPSM,ModuleFun xFun,const char * strsmname,void * pparent);
 
     void puaseread();
     void continueread();
@@ -73,6 +73,7 @@ private:
     bool mbFunPlus = false;
     bool mbRun = true;
     bool mbDBUSOK = true;
+    void * mpparent;
 
 #ifdef USE_GROUPUDP
 public:
@@ -132,6 +133,9 @@ private:
 
     void SendGroupMsg();
 
+public:
+    int WaitNotify(int timeout_ms = 100);
+
 #endif
 
 

+ 1 - 0
src/common/modulecomm/testmodulecomm.pro

@@ -9,6 +9,7 @@ unix:system("./../../../include/linuxsystemtest.sh ")
 unix:include(./../../../include/systemdef.pri)
 win32: DEFINES += SYSTEM_WIN
 
+DEFINES += USE_GROUPUDP
 
 if(contains(DEFINES,USE_GROUPUDP)){
 QT += network