Browse Source

add driver_cloud_grpc_client_stream for grpc stream mode.

yuchuli 4 years ago
parent
commit
5c8a2bebd3

+ 58 - 0
src/driver/driver_cloud_grpc_client_stream/driver_cloud_grpc_client_stream.pro

@@ -0,0 +1,58 @@
+QT -= gui
+
+CONFIG += c++11 console
+CONFIG -= app_bundle
+
+QMAKE_LFLAGS += -no-pie
+
+# The following define makes your compiler emit warnings if you use
+# any Qt feature that has been marked deprecated (the exact warnings
+# depend on your compiler). Please consult the documentation of the
+# deprecated API in order to know how to port your code away from it.
+DEFINES += QT_DEPRECATED_WARNINGS
+
+# You can also make your code fail to compile if it uses deprecated APIs.
+# In order to do so, uncomment the following line.
+# You can also select to disable deprecated APIs only up to a certain version of Qt.
+#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
+
+SOURCES += \
+        ../../include/msgtype/cloud.pb.cc \
+    ../../include/msgtype/uploadstreammsg.pb.cc \
+        main.cpp \
+    grpcclient.cpp \
+    uploadstreammsg.grpc.pb.cc
+
+# Default rules for deployment.
+qnx: target.path = /tmp/$${TARGET}/bin
+else: unix:!android: target.path = /opt/$${TARGET}/bin
+!isEmpty(target.path): INSTALLS += target
+
+!include(../../../include/common.pri ) {
+    error( "Couldn't find the common.pri file!" )
+}
+
+!include(../../../include/ivprotobuf.pri ) {
+    error( "Couldn't find the ivprotobuf.pri file!" )
+}
+
+!include(../../../include/ivboost.pri ) {
+    error( "Couldn't find the ivboost.pri file!" )
+}
+
+!include(../../../include/ivgrpc.pri ) {
+    error( "Couldn't find the ivgrpc.pri file!" )
+}
+
+!include(../../../include/ivyaml-cpp.pri ) {
+    error( "Couldn't find the ivyaml-cpp.pri file!" )
+}
+
+
+
+HEADERS += \
+    ../../include/msgtype/cloud.pb.h \
+    ../../include/msgtype/uploadstreammsg.pb.h \
+    grpcclient.h \
+    uploadstreammsg.grpc.pb.h
+

+ 47 - 0
src/driver/driver_cloud_grpc_client_stream/driver_cloud_grpc_client_stream.yaml

@@ -0,0 +1,47 @@
+server : 47.96.250.93
+port : 50051
+uploadinterval : 100
+
+VIN : AAAAAAAAAAAAAAAAA
+queryMD5 : 5d41402abc4b2a76b9719d911017c592
+ctrlMD5  : 5d41402abc4b2a76b9719d911017c592
+
+
+uploadmessage:
+  usbpic:
+    msgname: compresspic
+    buffersize: 10000000
+    buffercount: 1
+  hcp2_gpsimu:
+    msgname: hcp2_gpsimu
+    buffersize: 100000
+    buffercount: 1
+
+#  tracemap:
+#    msgname: tracemap
+#    buffersize: 10000000
+#    buffercount: 1
+
+  simpletrace:
+    msgname: simpletrace
+    buffersize: 10000000
+    buffercount: 1
+    bimportant: true
+    keeptime: 3000
+
+ctrlmessage:
+  xodrsrc:
+    msgname: xodrsrc
+    buffersize: 1000
+    buffercount: 1
+  xodrreq:
+    msgname: xodrreq
+    buffersize: 1000
+    buffercount: 1
+  remotectrl:
+    msgname: remotectrl
+    buffersize: 10000
+    buffercount: 1
+
+
+

+ 444 - 0
src/driver/driver_cloud_grpc_client_stream/grpcclient.cpp

