123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491 |
#include "intercomm.h"

#include <QMutex>
#include <QWaitCondition>

#include <cstring>
#include <iostream>
#include <vector>
- namespace iv {
- struct InterListenUnit
- {
- QMutex mWaitMutex;
- SMCallBack mpCall;
- ModuleFun mFun;
- QWaitCondition * mpwc;
- bool mbFunPlus = false;
- };
- struct interunit
- {
- char strintername[256];
- char * strdatabuf;
- int nbufsize = 0;
- int nPacCount;
- QMutex mMutexUnit;
- std::vector<InterListenUnit *> mvectorlisten;
- QWaitCondition mwc;
- bool mbHaveWriter = false;
- };
- std::vector<interunit *> gvectorinter;
- QMutex gMutexInter;
- static interunit * FindInterUnitByName(const char * strname)
- {
- interunit * p = 0;
- int i;
- int nsize;
- nsize = gvectorinter.size();
- for(i=0;i<nsize;i++)
- {
- if(strncmp(strname,gvectorinter.at(i)->strintername,256) == 0)
- {
- return (gvectorinter.at(i));
- }
- }
- return p;
- }
- intercomm::intercomm(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
- {
- strncpy(mstrsmname,strsmname,256);
- mnMode = nMode;
- if(nMode == ModeWrite)
- {
- gMutexInter.lock();
- interunit * p = FindInterUnitByName(strsmname);
- if(p == 0)
- {
- interunit * pnewinter = new interunit;
- strncpy(pnewinter->strintername,strsmname,256);
- pnewinter->strdatabuf = new char[sizeof(procinter_info)+nMaxPacCount*sizeof(procinter_head) + nBufSize];
- pnewinter->nPacCount = nMaxPacCount;
- pnewinter->nbufsize = nBufSize;
- gvectorinter.push_back(pnewinter);
- mpa = pnewinter;
- }
- else
- {
- p->mMutexUnit.lock();
- delete p->strdatabuf;
- p->strdatabuf = new char[sizeof(procinter_info)+nMaxPacCount*sizeof(procinter_head) + nBufSize];
- p->nPacCount = nMaxPacCount;
- p->nbufsize = nBufSize;
- p->mMutexUnit.unlock();
- mpa = p;
- }
- interunit * pinter = (interunit * )mpa;
- char * pdata = (char *)pinter->strdatabuf;
- mpinfo = (procinter_info *)pdata;
- mphead = (procinter_head *)(pdata+sizeof(procinter_info));
- mpinfo->mCap = nMaxPacCount;
- mpinfo->mnBufSize = nBufSize;
- mpinfo->mFirst = 0;
- mpinfo->mNext = 0;
- mpinfo->mLock = 0;
- pinter->mbHaveWriter = true;
- gMutexInter.unlock();
- }
- else
- {
- gMutexInter.lock();
- interunit * p = FindInterUnitByName(strsmname);
- if(p == 0)
- {
- interunit * pnewinter = new interunit;
- strncpy(pnewinter->strintername,strsmname,256);
- gvectorinter.push_back(pnewinter);
- mpa = pnewinter;
- }
- else
- {
- mpa = p;
- }
- interunit * pinter = (interunit * )mpa;
- char * pdata = (char *)pinter->strdatabuf;
- mpinfo = (procinter_info *)pdata;
- mphead = (procinter_head *)(pdata+sizeof(procinter_info));
- gMutexInter.unlock();
- }
- }
- intercomm::~intercomm()
- {
- if(mnMode == ModeRead)
- {
- stoplisten();
- interunit * p = (interunit *)mpa;
- p->mMutexUnit.lock();
- InterListenUnit * plisten = (InterListenUnit *)mplistenunit;
- int i;
- for(i=0;i<p->mvectorlisten.size();i++)
- {
- if(plisten == p->mvectorlisten.at(i))
- {
- p->mvectorlisten.erase(p->mvectorlisten.begin() + i);
- delete plisten;
- break;
- }
- }
- p->mMutexUnit.unlock();
- }
- }
- int intercomm::listenmsg(ModuleFun xFun)
- {
- if(mnMode == ModeWrite)
- {
- std::cout<<"intercomm::listenmsg this is Write. Can't Listen."<<std::endl;
- return - 1;
- }
- interunit * p = (interunit *)mpa;
- p->mMutexUnit.lock();
- InterListenUnit * pnewlisten = new InterListenUnit;
- pnewlisten->mbFunPlus = true;
- pnewlisten->mFun = xFun;
- pnewlisten->mpwc = &p->mwc;
- p->mvectorlisten.push_back(pnewlisten);
- p->mMutexUnit.unlock();
- mplistenunit = (void *)pnewlisten;
- mplistenthread = new std::thread(&intercomm::listernrun,this);
- return 0;
- }
- int intercomm::listenmsg(SMCallBack pCall)
- {
- if(mnMode == ModeWrite)
- {
- std::cout<<"intercomm::listenmsg this is Write. Can't Listen."<<std::endl;
- return - 1;
- }
- interunit * p = (interunit *)mpa;
- p->mMutexUnit.lock();
- InterListenUnit * pnewlisten = new InterListenUnit;
- pnewlisten->mbFunPlus = false;
- pnewlisten->mpCall = pCall;
- pnewlisten->mpwc = &p->mwc;
- p->mvectorlisten.push_back(pnewlisten);
- p->mMutexUnit.unlock();
- mplistenunit = (void *)pnewlisten;
- mplistenthread = new std::thread(&intercomm::listernrun,this);
- return 0;
- }
- void intercomm::stoplisten()
- {
- mblistenrun = false;
- if(mplistenthread != 0)
- {
- mplistenthread->join();
- mplistenthread = 0;
- }
- }
- void intercomm::pausecomm()
- {
- mbPause = true;
- }
- void intercomm::continuecomm()
- {
- mbPause = false;
- }
- void intercomm::listernrun()
- {
- InterListenUnit * pILU = (InterListenUnit * )mplistenunit;
- QTime xTime;
- xTime.start();
- unsigned int nBufLen = 1;
- unsigned int nRead;
- char * str = new char[nBufLen];
- unsigned int index =0;
- QDateTime *pdt = new QDateTime();
- interunit * pinter = (interunit *)mpa;
- while(mblistenrun)
- {
- if(mbPause)
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- continue;
- }
- if(pinter->mbHaveWriter == false)
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- continue;
- }
- pILU->mWaitMutex.lock();
- pILU->mpwc->wait(&pILU->mWaitMutex,100);
- pILU->mWaitMutex.unlock();
- int nRtn = readmsg(index,str,nBufLen,&nRead,pdt);
- while ((nRtn != 0)&&(mblistenrun))
- {
- if(nRtn == -1)
- {
- nBufLen = nRead;
- delete str;
- if(nBufLen < 1)nBufLen = 1;
- str = new char[nBufLen];
- }
- else
- {
- if(nRtn == -2)
- {
- index = getcurrentnext();
- }
- else
- {
- if(nRtn >0)
- {
- if(pILU->mbFunPlus)
- {
- pILU->mFun(str,nRtn,index,pdt,mstrsmname);
- }
- else
- {
- (*pILU->mpCall)(str,nRtn,index,pdt,mstrsmname);
- }
- index++;
- }
- else
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- }
- }
- }
- nRtn = readmsg(index,str,nBufLen,&nRead,pdt);
- }
- }
- delete str;
- delete pdt;
- }
- int intercomm::writemsg(const char *str, const unsigned int nSize)
- {
- if(mbPause)return -2;
- if(mnMode == ModeRead)
- {
- std::cout<<"Register read. can't write."<<std::endl;
- return -1;
- }
- interunit * pinter = (interunit *)mpa;
- pinter->mMutexUnit.lock();
- if(nSize > pinter->nbufsize)
- {
-
- int nnewsize = std::max(nSize*11/10,nSize+100);
- char * strdatabuf = new char[sizeof(procinter_info)+ pinter->nPacCount*sizeof(procinter_head) + nnewsize];
- memcpy(strdatabuf,pinter->strdatabuf,sizeof(procinter_info)+ pinter->nPacCount*sizeof(procinter_head) + pinter->nbufsize);
- pinter->nbufsize = nnewsize;
- delete pinter->strdatabuf;
- pinter->strdatabuf = strdatabuf;
- qDebug("inter resize buf,new buffer size is %d",nnewsize);
- }
- char * pdata = (char *)pinter->strdatabuf;
- mpinfo = (procinter_info *)pdata;
- mphead = (procinter_head *)(pdata+sizeof(procinter_info));
- WRITEMSG:
- char * pH,*pD;
- QDateTime dt;
- pH = (char *)pinter->strdatabuf;pH = pH + sizeof(procinter_info);
- pD = (char *)pinter->strdatabuf;pD = pD + sizeof(procinter_info) + pinter->nPacCount * sizeof(procinter_head);
- procinter_head * phh = (procinter_head *)pH;
- unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
- if(nPac>=pinter->nPacCount)
- {
- unsigned int nRemove = pinter->nPacCount/3;
- if(nRemove == 0)nRemove = 1;
- MoveMem(nRemove);
- goto WRITEMSG;
- }
- if(nPac == 0)
- {
- memcpy(pD,str,nSize);
- dt = QDateTime::currentDateTime();
- phh->SetDate(dt);
- phh->mindex = mpinfo->mNext;
- phh->mnPos = 0;
- phh->mnLen = nSize;
- mpinfo->mNext = mpinfo->mNext+1;
- }
- else
- {
- if(((phh+nPac-1)->mnPos+(phh+nPac-1)->mnLen + nSize)>=pinter->nbufsize)
- {
- unsigned int nRemove = pinter->nPacCount/2;
- if(nRemove == 0)nRemove = 1;
- MoveMem(nRemove);
- goto WRITEMSG;
- }
- else
- {
- unsigned int nPos = (phh+nPac-1)->mnPos + (phh+nPac-1)->mnLen;
- memcpy(pD+nPos,str,nSize);
- dt = QDateTime::currentDateTime();
- (phh+nPac)->SetDate(dt);
- (phh+nPac)->mindex = mpinfo->mNext;
- (phh+nPac)->mnPos = nPos;
- (phh+nPac)->mnLen = nSize;
- mpinfo->mNext = mpinfo->mNext+1;
- }
- }
- const unsigned int nTM = 0x6fffffff;
- if((mpinfo->mNext >nTM)&&(mpinfo->mFirst>nTM))
- {
- nPac = mpinfo->mNext - mpinfo->mFirst;
- unsigned int i;
- for(i=0;i<nPac;i++)
- {
- (phh+i)->mindex = (phh+i)->mindex-nTM;
- }
- mpinfo->mFirst = mpinfo->mFirst-nTM;
- mpinfo->mNext = mpinfo->mNext - nTM;
- }
- pinter->mMutexUnit.unlock();
- pinter->mwc.wakeAll();
- return 0;
- }
- int intercomm::MoveMem(const unsigned int nSize)
- {
- unsigned int nRemove = nSize;
- if(nRemove == 0)return -1;
- interunit * pinter = (interunit *)mpa;
- char * pH,*pD;
- pH = (char *)pinter->strdatabuf;pH = pH + sizeof(procinter_info);
- pD = (char *)pinter->strdatabuf;pD = pD + sizeof(procinter_info) + pinter->nPacCount * sizeof(procinter_head);
- procinter_head * phh = (procinter_head *)pH;
- unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
- if(nRemove >nPac)
- {
- nRemove = nPac;
- }
- if(nRemove == nPac)
- {
- mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
- return 0;
- }
- unsigned int i;
- int nDataMove = 0;
- for(i=0;i<nRemove;i++)
- {
- procinter_head * phd = phh+i;
- nDataMove = nDataMove + phd->mnLen;
- }
- unsigned int nDataTotal;
- for(i=0;i<(nPac - nRemove);i++)
- {
- memcpy(phh+i,phh+i+nRemove,sizeof(procinter_head));
- (phh+i)->mnPos = (phh+i)->mnPos - nDataMove;
- }
- nDataTotal = (phh + nPac-nRemove-1)->mnPos + (phh+nPac-nRemove-1)->mnLen;
- char * strtem = new char[pinter->nbufsize];
- memcpy(strtem,pD+nDataMove,nDataTotal);
- memcpy(pD,strtem,nDataTotal);
- delete strtem;
- mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
- return 0;
- }
- int intercomm::readmsg(unsigned int index, char *str, unsigned int nMaxSize, unsigned int *nRead, QDateTime *pdt)
- {
- int nRtn = 0;
- interunit * pinter = (interunit *)mpa;
- if(pinter->nbufsize == 0)return 0;
- pinter->mMutexUnit.lock();
- char * pdata = (char *)pinter->strdatabuf;
- mpinfo = (procinter_info *)pdata;
- mphead = (procinter_head *)(pdata+sizeof(procinter_info));
- if((index< mpinfo->mFirst)||(index > mpinfo->mNext))
- {
- nRtn = -2;
- }
- if(nRtn != (-2))
- {
- if(index == mpinfo->mNext)
- {
- nRtn = 0;
- }
- else
- {
- char * pH,*pD;
-
-
- pD = (char *)pinter->strdatabuf;pD = pD+ sizeof(procinter_info) + mpinfo->mCap*sizeof(procinter_head);
- pH = (char *)pinter->strdatabuf;pH = pH+sizeof(procinter_info);
- procinter_head * phh = (procinter_head *)pH;
- unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
- if(nPac == 0)
- {
- nRtn = 0;
- }
- else
- {
- unsigned int nPos = index - mpinfo->mFirst;
- *nRead = (phh+nPos)->mnLen;
- if((phh+nPos)->mnLen > nMaxSize)
- {
- nRtn = -1;
- }
- else
- {
-
- memcpy(str,pD + (phh+nPos)->mnPos,(phh+nPos)->mnLen);
-
- nRtn = (phh+nPos)->mnLen;
- (phh+nPos)->GetDate(pdt);
-
- }
- }
- }
- }
- pinter->mMutexUnit.unlock();
- return nRtn;
- }
- unsigned int intercomm::getcurrentnext()
- {
- unsigned int nNext;
- interunit * pinter = (interunit *)mpa;
- char * pdata = (char *)pinter->strdatabuf;
- mpinfo = (procinter_info *)pdata;
- mphead = (procinter_head *)(pdata+sizeof(procinter_info));
- nNext = mpinfo->mNext;
- return nNext;
- }
- }
|