Browse Source

change modulecommm,add group udp notify message replace dbus, test ok,use DEFINES+= USE_GROUPUDP.

yuchuli 3 years ago
parent
commit
a7026caaea

+ 5 - 0
src/common/modulecomm/modulecomm.pro

@@ -3,6 +3,7 @@ QT -= gui
 QT += dbus
 QT += xml
 
+
 TEMPLATE = lib
 DEFINES += MODULECOMM_LIBRARY
 
@@ -15,6 +16,10 @@ unix:system("./../../../include/linuxsystemtest.sh ")
 unix:include(./../../../include/systemdef.pri)
 win32: DEFINES += SYSTEM_WIN
 
+
+if(contains(DEFINES,USE_GROUPUDP)){
+QT += network
+}
 DEFINES += USEDBUS
 
 CONFIG += c++11

+ 10 - 0
src/common/modulecomm/shm/procsm.cpp

@@ -214,9 +214,12 @@ procsm::procsm(const char * strsmname,const unsigned int nBufSize,const unsigned
         mmodulemsg_type.mnPID = getpid();
 #endif
 
+
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
         mmsg = QDBusMessage::createSignal("/catarc/adc",  "adc.adciv.modulecomm", strsmname);
         mmsg<<1;
+#endif
 #endif
 
         bool bAttach= true;
@@ -300,6 +303,7 @@ procsm::procsm(const char * strsmname,const unsigned int nBufSize,const unsigned
     //        qDebug("attach successful");
             mstrtem = new char[mnBufSize];
 
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
             mmsgres = QDBusMessage::createSignal("/catarc/adc",  "adciv.interface", "modulemsgres");
             mmsgres<<1;
@@ -309,6 +313,7 @@ procsm::procsm(const char * strsmname,const unsigned int nBufSize,const unsigned
             {
                 std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
             }
+#endif
 #endif
         }
         else
@@ -449,6 +454,7 @@ void procsm::recreateasm(int nbufsize)
 
 }
 
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
     void procsm::onQuery()
     {
@@ -462,6 +468,7 @@ void procsm::recreateasm(int nbufsize)
     }
 
 #endif
+#endif
 
 bool procsm::AttachMem()
 {
@@ -692,9 +699,12 @@ WRITEMSG:
 
     mpinfo->mLock = 0;
     mpASM->unlock();
+
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
     QDBusConnection::sessionBus().send(mmsg);
 #endif
+#endif
 //    std::cout<<"write msg."<<std::endl;
     return 0;
 }

+ 11 - 2
src/common/modulecomm/shm/procsm.h

@@ -8,11 +8,15 @@
 #include <QVariant>
 #include <thread>
 
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
 #include <QtDBus/QDBusMessage>
 #include <QtDBus/QDBusConnection>
 
 #endif
+#endif
+
+
 
 #include "ivmodulemsg_type.h"
 
@@ -74,10 +78,10 @@ public:
 
 class procsm : public QObject
 {
-#ifdef USEDBUS
+//#ifdef USEDBUS
     Q_OBJECT
 
-#endif
+//#endif
 public:
     procsm(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode);
     ~procsm();
@@ -107,17 +111,22 @@ public:
 
     iv::modulemsg_type mmodulemsg_type;
 
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
 private slots:
     void onQuery();
 
+#endif
 #endif
 private:
+
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
     QDBusMessage mmsg;
     QDBusMessage mmsgres;  //Response Message Query;
 
 #endif
+#endif
 
 private:
     int checkasm();

+ 212 - 1
src/common/modulecomm/shm/procsm_if.cpp

@@ -11,6 +11,7 @@ procsm_if_readthread::procsm_if_readthread(procsm *pPSM,SMCallBack pCall,const c
     mpCall = pCall;
     strncpy(mstrsmname,strsmname,255);
 
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
     bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc",  "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int)));
     if(bconnect == false)