@@ -0,0 +1,444 @@
+#include "grpcclient.h"
+
+grpcclient * ggrpcclient;
+
+void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
+{
+    ggrpcclient->UpdateData(strdata,nSize,strmemname);
+
+}
+
+
+grpcclient::grpcclient(std::string stryamlpath)
+{
+    ggrpcclient = this;
+    dec_yaml(stryamlpath.data());
+
+    int i;
+    for(i=0;i<mvectormsgunit.size();i++)
+    {
+        mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
+    }
+
+    for(i=0;i<mvectorctrlmsgunit.size();i++)
+    {
+        mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
+                                                                 mvectorctrlmsgunit[i].mnBufferCount);
+    }
+}
+
+void grpcclient::threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,bool * pbrun)
+{
+    int nsize = mvectormsgunit.size();
+    int i;
+
+    int ninterval = atoi(gstruploadinterval.data());
+    if(ninterval<=0)ninterval = 100;
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+
+    int nid= 0;
+
+    while(*pbrun)
+    {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        if((xTime.elapsed()-nlastsend)<ninterval)
+        {
+            continue;
+        }
+
+        bool bImportant = false;
+        int nkeeptime = 0;
+        iv::cloud::cloudmsg xmsg;
+        xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
+        gMutexMsg.lock();
+        for(i=0;i<nsize;i++)
+        {
+            if(mvectormsgunit[i].mbRefresh)
+            {
+                mvectormsgunit[i].mbRefresh = false;
+                if(mvectormsgunit[i].mbImportant)
+                {
+                    bImportant = true;
+                }
+                if(mvectormsgunit[i].mnkeeptime > nkeeptime)
+                {
+                    nkeeptime = mvectormsgunit[i].mnkeeptime;
+                }
+                iv::cloud::cloudunit xcloudunit;
+                xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
+                xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
+                iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
+                pcu->CopyFrom(xcloudunit);
+            }
+
+        }
+        gMutexMsg.unlock();
+
+        int nbytesize = xmsg.ByteSize();
+        char * strbuf = new char[nbytesize];
+        std::shared_ptr<char> pstrbuf;
+        pstrbuf.reset(strbuf);
+        if(xmsg.SerializeToArray(strbuf,nbytesize))
+        {
+            iv::UploadRequestStream request;
+            qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+
+            request.set_id(nid);
+            request.set_ntime(time1);
+            request.set_strquerymd5(gstrqueryMD5);
+            request.set_strctrlmd5(gstrctrlMD5);
+            request.set_strvin(gstrVIN);
+            request.set_xdata(strbuf,nbytesize);
+            request.set_kepptime(nkeeptime);
+            request.set_bimportant(bImportant);
+            nid++;
+
+            nlastsend = xTime.elapsed();
+
+            bool bsend = writer->Write(request);
+            std::cout<<"send msg. rtn is "<<bsend<<std::endl;
+
+
+        }
+
+
+    }
+}
+
+void grpcclient::run()
+{
+    int nsize = mvectormsgunit.size();
+    int i;
+
+    int ninterval = atoi(gstruploadinterval.data());
+    if(ninterval<=0)ninterval = 100;
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+
+    std::string target_str = gstrserverip+":";
+    target_str = target_str + gstrserverport ;//std::to_string()
+    auto cargs = grpc::ChannelArguments();
+    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+
+    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+             target_str, grpc::InsecureChannelCredentials(),cargs);
+
+    std::unique_ptr<iv::UploadStream::Stub> stub_ = iv::UploadStream::NewStub(channel);
+
+
+    iv::UploadRequestStream request;
+
+    int nfail = 0;
+
+    while(!QThread::isInterruptionRequested())
+    {
+        ClientContext context ;
+
+
+
+
+        std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writerRead(stub_->upload(&context));
+
+        bool bRun = true;
+        std::thread * pthread = new std::thread(&grpcclient::threadsend,this,writerRead,&bRun);
+        (void )pthread;
+        iv::UploadReplyStream reply;
+        while (writerRead->Read(&reply)) {
+            nfail = 0;
+    //        std::cout << "接收到回复:" << reply.remsg()<<"--\n" << std::endl;
+            if(reply.nres() == 1)
+            {
+                iv::cloud::cloudmsg xmsg;
+                if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
+                {
+                    sharectrlmsg(&xmsg);
+                }
+            }
+            std::cout<<"read data from server."<<std::endl;
+        }
+        bRun = false;
+        pthread->join();
+        nfail++;
+        if(nfail > 100)std::this_thread::sleep_for(std::chrono::milliseconds(3000));
+        else std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        std::cout<<"reconnnect to server. nfail is "<<nfail<<std::endl;
+    }
+
+    int nid = 0;
+
+    // Container for the data we expect from the server.
+//    iv::UploadReply reply;
+
+    gpr_timespec timespec;
+      timespec.tv_sec = 30;//设置阻塞时间为2秒
+      timespec.tv_nsec = 0;
+      timespec.clock_type = GPR_TIMESPAN;
+
+ //   ClientContext context;
+
+
+
+//    while(!QThread::isInterruptionRequested())
+//    {
+//        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+//        if((xTime.elapsed()-nlastsend)<ninterval)
+//        {
+//            continue;
+//        }
+
+//            bool bImportant = false;
+//            int nkeeptime = 0;
+//            iv::cloud::cloudmsg xmsg;
+//            xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
+//            gMutexMsg.lock();
+//            for(i=0;i<nsize;i++)
+//            {
+//                if(mvectormsgunit[i].mbRefresh)
+//                {
+//                    mvectormsgunit[i].mbRefresh = false;
+//                    if(mvectormsgunit[i].mbImportant)
+//                    {
+//                        bImportant = true;
+//                    }
+//                    if(mvectormsgunit[i].mnkeeptime > nkeeptime)
+//                    {
+//                        nkeeptime = mvectormsgunit[i].mnkeeptime;
+//                    }
+//                    iv::cloud::cloudunit xcloudunit;
+//                    xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
+//                    xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
+//                    iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
+//                    pcu->CopyFrom(xcloudunit);
+//                }
+
+//            }
+//            gMutexMsg.unlock();
+
+//            int nbytesize = xmsg.ByteSize();
+//            char * strbuf = new char[nbytesize];
+//            std::shared_ptr<char> pstrbuf;
+//            pstrbuf.reset(strbuf);
+//            if(xmsg.SerializeToArray(strbuf,nbytesize))
+//            {
+
+//                ClientContext context ;
+//                context.set_deadline(timespec);
+//                qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+
+//                request.set_id(nid);
+//                request.set_ntime(time1);
+//                request.set_strquerymd5(gstrqueryMD5);
+//                request.set_strctrlmd5(gstrctrlMD5);
+//                request.set_strvin(gstrVIN);
+//                request.set_xdata(strbuf,nbytesize);
+//                request.set_kepptime(nkeeptime);
+//                request.set_bimportant(bImportant);
+//                nid++;
+
+//                nlastsend = xTime.elapsed();
+//                // The actual RPC.
+//                Status status = stub_->upload(&context, request, &reply);
+//                if (status.ok()) {
+//                    std::cout<<"  data size is "<<nbytesize<<std::endl;
+//                    std::cout<<nid<<" upload successfully"<<std::endl;
+//                    if(reply.nres() == 1)
+//                    {
+//                        iv::cloud::cloudmsg xmsg;
+//                        if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
+//                        {
+//                            sharectrlmsg(&xmsg);
+//                        }
+//                    }
+//                } else {
+//                  std::cout << status.error_code() << ": " << status.error_message()
+//                            << std::endl;
+//                  std::cout<<"RPC failed"<<std::endl;
+//                  if(status.error_code() == 4)
+//                  {
+//                      std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
+//                      channel = grpc::CreateCustomChannel(
+//                               target_str, grpc::InsecureChannelCredentials(),cargs);
+
+//                      stub_ = iv::Upload::NewStub(channel);
+//                  }
+//                  std::this_thread::sleep_for(std::chrono::milliseconds(900));
+
+//                }
+
+//            }
+
+
+//    }
+
+
+}
+
+
+void grpcclient::dec_yaml(const char * stryamlpath)
+{
+
+    YAML::Node config;
+    try
+    {
+        config = YAML::LoadFile(stryamlpath);
+    }
+    catch(YAML::BadFile e)
+    {
+        qDebug("load error.");
+        return;
+    }
+
+    std::vector<std::string> vecmodulename;
+
+
+    if(config["server"])
+    {
+        gstrserverip = config["server"].as<std::string>();
+    }
+    if(config["port"])
+    {
+        gstrserverport = config["port"].as<std::string>();
+    }
+    if(config["uploadinterval"])
+    {
+        gstruploadinterval = config["uploadinterval"].as<std::string>();
+    }
+
+    if(config["VIN"])
+    {
+        gstrVIN = config["VIN"].as<std::string>();
+    }
+
+    if(config["queryMD5"])
+    {
+        gstrqueryMD5 = config["queryMD5"].as<std::string>();
+    }
+    else
+    {
+        return;
+    }
+
+    if(config["ctrlMD5"])
+    {
+        gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
+    }
+
+
+    std::string strmsgname;
+
+    if(config["uploadmessage"])
+    {
+
+        for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
+                if(config["uploadmessage"][strtitle]["bimportant"])
+                {
+                   std::string strimportant =    config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
+                   if(strimportant == "true")
+                   {
+                       xmu.mbImportant = true;
+                   }
+                }
+                if(config["uploadmessage"][strtitle]["keeptime"])
+                {
+                   std::string strkeep =    config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
+                   xmu.mnkeeptime = atoi(strkeep.data());
+                }
+                mvectormsgunit.push_back(xmu);
+            }
+        }
+    }
+    else
+    {
+
+
+    }
+
+    if(!config["ctrlMD5"])
+    {
+       return;
+    }
+
+    if(config["ctrlmessage"])
+    {
+        std::string strnodename = "ctrlmessage";
+        for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
+                mvectorctrlmsgunit.push_back(xmu);
+            }
+        }
+    }
+    else
+    {
+
+    }
+
+    return;
+
+}
+
+void grpcclient::sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
+{
+    int i;
+    int nsize = pxmsg->xclouddata_size();
+    for(i=0;i<nsize;i++)
+    {
+        int j;
+        int nquerysize = mvectorctrlmsgunit.size();
+        for(j=0;j<nquerysize;j++)
+        {
+            if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
+            {
+ //               qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
+                iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
+                break;
+            }
+        }
+    }
+}
+
+void grpcclient::UpdateData(const char *strdata, const unsigned int nSize, const char *strmemname)
+{
+    int nsize = mvectormsgunit.size();
+    int i;
+    for(i=0;i<nsize;i++)
+    {
+        if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
+        {
+            gMutexMsg.lock();
+            char * strtem = new char[nSize];
+            memcpy(strtem,strdata,nSize);
+            mvectormsgunit[i].mpstrmsgdata.reset(strtem);
+            mvectormsgunit[i].mndatasize = nSize;
+            mvectormsgunit[i].mbRefresh = true;
+            gMutexMsg.unlock();
+            break;
+        }
+    }
+}

