#include "intercomm.h" #include #include #include namespace iv { struct InterListenUnit { QMutex mWaitMutex; SMCallBack mpCall; ModuleFun mFun; QWaitCondition * mpwc; bool mbFunPlus = false; }; struct interunit { char strintername[256]; char * strdatabuf; int nbufsize = 0; int nPacCount; QMutex mMutexUnit; std::vector mvectorlisten; QWaitCondition mwc; bool mbHaveWriter = false; }; std::vector gvectorinter; QMutex gMutexInter; static interunit * FindInterUnitByName(const char * strname) { interunit * p = 0; int i; int nsize; nsize = gvectorinter.size(); for(i=0;istrintername,256) == 0) { return (gvectorinter.at(i)); } } return p; } intercomm::intercomm(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode) { strncpy(mstrsmname,strsmname,256); mnMode = nMode; if(nMode == ModeWrite) { gMutexInter.lock(); interunit * p = FindInterUnitByName(strsmname); if(p == 0) { interunit * pnewinter = new interunit; strncpy(pnewinter->strintername,strsmname,256); pnewinter->strdatabuf = new char[sizeof(procinter_info)+nMaxPacCount*sizeof(procinter_head) + nBufSize]; pnewinter->nPacCount = nMaxPacCount; pnewinter->nbufsize = nBufSize; gvectorinter.push_back(pnewinter); mpa = pnewinter; } else { p->mMutexUnit.lock(); delete p->strdatabuf; p->strdatabuf = new char[sizeof(procinter_info)+nMaxPacCount*sizeof(procinter_head) + nBufSize]; p->nPacCount = nMaxPacCount; p->nbufsize = nBufSize; p->mMutexUnit.unlock(); mpa = p; } interunit * pinter = (interunit * )mpa; char * pdata = (char *)pinter->strdatabuf; mpinfo = (procinter_info *)pdata; mphead = (procinter_head *)(pdata+sizeof(procinter_info)); mpinfo->mCap = nMaxPacCount; mpinfo->mnBufSize = nBufSize; mpinfo->mFirst = 0; mpinfo->mNext = 0; mpinfo->mLock = 0; pinter->mbHaveWriter = true; gMutexInter.unlock(); } else { gMutexInter.lock(); interunit * p = FindInterUnitByName(strsmname); if(p == 0) { interunit * pnewinter = new interunit; strncpy(pnewinter->strintername,strsmname,256); gvectorinter.push_back(pnewinter); mpa = pnewinter; } else { mpa = p; } interunit * pinter = (interunit * )mpa; char * pdata = (char *)pinter->strdatabuf; mpinfo = (procinter_info *)pdata; mphead = (procinter_head *)(pdata+sizeof(procinter_info)); gMutexInter.unlock(); } } intercomm::~intercomm() { if(mnMode == ModeRead) { stoplisten(); interunit * p = (interunit *)mpa; p->mMutexUnit.lock(); InterListenUnit * plisten = (InterListenUnit *)mplistenunit; int i; for(i=0;imvectorlisten.size();i++) { if(plisten == p->mvectorlisten.at(i)) { p->mvectorlisten.erase(p->mvectorlisten.begin() + i); delete plisten; break; } } p->mMutexUnit.unlock(); } } int intercomm::listenmsg(ModuleFun xFun) { if(mnMode == ModeWrite) { std::cout<<"intercomm::listenmsg this is Write. Can't Listen."<mMutexUnit.lock(); InterListenUnit * pnewlisten = new InterListenUnit; pnewlisten->mbFunPlus = true; pnewlisten->mFun = xFun; pnewlisten->mpwc = &p->mwc; p->mvectorlisten.push_back(pnewlisten); p->mMutexUnit.unlock(); mplistenunit = (void *)pnewlisten; mplistenthread = new std::thread(&intercomm::listernrun,this); return 0; } int intercomm::listenmsg(SMCallBack pCall) { if(mnMode == ModeWrite) { std::cout<<"intercomm::listenmsg this is Write. Can't Listen."<mMutexUnit.lock(); InterListenUnit * pnewlisten = new InterListenUnit; pnewlisten->mbFunPlus = false; pnewlisten->mpCall = pCall; pnewlisten->mpwc = &p->mwc; p->mvectorlisten.push_back(pnewlisten); p->mMutexUnit.unlock(); mplistenunit = (void *)pnewlisten; mplistenthread = new std::thread(&intercomm::listernrun,this); return 0; } void intercomm::stoplisten() { mblistenrun = false; if(mplistenthread != 0) { mplistenthread->join(); mplistenthread = 0; } } void intercomm::pausecomm() { mbPause = true; } void intercomm::continuecomm() { mbPause = false; } void intercomm::listernrun() { InterListenUnit * pILU = (InterListenUnit * )mplistenunit; QTime xTime; xTime.start(); unsigned int nBufLen = 1; unsigned int nRead; char * str = new char[nBufLen]; unsigned int index =0; QDateTime *pdt = new QDateTime(); interunit * pinter = (interunit *)mpa; while(mblistenrun) { if(mbPause) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); continue; } if(pinter->mbHaveWriter == false) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); continue; } pILU->mWaitMutex.lock(); pILU->mpwc->wait(&pILU->mWaitMutex,100); pILU->mWaitMutex.unlock(); int nRtn = readmsg(index,str,nBufLen,&nRead,pdt); while ((nRtn != 0)&&(mblistenrun)) { if(nRtn == -1) { nBufLen = nRead; delete str; if(nBufLen < 1)nBufLen = 1; str = new char[nBufLen]; } else { if(nRtn == -2) { index = getcurrentnext(); } else { if(nRtn >0) { if(pILU->mbFunPlus) { pILU->mFun(str,nRtn,index,pdt,mstrsmname); } else { (*pILU->mpCall)(str,nRtn,index,pdt,mstrsmname); } index++; } else { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } } nRtn = readmsg(index,str,nBufLen,&nRead,pdt); } } delete str; delete pdt; } int intercomm::writemsg(const char *str, const unsigned int nSize) { if(mbPause)return -2; if(mnMode == ModeRead) { std::cout<<"Register read. can't write."<mMutexUnit.lock(); if(nSize > pinter->nbufsize) { // qDebug("procsm::writemsg message size is very big"); int nnewsize = std::max(nSize*11/10,nSize+100); char * strdatabuf = new char[sizeof(procinter_info)+ pinter->nPacCount*sizeof(procinter_head) + nnewsize]; memcpy(strdatabuf,pinter->strdatabuf,sizeof(procinter_info)+ pinter->nPacCount*sizeof(procinter_head) + pinter->nbufsize); pinter->nbufsize = nnewsize; delete pinter->strdatabuf; pinter->strdatabuf = strdatabuf; qDebug("inter resize buf,new buffer size is %d",nnewsize); } char * pdata = (char *)pinter->strdatabuf; mpinfo = (procinter_info *)pdata; mphead = (procinter_head *)(pdata+sizeof(procinter_info)); WRITEMSG: char * pH,*pD; QDateTime dt; pH = (char *)pinter->strdatabuf;pH = pH + sizeof(procinter_info); pD = (char *)pinter->strdatabuf;pD = pD + sizeof(procinter_info) + pinter->nPacCount * sizeof(procinter_head); procinter_head * phh = (procinter_head *)pH; unsigned int nPac = mpinfo->mNext - mpinfo->mFirst; if(nPac>=pinter->nPacCount) { unsigned int nRemove = pinter->nPacCount/3; if(nRemove == 0)nRemove = 1; MoveMem(nRemove); goto WRITEMSG; } if(nPac == 0) { memcpy(pD,str,nSize); dt = QDateTime::currentDateTime(); phh->SetDate(dt); phh->mindex = mpinfo->mNext; phh->mnPos = 0; phh->mnLen = nSize; mpinfo->mNext = mpinfo->mNext+1; } else { if(((phh+nPac-1)->mnPos+(phh+nPac-1)->mnLen + nSize)>=pinter->nbufsize) { unsigned int nRemove = pinter->nPacCount/2; if(nRemove == 0)nRemove = 1; MoveMem(nRemove); goto WRITEMSG; } else { unsigned int nPos = (phh+nPac-1)->mnPos + (phh+nPac-1)->mnLen; memcpy(pD+nPos,str,nSize); dt = QDateTime::currentDateTime(); (phh+nPac)->SetDate(dt); (phh+nPac)->mindex = mpinfo->mNext; (phh+nPac)->mnPos = nPos; (phh+nPac)->mnLen = nSize; mpinfo->mNext = mpinfo->mNext+1; } } const unsigned int nTM = 0x6fffffff; if((mpinfo->mNext >nTM)&&(mpinfo->mFirst>nTM)) { nPac = mpinfo->mNext - mpinfo->mFirst; unsigned int i; for(i=0;imindex = (phh+i)->mindex-nTM; } mpinfo->mFirst = mpinfo->mFirst-nTM; mpinfo->mNext = mpinfo->mNext - nTM; } pinter->mMutexUnit.unlock(); pinter->mwc.wakeAll(); return 0; } int intercomm::MoveMem(const unsigned int nSize) { unsigned int nRemove = nSize; if(nRemove == 0)return -1; interunit * pinter = (interunit *)mpa; char * pH,*pD; pH = (char *)pinter->strdatabuf;pH = pH + sizeof(procinter_info); pD = (char *)pinter->strdatabuf;pD = pD + sizeof(procinter_info) + pinter->nPacCount * sizeof(procinter_head); procinter_head * phh = (procinter_head *)pH; unsigned int nPac = mpinfo->mNext - mpinfo->mFirst; if(nRemove >nPac) { nRemove = nPac; } if(nRemove == nPac) { mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove; return 0; } unsigned int i; int nDataMove = 0; for(i=0;imnLen; } unsigned int nDataTotal; for(i=0;i<(nPac - nRemove);i++) { memcpy(phh+i,phh+i+nRemove,sizeof(procinter_head)); (phh+i)->mnPos = (phh+i)->mnPos - nDataMove; } nDataTotal = (phh + nPac-nRemove-1)->mnPos + (phh+nPac-nRemove-1)->mnLen; char * strtem = new char[pinter->nbufsize]; memcpy(strtem,pD+nDataMove,nDataTotal); memcpy(pD,strtem,nDataTotal); delete strtem; mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove; return 0; } int intercomm::readmsg(unsigned int index, char *str, unsigned int nMaxSize, unsigned int *nRead, QDateTime *pdt) { int nRtn = 0; interunit * pinter = (interunit *)mpa; if(pinter->nbufsize == 0)return 0; pinter->mMutexUnit.lock(); char * pdata = (char *)pinter->strdatabuf; mpinfo = (procinter_info *)pdata; mphead = (procinter_head *)(pdata+sizeof(procinter_info)); if((index< mpinfo->mFirst)||(index > mpinfo->mNext)) { nRtn = -2; } if(nRtn != (-2)) { if(index == mpinfo->mNext) { nRtn = 0; } else { char * pH,*pD; // pH = (char *)mpASM->data();pH = pH + 2*sizeof(unsigned int); // pD = (char *)mpASM->data();pD = pD + 2*sizeof(unsigned int) + mnMaxPacCount * sizeof(procsm_head); pD = (char *)pinter->strdatabuf;pD = pD+ sizeof(procinter_info) + mpinfo->mCap*sizeof(procinter_head); pH = (char *)pinter->strdatabuf;pH = pH+sizeof(procinter_info); procinter_head * phh = (procinter_head *)pH; unsigned int nPac = mpinfo->mNext - mpinfo->mFirst; if(nPac == 0) { nRtn = 0; } else { unsigned int nPos = index - mpinfo->mFirst; *nRead = (phh+nPos)->mnLen; if((phh+nPos)->mnLen > nMaxSize) { nRtn = -1; } else { // qDebug("read pos = %d",(phh+nPos)->mnPos); memcpy(str,pD + (phh+nPos)->mnPos,(phh+nPos)->mnLen); // qDebug("read pos = %d",(phh+nPos)->mnPos); nRtn = (phh+nPos)->mnLen; (phh+nPos)->GetDate(pdt); // memcpy(pdt,&((phh+nPos)->mdt),sizeof(QDateTime)); } } } } pinter->mMutexUnit.unlock(); return nRtn; } unsigned int intercomm::getcurrentnext() { unsigned int nNext; interunit * pinter = (interunit *)mpa; char * pdata = (char *)pinter->strdatabuf; mpinfo = (procinter_info *)pdata; mphead = (procinter_head *)(pdata+sizeof(procinter_info)); nNext = mpinfo->mNext; return nNext; } }