procsm.cpp 15 KB


#include <algorithm>
#include <atomic>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <thread>
#include <QThread>
#include <QTime>
#include "procsm.h"
  7. class AttachThread : public QThread
  8. {
  9. public:
  10. AttachThread(QSharedMemory * pa,bool & bAttach)
  11. {
  12. mbAttach = bAttach;
  13. mpa = pa;
  14. mbrun = true;
  15. }
  16. QSharedMemory * mpa;
  17. bool mbAttach = false;
  18. bool mbrun = true;
  19. void run()
  20. {
  21. mbAttach = mpa->attach();
  22. mbrun = false;
  23. }
  24. };
  25. procsm::procsm(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
  26. {
  27. // mnBufSize = nBufSize;
  28. // qDebug("create dbus");
  29. strncpy(mstrsmname,strsmname,256);
  30. mpASMPtr = new QSharedMemory(strsmname);
  31. char strasmname[256];
  32. if(nMode == ModeWrite)
  33. {
  34. bool bres = mpASMPtr->attach();
  35. if(bres == false)
  36. {
  37. mpASMPtr->create(sizeof(ASM_PTR));
  38. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  39. snprintf(strasmname,256,"%s_%lld",strsmname,QDateTime::currentMSecsSinceEpoch());
  40. pasm->mnshmsize = sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize;
  41. pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch();
  42. strncpy(pasm->mstrshmname,strasmname,256);
  43. mASM_State = *pasm;
  44. }
  45. else
  46. {
  47. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  48. mASM_State = *pasm;
  49. }
  50. }
  51. else
  52. {
  53. return;
  54. }
  55. // mpASM = new QSharedMemory(strsmname);
  56. mpASM = new QSharedMemory(strasmname);
  57. if(nMode == ModeWrite)
  58. {
  59. mmodulemsg_type.mnBufSize = nBufSize;
  60. mmodulemsg_type.mnMsgBufCount = nMaxPacCount;
  61. strncpy(mmodulemsg_type.mstrmsgname,strasmname,255);
  62. #ifdef USEDBUS
  63. mmsg = QDBusMessage::createSignal("/catarc/adc", "adc.adciv.modulecomm", strsmname);
  64. mmsg<<1;
  65. #endif
  66. bool bAttach = false;
  67. AttachThread AT(mpASM,bAttach);
  68. AT.start();
  69. QTime xTime;
  70. xTime.start();
  71. while(xTime.elapsed()<100)
  72. {
  73. if(AT.mbrun == false)
  74. {
  75. bAttach = AT.mbAttach;
  76. break;
  77. }
  78. }
  79. // qDebug("time is %d",xTime.elapsed());
  80. if(xTime.elapsed()>= 1000)
  81. {
  82. qDebug("in 1000ms Attach fail.terminate it .");
  83. AT.terminate();
  84. bAttach = false;
  85. }
  86. // if(!mpASM->attach())
  87. if(!bAttach)
  88. {
  89. mpASM->create(sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize);
  90. char * p = (char *)mpASM->data();
  91. mpinfo = (procsm_info *)p;
  92. mphead = (procsm_head *)(p+sizeof(procsm_info));
  93. mpinfo->mCap = nMaxPacCount;
  94. mpinfo->mnBufSize = nBufSize;
  95. mpinfo->mFirst = 0;
  96. mpinfo->mNext = 0;
  97. mpinfo->mLock = 0;
  98. }
  99. if(mpASM->isAttached())
  100. {
  101. mbAttach = true;
  102. char * p = (char *)mpASM->data();
  103. mpinfo = (procsm_info *)p;
  104. mphead = (procsm_head *)(p+sizeof(procsm_info));
  105. mnMaxPacCount = mpinfo->mCap;
  106. mnBufSize = mpinfo->mnBufSize;
  107. // qDebug("attach successful");
  108. mstrtem = new char[mnBufSize];
  109. #ifdef USEDBUS
  110. mmsgres = QDBusMessage::createSignal("/catarc/adc", "adciv.interface", "modulemsgres");
  111. mmsgres<<1;
  112. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adciv.interface", "modulemsgquery",this,SLOT(onQuery()));
  113. if(bconnect == false)
  114. {
  115. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  116. }
  117. #endif
  118. }
  119. else
  120. {
  121. mbAttach = false;
  122. qDebug("Share Memory Error.");
  123. }
  124. }
  125. }
  126. void procsm::recreateasm(int nbufsize)
  127. {
  128. mpASMPtr->lock();
  129. qDebug("recreate asms");
  130. mnBufSize = std::max(nbufsize*11/10,nbufsize+1000);
  131. // mnBufSize = nbufsize+100;
  132. char strasmname[256];
  133. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  134. snprintf(strasmname,256,"%s_%lld",mstrsmname,QDateTime::currentMSecsSinceEpoch());
  135. pasm->mnshmsize = sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize;
  136. pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch();
  137. strncpy(pasm->mstrshmname,strasmname,256);
  138. mASM_State = *pasm;
  139. mmodulemsg_type.mnBufSize = mnBufSize;
  140. mmodulemsg_type.mnMsgBufCount = mnMaxPacCount;
  141. strncpy(mmodulemsg_type.mstrmsgname,mASM_State.mstrshmname,255);
  142. mpASM->lock();
  143. int noldmemsize = mpASM->size();
  144. char * px = new char[mpASM->size()];
  145. memcpy(px,mpASM->data(),noldmemsize);
  146. mpASM->unlock();
  147. mpASM->detach();
  148. qDebug("new asm name is %s,buffer size is %d ",mASM_State.mstrshmname,mnBufSize);
  149. mpASM = new QSharedMemory(mASM_State.mstrshmname);
  150. bool bAttach = false;
  151. AttachThread AT(mpASM,bAttach);
  152. AT.start();
  153. QTime xTime;
  154. xTime.start();
  155. while(xTime.elapsed()<100)
  156. {
  157. if(AT.mbrun == false)
  158. {
  159. bAttach = AT.mbAttach;
  160. break;
  161. }
  162. }
  163. // qDebug("time is %d",xTime.elapsed());
  164. if(xTime.elapsed()>= 1000)
  165. {
  166. qDebug("in 1000ms Attach fail.terminate it .");
  167. AT.terminate();
  168. bAttach = false;
  169. }
  170. // if(!mpASM->attach())
  171. if(!bAttach)
  172. {
  173. mpASM->create(sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize);
  174. memcpy(mpASM->data(),px,noldmemsize);
  175. char * p = (char *)mpASM->data();
  176. mpinfo = (procsm_info *)p;
  177. mphead = (procsm_head *)(p+sizeof(procsm_info));
  178. mpinfo->mCap = mnMaxPacCount;
  179. mpinfo->mnBufSize = mnBufSize;
  180. // mpinfo->mFirst = nfirst;
  181. // mpinfo->mNext = nnext;
  182. // mpinfo->mLock = 0;
  183. }
  184. if(mpASM->isAttached())
  185. {
  186. mbAttach = true;
  187. char * p = (char *)mpASM->data();
  188. mpinfo = (procsm_info *)p;
  189. mphead = (procsm_head *)(p+sizeof(procsm_info));
  190. mnMaxPacCount = mpinfo->mCap;
  191. mnBufSize = mpinfo->mnBufSize;
  192. // qDebug("attach successful");
  193. // mstrtem = new char[mnBufSize];
  194. }
  195. else
  196. {
  197. mbAttach = false;
  198. qDebug("Share Memory Error.");
  199. }
  200. mpASMPtr->unlock();
  201. delete px;
  202. }
  203. #ifdef USEDBUS
  204. void procsm::onQuery()
  205. {
  206. QByteArray ba;
  207. ba.append((char *)&mmodulemsg_type,sizeof(iv::modulemsg_type));
  208. QList<QVariant> x;
  209. x<<ba;
  210. mmsgres.setArguments(x);
  211. QDBusConnection::sessionBus().send(mmsgres);
  212. }
  213. #endif
  214. bool procsm::AttachMem()
  215. {
  216. if(!mpASMPtr->isAttached())mpASMPtr->attach();
  217. if(mpASMPtr->isAttached())
  218. {
  219. ASM_PTR * pasmptr = (ASM_PTR *)(mpASMPtr->data());
  220. mASM_State = * pasmptr;
  221. if(mpASM != 0)
  222. {
  223. if(mpASM->isAttached())mpASM->detach();
  224. }
  225. mpASM = new QSharedMemory(mASM_State.mstrshmname);
  226. mpASM->attach();
  227. if(mpASM->isAttached())
  228. {
  229. mbAttach = true;
  230. char * p = (char *)mpASM->data();
  231. mpinfo = (procsm_info *)p;
  232. mphead = (procsm_head *)(p+sizeof(procsm_info));
  233. mnMaxPacCount = mpinfo->mCap;
  234. mnBufSize = mpinfo->mnBufSize;
  235. return true;
  236. }
  237. else
  238. {
  239. return false;
  240. }
  241. }
  242. else
  243. {
  244. return false;
  245. }
  246. return false;
  247. mpASM->attach();
  248. if(mpASM->isAttached())
  249. {
  250. mbAttach = true;
  251. char * p = (char *)mpASM->data();
  252. mpinfo = (procsm_info *)p;
  253. mphead = (procsm_head *)(p+sizeof(procsm_info));
  254. mnMaxPacCount = mpinfo->mCap;
  255. mnBufSize = mpinfo->mnBufSize;
  256. return true;
  257. }
  258. else
  259. {
  260. return false;
  261. }
  262. }
  263. int procsm::MoveMem(const unsigned int nSize)
  264. {
  265. // qDebug("move mem");
  266. unsigned int nRemove = nSize;
  267. if(nRemove == 0)return -1;
  268. // unsigned int * pIndexFirst = (unsigned int *)mpASM->data();
  269. // unsigned int * pIndexNext = pIndexFirst+1;
  270. // qDebug("first = %d next = %d",*pIndexFirst,*pIndexNext);
  271. // unsigned int * pIndexNext = pIndexFirst;
  272. char * pH,*pD;
  273. pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info);
  274. pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head);
  275. procsm_head * phh = (procsm_head *)pH;
  276. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  277. if(nRemove >nPac)
  278. {
  279. // qDebug("procsm::MoveMem nRemove > nPac nRemove = %d",nRemove);
  280. nRemove = nPac;
  281. }
  282. if(nRemove == nPac)
  283. {
  284. mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
  285. return 0;
  286. }
  287. unsigned int i;
  288. int nDataMove = 0;
  289. for(i=0;i<nRemove;i++)
  290. {
  291. procsm_head * phd = phh+i;
  292. nDataMove = nDataMove + phd->mnLen;
  293. }
  294. unsigned int nDataTotal;
  295. for(i=0;i<(nPac - nRemove);i++)
  296. {
  297. memcpy(phh+i,phh+i+nRemove,sizeof(procsm_head));
  298. (phh+i)->mnPos = (phh+i)->mnPos - nDataMove;
  299. }
  300. nDataTotal = (phh + nPac-nRemove-1)->mnPos + (phh+nPac-nRemove-1)->mnLen;
  301. memcpy(mstrtem,pD+nDataMove,nDataTotal);
  302. memcpy(pD,mstrtem,nDataTotal);
  303. // for(i=0;i<nDataTotal;i++)
  304. // {
  305. // *(pD+i) = *(pD+i+nDataMove);
  306. // }
  307. mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
  308. return 0;
  309. }
  310. void procsm::checkasm()
  311. {
  312. mpASMPtr->lock();
  313. ASM_PTR * pASM_PTR = (ASM_PTR * )mpASMPtr->data();
  314. if(pASM_PTR->mnUpdateTime == mASM_State.mnUpdateTime)
  315. {
  316. mpASMPtr->unlock();
  317. return;
  318. }
  319. qDebug("reattch mem.");
  320. mbAttach = false;
  321. AttachMem();
  322. mpASMPtr->unlock();
  323. }
  324. int procsm::writemsg(const char *str, const unsigned int nSize)
  325. {
  326. checkasm();
  327. if(nSize > mnBufSize)
  328. {
  329. if(nSize<1000000000)
  330. {
  331. recreateasm(nSize);
  332. checkasm();
  333. }
  334. else
  335. {
  336. qDebug("procsm::writemsg message size is very big");
  337. return -1;
  338. }
  339. }
  340. if(mbAttach == false)
  341. {
  342. std::cout<<"ShareMemory Attach fail."<<std::endl;
  343. return -1;
  344. }
  345. mpASM->lock();
  346. // unsigned int * pIndexFirst = (unsigned int *)mpASM->data();
  347. // unsigned int * pIndexNext = pIndexFirst+1;
  348. if(mpinfo->mLock == 1)
  349. {
  350. std::cout<<"ShareMemory have lock.Init."<<std::endl;
  351. mpinfo->mLock = 0;
  352. mpinfo->mFirst = 0;
  353. mpinfo->mNext = 0;
  354. }
  355. mpinfo->mLock =1;
  356. WRITEMSG:
  357. char * pH,*pD;
  358. QDateTime dt;
  359. pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info);
  360. pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head);
  361. procsm_head * phh = (procsm_head *)pH;
  362. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  363. if(nPac>=mnMaxPacCount)
  364. {
  365. unsigned int nRemove = mnMaxPacCount/3;
  366. if(nRemove == 0)nRemove = 1;
  367. MoveMem(nRemove);
  368. goto WRITEMSG;
  369. }
  370. if(nPac == 0)
  371. {
  372. memcpy(pD,str,nSize);
  373. dt = QDateTime::currentDateTime();
  374. // phh->mdt = dt;
  375. phh->SetDate(dt);
  376. // memcpy(&phh->mdt,&dt,sizeof(QDateTime));
  377. // phh->mdt = QDateTime::currentDateTime();
  378. phh->mindex = mpinfo->mNext;
  379. phh->mnPos = 0;
  380. phh->mnLen = nSize;
  381. mpinfo->mNext = mpinfo->mNext+1;
  382. }
  383. else
  384. {
  385. if(((phh+nPac-1)->mnPos+(phh+nPac-1)->mnLen + nSize)>=mnBufSize)
  386. {
  387. unsigned int nRemove = mnMaxPacCount/2;
  388. if(nRemove == 0)nRemove = 1;
  389. MoveMem(nRemove);
  390. goto WRITEMSG;
  391. }
  392. else
  393. {
  394. unsigned int nPos = (phh+nPac-1)->mnPos + (phh+nPac-1)->mnLen;
  395. // qDebug("write pos = %d",nPos);
  396. memcpy(pD+nPos,str,nSize);
  397. dt = QDateTime::currentDateTime();
  398. (phh+nPac)->SetDate(dt);
  399. // memcpy(&(phh+nPac)->mdt,&dt,sizeof(QDateTime));
  400. // (phh+nPac)->mdt = QDateTime::currentDateTime();
  401. (phh+nPac)->mindex = mpinfo->mNext;
  402. (phh+nPac)->mnPos = nPos;
  403. (phh+nPac)->mnLen = nSize;
  404. mpinfo->mNext = mpinfo->mNext+1;
  405. }
  406. }
  407. const unsigned int nTM = 0x6fffffff;
  408. if((mpinfo->mNext >nTM)&&(mpinfo->mFirst>nTM))
  409. {
  410. nPac = mpinfo->mNext - mpinfo->mFirst;
  411. unsigned int i;
  412. for(i=0;i<nPac;i++)
  413. {
  414. (phh+i)->mindex = (phh+i)->mindex-nTM;
  415. }
  416. mpinfo->mFirst = mpinfo->mFirst-nTM;
  417. mpinfo->mNext = mpinfo->mNext - nTM;
  418. }
  419. mpinfo->mLock = 0;
  420. mpASM->unlock();
  421. #ifdef USEDBUS
  422. QDBusConnection::sessionBus().send(mmsg);
  423. #endif
  424. return 0;
  425. }
  426. unsigned int procsm::getcurrentnext()
  427. {
  428. checkasm();
  429. unsigned int nNext;
  430. mpASM->lock();
  431. nNext = mpinfo->mNext;
  432. mpASM->unlock();
  433. return nNext;
  434. }
  435. //if return 0 No Data.
  436. //if return -1 nMaxSize is small
  437. //if retrun -2 index is not in range,call getcurrentnext get position
  438. //if return > 0 readdata
  439. int procsm::readmsg(unsigned int index, char *str, unsigned int nMaxSize,unsigned int * nRead,QDateTime * pdt)
  440. {
  441. checkasm();
  442. if(mbAttach == false)
  443. {
  444. std::cout<<"ShareMemory Attach fail."<<std::endl;
  445. return -1;
  446. }
  447. int nRtn = 0;
  448. mpASM->lock();
  449. if((index< mpinfo->mFirst)||(index > mpinfo->mNext))
  450. {
  451. nRtn = -2;
  452. }
  453. if(nRtn != (-2))
  454. {
  455. if(index == mpinfo->mNext)
  456. {
  457. nRtn = 0;
  458. }
  459. else
  460. {
  461. char * pH,*pD;
  462. // pH = (char *)mpASM->data();pH = pH + 2*sizeof(unsigned int);
  463. // pD = (char *)mpASM->data();pD = pD + 2*sizeof(unsigned int) + mnMaxPacCount * sizeof(procsm_head);
  464. pD = (char *)mpASM->data();pD = pD+ sizeof(procsm_info) + mpinfo->mCap*sizeof(procsm_head);
  465. pH = (char *)mpASM->data();pH = pH+sizeof(procsm_info);
  466. procsm_head * phh = (procsm_head *)pH;
  467. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  468. if(nPac == 0)
  469. {
  470. nRtn = 0;
  471. }
  472. else
  473. {
  474. unsigned int nPos = index - mpinfo->mFirst;
  475. *nRead = (phh+nPos)->mnLen;
  476. if((phh+nPos)->mnLen > nMaxSize)
  477. {
  478. nRtn = -1;
  479. }
  480. else
  481. {
  482. // qDebug("read pos = %d",(phh+nPos)->mnPos);
  483. memcpy(str,pD + (phh+nPos)->mnPos,(phh+nPos)->mnLen);
  484. // qDebug("read pos = %d",(phh+nPos)->mnPos);
  485. nRtn = (phh+nPos)->mnLen;
  486. (phh+nPos)->GetDate(pdt);
  487. // memcpy(pdt,&((phh+nPos)->mdt),sizeof(QDateTime));
  488. }
  489. }
  490. }
  491. }
  492. mpASM->unlock();
  493. return nRtn;
  494. }