+ 91 - 0
src/driver/driver_cloud_grpc_client_stream/grpcclient.h

@@ -0,0 +1,91 @@
+#ifndef GRPCCLIENT_H
+#define GRPCCLIENT_H
+
+#include <QThread>
+
+#include <yaml-cpp/yaml.h>
+
+#include <QDateTime>
+
+#include <iostream>
+
+#include <vector>
+
+#include <memory>
+
+#include <QMutex>
+
+#include <thread>
+
+#include "modulecomm.h"
+
+#include "cloud.pb.h"
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <grpcpp/grpcpp.h>
+
+#include "uploadstreammsg.grpc.pb.h"
+
+
+namespace iv {
+struct msgunit
+{
+    char mstrmsgname[256];
+    int mnBufferSize = 10000;
+    int mnBufferCount = 1;
+    void * mpa;
+    std::shared_ptr<char> mpstrmsgdata;
+    int mndatasize = 0;
+    bool mbRefresh = false;
+    bool mbImportant = false;
+    int mnkeeptime = 100;
+};
+}
+
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::Status;
+
+class grpcclient : public QThread
+{
+public:
+    grpcclient(std::string stryamlpath);
+
+private:
+    std::string gstrserverip =  "0.0.0.0";//"123.57.212.138";
+    std::string gstrserverport = "50051";//"9000";
+    std::string gstruploadinterval = "1000";
+    void * gpa;
+    QMutex gMutexMsg;
+    std::thread * guploadthread;
+
+
+
+
+    std::vector<iv::msgunit> mvectormsgunit;
+
+    std::vector<iv::msgunit> mvectorctrlmsgunit;
+
+
+    std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
+    std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c591";//"5d41402abc4b2a76b9719d911017c592";
+    std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c591";
+
+
+
+    int gindex = 0;
+public:
+    void UpdateData(const char * strdata,const unsigned int nSize,const char * strmemname);
+private:
+    void run();
+    void dec_yaml(const char * stryamlpath);
+    void sharectrlmsg(iv::cloud::cloudmsg * pxmsg);
+
+    void threadsend(std::shared_ptr<::grpc::ClientReaderWriter<iv::UploadRequestStream, iv::UploadReplyStream> > writer,bool * pbrun);
+};
+
+#endif // GRPCCLIENT_H

