123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- #include "carmakerwork.h"
- using grpc::Channel;
- using grpc::ClientContext;
- using grpc::Status;
- carmakerwork::carmakerwork(std::string strserver)
- {
- mstrserver = strserver;
- mbRun = true;
- mpthread = new std::thread(&carmakerwork::threadwork,this);
- }
- carmakerwork::~carmakerwork()
- {
- mbRun = false;
- mpthread->join();
- }
- void carmakerwork::threadwork()
- {
- auto cargs = grpc::ChannelArguments();
- cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
- cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
- std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
- mstrserver, grpc::InsecureChannelCredentials(),cargs);
- std::unique_ptr<iv::ipg::Carmaker::Stub> stub_ = iv::ipg::Carmaker::NewStub(channel);
- gpr_timespec timespec;
- timespec.tv_sec = 10;//设置阻塞时间为2秒
- timespec.tv_nsec = 0;
- timespec.clock_type = GPR_TIMESPAN;
- int64_t interval = 100;
- int64_t lastreq = 0;
- int nRes = 0;
- std::shared_ptr<char> pout_ptr = nullptr;
- int64_t nWorkID;
- int nOutSize;
- int nnowork = 0;
- while(mbRun)
- {
- if(nRes == 0)
- {
- int64_t nnow = std::chrono::system_clock::now().time_since_epoch().count()/1000000;
- int64_t diff = static_cast<int64_t>(abs(nnow - lastreq)) ;
- if(diff<interval)
- {
- std::this_thread::sleep_for(std::chrono::milliseconds(interval-diff));
- }
- }
- iv::ipg::workReq request;
- // Container for the data we expect from the server.
- iv::ipg::workReply reply;
- request.set_mnres(nRes);
- if(nRes == 1)
- {
- if(pout_ptr != nullptr)
- {
- request.set_workid(nWorkID);
- request.set_xdata(pout_ptr.get(),static_cast<size_t>(nOutSize));
- }
- else
- {
- std::cout<<" carmaker work. nres is 1, but no data. please check."<<std::endl;
- }
- }
- nRes = 0; //Set res to Default;
- ClientContext context ;
- context.set_deadline(timespec);
- lastreq = std::chrono::system_clock::now().time_since_epoch().count()/1000000;
- Status status = stub_->convertwork(&context, request, &reply);
- if (status.ok()) {
- if(reply.mnres() == 1)
- {
- nnowork = 0;
- nWorkID = reply.workid();
- std::shared_ptr<char> pinput_ptr = nullptr;
- int ninputsize = static_cast<int>(reply.data_input().size()) ;
- pinput_ptr = std::shared_ptr<char>(new char[ninputsize]);
- memcpy(pinput_ptr.get(),reply.data_input().data(),ninputsize);
- nRes = ExecCarmakerWork(nWorkID,reply.strinputname(),reply.stroutputname(),reply.strcmd(),
- pinput_ptr,ninputsize,pout_ptr,nOutSize);
- }
- if(reply.mnres() == 0)
- {
- nnowork++;
- if(nnowork>=30)
- {
- nnowork = 0;
- std::cout<<" no work."<<std::endl;
- }
- nRes = 0;
- }
- } else {
- std::cout << status.error_code() << ": " << status.error_message()
- << std::endl;
- std::cout<<"RPC failed"<<std::endl;
- std::this_thread::sleep_for(std::chrono::milliseconds(3000));
- }
- }
- }
|