123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596 |
- #ifndef _FASTDDS_RTPS_CACHECHANGE_H_
- #define _FASTDDS_RTPS_CACHECHANGE_H_
- #include <fastdds/rtps/common/Types.h>
- #include <fastdds/rtps/common/WriteParams.h>
- #include <fastdds/rtps/common/SerializedPayload.h>
- #include <fastdds/rtps/common/Time_t.h>
- #include <fastdds/rtps/common/InstanceHandle.h>
- #include <fastdds/rtps/common/FragmentNumber.h>
- #include <vector>
- namespace eprosima {
- namespace fastrtps {
- namespace rtps {
- enum RTPS_DllAPI ChangeKind_t
- {
- ALIVE,
- NOT_ALIVE_DISPOSED,
- NOT_ALIVE_UNREGISTERED,
- NOT_ALIVE_DISPOSED_UNREGISTERED
- };
- struct RTPS_DllAPI CacheChange_t
- {
-
- ChangeKind_t kind = ALIVE;
-
- GUID_t writerGUID;
-
- InstanceHandle_t instanceHandle;
-
- SequenceNumber_t sequenceNumber;
-
- SerializedPayload_t serializedPayload;
-
- bool isRead = false;
-
- Time_t sourceTimestamp;
-
- Time_t receptionTimestamp;
- WriteParams write_params;
- bool is_untyped_ = true;
-
- CacheChange_t()
- {
- }
- CacheChange_t(
- const CacheChange_t&) = delete;
- const CacheChange_t& operator =(
- const CacheChange_t&) = delete;
-
- CacheChange_t(
- uint32_t payload_size,
- bool is_untyped = false)
- : serializedPayload(payload_size)
- , is_untyped_(is_untyped)
- {
- }
-
- bool copy(
- const CacheChange_t* ch_ptr)
- {
- kind = ch_ptr->kind;
- writerGUID = ch_ptr->writerGUID;
- instanceHandle = ch_ptr->instanceHandle;
- sequenceNumber = ch_ptr->sequenceNumber;
- sourceTimestamp = ch_ptr->sourceTimestamp;
- write_params = ch_ptr->write_params;
- isRead = ch_ptr->isRead;
- fragment_size_ = ch_ptr->fragment_size_;
- fragment_count_ = ch_ptr->fragment_count_;
- first_missing_fragment_ = ch_ptr->first_missing_fragment_;
- return serializedPayload.copy(&ch_ptr->serializedPayload, !ch_ptr->is_untyped_);
- }
-
- void copy_not_memcpy(
- const CacheChange_t* ch_ptr)
- {
- kind = ch_ptr->kind;
- writerGUID = ch_ptr->writerGUID;
- instanceHandle = ch_ptr->instanceHandle;
- sequenceNumber = ch_ptr->sequenceNumber;
- sourceTimestamp = ch_ptr->sourceTimestamp;
- write_params = ch_ptr->write_params;
- isRead = ch_ptr->isRead;
-
- serializedPayload.encapsulation = ch_ptr->serializedPayload.encapsulation;
-
- setFragmentSize(ch_ptr->fragment_size_, false);
- }
- ~CacheChange_t()
- {
- }
-
- uint32_t getFragmentCount() const
- {
- return fragment_count_;
- }
-
- uint16_t getFragmentSize() const
- {
- return fragment_size_;
- }
-
- bool is_fully_assembled()
- {
- return first_missing_fragment_ >= fragment_count_;
- }
-
- void get_missing_fragments(
- FragmentNumberSet_t& frag_sns)
- {
-
- frag_sns.base(first_missing_fragment_ + 1);
-
- uint32_t current_frag = first_missing_fragment_;
- while (current_frag < fragment_count_)
- {
- frag_sns.add(current_frag + 1);
- current_frag = get_next_missing_fragment(current_frag);
- }
- }
-
- void setFragmentSize(
- uint16_t fragment_size,
- bool create_fragment_list = false)
- {
- fragment_size_ = fragment_size;
- fragment_count_ = 0;
- first_missing_fragment_ = 0;
- if (fragment_size > 0)
- {
-
- fragment_count_ = (serializedPayload.length + fragment_size - 1) / fragment_size;
- if (create_fragment_list)
- {
-
-
- size_t offset = 0;
- for (uint32_t i = 1; i <= fragment_count_; i++, offset += fragment_size_)
- {
- set_next_missing_fragment(i - 1, i);
- }
- }
- else
- {
-
-
- first_missing_fragment_ = fragment_count_;
- }
- }
- }
- bool add_fragments(
- const SerializedPayload_t& incoming_data,
- uint32_t fragment_starting_num,
- uint32_t fragments_in_submessage)
- {
- uint32_t original_offset = (fragment_starting_num - 1) * fragment_size_;
- uint32_t incoming_length = fragment_size_ * fragments_in_submessage;
- uint32_t last_fragment_index = fragment_starting_num + fragments_in_submessage - 1;
-
- if (last_fragment_index > fragment_count_)
- {
- return false;
- }
-
- if (last_fragment_index < fragment_count_)
- {
- if (incoming_data.length < incoming_length)
- {
- return false;
- }
- }
- else
- {
- incoming_length = serializedPayload.length - original_offset;
- }
- if (original_offset + incoming_length > serializedPayload.length)
- {
- return false;
- }
- if (received_fragments(fragment_starting_num - 1, fragments_in_submessage))
- {
- memcpy(
- &serializedPayload.data[original_offset],
- incoming_data.data, incoming_length);
- }
- return is_fully_assembled();
- }
- private:
-
- uint16_t fragment_size_ = 0;
-
- uint32_t fragment_count_ = 0;
-
- uint32_t first_missing_fragment_ = 0;
- uint32_t get_next_missing_fragment(
- uint32_t fragment_index)
- {
- uint32_t* ptr = next_fragment_pointer(fragment_index);
- return *ptr;
- }
- void set_next_missing_fragment(
- uint32_t fragment_index,
- uint32_t next_fragment_index)
- {
- uint32_t* ptr = next_fragment_pointer(fragment_index);
- *ptr = next_fragment_index;
- }
- uint32_t* next_fragment_pointer(
- uint32_t fragment_index)
- {
- size_t offset = fragment_size_;
- offset *= fragment_index;
- offset = (offset + 3) & ~3;
- return reinterpret_cast<uint32_t*>(&serializedPayload.data[offset]);
- }
-
- bool received_fragments(
- uint32_t initial_fragment,
- uint32_t num_of_fragments)
- {
- bool at_least_one_changed = false;
- if ( (fragment_size_ > 0) && (initial_fragment < fragment_count_) )
- {
- uint32_t last_fragment = initial_fragment + num_of_fragments;
- if (last_fragment > fragment_count_)
- {
- last_fragment = fragment_count_;
- }
- if (initial_fragment <= first_missing_fragment_)
- {
-
- while (first_missing_fragment_ < last_fragment)
- {
- first_missing_fragment_ = get_next_missing_fragment(first_missing_fragment_);
- at_least_one_changed = true;
- }
- }
- else
- {
-
- uint32_t current_frag = first_missing_fragment_;
- while (current_frag < initial_fragment)
- {
- uint32_t next_frag = get_next_missing_fragment(current_frag);
- if (next_frag >= initial_fragment)
- {
-
-
- uint32_t next_missing_fragment = next_frag;
- while (next_missing_fragment < last_fragment)
- {
- next_missing_fragment = get_next_missing_fragment(next_missing_fragment);
- at_least_one_changed = true;
- }
-
- if (at_least_one_changed)
- {
- set_next_missing_fragment(current_frag, next_missing_fragment);
- }
- break;
- }
- current_frag = next_frag;
- }
- }
- }
- return at_least_one_changed;
- }
- };
- #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
/**
 * Status of a change with respect to a particular reader.
 *
 * The enumerators take the consecutive values 0 to 4. UNSENT is the status a
 * ChangeForReader_t is constructed with.
 */
enum ChangeForReaderStatus_t
{
    UNSENT = 0,         //!< Value 0.
    REQUESTED,          //!< Value 1.
    UNACKNOWLEDGED,     //!< Value 2.
    ACKNOWLEDGED,       //!< Value 3.
    UNDERWAY            //!< Value 4.
};
- class ChangeForReader_t
- {
- friend struct ChangeForReaderCmp;
- public:
- ChangeForReader_t()
- : status_(UNSENT)
- , is_relevant_(true)
- , change_(nullptr)
- {
- }
- ChangeForReader_t(
- const ChangeForReader_t& ch)
- : status_(ch.status_)
- , is_relevant_(ch.is_relevant_)
- , seq_num_(ch.seq_num_)
- , change_(ch.change_)
- , unsent_fragments_(ch.unsent_fragments_)
- {
- }
-
-
- ChangeForReader_t(
- CacheChange_t* change)
- : status_(UNSENT)
- , is_relevant_(true)
- , seq_num_(change->sequenceNumber)
- , change_(change)
- {
- if (change->getFragmentSize() != 0)
- {
- unsent_fragments_.base(1u);
- unsent_fragments_.add_range(1u, change->getFragmentCount() + 1u);
- }
- }
- ChangeForReader_t(
- const SequenceNumber_t& seq_num)
- : status_(UNSENT)
- , is_relevant_(true)
- , seq_num_(seq_num)
- , change_(nullptr)
- {
- }
- ~ChangeForReader_t()
- {
- }
- ChangeForReader_t& operator =(
- const ChangeForReader_t& ch)
- {
- status_ = ch.status_;
- is_relevant_ = ch.is_relevant_;
- seq_num_ = ch.seq_num_;
- change_ = ch.change_;
- unsent_fragments_ = ch.unsent_fragments_;
- return *this;
- }
-
-
-
- CacheChange_t* getChange() const
- {
- return change_;
- }
- void setStatus(
- const ChangeForReaderStatus_t status)
- {
- status_ = status;
- }
- ChangeForReaderStatus_t getStatus() const
- {
- return status_;
- }
- void setRelevance(
- const bool relevance)
- {
- is_relevant_ = relevance;
- }
- bool isRelevant() const
- {
- return is_relevant_;
- }
- const SequenceNumber_t getSequenceNumber() const
- {
- return seq_num_;
- }
-
- void notValid()
- {
- is_relevant_ = false;
- change_ = nullptr;
- }
-
- bool isValid() const
- {
- return change_ != nullptr;
- }
- FragmentNumberSet_t getUnsentFragments() const
- {
- return unsent_fragments_;
- }
- void markAllFragmentsAsUnsent()
- {
- if (change_ != nullptr && change_->getFragmentSize() != 0)
- {
- unsent_fragments_.base(1u);
- unsent_fragments_.add_range(1u, change_->getFragmentCount() + 1u);
- }
- }
- void markFragmentsAsSent(
- const FragmentNumber_t& sentFragment)
- {
- unsent_fragments_.remove(sentFragment);
- if (!unsent_fragments_.empty() && unsent_fragments_.max() < change_->getFragmentCount())
- {
- FragmentNumber_t base = unsent_fragments_.base();
- FragmentNumber_t max = unsent_fragments_.max();
- assert(!unsent_fragments_.is_set(base));
-
- base = unsent_fragments_.min();
- unsent_fragments_.base_update(base);
-
- unsent_fragments_.add_range(max + 1u, change_->getFragmentCount() + 1u);
- }
- }
- void markFragmentsAsUnsent(
- const FragmentNumberSet_t& unsentFragments)
- {
- FragmentNumber_t other_base = unsentFragments.base();
- if (other_base < unsent_fragments_.base())
- {
- unsent_fragments_.base_update(other_base);
- }
- unsentFragments.for_each(
- [this](
- FragmentNumber_t element)
- {
- unsent_fragments_.add(element);
- });
- }
- private:
-
- ChangeForReaderStatus_t status_;
-
- bool is_relevant_;
-
- SequenceNumber_t seq_num_;
-
-
- CacheChange_t* change_;
- FragmentNumberSet_t unsent_fragments_;
- };
- struct ChangeForReaderCmp
- {
- bool operator ()(
- const ChangeForReader_t& a,
- const ChangeForReader_t& b) const
- {
- return a.seq_num_ < b.seq_num_;
- }
- };
- #endif
- }
- }
- }
- #endif
|