#include <iostream> #include <thread> #include <QTime> #include <QThread> #include <algorithm> #include "procsm.h" class AttachThread : public QThread { public: AttachThread(QSharedMemory * pa,bool & bAttach) { mbAttach = bAttach; mpa = pa; mbrun = true; } QSharedMemory * mpa; bool mbAttach = false; bool mbrun = true; void run() { mbAttach = mpa->attach(); mbrun = false; } }; procsm::procsm(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode) { // mnBufSize = nBufSize; // qDebug("create dbus"); strncpy(mstrsmname,strsmname,256); mpASMPtr = new QSharedMemory(strsmname); char strasmname[256]; if(nMode == ModeWrite) { bool bres = mpASMPtr->attach(); if(bres == false) { mpASMPtr->create(sizeof(ASM_PTR)); ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data(); snprintf(strasmname,256,"%s_%lld",strsmname,QDateTime::currentMSecsSinceEpoch()); pasm->mnshmsize = sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize; pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch(); strncpy(pasm->mstrshmname,strasmname,256); mASM_State = *pasm; } else { ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data(); mASM_State = *pasm; } } else { return; } // mpASM = new QSharedMemory(strsmname); mpASM = new QSharedMemory(strasmname); if(nMode == ModeWrite) { mmodulemsg_type.mnBufSize = nBufSize; mmodulemsg_type.mnMsgBufCount = nMaxPacCount; strncpy(mmodulemsg_type.mstrmsgname,strasmname,255); #ifdef USEDBUS mmsg = QDBusMessage::createSignal("/catarc/adc", "adc.adciv.modulecomm", strsmname); mmsg<<1; #endif bool bAttach = false; AttachThread AT(mpASM,bAttach); AT.start(); QTime xTime; xTime.start(); while(xTime.elapsed()<100) { if(AT.mbrun == false) { bAttach = AT.mbAttach; break; } } // qDebug("time is %d",xTime.elapsed()); if(xTime.elapsed()>= 1000) { qDebug("in 1000ms Attach fail.terminate it ."); AT.terminate(); bAttach = false; } // if(!mpASM->attach()) if(!bAttach) { mpASM->create(sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize); char * p = (char *)mpASM->data(); mpinfo = (procsm_info *)p; mphead = (procsm_head *)(p+sizeof(procsm_info)); mpinfo->mCap = nMaxPacCount; mpinfo->mnBufSize = nBufSize; mpinfo->mFirst = 0; mpinfo->mNext = 0; mpinfo->mLock = 0; } if(mpASM->isAttached()) { mbAttach = true; char * p = (char *)mpASM->data(); mpinfo = (procsm_info *)p; mphead = (procsm_head *)(p+sizeof(procsm_info)); mnMaxPacCount = mpinfo->mCap; mnBufSize = mpinfo->mnBufSize; // qDebug("attach successful"); mstrtem = new char[mnBufSize]; #ifdef USEDBUS mmsgres = QDBusMessage::createSignal("/catarc/adc", "adciv.interface", "modulemsgres"); mmsgres<<1; bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adciv.interface", "modulemsgquery",this,SLOT(onQuery())); if(bconnect == false) { std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl; } #endif } else { mbAttach = false; qDebug("Share Memory Error."); } } } void procsm::recreateasm(int nbufsize) { mpASMPtr->lock(); qDebug("recreate asms"); mnBufSize = std::max(nbufsize*11/10,nbufsize+1000); // mnBufSize = nbufsize+100; char strasmname[256]; ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data(); snprintf(strasmname,256,"%s_%lld",mstrsmname,QDateTime::currentMSecsSinceEpoch()); pasm->mnshmsize = sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize; pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch(); strncpy(pasm->mstrshmname,strasmname,256); mASM_State = *pasm; mmodulemsg_type.mnBufSize = mnBufSize; mmodulemsg_type.mnMsgBufCount = mnMaxPacCount; strncpy(mmodulemsg_type.mstrmsgname,mASM_State.mstrshmname,255); mpASM->lock(); int noldmemsize = mpASM->size(); char * px = new char[mpASM->size()]; memcpy(px,mpASM->data(),noldmemsize); mpASM->unlock(); mpASM->detach(); qDebug("new asm name is %s,buffer size is %d ",mASM_State.mstrshmname,mnBufSize); mpASM = new QSharedMemory(mASM_State.mstrshmname); bool bAttach = false; AttachThread AT(mpASM,bAttach); AT.start(); QTime xTime; xTime.start(); while(xTime.elapsed()<100) { if(AT.mbrun == false) { bAttach = AT.mbAttach; break; } } // qDebug("time is %d",xTime.elapsed()); if(xTime.elapsed()>= 1000) { qDebug("in 1000ms Attach fail.terminate it ."); AT.terminate(); bAttach = false; } // if(!mpASM->attach()) if(!bAttach) { mpASM->create(sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize); memcpy(mpASM->data(),px,noldmemsize); char * p = (char *)mpASM->data(); mpinfo = (procsm_info *)p; mphead = (procsm_head *)(p+sizeof(procsm_info)); mpinfo->mCap = mnMaxPacCount; mpinfo->mnBufSize = mnBufSize; // mpinfo->mFirst = nfirst; // mpinfo->mNext = nnext; // mpinfo->mLock = 0; } if(mpASM->isAttached()) { mbAttach = true; char * p = (char *)mpASM->data(); mpinfo = (procsm_info *)p; mphead = (procsm_head *)(p+sizeof(procsm_info)); mnMaxPacCount = mpinfo->mCap; mnBufSize = mpinfo->mnBufSize; // qDebug("attach successful"); // mstrtem = new char[mnBufSize]; } else { mbAttach = false; qDebug("Share Memory Error."); } mpASMPtr->unlock(); delete px; } #ifdef USEDBUS void procsm::onQuery() { QByteArray ba; ba.append((char *)&mmodulemsg_type,sizeof(iv::modulemsg_type)); QList<QVariant> x; x<<ba; mmsgres.setArguments(x); QDBusConnection::sessionBus().send(mmsgres); } #endif bool procsm::AttachMem() { if(!mpASMPtr->isAttached())mpASMPtr->attach(); if(mpASMPtr->isAttached()) { ASM_PTR * pasmptr = (ASM_PTR *)(mpASMPtr->data()); mASM_State = * pasmptr; if(mpASM != 0) { if(mpASM->isAttached())mpASM->detach(); delete mpASM; } mpASM = new QSharedMemory(mASM_State.mstrshmname); mpASM->attach(); if(mpASM->isAttached()) { mbAttach = true; char * p = (char *)mpASM->data(); mpinfo = (procsm_info *)p; mphead = (procsm_head *)(p+sizeof(procsm_info)); mnMaxPacCount = mpinfo->mCap; mnBufSize = mpinfo->mnBufSize; return true; } else { return false; } } else { return false; } return false; mpASM->attach(); if(mpASM->isAttached()) { mbAttach = true; char * p = (char *)mpASM->data(); mpinfo = (procsm_info *)p; mphead = (procsm_head *)(p+sizeof(procsm_info)); mnMaxPacCount = mpinfo->mCap; mnBufSize = mpinfo->mnBufSize; return true; } else { return false; } } int procsm::MoveMem(const unsigned int nSize) { // qDebug("move mem"); unsigned int nRemove = nSize; if(nRemove == 0)return -1; // unsigned int * pIndexFirst = (unsigned int *)mpASM->data(); // unsigned int * pIndexNext = pIndexFirst+1; // qDebug("first = %d next = %d",*pIndexFirst,*pIndexNext); // unsigned int * pIndexNext = pIndexFirst; char * pH,*pD; pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info); pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head); procsm_head * phh = (procsm_head *)pH; unsigned int nPac = mpinfo->mNext - mpinfo->mFirst; if(nRemove >nPac) { // qDebug("procsm::MoveMem nRemove > nPac nRemove = %d",nRemove); nRemove = nPac; } if(nRemove == nPac) { mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove; return 0; } unsigned int i; int nDataMove = 0; for(i=0;i<nRemove;i++) { procsm_head * phd = phh+i; nDataMove = nDataMove + phd->mnLen; } unsigned int nDataTotal; for(i=0;i<(nPac - nRemove);i++) { memcpy(phh+i,phh+i+nRemove,sizeof(procsm_head)); (phh+i)->mnPos = (phh+i)->mnPos - nDataMove; } nDataTotal = (phh + nPac-nRemove-1)->mnPos + (phh+nPac-nRemove-1)->mnLen; memcpy(mstrtem,pD+nDataMove,nDataTotal); memcpy(pD,mstrtem,nDataTotal); // for(i=0;i<nDataTotal;i++) // { // *(pD+i) = *(pD+i+nDataMove); // } mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove; return 0; } void procsm::checkasm() { mpASMPtr->lock(); ASM_PTR * pASM_PTR = (ASM_PTR * )mpASMPtr->data(); if(pASM_PTR->mnUpdateTime == mASM_State.mnUpdateTime) { mpASMPtr->unlock(); return; } qDebug("reattch mem."); mbAttach = false; AttachMem(); mpASMPtr->unlock(); } int procsm::writemsg(const char *str, const unsigned int nSize) { checkasm(); if(nSize > mnBufSize) { if(nSize<1000000000) { recreateasm(nSize); checkasm(); } else { qDebug("procsm::writemsg message size is very big"); return -1; } } if(mbAttach == false) { std::cout<<"ShareMemory Attach fail."<<std::endl; return -1; } mpASM->lock(); // unsigned int * pIndexFirst = (unsigned int *)mpASM->data(); // unsigned int * pIndexNext = pIndexFirst+1; if(mpinfo->mLock == 1) { std::cout<<"ShareMemory have lock.Init."<<std::endl; mpinfo->mLock = 0; mpinfo->mFirst = 0; mpinfo->mNext = 0; } mpinfo->mLock =1; WRITEMSG: char * pH,*pD; QDateTime dt; pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info); pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head); procsm_head * phh = (procsm_head *)pH; unsigned int nPac = mpinfo->mNext - mpinfo->mFirst; if(nPac>=mnMaxPacCount) { unsigned int nRemove = mnMaxPacCount/3; if(nRemove == 0)nRemove = 1; MoveMem(nRemove); goto WRITEMSG; } if(nPac == 0) { memcpy(pD,str,nSize); dt = QDateTime::currentDateTime(); // phh->mdt = dt; phh->SetDate(dt); // memcpy(&phh->mdt,&dt,sizeof(QDateTime)); // phh->mdt = QDateTime::currentDateTime(); phh->mindex = mpinfo->mNext; phh->mnPos = 0; phh->mnLen = nSize; mpinfo->mNext = mpinfo->mNext+1; } else { if(((phh+nPac-1)->mnPos+(phh+nPac-1)->mnLen + nSize)>=mnBufSize) { unsigned int nRemove = mnMaxPacCount/2; if(nRemove == 0)nRemove = 1; MoveMem(nRemove); goto WRITEMSG; } else { unsigned int nPos = (phh+nPac-1)->mnPos + (phh+nPac-1)->mnLen; // qDebug("write pos = %d",nPos); memcpy(pD+nPos,str,nSize); dt = QDateTime::currentDateTime(); (phh+nPac)->SetDate(dt); // memcpy(&(phh+nPac)->mdt,&dt,sizeof(QDateTime)); // (phh+nPac)->mdt = QDateTime::currentDateTime(); (phh+nPac)->mindex = mpinfo->mNext; (phh+nPac)->mnPos = nPos; (phh+nPac)->mnLen = nSize; mpinfo->mNext = mpinfo->mNext+1; } } const unsigned int nTM = 0x6fffffff; if((mpinfo->mNext >nTM)&&(mpinfo->mFirst>nTM)) { nPac = mpinfo->mNext - mpinfo->mFirst; unsigned int i; for(i=0;i<nPac;i++) { (phh+i)->mindex = (phh+i)->mindex-nTM; } mpinfo->mFirst = mpinfo->mFirst-nTM; mpinfo->mNext = mpinfo->mNext - nTM; } mpinfo->mLock = 0; mpASM->unlock(); #ifdef USEDBUS QDBusConnection::sessionBus().send(mmsg); #endif return 0; } unsigned int procsm::getcurrentnext() { checkasm(); unsigned int nNext; mpASM->lock(); nNext = mpinfo->mNext; mpASM->unlock(); return nNext; } //if return 0 No Data. //if return -1 nMaxSize is small //if retrun -2 index is not in range,call getcurrentnext get position //if return > 0 readdata int procsm::readmsg(unsigned int index, char *str, unsigned int nMaxSize,unsigned int * nRead,QDateTime * pdt) { checkasm(); if(mbAttach == false) { std::cout<<"ShareMemory Attach fail."<<std::endl; return -1; } int nRtn = 0; mpASM->lock(); if((index< mpinfo->mFirst)||(index > mpinfo->mNext)) { nRtn = -2; } if(nRtn != (-2)) { if(index == mpinfo->mNext) { nRtn = 0; } else { char * pH,*pD; // pH = (char *)mpASM->data();pH = pH + 2*sizeof(unsigned int); // pD = (char *)mpASM->data();pD = pD + 2*sizeof(unsigned int) + mnMaxPacCount * sizeof(procsm_head); pD = (char *)mpASM->data();pD = pD+ sizeof(procsm_info) + mpinfo->mCap*sizeof(procsm_head); pH = (char *)mpASM->data();pH = pH+sizeof(procsm_info); procsm_head * phh = (procsm_head *)pH; unsigned int nPac = mpinfo->mNext - mpinfo->mFirst; if(nPac == 0) { nRtn = 0; } else { unsigned int nPos = index - mpinfo->mFirst; *nRead = (phh+nPos)->mnLen; if((phh+nPos)->mnLen > nMaxSize) { nRtn = -1; } else { // qDebug("read pos = %d",(phh+nPos)->mnPos); memcpy(str,pD + (phh+nPos)->mnPos,(phh+nPos)->mnLen); // qDebug("read pos = %d",(phh+nPos)->mnPos); nRtn = (phh+nPos)->mnLen; (phh+nPos)->GetDate(pdt); // memcpy(pdt,&((phh+nPos)->mdt),sizeof(QDateTime)); } } } } mpASM->unlock(); return nRtn; }