// grpcclientthread.cpp
  1. #include "grpcclientthread.h"
  2. grpcclientthread * ggrpcclient;
  3. #ifdef Android
  4. #include "adcintelligentshow.h"
  5. extern ADCIntelligentShow * gAShow;
  6. #endif
  7. void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
  8. {
  9. ggrpcclient->UpdateData(strdata,nSize,strmemname);
  10. // std::cout<<"name is "<<strmemname<<std::endl;
  11. }
  12. grpcclientthread::grpcclientthread()
  13. {
  14. ggrpcclient = this;
  15. }
  16. void grpcclientthread::dec_yaml(const char * stryamlpath)
  17. {
  18. YAML::Node config;
  19. try
  20. {
  21. config = YAML::LoadFile(stryamlpath);
  22. }
  23. catch(YAML::BadFile e)
  24. {
  25. qDebug("load error.");
  26. return;
  27. }
  28. std::vector<std::string> vecmodulename;
  29. if(config["server"])
  30. {
  31. mstrserverip = config["server"].as<std::string>();
  32. }
  33. if(config["port"])
  34. {
  35. mstrserverport = config["port"].as<std::string>();
  36. }
  37. if(config["uploadinterval"])
  38. {
  39. mstrqueryinterval = config["uploadinterval"].as<std::string>();
  40. }
  41. std::string strmsgname;
  42. if(config["querymessage"])
  43. {
  44. for(YAML::const_iterator it= config["querymessage"].begin(); it != config["querymessage"].end();++it)
  45. {
  46. std::string strtitle = it->first.as<std::string>();
  47. std::cout<<strtitle<<std::endl;
  48. if(config["querymessage"][strtitle]["msgname"]&&config["querymessage"][strtitle]["buffersize"]&&config["querymessage"][strtitle]["buffercount"])
  49. {
  50. iv::rpcmsgunit xmu;
  51. strmsgname = config["querymessage"][strtitle]["msgname"].as<std::string>();
  52. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  53. xmu.mnBufferSize = config["querymessage"][strtitle]["buffersize"].as<int>();
  54. xmu.mnBufferCount = config["querymessage"][strtitle]["buffercount"].as<int>();
  55. mvectorquerymsgunit.push_back(xmu);
  56. }
  57. }
  58. }
  59. else
  60. {
  61. }
  62. if(config["ctrlmessage"])
  63. {
  64. std::string strnodename = "ctrlmessage";
  65. for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
  66. {
  67. std::string strtitle = it->first.as<std::string>();
  68. std::cout<<strtitle<<std::endl;
  69. if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
  70. {
  71. iv::rpcmsgunit xmu;
  72. strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
  73. strncpy(xmu.mstrmsgname,strmsgname.data(),255);
  74. xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
  75. xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
  76. mvectorctrlmsgunit.push_back(xmu);
  77. }
  78. }
  79. }
  80. else
  81. {
  82. }
  83. return;
  84. }
  85. void grpcclientthread::addquerymsgunit(std::string strquerymsg, int nbuffsize, int nbuffcount)
  86. {
  87. int i;
  88. for(i=0;i<mvectorquerymsgunit.size();i++)
  89. {
  90. if(strncmp(mvectorquerymsgunit[i].mstrmsgname,strquerymsg.data(),255)==0)
  91. {
  92. std::cout<<"grpcclientthread::addquerymsgunit msg "<<strquerymsg<<" is exist."<<std::endl;
  93. return;
  94. }
  95. }
  96. iv::rpcmsgunit xmu;
  97. strncpy(xmu.mstrmsgname,strquerymsg.data(),255);
  98. xmu.mnBufferSize = nbuffsize;
  99. xmu.mnBufferCount = nbuffcount;
  100. mvectorquerymsgunit.push_back(xmu);
  101. }
  102. void grpcclientthread::addctrlmsgunit(std::string strctrlmsg, int nbuffsize, int nbuffcount)
  103. {
  104. int i;
  105. for(i=0;i<mvectorctrlmsgunit.size();i++)
  106. {
  107. if(strncmp(mvectorctrlmsgunit[i].mstrmsgname,strctrlmsg.data(),255)==0)
  108. {
  109. std::cout<<"grpcclientthread::addquerymsgunit msg "<<strctrlmsg<<" is exist."<<std::endl;
  110. return;
  111. }
  112. }
  113. iv::rpcmsgunit xmu;
  114. strncpy(xmu.mstrmsgname,strctrlmsg.data(),255);
  115. xmu.mnBufferSize = nbuffsize;
  116. xmu.mnBufferCount = nbuffcount;
  117. mvectorctrlmsgunit.push_back(xmu);
  118. }
  119. void grpcclientthread::startlisten()
  120. {
  121. int i;
  122. for(i=0;i<mvectorctrlmsgunit.size();i++)
  123. {
  124. mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectorctrlmsgunit[i].mstrmsgname,ListenData);
  125. }
  126. for(i=0;i<mvectorquerymsgunit.size();i++)
  127. {
  128. mvectorquerymsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorquerymsgunit[i].mstrmsgname,mvectorquerymsgunit[i].mnBufferSize,
  129. mvectorquerymsgunit[i].mnBufferCount);
  130. }
  131. }
  132. void grpcclientthread::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
  133. {
  134. int nsize = mvectorctrlmsgunit.size();
  135. int i;
  136. for(i=0;i<nsize;i++)
  137. {
  138. if(strncmp(strmemname,mvectorctrlmsgunit[i].mstrmsgname,255) == 0)
  139. {
  140. mMutexMsg.lock();
  141. char * strtem = new char[nSize];
  142. memcpy(strtem,strdata,nSize);
  143. mvectorctrlmsgunit[i].mpstrmsgdata.reset(strtem);
  144. mvectorctrlmsgunit[i].mndatasize = nSize;
  145. mvectorctrlmsgunit[i].mbRefresh = true;
  146. mMutexMsg.unlock();
  147. break;
  148. }
  149. }
  150. }
  151. void grpcclientthread::run()
  152. {
  153. int nsize = mvectorquerymsgunit.size();
  154. int i;
  155. int ninterval = atoi(mstrqueryinterval.data());
  156. if(ninterval<=0)ninterval = 10;
  157. QTime xTime;
  158. xTime.start();
  159. int nlastsend = xTime.elapsed();
  160. std::string target_str = mstrserverip+":";
  161. target_str = target_str + mstrserverport ;//std::to_string()
  162. auto cargs = grpc::ChannelArguments();
  163. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  164. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  165. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  166. target_str, grpc::InsecureChannelCredentials(),cargs);
  167. std::unique_ptr<iv::ivgrpc::Stub> stub_ = iv::ivgrpc::NewStub(channel);
  168. int nid = 0;
  169. // Container for the data we expect from the server.
  170. gpr_timespec timespec;
  171. timespec.tv_sec = 3;//设置阻塞时间为2秒
  172. timespec.tv_nsec = 0;
  173. timespec.clock_type = GPR_TIMESPAN;
  174. int ndataindex = 0;
  175. // ClientContext context;
  176. int nctrlindex = 0;
  177. int nctrlnid = 0;
  178. while(!QThread::isInterruptionRequested())
  179. {
  180. iv::queryreq request;
  181. iv::queryReply reply;
  182. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  183. if((xTime.elapsed()-nlastsend)<ninterval)
  184. {
  185. continue;
  186. }
  187. request.set_nindex(ndataindex);
  188. request.set_strdevicename(mstrdevname);
  189. for(i=0;i<nsize;i++)request.add_strmsgname();
  190. for(i=0;i<nsize;i++)
  191. {
  192. request.set_strmsgname(i,mvectorquerymsgunit[i].mstrmsgname);
  193. }
  194. // std::cout<<"size is "<<nsize<<" msg size is "<<request.strmsgname_size()<<std::endl;
  195. ClientContext context ;
  196. context.set_deadline(timespec);
  197. Status status = stub_->query(&context, request, &reply);
  198. if (status.ok()) {
  199. // std::cout<<nid<<" upload successfully"<<std::endl;
  200. ndataindex = reply.nlastindex();
  201. for(i=0;i<reply.msg_size();i++)
  202. {
  203. sharequerymsg(&reply.msg(i));
  204. // int j;
  205. // for(j=0;j<reply.msg_size();j++)
  206. // {
  207. // sharequerymsg(&reply.msg(j));
  208. // }
  209. }
  210. } else {
  211. #ifndef Android
  212. std::cout << status.error_code() << ": " << status.error_message()
  213. << std::endl;
  214. std::cout<<"RPC failed"<<std::endl;
  215. #endif
  216. if(status.error_code() == 4)
  217. {
  218. #ifndef Android
  219. std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
  220. #endif
  221. channel = grpc::CreateCustomChannel(
  222. target_str, grpc::InsecureChannelCredentials(),cargs);
  223. stub_ = iv::ivgrpc::NewStub(channel);
  224. }
  225. std::this_thread::sleep_for(std::chrono::milliseconds(900));
  226. }
  227. nlastsend = xTime.elapsed();
  228. iv::ctrlreq ctrlrequest;
  229. iv::ctrlReply ctrlreply;
  230. mMutexMsg.lock();
  231. for(i=0;i<mvectorctrlmsgunit.size();i++)
  232. {
  233. if(mvectorctrlmsgunit[i].mbRefresh)
  234. {
  235. mvectorctrlmsgunit[i].mbRefresh = false;
  236. iv::ModuleMsg xctrl;
  237. xctrl.set_msgname(mvectorctrlmsgunit[i].mstrmsgname);
  238. xctrl.set_index(nctrlindex);
  239. nctrlindex++;
  240. xctrl.set_xdata(mvectorctrlmsgunit[i].mpstrmsgdata.get(),mvectorctrlmsgunit[i].mndatasize);
  241. xctrl.set_nlen(mvectorctrlmsgunit[i].mndatasize);
  242. iv::ModuleMsg * pmsg = ctrlrequest.add_msg();
  243. pmsg->CopyFrom(xctrl);
  244. }
  245. }
  246. mMutexMsg.unlock();
  247. if(ctrlrequest.msg_size()>0)
  248. {
  249. ctrlrequest.set_nid(nctrlnid);
  250. nctrlnid++;
  251. ctrlrequest.set_strdevicename(mstrdevname);
  252. ClientContext contextctrl ;
  253. contextctrl.set_deadline(timespec);
  254. Status status2 = stub_->ctrl(&contextctrl, ctrlrequest, &ctrlreply);
  255. if (status2.ok()) {
  256. // std::cout<<nid<<" upload successfully"<<std::endl;
  257. } else {
  258. std::cout << status2.error_code() << ": " << status2.error_message()
  259. << std::endl;
  260. std::cout<<"RPC failed"<<std::endl;
  261. if(status2.error_code() == 4)
  262. {
  263. std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
  264. channel = grpc::CreateCustomChannel(
  265. target_str, grpc::InsecureChannelCredentials(),cargs);
  266. stub_ = iv::ivgrpc::NewStub(channel);
  267. }
  268. std::this_thread::sleep_for(std::chrono::milliseconds(900));
  269. }
  270. }
  271. }
  272. }
  273. void grpcclientthread::sharequerymsg(const iv::ModuleMsg *pxmsg)
  274. {
  275. int i;
  276. // std::cout<<"msg is "<<pxmsg->msgname()<<std::endl;
  277. int nsize = mvectorquerymsgunit.size();
  278. for(i=0;i<nsize;i++)
  279. {
  280. if(strncmp(pxmsg->msgname().data(),mvectorquerymsgunit[i].mstrmsgname,256) == 0)
  281. {
  282. // std::cout<<"msg is "<<pxmsg->msgname()<<" size is: "<<pxmsg->xdata().size()<<std::endl;
  283. if(strncmp(pxmsg->msgname().data(),"tracemap",256) == 0)
  284. {
  285. std::cout<<"trace map size is "<<pxmsg->xdata().size()<<std::endl;
  286. }
  287. #ifndef Android
  288. iv::modulecomm::ModuleSendMsg(mvectorquerymsgunit[i].mpa,pxmsg->xdata().data(),pxmsg->xdata().size());
  289. #else
  290. gAShow->AndroidSetMsg(pxmsg->msgname().data(),pxmsg->xdata().data(),pxmsg->xdata().size());
  291. #endif
  292. break;
  293. }
  294. }
  295. }
  296. void grpcclientthread::setdevname(std::string strdevname)
  297. {
  298. mstrdevname = strdevname;
  299. }
  300. void grpcclientthread::setserverip(std::string strip)
  301. {
  302. mstrserverip = strip;
  303. }
  304. void grpcclientthread::setserverport(std::string strport)
  305. {
  306. mstrserverport = strport;
  307. }
  308. void grpcclientthread::setqueryinterval(std::string strinterval)
  309. {
  310. mstrqueryinterval = strinterval;
  311. }