NOS Testbed (NOS-T)

Contents:

  • Overview
  • Installation
  • Learning Resources
    • Science Event Dashboard Test Suite
    • FireSat+ Test Suite
    • Downlink Test Suite
      • Introduction
        • NOS-T
        • Downlink Test Suite
      • Setup
        • Integrated Development Environment Installation
        • NOS-T Tools Installation
        • RabbitMQ Event Broker Setup
        • Repository Cloning
      • Component Applications Overview
        • SatelliteStorage
        • Manager
      • Execution
        • YAML Configuration File
        • Cesium Access Token and Assets
        • Setting Up Environment Files
        • Executing the Downlink Test Suite
      • Conclusion
  • Operator’s Guide
  • API Reference
  • Example Test Suites
  • Resources Library
  • Release Documents
  • Publications
  • Contributing
  • Frequently Asked Questions (FAQ)
NOS Testbed (NOS-T)
  • Learning Resources
  • Downlink Test Suite
  • View page source

Downlink Test Suite

This tutorial contains information for those who are just starting out and builds up to show how complex test suites can be built.

Introduction

NOS-T

The New Observing Strategies Testbed (NOS-T) is a computational environment designed to develop, test, mature, and socialize innovative operating concepts and technologies for NASA’s New Observing Strategies (NOS). It provides a robust infrastructure that enables the integration and orchestration of user-contributed applications to simulate system-of-systems test cases, allowing for true distributed control over constituent systems. This environment empowers principal investigators to conduct virtual Earth science mission simulations, systematically varying parameters to evaluate the effectiveness of proposed observing strategies. The resulting data and outcomes support the advancement of technology readiness levels and foster innovation in Earth science measurement techniques.

At the core of NOS-T is a loosely coupled, event-driven architecture (EDA), where applications communicate state changes through event notifications transmitted over a network. These events, encapsulated as message payloads, carry the necessary data to reflect system state transitions. This architecture enhances scalability and reliability by replicating event-handling functions across infrastructure instances, while maintaining modularity through a simple event-handling interface. NOS-T also aligns with service-oriented architecture (SOA) principles, as applications can trigger services in response to specific events.

A key component of this architecture is the event broker (or message broker), which centralizes communication by managing the exchange of event notifications between applications. Rather than requiring direct connections between all applications, each client connects only to the broker, simplifying the communication topology and improving system manageability. This design enables seamless coordination among user applications and the NOS-T manager application via a shared information system infrastructure, facilitating the execution of complex, distributed Earth science mission scenarios.

Downlink Test Suite

The Downlink test suite demonstrates how to model satellite data downlink operations using the NOS-T Tools library. This test suite consists of four main applications:

  • SatelliteStorage: Models satellites that generate and store data, then downlink when in contact with ground stations

  • Grounds: Models a network of ground stations that receive data from satellites

  • Outages: Introduces random outages in ground station operations

  • Manager: Orchestrates the simulation timing and execution

  • Scoreboard: Geospatial visualization tool that displays the data from the other applications

The operational concept for the Downlink test suite is that satellites generate data during their orbits and must downlink this data when they come in range of operational ground stations. Ground stations may experience random outages, creating realistic constraints on data downlink opportunities.

Setup

This section will show you how to set up NOS-T assuming you are a beginner to both coding and the testbed. The setup phase involves:

  1. Integrated Development Environment Installation

  2. NOS-T Tools Installation

  3. RabbitMQ Event Broker Setup

  4. Repository Cloning

Integrated Development Environment Installation

An Integrated Development Environment (IDE) will make developing applications and interacting with NOS-T much easier. Below is a list of recommended IDEs:

  • Visual Studio Code (VS Code): Lightweight, highly customizable, supports many languages via extensions.

  • IntelliJ IDEA: Excellent for Java and Kotlin, with strong support for other JVM languages.

  • Eclipse: Popular for Java development, also supports C/C++, Python, and more.

  • PyCharm: Feature-rich Python IDE by JetBrains, great for web development and data science.

Note

Users are encouraged to use VS Code due to its lightweight design, ease of setup, and strong community support. It also offers built-in support for Jupyter notebooks, making it especially helpful for getting started with NOS-T development.


NOS-T Tools Installation

The NOS-T Tools library is available on PyPi and can be installed using pip, the standard package manager for Python or conda. Below are the instructions for both methods.

Pip

To install NOS-T tools using pip, follow these steps:

  1. Upgrade pip to the latest version:

python -m pip install --upgrade pip
  1. Install NOS-T Tools:

python3 -m pip install nost-tools

To install additional libraries required to run the NOS-T Tools examples:

python3 -m pip install "nost_tools[examples]"

Conda

For instructions on how to install Conda, see the: Conda documentation.

To install NOS-T tools using conda:

  1. Create a new conda environment (recommended):

conda create -n nost python=3.11
conda activate nost
  1. Install NOS-T tools:

python3 -m pip install nost-tools

To install additional libraries required to run the examples:

python3 -m pip install "nost_tools[examples]"

Note

Following the instructions above will install the Python packages that the test suite depends on to run. The details of these dependencies, including version numbers, can otherwise be found in the requirements file.


RabbitMQ Event Broker Setup

Refer to the RabbitMQ Broker on Local Host guide for instructions on how to set up a RabbitMQ event broker locally.

Important

