Explorar el Código

change driver_cloud_grpc_server_thread.

yuchuli hace 3 años
padre
commit
17c8f47c0c

+ 124 - 1
src/driver/driver_cloud_grpc_pc_thread/grpcpc.cpp

@@ -11,9 +11,16 @@ void ListenData(const char * strdata,const unsigned int nSize,const unsigned int
 
 
 grpcpc::grpcpc(std::string stryamlpath)
 grpcpc::grpcpc(std::string stryamlpath)
 {
 {
+
+    mstrpicmsgname[0] = "picfront";
+    mstrpicmsgname[1] = "picrear";
+    mstrpicmsgname[2] = "picleft";
+    mstrpicmsgname[3] = "picright";
+
+
     ggrpcpc = this;
     ggrpcpc = this;
     dec_yaml(stryamlpath.data());
     dec_yaml(stryamlpath.data());
-    int i;
+    unsigned int i;
     for(i=0;i<mvectormsgunit.size();i++)
     for(i=0;i<mvectormsgunit.size();i++)
     {
     {
         mvectormsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectormsgunit[i].mstrmsgname,mvectormsgunit[i].mnBufferSize,mvectormsgunit[i].mnBufferCount);
         mvectormsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectormsgunit[i].mstrmsgname,mvectormsgunit[i].mnBufferSize,mvectormsgunit[i].mnBufferCount);
@@ -23,6 +30,21 @@ grpcpc::grpcpc(std::string stryamlpath)
     {
     {
         mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectorctrlmsgunit[i].mstrmsgname,ListenData);
         mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectorctrlmsgunit[i].mstrmsgname,ListenData);
     }
     }
+
+    for(i=0;i<NUM_CAM;i++)
+    {
+        mpaPic[i] = iv::modulecomm::RegisterSend(mstrpicmsgname[i].data(),2000000,1);
+    }
+
+
+    for(i=0;i<NUM_CAM;i++)
+    {
+        unsigned int j;
+        for(j=0;j<NUM_THREAD_PERCAM;j++)
+        {
+            mpThread[i*NUM_THREAD_PERCAM + j] = new std::thread(&grpcpc::threadpicdownload,this,i);
+        }
+    }
 }
 }
 
 
 
 
@@ -348,3 +370,104 @@ std::string grpcpc::GetVIN()
 {
 {
     return gstrVIN;
     return gstrVIN;
 }
 }
+
+
+void grpcpc::threadpicdownload(int nCamPos)
+{
+    std::cout<<"thread cam "<<nCamPos<<"run"<<std::endl;
+    int nsize = mvectormsgunit.size();
+    int i;
+
+    std::string strcclientid = "test";
+
+    int ninterval = atoi(gstruploadinterval.data());
+    if(ninterval<=0)ninterval = 100;
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+
+    std::string target_str = gstrserverip+":";
+    target_str = target_str + gstrserverport ;//std::to_string()
+    auto cargs = grpc::ChannelArguments();
+    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+
+    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+             target_str, grpc::InsecureChannelCredentials(),cargs);
+
+    std::unique_ptr<iv::UploadThread::Stub> stub_ = iv::UploadThread::NewStub(channel);
+
+
+    iv::PicDownReqThread request;
+
+    int nid = 0;
+
+    // Container for the data we expect from the server.
+    iv::PicDownReplyThread reply;
+
+    gpr_timespec timespec;
+      timespec.tv_sec = 30;//设置阻塞时间为2秒
+      timespec.tv_nsec = 0;
+      timespec.clock_type = GPR_TIMESPAN;
+
+ //   ClientContext context;
+
+
+
+    while(true)
+    {
+        std::shared_ptr<char> pstr_ptr;
+        if((nCamPos<0)||(nCamPos >= NUM_CAM))
+        {
+            std::cout<<"Cam Pos Error. "<<"Pos: "<<nCamPos<<" TOTAL:"<<NUM_CAM<<std::endl;
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+            continue;
+        }
+
+
+        request.set_strclientid(strcclientid);
+        request.set_ncampos(nCamPos);
+        request.set_strquerymd5(gstrqueryMD5);
+        request.set_strvin(gstrVIN);
+
+
+        ClientContext context ;
+        context.set_deadline(timespec);
+        qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+
+
+        nlastsend = xTime.elapsed();
+        // The actual RPC.
+        Status status = stub_->querypic(&context, request, &reply);
+        if (status.ok()) {
+
+            if(reply.nres() == 1)
+            {
+                std::cout<<"pic time is "<<reply.npictime()<<std::endl;
+//                iv::cloud::cloudmsg xmsg;
+//                if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
+//                {
+//                    sharectrlmsg(&xmsg);
+//                }
+            }
+        } else {
+          std::cout << status.error_code() << ": " << status.error_message()
+                    << std::endl;
+          std::cout<<"RPC failed"<<std::endl;
+          if(status.error_code() == 4)
+          {
+              std::cout<<nCamPos<<" RPC Exceed Time, Create New stub_"<<std::endl;
+              channel = grpc::CreateCustomChannel(
+                       target_str, grpc::InsecureChannelCredentials(),cargs);
+
+              stub_ = iv::UploadThread::NewStub(channel);
+          }
+          std::this_thread::sleep_for(std::chrono::milliseconds(900));
+
+        }
+
+
+    }
+}
+

