mod.rs

Overview

This file implements the Attestation Send Service, responsible for managing the creation, tracking, and distribution of attestations related to unfinalized blocks within the blockchain protocol. It periodically evaluates the state of candidate blocks, generates attestations as needed, and sends them to interested parties in the network. The service ensures that attestations are sent in compliance with protocol timing constraints and handles retransmission logic. It also collects and reports metrics associated with block attestation events.

The file primarily contains the AttestationSendService struct, which encapsulates the core logic, and the AttestationSendServiceHandler struct, which manages the service execution thread.


Key Components

Enum: AttestationAction

enum AttestationAction {
    ThisBlock(Envelope<GoshBLS, AttestationData>),
    // Fork variant commented out
}

Struct: TrackedState

struct TrackedState {
    block_state: BlockState,
    interested_parties_received_blocks: Option<HashSet<(NodeIdentifier, AttestationTargetType)>>,
    first_send_timestamp: Option<std::time::Instant>,
    last_send_timestamp: Option<std::time::Instant>,
    last_send_destinations: Option<HashSet<(NodeIdentifier, AttestationTargetType)>>,
}

Struct: AttestationSendService

pub struct AttestationSendService {
    pub pulse_timeout: Duration,
    resend_attestation_timeout: Duration,
    node_id: NodeIdentifier,
    thread_id: ThreadIdentifier,
    bls_keys_map: Arc<Mutex<HashMap<PubKey, (Secret, RndSeed)>>>,
    block_state_repository: BlockStateRepository,
    tracking: HashMap<BlockIdentifier, TrackedState>,
    block_state_repository_last_modified: u32,
    condidates_set_last_modified: u32,
    network_direct_tx: NetDirectSender<NodeIdentifier, NetworkMessage>,
    start_time: Instant,
    _rng: SmallRng,
    metrics: Option<BlockProductionMetrics>,
    authority: Arc<Mutex<Authority>>,
}

Methods of AttestationSendService

evaluate

pub fn evaluate(
    &mut self,
    candidates: &UnfinalizedBlocksSnapshot,
    loopback_attestations: Arc<Mutex<CollectedAttestations>>,
    candidate_block_repository: &impl Repository<CandidateBlock = Envelope<GoshBLS, AckiNackiBlock>>,
    deadline: Instant,
) -> Instant

Example call:

let next_deadline = attestation_service.evaluate(
    &candidates_snapshot,
    loopback_attestations.clone(),
    &candidate_repo,
    current_deadline,
);

try_get_attestation

fn try_get_attestation(&self, state: &TrackedState) -> anyhow::Result<Envelope<GoshBLS, AttestationData>>

try_get_fallback_attestation

fn try_get_fallback_attestation(&self, state: &TrackedState) -> anyhow::Result<Envelope<GoshBLS, AttestationData>>

pulse

fn pulse(
    &mut self,
    loopback_attestations: Arc<Mutex<CollectedAttestations>>,
    _candidate_block_repository: &impl Repository<CandidateBlock = Envelope<GoshBLS, AckiNackiBlock>>,
) -> Instant

send_block_attestation

fn send_block_attestation(
    &self,
    destination_node_id: NodeIdentifier,
    attestation: AttestationAction,
) -> anyhow::Result<()>

generate_attestation

pub fn generate_attestation(
    bls_keys_map: Arc<Mutex<HashMap<PubKey, (Secret, RndSeed)>>>,
    node_id: &NodeIdentifier,
    block_state: &BlockState,
    attestation_target_type: AttestationTargetType,
) -> anyhow::Result<Envelope<GoshBLS, AttestationData>>

stop_tracking_finalized_and_invalidated_candidates

fn stop_tracking_finalized_and_invalidated_candidates(&mut self)

append_for_tracking

fn append_for_tracking(&mut self, candidates: &UnfinalizedBlocksSnapshot)

update_interested_parties_received_blocks

fn update_interested_parties_received_blocks(&mut self, candidates: &UnfinalizedBlocksSnapshot)

collect_attested_blocks_to_receiver

fn collect_attested_blocks_to_receiver(
    &mut self,
    candidate: &BlockState,
) -> HashMap<BlockIdentifier, (NodeIdentifier, AttestationTargetType)>

update_tracking

fn update_tracking(
    &mut self,
    attested_blocks_to_parties: HashMap<BlockIdentifier, (NodeIdentifier, AttestationTargetType)>,
)

handle_attestation_metrics

fn handle_attestation_metrics(&self, block_id: &BlockIdentifier)

handle_attestation_metrics_inner

fn handle_attestation_metrics_inner(&self, block_id: &BlockIdentifier) -> anyhow::Result<()>

Struct: AttestationSendServiceHandler

pub struct AttestationSendServiceHandler {
    _service_handler: std::thread::JoinHandle<()>,
}

Methods of AttestationSendServiceHandler

new

pub fn new(
    mut attestation_sender_service: AttestationSendService,
    repository: RepositoryImpl,
    last_block_attestations: Arc<Mutex<CollectedAttestations>>,
    block_state_repository: BlockStateRepository,
    unprocessed_blocks_cache: UnfinalizedCandidateBlockCollection,
) -> Self

Implementation Details and Algorithms


Interaction with Other System Components


Visual Diagram: Class Diagram of AttestationSendService

classDiagram
class AttestationSendService {
+pulse_timeout: Duration
-resend_attestation_timeout: Duration
-node_id: NodeIdentifier
-thread_id: ThreadIdentifier
-bls_keys_map: Arc<Mutex<HashMap>>
-block_state_repository: BlockStateRepository
-tracking: HashMap<BlockIdentifier, TrackedState>
-block_state_repository_last_modified: u32
-condidates_set_last_modified: u32
-network_direct_tx: NetDirectSender
-start_time: Instant
-_rng: SmallRng
-metrics: Option<BlockProductionMetrics>
-authority: Arc<Mutex<Authority>>
+evaluate()
-try_get_attestation()
-try_get_fallback_attestation()
-pulse()
-send_block_attestation()
+generate_attestation()
-stop_tracking_finalized_and_invalidated_candidates()
-append_for_tracking()
-update_interested_parties_received_blocks()
-collect_attested_blocks_to_receiver()
-update_tracking()
-handle_attestation_metrics()
-handle_attestation_metrics_inner()
}
class TrackedState {
-block_state: BlockState
-interested_parties_received_blocks: Option<HashSet>
-first_send_timestamp: Option<Instant>
-last_send_timestamp: Option<Instant>
-last_send_destinations: Option<HashSet>
+getters()
+setters()
}
AttestationSendService "1" *-- "many" TrackedState : tracking

Usage Example of Service Initialization

let attestation_service = AttestationSendService::builder()
    .pulse_timeout(Duration::from_millis(100))
    .resend_attestation_timeout(Duration::from_secs(10))
    .node_id(my_node_id)
    .thread_id(my_thread_id)
    .bls_keys_map(arc_bls_keys_map)
    .block_state_repository(block_state_repo)
    .network_direct_tx(net_sender)
    .authority(arc_authority)
    .build();

let handler = AttestationSendServiceHandler::new(
    attestation_service,
    repository_impl,
    last_block_attestations,
    block_state_repository,
    unprocessed_blocks_cache,
);

This starts the service in a dedicated thread that will handle attestations autonomously.


Notes


For a deeper understanding of related data structures such as BlockState and Envelope, and of cryptographic components such as GoshBLS, refer to the associated modules and to related topics such as the block flow trace and the BLS Signature Scheme.