The test suite uses a RabbitMQ event broker to facilitate communication between applications. Ensure that your RabbitMQ broker is running and accessible before proceeding with the test suite.


Repository Cloning

The recommended way to access the example code is by cloning the NOS-T Tools repository from GitHub. If you’re unfamiliar with how to clone a Git repository, you can find detailed instructions here.

Clone the repository:

git clone git@github.com:code-lab-org/nost-tools.git

If the above fails, you can also try the HTTPS version:

git clone https://github.com/code-lab-org/nost-tools.git

This will create a directory called nost-tools in your current working directory. Inside this directory, you will find the example code under the examples/downlink/ folder.

Component Applications Overview

The Downlink test suite consists of several components. Below is an overview of the main applications involved in the FireSat+ test suite: SatelliteStorage and Manager.

Note

The Grounds, Outages, and Scoreboard applications are not covered in this tutorial, but you can find more information about them here.

SatelliteStorage

A key component of the Downlink test suite is the SatelliteStorage application (main_satelliteStorage.py). This application models satellites generating and storing data, and downlinking when in view of ground stations. You will be guided through the how each code block works, to help understand the purpose of different components in an application.

The first section of the code includes import statements that bring in the necessary dependencies for building the application. The imports at the top are standard Python libraries, while those at the bottom are sourced from the NOS-T tools library.

import logging
from datetime import timedelta
import pandas as pd
from skyfield.api import EarthSatellite, load, wgs84

from nost_tools.application_utils import ShutDownObserver
from nost_tools.configuration import ConnectionConfig
from nost_tools.entity import Entity
from nost_tools.managed_application import ManagedApplication
from nost_tools.publisher import WallclockTimeIntervalPublisher

The next section imports the message schema configuration, which defines the structure for how Satellites transmit data.

from satelliteStorage_config_files.schemas import (
    GroundLocation,
    LinkCharge,
    LinkStart,
    OutageReport,
    OutageRestore,
    SatelliteAllReady,
    SatelliteReady,
    SatelliteState,
)

A logger is initialized for monitoring the behavior and flow of the program. You can find more details about the different logging levels here.

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

Following that, the function get_elevation_angle calculates the elevation angle between a satellite and a ground station. This is crucial for determining whether the satellite can downlink data to the ground station. It takes three parameters: t (time), sat (Skyview EarthSatellite object), and loc (geographic location on surface specified by latitude-longitude). The function returns the elevation angle in degrees. These inputs are used in mathematical computations to determine and return the elevation angle of a satellite at a given time and location, with respoect to the topocentric horizon.

def get_elevation_angle(t, sat, loc):
    """
    Returns the elevation angle (degrees) of satellite with respect to the topocentric horizon

    Args:
        t (:obj:`Time`): Time object of skyfield.timelib module
        sat (:obj:`EarthSatellite`): Skyview EarthSatellite object from skyfield.sgp4lib module
        loc (:obj:`GeographicPosition`): Geographic location on surface specified by latitude-longitude from skyfield.toposlib module

    Returns:
        float : alt.degrees
            Elevation angle (degrees) of satellite with respect to the topocentric horizon
    """
    difference = sat - loc
    topocentric = difference.at(t)
    # NOTE: Topos uses term altitude for what we are referring to as elevation
    alt, az, distance = topocentric.altaz()
    return alt.degrees

Next, the check_in_range function checks if a satellite is within range of a ground station based on the elevation angle. It takes three parameters: t (time), sat (Skyview EarthSatellite object), and grounds (dataframe of ground station locations, minimum elevation angles for communication, and operational status). The function returns a boolean indicating whether the satellite is in range of the ground station and the ground ID number.

def check_in_range(t, satellite, grounds):
    """
    Checks if the satellite is in range of any of the operational ground stations

    Args:
        t (:obj:`Time`): Time object of skyfield.timelib module
        satellite (:obj:`EarthSatellite`): Skyview EarthSatellite object from skyfield.sgp4lib module
        grounds (:obj:`DataFrame`): Dataframe of ground station locations, minimum elevation angles for communication, and operational status (T/F)

    Returns:
        bool, int :
            isInRange
                True/False indicating visibility of satellite to any operational ground station
            groundId
                groundId of the ground station currently in comm range (NOTE: If in range of two ground stations simultaneously, will return first groundId)
    """
    isInRange = False
    groundId = None
    for k, ground in grounds.iterrows():
        if ground.operational:
            groundLatLon = wgs84.latlon(ground.latitude, ground.longitude)
            satelliteElevation = get_elevation_angle(t, satellite, groundLatLon)
            if satelliteElevation >= ground.elevAngle:
                isInRange = True
                groundId = k
                break
    return isInRange, groundId

SatelliteStorage class

The next section of code blocks define the SatelliteStorage class. In object-oriented programming, a class is a replicable object that can be assigned unique parameters to generate a diverse collection of similar objects.

The SatelliteStorage class leverages the NOS-T tools library ‘Entity’ object class to construct the constellation chain.

The first two functions in the Constellation class, init and initialize, prepare the test run for startup by initializing data.

