// procsm_if.cpp (15 KB)
  #include "procsm_if.h"

  #include <chrono>
  #include <cstring>
  #include <iostream>
  #include <thread>
  #include <vector>

  #include <QTimer>
  6. procsm_if_readthread::procsm_if_readthread(procsm *pPSM,SMCallBack pCall,const char * strsmname,void * pparent)
  7. {
  8. mpparent = pparent;
  9. mpPSM = pPSM;
  10. mpCall = pCall;
  11. strncpy(mstrsmname,strsmname,255);
  12. #ifndef USE_GROUPUDP
  13. #ifdef USEDBUS
  14. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int)));
  15. if(bconnect == false)
  16. {
  17. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  18. }
  19. #endif
  20. #endif
  21. }
  22. procsm_if_readthread::procsm_if_readthread(procsm *pPSM,ModuleFun xFun,const char * strsmname,void * pparent)
  23. {
  24. mpparent = pparent;
  25. mpPSM = pPSM;
  26. mFun = xFun;
  27. strncpy(mstrsmname,strsmname,255);
  28. mbFunPlus = true;
  29. #ifndef USE_GROUPUDP
  30. #ifdef USEDBUS
  31. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int)));
  32. if(bconnect == false)
  33. {
  34. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  35. mbDBUSOK = false;
  36. QTimer * timer = new QTimer();
  37. timer->setTimerType(Qt::PreciseTimer);
  38. delete timer;
  39. }
  40. #endif
  41. #endif
  42. }
  43. #ifdef USELCM
  44. void procsm_if_readthread::handlerMethod(const lcm::ReceiveBuffer *rbuf, const std::string &channel)
  45. {
  46. qDebug("lcm receiv data. ");
  47. mxindex++;
  48. QDateTime dt = QDateTime::currentDateTime();
  49. if(mbFunPlus)
  50. {
  51. mFun((char *)rbuf->data,rbuf->data_size,mxindex,&dt,mstrsmname);
  52. }
  53. else
  54. {
  55. (*mpCall)((char *)rbuf->data,rbuf->data_size,mxindex,&dt,mstrsmname);
  56. }
  57. }
  58. #endif
  59. void procsm_if_readthread::puaseread()
  60. {
  61. mbRun = false;
  62. }
  63. void procsm_if_readthread::continueread()
  64. {
  65. mbRun = true;
  66. }
  67. void procsm_if_readthread::run()
  68. {
  69. #ifdef USELCM
  70. mlcm.subscribe(mstrsmname,&procsm_if_readthread::handlerMethod,this);
  71. while(!QThread::isInterruptionRequested())
  72. {
  73. mlcm.handle();
  74. }
  75. return;
  76. #endif
  77. // QTime xTime;
  78. // xTime.start();
  79. unsigned int nBufLen = 1;
  80. unsigned int nRead;
  81. char * str = new char[nBufLen];
  82. unsigned int index =0;
  83. QDateTime *pdt = new QDateTime();
  84. bool bAttach = false;
  85. bool bFirstRead = true; //if First Read,not read the buffer data.
  86. while(!QThread::isInterruptionRequested())
  87. {
  88. if(mbRun == false)
  89. {
  90. msleep(10);
  91. continue;
  92. }
  93. if(bAttach == false)
  94. {
  95. bAttach = mpPSM->AttachMem();
  96. if(bAttach == false)
  97. {
  98. msleep(100);
  99. continue;
  100. }
  101. else
  102. {
  103. index = mpPSM->getcurrentnext();
  104. }
  105. }
  106. int nRtn = mpPSM->readmsg(index,str,nBufLen,&nRead,pdt);
  107. if(nRtn == 0)
  108. {
  109. #ifndef USE_GROUPUDP
  110. #ifdef USEDBUS
  111. if(mbDBUSOK == true)
  112. {
  113. mWaitMutex.lock();
  114. mwc.wait(&mWaitMutex,10);
  115. mWaitMutex.unlock();
  116. }
  117. else
  118. {
  119. msleep(1);
  120. }
  121. #else
  122. msleep(1);
  123. #endif
  124. #else
  125. procsm_if * mpif = (procsm_if * )mpparent;
  126. if(mpif->WaitNotify()<0)
  127. {
  128. std::cout<<"Wait Notify Error. Please Check. "<<std::endl;
  129. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  130. }
  131. // mWaitMutex.lock();
  132. // mwc.wait(&mWaitMutex,100);
  133. // mWaitMutex.unlock();
  134. #endif
  135. }
  136. else
  137. {
  138. if(nRtn == -1)
  139. {
  140. nBufLen = nRead;
  141. delete str;
  142. if(nBufLen < 1)nBufLen = 1;
  143. str = new char[nBufLen];
  144. // std::cout<<"modulecomm resize str to "<<nBufLen<<std::endl;
  145. }
  146. else
  147. {
  148. if((nRtn == -2) ||(nRtn == -100)||(nRtn == -101))
  149. {
  150. index = mpPSM->getcurrentnext();
  151. //comment next code, skip some message, when change asm.
  152. // if(nRtn == -100) //Because New ShareMemory, read this.
  153. // {
  154. // if((index > 0)&&(bFirstRead == false) )index = index -1;
  155. // }
  156. }
  157. else
  158. {
  159. if(nRtn >0)
  160. {
  161. if(mbFunPlus)
  162. {
  163. mFun(str,nRtn,index,pdt,mstrsmname);
  164. }
  165. else
  166. {
  167. (*mpCall)(str,nRtn,index,pdt,mstrsmname);
  168. }
  169. index++;
  170. }
  171. else
  172. {
  173. usleep(100);
  174. }
  175. }
  176. }
  177. }
  178. bFirstRead = false;
  179. }
  180. delete str;
  181. delete pdt;
  182. // qDebug("Thread finish.");
  183. }
  184. #ifdef USE_GROUPUDP
  185. void procsm_if_readthread::WakeRead()
  186. {
  187. mwc.wakeAll();
  188. }
  189. #endif
  190. #ifndef USE_GROUPUDP
  191. #ifdef USEDBUS
  192. void procsm_if_readthread::onNewMsg(int x)
  193. {
  194. if(x == 100)std::cout<<x<<std::endl;
  195. mwc.wakeAll();
  196. // qDebug("wake");
  197. }
  198. #endif
  199. #endif
  200. procsm_if::procsm_if(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
  201. {
  202. strncpy(mstrsmname,strsmname,255);
  203. #ifdef USELCM
  204. if(nMode == procsm::ModeWrite)
  205. {
  206. }
  207. else
  208. {
  209. }
  210. return;
  211. #endif
  212. mpPSM = new procsm(strsmname,nBufSize,nMaxPacCount,nMode);
  213. mnType = nMode;
  214. mTimer.setTimerType(Qt::PreciseTimer);
  215. #ifdef USE_GROUPUDP
  216. InitSock();
  217. mKey = CalcKey(mstrsmname);
  218. #endif
  219. }
  220. procsm_if::~procsm_if()
  221. {
  222. if(mnType == procsm::ModeRead)
  223. {
  224. #ifdef USE_GROUPUDP
  225. mbGroupRecvRun = false;
  226. // mpthread->join();
  227. #endif
  228. int64_t nstart = std::chrono::system_clock::now().time_since_epoch().count();
  229. // QTime xTime;
  230. // xTime.start();
  231. bool bDel = true;
  232. mpReadThread->requestInterruption();
  233. while(!mpReadThread->isFinished())
  234. {
  235. int64_t nnow = std::chrono::system_clock::now().time_since_epoch().count();
  236. int64_t ndiff = nnow - nstart;
  237. if(abs(ndiff)>1e9)
  238. // if(xTime.elapsed() > 1000)
  239. {
  240. bDel = false;
  241. std::cout<<"procsm_if Thread Read not finish."<<std::endl;
  242. break;
  243. }
  244. }
  245. if(bDel)delete mpReadThread;
  246. }
  247. else
  248. {
  249. }
  250. delete mpPSM;
  251. }
  252. int procsm_if::writemsg(const char *str, const unsigned int nSize)
  253. {
  254. if(mbRun == false)return -2;
  255. #ifdef USELCM
  256. int nres = mlcm.publish(mstrsmname,str,nSize);
  257. qDebug("publish message. res = %d",nres);
  258. mmutex.unlock();
  259. return 0;
  260. #endif
  261. if(mnType == procsm::ModeRead)return -1; //this is listen.
  262. int nRes = mpPSM->writemsg(str,nSize);
  263. #ifdef USE_GROUPUDP
  264. SendGroupMsg();
  265. #endif
  266. return nRes;
  267. }
  268. #ifdef USELCM
  269. void procsm_if::handlerMethod(const lcm::ReceiveBuffer *rbuf, const std::string &channel)
  270. {
  271. qDebug("receiv data. ");
  272. }
  273. #endif
  274. int procsm_if::listenmsg(SMCallBack pCall)
  275. {
  276. //#ifdef USELCM
  277. //// mlcm.subscribe(mstrsmname,&handlerMethod2);
  278. // mlcm.subscribe(mstrsmname,&procsm_if::handlerMethod,this);
  279. // while(true)
  280. // {
  281. // mlcm.handle();
  282. // }
  283. // return 0;
  284. //#endif
  285. if(mnType == procsm::ModeWrite)return -1; //listening.
  286. mpReadThread = new procsm_if_readthread(mpPSM,pCall,mstrsmname,this);
  287. // mpReadThread->setPriority(QThread::TimeCriticalPriority);
  288. // mpReadThread->start();
  289. mpReadThread->start(QThread::HighestPriority);
  290. #ifdef USE_GROUPUDP
  291. // mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
  292. #endif
  293. // mnType = 1;
  294. return 0;
  295. }
  296. int procsm_if::listenmsg(ModuleFun xFun)
  297. {
  298. //#ifdef USELCM
  299. // mlcm.subscribe(mstrsmname,&procsm_if::handlerMethod,this);
  300. // while(true)
  301. // {
  302. // mlcm.handle();
  303. // }
  304. // return 0;
  305. //#endif
  306. if(mnType == procsm::ModeWrite)return -1; //listening.
  307. mpReadThread = new procsm_if_readthread(mpPSM,xFun,mstrsmname,this);
  308. // mpReadThread->setPriority(QThread::TimeCriticalPriority);
  309. // mpReadThread->start();
  310. mpReadThread->start(QThread::HighestPriority);
  311. #ifdef USE_GROUPUDP
  312. // mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
  313. #endif
  314. // mnType = 1;
  315. return 0;
  316. }
  317. void procsm_if::stoplisten()
  318. {
  319. if(mnType != 1)return;
  320. mpReadThread->requestInterruption();
  321. while(!mpReadThread->isFinished());
  322. mnType = 0;
  323. // mpReadThread->deleteLater();
  324. qDebug("stop listen ok");
  325. }
  326. void procsm_if::pausecomm()
  327. {
  328. mbRun = false;
  329. if(mnType == procsm::ModeRead)
  330. {
  331. mpReadThread->puaseread();
  332. }
  333. }
  334. void procsm_if::continuecomm()
  335. {
  336. mbRun = true;
  337. if(mnType == procsm::ModeRead)
  338. {
  339. mpReadThread->continueread();
  340. }
  341. }
  342. #ifdef USE_GROUPUDP
  343. void procsm_if::InitSock()
  344. {
  345. std::string mcast_ip("236.236.100.100");
  346. uint16_t mcast_port = 8888;
  347. notify_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
  348. if (notify_fd_ == -1) {
  349. std::cout << "fail to create notify fd, " << strerror(errno);
  350. return ;
  351. }
  352. memset(&notify_addr_, 0, sizeof(notify_addr_));
  353. notify_addr_.sin_family = AF_INET;
  354. notify_addr_.sin_addr.s_addr = inet_addr(mcast_ip.c_str());
  355. notify_addr_.sin_port = htons(mcast_port);
  356. listen_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
  357. if (listen_fd_ == -1) {
  358. std::cout << "fail to create listen fd, " << strerror(errno);
  359. return ;
  360. }
  361. if (fcntl(listen_fd_, F_SETFL, O_NONBLOCK) == -1) {
  362. std::cout << "fail to set listen fd nonblock, " << strerror(errno);
  363. return ;
  364. }
  365. memset(&listen_addr_, 0, sizeof(listen_addr_));
  366. listen_addr_.sin_family = AF_INET;
  367. listen_addr_.sin_addr.s_addr = htonl(INADDR_ANY);
  368. listen_addr_.sin_port = htons(mcast_port);
  369. int yes = 1;
  370. if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
  371. std::cout << "fail to setsockopt SO_REUSEADDR, " << strerror(errno);
  372. return ;
  373. }
  374. if (bind(listen_fd_, (struct sockaddr*)&listen_addr_, sizeof(listen_addr_)) <
  375. 0) {
  376. std::cout << "fail to bind addr, " << strerror(errno);
  377. return ;
  378. }
  379. int loop = 1;
  380. if (setsockopt(listen_fd_, IPPROTO_IP, IP_MULTICAST_LOOP, &loop,
  381. sizeof(loop)) < 0) {
  382. std::cout << "fail to setsockopt IP_MULTICAST_LOOP, " << strerror(errno);
  383. return ;
  384. }
  385. struct ip_mreq mreq;
  386. mreq.imr_multiaddr.s_addr = inet_addr(mcast_ip.c_str());
  387. mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  388. if (setsockopt(listen_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
  389. sizeof(mreq)) < 0) {
  390. std::cout << "fail to setsockopt IP_ADD_MEMBERSHIP, " << strerror(errno);
  391. return ;
  392. }
  393. return ;
  394. }
  395. /**
  396. * @brief procsm_if::WaitNotify
  397. * @param timeout_ms
  398. * @return if have notify return 1,otherwise,return 0
  399. */
  400. int procsm_if::WaitNotify(int timeout_ms)
  401. {
  402. int nrtn = 0;
  403. struct pollfd fds;
  404. fds.fd = listen_fd_;
  405. fds.events = POLLIN;
  406. int ready_num = poll(&fds, 1, timeout_ms);
  407. if (ready_num > 0) {
  408. char buf[32] = {0}; // larger than ReadableInfo::kSize
  409. ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
  410. if (nbytes == -1) {
  411. std::cout << "fail to recvfrom, " << strerror(errno)<<std::endl;
  412. return -1;
  413. }
  414. if(nbytes == 8)
  415. {
  416. // std::cout<<"recv key."<<std::endl;
  417. qint64 * pkey = (qint64 *)buf;
  418. if(*pkey == mKey)
  419. {
  420. return 1;
  421. // mpReadThread->WakeRead();
  422. //Waaa;
  423. }
  424. else
  425. {
  426. // std::cout<<"key error"<<std::endl;
  427. }
  428. }
  429. // return info->DeserializeFrom(buf, nbytes);
  430. } else if (ready_num == 0) {
  431. return 0;
  432. } else {
  433. nrtn = -2;
  434. if (errno == EINTR) {
  435. std::cout << "poll was interrupted."<<std::endl;
  436. } else {
  437. std::cout << "fail to poll, " << strerror(errno);
  438. }
  439. }
  440. return nrtn;
  441. }
  442. void procsm_if::ThreadGroupRecv()
  443. {
  444. int timeout_ms = 100;
  445. while(mbGroupRecvRun)
  446. {
  447. // std::cout<<"poll"<<std::endl;
  448. struct pollfd fds;
  449. fds.fd = listen_fd_;
  450. fds.events = POLLIN;
  451. int ready_num = poll(&fds, 1, timeout_ms);
  452. if (ready_num > 0) {
  453. char buf[32] = {0}; // larger than ReadableInfo::kSize
  454. ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
  455. if (nbytes == -1) {
  456. std::cout << "fail to recvfrom, " << strerror(errno);
  457. continue;
  458. }
  459. if(nbytes == 8)
  460. {
  461. // std::cout<<"recv key."<<std::endl;
  462. qint64 * pkey = (qint64 *)buf;
  463. if(*pkey == mKey)
  464. {
  465. mpReadThread->WakeRead();
  466. //Waaa;
  467. }
  468. else
  469. {
  470. // std::cout<<"key error"<<std::endl;
  471. }
  472. }
  473. // return info->DeserializeFrom(buf, nbytes);
  474. } else if (ready_num == 0) {
  475. // std::cout << "timeout, no readableinfo.";
  476. continue;
  477. } else {
  478. if (errno == EINTR) {
  479. std::cout << "poll was interrupted."<<std::endl;
  480. } else {
  481. std::cout << "fail to poll, " << strerror(errno);
  482. }
  483. }
  484. }
  485. }
  486. qint64 procsm_if::CalcKey(const char *strsmname)
  487. {
  488. qint64 xkey = 0;
  489. int i;
  490. int nlen = strnlen(strsmname,256);
  491. for(i=0;i<nlen;i++)
  492. {
  493. qint64 temp = *(strsmname+i);
  494. temp = temp<<(4+i);
  495. xkey = xkey + temp;
  496. if(i>=3)break;
  497. }
  498. for(i=0;i<nlen;i++)
  499. {
  500. qint64 temp = *(strsmname+nlen-i-1);
  501. temp = temp<<i;
  502. xkey = xkey + temp;
  503. if(i>=3)break;
  504. }
  505. for(i=0;i<(int)strnlen(strsmname,256);i++)
  506. {
  507. qint64 temp = *(strsmname+i);
  508. xkey = xkey + temp;
  509. }
  510. return xkey;
  511. }
  512. void procsm_if::SendGroupMsg()
  513. {
  514. // qint64 timenow = std::chrono::system_clock::now().time_since_epoch().count();
  515. ssize_t nbytes =
  516. sendto(notify_fd_, &mKey, sizeof(qint64), 0,
  517. (struct sockaddr*)&notify_addr_, sizeof(notify_addr_));
  518. if(nbytes <= 0)
  519. {
  520. std::cout<<"Send Fail."<<std::endl;
  521. }
  522. }
  523. #endif