// procsm_if.cpp
  1. #include "procsm_if.h"
  2. #include <QTimer>
  3. #include <iostream>
  4. procsm_if_readthread::procsm_if_readthread(procsm *pPSM,SMCallBack pCall,const char * strsmname,void * pparent)
  5. {
  6. mpparent = pparent;
  7. mpPSM = pPSM;
  8. mpCall = pCall;
  9. strncpy(mstrsmname,strsmname,255);
  10. #ifndef USE_GROUPUDP
  11. #ifdef USEDBUS
  12. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int)));
  13. if(bconnect == false)
  14. {
  15. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  16. }
  17. #endif
  18. #endif
  19. }
  20. procsm_if_readthread::procsm_if_readthread(procsm *pPSM,ModuleFun xFun,const char * strsmname,void * pparent)
  21. {
  22. mpparent = pparent;
  23. mpPSM = pPSM;
  24. mFun = xFun;
  25. strncpy(mstrsmname,strsmname,255);
  26. mbFunPlus = true;
  27. #ifndef USE_GROUPUDP
  28. #ifdef USEDBUS
  29. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int)));
  30. if(bconnect == false)
  31. {
  32. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  33. mbDBUSOK = false;
  34. QTimer * timer = new QTimer();
  35. timer->setTimerType(Qt::PreciseTimer);
  36. delete timer;
  37. }
  38. #endif
  39. #endif
  40. }
  #ifdef USELCM
  /**
   * @brief LCM subscription handler: forwards the received buffer to the user callback.
   *
   * Increments the running packet index, stamps the packet with the current
   * local time and calls either mFun or mpCall depending on mbFunPlus.
   *
   * @param rbuf    received LCM buffer (data pointer and size are forwarded)
   * @param channel LCM channel name; not used
   */
  void procsm_if_readthread::handlerMethod(const lcm::ReceiveBuffer *rbuf, const std::string &channel)
  {
      qDebug("lcm receiv data. ");
      mxindex++;
      QDateTime stamp = QDateTime::currentDateTime();
      char * payload = (char *)rbuf->data;
      if(!mbFunPlus)
      {
          (*mpCall)(payload,rbuf->data_size,mxindex,&stamp,mstrsmname);
          return;
      }
      mFun(payload,rbuf->data_size,mxindex,&stamp,mstrsmname);
  }
  #endif
  57. void procsm_if_readthread::puaseread()
  58. {
  59. mbRun = false;
  60. }
  61. void procsm_if_readthread::continueread()
  62. {
  63. mbRun = true;
  64. }
  65. void procsm_if_readthread::run()
  66. {
  67. #ifdef USELCM
  68. mlcm.subscribe(mstrsmname,&procsm_if_readthread::handlerMethod,this);
  69. while(!QThread::isInterruptionRequested())
  70. {
  71. mlcm.handle();
  72. }
  73. return;
  74. #endif
  75. QTime xTime;
  76. xTime.start();
  77. unsigned int nBufLen = 1;
  78. unsigned int nRead;
  79. char * str = new char[nBufLen];
  80. unsigned int index =0;
  81. QDateTime *pdt = new QDateTime();
  82. bool bAttach = false;
  83. bool bFirstRead = true; //if First Read,not read the buffer data.
  84. while(!QThread::isInterruptionRequested())
  85. {
  86. if(mbRun == false)
  87. {
  88. msleep(10);
  89. continue;
  90. }
  91. if(bAttach == false)
  92. {
  93. bAttach = mpPSM->AttachMem();
  94. if(bAttach == false)
  95. {
  96. msleep(100);
  97. continue;
  98. }
  99. else
  100. {
  101. index = mpPSM->getcurrentnext();
  102. }
  103. }
  104. int nRtn = mpPSM->readmsg(index,str,nBufLen,&nRead,pdt);
  105. if(nRtn == 0)
  106. {
  107. #ifndef USE_GROUPUDP
  108. #ifdef USEDBUS
  109. if(mbDBUSOK == true)
  110. {
  111. mWaitMutex.lock();
  112. mwc.wait(&mWaitMutex,10);
  113. mWaitMutex.unlock();
  114. }
  115. else
  116. {
  117. msleep(1);
  118. }
  119. #else
  120. msleep(1);
  121. #endif
  122. #else
  123. procsm_if * mpif = (procsm_if * )mpparent;
  124. if(mpif->WaitNotify()<0)
  125. {
  126. std::cout<<"Wait Notify Error. Please Check. "<<std::endl;
  127. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  128. }
  129. // mWaitMutex.lock();
  130. // mwc.wait(&mWaitMutex,100);
  131. // mWaitMutex.unlock();
  132. #endif
  133. }
  134. else
  135. {
  136. if(nRtn == -1)
  137. {
  138. nBufLen = nRead;
  139. delete str;
  140. if(nBufLen < 1)nBufLen = 1;
  141. str = new char[nBufLen];
  142. // std::cout<<"modulecomm resize str to "<<nBufLen<<std::endl;
  143. }
  144. else
  145. {
  146. if((nRtn == -2) ||(nRtn == -100)||(nRtn == -101))
  147. {
  148. index = mpPSM->getcurrentnext();
  149. if(nRtn == -100) //Because New ShareMemory, read this.
  150. {
  151. if((index > 0)&&(bFirstRead == false) )index = index -1;
  152. }
  153. }
  154. else
  155. {
  156. if(nRtn >0)
  157. {
  158. if(mbFunPlus)
  159. {
  160. mFun(str,nRtn,index,pdt,mstrsmname);
  161. }
  162. else
  163. {
  164. (*mpCall)(str,nRtn,index,pdt,mstrsmname);
  165. }
  166. index++;
  167. }
  168. else
  169. {
  170. usleep(100);
  171. }
  172. }
  173. }
  174. }
  175. bFirstRead = false;
  176. }
  177. delete str;
  178. delete pdt;
  179. // qDebug("Thread finish.");
  180. }
  #ifdef USE_GROUPUDP
  /**
   * @brief Wakes every thread currently blocked on the mwc wait condition.
   */
  void procsm_if_readthread::WakeRead()
  {
      this->mwc.wakeAll();
  }
  #endif
  #ifndef USE_GROUPUDP
  #ifdef USEDBUS
  /**
   * @brief Slot connected to the D-Bus signal in the constructors; wakes the
   *        reader waiting on mwc.
   * @param x value carried by the signal; only the value 100 is echoed to stdout
   */
  void procsm_if_readthread::onNewMsg(int x)
  {
      const bool bEcho = (x == 100);
      if(bEcho)
      {
          std::cout<<x<<std::endl;
      }
      this->mwc.wakeAll();
  }
  #endif
  #endif
  197. procsm_if::procsm_if(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
  198. {
  199. strncpy(mstrsmname,strsmname,255);
  200. #ifdef USELCM
  201. if(nMode == procsm::ModeWrite)
  202. {
  203. }
  204. else
  205. {
  206. }
  207. return;
  208. #endif
  209. mpPSM = new procsm(strsmname,nBufSize,nMaxPacCount,nMode);
  210. mnType = nMode;
  211. mTimer.setTimerType(Qt::PreciseTimer);
  212. #ifdef USE_GROUPUDP
  213. InitSock();
  214. mKey = CalcKey(mstrsmname);
  215. #endif
  216. }
  217. procsm_if::~procsm_if()
  218. {
  219. if(mnType == procsm::ModeRead)
  220. {
  221. #ifdef USE_GROUPUDP
  222. mbGroupRecvRun = false;
  223. // mpthread->join();
  224. #endif
  225. QTime xTime;
  226. xTime.start();
  227. bool bDel = true;
  228. mpReadThread->requestInterruption();
  229. while(!mpReadThread->isFinished())
  230. {
  231. if(xTime.elapsed() > 1000)
  232. {
  233. bDel = false;
  234. std::cout<<"procsm_if Thread Read not finish."<<std::endl;
  235. break;
  236. }
  237. }
  238. if(bDel)delete mpReadThread;
  239. }
  240. else
  241. {
  242. }
  243. delete mpPSM;
  244. }
  245. int procsm_if::writemsg(const char *str, const unsigned int nSize)
  246. {
  247. if(mbRun == false)return -2;
  248. #ifdef USELCM
  249. int nres = mlcm.publish(mstrsmname,str,nSize);
  250. qDebug("publish message. res = %d",nres);
  251. return 0;
  252. #endif
  253. if(mnType == procsm::ModeRead)return -1; //this is listen.
  254. int nRes = mpPSM->writemsg(str,nSize);
  255. #ifdef USE_GROUPUDP
  256. SendGroupMsg();
  257. #endif
  258. return nRes;
  259. }
  #ifdef USELCM
  /**
   * @brief LCM handler on the interface object; only logs that data arrived.
   * @param rbuf    received buffer; not used
   * @param channel channel name; not used
   */
  void procsm_if::handlerMethod(const lcm::ReceiveBuffer *rbuf, const std::string &channel)
  {
      qDebug("receiv data. ");
  }
  #endif
  266. int procsm_if::listenmsg(SMCallBack pCall)
  267. {
  268. //#ifdef USELCM
  269. //// mlcm.subscribe(mstrsmname,&handlerMethod2);
  270. // mlcm.subscribe(mstrsmname,&procsm_if::handlerMethod,this);
  271. // while(true)
  272. // {
  273. // mlcm.handle();
  274. // }
  275. // return 0;
  276. //#endif
  277. if(mnType == procsm::ModeWrite)return -1; //listening.
  278. mpReadThread = new procsm_if_readthread(mpPSM,pCall,mstrsmname,this);
  279. // mpReadThread->setPriority(QThread::TimeCriticalPriority);
  280. // mpReadThread->start();
  281. mpReadThread->start(QThread::HighestPriority);
  282. #ifdef USE_GROUPUDP
  283. // mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
  284. #endif
  285. // mnType = 1;
  286. return 0;
  287. }
  288. int procsm_if::listenmsg(ModuleFun xFun)
  289. {
  290. //#ifdef USELCM
  291. // mlcm.subscribe(mstrsmname,&procsm_if::handlerMethod,this);
  292. // while(true)
  293. // {
  294. // mlcm.handle();
  295. // }
  296. // return 0;
  297. //#endif
  298. if(mnType == procsm::ModeWrite)return -1; //listening.
  299. mpReadThread = new procsm_if_readthread(mpPSM,xFun,mstrsmname,this);
  300. // mpReadThread->setPriority(QThread::TimeCriticalPriority);
  301. // mpReadThread->start();
  302. mpReadThread->start(QThread::HighestPriority);
  303. #ifdef USE_GROUPUDP
  304. // mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
  305. #endif
  306. // mnType = 1;
  307. return 0;
  308. }
  309. void procsm_if::stoplisten()
  310. {
  311. if(mnType != 1)return;
  312. mpReadThread->requestInterruption();
  313. while(!mpReadThread->isFinished());
  314. mnType = 0;
  315. // mpReadThread->deleteLater();
  316. qDebug("stop listen ok");
  317. }
  318. void procsm_if::pausecomm()
  319. {
  320. mbRun = false;
  321. if(mnType == procsm::ModeRead)
  322. {
  323. mpReadThread->puaseread();
  324. }
  325. }
  326. void procsm_if::continuecomm()
  327. {
  328. mbRun = true;
  329. if(mnType == procsm::ModeRead)
  330. {
  331. mpReadThread->continueread();
  332. }
  333. }
  334. #ifdef USE_GROUPUDP
  335. void procsm_if::InitSock()
  336. {
  337. std::string mcast_ip("236.236.100.100");
  338. uint16_t mcast_port = 8888;
  339. notify_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
  340. if (notify_fd_ == -1) {
  341. std::cout << "fail to create notify fd, " << strerror(errno);
  342. return ;
  343. }
  344. memset(&notify_addr_, 0, sizeof(notify_addr_));
  345. notify_addr_.sin_family = AF_INET;
  346. notify_addr_.sin_addr.s_addr = inet_addr(mcast_ip.c_str());
  347. notify_addr_.sin_port = htons(mcast_port);
  348. listen_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
  349. if (listen_fd_ == -1) {
  350. std::cout << "fail to create listen fd, " << strerror(errno);
  351. return ;
  352. }
  353. if (fcntl(listen_fd_, F_SETFL, O_NONBLOCK) == -1) {
  354. std::cout << "fail to set listen fd nonblock, " << strerror(errno);
  355. return ;
  356. }
  357. memset(&listen_addr_, 0, sizeof(listen_addr_));
  358. listen_addr_.sin_family = AF_INET;
  359. listen_addr_.sin_addr.s_addr = htonl(INADDR_ANY);
  360. listen_addr_.sin_port = htons(mcast_port);
  361. int yes = 1;
  362. if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
  363. std::cout << "fail to setsockopt SO_REUSEADDR, " << strerror(errno);
  364. return ;
  365. }
  366. if (bind(listen_fd_, (struct sockaddr*)&listen_addr_, sizeof(listen_addr_)) <
  367. 0) {
  368. std::cout << "fail to bind addr, " << strerror(errno);
  369. return ;
  370. }
  371. int loop = 1;
  372. if (setsockopt(listen_fd_, IPPROTO_IP, IP_MULTICAST_LOOP, &loop,
  373. sizeof(loop)) < 0) {
  374. std::cout << "fail to setsockopt IP_MULTICAST_LOOP, " << strerror(errno);
  375. return ;
  376. }
  377. struct ip_mreq mreq;
  378. mreq.imr_multiaddr.s_addr = inet_addr(mcast_ip.c_str());
  379. mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  380. if (setsockopt(listen_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
  381. sizeof(mreq)) < 0) {
  382. std::cout << "fail to setsockopt IP_ADD_MEMBERSHIP, " << strerror(errno);
  383. return ;
  384. }
  385. return ;
  386. }
  387. /**
  388. * @brief procsm_if::WaitNotify
  389. * @param timeout_ms
  390. * @return if have notify return 1,otherwise,return 0
  391. */
  392. int procsm_if::WaitNotify(int timeout_ms)
  393. {
  394. int nrtn = 0;
  395. struct pollfd fds;
  396. fds.fd = listen_fd_;
  397. fds.events = POLLIN;
  398. int ready_num = poll(&fds, 1, timeout_ms);
  399. if (ready_num > 0) {
  400. char buf[32] = {0}; // larger than ReadableInfo::kSize
  401. ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
  402. if (nbytes == -1) {
  403. std::cout << "fail to recvfrom, " << strerror(errno)<<std::endl;
  404. return -1;
  405. }
  406. if(nbytes == 8)
  407. {
  408. // std::cout<<"recv key."<<std::endl;
  409. qint64 * pkey = (qint64 *)buf;
  410. if(*pkey == mKey)
  411. {
  412. return 1;
  413. // mpReadThread->WakeRead();
  414. //Waaa;
  415. }
  416. else
  417. {
  418. // std::cout<<"key error"<<std::endl;
  419. }
  420. }
  421. // return info->DeserializeFrom(buf, nbytes);
  422. } else if (ready_num == 0) {
  423. return 0;
  424. } else {
  425. nrtn = -2;
  426. if (errno == EINTR) {
  427. std::cout << "poll was interrupted."<<std::endl;
  428. } else {
  429. std::cout << "fail to poll, " << strerror(errno);
  430. }
  431. }
  432. return nrtn;
  433. }
  434. void procsm_if::ThreadGroupRecv()
  435. {
  436. int timeout_ms = 100;
  437. while(mbGroupRecvRun)
  438. {
  439. // std::cout<<"poll"<<std::endl;
  440. struct pollfd fds;
  441. fds.fd = listen_fd_;
  442. fds.events = POLLIN;
  443. int ready_num = poll(&fds, 1, timeout_ms);
  444. if (ready_num > 0) {
  445. char buf[32] = {0}; // larger than ReadableInfo::kSize
  446. ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
  447. if (nbytes == -1) {
  448. std::cout << "fail to recvfrom, " << strerror(errno);
  449. continue;
  450. }
  451. if(nbytes == 8)
  452. {
  453. // std::cout<<"recv key."<<std::endl;
  454. qint64 * pkey = (qint64 *)buf;
  455. if(*pkey == mKey)
  456. {
  457. mpReadThread->WakeRead();
  458. //Waaa;
  459. }
  460. else
  461. {
  462. // std::cout<<"key error"<<std::endl;
  463. }
  464. }
  465. // return info->DeserializeFrom(buf, nbytes);
  466. } else if (ready_num == 0) {
  467. // std::cout << "timeout, no readableinfo.";
  468. continue;
  469. } else {
  470. if (errno == EINTR) {
  471. std::cout << "poll was interrupted."<<std::endl;
  472. } else {
  473. std::cout << "fail to poll, " << strerror(errno);
  474. }
  475. }
  476. }
  477. }
  478. qint64 procsm_if::CalcKey(const char *strsmname)
  479. {
  480. qint64 xkey = 0;
  481. int i;
  482. int nlen = strnlen(strsmname,256);
  483. for(i=0;i<nlen;i++)
  484. {
  485. qint64 temp = *(strsmname+i);
  486. temp = temp<<(4+i);
  487. xkey = xkey + temp;
  488. if(i>=3)break;
  489. }
  490. for(i=0;i<nlen;i++)
  491. {
  492. qint64 temp = *(strsmname+nlen-i-1);
  493. temp = temp<<i;
  494. xkey = xkey + temp;
  495. if(i>=3)break;
  496. }
  497. for(i=0;i<(int)strnlen(strsmname,256);i++)
  498. {
  499. qint64 temp = *(strsmname+i);
  500. xkey = xkey + temp;
  501. }
  502. return xkey;
  503. }
  504. void procsm_if::SendGroupMsg()
  505. {
  506. // qint64 timenow = std::chrono::system_clock::now().time_since_epoch().count();
  507. ssize_t nbytes =
  508. sendto(notify_fd_, &mKey, sizeof(qint64), 0,
  509. (struct sockaddr*)&notify_addr_, sizeof(notify_addr_));
  510. if(nbytes <= 0)
  511. {
  512. std::cout<<"Send Fail."<<std::endl;
  513. }
  514. }
  515. #endif