class SatelliteStorage(Entity):
    """
    *This object class inherits properties from the Entity object class in the NOS-T tools library*

    Args:
        cName (str): A string containing the name for the application that tracks the constituent EarthSatellites for this simulation
        app (:obj:`ManagedApplication`): An application containing a test-run namespace, a name and description for the app, client credentials, and simulation timing instructions
        id (:obj:`list`): List of unique *int* ids for each constituent satellite
        names (:obj:`list`): List of unique *str* for each  constituent satellite (must be same length as **id**)
        ES (:obj:`list`): Optional list of :obj:`EarthSatellite` objects to be included in the simulation (NOTE: at least one of **ES** or **tles** MUST be specified, or an exception will be thrown)
        tles (:obj:`list`): Optional list of Two-Line Element *str* to be converted into :obj:`EarthSatellite` objects and included in the simulation

    Attributes:
        groundTimes (:obj:`dict`): Dictionary with keys corresponding to unique satellite name and values corresponding to sequential list of ground access opportunities - *NOTE:* each value is initialized as an empty :obj:`list` and opportunities are appended chronologically
        grounds (:obj:`DataFrame`): Dataframe containing information about ground stations with unique groundId (*int*), latitude-longitude location (:obj:`GeographicPosition`), min_elevation (*float*) angle constraints, and operational status (*bool*) - *NOTE:* initialized as **None**
        satellites (:obj:`list`): List of constituent :obj:`EarthSatellite` objects - *NOTE:* must be same length as **id**
        positions (:obj:`list`): List of current latitude-longitude-altitude locations (:obj:`GeographicPosition`) of each constituent satellite - *NOTE:* must be same length as **id**
        next_positions (:obj:`list`): List of next latitude-longitude-altitude locations (:obj:`GeographicPosition`) of each constituent satellite - *NOTE:* must be same length as **id**
        ssr_capacity (:obj:`list`): List of **fixed** Solid-State Recorder (SSR) capacities in Gigabits (*int*) for each constituent satellite - *NOTE:* must be same length as **id**
        capacity_used (:obj:`list`): list of values (*float*) representing current fraction of SSR capacity used for each constituent satellite, continuously updated through simulation - *NOTE:* must be same length as **id**
        instrument_rates (:obj:`list`): list of **fixed** instrument data collection rates in Gigabits/second (*float*) for each constituent satellite - *NOTE:* must be same length as **id**
        cost_mode (:obj:`list`): list of *str* representing one of three cost modes used to update cumulative costs: :obj:`discrete` (per downlink), :obj:`continuous` (fixed contract), or :obj:`both` - *NOTE:* must be same length as **id**
        fixed_rates (:obj:`list`): list of **fixed** rates of cost accumulation in dollars/second for :obj:`continuous` cost_mode for each constituent satellite - *NOTE:* must be same length as **id**, but ignored if cost_mode for corresponding satellite is :obj:`discrete`
        linkCounts (:obj:`list`): list of cumulative counts of link opportunies (*int*) for each constituent satellite - *NOTE:* must be same length as **id**, initialized as list of zeros
        linkStatus (:obj:`list`): list of states (*bool*) indicating whether or not each constituent satellite is currently in view of an available ground station - *NOTE:* must be same length as **id**, each satellite state initialized as :obj:`False`
        cumulativeCostBySat (:obj:`dict`): Dictionary with keys corresponding to unique satellite name and values corresponding to current cumulative costs in dollars (*float*) accrued by each constituent satellite, continuously updated throughout the simulation - *NOTE:* must be same length as **id**, each satellite cost initialized as zero dollars
        cumulativeCosts (float): Sum of values in cumulativeCostBySat in dollars, continuously updated throughout the simulation

    """

    ts = load.timescale()
    PROPERTY_POSITION = "position"
    PROPERTY_LINKCHARGE = "linkCharge"

    def __init__(self, cName, app, id, names, ES=None, tles=None):
        super().__init__(cName)
        self.app = app
        self.id = id
        self.names = names
        self.groundTimes = {j: [] for j in self.names}
        self.grounds = None
        self.satellites = []
        if ES is not None:
            for satellite in ES:
                self.satellites.append(satellite)
        if tles is not None:
            for i, tle in enumerate(tles):
                self.satellites.append(
                    EarthSatellite(tle[0], tle[1], self.names[i], self.ts)
                )
        self.positions = self.next_positions = [None for satellite in self.satellites]
        self.ssr_capacity = config.rc.application_configuration[
            "SSR_CAPACITY"
        ]  # in Gigabits
        self.capacity_used = config.rc.application_configuration[
            "CAPACITY_USED"
        ]  # fraction from 0 to 1
        self.instrument_rates = config.rc.application_configuration[
            "INSTRUMENT_RATES"
        ]  # in Gigabits/second
        self.cost_mode = config.rc.application_configuration["COST_MODE"]
        self.fixed_rates = config.rc.application_configuration["FIXED_RATES"]
        self.linkCounts = [0 for satellite in self.satellites]
        self.linkStatus = [False for satellite in self.satellites]
        self.cumulativeCostBySat = {l: 0.00 for l in self.names}
        self.cumulativeCosts = 0.00

    def initialize(self, init_time):
        """
        Activates the :obj:`SatelliteStorage` object with all constituent satellites at a specified initial scenario time

        Args:
            init_time (:obj:`datetime`): Initial scenario time for simulating propagation of satellites
        """
        super().initialize(init_time)
        self.grounds = pd.DataFrame(
            {
                "groundId": pd.Series([], dtype="int"),
                "latitude": pd.Series([], dtype="float"),
                "longitude": pd.Series([], dtype="float"),
                "elevAngle": pd.Series([], dtype="float"),
                "operational": pd.Series([], dtype="bool"),
                "downlinkRate": pd.Series([], dtype="float"),
                "costPerSecond": pd.Series([], dtype="float"),
                "costMode": pd.Series([], dtype="str"),
            }
        )
        self.positions = self.next_positions = [
            wgs84.subpoint(satellite.at(self.ts.from_datetime(init_time)))
            for satellite in self.satellites
        ]

        for i, satellite in enumerate(self.satellites):

            self.app.send_message(
                self.app.app_name,
                "ready",
                SatelliteReady(
                    id=self.id[i], name=self.names[i], ssr_capacity=self.ssr_capacity[i]
                ).model_dump_json(),
            )
        self.app.send_message(
            self.app.app_name, "allReady", SatelliteAllReady().model_dump_json()
        )

