123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590 |
#include "procsm_if.h"

#include <QTimer>

#include <cstring>
#include <iostream>
- procsm_if_readthread::procsm_if_readthread(procsm *pPSM,SMCallBack pCall,const char * strsmname,void * pparent)
- {
- mpparent = pparent;
- mpPSM = pPSM;
- mpCall = pCall;
- strncpy(mstrsmname,strsmname,255);
- #ifndef USE_GROUPUDP
- #ifdef USEDBUS
- bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int)));
- if(bconnect == false)
- {
- std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
- }
- #endif
- #endif
- }
- procsm_if_readthread::procsm_if_readthread(procsm *pPSM,ModuleFun xFun,const char * strsmname,void * pparent)
- {
- mpparent = pparent;
- mpPSM = pPSM;
- mFun = xFun;
- strncpy(mstrsmname,strsmname,255);
- mbFunPlus = true;
- #ifndef USE_GROUPUDP
- #ifdef USEDBUS
- bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int)));
- if(bconnect == false)
- {
- std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
- mbDBUSOK = false;
- QTimer * timer = new QTimer();
- timer->setTimerType(Qt::PreciseTimer);
- delete timer;
- }
- #endif
- #endif
- }
#ifdef USELCM
/**
 * @brief LCM subscription handler: forwards one received buffer to the consumer.
 *
 * Increments the message counter, stamps the message with the current time and
 * passes the payload to mFun or mpCall depending on mbFunPlus.
 */
void procsm_if_readthread::handlerMethod(const lcm::ReceiveBuffer *rbuf, const std::string &channel)
{
    qDebug("lcm receiv data. ");
    ++mxindex;
    QDateTime recvTime = QDateTime::currentDateTime();
    char * const payload = (char *)rbuf->data;
    if(!mbFunPlus)
    {
        (*mpCall)(payload,rbuf->data_size,mxindex,&recvTime,mstrsmname);
        return;
    }
    mFun(payload,rbuf->data_size,mxindex,&recvTime,mstrsmname);
}
#endif
- void procsm_if_readthread::puaseread()
- {
- mbRun = false;
- }
- void procsm_if_readthread::continueread()
- {
- mbRun = true;
- }
- void procsm_if_readthread::run()
- {
- #ifdef USELCM
- mlcm.subscribe(mstrsmname,&procsm_if_readthread::handlerMethod,this);
- while(!QThread::isInterruptionRequested())
- {
- mlcm.handle();
- }
- return;
- #endif
- QTime xTime;
- xTime.start();
- unsigned int nBufLen = 1;
- unsigned int nRead;
- char * str = new char[nBufLen];
- unsigned int index =0;
- QDateTime *pdt = new QDateTime();
- bool bAttach = false;
- bool bFirstRead = true; //if First Read,not read the buffer data.
- while(!QThread::isInterruptionRequested())
- {
- if(mbRun == false)
- {
- msleep(10);
- continue;
- }
- if(bAttach == false)
- {
- bAttach = mpPSM->AttachMem();
- if(bAttach == false)
- {
- msleep(100);
- continue;
- }
- else
- {
- index = mpPSM->getcurrentnext();
- }
- }
- int nRtn = mpPSM->readmsg(index,str,nBufLen,&nRead,pdt);
- if(nRtn == 0)
- {
- #ifndef USE_GROUPUDP
- #ifdef USEDBUS
- if(mbDBUSOK == true)
- {
- mWaitMutex.lock();
- mwc.wait(&mWaitMutex,10);
- mWaitMutex.unlock();
- }
- else
- {
- msleep(1);
- }
- #else
- msleep(1);
- #endif
- #else
- procsm_if * mpif = (procsm_if * )mpparent;
- if(mpif->WaitNotify()<0)
- {
- std::cout<<"Wait Notify Error. Please Check. "<<std::endl;
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- }
- // mWaitMutex.lock();
- // mwc.wait(&mWaitMutex,100);
- // mWaitMutex.unlock();
- #endif
- }
- else
- {
- if(nRtn == -1)
- {
- nBufLen = nRead;
- delete str;
- if(nBufLen < 1)nBufLen = 1;
- str = new char[nBufLen];
- // std::cout<<"modulecomm resize str to "<<nBufLen<<std::endl;
- }
- else
- {
- if((nRtn == -2) ||(nRtn == -100)||(nRtn == -101))
- {
- index = mpPSM->getcurrentnext();
- if(nRtn == -100) //Because New ShareMemory, read this.
- {
- if((index > 0)&&(bFirstRead == false) )index = index -1;
- }
- }
- else
- {
- if(nRtn >0)
- {
- if(mbFunPlus)
- {
- mFun(str,nRtn,index,pdt,mstrsmname);
- }
- else
- {
- (*mpCall)(str,nRtn,index,pdt,mstrsmname);
- }
- index++;
- }
- else
- {
- usleep(100);
- }
- }
- }
- }
- bFirstRead = false;
- }
- delete str;
- delete pdt;
- // qDebug("Thread finish.");
- }
#ifdef USE_GROUPUDP
/// Wakes every thread currently blocked on the mwc wait condition.
void procsm_if_readthread::WakeRead()
{
    this->mwc.wakeAll();
}
#endif
#ifndef USE_GROUPUDP
#ifdef USEDBUS
/**
 * @brief Slot connected to the D-Bus signal in the constructors.
 * @param x signal argument; only printed when it equals 100
 *
 * Wakes the read loop waiting on mwc.
 */
