// Implementation of the procsm_if_readthread and procsm_if member functions.
//
// NOTE(review): this text appears to have been whitespace-collapsed and partly
// stripped by an extraction step: the two bare `#include` tokens carry no header
// names, several `std::cout<<"..."<` statements run straight into unrelated code,
// and `¬ify_addr_` looks like a mangled `&notify_addr_`. The code is left untouched
// here; restore it from the original source before building. Because whole
// definitions share one physical line, every `//` comment inside the code also
// swallows whatever follows it on that line.
//
// Contents of the next physical line, in order:
//  * procsm_if_readthread constructor: stores pparent/pPSM/pCall in the matching
//    members and copies at most 255 bytes of strsmname into mstrsmname with strncpy.
//    NOTE(review): strncpy leaves the destination unterminated when the source has
//    255 or more characters; the size of mstrsmname is not visible here -- confirm.
//    Without USE_GROUPUDP and with USEDBUS it connects the session-bus signal
//    (path "/catarc/adc", interface "adc.adciv.modulecomm", member strsmname) to
//    onNewMsg(int) and logs when the connection fails. The remainder of the body
//    appears to be lost (only a timer fragment survives).
//  * handlerMethod (USELCM only): increments mxindex, takes the current date/time
//    and forwards the LCM payload to mFun when mbFunPlus is set, else to *mpCall.
//  * puaseread / continueread: clear / set mbRun, which run() checks on each pass.
//  * run() (beginning): with USELCM it subscribes to mstrsmname and pumps
//    mlcm.handle() until interruption is requested, then returns. Otherwise it sets
//    up a 1-byte buffer `str` (new char[nBufLen]), a heap-allocated QDateTime and the
//    bAttach / bFirstRead flags used by the loop that follows.
#include "procsm_if.h" #include #include procsm_if_readthread::procsm_if_readthread(procsm *pPSM,SMCallBack pCall,const char * strsmname,void * pparent) { mpparent = pparent; mpPSM = pPSM; mpCall = pCall; strncpy(mstrsmname,strsmname,255); #ifndef USE_GROUPUDP #ifdef USEDBUS bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int))); if(bconnect == false) { std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<setTimerType(Qt::PreciseTimer); delete timer; } #endif #endif } #ifdef USELCM void procsm_if_readthread::handlerMethod(const lcm::ReceiveBuffer *rbuf, const std::string &channel) { qDebug("lcm receiv data. "); mxindex++; QDateTime dt = QDateTime::currentDateTime(); if(mbFunPlus) { mFun((char *)rbuf->data,rbuf->data_size,mxindex,&dt,mstrsmname); } else { (*mpCall)((char *)rbuf->data,rbuf->data_size,mxindex,&dt,mstrsmname); } } #endif void procsm_if_readthread::puaseread() { mbRun = false; } void procsm_if_readthread::continueread() { mbRun = true; } void procsm_if_readthread::run() { #ifdef USELCM mlcm.subscribe(mstrsmname,&procsm_if_readthread::handlerMethod,this); while(!QThread::isInterruptionRequested()) { mlcm.handle(); } return; #endif QTime xTime; xTime.start(); unsigned int nBufLen = 1; unsigned int nRead; char * str = new char[nBufLen]; unsigned int index =0; QDateTime *pdt = new QDateTime(); bool bAttach = false; bool bFirstRead = true; //if First Read,not read the buffer data. 
// Contents of the next physical line, in order:
//  * run() (remainder): loops until interruption is requested. While mbRun is false
//    it sleeps 10 ms per pass; until AttachMem() succeeds it retries every 100 ms and
//    then takes the start index from getcurrentnext(). Each pass calls readmsg().
//    A return of 0 waits for new data: on the condition variable for up to 10 ms when
//    mbDBUSOK is set, via WaitNotify() of the parent under USE_GROUPUDP, otherwise
//    msleep(1). A return of -100 steps index back by one when index > 0 and this is
//    not the first read. A positive return is delivered to mFun or *mpCall and index
//    is advanced; the remaining case sleeps 100 us. Part of this branch structure is
//    garbled, so the exact nesting cannot be confirmed from here.
//    NOTE(review): `str` is obtained with new char[] but released with `delete str`;
//    array storage must be released with delete[].
//  * WakeRead (USE_GROUPUDP only): wakes every waiter on mwc.
//  * onNewMsg (D-Bus build): only a fragment survives.
//  * A fragment that requests interruption of mpReadThread and polls isFinished(),
//    clearing bDel and logging once more than 1000 ms have elapsed -- presumably the
//    procsm_if destructor; confirm against the original.
//  * Tail of a write routine: calls writemsg(str,nSize), then SendGroupMsg() under
//    USE_GROUPUDP, and returns nRes.
//  * procsm_if::handlerMethod (USELCM only): only logs.
//  * listenmsg(SMCallBack) (beginning): returns -1 when mnType is procsm::ModeWrite.
while(!QThread::isInterruptionRequested()) { if(mbRun == false) { msleep(10); continue; } if(bAttach == false) { bAttach = mpPSM->AttachMem(); if(bAttach == false) { msleep(100); continue; } else { index = mpPSM->getcurrentnext(); } } int nRtn = mpPSM->readmsg(index,str,nBufLen,&nRead,pdt); if(nRtn == 0) { #ifndef USE_GROUPUDP #ifdef USEDBUS if(mbDBUSOK == true) { mWaitMutex.lock(); mwc.wait(&mWaitMutex,10); mWaitMutex.unlock(); } else { msleep(1); } #else msleep(1); #endif #else procsm_if * mpif = (procsm_if * )mpparent; if(mpif->WaitNotify()<0) { std::cout<<"Wait Notify Error. Please Check. "<getcurrentnext(); if(nRtn == -100) //Because New ShareMemory, read this. { if((index > 0)&&(bFirstRead == false) )index = index -1; } } else { if(nRtn >0) { if(mbFunPlus) { mFun(str,nRtn,index,pdt,mstrsmname); } else { (*mpCall)(str,nRtn,index,pdt,mstrsmname); } index++; } else { usleep(100); } } } } bFirstRead = false; } delete str; delete pdt; // qDebug("Thread finish."); } #ifdef USE_GROUPUDP void procsm_if_readthread::WakeRead() { mwc.wakeAll(); } #endif #ifndef USE_GROUPUDP #ifdef USEDBUS void procsm_if_readthread::onNewMsg(int x) { if(x == 100)std::cout<join(); #endif QTime xTime; xTime.start(); bool bDel = true; mpReadThread->requestInterruption(); while(!mpReadThread->isFinished()) { if(xTime.elapsed() > 1000) { bDel = false; std::cout<<"procsm_if Thread Read not finish."<writemsg(str,nSize); #ifdef USE_GROUPUDP SendGroupMsg(); #endif return nRes; } #ifdef USELCM void procsm_if::handlerMethod(const lcm::ReceiveBuffer *rbuf, const std::string &channel) { qDebug("receiv data. "); } #endif int procsm_if::listenmsg(SMCallBack pCall) { //#ifdef USELCM //// mlcm.subscribe(mstrsmname,&handlerMethod2); // mlcm.subscribe(mstrsmname,&procsm_if::handlerMethod,this); // while(true) // { // mlcm.handle(); // } // return 0; //#endif if(mnType == procsm::ModeWrite)return -1; //listening. 
// Contents of the next physical line, in order:
//  * listenmsg(SMCallBack) (remainder): creates a procsm_if_readthread for pCall and
//    starts it at QThread::HighestPriority; returns 0.
//    NOTE(review): mpReadThread is overwritten without checking for an earlier thread.
//  * listenmsg(ModuleFun): same steps for the ModuleFun callback flavour.
//  * stoplisten: returns immediately unless mnType == 1; otherwise requests
//    interruption, spins on isFinished() without sleeping, and resets mnType to 0.
//    NOTE(review): the `mnType = 1` assignments in listenmsg are commented out and the
//    other functions compare against procsm::ModeRead / procsm::ModeWrite -- confirm
//    that this guard can ever pass.
//  * pausecomm / continuecomm: clear / set mbRun and forward to the read thread when
//    mnType is procsm::ModeRead.
//  * InitSock (USE_GROUPUDP only, beginning): creates the UDP notify socket, fills
//    notify_addr_ with 236.236.100.100:8888, creates the listen socket and switches
//    it to non-blocking mode. Each failure logs strerror(errno) and returns.
mpReadThread = new procsm_if_readthread(mpPSM,pCall,mstrsmname,this); // mpReadThread->setPriority(QThread::TimeCriticalPriority); // mpReadThread->start(); mpReadThread->start(QThread::HighestPriority); #ifdef USE_GROUPUDP // mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this); #endif // mnType = 1; return 0; } int procsm_if::listenmsg(ModuleFun xFun) { //#ifdef USELCM // mlcm.subscribe(mstrsmname,&procsm_if::handlerMethod,this); // while(true) // { // mlcm.handle(); // } // return 0; //#endif if(mnType == procsm::ModeWrite)return -1; //listening. mpReadThread = new procsm_if_readthread(mpPSM,xFun,mstrsmname,this); // mpReadThread->setPriority(QThread::TimeCriticalPriority); // mpReadThread->start(); mpReadThread->start(QThread::HighestPriority); #ifdef USE_GROUPUDP // mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this); #endif // mnType = 1; return 0; } void procsm_if::stoplisten() { if(mnType != 1)return; mpReadThread->requestInterruption(); while(!mpReadThread->isFinished()); mnType = 0; // mpReadThread->deleteLater(); qDebug("stop listen ok"); } void procsm_if::pausecomm() { mbRun = false; if(mnType == procsm::ModeRead) { mpReadThread->puaseread(); } } void procsm_if::continuecomm() { mbRun = true; if(mnType == procsm::ModeRead) { mpReadThread->continueread(); } } #ifdef USE_GROUPUDP void procsm_if::InitSock() { std::string mcast_ip("236.236.100.100"); uint16_t mcast_port = 8888; notify_fd_ = socket(AF_INET, SOCK_DGRAM, 0); if (notify_fd_ == -1) { std::cout << "fail to create notify fd, " << strerror(errno); return ; } memset(¬ify_addr_, 0, sizeof(notify_addr_)); notify_addr_.sin_family = AF_INET; notify_addr_.sin_addr.s_addr = inet_addr(mcast_ip.c_str()); notify_addr_.sin_port = htons(mcast_port); listen_fd_ = socket(AF_INET, SOCK_DGRAM, 0); if (listen_fd_ == -1) { std::cout << "fail to create listen fd, " << strerror(errno); return ; } if (fcntl(listen_fd_, F_SETFL, O_NONBLOCK) == -1) { std::cout << "fail to set listen fd nonblock, " << 
// Contents of the next physical line, in order:
//  * InitSock (remainder): fills listen_addr_ with INADDR_ANY and the multicast port,
//    sets SO_REUSEADDR, binds, enables IP_MULTICAST_LOOP and joins the multicast group
//    with IP_ADD_MEMBERSHIP. Each failure logs strerror(errno) and returns.
//    NOTE(review): the early returns do not close descriptors opened earlier in this
//    function.
//  * WaitNotify(timeout_ms): polls listen_fd_ for POLLIN and reads up to 32 bytes when
//    it is ready; returns 0 on timeout and sets nrtn to -2 when poll() fails. The
//    existing doc comment only mentions 1 and 0, while run() also tests for a
//    negative result. Part of the body is garbled.
//  * A second poll/recvfrom loop follows (beginning) -- presumably ThreadGroupRecv,
//    which listenmsg names in a commented-out line; confirm against the original.
strerror(errno); return ; } memset(&listen_addr_, 0, sizeof(listen_addr_)); listen_addr_.sin_family = AF_INET; listen_addr_.sin_addr.s_addr = htonl(INADDR_ANY); listen_addr_.sin_port = htons(mcast_port); int yes = 1; if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) { std::cout << "fail to setsockopt SO_REUSEADDR, " << strerror(errno); return ; } if (bind(listen_fd_, (struct sockaddr*)&listen_addr_, sizeof(listen_addr_)) < 0) { std::cout << "fail to bind addr, " << strerror(errno); return ; } int loop = 1; if (setsockopt(listen_fd_, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) < 0) { std::cout << "fail to setsockopt IP_MULTICAST_LOOP, " << strerror(errno); return ; } struct ip_mreq mreq; mreq.imr_multiaddr.s_addr = inet_addr(mcast_ip.c_str()); mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(listen_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { std::cout << "fail to setsockopt IP_ADD_MEMBERSHIP, " << strerror(errno); return ; } return ; } /** * @brief procsm_if::WaitNotify * @param timeout_ms * @return if have notify return 1,otherwise,return 0 */ int procsm_if::WaitNotify(int timeout_ms) { int nrtn = 0; struct pollfd fds; fds.fd = listen_fd_; fds.events = POLLIN; int ready_num = poll(&fds, 1, timeout_ms); if (ready_num > 0) { char buf[32] = {0}; // larger than ReadableInfo::kSize ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr); if (nbytes == -1) { std::cout << "fail to recvfrom, " << strerror(errno)<WakeRead(); //Waaa; } else { // std::cout<<"key error"<DeserializeFrom(buf, nbytes); } else if (ready_num == 0) { return 0; } else { nrtn = -2; if (errno == EINTR) { std::cout << "poll was interrupted."< 0) { char buf[32] = {0}; // larger than ReadableInfo::kSize ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr); if (nbytes == -1) { std::cout << "fail to recvfrom, " << strerror(errno); continue; } if(nbytes == 8) { // std::cout<<"recv key."<WakeRead(); //Waaa; } else { 
// Contents of the next physical line, in order:
//  * Remainder of the second poll/recvfrom loop (garbled).
//  * Fragment of a key routine: adds each character of strsmname (scanning at most
//    256 characters via strnlen) to xkey and returns xkey; the earlier part is garbled.
//  * SendGroupMsg: sends sizeof(qint64) bytes of mKey to notify_addr_ over notify_fd_
//    and logs "Send Fail." when sendto() returns <= 0. The text is cut off after that.
// std::cout<<"key error"<DeserializeFrom(buf, nbytes); } else if (ready_num == 0) { // std::cout << "timeout, no readableinfo."; continue; } else { if (errno == EINTR) { std::cout << "poll was interrupted."<=3)break; } for(i=0;i=3)break; } for(i=0;i<(int)strnlen(strsmname,256);i++) { qint64 temp = *(strsmname+i); xkey = xkey + temp; } return xkey; } void procsm_if::SendGroupMsg() { // qint64 timenow = std::chrono::system_clock::now().time_since_epoch().count(); ssize_t nbytes = sendto(notify_fd_, &mKey, sizeof(qint64), 0, (struct sockaddr*)¬ify_addr_, sizeof(notify_addr_)); if(nbytes <= 0) { std::cout<<"Send Fail."<