The next two functions, tick and tock, are very important for executing time-managed test suites. Generally, the tick function computes the current state of an application. Any cumbersome functions like simulations should be performed here. The tock function commits the state changes. You want this done as quickly as possible to maintain consistent timing between applications.

    def tick(self, time_step):
        """
        Computes the next :obj:`SatelliteStorage` state after the specified scenario duration and the next simulation scenario time

        Args:
            time_step (:obj:`timedelta`): Duration between current and next simulation scenario time
        """
        # tik = time.time()
        super().tick(time_step)
        self.next_positions = [
            wgs84.subpoint(
                satellite.at(self.ts.from_datetime(self.get_time() + time_step))
            )
            for satellite in self.satellites
        ]
        for i, satellite in enumerate(self.satellites):
            then = self.ts.from_datetime(self.get_time() + time_step)
            isInRange, groundId = check_in_range(then, satellite, self.grounds)
            self.capacity_used[i] = self.capacity_used[i] + (
                (self.instrument_rates[i] * time_step.total_seconds())
                / self.ssr_capacity[i]
            )
            if self.cost_mode[i] == "continuous" or self.cost_mode[i] == "both":
                self.cumulativeCostBySat[self.names[i]] = (
                    self.cumulativeCostBySat[self.names[i]]
                    + self.fixed_rates[i] * time_step.total_seconds()
                )
                self.cumulativeCosts = (
                    self.cumulativeCosts
                    + self.fixed_rates[i] * time_step.total_seconds()
                )
            if isInRange:
                if not self.linkStatus[i]:
                    self.linkStatus[i] = True
                    self.linkCounts[i] = self.linkCounts[i] + 1
            else:
                if self.linkStatus[i]:
                    self.linkStatus[i] = False

    def tock(self):
        """
        Commits the next :obj:`SatelliteStorage` state and advances simulation scenario time

        """
        # tik = time.time()
        self.positions = self.next_positions
        super().tock()
        for i, satellite in enumerate(self.satellites):
            if self.cost_mode[i] == "continuous" or self.cost_mode[i] == "both":
                self.app.send_message(
                    self.app.app_name,
                    "linkCharge",
                    LinkCharge(
                        groundId=100,
                        satId=self.id[i],
                        satName=self.names[i],
                        linkId=0,
                        end=self.get_time(),
                        duration=0,
                        dataOffload=0,
                        downlinkCost=0,
                        cumulativeCostBySat=self.cumulativeCostBySat[self.names[i]],
                        cumulativeCosts=self.cumulativeCosts,
                    ).model_dump_json(),
                )
                logger.info("satelliteStorage.linkCharge message sent.")

The function on_ground processes ground station information received from published messages. When a ground station publishes its location and operational parameters, this function updates the satellite’s internal database of ground stations. It extracts the station name, location (latitude, longitude), and minimum elevation angle required for communication, then stores this information for later use when determining if satellites can establish contact with ground stations.

    def on_ground(self, ch, method, properties, body):
        """
        Callback function appends a dictionary of information for a new ground station to grounds :obj:`list` when message detected on the *PREFIX/ground/location* topic.

        Ground station information is published at beginning of simulation, and the :obj:`list` is converted to a :obj:`DataFrame` when the Constellation is initialized.

        Args:
            client (:obj:`MQTT Client`): Client that connects application to the event broker using the MQTT protocol. Includes user credentials, tls certificates, and host server-port information.
            userdata: User defined data of any type (not currently used)
            message (:obj:`message`): Contains *topic* the client subscribed to and *payload* message content as attributes

        """
        body = body.decode("utf-8")
        location = GroundLocation.model_validate_json(body)
        if location.groundId in self.grounds.groundId:
            self.grounds[self.grounds.groundId == location.groundId].latitude = (
                location.latitude
            )
            self.grounds[self.grounds.groundId == location.groundId].longitude = (
                location.longitude
            )
            self.grounds[self.grounds.groundId == location.groundId].elevAngle = (
                location.elevAngle
            )
            self.grounds[self.grounds.groundId == location.groundId].operational = (
                location.operational
            )
            self.grounds[self.grounds.groundId == location.groundId].downlinkRate = (
                location.downlinkRate
            )
            self.grounds[self.grounds.groundId == location.groundId].costPerSecond = (
                location.costPerSecond
            )
            logger.info(
                f"Station {location.groundId} updated at time {self.get_time()}."
            )
        else:
            location = {
                "groundId": location.groundId,
                "latitude": location.latitude,
                "longitude": location.longitude,
                "elevAngle": location.elevAngle,
                "operational": location.operational,
                "downlinkRate": location.downlinkRate,
                "costPerSecond": location.costPerSecond,
                "costMode": location.costMode,
            }

            # Create a DataFrame from the location dictionary
            new_data = pd.DataFrame([location])

            # Concatenate the new data with the existing DataFrame
            self.grounds = pd.concat([self.grounds, new_data], ignore_index=True)

