#include #include #include #include #include "cumsgbuffer.h" #include "pcmsgbuffer.h" #include #include #include #include #include #include #include "uploadstreammsg.grpc.pb.h" using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; using grpc::Status; #include #include static cumsgbuffer gcumsgbuf; static pcmsgbuffer gpcmsgbuf; void uploadsend(::grpc::ServerReaderWriter* stream,bool * pbrun, std::string * pstrvin,std::string * pstrmd5,bool *pbUpdatemd4orvin,QMutex * pmutex,int * preqid,qint64 * pnsendtime,qint64 * recvtime,QMutex * pmutexidtime) { std::string strvin; std::string strmd5; pmutex->lock(); strvin = *pstrmd5; strmd5 = *pstrvin; pmutex->unlock(); QTime xTime; xTime.start(); int nlastsend = xTime.elapsed(); while(*pbrun) { if(*pbUpdatemd4orvin) { pmutex->lock(); strvin = *pstrvin; strmd5 = *pstrmd5; *pbUpdatemd4orvin = false; pmutex->unlock(); } int id; qint64 ntime; float framerate; std::vector xvectorctrldata; int nres = gpcmsgbuf.getmsg(strvin,strmd5,id,ntime,&xvectorctrldata,&framerate); iv::UploadReplyStream reply; pmutexidtime->lock(); reply.set_nreqsendtime(*pnsendtime); reply.set_nreqid(*preqid); reply.set_npausetime(QDateTime::currentMSecsSinceEpoch() - *recvtime); pmutexidtime->unlock(); if(nres == 1) { reply.set_nres(nres); reply.set_xdata(xvectorctrldata.data(),xvectorctrldata.size()); reply.set_framerate(framerate); stream->Write(reply); nlastsend = xTime.elapsed(); } else { if(abs(xTime.elapsed() - nlastsend)>1000) { reply.set_nres(nres); reply.set_framerate(0); stream->Write(reply); nlastsend = xTime.elapsed(); } } std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } qint64 calcserverlatency(std::vector & xvectorserverlatency,qint64 nlatency) { xvectorserverlatency.push_back(nlatency); while(xvectorserverlatency.size()>5)xvectorserverlatency.erase(xvectorserverlatency.begin()); if(xvectorserverlatency.size()<1)return 0; int nsize = xvectorserverlatency.size(); int i; qint64 nrtn = 0; for(i=0;i* stream,bool * pbrun, std::string * pstrvin,std::string * pstrmd5,bool *pbUpdatemd4orvin,QMutex * pmutex,qint64 * pnserverlatency) { std::string strvin; std::string strmd5; pmutex->lock(); strvin = *pstrmd5; strmd5 = *pstrvin; pmutex->unlock(); QTime xTime; xTime.start(); int nlastsend = xTime.elapsed(); qint64 nlastdatatime = 0; qint64 nculatency; while(*pbrun) { if(*pbUpdatemd4orvin) { pmutex->lock(); strvin = *pstrvin; strmd5 = *pstrmd5; *pbUpdatemd4orvin = false; pmutex->unlock(); } int id; qint64 ntime; std::vector xvectorquerydata; int nres = gcumsgbuf.getmsg(strvin,strmd5,nlastdatatime,id,ntime,&xvectorquerydata,&nculatency); nlastdatatime = ntime; iv::queryReplyStream reply; reply.set_nres(nres); if(nres > 0) { reply.set_xdata(xvectorquerydata.data(),xvectorquerydata.size()); reply.set_id(id); reply.set_ntime(ntime); reply.set_nculatency(nculatency); reply.set_nserverlatency(*pnserverlatency); reply.set_nsendtime(QDateTime::currentMSecsSinceEpoch()); stream->Write(reply); nlastsend = xTime.elapsed(); } else { if(abs(xTime.elapsed() - nlastsend)>1000) { stream->Write(reply); nlastsend = xTime.elapsed(); } } std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } // Logic and data behind the server's behavior. class UploadServiceImpl final : public iv::UploadStream::Service { Status upload(ServerContext* context, ::grpc::ServerReaderWriter* stream) override { iv::UploadRequestStream request; bool brun = true; std::string strctrlmd5 = "md5"; std::string strvin = "aaa"; bool bUpdatemd4orvin = false; QMutex uploadmutex; int reqid = -1; qint64 pnsendtime = 0; qint64 nrecvtime = 0; QMutex mutexidtime; std::thread * pthread = new std::thread(uploadsend,stream,&brun,&strvin,&strctrlmd5,&bUpdatemd4orvin,&uploadmutex, &reqid,&pnsendtime,&nrecvtime,&mutexidtime); std::cout<<"new connect."<Read(&request)) { std::cout<<" rec req."< xvectordata; qDebug("size is %d",request.xdata().size()); if((strctrlmd5 != request.strctrlmd5())||(strvin != request.strvin())) { uploadmutex.lock(); strctrlmd5 = request.strctrlmd5(); strvin = request.strvin(); bUpdatemd4orvin = true; uploadmutex.unlock(); } if(request.xdata().size()>0) { xvectordata.resize(request.xdata().size()); memcpy(xvectordata.data(),request.xdata().data(),request.xdata().size()); } mutexidtime.lock(); reqid = request.id(); pnsendtime = request.nsendtime(); nrecvtime = QDateTime::currentMSecsSinceEpoch(); mutexidtime.unlock(); gcumsgbuf.addmsg(request.id(),request.ntime(),request.strvin(),request.strquerymd5(),request.strctrlmd5(), &xvectordata,request.bimportant(),request.kepptime(),request.nlatency()); // std::cout << "收到请求,类型为" << request.askmsg() <<"\n"<join(); std::cout<<"dis connect."<* stream) override { iv::queryReqStream request; bool brun = true; std::string strctrlmd5 = "md5"; std::string strquerymd5 = "md5"; std::string strvin = "aaa"; bool bUpdatemd4orvin = false; QMutex uploadmutex; qint64 nreplysendtime; qint64 npausetime; std::vector xvectorserverlatency; qint64 nserverlatency = 0; std::thread * pthread = new std::thread(queryctrlsend,stream,&brun,&strvin,&strquerymd5,&bUpdatemd4orvin,&uploadmutex,&nserverlatency); std::cout<<"new connect."<Read(&request)) { std::cout<<" rec req."< xvectordata; qDebug("size is %d",request.xdata().size()); if((strquerymd5 != request.strquerymd5())||(strvin != request.strvin())) { uploadmutex.lock(); strquerymd5 = request.strquerymd5(); strvin = request.strvin(); bUpdatemd4orvin = true; uploadmutex.unlock(); } if(request.xdata().size()>0) { xvectordata.resize(request.xdata().size()); memcpy(xvectordata.data(),request.xdata().data(),request.xdata().size()); } nreplysendtime = request.nreplysendtime(); npausetime = request.npausetime(); if(nreplysendtime > 0) { nserverlatency = calcserverlatency(xvectorserverlatency,QDateTime::currentMSecsSinceEpoch() - nreplysendtime - npausetime); } static int tempid = 0; tempid++; int nid = gpcmsgbuf.addmsg(tempid,request.ntime(),request.strvin(),request.strctrlmd5(),&xvectordata, request.bimportant(),request.kepptime(),request.nsuggestframerate()); (void)&nid; // std::cout << "收到请求,类型为" << request.askmsg() <<"\n"<join(); std::cout<<"dis connect."< xvectorquerydata; // int nres = gcumsgbuf.getmsg(request->strvin(),request->strquerymd5(),request->nlasttime(),id,ntime,&xvectorquerydata); // reply->set_nres(nres); // if(nres > 0) // { // reply->set_data(xvectorquerydata.data(),xvectorquerydata.size()); // reply->set_id(id); // reply->set_ntime(ntime); // } // return Status::OK; // } // Status ctrl(ServerContext* context, const iv::ctrlreq* request, // iv::ctrlReply * reply) override { // std::vector xvectordata; // if(request->data().size()>0) // { // xvectordata.resize(request->data().size()); // memcpy(xvectordata.data(),request->data().data(),request->data().size()); // } // int nid = gpcmsgbuf.addmsg(request->id(),request->ntime(),request->strvin(),request->strctrlmd5(),&xvectordata, // request->bimportant(),request->kepptime()); // reply->set_nsendid(nid); // return Status::OK; // } }; void RunServer() { std::string server_address("0.0.0.0:50051"); UploadServiceImpl service; grpc::EnableDefaultHealthCheckService(true); // grpc::reflection::InitProtoReflectionServerBuilderPlugin(); ServerBuilder builder; // Listen on the given address without any authentication mechanism. builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.SetMaxReceiveMessageSize(300000000); // builder.SetMaxMessageSize(100000000); // builder.SetMaxSendMessageSize(100000000); // Register "service" as the instance through which we'll communicate with // clients. In this case it corresponds to an *synchronous* service. builder.RegisterService(&service); // Finally assemble the server. std::unique_ptr server(builder.BuildAndStart()); std::cout << "Server listening on " << server_address << std::endl; // Wait for the server to shutdown. Note that some other thread must be // responsible for shutting down the server for this call to ever return. server->Wait(); } int main(int argc, char *argv[]) { QCoreApplication a(argc, argv); gpcmsgbuf.start(); RunServer(); return a.exec(); }