procsm.cpp 15 KB


  1. #include <iostream>
  2. #include <thread>
  3. #include <QTime>
  4. #include <QThread>
  5. #include <algorithm>
  6. #include "procsm.h"
  7. class AttachThread : public QThread
  8. {
  9. public:
  10. AttachThread(QSharedMemory * pa,bool & bAttach)
  11. {
  12. mbAttach = bAttach;
  13. mpa = pa;
  14. mbrun = true;
  15. }
  16. QSharedMemory * mpa;
  17. bool mbAttach = false;
  18. bool mbrun = true;
  19. void run()
  20. {
  21. mbAttach = mpa->attach();
  22. mbrun = false;
  23. }
  24. };
  25. procsm::procsm(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
  26. {
  27. // mnBufSize = nBufSize;
  28. // qDebug("create dbus");
  29. strncpy(mstrsmname,strsmname,256);
  30. mpASMPtr = new QSharedMemory(strsmname);
  31. char strasmname[256];
  32. if(nMode == ModeWrite)
  33. {
  34. bool bres = mpASMPtr->attach();
  35. if(bres == false)
  36. {
  37. mpASMPtr->create(sizeof(ASM_PTR));
  38. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  39. snprintf(strasmname,256,"%s_%lld",strsmname,QDateTime::currentMSecsSinceEpoch());
  40. pasm->mnshmsize = sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize;
  41. pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch();
  42. strncpy(pasm->mstrshmname,strasmname,256);
  43. mASM_State = *pasm;
  44. }
  45. else
  46. {
  47. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  48. mASM_State = *pasm;
  49. }
  50. }
  51. else
  52. {
  53. return;
  54. }
  55. // mpASM = new QSharedMemory(strsmname);
  56. mpASM = new QSharedMemory(strasmname);
  57. if(nMode == ModeWrite)
  58. {
  59. mmodulemsg_type.mnBufSize = nBufSize;
  60. mmodulemsg_type.mnMsgBufCount = nMaxPacCount;
  61. strncpy(mmodulemsg_type.mstrmsgname,strasmname,255);
  62. #ifdef USEDBUS
  63. mmsg = QDBusMessage::createSignal("/catarc/adc", "adc.adciv.modulecomm", strsmname);
  64. mmsg<<1;
  65. #endif
  66. bool bAttach = false;
  67. AttachThread AT(mpASM,bAttach);
  68. AT.start();
  69. QTime xTime;
  70. xTime.start();
  71. while(xTime.elapsed()<100)
  72. {
  73. if(AT.mbrun == false)
  74. {
  75. bAttach = AT.mbAttach;
  76. break;
  77. }
  78. }
  79. // qDebug("time is %d",xTime.elapsed());
  80. if(xTime.elapsed()>= 1000)
  81. {
  82. qDebug("in 1000ms Attach fail.terminate it .");
  83. AT.terminate();
  84. bAttach = false;
  85. }
  86. // if(!mpASM->attach())
  87. if(!bAttach)
  88. {
  89. mpASM->create(sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize);
  90. char * p = (char *)mpASM->data();
  91. mpinfo = (procsm_info *)p;
  92. mphead = (procsm_head *)(p+sizeof(procsm_info));
  93. mpinfo->mCap = nMaxPacCount;
  94. mpinfo->mnBufSize = nBufSize;
  95. mpinfo->mFirst = 0;
  96. mpinfo->mNext = 0;
  97. mpinfo->mLock = 0;
  98. }
  99. if(mpASM->isAttached())
  100. {
  101. mbAttach = true;
  102. char * p = (char *)mpASM->data();
  103. mpinfo = (procsm_info *)p;
  104. mphead = (procsm_head *)(p+sizeof(procsm_info));
  105. mnMaxPacCount = mpinfo->mCap;
  106. mnBufSize = mpinfo->mnBufSize;
  107. // qDebug("attach successful");
  108. mstrtem = new char[mnBufSize];
  109. #ifdef USEDBUS
  110. mmsgres = QDBusMessage::createSignal("/catarc/adc", "adciv.interface", "modulemsgres");
  111. mmsgres<<1;
  112. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adciv.interface", "modulemsgquery",this,SLOT(onQuery()));
  113. if(bconnect == false)
  114. {
  115. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  116. }
  117. #endif
  118. }
  119. else
  120. {
  121. mbAttach = false;
  122. qDebug("Share Memory Error.");
  123. }
  124. }
  125. }
  126. void procsm::recreateasm(int nbufsize)
  127. {
  128. mpASMPtr->lock();
  129. qDebug("recreate asms");
  130. mnBufSize = std::max(nbufsize*11/10,nbufsize+1000);
  131. // mnBufSize = nbufsize+100;
  132. char strasmname[256];
  133. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  134. snprintf(strasmname,256,"%s_%lld",mstrsmname,QDateTime::currentMSecsSinceEpoch());
  135. pasm->mnshmsize = sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize;
  136. pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch();
  137. strncpy(pasm->mstrshmname,strasmname,256);
  138. mASM_State = *pasm;
  139. mmodulemsg_type.mnBufSize = mnBufSize;
  140. mmodulemsg_type.mnMsgBufCount = mnMaxPacCount;
  141. strncpy(mmodulemsg_type.mstrmsgname,mASM_State.mstrshmname,255);
  142. mpASM->lock();
  143. int noldmemsize = mpASM->size();
  144. char * px = new char[mpASM->size()];
  145. memcpy(px,mpASM->data(),noldmemsize);
  146. mpASM->unlock();
  147. mpASM->detach();
  148. qDebug("new asm name is %s,buffer size is %d ",mASM_State.mstrshmname,mnBufSize);
  149. mpASM = new QSharedMemory(mASM_State.mstrshmname);
  150. bool bAttach = false;
  151. AttachThread AT(mpASM,bAttach);
  152. AT.start();
  153. QTime xTime;
  154. xTime.start();
  155. while(xTime.elapsed()<100)
  156. {
  157. if(AT.mbrun == false)
  158. {
  159. bAttach = AT.mbAttach;
  160. break;
  161. }
  162. }
  163. // qDebug("time is %d",xTime.elapsed());
  164. if(xTime.elapsed()>= 1000)
  165. {
  166. qDebug("in 1000ms Attach fail.terminate it .");
  167. AT.terminate();
  168. bAttach = false;
  169. }
  170. // if(!mpASM->attach())
  171. if(!bAttach)
  172. {
  173. mpASM->create(sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize);
  174. memcpy(mpASM->data(),px,noldmemsize);
  175. char * p = (char *)mpASM->data();
  176. mpinfo = (procsm_info *)p;
  177. mphead = (procsm_head *)(p+sizeof(procsm_info));
  178. mpinfo->mCap = mnMaxPacCount;
  179. mpinfo->mnBufSize = mnBufSize;
  180. // mpinfo->mFirst = nfirst;
  181. // mpinfo->mNext = nnext;
  182. // mpinfo->mLock = 0;
  183. }
  184. if(mpASM->isAttached())
  185. {
  186. mbAttach = true;
  187. char * p = (char *)mpASM->data();
  188. mpinfo = (procsm_info *)p;
  189. mphead = (procsm_head *)(p+sizeof(procsm_info));
  190. mnMaxPacCount = mpinfo->mCap;
  191. mnBufSize = mpinfo->mnBufSize;
  192. // qDebug("attach successful");
  193. // mstrtem = new char[mnBufSize];
  194. }
  195. else
  196. {
  197. mbAttach = false;
  198. qDebug("Share Memory Error.");
  199. }
  200. mpASMPtr->unlock();
  201. delete px;
  202. }
  203. #ifdef USEDBUS
  204. void procsm::onQuery()
  205. {
  206. QByteArray ba;
  207. ba.append((char *)&mmodulemsg_type,sizeof(iv::modulemsg_type));
  208. QList<QVariant> x;
  209. x<<ba;
  210. mmsgres.setArguments(x);
  211. QDBusConnection::sessionBus().send(mmsgres);
  212. }
  213. #endif
  214. bool procsm::AttachMem()
  215. {
  216. if(!mpASMPtr->isAttached())mpASMPtr->attach();
  217. if(mpASMPtr->isAttached())
  218. {
  219. ASM_PTR * pasmptr = (ASM_PTR *)(mpASMPtr->data());
  220. mASM_State = * pasmptr;
  221. if(mpASM != 0)
  222. {
  223. if(mpASM->isAttached())mpASM->detach();
  224. delete mpASM;
  225. }
  226. mpASM = new QSharedMemory(mASM_State.mstrshmname);
  227. mpASM->attach();
  228. if(mpASM->isAttached())
  229. {
  230. mbAttach = true;
  231. char * p = (char *)mpASM->data();
  232. mpinfo = (procsm_info *)p;
  233. mphead = (procsm_head *)(p+sizeof(procsm_info));
  234. mnMaxPacCount = mpinfo->mCap;
  235. mnBufSize = mpinfo->mnBufSize;
  236. return true;
  237. }
  238. else
  239. {
  240. return false;
  241. }
  242. }
  243. else
  244. {
  245. return false;
  246. }
  247. return false;
  248. mpASM->attach();
  249. if(mpASM->isAttached())
  250. {
  251. mbAttach = true;
  252. char * p = (char *)mpASM->data();
  253. mpinfo = (procsm_info *)p;
  254. mphead = (procsm_head *)(p+sizeof(procsm_info));
  255. mnMaxPacCount = mpinfo->mCap;
  256. mnBufSize = mpinfo->mnBufSize;
  257. return true;
  258. }
  259. else
  260. {
  261. return false;
  262. }
  263. }
  264. int procsm::MoveMem(const unsigned int nSize)
  265. {
  266. // qDebug("move mem");
  267. unsigned int nRemove = nSize;
  268. if(nRemove == 0)return -1;
  269. // unsigned int * pIndexFirst = (unsigned int *)mpASM->data();
  270. // unsigned int * pIndexNext = pIndexFirst+1;
  271. // qDebug("first = %d next = %d",*pIndexFirst,*pIndexNext);
  272. // unsigned int * pIndexNext = pIndexFirst;
  273. char * pH,*pD;
  274. pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info);
  275. pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head);
  276. procsm_head * phh = (procsm_head *)pH;
  277. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  278. if(nRemove >nPac)
  279. {
  280. // qDebug("procsm::MoveMem nRemove > nPac nRemove = %d",nRemove);
  281. nRemove = nPac;
  282. }
  283. if(nRemove == nPac)
  284. {
  285. mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
  286. return 0;
  287. }
  288. unsigned int i;
  289. int nDataMove = 0;
  290. for(i=0;i<nRemove;i++)
  291. {
  292. procsm_head * phd = phh+i;
  293. nDataMove = nDataMove + phd->mnLen;
  294. }
  295. unsigned int nDataTotal;
  296. for(i=0;i<(nPac - nRemove);i++)
  297. {
  298. memcpy(phh+i,phh+i+nRemove,sizeof(procsm_head));
  299. (phh+i)->mnPos = (phh+i)->mnPos - nDataMove;
  300. }
  301. nDataTotal = (phh + nPac-nRemove-1)->mnPos + (phh+nPac-nRemove-1)->mnLen;
  302. memcpy(mstrtem,pD+nDataMove,nDataTotal);
  303. memcpy(pD,mstrtem,nDataTotal);
  304. // for(i=0;i<nDataTotal;i++)
  305. // {
  306. // *(pD+i) = *(pD+i+nDataMove);
  307. // }
  308. mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
  309. return 0;
  310. }
  311. void procsm::checkasm()
  312. {
  313. mpASMPtr->lock();
  314. ASM_PTR * pASM_PTR = (ASM_PTR * )mpASMPtr->data();
  315. if(pASM_PTR->mnUpdateTime == mASM_State.mnUpdateTime)
  316. {
  317. mpASMPtr->unlock();
  318. return;
  319. }
  320. qDebug("reattch mem.");
  321. mbAttach = false;
  322. AttachMem();
  323. mpASMPtr->unlock();
  324. }
  325. int procsm::writemsg(const char *str, const unsigned int nSize)
  326. {
  327. checkasm();
  328. if(nSize > mnBufSize)
  329. {
  330. if(nSize<1000000000)
  331. {
  332. recreateasm(nSize);
  333. checkasm();
  334. }
  335. else
  336. {
  337. qDebug("procsm::writemsg message size is very big");
  338. return -1;
  339. }
  340. }
  341. if(mbAttach == false)
  342. {
  343. std::cout<<"ShareMemory Attach fail."<<std::endl;
  344. return -1;
  345. }
  346. mpASM->lock();
  347. // unsigned int * pIndexFirst = (unsigned int *)mpASM->data();
  348. // unsigned int * pIndexNext = pIndexFirst+1;
  349. if(mpinfo->mLock == 1)
  350. {
  351. std::cout<<"ShareMemory have lock.Init."<<std::endl;
  352. mpinfo->mLock = 0;
  353. mpinfo->mFirst = 0;
  354. mpinfo->mNext = 0;
  355. }
  356. mpinfo->mLock =1;
  357. WRITEMSG:
  358. char * pH,*pD;
  359. QDateTime dt;
  360. pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info);
  361. pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head);
  362. procsm_head * phh = (procsm_head *)pH;
  363. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  364. if(nPac>=mnMaxPacCount)
  365. {
  366. unsigned int nRemove = mnMaxPacCount/3;
  367. if(nRemove == 0)nRemove = 1;
  368. MoveMem(nRemove);
  369. goto WRITEMSG;
  370. }
  371. if(nPac == 0)
  372. {
  373. memcpy(pD,str,nSize);
  374. dt = QDateTime::currentDateTime();
  375. // phh->mdt = dt;
  376. phh->SetDate(dt);
  377. // memcpy(&phh->mdt,&dt,sizeof(QDateTime));
  378. // phh->mdt = QDateTime::currentDateTime();
  379. phh->mindex = mpinfo->mNext;
  380. phh->mnPos = 0;
  381. phh->mnLen = nSize;
  382. mpinfo->mNext = mpinfo->mNext+1;
  383. }
  384. else
  385. {
  386. if(((phh+nPac-1)->mnPos+(phh+nPac-1)->mnLen + nSize)>=mnBufSize)
  387. {
  388. unsigned int nRemove = mnMaxPacCount/2;
  389. if(nRemove == 0)nRemove = 1;
  390. MoveMem(nRemove);
  391. goto WRITEMSG;
  392. }
  393. else
  394. {
  395. unsigned int nPos = (phh+nPac-1)->mnPos + (phh+nPac-1)->mnLen;
  396. // qDebug("write pos = %d",nPos);
  397. memcpy(pD+nPos,str,nSize);
  398. dt = QDateTime::currentDateTime();
  399. (phh+nPac)->SetDate(dt);
  400. // memcpy(&(phh+nPac)->mdt,&dt,sizeof(QDateTime));
  401. // (phh+nPac)->mdt = QDateTime::currentDateTime();
  402. (phh+nPac)->mindex = mpinfo->mNext;
  403. (phh+nPac)->mnPos = nPos;
  404. (phh+nPac)->mnLen = nSize;
  405. mpinfo->mNext = mpinfo->mNext+1;
  406. }
  407. }
  408. const unsigned int nTM = 0x6fffffff;
  409. if((mpinfo->mNext >nTM)&&(mpinfo->mFirst>nTM))
  410. {
  411. nPac = mpinfo->mNext - mpinfo->mFirst;
  412. unsigned int i;
  413. for(i=0;i<nPac;i++)
  414. {
  415. (phh+i)->mindex = (phh+i)->mindex-nTM;
  416. }
  417. mpinfo->mFirst = mpinfo->mFirst-nTM;
  418. mpinfo->mNext = mpinfo->mNext - nTM;
  419. }
  420. mpinfo->mLock = 0;
  421. mpASM->unlock();
  422. #ifdef USEDBUS
  423. QDBusConnection::sessionBus().send(mmsg);
  424. #endif
  425. return 0;
  426. }
  427. unsigned int procsm::getcurrentnext()
  428. {
  429. checkasm();
  430. unsigned int nNext;
  431. mpASM->lock();
  432. nNext = mpinfo->mNext;
  433. mpASM->unlock();
  434. return nNext;
  435. }
  436. //if return 0 No Data.
  437. //if return -1 nMaxSize is small
  438. //if retrun -2 index is not in range,call getcurrentnext get position
  439. //if return > 0 readdata
  440. int procsm::readmsg(unsigned int index, char *str, unsigned int nMaxSize,unsigned int * nRead,QDateTime * pdt)
  441. {
  442. checkasm();
  443. if(mbAttach == false)
  444. {
  445. std::cout<<"ShareMemory Attach fail."<<std::endl;
  446. return -1;
  447. }
  448. int nRtn = 0;
  449. mpASM->lock();
  450. if((index< mpinfo->mFirst)||(index > mpinfo->mNext))
  451. {
  452. nRtn = -2;
  453. }
  454. if(nRtn != (-2))
  455. {
  456. if(index == mpinfo->mNext)
  457. {
  458. nRtn = 0;
  459. }
  460. else
  461. {
  462. char * pH,*pD;
  463. // pH = (char *)mpASM->data();pH = pH + 2*sizeof(unsigned int);
  464. // pD = (char *)mpASM->data();pD = pD + 2*sizeof(unsigned int) + mnMaxPacCount * sizeof(procsm_head);
  465. pD = (char *)mpASM->data();pD = pD+ sizeof(procsm_info) + mpinfo->mCap*sizeof(procsm_head);
  466. pH = (char *)mpASM->data();pH = pH+sizeof(procsm_info);
  467. procsm_head * phh = (procsm_head *)pH;
  468. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  469. if(nPac == 0)
  470. {
  471. nRtn = 0;
  472. }
  473. else
  474. {
  475. unsigned int nPos = index - mpinfo->mFirst;
  476. *nRead = (phh+nPos)->mnLen;
  477. if((phh+nPos)->mnLen > nMaxSize)
  478. {
  479. nRtn = -1;
  480. }
  481. else
  482. {
  483. // qDebug("read pos = %d",(phh+nPos)->mnPos);
  484. memcpy(str,pD + (phh+nPos)->mnPos,(phh+nPos)->mnLen);
  485. // qDebug("read pos = %d",(phh+nPos)->mnPos);
  486. nRtn = (phh+nPos)->mnLen;
  487. (phh+nPos)->GetDate(pdt);
  488. // memcpy(pdt,&((phh+nPos)->mdt),sizeof(QDateTime));
  489. }
  490. }
  491. }
  492. }
  493. mpASM->unlock();
  494. return nRtn;
  495. }