+ 472 - 0
src/driver/driver_cloud_grpc_client_stream/main.cpp

@@ -0,0 +1,472 @@
+#include <QCoreApplication>
+
+#include "grpcclient.h"
+
+#include "ivversion.h"
+
+/*
+#include <yaml-cpp/yaml.h>
+
+#include <QDateTime>
+
+#include <iostream>
+
+#include <vector>
+
+#include <memory>
+
+#include <QMutex>
+
+#include <thread>
+
+#include "modulecomm.h"
+
+#include "cloud.pb.h"
+
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <grpcpp/grpcpp.h>
+
+#include "uploadmsg.grpc.pb.h"
+
+
+using grpc::Channel;
+using grpc::ClientContext;
+using grpc::Status;
+
+
+void test()
+{
+    std::string target_str = "0.0.0.0:50051";
+
+    auto cargs = grpc::ChannelArguments();
+    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+
+    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+             target_str, grpc::InsecureChannelCredentials(),cargs);
+
+    std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
+
+
+    iv::UploadRequest request;
+
+
+
+    // Container for the data we expect from the server.
+    iv::UploadReply reply;
+
+    int nid = 0;
+
+    nid = 1;
+
+    while(1)
+    {
+
+
+
+
+        // Context for the client. It could be used to convey extra information to
+        // the server and/or tweak certain RPC behaviors.
+
+
+        ClientContext context ;
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+
+        request.set_id(nid);
+        request.set_ntime(time1);
+        nid++;
+        // The actual RPC.
+        Status status = stub_->upload(&context, request, &reply);
+        if (status.ok()) {
+            std::cout<<nid<<" upload successfully"<<std::endl;
+//            qint64 time2;
+
+//            memcpy(&time2,reply.data().data(),8);
+//            qint64 time3 = QDateTime::currentMSecsSinceEpoch();
+//            std::cout<<"reply data size is "<<reply.data().size()<<std::endl;
+//            std::cout<<" latency is "<<(time2 - time1)<<" 2 is "<<(time3 - time2)<<std::endl;
+//          return reply.message();
+        } else {
+          std::cout << status.error_code() << ": " << status.error_message()
+                    << std::endl;
+          std::cout<<"RPC failed"<<std::endl;
+          std::this_thread::sleep_for(std::chrono::milliseconds(900));
+
+//          delete pcontext;
+//          pcontext = new ClientContext;
+
+//          channel = grpc::CreateCustomChannel(
+//                   target_str, grpc::InsecureChannelCredentials(),cargs);
+
+//          stub_ = iv::Upload::NewStub(channel);
+        }
+    }
+}
+
+std::string gstrserverip =  "0.0.0.0";//"123.57.212.138";
+std::string gstrserverport = "50051";//"9000";
+std::string gstruploadinterval = "1000";
+void * gpa;
+QMutex gMutexMsg;
+std::thread * guploadthread;
+
+
+
+
+std::vector<iv::msgunit> mvectormsgunit;
+
+std::vector<iv::msgunit> mvectorctrlmsgunit;
+
+
+std::string gstrVIN = "AAAAAAAAAAAAAAAAA";
+std::string gstrqueryMD5 = "5d41402abc4b2a76b9719d911017c591";//"5d41402abc4b2a76b9719d911017c592";
+std::string gstrctrlMD5 = "5d41402abc4b2a76b9719d911017c591";
+
+
+
+int gindex = 0;
+
+void ListenData(const char * strdata,const unsigned int nSize,const unsigned int index,const QDateTime * dt,const char * strmemname)
+{
+
+    int nsize = mvectormsgunit.size();
+    int i;
+    for(i=0;i<nsize;i++)
+    {
+        if(strncmp(strmemname,mvectormsgunit[i].mstrmsgname,255) == 0)
+        {
+            gMutexMsg.lock();
+            char * strtem = new char[nSize];
+            memcpy(strtem,strdata,nSize);
+            mvectormsgunit[i].mpstrmsgdata.reset(strtem);
+            mvectormsgunit[i].mndatasize = nSize;
+            mvectormsgunit[i].mbRefresh = true;
+            gMutexMsg.unlock();
+            break;
+        }
+    }
+}
+
+
+void sharectrlmsg(iv::cloud::cloudmsg * pxmsg)
+{
+    int i;
+    int nsize = pxmsg->xclouddata_size();
+    for(i=0;i<nsize;i++)
+    {
+        int j;
+        int nquerysize = mvectorctrlmsgunit.size();
+        for(j=0;j<nquerysize;j++)
+        {
+            if(strncmp(pxmsg->xclouddata(i).msgname().data(), mvectorctrlmsgunit[j].mstrmsgname,255) == 0)
+            {
+ //               qDebug("size is %d ",pxmsg->xclouddata(i).data().size());
+                iv::modulecomm::ModuleSendMsg(mvectorctrlmsgunit[j].mpa,pxmsg->xclouddata(i).data().data(),pxmsg->xclouddata(i).data().size());
+                break;
+            }
+        }
+    }
+}
+
+
+void threadupload()
+{
+    int nsize = mvectormsgunit.size();
+    int i;
+
+    int ninterval = atoi(gstruploadinterval.data());
+    if(ninterval<=0)ninterval = 100;
+
+    QTime xTime;
+    xTime.start();
+    int nlastsend = xTime.elapsed();
+
+    std::string target_str = gstrserverip+":";
+    target_str = target_str + gstrserverport ;//std::to_string()
+    auto cargs = grpc::ChannelArguments();
+    cargs.SetMaxReceiveMessageSize(1024 * 1024 * 1024); // 1 GB
+    cargs.SetMaxSendMessageSize(1024 * 1024 * 1024);
+
+    std::shared_ptr<Channel> channel = grpc::CreateCustomChannel(
+             target_str, grpc::InsecureChannelCredentials(),cargs);
+
+    std::unique_ptr<iv::Upload::Stub> stub_ = iv::Upload::NewStub(channel);
+
+
+    iv::UploadRequest request;
+
+    int nid = 0;
+
+    // Container for the data we expect from the server.
+    iv::UploadReply reply;
+
+    gpr_timespec timespec;
+      timespec.tv_sec = 30;//设置阻塞时间为2秒
+      timespec.tv_nsec = 0;
+      timespec.clock_type = GPR_TIMESPAN;
+
+ //   ClientContext context;
+
+
+
+    while(true)
+    {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        if((xTime.elapsed()-nlastsend)<ninterval)
+        {
+            continue;
+        }
+
+            bool bImportant = false;
+            int nkeeptime = 0;
+            iv::cloud::cloudmsg xmsg;
+            xmsg.set_xtime(QDateTime::currentMSecsSinceEpoch());
+            gMutexMsg.lock();
+            for(i=0;i<nsize;i++)
+            {
+                if(mvectormsgunit[i].mbRefresh)
+                {
+                    mvectormsgunit[i].mbRefresh = false;
+                    if(mvectormsgunit[i].mbImportant)
+                    {
+                        bImportant = true;
+                    }
+                    if(mvectormsgunit[i].mnkeeptime > nkeeptime)
+                    {
+                        nkeeptime = mvectormsgunit[i].mnkeeptime;
+                    }
+                    iv::cloud::cloudunit xcloudunit;
+                    xcloudunit.set_msgname(mvectormsgunit[i].mstrmsgname);
+                    xcloudunit.set_data(mvectormsgunit[i].mpstrmsgdata.get(),mvectormsgunit[i].mndatasize);
+                    iv::cloud::cloudunit * pcu = xmsg.add_xclouddata();
+                    pcu->CopyFrom(xcloudunit);
+                }
+
+            }
+            gMutexMsg.unlock();
+
+            int nbytesize = xmsg.ByteSize();
+            char * strbuf = new char[nbytesize];
+            std::shared_ptr<char> pstrbuf;
+            pstrbuf.reset(strbuf);
+            if(xmsg.SerializeToArray(strbuf,nbytesize))
+            {
+
+                ClientContext context ;
+                context.set_deadline(timespec);
+                qint64 time1 = QDateTime::currentMSecsSinceEpoch();
+
+                request.set_id(nid);
+                request.set_ntime(time1);
+                request.set_strquerymd5(gstrqueryMD5);
+                request.set_strctrlmd5(gstrctrlMD5);
+                request.set_strvin(gstrVIN);
+                request.set_xdata(strbuf,nbytesize);
+                request.set_kepptime(nkeeptime);
+                request.set_bimportant(bImportant);
+                nid++;
+                // The actual RPC.
+                Status status = stub_->upload(&context, request, &reply);
+                if (status.ok()) {
+                    std::cout<<nid<<" upload successfully"<<std::endl;
+                    if(reply.nres() == 1)
+                    {
+                        iv::cloud::cloudmsg xmsg;
+                        if(xmsg.ParseFromArray(reply.xdata().data(),reply.xdata().size()))
+                        {
+                            sharectrlmsg(&xmsg);
+                        }
+                    }
+                } else {
+                  std::cout << status.error_code() << ": " << status.error_message()
+                            << std::endl;
+                  std::cout<<"RPC failed"<<std::endl;
+                  if(status.error_code() == 4)
+                  {
+                      std::cout<<" RPC Exceed Time, Create New stub_"<<std::endl;
+                      channel = grpc::CreateCustomChannel(
+                               target_str, grpc::InsecureChannelCredentials(),cargs);
+
+                      stub_ = iv::Upload::NewStub(channel);
+                  }
+                  std::this_thread::sleep_for(std::chrono::milliseconds(900));
+
+                }
+
+            }
+            nlastsend = xTime.elapsed();
+
+    }
+}
+
+
+void dec_yaml(const char * stryamlpath)
+{
+
+    YAML::Node config;
+    try
+    {
+        config = YAML::LoadFile(stryamlpath);
+    }
+    catch(YAML::BadFile e)
+    {
+        qDebug("load error.");
+        return;
+    }
+
+    std::vector<std::string> vecmodulename;
+
+
+    if(config["server"])
+    {
+        gstrserverip = config["server"].as<std::string>();
+    }
+    if(config["port"])
+    {
+        gstrserverport = config["port"].as<std::string>();
+    }
+    if(config["uploadinterval"])
+    {
+        gstruploadinterval = config["uploadinterval"].as<std::string>();
+    }
+
+    if(config["VIN"])
+    {
+        gstrVIN = config["VIN"].as<std::string>();
+    }
+
+    if(config["queryMD5"])
+    {
+        gstrqueryMD5 = config["queryMD5"].as<std::string>();
+    }
+    else
+    {
+        return;
+    }
+
+    if(config["ctrlMD5"])
+    {
+        gstrctrlMD5 = config["ctrlMD5"].as<std::string>();
+    }
+
+
+    std::string strmsgname;
+
+    if(config["uploadmessage"])
+    {
+
+        for(YAML::const_iterator it= config["uploadmessage"].begin(); it != config["uploadmessage"].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config["uploadmessage"][strtitle]["msgname"]&&config["uploadmessage"][strtitle]["buffersize"]&&config["uploadmessage"][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config["uploadmessage"][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config["uploadmessage"][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config["uploadmessage"][strtitle]["buffercount"].as<int>();
+                if(config["uploadmessage"][strtitle]["bimportant"])
+                {
+                   std::string strimportant =    config["uploadmessage"][strtitle]["bimportant"].as<std::string>();
+                   if(strimportant == "true")
+                   {
+                       xmu.mbImportant = true;
+                   }
+                }
+                if(config["uploadmessage"][strtitle]["keeptime"])
+                {
+                   std::string strkeep =    config["uploadmessage"][strtitle]["keeptime"].as<std::string>();
+                   xmu.mnkeeptime = atoi(strkeep.data());
+                }
+                mvectormsgunit.push_back(xmu);
+            }
+        }
+    }
+    else
+    {
+
+
+    }
+
+    if(!config["ctrlMD5"])
+    {
+       return;
+    }
+
+    if(config["ctrlmessage"])
+    {
+        std::string strnodename = "ctrlmessage";
+        for(YAML::const_iterator it= config[strnodename].begin(); it != config[strnodename].end();++it)
+        {
+            std::string strtitle = it->first.as<std::string>();
+            std::cout<<strtitle<<std::endl;
+
+            if(config[strnodename][strtitle]["msgname"]&&config[strnodename][strtitle]["buffersize"]&&config[strnodename][strtitle]["buffercount"])
+            {
+                iv::msgunit xmu;
+                strmsgname = config[strnodename][strtitle]["msgname"].as<std::string>();
+                strncpy(xmu.mstrmsgname,strmsgname.data(),255);
+                xmu.mnBufferSize = config[strnodename][strtitle]["buffersize"].as<int>();
+                xmu.mnBufferCount = config[strnodename][strtitle]["buffercount"].as<int>();
+                mvectorctrlmsgunit.push_back(xmu);
+            }
+        }
+    }
+    else
+    {
+
+    }
+
+    return;
+
+}
+
+
+*/
+
+int main(int argc, char *argv[])
+{
+    showversion("driver_cloud_grpc_client");
+    QCoreApplication a(argc, argv);
+
+ //   std::thread * ptest = new std::thread(test);
+
+ //   return a.exec();
+
+    char stryamlpath[256];
+    if(argc<2)
+    {
+        snprintf(stryamlpath,255,"driver_cloud_grpc_client_stream.yaml");
+//        strncpy(stryamlpath,abs_ymlpath,255);
+    }
+    else
+    {
+        strncpy(stryamlpath,argv[1],255);
+    }
+//    dec_yaml(stryamlpath);
+
+    grpcclient * pgrpcclient = new grpcclient(stryamlpath);
+    pgrpcclient->start();
+
+//    int i;
+//    for(i=0;i<mvectormsgunit.size();i++)
+//    {
+//        mvectormsgunit[i].mpa = iv::modulecomm::RegisterRecv(mvectormsgunit[i].mstrmsgname,ListenData);
+//    }
+
+//    for(i=0;i<mvectorctrlmsgunit.size();i++)
+//    {
+//        mvectorctrlmsgunit[i].mpa = iv::modulecomm::RegisterSend(mvectorctrlmsgunit[i].mstrmsgname,mvectorctrlmsgunit[i].mnBufferSize,
+//                                                                 mvectorctrlmsgunit[i].mnBufferCount);
+//    }
+
+//    guploadthread = new std::thread(threadupload);
+
+    return a.exec();
+}