@@ -18,6 +19,7 @@ procsm_if_readthread::procsm_if_readthread(procsm *pPSM,SMCallBack pCall,const c
         std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
     }
 #endif
+#endif
 }
 
 procsm_if_readthread::procsm_if_readthread(procsm *pPSM,ModuleFun xFun,const char * strsmname)
@@ -27,6 +29,7 @@ procsm_if_readthread::procsm_if_readthread(procsm *pPSM,ModuleFun xFun,const cha
     strncpy(mstrsmname,strsmname,255);
     mbFunPlus = true;
 
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
     bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc",  "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int)));
     if(bconnect == false)
@@ -38,6 +41,7 @@ procsm_if_readthread::procsm_if_readthread(procsm *pPSM,ModuleFun xFun,const cha
         delete timer;
     }
 #endif
+#endif
 }
 
 
@@ -114,6 +118,7 @@ void procsm_if_readthread::run()
         int nRtn = mpPSM->readmsg(index,str,nBufLen,&nRead,pdt);
         if(nRtn == 0)
         {
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
             if(mbDBUSOK == true)
             {
@@ -127,6 +132,11 @@ void procsm_if_readthread::run()
             }
 #else
             msleep(1);
+#endif
+#else
+            mWaitMutex.lock();
+            mwc.wait(&mWaitMutex,100);
+            mWaitMutex.unlock();
 #endif
         }
         else
@@ -177,6 +187,16 @@ void procsm_if_readthread::run()
 //    qDebug("Thread finish.");
 }
 
+
+#ifdef USE_GROUPUDP
+    void procsm_if_readthread::WakeRead()
+    {
+        mwc.wakeAll();
+    }
+
+#endif
+
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
 
 void procsm_if_readthread::onNewMsg(int x)
@@ -186,6 +206,7 @@ void procsm_if_readthread::onNewMsg(int x)
 //    qDebug("wake");
 }
 
+#endif
 #endif
 
 procsm_if::procsm_if(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
@@ -207,12 +228,21 @@ procsm_if::procsm_if(const char * strsmname,const unsigned int nBufSize,const un
 
     mTimer.setTimerType(Qt::PreciseTimer);
 
+#ifdef USE_GROUPUDP
+    InitSock();
+    mKey = CalcKey(mstrsmname);
+#endif
+
 }
 
 procsm_if::~procsm_if()
 {
     if(mnType == procsm::ModeRead)
     {
+#ifdef USE_GROUPUDP
+        mbGroupRecvRun = false;
+        mpthread->join();
+#endif
 
         QTime xTime;
         xTime.start();
@@ -228,6 +258,12 @@ procsm_if::~procsm_if()
             }
         }
         if(bDel)delete mpReadThread;
+
+
+    }
+    else
+    {
+
     }
     delete mpPSM;
 }
@@ -243,7 +279,13 @@ int procsm_if::writemsg(const char *str, const unsigned int nSize)
     return 0;
 #endif
     if(mnType == procsm::ModeRead)return -1; //this is listen.
-    return mpPSM->writemsg(str,nSize);
+    int nRes =  mpPSM->writemsg(str,nSize);
+
+#ifdef USE_GROUPUDP
+    SendGroupMsg();
+#endif
+
+    return nRes;
 }
 
 #ifdef USELCM
@@ -252,6 +294,9 @@ int procsm_if::writemsg(const char *str, const unsigned int nSize)
         qDebug("receiv data. ");
     }
 #endif
+
+
+
 int procsm_if::listenmsg(SMCallBack pCall)
 {
 //#ifdef USELCM
@@ -268,6 +313,11 @@ int procsm_if::listenmsg(SMCallBack pCall)
 //    mpReadThread->setPriority(QThread::TimeCriticalPriority);
 //    mpReadThread->start();
     mpReadThread->start(QThread::HighestPriority);
+
+#ifdef USE_GROUPUDP
+    mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
+
+#endif
 //    mnType = 1;
     return 0;
 }
