123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437 |
- // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- /**
- * @file RTPSWriter.h
- */
- #ifndef _FASTDDS_RTPS_RTPSWRITER_H_
- #define _FASTDDS_RTPS_RTPSWRITER_H_
- #include <fastdds/rtps/Endpoint.h>
- #include <fastdds/rtps/messages/RTPSMessageGroup.h>
- #include <fastdds/rtps/attributes/WriterAttributes.h>
- #include <fastrtps/qos/LivelinessLostStatus.h>
- #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
- #include <fastdds/rtps/common/LocatorSelector.hpp>
- #include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
- #include <vector>
- #include <memory>
- #include <functional>
- #include <chrono>
- #include <mutex>
- namespace eprosima {
- namespace fastrtps {
- namespace rtps {
- class WriterListener;
- class WriterHistory;
- class FlowController;
- struct CacheChange_t;
- /**
- * Class RTPSWriter, manages the sending of data to the readers. Is always associated with a HistoryCache.
- * @ingroup WRITER_MODULE
- */
- class RTPSWriter : public Endpoint, public RTPSMessageSenderInterface
- {
- friend class WriterHistory;
- friend class RTPSParticipantImpl;
- friend class RTPSMessageGroup;
- friend class AsyncInterestTree;
- protected:
- RTPSWriter(
- RTPSParticipantImpl*,
- const GUID_t& guid,
- const WriterAttributes& att,
- WriterHistory* hist,
- WriterListener* listen = nullptr);
- virtual ~RTPSWriter();
- public:
- /**
- * Create a new change based with the provided changeKind.
- * @param data Data of the change.
- * @param changeKind The type of change.
- * @param handle InstanceHandle to assign.
- * @return Pointer to the CacheChange or nullptr if incorrect.
- */
- template<typename T>
- CacheChange_t* new_change(
- T& data,
- ChangeKind_t changeKind,
- InstanceHandle_t handle = c_InstanceHandle_Unknown)
- {
- return new_change([data]() -> uint32_t {
- return (uint32_t)T::getCdrSerializedSize(data);
- }, changeKind, handle);
- }
- RTPS_DllAPI CacheChange_t* new_change(
- const std::function<uint32_t()>& dataCdrSerializedSize,
- ChangeKind_t changeKind,
- InstanceHandle_t handle = c_InstanceHandle_Unknown);
- /**
- * Add a matched reader.
- * @param data Pointer to the ReaderProxyData object added.
- * @return True if added.
- */
- RTPS_DllAPI virtual bool matched_reader_add(
- const ReaderProxyData& data) = 0;
- /**
- * Remove a matched reader.
- * @param reader_guid GUID of the reader to remove.
- * @return True if removed.
- */
- RTPS_DllAPI virtual bool matched_reader_remove(
- const GUID_t& reader_guid) = 0;
- /**
- * Tells us if a specific Reader is matched against this writer.
- * @param reader_guid GUID of the reader to check.
- * @return True if it was matched.
- */
- RTPS_DllAPI virtual bool matched_reader_is_matched(
- const GUID_t& reader_guid) = 0;
- /**
- * Check if a specific change has been acknowledged by all Readers.
- * Is only useful in reliable Writer. In BE Writers returns false when pending to be sent.
- * @return True if acknowledged by all.
- */
- RTPS_DllAPI virtual bool is_acked_by_all(
- const CacheChange_t* /*a_change*/) const
- {
- return false;
- }
- /**
- * Waits until all changes were acknowledged or max_wait.
- * @return True if all were acknowledged.
- */
- RTPS_DllAPI virtual bool wait_for_all_acked(
- const Duration_t& /*max_wait*/)
- {
- return true;
- }
- /**
- * Update the Attributes of the Writer.
- * @param att New attributes
- */
- RTPS_DllAPI virtual void updateAttributes(
- const WriterAttributes& att) = 0;
- /**
- * This method triggers the send operation for unsent changes.
- * @return number of messages sent
- */
- RTPS_DllAPI virtual void send_any_unsent_changes() = 0;
- /**
- * Get Min Seq Num in History.
- * @return Minimum sequence number in history
- */
- RTPS_DllAPI SequenceNumber_t get_seq_num_min();
- /**
- * Get Max Seq Num in History.
- * @return Maximum sequence number in history
- */
- RTPS_DllAPI SequenceNumber_t get_seq_num_max();
- /**
- * Get maximum size of the serialized type
- * @return Maximum size of the serialized type
- */
- RTPS_DllAPI uint32_t getTypeMaxSerialized();
- //!Get maximum size of the data
- uint32_t getMaxDataSize();
- //! Calculates the maximum size of the data
- uint32_t calculateMaxDataSize(
- uint32_t length);
- /**
- * Get listener
- * @return Listener
- */
- RTPS_DllAPI inline WriterListener* getListener()
- {
- return mp_listener;
- }
- RTPS_DllAPI inline bool set_listener(
- WriterListener* listener)
- {
- mp_listener = listener;
- return true;
- }
- /**
- * Get the publication mode
- * @return publication mode
- */
- RTPS_DllAPI inline bool isAsync() const
- {
- return is_async_;
- }
- /**
- * Remove an specified max number of changes
- * @param max Maximum number of changes to remove.
- * @return at least one change has been removed
- */
- RTPS_DllAPI bool remove_older_changes(
- unsigned int max = 0);
- /**
- * Tries to remove a change waiting a maximum of the provided microseconds.
- * @param max_blocking_time_point Maximum time to wait for.
- * @param lock Lock of the Change list.
- * @return at least one change has been removed
- */
- virtual bool try_remove_change(
- const std::chrono::steady_clock::time_point& max_blocking_time_point,
- std::unique_lock<RecursiveTimedMutex>& lock) = 0;
- /*
- * Adds a flow controller that will apply to this writer exclusively.
- * @param controller
- */
- virtual void add_flow_controller(
- std::unique_ptr<FlowController> controller) = 0;
- /**
- * Get RTPS participant
- * @return RTPS participant
- */
- inline RTPSParticipantImpl* getRTPSParticipant() const
- {
- return mp_RTPSParticipant;
- }
- /**
- * Enable or disable sending data to readers separately
- * NOTE: This will only work for synchronous writers
- * @param enable If separate sending should be enabled
- */
- void set_separate_sending (
- bool enable)
- {
- m_separateSendingEnabled = enable;
- }
- /**
- * Inform if data is sent to readers separatedly
- * @return true if separate sending is enabled
- */
- bool get_separate_sending () const
- {
- return m_separateSendingEnabled;
- }
- /**
- * Process an incoming ACKNACK submessage.
- * @param[in] writer_guid GUID of the writer the submessage is directed to.
- * @param[in] reader_guid GUID of the reader originating the submessage.
- * @param[in] ack_count Count field of the submessage.
- * @param[in] sn_set Sequence number bitmap field of the submessage.
- * @param[in] final_flag Final flag field of the submessage.
- * @param[out] result true if the writer could process the submessage.
- * Only valid when returned value is true.
- * @return true when the submessage was destinated to this writer, false otherwise.
- */
- virtual bool process_acknack(
- const GUID_t& writer_guid,
- const GUID_t& reader_guid,
- uint32_t ack_count,
- const SequenceNumberSet_t& sn_set,
- bool final_flag,
- bool& result)
- {
- (void)reader_guid; (void)ack_count; (void)sn_set; (void)final_flag;
- result = false;
- return writer_guid == m_guid;
- }
- /**
- * Process an incoming NACKFRAG submessage.
- * @param[in] writer_guid GUID of the writer the submessage is directed to.
- * @param[in] reader_guid GUID of the reader originating the submessage.
- * @param[in] ack_count Count field of the submessage.
- * @param[in] seq_num Sequence number field of the submessage.
- * @param[in] fragments_state Fragment number bitmap field of the submessage.
- * @param[out] result true if the writer could process the submessage.
- * Only valid when returned value is true.
- * @return true when the submessage was destinated to this writer, false otherwise.
- */
- virtual bool process_nack_frag(
- const GUID_t& writer_guid,
- const GUID_t& reader_guid,
- uint32_t ack_count,
- const SequenceNumber_t& seq_num,
- const FragmentNumberSet_t fragments_state,
- bool& result)
- {
- (void)reader_guid; (void)ack_count; (void)seq_num; (void)fragments_state;
- result = false;
- return writer_guid == m_guid;
- }
- /**
- * @brief A method to retrieve the liveliness kind
- * @return Liveliness kind
- */
- const LivelinessQosPolicyKind& get_liveliness_kind() const;
- /**
- * @brief A method to retrieve the liveliness lease duration
- * @return Lease durtation
- */
- const Duration_t& get_liveliness_lease_duration() const;
- /**
- * @brief A method to return the liveliness announcement period
- * @return The announcement period
- */
- const Duration_t& get_liveliness_announcement_period() const;
- //! Liveliness lost status of this writer
- LivelinessLostStatus liveliness_lost_status_;
- /**
- * Check if the destinations managed by this sender interface have changed.
- *
- * @return true if destinations have changed, false otherwise.
- */
- bool destinations_have_changed() const override;
- /**
- * Get a GUID prefix representing all destinations.
- *
- * @return When all the destinations share the same prefix (i.e. belong to the same participant)
- * that prefix is returned. When there are no destinations, or they belong to different
- * participants, c_GuidPrefix_Unknown is returned.
- */
- GuidPrefix_t destination_guid_prefix() const override;
- /**
- * Get the GUID prefix of all the destination participants.
- *
- * @return a const reference to a vector with the GUID prefix of all destination participants.
- */
- const std::vector<GuidPrefix_t>& remote_participants() const override;
- /**
- * Get the GUID of all destinations.
- *
- * @return a const reference to a vector with the GUID of all destinations.
- */
- const std::vector<GUID_t>& remote_guids() const override;
- /**
- * Send a message through this interface.
- *
- * @param message Pointer to the buffer with the message already serialized.
- * @param max_blocking_time_point Future timepoint where blocking send should end.
- */
- bool send(
- CDRMessage_t* message,
- std::chrono::steady_clock::time_point& max_blocking_time_point) const override;
- protected:
- //!Is the data sent directly or announced by HB and THEN send to the ones who ask for it?.
- bool m_pushMode;
- //!WriterHistory
- WriterHistory* mp_history;
- //!Listener
- WriterListener* mp_listener;
- //!Asynchronous publication activated
- bool is_async_;
- //!Separate sending activated
- bool m_separateSendingEnabled;
- LocatorSelector locator_selector_;
- ResourceLimitedVector<GUID_t> all_remote_readers_;
- ResourceLimitedVector<GuidPrefix_t> all_remote_participants_;
- void add_guid(
- const GUID_t& remote_guid);
- void compute_selected_guids();
- void update_cached_info_nts();
- /**
- * Initialize the header of hte CDRMessages.
- */
- void init_header();
- /**
- * Add a change to the unsent list.
- * @param change Pointer to the change to add.
- * @param max_blocking_time
- */
- virtual void unsent_change_added_to_history(
- CacheChange_t* change,
- const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;
- /**
- * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
- * @param a_change Pointer to the change that is going to be removed.
- * @return True if removed correctly.
- */
- virtual bool change_removed_by_history(
- CacheChange_t* a_change) = 0;
- #if HAVE_SECURITY
- SerializedPayload_t encrypt_payload_;
- bool encrypt_cachechange(
- CacheChange_t* change);
- #endif
- //! The liveliness kind of this writer
- LivelinessQosPolicyKind liveliness_kind_;
- //! The liveliness lease duration of this writer
- Duration_t liveliness_lease_duration_;
- //! The liveliness announcement period
- Duration_t liveliness_announcement_period_;
- private:
- RTPSWriter& operator =(
- const RTPSWriter&) = delete;
- RTPSWriter* next_[2];
- };
- }
- } /* namespace rtps */
- } /* namespace eprosima */
- #endif /* _FASTDDS_RTPS_RTPSWRITER_H_ */
|