The function on_linkStart handles the start of a downlink operation. When a satellite comes within range of a ground station, this function initiates the data transfer process. It extracts information about which satellite and ground station are involved, calculates the maximum possible data transfer based on the satellite’s stored data and ground station downlink rate, and then begins the simulated data transfer. It also records the start time of the link for later billing calculations.

    def on_linkStart(self, ch, method, properties, body):
        """
        Callback function when message detected on the *PREFIX/ground/linkStart* topic.

        Args:
            client (:obj:`MQTT Client`): Client that connects application to the event broker using the MQTT protocol. Includes user credentials, tls certificates, and host server-port information.
            userdata: User defined data of any type (not currently used)
            message (:obj:`message`): Contains *topic* the client subscribed to and *payload* message content as attributes

        """
        body = body.decode("utf-8")
        downlinkStart = LinkStart.model_validate_json(body)
        logger.info(f"Downlink start: {downlinkStart}")
        if downlinkStart.groundId != -1:
            self.groundTimes[downlinkStart.satName].append(
                {
                    "groundId": downlinkStart.groundId,
                    "satId": downlinkStart.satId,
                    "satName": downlinkStart.satName,
                    "linkId": downlinkStart.linkId,
                    "start": downlinkStart.start,
                    "end": None,
                    "duration": None,
                    "initialData": downlinkStart.data,
                    "dataOffload": None,
                    "downlinkCost": None,
                },
            )

The function on_linkCharge processes downlink cost information. After completing a downlink session, this function receives the cost details associated with the data transfer. It updates the satellite’s internal accounting records with both the base cost for establishing the connection and the per-gigabyte cost for the data transferred. This allows the test suite to track the economic aspects of satellite operations and optimize downlink strategies accordingly.

    def on_linkCharge(self, ch, method, properties, body):
        """
        Callback function when message detected on the *PREFIX/ground/linkCharge* topic.

        Args:
            client (:obj:`MQTT Client`): Client that connects application to the event broker using the MQTT protocol. Includes user credentials, tls certificates, and host server-port information.
            userdata: User defined data of any type (not currently used)
            message (:obj:`message`): Contains *topic* the client subscribed to and *payload* message content as attributes

        """
        body = body.decode("utf-8")
        downlinkCharge = LinkCharge.model_validate_json(body)
        logger.info(f"Downlink charge: {downlinkCharge}")
        if downlinkCharge.groundId != -1:
            self.groundTimes[downlinkCharge.satName][downlinkCharge.linkId][
                "end"
            ] = downlinkCharge.end
            self.groundTimes[downlinkCharge.satName][downlinkCharge.linkId][
                "duration"
            ] = downlinkCharge.duration
            self.groundTimes[downlinkCharge.satName][downlinkCharge.linkId][
                "dataOffload"
            ] = downlinkCharge.dataOffload
            self.groundTimes[downlinkCharge.satName][downlinkCharge.linkId][
                "downlinkCost"
            ] = downlinkCharge.downlinkCost
            self.capacity_used[downlinkCharge.satId] = self.capacity_used[
                downlinkCharge.satId
            ] - (downlinkCharge.dataOffload / self.ssr_capacity[downlinkCharge.satId])
            if self.capacity_used[downlinkCharge.satId] < 0:
                self.capacity_used[downlinkCharge.satId] = 0
            self.cumulativeCostBySat[downlinkCharge.satName] = (
                self.cumulativeCostBySat[downlinkCharge.satName]
                + downlinkCharge.downlinkCost
            )
            self.cumulativeCosts = self.cumulativeCosts + downlinkCharge.downlinkCost

The function on_outage handles ground station outage notifications. When a ground station experiences a simulated failure, this function updates the satellite’s internal database to mark that station as unavailable. This prevents the satellite from attempting to establish contact with non-operational ground stations, forcing it to wait for another opportunity or seek an alternative station in the network. It also logs the outage for later analysis.

    def on_outage(self, ch, method, properties, body):
        """
        Callback function when message detected on the *PREFIX/outage/report* topic.

        Args:
           client (:obj:`MQTT Client`): Client that connects application to the event broker using the MQTT protocol. Includes user credentials, tls certificates, and host server-port information.
           userdata: User defined data of any type (not currently used)
           message (:obj:`message`): Contains *topic* the client subscribed to and *payload* message content as attributes

        """
        body = body.decode("utf-8")
        outageReport = OutageReport.model_validate_json(body)
        self.grounds.loc[outageReport.groundId, "operational"] = False
        if outageReport.groundId == 11:
            for c, mode in enumerate(self.cost_mode):
                self.cost_mode[c] = "discrete"