@@ -287,6 +337,11 @@ int procsm_if::listenmsg(ModuleFun xFun)
 //    mpReadThread->setPriority(QThread::TimeCriticalPriority);
 //    mpReadThread->start();
     mpReadThread->start(QThread::HighestPriority);
+
+#ifdef USE_GROUPUDP
+    mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
+
+#endif
 //    mnType = 1;
     return 0;
 }
@@ -319,5 +374,161 @@ void procsm_if::continuecomm()
     }
 }
 
+#ifdef USE_GROUPUDP
+
+void procsm_if::InitSock()
+{
+    std::string mcast_ip("236.236.100.100");
+     uint16_t mcast_port = 8888;
+
+
+     notify_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
+     if (notify_fd_ == -1) {
+       std::cout << "fail to create notify fd, " << strerror(errno);
+       return ;
+     }
+
+     memset(&notify_addr_, 0, sizeof(notify_addr_));
+     notify_addr_.sin_family = AF_INET;
+     notify_addr_.sin_addr.s_addr = inet_addr(mcast_ip.c_str());
+     notify_addr_.sin_port = htons(mcast_port);
+
+     listen_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
+     if (listen_fd_ == -1) {
+       std::cout << "fail to create listen fd, " << strerror(errno);
+       return ;
+     }
+
+     if (fcntl(listen_fd_, F_SETFL, O_NONBLOCK) == -1) {
+       std::cout << "fail to set listen fd nonblock, " << strerror(errno);
+       return ;
+     }
+
+     memset(&listen_addr_, 0, sizeof(listen_addr_));
+     listen_addr_.sin_family = AF_INET;
+     listen_addr_.sin_addr.s_addr = htonl(INADDR_ANY);
+     listen_addr_.sin_port = htons(mcast_port);
+
+     int yes = 1;
+     if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
+       std::cout << "fail to setsockopt SO_REUSEADDR, " << strerror(errno);
+       return ;
+     }
+
+     if (bind(listen_fd_, (struct sockaddr*)&listen_addr_, sizeof(listen_addr_)) <
+         0) {
+       std::cout << "fail to bind addr, " << strerror(errno);
+       return ;
+     }
+
+     int loop = 1;
+     if (setsockopt(listen_fd_, IPPROTO_IP, IP_MULTICAST_LOOP, &loop,
+                    sizeof(loop)) < 0) {
+       std::cout << "fail to setsockopt IP_MULTICAST_LOOP, " << strerror(errno);
+       return ;
+     }
+
+     struct ip_mreq mreq;
+     mreq.imr_multiaddr.s_addr = inet_addr(mcast_ip.c_str());
+     mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+     if (setsockopt(listen_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
+                    sizeof(mreq)) < 0) {
+       std::cout << "fail to setsockopt IP_ADD_MEMBERSHIP, " << strerror(errno);
+       return ;
+     }
+
+     return ;
+
+}
+
+
+
+void procsm_if::ThreadGroupRecv()
+{
+    int timeout_ms = 100;
+    while(mbGroupRecvRun)
+    {
+ //       std::cout<<"poll"<<std::endl;
+        struct pollfd fds;
+        fds.fd = listen_fd_;
+        fds.events = POLLIN;
+        int ready_num = poll(&fds, 1, timeout_ms);
+        if (ready_num > 0) {
+            char buf[32] = {0};  // larger than ReadableInfo::kSize
+            ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
+            if (nbytes == -1) {
+                std::cout << "fail to recvfrom, " << strerror(errno);
+                continue;
+            }
+            if(nbytes == 8)
+            {
+//                std::cout<<"recv key."<<std::endl;
+                qint64 * pkey = (qint64 *)buf;
+                if(*pkey == mKey)
+                {
+                    mpReadThread->WakeRead();
+                    //Waaa;
+                }
+                else
+                {
+                    std::cout<<"key error"<<std::endl;
+                }
+            }
+  //          return info->DeserializeFrom(buf, nbytes);
+        } else if (ready_num == 0) {
+//            std::cout << "timeout, no readableinfo.";
+            continue;
+        } else {
+            if (errno == EINTR) {
+                std::cout << "poll was interrupted."<<std::endl;
+            } else {
+                std::cout << "fail to poll, " << strerror(errno);
+            }
+        }
+    }
+}
+
+qint64 procsm_if::CalcKey(const char *strsmname)
+{
+    qint64 xkey = 0;
+    unsigned int i;
+    int nlen = strnlen(strsmname,256);
+    for(i=0;i<nlen;i++)
+    {
+        qint64 temp = *(strsmname+i);
+        temp = temp<<(4+i);
+        xkey = xkey + temp;
+        if(i>=3)break;
+    }
+    for(i=0;i<nlen;i++)
+    {
+        qint64 temp = *(strsmname+nlen-i-1);
+        temp = temp<<i;
+        xkey = xkey + temp;
+        if(i>=3)break;
+    }
+    for(i=0;i<strnlen(strsmname,256);i++)
+    {
+        qint64 temp = *(strsmname+i);
+        xkey = xkey + temp;
+    }
+    return xkey;
+
+}
+
+void procsm_if::SendGroupMsg()
+{
+//    qint64 timenow = std::chrono::system_clock::now().time_since_epoch().count();
+    ssize_t nbytes =
+        sendto(notify_fd_, &mKey, sizeof(qint64), 0,
+               (struct sockaddr*)&notify_addr_, sizeof(notify_addr_));
+
+    if(nbytes <= 0)
+    {
+        std::cout<<"Send Fail."<<std::endl;
+    }
+}
+#endif
+
 
 

