123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572 |
#include <algorithm>
#include <atomic>
#include <cstring>
#include <iostream>
#include <thread>

#include <QThread>
#include <QTime>

#include "procsm.h"
- class AttachThread : public QThread
- {
- public:
- AttachThread(QSharedMemory * pa,bool & bAttach)
- {
- mbAttach = bAttach;
- mpa = pa;
- mbrun = true;
- }
- QSharedMemory * mpa;
- bool mbAttach = false;
- bool mbrun = true;
- void run()
- {
- mbAttach = mpa->attach();
- mbrun = false;
- }
- };
- procsm::procsm(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
- {
- // mnBufSize = nBufSize;
- // qDebug("create dbus");
- strncpy(mstrsmname,strsmname,256);
- mpASMPtr = new QSharedMemory(strsmname);
- char strasmname[256];
- if(nMode == ModeWrite)
- {
- bool bres = mpASMPtr->attach();
- if(bres == false)
- {
- mpASMPtr->create(sizeof(ASM_PTR));
- ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
- snprintf(strasmname,256,"%s_%lld",strsmname,QDateTime::currentMSecsSinceEpoch());
- pasm->mnshmsize = sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize;
- pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch();
- strncpy(pasm->mstrshmname,strasmname,256);
- mASM_State = *pasm;
- }
- else
- {
- ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
- mASM_State = *pasm;
- }
- }
- else
- {
- return;
- }
- // mpASM = new QSharedMemory(strsmname);
- mpASM = new QSharedMemory(strasmname);
- if(nMode == ModeWrite)
- {
- mmodulemsg_type.mnBufSize = nBufSize;
- mmodulemsg_type.mnMsgBufCount = nMaxPacCount;
- strncpy(mmodulemsg_type.mstrmsgname,strasmname,255);
- #ifdef USEDBUS
- mmsg = QDBusMessage::createSignal("/catarc/adc", "adc.adciv.modulecomm", strsmname);
- mmsg<<1;
- #endif
- bool bAttach = false;
- AttachThread AT(mpASM,bAttach);
- AT.start();
- QTime xTime;
- xTime.start();
- while(xTime.elapsed()<100)
- {
- if(AT.mbrun == false)
- {
- bAttach = AT.mbAttach;
- break;
- }
- }
- // qDebug("time is %d",xTime.elapsed());
- if(xTime.elapsed()>= 1000)
- {
- qDebug("in 1000ms Attach fail.terminate it .");
- AT.terminate();
- bAttach = false;
- }
- // if(!mpASM->attach())
- if(!bAttach)
- {
- mpASM->create(sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize);
- char * p = (char *)mpASM->data();
- mpinfo = (procsm_info *)p;
- mphead = (procsm_head *)(p+sizeof(procsm_info));
- mpinfo->mCap = nMaxPacCount;
- mpinfo->mnBufSize = nBufSize;
- mpinfo->mFirst = 0;
- mpinfo->mNext = 0;
- mpinfo->mLock = 0;
- }
- if(mpASM->isAttached())
- {
- mbAttach = true;
- char * p = (char *)mpASM->data();
- mpinfo = (procsm_info *)p;
- mphead = (procsm_head *)(p+sizeof(procsm_info));
- mnMaxPacCount = mpinfo->mCap;
- mnBufSize = mpinfo->mnBufSize;
- // qDebug("attach successful");
- mstrtem = new char[mnBufSize];
- #ifdef USEDBUS
- mmsgres = QDBusMessage::createSignal("/catarc/adc", "adciv.interface", "modulemsgres");
- mmsgres<<1;
- bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adciv.interface", "modulemsgquery",this,SLOT(onQuery()));
- if(bconnect == false)
- {
- std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
- }
- #endif
- }
- else
- {
- mbAttach = false;
- qDebug("Share Memory Error.");
- }
- }
- }
- void procsm::recreateasm(int nbufsize)
- {
- mpASMPtr->lock();
- qDebug("recreate asms");
- mnBufSize = std::max(nbufsize*11/10,nbufsize+1000);
- // mnBufSize = nbufsize+100;
- char strasmname[256];
- ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
- snprintf(strasmname,256,"%s_%lld",mstrsmname,QDateTime::currentMSecsSinceEpoch());
- pasm->mnshmsize = sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize;
- pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch();
- strncpy(pasm->mstrshmname,strasmname,256);
- mASM_State = *pasm;
- mmodulemsg_type.mnBufSize = mnBufSize;
- mmodulemsg_type.mnMsgBufCount = mnMaxPacCount;
- strncpy(mmodulemsg_type.mstrmsgname,mASM_State.mstrshmname,255);
- mpASM->lock();
- int noldmemsize = mpASM->size();
- char * px = new char[mpASM->size()];
- memcpy(px,mpASM->data(),noldmemsize);
- mpASM->unlock();
- mpASM->detach();
- qDebug("new asm name is %s,buffer size is %d ",mASM_State.mstrshmname,mnBufSize);
- mpASM = new QSharedMemory(mASM_State.mstrshmname);
- bool bAttach = false;
- AttachThread AT(mpASM,bAttach);
- AT.start();
- QTime xTime;
- xTime.start();
- while(xTime.elapsed()<100)
- {
- if(AT.mbrun == false)
- {
- bAttach = AT.mbAttach;
- break;
- }
- }
- // qDebug("time is %d",xTime.elapsed());
- if(xTime.elapsed()>= 1000)
- {
- qDebug("in 1000ms Attach fail.terminate it .");
- AT.terminate();
- bAttach = false;
- }
- // if(!mpASM->attach())
- if(!bAttach)
- {
- mpASM->create(sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize);
- memcpy(mpASM->data(),px,noldmemsize);
- char * p = (char *)mpASM->data();
- mpinfo = (procsm_info *)p;
- mphead = (procsm_head *)(p+sizeof(procsm_info));
- mpinfo->mCap = mnMaxPacCount;
- mpinfo->mnBufSize = mnBufSize;
- // mpinfo->mFirst = nfirst;
- // mpinfo->mNext = nnext;
- // mpinfo->mLock = 0;
- }
- if(mpASM->isAttached())
- {
- mbAttach = true;
- char * p = (char *)mpASM->data();
- mpinfo = (procsm_info *)p;
- mphead = (procsm_head *)(p+sizeof(procsm_info));
- mnMaxPacCount = mpinfo->mCap;
- mnBufSize = mpinfo->mnBufSize;
- // qDebug("attach successful");
- // mstrtem = new char[mnBufSize];
- }
- else
- {
- mbAttach = false;
- qDebug("Share Memory Error.");
- }
- mpASMPtr->unlock();
- delete px;
- }
#ifdef USEDBUS
/**
 * Slot connected to the "modulemsgquery" D-Bus signal: answers with the raw
 * bytes of mmodulemsg_type as the single argument of the mmsgres signal.
 */
void procsm::onQuery()
{
    // Serialise the descriptor struct byte-for-byte.
    const QByteArray payload((char *)&mmodulemsg_type,sizeof(iv::modulemsg_type));

    QList<QVariant> args;
    args.append(QVariant(payload));

    mmsgres.setArguments(args);
    QDBusConnection::sessionBus().send(mmsgres);
}
#endif
- bool procsm::AttachMem()
- {
- if(!mpASMPtr->isAttached())mpASMPtr->attach();
- if(mpASMPtr->isAttached())
- {
- ASM_PTR * pasmptr = (ASM_PTR *)(mpASMPtr->data());
- mASM_State = * pasmptr;
- if(mpASM != 0)
- {
- if(mpASM->isAttached())mpASM->detach();
- delete mpASM;
- }
- mpASM = new QSharedMemory(mASM_State.mstrshmname);
- mpASM->attach();
- if(mpASM->isAttached())
- {
- mbAttach = true;
- char * p = (char *)mpASM->data();
- mpinfo = (procsm_info *)p;
- mphead = (procsm_head *)(p+sizeof(procsm_info));
- mnMaxPacCount = mpinfo->mCap;
- mnBufSize = mpinfo->mnBufSize;
- return true;
- }
- else
- {
- return false;
- }
- }
- else
- {
- return false;
- }
- return false;
- mpASM->attach();
- if(mpASM->isAttached())
- {
- mbAttach = true;
- char * p = (char *)mpASM->data();
- mpinfo = (procsm_info *)p;
- mphead = (procsm_head *)(p+sizeof(procsm_info));
- mnMaxPacCount = mpinfo->mCap;
- mnBufSize = mpinfo->mnBufSize;
- return true;
- }
- else
- {
- return false;
- }
- }
- int procsm::MoveMem(const unsigned int nSize)
- {
- // qDebug("move mem");
- unsigned int nRemove = nSize;
- if(nRemove == 0)return -1;
- // unsigned int * pIndexFirst = (unsigned int *)mpASM->data();
- // unsigned int * pIndexNext = pIndexFirst+1;
- // qDebug("first = %d next = %d",*pIndexFirst,*pIndexNext);
- // unsigned int * pIndexNext = pIndexFirst;
- char * pH,*pD;
- pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info);
- pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head);
- procsm_head * phh = (procsm_head *)pH;
- unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
- if(nRemove >nPac)
- {
- // qDebug("procsm::MoveMem nRemove > nPac nRemove = %d",nRemove);
- nRemove = nPac;
- }
- if(nRemove == nPac)
- {
- mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
- return 0;
- }
- unsigned int i;
- int nDataMove = 0;
- for(i=0;i<nRemove;i++)
- {
- procsm_head * phd = phh+i;
- nDataMove = nDataMove + phd->mnLen;
- }
- unsigned int nDataTotal;
- for(i=0;i<(nPac - nRemove);i++)
- {
- memcpy(phh+i,phh+i+nRemove,sizeof(procsm_head));
- (phh+i)->mnPos = (phh+i)->mnPos - nDataMove;
- }
- nDataTotal = (phh + nPac-nRemove-1)->mnPos + (phh+nPac-nRemove-1)->mnLen;
- memcpy(mstrtem,pD+nDataMove,nDataTotal);
- memcpy(pD,mstrtem,nDataTotal);
- // for(i=0;i<nDataTotal;i++)
- // {
- // *(pD+i) = *(pD+i+nDataMove);
- // }
- mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
- return 0;
- }
- void procsm::checkasm()
- {
- mpASMPtr->lock();
- ASM_PTR * pASM_PTR = (ASM_PTR * )mpASMPtr->data();
- if(pASM_PTR->mnUpdateTime == mASM_State.mnUpdateTime)
- {
- mpASMPtr->unlock();
- return;
- }
- qDebug("reattch mem.");
- mbAttach = false;
- AttachMem();
- mpASMPtr->unlock();
- }
- int procsm::writemsg(const char *str, const unsigned int nSize)
- {
- checkasm();
- if(nSize > mnBufSize)
- {
- if(nSize<1000000000)
- {
- recreateasm(nSize);
- checkasm();
- }
- else
- {
- qDebug("procsm::writemsg message size is very big");
- return -1;
- }
- }
- if(mbAttach == false)
- {
- std::cout<<"ShareMemory Attach fail."<<std::endl;
- return -1;
- }
- mpASM->lock();
- // unsigned int * pIndexFirst = (unsigned int *)mpASM->data();
- // unsigned int * pIndexNext = pIndexFirst+1;
- if(mpinfo->mLock == 1)
- {
- std::cout<<"ShareMemory have lock.Init."<<std::endl;
- mpinfo->mLock = 0;
- mpinfo->mFirst = 0;
- mpinfo->mNext = 0;
- }
- mpinfo->mLock =1;
- WRITEMSG:
- char * pH,*pD;
- QDateTime dt;
- pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info);
- pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head);
- procsm_head * phh = (procsm_head *)pH;
- unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
- if(nPac>=mnMaxPacCount)
- {
- unsigned int nRemove = mnMaxPacCount/3;
- if(nRemove == 0)nRemove = 1;
- MoveMem(nRemove);
- goto WRITEMSG;
- }
- if(nPac == 0)
- {
- memcpy(pD,str,nSize);
- dt = QDateTime::currentDateTime();
- // phh->mdt = dt;
- phh->SetDate(dt);
- // memcpy(&phh->mdt,&dt,sizeof(QDateTime));
- // phh->mdt = QDateTime::currentDateTime();
- phh->mindex = mpinfo->mNext;
- phh->mnPos = 0;
- phh->mnLen = nSize;
- mpinfo->mNext = mpinfo->mNext+1;
- }
- else
- {
- if(((phh+nPac-1)->mnPos+(phh+nPac-1)->mnLen + nSize)>=mnBufSize)
- {
- unsigned int nRemove = mnMaxPacCount/2;
- if(nRemove == 0)nRemove = 1;
- MoveMem(nRemove);
- goto WRITEMSG;
- }
- else
- {
- unsigned int nPos = (phh+nPac-1)->mnPos + (phh+nPac-1)->mnLen;
- // qDebug("write pos = %d",nPos);
- memcpy(pD+nPos,str,nSize);
- dt = QDateTime::currentDateTime();
- (phh+nPac)->SetDate(dt);
- // memcpy(&(phh+nPac)->mdt,&dt,sizeof(QDateTime));
- // (phh+nPac)->mdt = QDateTime::currentDateTime();
- (phh+nPac)->mindex = mpinfo->mNext;
- (phh+nPac)->mnPos = nPos;
- (phh+nPac)->mnLen = nSize;
- mpinfo->mNext = mpinfo->mNext+1;
- }
- }
- const unsigned int nTM = 0x6fffffff;
- if((mpinfo->mNext >nTM)&&(mpinfo->mFirst>nTM))
- {
- nPac = mpinfo->mNext - mpinfo->mFirst;
- unsigned int i;
- for(i=0;i<nPac;i++)
- {
- (phh+i)->mindex = (phh+i)->mindex-nTM;
- }
- mpinfo->mFirst = mpinfo->mFirst-nTM;
- mpinfo->mNext = mpinfo->mNext - nTM;
- }
- mpinfo->mLock = 0;
- mpASM->unlock();
- #ifdef USEDBUS
- QDBusConnection::sessionBus().send(mmsg);
- #endif
- return 0;
- }
- unsigned int procsm::getcurrentnext()
- {
- checkasm();
- unsigned int nNext;
- mpASM->lock();
- nNext = mpinfo->mNext;
- mpASM->unlock();
- return nNext;
- }
- //if return 0 No Data.
- //if return -1 nMaxSize is small
- //if retrun -2 index is not in range,call getcurrentnext get position
- //if return > 0 readdata
- int procsm::readmsg(unsigned int index, char *str, unsigned int nMaxSize,unsigned int * nRead,QDateTime * pdt)
- {
- checkasm();
- if(mbAttach == false)
- {
- std::cout<<"ShareMemory Attach fail."<<std::endl;
- return -1;
- }
- int nRtn = 0;
- mpASM->lock();
- if((index< mpinfo->mFirst)||(index > mpinfo->mNext))
- {
- nRtn = -2;
- }
- if(nRtn != (-2))
- {
- if(index == mpinfo->mNext)
- {
- nRtn = 0;
- }
- else
- {
- char * pH,*pD;
- // pH = (char *)mpASM->data();pH = pH + 2*sizeof(unsigned int);
- // pD = (char *)mpASM->data();pD = pD + 2*sizeof(unsigned int) + mnMaxPacCount * sizeof(procsm_head);
- pD = (char *)mpASM->data();pD = pD+ sizeof(procsm_info) + mpinfo->mCap*sizeof(procsm_head);
- pH = (char *)mpASM->data();pH = pH+sizeof(procsm_info);
- procsm_head * phh = (procsm_head *)pH;
- unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
- if(nPac == 0)
- {
- nRtn = 0;
- }
- else
- {
- unsigned int nPos = index - mpinfo->mFirst;
- *nRead = (phh+nPos)->mnLen;
- if((phh+nPos)->mnLen > nMaxSize)
- {
- nRtn = -1;
- }
- else
- {
- // qDebug("read pos = %d",(phh+nPos)->mnPos);
- memcpy(str,pD + (phh+nPos)->mnPos,(phh+nPos)->mnLen);
- // qDebug("read pos = %d",(phh+nPos)->mnPos);
- nRtn = (phh+nPos)->mnLen;
- (phh+nPos)->GetDate(pdt);
- // memcpy(pdt,&((phh+nPos)->mdt),sizeof(QDateTime));
- }
- }
- }
- }
- mpASM->unlock();
- return nRtn;
- }
|