The function on_restore handles ground station restoration notifications. After an outage period ends, this function processes the message that indicates a ground station has returned to operational status. It updates the satellite’s internal database to mark the station as available again, allowing the satellite to establish contact with that station on subsequent orbital passes. This function also logs when stations return to service for operational analytics.

    def on_restore(self, ch, method, properties, body):
        """
        Callback function when message detected on the *PREFIX/outage/restore* topic.

        Args:
           client (:obj:`MQTT Client`): Client that connects application to the event broker using the MQTT protocol. Includes user credentials, tls certificates, and host server-port information.
           userdata: User defined data of any type (not currently used)
           message (:obj:`message`): Contains *topic* the client subscribed to and *payload* message content as attributes

        """
        body = body.decode("utf-8")
        outageRestore = OutageRestore.model_validate_json(body)
        self.grounds.loc[outageRestore.groundId, "operational"] = True
        if outageRestore.groundId == 11:
            for c, mode in enumerate(self.cost_mode):
                self.cost_mode[c] = "both"

SatStatePublisher class

The next class in the SatelliteStorage application is the SatStatePublisher. This class is responsible for publishing satellite status information over the NOS-T infrastructure. It takes the satellite data generation, storage, and downlink status information from the SatelliteStorage class and formats it into messages that can be consumed by other applications. These messages contain details about the satellite’s position, storage capacity utilization, and downlink history, which are essential for monitoring operations and for visualization in the Scoreboard application. The SatStatePublisher ensures that all components in the test suite have up-to-date information about the status of each satellite, enabling synchronized operations and comprehensive performance analysis throughout the simulation.

class SatStatePublisher(WallclockTimeIntervalPublisher):
    """
    *This object class inherits properties from the WallclockTimeIntervalPublisher object class from the publisher template in the NOS-T tools library*

    The user can optionally specify the wallclock :obj:`timedelta` between message publications and the scenario :obj:`datetime` when the first of these messages should be published.

    Args:
        app (:obj:`ManagedApplication`): An application containing a test-run namespace, a name and description for the app, client credentials, and simulation timing instructions
        constellation (:obj:`SatelliteStorage`): SatelliteStorage :obj:`Entity` object class with all constituent satellites
        time_status_step (:obj:`timedelta`): Optional duration between time status 'heartbeat' messages
        time_status_init (:obj:`datetime`): Optional scenario :obj:`datetime` for publishing the first time status 'heartbeat' message

    """

    def __init__(
        self, app, constellation, time_status_step=None, time_status_init=None
    ):
        super().__init__(app, time_status_step, time_status_init)
        self.constellation = constellation
        self.isInRange = [
            False for i, satellite in enumerate(self.constellation.satellites)
        ]

    def publish_message(self):
        """
        *Abstract publish_message method inherited from the WallclockTimeIntervalPublisher object class from the publisher template in the NOS-T tools library*

        This method sends a message to the *PREFIX/constellation/location* topic for each constituent satellite (:obj:`SatelliteStorage`), which includes:

        Args:
            id (int): Unique id for satellite in constellation
            name (str): Unique *str* name for satellite in constellation
            latitude (float): Latitude in degrees for satellite in constellation at current scenario time
            longitude (float): Longitude in degrees for satellite in constellation at current scenario time
            altitude (float): Altitude above sea-level in meters for satellite in constellation at current scenario time
            capacity_used (float): Fraction of solid-state recorder capacity used for satellite in constellation at current scenario time
            commRange (bool): Boolean state variable indicating if satellite in constellaton is in view of a ground station at current scenario time
            groundId (int): Optional unique id for ground station in view of satellite in constellation at current scenario time (if commRange = False, the groundId = None)
            totalLinkCount (int): Unique count of downlink opportunities for satellite in constellation
            cumulativeCostBySat (float): Cumulative costs incurred for downlinks and/or fixed cost contracts for satellite in constellation at current scenario time
            time (:obj:`datetime`): Current scenario :obj:`datetime`

        """
        for i, satellite in enumerate(self.constellation.satellites):
            next_time = constellation.ts.from_datetime(
                constellation.get_time() + 60 * self.time_status_step
            )
            satSpaceTime = satellite.at(next_time)
            subpoint = wgs84.subpoint(satSpaceTime)
            self.isInRange[i], groundId = check_in_range(
                next_time, satellite, constellation.grounds
            )
            self.app.send_message(
                self.app.app_name,
                "location",
                SatelliteState(
                    id=i,
                    name=self.constellation.names[i],
                    latitude=subpoint.latitude.degrees,
                    longitude=subpoint.longitude.degrees,
                    altitude=subpoint.elevation.m,
                    capacity_used=self.constellation.capacity_used[i]
                    * self.constellation.ssr_capacity[i],
                    commRange=self.isInRange[i],
                    groundId=groundId,
                    totalLinkCount=self.constellation.linkCounts[i],
                    cumulativeCostBySat=self.constellation.cumulativeCostBySat[
                        self.constellation.names[i]
                    ],
                    time=constellation.get_time(),
                ).model_dump_json(),
            )

The final block of code in the Satellites app is for initializing data and adding the functions and classes.