void procsm_if_readthread::onNewMsg(int x)
{
    if(x == 100)
    {
        std::cout<<x<<std::endl;
    }
    mwc.wakeAll();
}
#endif
#endif
- procsm_if::procsm_if(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
- {
- strncpy(mstrsmname,strsmname,255);
- #ifdef USELCM
- if(nMode == procsm::ModeWrite)
- {
- }
- else
- {
- }
- return;
- #endif
- mpPSM = new procsm(strsmname,nBufSize,nMaxPacCount,nMode);
- mnType = nMode;
- mTimer.setTimerType(Qt::PreciseTimer);
- #ifdef USE_GROUPUDP
- InitSock();
- mKey = CalcKey(mstrsmname);
- #endif
- }
- procsm_if::~procsm_if()
- {
- if(mnType == procsm::ModeRead)
- {
- #ifdef USE_GROUPUDP
- mbGroupRecvRun = false;
- // mpthread->join();
- #endif
- QTime xTime;
- xTime.start();
- bool bDel = true;
- mpReadThread->requestInterruption();
- while(!mpReadThread->isFinished())
- {
- if(xTime.elapsed() > 1000)
- {
- bDel = false;
- std::cout<<"procsm_if Thread Read not finish."<<std::endl;
- break;
- }
- }
- if(bDel)delete mpReadThread;
- }
- else
- {
- }
- delete mpPSM;
- }
- int procsm_if::writemsg(const char *str, const unsigned int nSize)
- {
- if(mbRun == false)return -2;
- #ifdef USELCM
- int nres = mlcm.publish(mstrsmname,str,nSize);
- qDebug("publish message. res = %d",nres);
- return 0;
- #endif
- if(mnType == procsm::ModeRead)return -1; //this is listen.
- int nRes = mpPSM->writemsg(str,nSize);
- #ifdef USE_GROUPUDP
- SendGroupMsg();
- #endif
- return nRes;
- }
#ifdef USELCM
/// LCM handler on the interface object itself; it only logs that data arrived.
void procsm_if::handlerMethod(const lcm::ReceiveBuffer *rbuf, const std::string &channel)
{
    qDebug("receiv data. ");
}
#endif
- int procsm_if::listenmsg(SMCallBack pCall)
- {
- //#ifdef USELCM
- //// mlcm.subscribe(mstrsmname,&handlerMethod2);
- // mlcm.subscribe(mstrsmname,&procsm_if::handlerMethod,this);
- // while(true)
- // {
- // mlcm.handle();
- // }
- // return 0;
- //#endif
- if(mnType == procsm::ModeWrite)return -1; //listening.
- mpReadThread = new procsm_if_readthread(mpPSM,pCall,mstrsmname,this);
- // mpReadThread->setPriority(QThread::TimeCriticalPriority);
- // mpReadThread->start();
- mpReadThread->start(QThread::HighestPriority);
- #ifdef USE_GROUPUDP
- // mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
- #endif
- // mnType = 1;
- return 0;
- }
- int procsm_if::listenmsg(ModuleFun xFun)
- {
- //#ifdef USELCM
- // mlcm.subscribe(mstrsmname,&procsm_if::handlerMethod,this);
- // while(true)
- // {
- // mlcm.handle();
- // }
- // return 0;
- //#endif
- if(mnType == procsm::ModeWrite)return -1; //listening.
- mpReadThread = new procsm_if_readthread(mpPSM,xFun,mstrsmname,this);
- // mpReadThread->setPriority(QThread::TimeCriticalPriority);
- // mpReadThread->start();
- mpReadThread->start(QThread::HighestPriority);
- #ifdef USE_GROUPUDP
- // mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
- #endif
- // mnType = 1;
- return 0;
- }
- void procsm_if::stoplisten()
- {
- if(mnType != 1)return;
- mpReadThread->requestInterruption();
- while(!mpReadThread->isFinished());
- mnType = 0;
- // mpReadThread->deleteLater();
- qDebug("stop listen ok");
- }
- void procsm_if::pausecomm()
- {
- mbRun = false;
- if(mnType == procsm::ModeRead)
- {
- mpReadThread->puaseread();
- }
- }
- void procsm_if::continuecomm()
- {
- mbRun = true;
- if(mnType == procsm::ModeRead)
- {
- mpReadThread->continueread();
- }
- }
- #ifdef USE_GROUPUDP
- void procsm_if::InitSock()
- {
- std::string mcast_ip("236.236.100.100");
- uint16_t mcast_port = 8888;
- notify_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
- if (notify_fd_ == -1) {
- std::cout << "fail to create notify fd, " << strerror(errno);
- return ;
- }
- memset(¬ify_addr_, 0, sizeof(notify_addr_));
- notify_addr_.sin_family = AF_INET;
- notify_addr_.sin_addr.s_addr = inet_addr(mcast_ip.c_str());
- notify_addr_.sin_port = htons(mcast_port);
- listen_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
- if (listen_fd_ == -1) {
- std::cout << "fail to create listen fd, " << strerror(errno);
- return ;
- }
- if (fcntl(listen_fd_, F_SETFL, O_NONBLOCK) == -1) {
- std::cout << "fail to set listen fd nonblock, " << strerror(errno);
- return ;
- }
- memset(&listen_addr_, 0, sizeof(listen_addr_));
- listen_addr_.sin_family = AF_INET;
- listen_addr_.sin_addr.s_addr = htonl(INADDR_ANY);
- listen_addr_.sin_port = htons(mcast_port);
- int yes = 1;
- if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
- std::cout << "fail to setsockopt SO_REUSEADDR, " << strerror(errno);
- return ;
- }
- if (bind(listen_fd_, (struct sockaddr*)&listen_addr_, sizeof(listen_addr_)) <
- 0) {
- std::cout << "fail to bind addr, " << strerror(errno);
- return ;
- }
- int loop = 1;
- if (setsockopt(listen_fd_, IPPROTO_IP, IP_MULTICAST_LOOP, &loop,
- sizeof(loop)) < 0) {
- std::cout << "fail to setsockopt IP_MULTICAST_LOOP, " << strerror(errno);
- return ;
- }
- struct ip_mreq mreq;
- mreq.imr_multiaddr.s_addr = inet_addr(mcast_ip.c_str());
- mreq.imr_interface.s_addr = htonl(INADDR_ANY);
- if (setsockopt(listen_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
- sizeof(mreq)) < 0) {
- std::cout << "fail to setsockopt IP_ADD_MEMBERSHIP, " << strerror(errno);
- return ;
- }
- return ;
- }
- /**
- * @brief procsm_if::WaitNotify
- * @param timeout_ms
- * @return if have notify return 1,otherwise,return 0
- */
- int procsm_if::WaitNotify(int timeout_ms)
- {
- int nrtn = 0;
- struct pollfd fds;
- fds.fd = listen_fd_;
- fds.events = POLLIN;
- int ready_num = poll(&fds, 1, timeout_ms);
- if (ready_num > 0) {
- char buf[32] = {0}; // larger than ReadableInfo::kSize
- ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
- if (nbytes == -1) {
- std::cout << "fail to recvfrom, " << strerror(errno)<<std::endl;
- return -1;
- }
- if(nbytes == 8)
- {
- // std::cout<<"recv key."<<std::endl;
- qint64 * pkey = (qint64 *)buf;
- if(*pkey == mKey)
- {
- return 1;
- // mpReadThread->WakeRead();
- //Waaa;
- }
- else
- {
- // std::cout<<"key error"<<std::endl;
- }
- }
- // return info->DeserializeFrom(buf, nbytes);
- } else if (ready_num == 0) {
- return 0;
- } else {
- nrtn = -2;
- if (errno == EINTR) {
- std::cout << "poll was interrupted."<<std::endl;
- } else {
- std::cout << "fail to poll, " << strerror(errno);
- }
- }
- return nrtn;
- }
- void procsm_if::ThreadGroupRecv()
- {
- int timeout_ms = 100;
- while(mbGroupRecvRun)
- {
- // std::cout<<"poll"<<std::endl;
- struct pollfd fds;
- fds.fd = listen_fd_;
- fds.events = POLLIN;
- int ready_num = poll(&fds, 1, timeout_ms);
- if (ready_num > 0) {
- char buf[32] = {0}; // larger than ReadableInfo::kSize
- ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
- if (nbytes == -1) {
- std::cout << "fail to recvfrom, " << strerror(errno);
- continue;
- }
- if(nbytes == 8)
- {
- // std::cout<<"recv key."<<std::endl;
- qint64 * pkey = (qint64 *)buf;
- if(*pkey == mKey)
- {
- mpReadThread->WakeRead();
- //Waaa;
- }
- else
- {
- // std::cout<<"key error"<<std::endl;
- }
- }
- // return info->DeserializeFrom(buf, nbytes);
- } else if (ready_num == 0) {
- // std::cout << "timeout, no readableinfo.";
- continue;
- } else {
- if (errno == EINTR) {
- std::cout << "poll was interrupted."<<std::endl;
- } else {
- std::cout << "fail to poll, " << strerror(errno);
- }
- }
- }
- }
- qint64 procsm_if::CalcKey(const char *strsmname)
- {
- qint64 xkey = 0;
- int i;
- int nlen = strnlen(strsmname,256);
- for(i=0;i<nlen;i++)
- {
- qint64 temp = *(strsmname+i);
- temp = temp<<(4+i);
- xkey = xkey + temp;
- if(i>=3)break;
- }
- for(i=0;i<nlen;i++)
- {
- qint64 temp = *(strsmname+nlen-i-1);
- temp = temp<<i;
- xkey = xkey + temp;
- if(i>=3)break;
- }
- for(i=0;i<(int)strnlen(strsmname,256);i++)
- {
- qint64 temp = *(strsmname+i);
- xkey = xkey + temp;
- }
- return xkey;
- }
- void procsm_if::SendGroupMsg()
- {
- // qint64 timenow = std::chrono::system_clock::now().time_since_epoch().count();
- ssize_t nbytes =
- sendto(notify_fd_, &mKey, sizeof(qint64), 0,
- (struct sockaddr*)¬ify_addr_, sizeof(notify_addr_));
- if(nbytes <= 0)
- {
- std::cout<<"Send Fail."<<std::endl;
- }
- }
- #endif
|