+ 41 - 0
src/common/modulecomm/shm/procsm_if.h

@@ -7,10 +7,27 @@
 #include <QMutex>
 #include <functional>
 
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
 #include <QtDBus/QDBusMessage>
 #include <QtDBus/QDBusConnection>
 
+#endif
+#endif
+
+
+#ifdef USE_GROUPUDP
+
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <cstring>
+#include <string>
+#include <thread>
+
 #endif
 
 #include "procsm.h"
@@ -32,9 +49,11 @@ public:
     void puaseread();
     void continueread();
 private slots:
+#ifndef USE_GROUPUDP
 #ifdef USEDBUS
     void onNewMsg(int x);
 #endif
+#endif
 private:
 
 #ifdef USELCM
@@ -54,6 +73,11 @@ private:
     bool mbFunPlus = false;
     bool mbRun = true;
     bool mbDBUSOK = true;
+
+#ifdef USE_GROUPUDP
+public:
+    void WakeRead();
+#endif
 };
 
 class procsm_if
@@ -92,6 +116,23 @@ private:
 
 
 
+#ifdef USE_GROUPUDP
+    int notify_fd_ = -1;
+    struct sockaddr_in notify_addr_;
+    int listen_fd_ = -1;
+    struct sockaddr_in listen_addr_;
+    void ThreadGroupRecv();
+    void InitSock();
+    std::thread * mpthread;
+    bool mbGroupRecvRun = true;
+
+    qint64 mKey;
+
+    qint64 CalcKey(const char * strsmname);
+
+    void SendGroupMsg();
+
+#endif
 
 
 };

+ 15 - 0
src/common/modulecomm/testmodulecomm.pro

@@ -1,5 +1,20 @@
 QT -= gui
 
+unix:DEFINES += RUNSYSTEMTEST
+
+if(contains(DEFINES,RUNSYSTEMTEST)){
+unix:system("./../../../include/linuxsystemtest.sh ")
+}
+
+unix:include(./../../../include/systemdef.pri)
+win32: DEFINES += SYSTEM_WIN
+
+
+if(contains(DEFINES,USE_GROUPUDP)){
+QT += network
+}
+
+
 QT += dbus
 QT       += xml