if __name__ == "__main__":
    # Define application name
    NAME = "satelliteStorage"

    # Load config
    config = ConnectionConfig(yaml_file="downlink.yaml", app_name=NAME)

    # Create the managed application
    app = ManagedApplication(app_name=NAME)

    # Load current TLEs for active satellites from Celestrak
    activesats_url = (
        "https://celestrak.org/NORAD/elements/gp.php?GROUP=active&FORMAT=tle"
    )
    activesats = load.tle_file(
        activesats_url, filename="satelliteStorage/active.txt", reload=True
    )

    by_name = {sat.name: sat for sat in activesats}
    names = ["SUOMI NPP", "NOAA 20 (JPSS-1)"]

    ES = []
    indices = []
    for name_i, name in enumerate(names):
        ES.append(by_name[name])
        indices.append(name_i)

    # Initialize the Constellation object class (in this example from EarthSatellite type)
    constellation = SatelliteStorage("constellation", app, [0, 1], names, ES)

    # Add the Constellation entity to the application's simulator
    app.simulator.add_entity(constellation)

    # Add a shutdown observer to shut down after a single test case
    app.simulator.add_observer(ShutDownObserver(app))

    # Add a position publisher to update satellite state every 5 seconds of wallclock time
    app.simulator.add_observer(
        SatStatePublisher(app, constellation, timedelta(seconds=1))
    )

    # Start up the application
    app.start_up(
        config.rc.simulation_configuration.execution_parameters.general.prefix,
        config,
    )

    # Add message callbacks
    app.add_message_callback("ground", "location", constellation.on_ground)
    app.add_message_callback("ground", "linkStart", constellation.on_linkStart)
    app.add_message_callback("ground", "linkCharge", constellation.on_linkCharge)
    app.add_message_callback("outage", "report", constellation.on_outage)
    app.add_message_callback("outage", "restore", constellation.on_restore)

    while True:
        pass

Manager

Maintaining a consistent simulation clock is important for many NOS-T use cases. For test suites that need to run faster than real time, it is an absolute necessity. The NOS-T Manager application (main_manager.py) is a good way to orchestrate all of the pieces for these types of tests. The manager is included in the NOS-T Tools library and will ensure that compliant applications start at the same time, and use a consistent simulation clock throughout the test run.

Next, we will go through the Manager code block-by-block to understand what it is doing. First, we have all of the import statements that the Manager relies on.

import logging

from nost_tools.application_utils import ShutDownObserver
from nost_tools.configuration import ConnectionConfig
from nost_tools.manager import Manager

Finally, the last line in the above code block sets up a logger to help you track what is going on. More info on the various levels can be found here.

logging.basicConfig(level=logging.INFO)

The next block of code loads configuration settings from a YAML configuration file. This file contains the parameters that will be used to run the test suite, such as the simulation duration, RabbitMQ connection details, and other application-specific settings. The YAML file is a convenient way to manage configuration settings without hardcoding them into the application code. The file is described below.

if __name__ == "__main__":
    # Load config
    config = ConnectionConfig(yaml_file="downlink.yaml")

    # Create the manager application
    manager = Manager()

    # add a shutdown observer to shut down after a single test case
    manager.simulator.add_observer(ShutDownObserver(manager))

    # Start up the manager
    manager.start_up(
        config.rc.simulation_configuration.execution_parameters.general.prefix,
        config,
    )

    manager.execute_test_plan()

Execution

There are a total of five files you will need to run for the FireSat+ test suite: three user applications, the NOS-T manager application, and the Scoreboard, a geospatial data visualization tool.

There are a few more steps necessary to run the Downlink test suite. You need to create a Cesium token to run the Scoreboard and set up environment files for each application.

YAML Configuration File

Each application reads a single YAML configuration file to access the parameters for the test case. The configuration file is located in the examples/downlink/ folder. The configuration file is named downlink.yaml and contains the following parameters:

info:
  title: Novel Observing Strategies Testbed (NOS-T) YAML Configuration
  version: '1.0.0'
  description: Version-controlled AsyncAPI document for RabbitMQ event broker with Keycloak authentication within NOS-T
servers:
  rabbitmq:
    keycloak_authentication: False
    host: "localhost"
    port: 5672
    tls: False
    virtual_host: "/"
    message_expiration: "60000"
    heartbeat: 600
    blocked_connection_timeout: 300
