// carmakerwork.cpp
  1. #include "carmakerwork.h"
  2. using grpc::Channel;
  3. using grpc::ClientContext;
  4. using grpc::Status;
  5. carmakerwork::carmakerwork(std::string strserver)
  6. {
  7. mstrserver = strserver;
  8. mbRun = true;
  9. mpthread = new std::thread(&carmakerwork::threadwork,this);
  10. }
  11. carmakerwork::~carmakerwork()
  12. {
  13. mbRun = false;
  14. mpthread->join();
  15. }
  16. void carmakerwork::threadwork()
  17. {
  18. auto cargs = grpc::ChannelArguments();
  19. cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
  20. cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
  21. std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
  22. mstrserver, grpc::InsecureChannelCredentials(),cargs);
  23. std::unique_ptr<iv::ipg::Carmaker::Stub> stub_ = iv::ipg::Carmaker::NewStub(channel);
  24. gpr_timespec timespec;
  25. timespec.tv_sec = 10;//设置阻塞时间为2秒
  26. timespec.tv_nsec = 0;
  27. timespec.clock_type = GPR_TIMESPAN;
  28. int64_t interval = 100;
  29. int64_t lastreq = 0;
  30. int nRes = 0;
  31. std::shared_ptr<char> pout_ptr = nullptr;
  32. int64_t nWorkID;
  33. int nOutSize;
  34. int nnowork = 0;
  35. while(mbRun)
  36. {
  37. if(nRes == 0)
  38. {
  39. int64_t nnow = std::chrono::system_clock::now().time_since_epoch().count()/1000000;
  40. int64_t diff = static_cast<int64_t>(abs(nnow - lastreq)) ;
  41. if(diff<interval)
  42. {
  43. std::this_thread::sleep_for(std::chrono::milliseconds(interval-diff));
  44. }
  45. }
  46. iv::ipg::workReq request;
  47. // Container for the data we expect from the server.
  48. iv::ipg::workReply reply;
  49. request.set_mnres(nRes);
  50. if(nRes == 1)
  51. {
  52. if(pout_ptr != nullptr)
  53. {
  54. request.set_workid(nWorkID);
  55. request.set_xdata(pout_ptr.get(),static_cast<size_t>(nOutSize));
  56. }
  57. else
  58. {
  59. std::cout<<" carmaker work. nres is 1, but no data. please check."<<std::endl;
  60. }
  61. }
  62. nRes = 0; //Set res to Default;
  63. ClientContext context ;
  64. context.set_deadline(timespec);
  65. lastreq = std::chrono::system_clock::now().time_since_epoch().count()/1000000;
  66. Status status = stub_->convertwork(&context, request, &reply);
  67. if (status.ok()) {
  68. if(reply.mnres() == 1)
  69. {
  70. nnowork = 0;
  71. nWorkID = reply.workid();
  72. std::shared_ptr<char> pinput_ptr = nullptr;
  73. int ninputsize = static_cast<int>(reply.data_input().size()) ;
  74. pinput_ptr = std::shared_ptr<char>(new char[ninputsize]);
  75. memcpy(pinput_ptr.get(),reply.data_input().data(),ninputsize);
  76. nRes = ExecCarmakerWork(nWorkID,reply.strinputname(),reply.stroutputname(),reply.strcmd(),
  77. pinput_ptr,ninputsize,pout_ptr,nOutSize);
  78. }
  79. if(reply.mnres() == 0)
  80. {
  81. nnowork++;
  82. if(nnowork>=30)
  83. {
  84. nnowork = 0;
  85. std::cout<<" no work."<<std::endl;
  86. }
  87. nRes = 0;
  88. }
  89. } else {
  90. std::cout << status.error_code() << ": " << status.error_message()
  91. << std::endl;
  92. std::cout<<"RPC failed"<<std::endl;
  93. std::this_thread::sleep_for(std::chrono::milliseconds(3000));
  94. }
  95. }
  96. }