|
@@ -0,0 +1,216 @@
|
|
|
+// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
|
|
|
+//
|
|
|
+// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
+// you may not use this file except in compliance with the License.
|
|
|
+// You may obtain a copy of the License at
|
|
|
+//
|
|
|
+// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+//
|
|
|
+// Unless required by applicable law or agreed to in writing, software
|
|
|
+// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+// See the License for the specific language governing permissions and
|
|
|
+// limitations under the License.
|
|
|
+
|
|
|
+/*!
|
|
|
+ * @file TopicsSubscriber.cpp
|
|
|
+ * This file contains the implementation of the subscriber functions.
|
|
|
+ *
|
|
|
+ * This file was generated by the tool fastcdrgen.
|
|
|
+ */
|
|
|
+
|
|
|
+#include <fastrtps/participant/Participant.h>
|
|
|
+#include <fastrtps/attributes/ParticipantAttributes.h>
|
|
|
+#include <fastrtps/subscriber/Subscriber.h>
|
|
|
+#include <fastrtps/attributes/SubscriberAttributes.h>
|
|
|
+
|
|
|
+#include <fastrtps/Domain.h>
|
|
|
+
|
|
|
+#include <fastcdr/FastBuffer.h>
|
|
|
+#include <fastcdr/FastCdr.h>
|
|
|
+#include <fastcdr/Cdr.h>
|
|
|
+
|
|
|
+#include <fastrtps/participant/Participant.h>
|
|
|
+#include <fastrtps/attributes/ParticipantAttributes.h>
|
|
|
+#include <fastrtps/attributes/PublisherAttributes.h>
|
|
|
+#include <fastrtps/publisher/Publisher.h>
|
|
|
+#include <fastrtps/Domain.h>
|
|
|
+#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h>
|
|
|
+#include <fastdds/rtps/transport/UDPv4TransportDescriptor.h>
|
|
|
+#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>
|
|
|
+
|
|
|
+#include "TopicsSubscriber.h"
|
|
|
+
|
|
|
+using namespace eprosima::fastrtps;
|
|
|
+using namespace eprosima::fastrtps::rtps;
|
|
|
+
|
|
|
+using namespace eprosima::fastdds::rtps;
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+#include <QDateTime>
|
|
|
+
|
|
|
+TopicsSubscriber::TopicsSubscriber() : mp_participant(nullptr), mp_subscriber(nullptr) {}
|
|
|
+
|
|
|
+TopicsSubscriber::~TopicsSubscriber() { Domain::removeParticipant(mp_participant);}
|
|
|
+
|
|
|
+bool TopicsSubscriber::init(const char * strtopic,const char * strpubip,const unsigned short nPort)
|
|
|
+{
|
|
|
+
|
|
|
+ strncpy(mstrtopic,strtopic,255);
|
|
|
+ mnPort = nPort;
|
|
|
+ // Create RTPSParticipant
|
|
|
+
|
|
|
+ ParticipantAttributes PParam;
|
|
|
+ PParam.rtps.setName("Participant_subscriber"); //You can put the name you want
|
|
|
+
|
|
|
+ PParam.rtps.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE;
|
|
|
+ PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol = true;
|
|
|
+ PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true;
|
|
|
+ PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader = true;
|
|
|
+ PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
|
|
|
+
|
|
|
+ // SharedMem transport configuration
|
|
|
+ PParam.rtps.useBuiltinTransports = false;
|
|
|
+
|
|
|
+ auto tcp2_transport = std::make_shared<TCPv4TransportDescriptor>();
|
|
|
+
|
|
|
+ //Set initial peers.
|
|
|
+ Locator_t initial_peer_locator;
|
|
|
+ initial_peer_locator.kind = LOCATOR_KIND_TCPv4;
|
|
|
+ IPLocator::setIPv4(initial_peer_locator, strpubip);
|
|
|
+ initial_peer_locator.port = mnPort;
|
|
|
+ PParam.rtps.builtin.initialPeersList.push_back(initial_peer_locator);
|
|
|
+
|
|
|
+// PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
|
|
|
+// PParam.rtps.builtin.discovery_config.leaseDuration_announcementperiod = Duration_t(5, 0);
|
|
|
+// PParam.rtps.setName("Participant_sub");
|
|
|
+
|
|
|
+ tcp2_transport->wait_for_tcp_negotiation = false;
|
|
|
+ //Link the Transport Layer to the Participant.
|
|
|
+ PParam.rtps.userTransports.push_back(tcp2_transport);
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ mp_participant = Domain::createParticipant(PParam);
|
|
|
+ if(mp_participant == nullptr)
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ //Register the type
|
|
|
+
|
|
|
+ Domain::registerType(mp_participant, static_cast<TopicDataType*>(&myType));
|
|
|
+
|
|
|
+ // Create Subscriber
|
|
|
+
|
|
|
+ SubscriberAttributes Rparam;
|
|
|
+ Rparam.topic.topicKind = NO_KEY;
|
|
|
+ Rparam.topic.topicDataType = myType.getName(); //Must be registered before the creation of the subscriber
|
|
|
+ Rparam.topic.topicName = strtopic;//"TopicsPubSubTopic";
|
|
|
+
|
|
|
+ Rparam.topic.historyQos.depth = 30;
|
|
|
+ Rparam.topic.resourceLimitsQos.max_samples = 50;
|
|
|
+ Rparam.topic.resourceLimitsQos.allocated_samples = 20;
|
|
|
+
|
|
|
+ Rparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
|
|
|
+
|
|
|
+
|
|
|
+ mp_subscriber = Domain::createSubscriber(mp_participant,Rparam, static_cast<SubscriberListener*>(&m_listener));
|
|
|
+ if(mp_subscriber == nullptr)
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+void TopicsSubscriber::SubListener::onSubscriptionMatched(Subscriber* sub,MatchingInfo& info)
|
|
|
+{
|
|
|
+ (void)sub;
|
|
|
+
|
|
|
+ if (info.status == MATCHED_MATCHING)
|
|
|
+ {
|
|
|
+ n_matched++;
|
|
|
+ std::cout << "Subscriber matched" << std::endl;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ n_matched--;
|
|
|
+ std::cout << "Subscriber unmatched" << std::endl;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+void TopicsSubscriber::SubListener::setReceivedTopicFunction(ModuleFun xFun)
|
|
|
+{
|
|
|
+ mFun = xFun;
|
|
|
+ mbSetFun = true;
|
|
|
+}
|
|
|
+void TopicsSubscriber::SubListener::onNewDataMessage(Subscriber* sub)
|
|
|
+{
|
|
|
+ // Take data
|
|
|
+ TopicSample::Message st;
|
|
|
+ static int ncount = 0;
|
|
|
+
|
|
|
+ static int nmaxlatancy = 0;
|
|
|
+
|
|
|
+ std::cout<<"new msg"<<std::endl;
|
|
|
+
|
|
|
+
|
|
|
+// char * strbuf = new char[1000000];
|
|
|
+// eprosima::fastcdr::FastBuffer pbuf(strbuf,1000000);
|
|
|
+// eprosima::fastcdr::Cdr * pxcdr;//
|
|
|
+// pxcdr = new eprosima::fastcdr::Cdr(pbuf);
|
|
|
+
|
|
|
+// if(sub->takeNextData(pxcdr, &m_info))
|
|
|
+// {
|
|
|
+// if(m_info.sampleKind == ALIVE)
|
|
|
+// {
|
|
|
+// // Print your structure data here.
|
|
|
+// ++n_msg;
|
|
|
+// std::cout << "Sample received, count=" << n_msg<<std::endl;
|
|
|
+// st.deserialize(*pxcdr);
|
|
|
+
|
|
|
+// std::cout<<" size is "<<TopicSample::Message::getCdrSerializedSize(st)<<std::endl;
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
+// return;
|
|
|
+
|
|
|
+// sub->get_first_untaken_info(&m_info);
|
|
|
+ std::cout<<"count is "<<sub->getUnreadCount()<<std::endl;
|
|
|
+
|
|
|
+ if(sub->takeNextData(&st, &m_info))
|
|
|
+ {
|
|
|
+ if(m_info.sampleKind == ALIVE)
|
|
|
+ {
|
|
|
+ // Print your structure data here.
|
|
|
+ ++n_msg;
|
|
|
+ ncount++;
|
|
|
+ std::cout << "Sample received, count=" << st.counter() <<" total: "<<ncount<<std::endl;
|
|
|
+ qint64 timex = QDateTime::currentMSecsSinceEpoch();
|
|
|
+ int nlatancy = (timex - st.sendtime());
|
|
|
+ if(nlatancy>nmaxlatancy)nmaxlatancy = nlatancy;
|
|
|
+ std::cout<<" latency is "<<nlatancy<<" max: "<<nmaxlatancy<<std::endl;
|
|
|
+ std::cout<<" size is "<<st.xdata().size()<<std::endl;
|
|
|
+ QDateTime dt = QDateTime::fromMSecsSinceEpoch(st.sendtime());
|
|
|
+ if(mbSetFun) mFun((char *)(st.xdata().data()),st.xdata().size(),st.counter(),&dt,st.msgname().data());
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void TopicsSubscriber::setReceivedTopicFunction(ModuleFun xFun)
|
|
|
+{
|
|
|
+ m_listener.setReceivedTopicFunction(xFun);
|
|
|
+}
|
|
|
+
|
|
|
+void TopicsSubscriber::run()
|
|
|
+{
|
|
|
+ std::cout << "Waiting for Data, press Enter to stop the Subscriber. "<<std::endl;
|
|
|
+ std::cin.ignore();
|
|
|
+ std::cout << "Shutting down the Subscriber." << std::endl;
|
|
|
+}
|
|
|
+
|