execution:
  general:
    prefix: downlink
  application:
    shut_down_when_terminated: True
  manager:
    sim_start_time: "2023-01-23T07:20:00+00:00"
    sim_stop_time: "2023-01-23T09:20:00+00:00"
    start_time: 
    time_step: "0:00:01"
    time_scale_factor: 60
    time_status_step: "0:00:01"
    is_scenario_time_status_step: False
    time_status_init: "2023-01-23T07:21:00+00:00"
    command_lead: "0:00:05"
    required_apps:
      - manager
      - satelliteStorage
      - outage
      - ground
    init_retry_delay_s: 5
    init_max_retry: 5
    set_offset: True
    shut_down_when_terminated: True
  managed_applications:
    satelliteStorage:
      time_scale_factor: 60
      time_step: "0:00:01"
      is_scenario_time_step: False
      set_offset: True
      time_status_step: "0:00:10"
      is_scenario_time_status_step: False
      time_status_init: "2023-01-23T07:20:00+00:00"
      shut_down_when_terminated: True
      manager_app_name: "manager"
      configuration_parameters:
        SSR_CAPACITY: 
          - 280
          - 343
        CAPACITY_USED:
          - 0.30
          - 0.25
        INSTRUMENT_RATES:
          - 0.0125
          - 0.0125
        COST_MODE:
          - "discrete"
          - "discrete"
        FIXED_RATES:
          - 0.09
          - 0.09
    outage:
      time_scale_factor: 60
      time_step: "0:00:01"
      is_scenario_time_step: False
      set_offset: True
      time_status_step: "0:00:10"
      is_scenario_time_status_step: False
      time_status_init: "2023-01-23T07:30:00+00:00"
      shut_down_when_terminated: True
      manager_app_name: "manager"
    ground:
      time_scale_factor: 60
      time_step: "0:00:01"
      is_scenario_time_step: False
      set_offset: True
      time_status_step: "0:00:10"
      is_scenario_time_status_step: False
      time_status_init: "2023-01-23T07:30:00+00:00"
      shut_down_when_terminated: True
      manager_app_name: "manager"
      configuration_parameters:
        stations:
          - groundId: 0
            latitude: 40.1625
            longitude: -83.21
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          - groundId: 1
            latitude: 43.8041
            longitude: -120.5542
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          - groundId: 2
            latitude: 19.8968
            longitude: -155.5828
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          - groundId: 3
            latitude: -33.8688
            longitude: 151.2093
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          - groundId: 4
            latitude: 37.5665
            longitude: 126.978
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          - groundId: 5
            latitude: 26.0667
            longitude: 50.5577
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          - groundId: 6
            latitude: -33.9249
            longitude: 18.4241
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          - groundId: 7
            latitude: 59.3293
            longitude: 18.0686
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          - groundId: 8
            latitude: 53.1424
            longitude: -7.6921
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          - groundId: 9
            latitude: -53.1638
            longitude: -70.9171
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          - groundId: 10
            latitude: 1.2840
            longitude: 103.8488
            elevAngle: 20.0
            operational: true
            downlinkRate: 0.78156
            costPerSecond: 0.37
            costMode: "discrete"
          # - groundId: 0
          #   latitude: 78.229772
          #   longitude: 15.407786
          #   elevAngle: 5.0
          #   operational: true
          #   downlinkRate: 0.300
          #   costPerSecond: 0.09
          #   costMode: "continuous"

Note

For details about the YAML configuration file, refer to the YAML guide.

Cesium Access Token and Assets

The scoreboard application uses the Cesium geospatial visualization tool which requires getting an access token and an 3D Earth map asset. You will get an access token by signing in at the following link: https://cesium.com/ion/signin/tokens

After creating an account, you must add the Asset “Blue Marble Next Generation July, 2004” from the Asset Depot (ID 3845) to your account assets to enable visualization.

Setting Up Environment Files

The Scoreboard application is in .html, and pulls in credentials from a JavaScript file. To do this create a text file with the name “env.js” containing the following information:

var HOST="your event broker host URL"
var PORT="your event broker port"
var USERNAME="your event broker username"
var PASSWORD="your event broker password"
var TOKEN="your Cesium token (see Cesium installation instructions)"

For example, if you are running the test suite on your local computer using a local RabbitMQ event broker, you can set up the env.js file like this:

var HOST="localhost"
var PORT=15670
var USERNAME="admin"
var PASSWORD="admin"
var TOKEN="your Cesium token (see Cesium installation instructions)"

Note

For details on setting up a local RabbitMQ broker, refer to the RabbitMQ Broker on Local Host guide.

Executing the Downlink Test Suite

Finally, you need to run the five applications together in order to execute the Downlink test suite. These applications need to be logically separated when running. For the Python scripts, this can be done by running them on separate computers, by using separate consoles in Spyder, or separate terminals with VSCode. The Scoreboard is an .html file and can be run in a web browser, double-clicking the file should work. Each folder in the Downlink test suite has a code you need to run, they are:

  • main_satelliteStorage.py - The SatelliteStorage app models satellites that generate and store data during their orbits, then downlink when in contact with operational ground stations.

  • main_ground.py - The Ground app models a network of ground stations that receive data from satellites, providing geographic locations, elevation angles, and downlink rates.

  • main_outages.py - The Outages app introduces realistic constraints by randomly creating outages in ground station operations.

  • scoreboard.html - The Scoreboard provides a geospatial visualization of satellite positions, ground stations (active in pink, inactive in gray), and data transmission activities.

  • main_manager.py - The NOS-T Manager app orchestrates each test run by starting the other apps simultaneously, maintaining a consistent simulation time, and shutting down the apps at the end.

Note

You must run the manager application (manager/main_manager.py) first, as it establishes the RabbitMQ exchange required to run the execution. Tt does not matter in which order you start the other applications.

If everything is running correctly, the Scoreboard app should show an image similar to below.

../../_images/downlinkScoreboard1.png

Conclusion

The Downlink test suite demonstrates how to model satellite data downlink operations using the NOS-T Tools library. By following this tutorial, you have learned how to set up the necessary components, understand the application code, and execute the test suite. The Downlink test suite provides a foundation for simulating realistic satellite operations and can be extended for more complex scenarios in the future. Some good next steps for learning other NOS-T functions and developing your own test suites can be found at the following links:

  • Main Downlink documentation

  • Science Event Dashboard Test Suite

  • FireSat+ Test Suite

  • NOS-T Tools API documentation

Previous Next

© Copyright 2023, Stevens Institute of Technology.

Built with Sphinx using a theme provided by Read the Docs.