+ 3 - 0
src/driver/driver_cloud_grpc_client_stream/prototocpp.txt

@@ -0,0 +1,3 @@
+protoc -I . --plugin=protoc-gen-grpc=/home/yuchuli/git/grpc-framework/build2/grpc_cpp_plugin --grpc_out=. uploadmsg.proto
+
+protoc -I . --cpp_out=. uploadmsg.proto 

+ 73 - 0
src/driver/driver_cloud_grpc_client_stream/uploadstreammsg.proto

@@ -0,0 +1,73 @@
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "io.grpc.adc.uploadmsg";
+option java_outer_classname = "UploadMsgProto";
+option objc_class_prefix = "HLW";
+
+package iv;
+
+// The Upload service definition.
+service UploadStream {
+  // Sends a Upload
+  rpc upload (stream UploadRequestStream) returns (stream UploadReplyStream) {}
+  rpc queryctrl (stream queryReqStream) returns (stream queryReplyStream) {}
+  
+}
+
+// The request message containing the user's name.
+message UploadRequestStream {
+ // string name = 1;
+
+ //   int nres; //0 no message 1 have message
+    int32 id = 1;
+    int64 ntime = 2;
+    string strVIN = 3;
+    string strqueryMD5 = 4;
+    string strctrlMD5 = 5;
+    bytes xdata = 6;
+    bool bimportant = 7;  //if 1, is important.
+    int32 kepptime = 8;   //If important keep this data before query ms.
+}
+
+// The response message containing the greetings
+message UploadReplyStream {
+  int32 nres = 1;  //0 no message 1 have ctrl message
+  bytes xdata = 2;
+//  string message = 1;
+}
+
+
+message queryReqStream {
+  string strvin = 1;
+  string strqueryMD5 = 2;
+  int64 ntime = 3;  
+  string strctrlMD5 = 4;
+  bytes data = 5;
+  bool bimportant = 6;  //if 1, is important.
+  int32 kepptime = 7;   //If important keep this data before ctrl ms.  if -1 must send.
+  int32 ntype = 8;  //0 only query  1 ctrl.
+}
+
+message queryReplyStream {
+    int32 nres = 1;  //0 not online  1 online  -1 querMD5 error  -2 ctrlMD5 error 
+    int32 id = 2;
+    int64 ntime = 3;
+    bytes data = 4;
+}
+
+