+ 19 - 4
src/driver/driver_cloud_grpc_pc_thread/grpcpc.h

@@ -30,6 +30,14 @@
 
 
 #include "uploadthreadmsg.grpc.pb.h"
 #include "uploadthreadmsg.grpc.pb.h"
 
 
+#ifndef NUM_CAM
+#define NUM_CAM 4
+#endif
+
+#ifndef NUM_THREAD_PERCAM
+#define NUM_THREAD_PERCAM 4
+#endif
+
 using grpc::Channel;
 using grpc::Channel;
 using grpc::ClientContext;
 using grpc::ClientContext;
 using grpc::Status;
 using grpc::Status;
@@ -58,8 +66,8 @@ private:
     void run();
     void run();
 
 
 private:
 private:
-    std::string gstrserverip =  "140.143.237.38";
-    std::string gstrserverport = "9000";
+    std::string gstrserverip = "127.0.0.1";// "140.143.237.38";
+    std::string gstrserverport = "50051";//"9000";
     std::string gstruploadinterval = "100";
     std::string gstruploadinterval = "100";
     void * gpa;
     void * gpa;
     QMutex gMutexMsg;
     QMutex gMutexMsg;
@@ -69,8 +77,8 @@ private:
 
 
 
 
     std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
     std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
-    std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c592";
-    std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c592";
+    std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c591";
+    std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c591";
 
 
 
 
 
 
@@ -86,6 +94,13 @@ public:
     void UpdateData(const char * strdata,const unsigned int nSize,const char * strmemname);
     void UpdateData(const char * strdata,const unsigned int nSize,const char * strmemname);
     std::string GetVIN();
     std::string GetVIN();
 
 
+private:
+    void threadpicdownload(int nCamPos);
+    void * mpaPic[NUM_CAM];
+    std::string mstrpicmsgname[NUM_CAM];
+
+    std::thread * mpThread[NUM_CAM * NUM_THREAD_PERCAM];
+
 
 
 };
 };
 
 

+ 64 - 0
src/driver/driver_cloud_grpc_server_thread/clientime.cpp

