// procsm_if.cpp
  1. #include "procsm_if.h"
  2. #include <QTimer>
  3. #include <iostream>
  4. procsm_if_readthread::procsm_if_readthread(procsm *pPSM,SMCallBack pCall,const char * strsmname,void * pparent)
  5. {
  6. mpparent = pparent;
  7. mpPSM = pPSM;
  8. mpCall = pCall;
  9. strncpy(mstrsmname,strsmname,255);
  10. #ifndef USE_GROUPUDP
  11. #ifdef USEDBUS
  12. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int)));
  13. if(bconnect == false)
  14. {
  15. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  16. }
  17. #endif
  18. #endif
  19. }
  20. procsm_if_readthread::procsm_if_readthread(procsm *pPSM,ModuleFun xFun,const char * strsmname,void * pparent)
  21. {
  22. mpparent = pparent;
  23. mpPSM = pPSM;
  24. mFun = xFun;
  25. strncpy(mstrsmname,strsmname,255);
  26. mbFunPlus = true;
  27. #ifndef USE_GROUPUDP
  28. #ifdef USEDBUS
  29. bool bconnect = QDBusConnection::sessionBus().connect(QString(),"/catarc/adc", "adc.adciv.modulecomm", strsmname,this,SLOT(onNewMsg(int)));
  30. if(bconnect == false)
  31. {
  32. std::cout<<"procsm_if_readthread::procsm_if_readthread bconect is false"<<std::endl;
  33. mbDBUSOK = false;
  34. QTimer * timer = new QTimer();
  35. timer->setTimerType(Qt::PreciseTimer);
  36. delete timer;
  37. }
  38. #endif
  39. #endif
  40. }
  #ifdef USELCM
  /**
   * @brief LCM subscription handler: forwards one received buffer to the user handler.
   * @param rbuf    received buffer; its data pointer and data_size are passed on unchanged
   * @param channel channel name supplied by LCM (not used; mstrsmname is reported instead)
   */
  void procsm_if_readthread::handlerMethod(const lcm::ReceiveBuffer *rbuf, const std::string &channel)
  {
      qDebug("lcm receiv data. ");
      ++mxindex;
      // The receive time is taken locally, at the moment the handler runs.
      QDateTime stamp = QDateTime::currentDateTime();
      char * const payload = (char *)rbuf->data;
      if(!mbFunPlus)
      {
          (*mpCall)(payload,rbuf->data_size,mxindex,&stamp,mstrsmname);
          return;
      }
      mFun(payload,rbuf->data_size,mxindex,&stamp,mstrsmname);
  }
  #endif
  57. void procsm_if_readthread::puaseread()
  58. {
  59. mbRun = false;
  60. }
  61. void procsm_if_readthread::continueread()
  62. {
  63. mbRun = true;
  64. }
  65. void procsm_if_readthread::run()
  66. {
  67. #ifdef USELCM
  68. mlcm.subscribe(mstrsmname,&procsm_if_readthread::handlerMethod,this);
  69. while(!QThread::isInterruptionRequested())
  70. {
  71. mlcm.handle();
  72. }
  73. return;
  74. #endif
  75. QTime xTime;
  76. xTime.start();
  77. unsigned int nBufLen = 1;
  78. unsigned int nRead;
  79. char * str = new char[nBufLen];
  80. unsigned int index =0;
  81. QDateTime *pdt = new QDateTime();
  82. bool bAttach = false;
  83. while(!QThread::isInterruptionRequested())
  84. {
  85. if(mbRun == false)
  86. {
  87. msleep(10);
  88. continue;
  89. }
  90. if(bAttach == false)
  91. {
  92. bAttach = mpPSM->AttachMem();
  93. if(bAttach == false)
  94. {
  95. msleep(100);
  96. continue;
  97. }
  98. else
  99. {
  100. index = mpPSM->getcurrentnext();
  101. }
  102. }
  103. int nRtn = mpPSM->readmsg(index,str,nBufLen,&nRead,pdt);
  104. if(nRtn == 0)
  105. {
  106. #ifndef USE_GROUPUDP
  107. #ifdef USEDBUS
  108. if(mbDBUSOK == true)
  109. {
  110. mWaitMutex.lock();
  111. mwc.wait(&mWaitMutex,10);
  112. mWaitMutex.unlock();
  113. }
  114. else
  115. {
  116. msleep(1);
  117. }
  118. #else
  119. msleep(1);
  120. #endif
  121. #else
  122. procsm_if * mpif = (procsm_if * )mpparent;
  123. if(mpif->WaitNotify()<0)
  124. {
  125. std::cout<<"Wait Notify Error. Please Check. "<<std::endl;
  126. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  127. }
  128. // mWaitMutex.lock();
  129. // mwc.wait(&mWaitMutex,100);
  130. // mWaitMutex.unlock();
  131. #endif
  132. }
  133. else
  134. {
  135. if(nRtn == -1)
  136. {
  137. nBufLen = nRead;
  138. delete str;
  139. if(nBufLen < 1)nBufLen = 1;
  140. str = new char[nBufLen];
  141. // std::cout<<"modulecomm resize str to "<<nBufLen<<std::endl;
  142. }
  143. else
  144. {
  145. if((nRtn == -2) ||(nRtn == -100)||(nRtn == -101))
  146. {
  147. index = mpPSM->getcurrentnext();
  148. if(nRtn == -100) //Because New ShareMemory, read this.
  149. {
  150. if(index > 0)index = index -1;
  151. }
  152. }
  153. else
  154. {
  155. if(nRtn >0)
  156. {
  157. if(mbFunPlus)
  158. {
  159. mFun(str,nRtn,index,pdt,mstrsmname);
  160. }
  161. else
  162. {
  163. (*mpCall)(str,nRtn,index,pdt,mstrsmname);
  164. }
  165. index++;
  166. }
  167. else
  168. {
  169. usleep(100);
  170. }
  171. }
  172. }
  173. }
  174. }
  175. delete str;
  176. delete pdt;
  177. // qDebug("Thread finish.");
  178. }
  #ifdef USE_GROUPUDP
  /**
   * @brief Wakes every waiter on the thread's wait condition (mwc).
   *        Called by procsm_if::ThreadGroupRecv() when a matching key arrives.
   */
  void procsm_if_readthread::WakeRead()
  {
      this->mwc.wakeAll();
  }
  #endif
  #ifndef USE_GROUPUDP
  #ifdef USEDBUS
  /**
   * @brief Slot connected to the channel's DBus signal; wakes the waiting read loop.
   * @param x value carried by the signal; only printed when it equals 100
   */
  void procsm_if_readthread::onNewMsg(int x)
  {
      if(x == 100)
      {
          std::cout<<x<<std::endl;
      }
      this->mwc.wakeAll();
  }
  #endif
  #endif
  195. procsm_if::procsm_if(const char * strsmname,const unsigned int nBufSize,const unsigned int nMaxPacCount,const int nMode)
  196. {
  197. strncpy(mstrsmname,strsmname,255);
  198. #ifdef USELCM
  199. if(nMode == procsm::ModeWrite)
  200. {
  201. }
  202. else
  203. {
  204. }
  205. return;
  206. #endif
  207. mpPSM = new procsm(strsmname,nBufSize,nMaxPacCount,nMode);
  208. mnType = nMode;
  209. mTimer.setTimerType(Qt::PreciseTimer);
  210. #ifdef USE_GROUPUDP
  211. InitSock();
  212. mKey = CalcKey(mstrsmname);
  213. #endif
  214. }
  215. procsm_if::~procsm_if()
  216. {
  217. if(mnType == procsm::ModeRead)
  218. {
  219. #ifdef USE_GROUPUDP
  220. mbGroupRecvRun = false;
  221. mpthread->join();
  222. #endif
  223. QTime xTime;
  224. xTime.start();
  225. bool bDel = true;
  226. mpReadThread->requestInterruption();
  227. while(!mpReadThread->isFinished())
  228. {
  229. if(xTime.elapsed() > 1000)
  230. {
  231. bDel = false;
  232. std::cout<<"procsm_if Thread Read not finish."<<std::endl;
  233. break;
  234. }
  235. }
  236. if(bDel)delete mpReadThread;
  237. }
  238. else
  239. {
  240. }
  241. delete mpPSM;
  242. }
  243. int procsm_if::writemsg(const char *str, const unsigned int nSize)
  244. {
  245. if(mbRun == false)return -2;
  246. #ifdef USELCM
  247. int nres = mlcm.publish(mstrsmname,str,nSize);
  248. qDebug("publish message. res = %d",nres);
  249. return 0;
  250. #endif
  251. if(mnType == procsm::ModeRead)return -1; //this is listen.
  252. int nRes = mpPSM->writemsg(str,nSize);
  253. #ifdef USE_GROUPUDP
  254. SendGroupMsg();
  255. #endif
  256. return nRes;
  257. }
  #ifdef USELCM
  /**
   * @brief LCM handler of the interface object; it only logs that data arrived.
   * @param rbuf    received buffer (not used)
   * @param channel channel name (not used)
   */
  void procsm_if::handlerMethod(const lcm::ReceiveBuffer *rbuf, const std::string &channel)
  {
      const char * const note = "receiv data. ";
      qDebug(note);
  }
  #endif
  264. int procsm_if::listenmsg(SMCallBack pCall)
  265. {
  266. //#ifdef USELCM
  267. //// mlcm.subscribe(mstrsmname,&handlerMethod2);
  268. // mlcm.subscribe(mstrsmname,&procsm_if::handlerMethod,this);
  269. // while(true)
  270. // {
  271. // mlcm.handle();
  272. // }
  273. // return 0;
  274. //#endif
  275. if(mnType == procsm::ModeWrite)return -1; //listening.
  276. mpReadThread = new procsm_if_readthread(mpPSM,pCall,mstrsmname,this);
  277. // mpReadThread->setPriority(QThread::TimeCriticalPriority);
  278. // mpReadThread->start();
  279. mpReadThread->start(QThread::HighestPriority);
  280. #ifdef USE_GROUPUDP
  281. // mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
  282. #endif
  283. // mnType = 1;
  284. return 0;
  285. }
  286. int procsm_if::listenmsg(ModuleFun xFun)
  287. {
  288. //#ifdef USELCM
  289. // mlcm.subscribe(mstrsmname,&procsm_if::handlerMethod,this);
  290. // while(true)
  291. // {
  292. // mlcm.handle();
  293. // }
  294. // return 0;
  295. //#endif
  296. if(mnType == procsm::ModeWrite)return -1; //listening.
  297. mpReadThread = new procsm_if_readthread(mpPSM,xFun,mstrsmname,this);
  298. // mpReadThread->setPriority(QThread::TimeCriticalPriority);
  299. // mpReadThread->start();
  300. mpReadThread->start(QThread::HighestPriority);
  301. #ifdef USE_GROUPUDP
  302. // mpthread = new std::thread(&procsm_if::ThreadGroupRecv,this);
  303. #endif
  304. // mnType = 1;
  305. return 0;
  306. }
  307. void procsm_if::stoplisten()
  308. {
  309. if(mnType != 1)return;
  310. mpReadThread->requestInterruption();
  311. while(!mpReadThread->isFinished());
  312. mnType = 0;
  313. // mpReadThread->deleteLater();
  314. qDebug("stop listen ok");
  315. }
  316. void procsm_if::pausecomm()
  317. {
  318. mbRun = false;
  319. if(mnType == procsm::ModeRead)
  320. {
  321. mpReadThread->puaseread();
  322. }
  323. }
  324. void procsm_if::continuecomm()
  325. {
  326. mbRun = true;
  327. if(mnType == procsm::ModeRead)
  328. {
  329. mpReadThread->continueread();
  330. }
  331. }
  332. #ifdef USE_GROUPUDP
  333. void procsm_if::InitSock()
  334. {
  335. std::string mcast_ip("236.236.100.100");
  336. uint16_t mcast_port = 8888;
  337. notify_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
  338. if (notify_fd_ == -1) {
  339. std::cout << "fail to create notify fd, " << strerror(errno);
  340. return ;
  341. }
  342. memset(&notify_addr_, 0, sizeof(notify_addr_));
  343. notify_addr_.sin_family = AF_INET;
  344. notify_addr_.sin_addr.s_addr = inet_addr(mcast_ip.c_str());
  345. notify_addr_.sin_port = htons(mcast_port);
  346. listen_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
  347. if (listen_fd_ == -1) {
  348. std::cout << "fail to create listen fd, " << strerror(errno);
  349. return ;
  350. }
  351. if (fcntl(listen_fd_, F_SETFL, O_NONBLOCK) == -1) {
  352. std::cout << "fail to set listen fd nonblock, " << strerror(errno);
  353. return ;
  354. }
  355. memset(&listen_addr_, 0, sizeof(listen_addr_));
  356. listen_addr_.sin_family = AF_INET;
  357. listen_addr_.sin_addr.s_addr = htonl(INADDR_ANY);
  358. listen_addr_.sin_port = htons(mcast_port);
  359. int yes = 1;
  360. if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
  361. std::cout << "fail to setsockopt SO_REUSEADDR, " << strerror(errno);
  362. return ;
  363. }
  364. if (bind(listen_fd_, (struct sockaddr*)&listen_addr_, sizeof(listen_addr_)) <
  365. 0) {
  366. std::cout << "fail to bind addr, " << strerror(errno);
  367. return ;
  368. }
  369. int loop = 1;
  370. if (setsockopt(listen_fd_, IPPROTO_IP, IP_MULTICAST_LOOP, &loop,
  371. sizeof(loop)) < 0) {
  372. std::cout << "fail to setsockopt IP_MULTICAST_LOOP, " << strerror(errno);
  373. return ;
  374. }
  375. struct ip_mreq mreq;
  376. mreq.imr_multiaddr.s_addr = inet_addr(mcast_ip.c_str());
  377. mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  378. if (setsockopt(listen_fd_, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
  379. sizeof(mreq)) < 0) {
  380. std::cout << "fail to setsockopt IP_ADD_MEMBERSHIP, " << strerror(errno);
  381. return ;
  382. }
  383. return ;
  384. }
  385. /**
  386. * @brief procsm_if::WaitNotify
  387. * @param timeout_ms
  388. * @return if have notify return 1,otherwise,return 0
  389. */
  390. int procsm_if::WaitNotify(int timeout_ms)
  391. {
  392. int nrtn = 0;
  393. struct pollfd fds;
  394. fds.fd = listen_fd_;
  395. fds.events = POLLIN;
  396. int ready_num = poll(&fds, 1, timeout_ms);
  397. if (ready_num > 0) {
  398. char buf[32] = {0}; // larger than ReadableInfo::kSize
  399. ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
  400. if (nbytes == -1) {
  401. std::cout << "fail to recvfrom, " << strerror(errno);
  402. return -1;
  403. }
  404. if(nbytes == 8)
  405. {
  406. // std::cout<<"recv key."<<std::endl;
  407. qint64 * pkey = (qint64 *)buf;
  408. if(*pkey == mKey)
  409. {
  410. return 1;
  411. // mpReadThread->WakeRead();
  412. //Waaa;
  413. }
  414. else
  415. {
  416. // std::cout<<"key error"<<std::endl;
  417. }
  418. }
  419. // return info->DeserializeFrom(buf, nbytes);
  420. } else if (ready_num == 0) {
  421. return 0;
  422. } else {
  423. nrtn = -2;
  424. if (errno == EINTR) {
  425. std::cout << "poll was interrupted."<<std::endl;
  426. } else {
  427. std::cout << "fail to poll, " << strerror(errno);
  428. }
  429. }
  430. return nrtn;
  431. }
  432. void procsm_if::ThreadGroupRecv()
  433. {
  434. int timeout_ms = 100;
  435. while(mbGroupRecvRun)
  436. {
  437. // std::cout<<"poll"<<std::endl;
  438. struct pollfd fds;
  439. fds.fd = listen_fd_;
  440. fds.events = POLLIN;
  441. int ready_num = poll(&fds, 1, timeout_ms);
  442. if (ready_num > 0) {
  443. char buf[32] = {0}; // larger than ReadableInfo::kSize
  444. ssize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);
  445. if (nbytes == -1) {
  446. std::cout << "fail to recvfrom, " << strerror(errno);
  447. continue;
  448. }
  449. if(nbytes == 8)
  450. {
  451. // std::cout<<"recv key."<<std::endl;
  452. qint64 * pkey = (qint64 *)buf;
  453. if(*pkey == mKey)
  454. {
  455. mpReadThread->WakeRead();
  456. //Waaa;
  457. }
  458. else
  459. {
  460. // std::cout<<"key error"<<std::endl;
  461. }
  462. }
  463. // return info->DeserializeFrom(buf, nbytes);
  464. } else if (ready_num == 0) {
  465. // std::cout << "timeout, no readableinfo.";
  466. continue;
  467. } else {
  468. if (errno == EINTR) {
  469. std::cout << "poll was interrupted."<<std::endl;
  470. } else {
  471. std::cout << "fail to poll, " << strerror(errno);
  472. }
  473. }
  474. }
  475. }
  476. qint64 procsm_if::CalcKey(const char *strsmname)
  477. {
  478. qint64 xkey = 0;
  479. unsigned int i;
  480. int nlen = strnlen(strsmname,256);
  481. for(i=0;i<nlen;i++)
  482. {
  483. qint64 temp = *(strsmname+i);
  484. temp = temp<<(4+i);
  485. xkey = xkey + temp;
  486. if(i>=3)break;
  487. }
  488. for(i=0;i<nlen;i++)
  489. {
  490. qint64 temp = *(strsmname+nlen-i-1);
  491. temp = temp<<i;
  492. xkey = xkey + temp;
  493. if(i>=3)break;
  494. }
  495. for(i=0;i<strnlen(strsmname,256);i++)
  496. {
  497. qint64 temp = *(strsmname+i);
  498. xkey = xkey + temp;
  499. }
  500. return xkey;
  501. }
  502. void procsm_if::SendGroupMsg()
  503. {
  504. // qint64 timenow = std::chrono::system_clock::now().time_since_epoch().count();
  505. ssize_t nbytes =
  506. sendto(notify_fd_, &mKey, sizeof(qint64), 0,
  507. (struct sockaddr*)&notify_addr_, sizeof(notify_addr_));
  508. if(nbytes <= 0)
  509. {
  510. std::cout<<"Send Fail."<<std::endl;
  511. }
  512. }
  513. #endif