procsm.cpp 14 KB


  1. #include <iostream>
  2. #include <thread>
  3. #include <QTime>
  4. #include <QThread>
  5. #include "procsm.h"
  6. class AttachThread : public QThread
  7. {
  8. public:
  9. AttachThread(QSharedMemory * pa,bool & bAttach)
  10. {
  11. mbAttach = bAttach;
  12. mpa = pa;
  13. mbrun = true;
  14. }
  15. QSharedMemory * mpa;
  16. bool mbAttach = false;
  17. bool mbrun = true;
  18. void run()
  19. {
  20. mbAttach = mpa->attach();
  21. mbrun = false;
  22. }
  23. };
  24. procsm::procsm(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
  25. {
  26. // mnBufSize = nBufSize;
  27. // qDebug("create dbus");
  28. strncpy(mstrsmname,strsmname,256);
  29. mpASMPtr = new QSharedMemory(strsmname);
  30. char strasmname[256];
  31. if(nMode == ModeWrite)
  32. {
  33. bool bres = mpASMPtr->attach();
  34. if(bres == false)
  35. {
  36. mpASMPtr->create(sizeof(ASM_PTR));
  37. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  38. snprintf(strasmname,256,"%s_%lld",strsmname,QDateTime::currentMSecsSinceEpoch());
  39. pasm->mnshmsize = sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize;
  40. pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch();
  41. strncpy(pasm->mstrshmname,strasmname,256);
  42. mASM_State = *pasm;
  43. }
  44. else
  45. {
  46. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  47. mASM_State = *pasm;
  48. }
  49. }
  50. else
  51. {
  52. return;
  53. }
  54. // mpASM = new QSharedMemory(strsmname);
  55. mpASM = new QSharedMemory(strasmname);
  56. if(nMode == ModeWrite)
  57. {
  58. mmodulemsg_type.mnBufSize = nBufSize;
  59. mmodulemsg_type.mnMsgBufCount = nMaxPacCount;
  60. strncpy(mmodulemsg_type.mstrmsgname,strsmname,255);
  61. #ifdef USEDBUS
  62. mmsg = QDBusMessage::createSignal("/catarc/adc", "adc.adciv.modulecomm", strsmname);
  63. mmsg<<1;
  64. #endif
  65. bool bAttach = false;
  66. AttachThread AT(mpASM,bAttach);
  67. AT.start();
  68. QTime xTime;
  69. xTime.start();
  70. while(xTime.elapsed()<100)
  71. {
  72. if(AT.mbrun == false)
  73. {
  74. bAttach = AT.mbAttach;
  75. break;
  76. }
  77. }
  78. // qDebug("time is %d",xTime.elapsed());
  79. if(xTime.elapsed()>= 1000)
  80. {
  81. qDebug("in 1000ms Attach fail.terminate it .");
  82. AT.terminate();
  83. bAttach = false;
  84. }
  85. // if(!mpASM->attach())
  86. if(!bAttach)
  87. {
  88. mpASM->create(sizeof(procsm_info)+nMaxPacCount*sizeof(procsm_head) + nBufSize);
  89. char * p = (char *)mpASM->data();
  90. mpinfo = (procsm_info *)p;
  91. mphead = (procsm_head *)(p+sizeof(procsm_info));
  92. mpinfo->mCap = nMaxPacCount;
  93. mpinfo->mnBufSize = nBufSize;
  94. mpinfo->mFirst = 0;
  95. mpinfo->mNext = 0;
  96. mpinfo->mLock = 0;
  97. }
  98. if(mpASM->isAttached())
  99. {
  100. mbAttach = true;
  101. char * p = (char *)mpASM->data();
  102. mpinfo = (procsm_info *)p;
  103. mphead = (procsm_head *)(p+sizeof(procsm_info));
  104. mnMaxPacCount = mpinfo->mCap;
  105. mnBufSize = mpinfo->mnBufSize;
  106. // qDebug("attach successful");
  107. mstrtem = new char[mnBufSize];
  108. #ifdef USEDBUS
  109. mmsgres = QDBusMessage::createSignal("/catarc/adc", "adciv.interface", "modulemsgres");
  110. mmsgres<<1;
  111. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adciv.interface", "modulemsgquery",this,SLOT(onQuery()));
  112. if(bconnect == false)
  113. {
  114. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  115. }
  116. #endif
  117. }
  118. else
  119. {
  120. mbAttach = false;
  121. qDebug("Share Memory Error.");
  122. }
  123. }
  124. }
  125. void procsm::recreateasm(int nbufsize)
  126. {
  127. mnBufSize = nbufsize+100;
  128. char strasmname[256];
  129. ASM_PTR * pasm = (ASM_PTR *)mpASMPtr->data();
  130. snprintf(strasmname,256,"%s_%lld",mstrsmname,QDateTime::currentMSecsSinceEpoch());
  131. pasm->mnshmsize = sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize;
  132. pasm->mnUpdateTime = QDateTime::currentMSecsSinceEpoch();
  133. strncpy(pasm->mstrshmname,strasmname,256);
  134. mASM_State = *pasm;
  135. mmodulemsg_type.mnBufSize = mnBufSize;
  136. mmodulemsg_type.mnMsgBufCount = mnMaxPacCount;
  137. strncpy(mmodulemsg_type.mstrmsgname,mASM_State.mstrshmname,255);
  138. mpASM = new QSharedMemory(mASM_State.mstrshmname);
  139. bool bAttach = false;
  140. AttachThread AT(mpASM,bAttach);
  141. AT.start();
  142. QTime xTime;
  143. xTime.start();
  144. while(xTime.elapsed()<100)
  145. {
  146. if(AT.mbrun == false)
  147. {
  148. bAttach = AT.mbAttach;
  149. break;
  150. }
  151. }
  152. // qDebug("time is %d",xTime.elapsed());
  153. if(xTime.elapsed()>= 1000)
  154. {
  155. qDebug("in 1000ms Attach fail.terminate it .");
  156. AT.terminate();
  157. bAttach = false;
  158. }
  159. // if(!mpASM->attach())
  160. if(!bAttach)
  161. {
  162. mpASM->create(sizeof(procsm_info)+mnMaxPacCount*sizeof(procsm_head) + mnBufSize);
  163. char * p = (char *)mpASM->data();
  164. mpinfo = (procsm_info *)p;
  165. mphead = (procsm_head *)(p+sizeof(procsm_info));
  166. mpinfo->mCap = mnMaxPacCount;
  167. mpinfo->mnBufSize = mnBufSize;
  168. mpinfo->mFirst = 0;
  169. mpinfo->mNext = 0;
  170. mpinfo->mLock = 0;
  171. }
  172. if(mpASM->isAttached())
  173. {
  174. mbAttach = true;
  175. char * p = (char *)mpASM->data();
  176. mpinfo = (procsm_info *)p;
  177. mphead = (procsm_head *)(p+sizeof(procsm_info));
  178. mnMaxPacCount = mpinfo->mCap;
  179. mnBufSize = mpinfo->mnBufSize;
  180. // qDebug("attach successful");
  181. mstrtem = new char[mnBufSize];
  182. }
  183. else
  184. {
  185. mbAttach = false;
  186. qDebug("Share Memory Error.");
  187. }
  188. }
  189. #ifdef USEDBUS
  190. void procsm::onQuery()
  191. {
  192. QByteArray ba;
  193. ba.append((char *)&mmodulemsg_type,sizeof(iv::modulemsg_type));
  194. QList<QVariant> x;
  195. x<<ba;
  196. mmsgres.setArguments(x);
  197. QDBusConnection::sessionBus().send(mmsgres);
  198. }
  199. #endif
  200. bool procsm::AttachMem()
  201. {
  202. mpASMPtr->attach();
  203. if(mpASMPtr->isAttached())
  204. {
  205. ASM_PTR * pasmptr = (ASM_PTR *)(mpASMPtr->data());
  206. mASM_State = * pasmptr;
  207. mpASM = new QSharedMemory(mASM_State.mstrshmname);
  208. mpASM->attach();
  209. if(mpASM->isAttached())
  210. {
  211. mbAttach = true;
  212. char * p = (char *)mpASM->data();
  213. mpinfo = (procsm_info *)p;
  214. mphead = (procsm_head *)(p+sizeof(procsm_info));
  215. mnMaxPacCount = mpinfo->mCap;
  216. mnBufSize = mpinfo->mnBufSize;
  217. return true;
  218. }
  219. else
  220. {
  221. return false;
  222. }
  223. }
  224. else
  225. {
  226. return false;
  227. }
  228. return false;
  229. mpASM->attach();
  230. if(mpASM->isAttached())
  231. {
  232. mbAttach = true;
  233. char * p = (char *)mpASM->data();
  234. mpinfo = (procsm_info *)p;
  235. mphead = (procsm_head *)(p+sizeof(procsm_info));
  236. mnMaxPacCount = mpinfo->mCap;
  237. mnBufSize = mpinfo->mnBufSize;
  238. return true;
  239. }
  240. else
  241. {
  242. return false;
  243. }
  244. }
  245. int procsm::MoveMem(const unsigned int nSize)
  246. {
  247. // qDebug("move mem");
  248. unsigned int nRemove = nSize;
  249. if(nRemove == 0)return -1;
  250. // unsigned int * pIndexFirst = (unsigned int *)mpASM->data();
  251. // unsigned int * pIndexNext = pIndexFirst+1;
  252. // qDebug("first = %d next = %d",*pIndexFirst,*pIndexNext);
  253. // unsigned int * pIndexNext = pIndexFirst;
  254. char * pH,*pD;
  255. pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info);
  256. pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head);
  257. procsm_head * phh = (procsm_head *)pH;
  258. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  259. if(nRemove >nPac)
  260. {
  261. // qDebug("procsm::MoveMem nRemove > nPac nRemove = %d",nRemove);
  262. nRemove = nPac;
  263. }
  264. if(nRemove == nPac)
  265. {
  266. mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
  267. return 0;
  268. }
  269. unsigned int i;
  270. int nDataMove = 0;
  271. for(i=0;i<nRemove;i++)
  272. {
  273. procsm_head * phd = phh+i;
  274. nDataMove = nDataMove + phd->mnLen;
  275. }
  276. unsigned int nDataTotal;
  277. for(i=0;i<(nPac - nRemove);i++)
  278. {
  279. memcpy(phh+i,phh+i+nRemove,sizeof(procsm_head));
  280. (phh+i)->mnPos = (phh+i)->mnPos - nDataMove;
  281. }
  282. nDataTotal = (phh + nPac-nRemove-1)->mnPos + (phh+nPac-nRemove-1)->mnLen;
  283. memcpy(mstrtem,pD+nDataMove,nDataTotal);
  284. memcpy(pD,mstrtem,nDataTotal);
  285. // for(i=0;i<nDataTotal;i++)
  286. // {
  287. // *(pD+i) = *(pD+i+nDataMove);
  288. // }
  289. mpinfo->mFirst = mpinfo->mFirst + (unsigned int)nRemove;
  290. return 0;
  291. }
  292. void procsm::checkasm()
  293. {
  294. ASM_PTR * pASM_PTR = (ASM_PTR * )mpASMPtr->data();
  295. if(pASM_PTR->mnUpdateTime == mASM_State.mnUpdateTime)
  296. {
  297. return;
  298. }
  299. mbAttach = false;
  300. AttachMem();
  301. }
  302. int procsm::writemsg(const char *str, const unsigned int nSize)
  303. {
  304. checkasm();
  305. if(nSize > mnBufSize)
  306. {
  307. if(nSize<1000000000)
  308. {
  309. recreateasm(nSize);
  310. checkasm();
  311. }
  312. else
  313. {
  314. qDebug("procsm::writemsg message size is very big");
  315. return -1;
  316. }
  317. }
  318. if(mbAttach == false)
  319. {
  320. std::cout<<"ShareMemory Attach fail."<<std::endl;
  321. return -1;
  322. }
  323. mpASM->lock();
  324. // unsigned int * pIndexFirst = (unsigned int *)mpASM->data();
  325. // unsigned int * pIndexNext = pIndexFirst+1;
  326. if(mpinfo->mLock == 1)
  327. {
  328. std::cout<<"ShareMemory have lock.Init."<<std::endl;
  329. mpinfo->mLock = 0;
  330. mpinfo->mFirst = 0;
  331. mpinfo->mNext = 0;
  332. }
  333. mpinfo->mLock =1;
  334. WRITEMSG:
  335. char * pH,*pD;
  336. QDateTime dt;
  337. pH = (char *)mpASM->data();pH = pH + sizeof(procsm_info);
  338. pD = (char *)mpASM->data();pD = pD + sizeof(procsm_info) + mnMaxPacCount * sizeof(procsm_head);
  339. procsm_head * phh = (procsm_head *)pH;
  340. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  341. if(nPac>=mnMaxPacCount)
  342. {
  343. unsigned int nRemove = mnMaxPacCount/3;
  344. if(nRemove == 0)nRemove = 1;
  345. MoveMem(nRemove);
  346. goto WRITEMSG;
  347. }
  348. if(nPac == 0)
  349. {
  350. memcpy(pD,str,nSize);
  351. dt = QDateTime::currentDateTime();
  352. // phh->mdt = dt;
  353. phh->SetDate(dt);
  354. // memcpy(&phh->mdt,&dt,sizeof(QDateTime));
  355. // phh->mdt = QDateTime::currentDateTime();
  356. phh->mindex = mpinfo->mNext;
  357. phh->mnPos = 0;
  358. phh->mnLen = nSize;
  359. mpinfo->mNext = mpinfo->mNext+1;
  360. }
  361. else
  362. {
  363. if(((phh+nPac-1)->mnPos+(phh+nPac-1)->mnLen + nSize)>=mnBufSize)
  364. {
  365. unsigned int nRemove = mnMaxPacCount/2;
  366. if(nRemove == 0)nRemove = 1;
  367. MoveMem(nRemove);
  368. goto WRITEMSG;
  369. }
  370. else
  371. {
  372. unsigned int nPos = (phh+nPac-1)->mnPos + (phh+nPac-1)->mnLen;
  373. // qDebug("write pos = %d",nPos);
  374. memcpy(pD+nPos,str,nSize);
  375. dt = QDateTime::currentDateTime();
  376. (phh+nPac)->SetDate(dt);
  377. // memcpy(&(phh+nPac)->mdt,&dt,sizeof(QDateTime));
  378. // (phh+nPac)->mdt = QDateTime::currentDateTime();
  379. (phh+nPac)->mindex = mpinfo->mNext;
  380. (phh+nPac)->mnPos = nPos;
  381. (phh+nPac)->mnLen = nSize;
  382. mpinfo->mNext = mpinfo->mNext+1;
  383. }
  384. }
  385. const unsigned int nTM = 0x6fffffff;
  386. if((mpinfo->mNext >nTM)&&(mpinfo->mFirst>nTM))
  387. {
  388. nPac = mpinfo->mNext - mpinfo->mFirst;
  389. unsigned int i;
  390. for(i=0;i<nPac;i++)
  391. {
  392. (phh+i)->mindex = (phh+i)->mindex-nTM;
  393. }
  394. mpinfo->mFirst = mpinfo->mFirst-nTM;
  395. mpinfo->mNext = mpinfo->mNext - nTM;
  396. }
  397. mpinfo->mLock = 0;
  398. mpASM->unlock();
  399. #ifdef USEDBUS
  400. QDBusConnection::sessionBus().send(mmsg);
  401. #endif
  402. return 0;
  403. }
  404. unsigned int procsm::getcurrentnext()
  405. {
  406. unsigned int nNext;
  407. mpASM->lock();
  408. nNext = mpinfo->mNext;
  409. mpASM->unlock();
  410. return nNext;
  411. }
  412. //if return 0 No Data.
  413. //if return -1 nMaxSize is small
  414. //if retrun -2 index is not in range,call getcurrentnext get position
  415. //if return > 0 readdata
  416. int procsm::readmsg(unsigned int index, char *str, unsigned int nMaxSize,unsigned int * nRead,QDateTime * pdt)
  417. {
  418. if(mbAttach == false)
  419. {
  420. std::cout<<"ShareMemory Attach fail."<<std::endl;
  421. return -1;
  422. }
  423. checkasm();
  424. int nRtn = 0;
  425. mpASM->lock();
  426. if((index< mpinfo->mFirst)||(index > mpinfo->mNext))
  427. {
  428. nRtn = -2;
  429. }
  430. if(nRtn != (-2))
  431. {
  432. if(index == mpinfo->mNext)
  433. {
  434. nRtn = 0;
  435. }
  436. else
  437. {
  438. char * pH,*pD;
  439. // pH = (char *)mpASM->data();pH = pH + 2*sizeof(unsigned int);
  440. // pD = (char *)mpASM->data();pD = pD + 2*sizeof(unsigned int) + mnMaxPacCount * sizeof(procsm_head);
  441. pD = (char *)mpASM->data();pD = pD+ sizeof(procsm_info) + mpinfo->mCap*sizeof(procsm_head);
  442. pH = (char *)mpASM->data();pH = pH+sizeof(procsm_info);
  443. procsm_head * phh = (procsm_head *)pH;
  444. unsigned int nPac = mpinfo->mNext - mpinfo->mFirst;
  445. if(nPac == 0)
  446. {
  447. nRtn = 0;
  448. }
  449. else
  450. {
  451. unsigned int nPos = index - mpinfo->mFirst;
  452. *nRead = (phh+nPos)->mnLen;
  453. if((phh+nPos)->mnLen > nMaxSize)
  454. {
  455. nRtn = -1;
  456. }
  457. else
  458. {
  459. // qDebug("read pos = %d",(phh+nPos)->mnPos);
  460. memcpy(str,pD + (phh+nPos)->mnPos,(phh+nPos)->mnLen);
  461. // qDebug("read pos = %d",(phh+nPos)->mnPos);
  462. nRtn = (phh+nPos)->mnLen;
  463. (phh+nPos)->GetDate(pdt);
  464. // memcpy(pdt,&((phh+nPos)->mdt),sizeof(QDateTime));
  465. }
  466. }
  467. }
  468. }
  469. mpASM->unlock();
  470. return nRtn;
  471. }