@@ -0,0 +1,64 @@
+#include "clientime.h"
+
+clientime::clientime()
+{
+
+}
+
+iv::clientinfo * clientime::FindClientInfo(std::string strclientid)
+{
+    unsigned int i;
+    bool bFind = false;
+    unsigned int nPos= 0;
+    for(i=0;i<mvectorclient.size();i++)
+    {
+        if(strclientid == mvectorclient[i].mstrclientid)
+        {
+            bFind = true;
+            nPos = i;
+            break;
+        }
+    }
+    if(bFind)
+    {
+        return &mvectorclient[nPos];
+    }
+
+    iv::clientinfo xclient;
+    xclient.mstrclientid = strclientid;
+    for(i=0;i<CAM_NUM;i++)
+    {
+        xclient.mnCamLastTime[i] = 0;
+    }
+    mvectorclient.push_back(xclient);
+    return &mvectorclient[mvectorclient.size() -1];
+
+}
+
+int clientime::SetClientCamLastTime(std::string strclientid, int nCamPos, qint64 nLastTime)
+{
+    if((nCamPos<0)||(nCamPos >= CAM_NUM))
+    {
+        return -1;
+    }
+
+    mMutex.lock();
+    iv::clientinfo * pclient = FindClientInfo(strclientid);
+    pclient->mnCamLastTime[nCamPos] = nLastTime;
+    mMutex.unlock();
+    return 0;
+}
+
+int clientime::GetClientCamLastTime(std::string strclientid, int nCamPos, qint64 &nLastTime)
+{
+    if((nCamPos<0)||(nCamPos >= CAM_NUM))
+    {
+        return -1;
+    }
+    mMutex.lock();
+    iv::clientinfo * pclient = FindClientInfo(strclientid);
+    nLastTime = pclient->mnCamLastTime[nCamPos];
+    mMutex.unlock();
+    return 0;
+
+}

+ 40 - 0
src/driver/driver_cloud_grpc_server_thread/clientime.h

@@ -0,0 +1,40 @@
+#ifndef CLIENTIME_H
+#define CLIENTIME_H
+
+#include <string>
+#include <vector>
+#include <QThread>
+#include <QMutex>
+
+#ifndef CAM_NUM
+#define  CAM_NUM 4
+#endif
+
+namespace  iv {
+
+struct clientinfo
+{
+  std::string mstrclientid;
+  qint64 mnCamLastTime[CAM_NUM];
+};
+}
+
+
+class clientime
+{
+public:
+    clientime();
+
+private:
+    std::vector<iv::clientinfo> mvectorclient;
+    QMutex mMutex;
+
+private:
+    inline iv::clientinfo * FindClientInfo(std::string strclientid);
+
+public:
+    int GetClientCamLastTime(std::string strclientid,int nCamPos,qint64 & nLastTime);
+    int SetClientCamLastTime(std::string strclientid,int nCamPos,qint64 nLastTime);
+};
+
+#endif // CLIENTIME_H

+ 13 - 1
src/driver/driver_cloud_grpc_server_thread/cumsgbuffer.cpp