+ 73 - 0
src/include/proto3/uploadstreammsg.proto

@@ -0,0 +1,73 @@
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "io.grpc.adc.uploadmsg";
+option java_outer_classname = "UploadMsgProto";
+option objc_class_prefix = "HLW";
+
+package iv;
+
+// The Upload service definition.
+service UploadStream {
+  // Sends a Upload
+  rpc upload (stream UploadRequestStream) returns (stream UploadReplyStream) {}
+  rpc queryctrl (stream queryReqStream) returns (stream queryReplyStream) {}
+  
+}
+
+// The request message containing the user's name.
+message UploadRequestStream {
+ // string name = 1;
+
+ //   int nres; //0 no message 1 have message
+    int32 id = 1;
+    int64 ntime = 2;
+    string strVIN = 3;
+    string strqueryMD5 = 4;
+    string strctrlMD5 = 5;
+    bytes xdata = 6;
+    bool bimportant = 7;  //if 1, is important.
+    int32 kepptime = 8;   //If important keep this data before query ms.
+}
+
+// The response message containing the greetings
+message UploadReplyStream {
+  int32 nres = 1;  //0 no message 1 have ctrl message
+  bytes xdata = 2;
+//  string message = 1;
+}
+
+
+message queryReqStream {
+  string strvin = 1;
+  string strqueryMD5 = 2;
+  int64 ntime = 3;  
+  string strctrlMD5 = 4;
+  bytes data = 5;
+  bool bimportant = 6;  //if 1, is important.
+  int32 kepptime = 7;   //If important keep this data before ctrl ms.  if -1 must send.
+  int32 ntype = 8;  //0 only query  1 ctrl.
+}
+
+message queryReplyStream {
+    int32 nres = 1;  //0 not online  1 online  -1 querMD5 error  -2 ctrlMD5 error 
+    int32 id = 2;
+    int64 ntime = 3;
+    bytes data = 4;
+}
+
+