1. xApp Async Library In-depth¶
1.1 xApp Async Library Overview¶
The xApp Library provides functionality that will operate with the Python Async IO framwework. The advantage of using this approach is that it is easier to handle events such as messages received and configuration changes in an asynchronous manner, so no polling is required. Periodic operations can also be performed using timers.
The basic features offered are effecively equivalent to the non-Async versions, and this page describes the different behaviour and additional functionality provided.
1.2 General Async xApp Concepts and Features¶
The Python AsyncIO framework is fairly well documented elsewhere. The official documentation can be found here https://docs.python.org/3/library/asyncio.html.
The Async version of the xApp Library uses it in the following way.
- There is only one event loop within the xApp which eliminates the need to use thread safefy mechanisms such as locks. This event loop can be acquired by using the asyncio.get_event_loop() in any functions. It is not recommended to introduce other event loops or threads as this could interefere with the operation of the xApp. However there are ways to handle this to ensure coroutines are posted from these other loops or threads which is described on this page.
- The subscriber pattern is often used so Async functions can be written to handle external events such as messages received on Kafka or NATS, or configuration changes via Async callback functions.
- Notable additions are a Periodic Timer which can be used to perform operations at regular intervals, a Single Shot Timer and the ability to subscribe to configuration changes.
- The modules that include async functionality are prefixed with async_. These are located in the core directory and include:
- async_actions.py
- async_admission_control_message_handler.py
- async_kafka.py
- async_nats.py
- async_settings.py
- async_timer.py
1.3 Creating an Async xApp¶
The process of building an Async xApp is similar to that of a non-Async version, but there are some notable differences due the nature of how Async applications operate.
An example of how to use build an Async xApp and use the functions can be found in the xapp_async_main.py file in the example/core directory, located in the xApp Framework repository.
1.3.1 Async xApp Builder¶
The xApp Builder should be used to generate async version of the generic xApp framework. This is achieved by calling build_async() instead of build() as shown below.
builder = xapp_lib.XAppBuilder("..", absolute=False)
builder.metadata("core/xapp_metadata.json")
builder.endpoints("config/xapp_endpoints.json")
builder.config("config/xapp_config.json")
builder.readme("README.md")
builder.restapi(
[
("/api/", restapi.MainApiHandler),
("/api/actions", restapi.ActionsHandler),
("/api/request", restapi.RequestHandler),
]
)
xapp = builder.build_async()
1.3.2 Setup the Logger¶
Next, the logger should be configured with a format and the required logging level. The logging level is usually defined with the 'LOG_LEVEL' setting.
The following is an example of configurint the log level.
logging.root.handlers = []
logging.basicConfig(
format="[%(levelname)s] %(asctime)s (%(threadName)s) %(message)s", level=xapp.settings().get_config('LOG_LEVEL'))
1.3.3 Create an Application Object¶
It is advised to create an application class to encapsulate the functionality, data and state of the application. This is described in more detail in the Application Object section. We create an instance of this class at this point in the process.
my_app = MyApp(xapp)
Here we can see that the instance of the generic xApp framework we built is passed into our own Application object. At this point the init member function of MyApp is called. This is described in the Initialisation Function section.
1.3.4 Setup NATS Subscriptions¶
It is possible be notified when messages are received on specific NATS subjects. The xapp.nats().subscribe function takes in a dictionary of subject : callback pairs.
A subject is in the form of a typical NATS subscription.
The example shows how to subscribe to NATS messages received on the subject RC4G_CTRL_UE_MEASUREMENT_DATA...*.
subjects_and_handlers = { 'RC4G_CTRL_UE_MEASUREMENT_DATA.*.*.*': my_app.handle_nats_4g_ue_measurement}
xapp.nats().subscribe(subjects_and_handlers)
It is advised that the callback functions are member functions of the Application class, so when they are called they can operate on the application data. Each callback function should defined in the following way:
async def nats_callback(self, message):
subject = message.subject
data = message.data
Note this will be called from the Async event loop, so needs to be defined as an async function.
It should also be noted it is also possible to add and remove NATS subscriptions while the application is running. This described in theh Run-time Add NATS Subscription section.
1.3.5 Setup Kafka Subscriptions¶
It is possible be notified when messages are received on specific Kafka topics. The xapp.kafka().subscribe function takes in a dictionary of topic : callback pairs.
A topic is in the form of a typical Kafka subscription.
A callback function should be defined to contain a message parameter which will contain the message received on for that topic.
The following example shows how to subscribe to the topics accelleran.drax.4g.ric.raw.ue_measurements and accelleran.drax.5g.ric.raw.ran_control_response.
topics_and_handlers = { 'accelleran.drax.4g.ric.raw.ue_measurements' : my_app.handle_kafka_4g_ue_measurement,
'accelleran.drax.5g.ric.raw.ran_control_response': my_app.handle_kafka_5g_handover_response
}
self.xapp.kafka().subscribe(topics_and_handlers)
It is advised that the callback functions are member functions of the Application class, so when they are called they can operate on the application data. Each callback function should defined in the following way:
async def kafka_callback(self, message):
json_object = json.loads(message)
It can be seen that the message will be a string representation of the JSON message received over Kafka, so it should be converted to a JSON object using the json.loads function in order for it to be operated on.
Unlike NATS, it is not currently possible to add and remove Kafka subscriptions while the application is running.
1.3.6 Run the Async Event Loop¶
Once the application object has been created, the single Asycio event loop is started using the the xapp.run_async function. By default this will start the Async REST API, Async NATS and Async Kafka. However it is possible to provide other async coroutines to start by adding them as parameters.
The following is an example of calling run_async which starts the run_async function from my_app:
xapp.run_async(my_app.run_async())
1.4 Application Object¶
When using AsyncIO it is easier to manage the callbacks within an object encapsulating the applicaton data and state, as the call back functions are almost certainily going to need operate on that information. Additionally, some objects and Async tasks need to be kept persistent over the life-time of the application, as otherwise they can end up becoming lost or cancelled and need to be maintained as member variables of this object. Therefore, in the example we create a class called MyApp (this of course can be given a different name).
An example of the basic structure of an application class is shown below.
class MyApp:
def __init__(self, xapp: xapp_lib.AsyncXApp):
self.xapp = xapp
# Setup settings handlers
settings_changed_handlers = { 'LOG_LEVEL': self.handle_log_level_changed }
self.xapp.settings().subscribe(settings_changed_handlers)
# Create a Periodic Timer to perfom periodic processing (every second in this example)
self.periodic_timer = async_timer.AsyncPeriodicTimer(1, self.periodic_timer_handler)
# Add any application data here as member variables
async def handle_log_level_changed(self, value):
logger = logging.getLogger()
logger.setLevel(value)
async def periodic_timer_handler(self):
# Perform some operations on the application data
async def run_async(self):
# Application functionality to be run when the Async event loop is started.
# This function is optional
In the above example, an application is given the reference to the generic AsyncXApp created by the xApp builder, and creates all its own data as member variables containing the application data, defines the calbacks as member functions, and finally defines an optional function to be run when the Async event loop is started.
The main functionality is split into the __init__ and run_async methods which are described below.
1.4.1 Initialisation Function¶
The __init__ function is called when the Application object is constructed, so is run in the non-Async context (ie not within the Async event loop thread).
Typically this function will contain the initialisation of the member data (and maybe sub-components) of the application.
In addition it will contain the functionality described in the following sections.
1.4.1.1 Setup Settings Subscriptions¶
It is possible be notified when changes are made to specific settings via for example the REST API. The settings().subscribe function takes in a dictionary of "setting : callback" pairs.
The following example shows how to subscribe to a change of LOG_LEVEL
settings_changed_handlers = { 'LOG_LEVEL': self.handle_log_level_changed }
self.xapp.settings().subscribe(settings_changed_handlers)
The following is an example of how the callback function should be defined.
async def handle_log_level_changed(self, value):
logger = logging.getLogger()
logger.setLevel(value)
Note this will be called from the Async event loop, so needs to be defined as an async function.
1.4.1.2 Create Timers¶
The Async xApp framework provides two types of timer. A periodic timer which is fired at regular intervals and and single shot timer which as its name suggests is only fired once. Both types of timer should be created within the init function as they need to persist during the lifetime of tha application otherwise it is possible they become garbage collected and no longer fire.
Each type of timer is described below.
1.4.1.2.1 Periodic Timer¶
A Periodic Timer has been added to allow periodic updates to be performed, and multiple periodic timers can be created.
Each periodic timer is created with an interval and a callback function.
An interval is a number of seconds as a float, and can therefore be specified in fractions of a second.
A callback function takes no parameters.
The following example shows how to create a periodic timer which calls the function periodic_timer_callback function once a second.
self.periodic_timer = async_timer.AsyncPeriodicTimer(1, self.periodic_timer_callback)
Each callback function should be defined in the following way:
async def periodic_timer_callback(self):
logging.info("Periodic timer fired")
Note this will be called from the Async event loop, so needs to be defined as an async function.
1.4.1.2.2 Single Shot Timer¶
A Single Shot Timer has also been provided, which as its names suggests is only fired once after it has been started.
Each one shot timer is created with an interval and a callback function.
An interval is a number of seconds as a float, and can be therefore be specified in fractions of a second.
The following example shows how to create a one shot timer that calls the single_shot_timer_callback function 1 second after it has been started.
self.single_shot_timer = async_timer.AsyncSingleShotTimer(1, self.single_shot_timer_callback)
Each callback function should be defined in the following way:
async def single_shot_timer_callback(self, timer):
logging.info("One shot timer fired")
Note this will be called from the Async event loop, so needs to be defined as an async function.
1.4.1.2.3 Create Admission Control Messsage Handler¶
If it is required to process UE or PDU Admission Control Messages an AdmissionControlMessageHandler object should be created as a memmber variable of the application object. The callbacks are added within the Async context, which is discussed later.
The following is an example of how to create an AdmissionControlMessageHandler object.
self.admission_control_message_handler = AsyncAdmissionControlMessageHandler(self.xapp.nats())
1.4.2 Run Async Function¶
The run_async function will start off any other procesing required in the application from within the Async event loop. This typically includes starting any periodic timers required.
1.5 Async xApp Framework Features¶
The previous sections described how to create an Async xApp and the Application class. This section describes each Async feature in more detail and how they should be called within the Async (event loop) context. Note the examples assume that the functions are written within the the application class, so references to "self" refer to this class.
1.5.1 NATS¶
As described in the Setup NATS Subscriptions section, it is possible to configure the callback functions that are called when messages are received on specific NATS subjects when the application object is being initialised. However the following functionality is also provided.
1.5.1.1 Run-time Add NATS Subscription¶
It is possible to add NATS subscriptions while the applicaion is running from within the Async event loop. This is achieved by calling the add_subscription function.
The following is an example of how to add a new subscription for the NATS subject 'test_subject', where the test_callback method is called whenever a message is recived on that that NATS subject.
await self.xapp.nats().add_subscription('test_subject', self.test_callback)
Note that as this is called with the Async context, the add_subscription function needs to be awaited upon.
The callback function should be defined in the same way as defined earlier, so for example:
async def test_callback(self, message):
subject = message.subject
data = message.data
1.5.1.2 Run-time Remove NATS Subscription¶
It is also possible to remove NATS subscriptions while the applicaion is running from within the Async event loop. This is achieved by calling the remove_subscription function.
The following is an example of how to remove the subscription for the NATS subject 'test_topic'.
await self.xapp.nats().remove_subscription('test_topic')
Note that as this is called with the Async context, the remove_subscription function needs to be awaited upon.
1.5.1.3 Publish to the dRAX NATS Databus¶
The function to publish to the dRAX NATS Databus is similar to the non-Async version via the send_message function.
The following is an example of how to send a json object on the NATS subject 'test_subject'
data = {"key": "data"}
await self.xapp.nats().json().send_message('test_subject', data)
Note that as this is called with the Async context, the send_message function needs to be awaited upon.
1.5.2 Kafka¶
As described in the Setup Kafka Subscriptions section, it is possible to configure the callback functions that are called when messages are received on specific Kafka topics when the application object is being initialised. However the following functionality is also provided.
1.5.2.1 Publish to the dRAX Kafka Databus¶
The function to publish to the dRAX Kafka Databus is similar to the non-Async version via the send_message function.
The following is an example of how to send a json object on the Kafka topic 'test_topic'
data = {"key": "data"}
await xapp.kafka().json().send_message('test_topic', data)
Note that as this is called with the Async context, the send_message function needs to be awaited upon.
1.5.3 Periodic Timer¶
As described in the Create Periodic Timers section, periodic timers are created in the init function of the application.
However the timers can be started and canelled from with the Async context. In general, periodic timers are started in the run_async function of the application.
The following is an example of how to start a periodic timer
await self.periodic_timer.start()
The following is an example of how to cancel a periodic timer
await self.periodic_timer.cancel()
Note these will be called from the Async event loop, so they need to be awaited upon.
1.5.4 Single Shot Timer¶
As described in the Create Periodic Timers section, single shot timers are created in the init function of the application.
The timers can be started and canelled from with the Async context.
The following is an example of how to start a single shot timer
await self.single_shot_timer.start()
The following is an example of how to cancel a periodic timer
await self.single_shot_timer.cancel()
Note these will be called from the Async event loop, so they need to be awaited upon.
1.5.5 Admission Control¶
The Async framework supports UE and PDU Admission Control. Sending messages is similar to the non-Async version of the SDK, but the Async version allows the application to subscribe to UE and PDU Admission Control messages received, and define callback functions which are called when each type of admission control request message is received. The following sections describe the functionality provided.
1.5.5.1 Subscribe to UE Admission Control Requests¶
If the application needs to handle UE Admission request messages, it should subscribe to them using the subscribe_ue_admission_control function. This takes in callback functions for UE admission requests, update indications, and release indications.
The following is an example of the subscription:
await self.admission_control_message_handler.subscribe_ue_admission_control(self.ue_admission_request_handler,
self.ue_info_update_indication_handler,
self.ue_release_indication_handler)
Note this will be called from the Async event loop, so needs to be awaited upon.
The sections below describe each of the callback functions.
1.5.5.1.1 UE Admission Request Callback Function¶
The UE Admission Request callback function should be defined in the following way:
async def ue_admission_request_handler(self, ue_adm_req):
self.cu_cp_name = ue_adm_req.ue_admission_request_received.CuCpName
self.cu_cp_ue_id = ue_adm_req.ue_admission_request_received.CuCpUeId
The us_adm_req is quite a large structure defined in the proto buf files, but the relevant fields are cu_cp_name and cu_cp_ue_id. These should be stored as they may need to be used in other functions.
Note this will be called from the Async event loop, so needs to be defined as an async function.
1.5.5.1.2 UE Information Update Indication Callback Function¶
The UE Information Update Indication callback function should be defined in the following way:
async def ue_info_update_indication_handler(self, ue_info_update):
cu_cp_name = ue_info_update.ue_info_update_indication_received.CuCpName
cu_cp_ue_id = ue_info_update.ue_info_update_indication_received.CuCpUeId
The ue_info_update is quite a large structure defined in the proto buf files, but the relevant fields are cu_cp_name and cu_cp_ue_id.
Note this will be called from the Async event loop, so needs to be defined as an async function.
1.5.5.1.3 UE Release Indication Callback Function¶
The UE Release Indication callback function should be defined in the following way:
async def ue_release_indication_handler(self, ue_release_ind):
cu_cp_name = ue_release_ind.ue_release_indication_received.CuCpName
cu_cp_ue_id = ue_release_ind.ue_release_indication_received.CuCpUeId
The ue_release_ind is quite a large structure defined in the proto buf files, but the relevant fields are cu_cp_name and cu_cp_ue_id.
Note this will be called from the Async event loop, so needs to be defined as an async function.
1.5.5.2 Send UE Admission Control Messages¶
It is possible to send UE Admission Control messages. The functions provided are very similar to those provided by the non-Async version of the SDK.
The sections below describe each of these functions.
1.5.5.2.1 Accept a UE Admission Request¶
The following example shows how to accept a UE Admission Request.
err = await async_actions.async_ue_admission_accept(self.xapp, self.cu_cp_name, self.cu_cp_ue_id)
if err == 0:
logging.info(f"Admission of UE {self.cu_cp_ue_id} on CU {self.cu_cp_name} succeded")
else:
logging.info(f"Admission of UE {self.cu_cp_ue_id} on CU {self.cu_cp_name} failed")
In the example, it can be seen that the cu_cp_name and cu_cp_ue_id are those that were stored when the UE Admission Request was received.
Note this will be called from the Async event loop, so it needs to be awaited upon.
1.5.5.2.2 Reject a UE Admission Request¶
The following example shows how to reject a UE Admission Request.
err = await async_actions.async_ue_admission_reject(self.xapp, self.cu_cp_name, self.cu_cp_ue_id)
if err == 0:
logging.info(f"Rejection of UE {self.cu_cp_ue_id} on CU {self.cu_cp_name} succeded")
else:
logging.info(f"Rejection of UE {self.cu_cp_ue_id} on CU {self.cu_cp_name} failed")
In the example, it can be seen that the cu_cp_name and cu_cp_ue_id are those that were stored when the UE Admission Request was received.
Note this will be called from the Async event loop, so it needs to be awaited upon.
1.5.5.2.3 Release a UE¶
The following example shows how to release a UE.
err = await async_actions.async_ue_admission_reject(self.xapp, self.cu_cp_name, self.cu_cp_ue_id)
if err == 0:
logging.info(f"Rejection of UE {self.cu_cp_ue_id} on CU {self.cu_cp_name} succeded")
else:
logging.info(f"Rejection of UE {self.cu_cp_ue_id} on CU {self.cu_cp_name} failed")
In the example, it can be seen that the cu_cp_name and cu_cp_ue_id are those that were stored when the UE Admission Request was received.
Note this will be called from the Async event loop, so it needs to be awaited upon.
1.5.5.3 Unsubscribe from UE Admission Control Requests¶
The following example shows how unsubsctibe from UE Admission Control Requests
await async_actions.unsubscribe_ue_admission_control()
Note this will be called from the Async event loop, so it needs to be awaited upon.
1.5.5.4 Subscribe to PDU Admission Control Requests¶
If the application needs to handle PDU Admission request messages, it should subscribe to them using the subscribe_pdu_admission_control function. This takes in callback functions for PDU admission requests, modification requests, and release indications.
The following is an example of the subscription:
await self.admission_control_message_handler.subscribe_pdu_admission_control(self.pdu_session_adm_request_handler,
self.pdu_session_mod_request_handler,
self.pdu_session_release_indicator_handler)
Note this will be called from the Async event loop, so needs to be awaited upon.
The sections below describe each of the callback functions.
1.5.5.4.1 PDU Session Admission Request Callback Function¶
The PDU Admission Request callback function should be defined in the following way:
async def pdu_session_adm_request_handler(self, pdu_session_adm_req):
self.cu_cp_name = pdu_session_adm_req.pdu_session_adm_request_received.CuCpName
self.cu_cp_ue_id = pdu_session_adm_req.pdu_session_adm_request_received.CuCpUeId
The us_adm_req is quite a large structure defined in the proto buf files, but the relevant fields are cu_cp_name and cu_cp_ue_id. These should be stored as they may need to be used in other functions.
Note this will be called from the Async event loop, so needs to be defined as an async function.
1.5.5.4.2 PDU Session Modification Request Callback Function¶
The PDU Session Modification Request callback function should be defined in the following way:
async def pdu_session_mod_request_handler(self, pdu_session_mod_req):
cu_cp_name = pdu_session_mod_req.pdu_session_mod_request_received.CuCpName
cu_cp_ue_id = pdu_session_mod_req.pdu_session_mod_request_received.CuCpUeId
The pdu_session_mod_req is quite a large structure defined in the proto buf files, but the relevant fields are cu_cp_name and cu_cp_ue_id.
Note this will be called from the Async event loop, so needs to be defined as an async function.
1.5.5.4.3 PDU Session Release Indicator Callback Function¶
The PDU Session Release Indicator callback function should be defined in the following way:
async def pdu_session_release_indicator_handler(self, pdu_session_release_ind):
cu_cp_name = pdu_session_release_ind.pdu_session_release_indicator_received.CuCpName
cu_cp_ue_id = pdu_session_release_ind.pdu_session_release_indicator_received.CuCpUeId
The pdu_session_release_ind is quite a large structure defined in the proto buf files, but the relevant fields are cu_cp_name and cu_cp_ue_id.
Note this will be called from the Async event loop, so needs to be defined as an async function.
1.5.5.5 Send PDU Admission Control Messages¶
It is possible to send PDU Admission Control messages. The functions provided are very similar to those provided by the non-Async version of the SDK.
The sections below describe each of these functions.
1.5.5.5.1 Accept a PDU Session Admission Request¶
The following example shows how to accept a PDU Admission Request.
err = await async_actions.async_pdu_session_admission_accept(self.xapp, self.cu_cp_name, self.cu_cp_ue_id)
if err == 0:
logging.info(f"Admission of PDU session {self.cu_cp_ue_id} on CU {self.cu_cp_name } succeded")
else:
logging.info(f"Admission of PDU session {self.cu_cp_ue_id} on CU {self.cu_cp_name } failed")
In the example, it can be seen that the cu_cp_name and cu_cp_ue_id are those that were stored when the PDU Admission Request was received.
Note this will be called from the Async event loop, so it needs to be awaited upon.
1.5.5.5.2 Reject a PDU Session Admission Request¶
The following example shows how to reject a PDU Admission Request.
err = await async_actions.async_pdu_session_admission_reject(self.xapp, self.cu_cp_name, self.cu_cp_ue_id)
if err == 0:
logging.info(f"Rejection of PDU session {self.cu_cp_ue_id} on CU {self.cu_cp_name} succeded")
else:
logging.info(f"Rejection of PDU session {self.cu_cp_ue_id} on CU {self.cu_cp_name} failed")
In the example, it can be seen that the cu_cp_name and cu_cp_ue_id are those that were stored when the PDU Admission Request was received.
Note this will be called from the Async event loop, so it needs to be awaited upon.
1.5.5.5.3 Accept a PDU Session Admission Modification Request¶
The following example shows how to accept a PDU Admission ModificationRequest.
err = await async_actions.async_pdu_session_modification_accept(self.xapp, self.cu_cp_name, self.cu_cp_ue_id)
if err == 0:
logging.info(f"Admission of PDU session modification {self.cu_cp_ue_id} on CU {self.cu_cp_name} succeded")
else:
logging.info(f"Admission of PDU session modification {self.cu_cp_ue_id} on CU {self.cu_cp_name} failed")
In the example, it can be seen that the cu_cp_name and cu_cp_ue_id are those that were stored when the PDU Admission Request was received.
Note this will be called from the Async event loop, so it needs to be awaited upon.
1.5.5.5.4 Reject a PDU Session Admission Modification Request¶
The following example shows how to reject a PDU Admission Modification Request.
err = await async_actions.async_pdu_session_modification_reject(self.xapp, pdu_session_mod_req.CuCpName, self.cu_cp_ue_id)
if err == 0:
logging.info(f"Rejection of PDU session modification {self.cu_cp_ue_id} on CU {self.cu_cp_name} succeded")
else:
logging.info(f"Rejection of PDU session modification {self.cu_cp_ue_id} on CU {self.cu_cp_name} failed")
In the example, it can be seen that the cu_cp_name and cu_cp_ue_id are those that were stored when the PDU Admission Request was received.
Note this will be called from the Async event loop, so it needs to be awaited upon.
1.5.5.6 Unsubscribe from PDU Admission Control Messages¶
The following example shows how unsubsctibe from UE Admission Control Requests
await async_actions.unsubscribe_pdu_admission_control()
Note this will be called from the Async event loop, so it needs to be awaited upon.
1.5.6 Async dRAX Commands¶
Async versions of the Handover commands/triggers have been provided.
The following is an example of a function to trigger an 4G LTE Handover command.
async def send_handover_trigger_4g(self):
handover_list = [
{'ueIdx': 'ueDraxId_to_handover_1', 'targetCell': 'Bt1Cell', 'sourceCell': 'Bt2Cell'},
{'ueIdx': 'ueDraxId_to_handover_2', 'targetCell': 'Bt2Cell', 'sourceCell': 'Bt1Cell'}
]
await async_actions.async_trigger_handover(xapp.nats(), handover_list)
The following is an example of a functon to trigger a 5G LTE Handover command.
async def send_handover_trigger_5g(self):
ue_id = 1
target_cell = actions.Cell(id=456, plmn='654321')
transaction_id = await async_actions.async_trigger_handover_5g(xapp=xapp,
ue_id=ue_id,
target_cell=target_cell)
1.5.7 Calling the dRAX RIC REST API¶
When making requests to the dRAX API Gateway, the async_request package should be used in instead of the non-Async request package.
The following is an example of this.
endpoint = '/discover/services/netconf'
api_response = await async_requests.get(actions.create_api_url(self.xapp, endpoint))
Note that as this is called with the Async context, the subscribe function needs to be awaited upon.
1.5.8 Access to the xApp main event loop¶
A function has been provided which can get the main event loop from the xApp.
The following is an example of how to use this.
main_event_loop = self.xapp.get_event_loop()
1.5.9 User defined threads or other Event Loops interacting with the xApp Event Loop¶
It may be necessary for developers to introduce their own threads or event loops. If any work needs to transferred back to the main event loop so processing can be handled by the Async Xapp framework, the Asyncio function run_coroutine_threadsafe should be used as shown below.
asyncio.run_coroutine_threadsafe(coroutine_for_main_event_loop, main_loop)
1.5.10 Async Rest API Functions¶
The REST API page describes how to add custom handlers for REST API endpoints. Similar to functions performed in different threads, the handlers should put any processing on the main event loop.
The following is an example of this.
async def get_handler(xapp):
# Process Get
async def post_handler(xapp):
# Proecss Post
class RequestHandler(tornado.web.RequestHandler):
def initialize(self, xapp):
self.xapp = xapp
self.write("init request handler\n")
def get(self):
asyncio.run_coroutine_threadsafe(get_handler(self.xapp), self.xapp.get_event_loop())
def post(self):
asyncio.run_coroutine_threadsafe(post_handler(self.xapp), self.xapp.get_event_loop())