@@ -148,7 +148,7 @@ void cumsgbuffer::addPicData(std::string strVIN, const char *strdata, const unsi
 }
 }
 
 
 int cumsgbuffer::getPicData(std::string strVIN, std::string strqueryMD5,std::shared_ptr<char> &pdata_ptr,
 int cumsgbuffer::getPicData(std::string strVIN, std::string strqueryMD5,std::shared_ptr<char> &pdata_ptr,
-                            unsigned int &ndatasize, qint64 &nLastPicTime, int nCamPos)
+                            unsigned int &ndatasize,std::string strclientid, int nCamPos,qint64 & nPicTime)
 {
 {
     mMutex.lock();
     mMutex.lock();
     iv::cumsg * pmsg = 0;
     iv::cumsg * pmsg = 0;
@@ -184,14 +184,26 @@ int cumsgbuffer::getPicData(std::string strVIN, std::string strqueryMD5,std::sha
         return -3;
         return -3;
     }
     }
 
 
+    qint64 nLastPicTime = 0;
+
+    if(mclienttime.GetClientCamLastTime(strclientid,nCamPos,nLastPicTime) < 0)
+    {
+        std::cout<<"cumsgbuffer::getPicData Can't Get Camera LastTime"<<std::endl;
+        mMutex.unlock();
+        return -4;
+    }
+
     iv::PicData xPicData;
     iv::PicData xPicData;
     int nrtn = pmsg->mpicbuf[nCamPos].GetData(nLastPicTime,xPicData);
     int nrtn = pmsg->mpicbuf[nCamPos].GetData(nLastPicTime,xPicData);
     if(nrtn == 1)
     if(nrtn == 1)
     {
     {
         pdata_ptr = xPicData.mdata_ptr;
         pdata_ptr = xPicData.mdata_ptr;
         ndatasize = xPicData.mdatasize;
         ndatasize = xPicData.mdatasize;
+        nPicTime = nLastPicTime;
     }
     }
 
 
+    mclienttime.SetClientCamLastTime(strclientid,nCamPos,nLastPicTime);
+
 
 
     mMutex.unlock();
     mMutex.unlock();
     return nrtn;
     return nrtn;

+ 6 - 3
src/driver/driver_cloud_grpc_server_thread/cumsgbuffer.h

@@ -9,10 +9,12 @@
 
 
 #include <memory>
 #include <memory>
 
 
-
 #include <picbuf.h>
 #include <picbuf.h>
+#include "clientime.h"
 
 
+#ifndef CAM_NUM
 #define CAM_NUM 4
 #define CAM_NUM 4
+#endif
 
 
 namespace iv {
 namespace iv {
 struct cumsg
 struct cumsg
@@ -41,6 +43,7 @@ public:
 
 
 private:
 private:
     std::vector<iv::cumsg> mvectormsg;
     std::vector<iv::cumsg> mvectormsg;
+    clientime mclienttime;
 
 
     QMutex mMutex;
     QMutex mMutex;
 
 
@@ -58,8 +61,8 @@ public:
 
 
     void addPicData(std::string strVIN,const char * strdata,const unsigned int ndatasize,qint64 npictime,int nCamPos);
     void addPicData(std::string strVIN,const char * strdata,const unsigned int ndatasize,qint64 npictime,int nCamPos);
 
 
-    int getPicData(std::string strVIN,std::string strqueryMD5,std::shared_ptr<char> & pdata_ptr,unsigned int & ndatasize,qint64 & nLastPicTime
-                   ,int nCamPos);
+    int getPicData(std::string strVIN,std::string strqueryMD5,std::shared_ptr<char> & pdata_ptr,unsigned int & ndatasize,
+                   std::string strclientid,int nCamPos,qint64 & nPicTime);
 
 
 
 
 };
 };

+ 4 - 2
src/driver/driver_cloud_grpc_server_thread/driver_cloud_grpc_server_thread.pro

@@ -23,7 +23,8 @@ SOURCES += \
         cumsgbuffer.cpp \
         cumsgbuffer.cpp \
         main.cpp \
         main.cpp \
         pcmsgbuffer.cpp \
         pcmsgbuffer.cpp \
-        picbuf.cpp
+        picbuf.cpp \
+    clientime.cpp
 
 
 # Default rules for deployment.
 # Default rules for deployment.
 qnx: target.path = /tmp/$${TARGET}/bin
 qnx: target.path = /tmp/$${TARGET}/bin
@@ -54,4 +55,5 @@ HEADERS += \
     ../driver_cloud_grpc_thread/uploadthreadmsg.grpc.pb.h \
     ../driver_cloud_grpc_thread/uploadthreadmsg.grpc.pb.h \
     cumsgbuffer.h \
     cumsgbuffer.h \
     pcmsgbuffer.h  \
     pcmsgbuffer.h  \
-    picbuf.h
+    picbuf.h \
+    clientime.h

+ 17 - 0
src/driver/driver_cloud_grpc_server_thread/main.cpp

@@ -120,6 +120,23 @@ class UploadServiceImpl final : public iv::UploadThread::Service {
 
 
   }
   }
 
 
+  Status querypic(grpc::ServerContext *context, const iv::PicDownReqThread *request, iv::PicDownReplyThread *response) override {
+
+        std::shared_ptr<char> pstr_data;
+        unsigned int ndatasize;
+        qint64 nPicTime;
+        int nres =gcumsgbuf.getPicData(request->strvin(),request->strquerymd5(),pstr_data,ndatasize,request->strclientid(), request->ncampos(),nPicTime);
+        response->set_nres(nres);
+        if(nres == 1)
+        {
+            response->set_ncampos(request->ncampos());
+            response->set_npictime(nPicTime);
+            response->set_xdata(pstr_data.get(),ndatasize);
+        }
+
+        return Status::OK;
+  }
+
 //  Status ctrl(ServerContext* context, const iv::ctrlreq* request,
 //  Status ctrl(ServerContext* context, const iv::ctrlreq* request,
 //                  iv::ctrlReply * reply) override {
 //                  iv::ctrlReply * reply) override {