h1. Technical components
Explanations about the technical components of the project: the ones we created (internal) and the ones we imported (external).
{{>toc}}
---
h2. %{margin-left:0px; font-weight:bold; font-size:25px; display:block; color:red;}Internal components%
h3. Celery tasks list
See this Google document: https://docs.google.com/spreadsheets/d/15fu0BQm0VYx07qyAl5YiP_OwARTJZdd_JEmK4uoteKU/edit?usp=sharing
h3. Pyros grammar (for instruments)
See this Google document: https://docs.google.com/spreadsheets/d/1rDWRI2FCyFLhu-9HEGVtSUtgD4vUY7FHG977ilPVkdU/edit?usp=sharing
h3. THE AGENT AND SENDER COMPONENTS ARE NO LONGER IMPLEMENTED
h3. Agent
The Agent class is defined in the common/agent.py file.
* *I - Purpose*
* Generically handles and creates the asynchronous modules
* Uses the threading library (see below in External components) to make all modules independent
* Provides an abstract class to be inherited
* *II - Features*
* Uses a config file (pyros_agent_config.ini) to set the network communication interface of all agents
* Provides a 'work' method to override: this is the entry method of the newly created thread (see the 'How to use it' section below)
* Provides the 'receive' and 'analyze_message' methods to generically receive messages from the network and analyze them
* *III - How to use it?*
Each of these points is +NECESSARY+:
* Create a new class that inherits from Agent
* In the __init__ method, first call the __init__ method of Agent, passing the name of the agent as second parameter (the agent names are defined in the Agent class, eg: Agent.SCHEDULER)
* Inside the class, define the messages your agent can receive (eg: MSG_OBS_FINISHED = "Observation finished")
* Create a method to be called for each message you defined
* In the __init__, after calling the Agent's __init__, associate each message to its associated function in the 'self.actions_by_message' dictionary (eg: self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished)
* Override the 'work' method: this will be the entry function of the new thread, so do whatever you need there. It MUST NOT be an infinite loop, because the Agent's 'receive' method is called after it
* If needed, override the 'shutdown' method: it will be called when your agent receives the Agent.SHUTDOWN message (eg: if you created another thread in the 'work' method, you need to close it there)
* To start the agent, just instantiate your class and call start() on the instance (the 'work' method will be called)
The main point to understand is that you can do whatever you want (as long as it is non-blocking) in the 'work' method, such as creating threads or initializing variables; after that, the only entry points are the message-associated methods (see the sketch below).
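A minimal sketch of an agent following these steps (the class, message and attribute names are illustrative, not part of pyros; the import path assumes the Agent class location given above):
<pre>
from common.agent import Agent

class MyObservationAgent(Agent):
    # Message this agent can receive
    MSG_OBS_FINISHED = "Observation finished"

    def __init__(self):
        # Call Agent's __init__ first, giving the agent's name as second parameter
        Agent.__init__(self, Agent.SCHEDULER)
        # Associate each message with the method that handles it
        self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished

    def work(self):
        # Entry point of the new thread: non-blocking initialization only
        self.finished_count = 0

    def observation_finished(self):
        # Called each time the MSG_OBS_FINISHED message is received
        self.finished_count += 1

    def shutdown(self):
        # Called when the Agent.SHUTDOWN message is received
        pass

# Starting the agent: 'work' is called, then 'receive' waits for messages
agent = MyObservationAgent()
agent.start()
</pre>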
* *IV - Important: launching pyros agents*
* In pyros, there is at most one agent per application
* The agent must be started at application startup:
* In the MyApp/apps.py file, create a class inheriting from django.apps.AppConfig
* Define the 'name' attribute in it, giving it the name of the agent
* Create a 'ready(self)' method
* In the 'ready' method, import your agent implementation, instantiate it and start it:
<pre>
from django.apps import AppConfig

class AlertManagerConfig(AppConfig):
    name = 'alert_manager'

    def ready(self):
        from alert_manager.agent import AlertManagerAgent
        self.agent = AlertManagerAgent()
        self.agent.start()
</pre>
h3. Sender
The Sender class is defined in the common/sender.py file.
* *I - Purpose*
* Send a given message to an agent
* *II - Features*
* Uses the 'pyros_agent_config.ini' file to get the agents' network interface configuration (ip and port)
* Provides a 'send_to' static method to send the messages
* *III - How to use it?*
* The targeted agent must be described in 'pyros_agent_config.ini'
* Use the Sender.send_to method, giving the name of the targeted agent as first parameter (eg: Agent.SCHEDULER) and the message as second parameter (eg: Agent.SHUTDOWN)
* /!\ send_to is a static method, so you don't need to instantiate a Sender; just call Sender.send_to(...), as in the sketch below
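A minimal sketch (the import paths are assumed from the file locations given above):
<pre>
from common.sender import Sender
from common.agent import Agent

# No Sender instance needed: send_to is a static method
Sender.send_to(Agent.SCHEDULER, Agent.SHUTDOWN)
</pre>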
---
h2. %{margin-left:0px; font-weight:bold; font-size:25px; display:block; color:red;}External components%
h3. Celery
* *I - Purpose*
Celery is used to create and handle tasks in separate processes/threads (*called workers*).
It is easy to use.
* *II - Features*
* Create custom tasks and run them asynchronously
* Supports ETA and countdown scheduling
* Offers many configuration options
* *III - How to use it?*
With Django, you first need to create a *celery.py* file in the project's folder (the one containing settings.py):
<pre>
from __future__ import absolute_import
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PROJECT_NAME.settings')
from django.conf import settings

app = Celery('PROJECT_NAME')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))
</pre>
/!\ Replace PROJECT_NAME with your project's name (pyros in our case).
In the project's *__init__.py* (pyros/__init__.py), add this code:
<pre>
from __future__ import absolute_import
from .celery import app as celery_app
</pre>
Now you can *declare tasks* and configure them.
There are two ways of declaring tasks, but the only one that interests us is the *'class form'*:
<pre>
# my_app/operations.py
from __future__ import absolute_import
from celery import shared_task
from time import sleep

@shared_task
def mul(x, y):
    sleep(3)
    return x * y
</pre>
Then you need to *register the task in settings.py*.
To do this, you just need to list the file containing the task in the CELERY_IMPORTS tuple:
<pre>
CELERY_IMPORTS = ("my_app.operations",)
</pre>
Tasks are registered in queues when created (task creation is explained below).
You will want to *specify in which queue a task is registered* (routed), in settings.py:
<pre>
CELERY_ROUTES = {
    "my_app.operations.mul": {"queue": "my_operations_queue"},
    "app2.scheduler.Scheduler": {"queue": "scheduling_q"},
}
</pre>
In pyros, we want only one process per queue, for example to avoid several schedulings running at the same time. There are also other *useful configurations*:
<pre>
# pyros/settings.py
''' These settings make the worker take only one task at a time '''
CELERY_ACKS_LATE = False
CELERYD_PREFETCH_MULTIPLIER = 1
''' Removes the pickle warning '''
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
''' How task results are retrieved '''
CELERY_RESULT_BACKEND = 'rpc://'
</pre>
There we are! We can now start our workers and create tasks.
To *start a worker*:
<pre>
$ celery worker -A pyros -Q my_operations_queue -n pyros@task_name -c 1
# The -Q option selects the queue(s) this worker consumes from.
# The -c option sets the number of processes (concurrency) in this worker.
# The -n option sets the worker's name.
</pre>
To *create a task* (in Python):
<pre>
from my_app.operations import mul
mul.delay(4, 5)
</pre>
In the terminal where you started your worker, you can see the task and its result!
In your code, you can *wait for a task to finish* and retrieve its result:
<pre>
from app.tasks import my_task
result = my_task.delay(4, 4)
result.get() # blocking if the task is not finished
</pre>
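The ETA and countdown features mentioned in the features list work through apply_async instead of delay; a minimal sketch (the delays are arbitrary example values):
<pre>
from datetime import datetime, timedelta
from my_app.operations import mul

# Run the task at the earliest 10 seconds from now
mul.apply_async((4, 5), countdown=10)

# Or give an explicit ETA
mul.apply_async((4, 5), eta=datetime.utcnow() + timedelta(minutes=2))
</pre>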
You will also want to *stop a task*, given a task id:
<pre>
from celery.task.control import revoke
# To delete a pending task (waiting in queue)
revoke(task_id)
# OR, if you want the task to be aborted even during its execution
revoke(task_id, terminate=True)
# You can retrieve task_id with the 'result' (see above) ==> result.id
</pre>
---
h3. Comet
* *I - Purpose*
Comet is used to receive and send VOEvents; it is a VOEvent broker.
It is *very* easy to use.
* *II - Features*
* Sends a VOEvent, given a host, a port and an XML file
* Listens for incoming VOEvents and stores them in a given directory
* *III - How to use it?*
* To send a VOEvent:
<pre>
# The --host and --port options are optional. Default values: 'localhost' and '8089'
$ comet-sendvo --host=localhost --port=8098 -f voevent_to_publish.xml
</pre>
* To receive VOEvents:
<pre>
$ twistd -n comet --receive --save-event --save-event-directory=some/directory --remote=localhost:5632 --local-ivo=ivo://irap/pyros
</pre>
Possible error:
<pre>
TypeError: 'ProcessLookupError' object is not subscriptable
</pre>
This means that you forgot the '-n' option; you need to delete the twistd.log and twistd.pid files before retrying.
* Other available brokers: http://wiki.ivoa.net/twiki/bin/view/IVOA/IvoaVOEvent#VOEvent_Transport
---
h3. Threading library
* *I - Purpose*
* Simple creation of threads with basic communication
* Allows handling concurrent access
* *II - Features*
Provides :
* A Thread class to inherit from, with a run() method that will be called when the thread starts
* An Event class to set/unset a boolean in order to transmit messages to the thread
* Lock and RLock objects to handle concurrent access
* *III - How to use it?*
<pre>from threading import Thread, Event</pre>
* Thread
* Create a class inheriting from Thread
* Override the 'run' method, which will be called at thread start
* Instantiate your class and call start() on the instance to create the thread
* Event
* Create an Event variable in your Thread-inheriting class (eg: 'self.stop_event = Event()')
* After the thread starts, you can set/unset the event with .set() / .clear() on it
* There are a few other useful methods; see https://docs.python.org/3/library/threading.html#threading.Event for further information (a minimal Thread/Event sketch follows this list)
* Lock / RLock
* Not used yet; see the online documentation: https://docs.python.org/3/library/threading.html#lock-objects
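A minimal sketch combining Thread and Event (the class name and the sleep-based loop are just for illustration):
<pre>
from threading import Thread, Event
import time

class Worker(Thread):
    def __init__(self):
        Thread.__init__(self)
        # Event used to ask the thread to stop
        self.stop_event = Event()

    def run(self):
        # Called when start() is invoked, in the new thread
        while not self.stop_event.is_set():
            time.sleep(0.1)  # do some work here

worker = Worker()
worker.start()           # creates the thread and calls run()
worker.stop_event.set()  # ask the thread to stop
worker.join()            # wait for the thread to finish
</pre>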
---
h3. Socket library
* *I - Purpose*
* Handles network communication, given just the IP and port of the interlocutors
* *II - Features*
* A 'server' side that creates an interface, waits for client connections and sends/receives data from them
* A 'client' side that connects to a server and sends/receives data from it
* *III - How to use it?* (a combined, runnable sketch follows at the end of this section)
* Server
* Instantiate the socket and wait for connections
<pre>
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # create the socket
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # for the port to be immediately re-usable after closing the socket
self.server_socket.bind((self.ip, self.receive_port)) # associate the socket to an ip and a port
self.server_socket.listen(12) # wait for connections (here, 12 connections can be simultaneously waiting for acceptance)
</pre>
* Accept connections
<pre>
conn, addr = self.server_socket.accept() # conn is a new socket created at the connection
</pre>
* Exchanging messages
<pre>
conn.send(bytes(message, 'UTF-8')) # sending
data = conn.recv(self.buffer_size).decode() # receiving
</pre>
* Closing sockets when you're done with them
<pre>
conn.close()
...
server_socket.close()
</pre>
* Client
* Instantiate the socket and connect to a server
<pre>
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((dest_ip, dest_receive_port))
</pre>
* Exchanging messages
<pre>
client_socket.send(bytes(message, 'UTF-8')) # sending
data = client_socket.recv(self.buffer_size).decode() # receiving
</pre>
* Closing sockets when you're done with them
<pre>
client_socket.close()
</pre>
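A minimal, self-contained sketch tying the fragments above together (the IP, port and buffer size are arbitrary example values; run the server part in one process and the client part in another):
<pre>
import socket

IP, PORT, BUFFER_SIZE = "127.0.0.1", 5678, 1024

def run_server():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((IP, PORT))
    server_socket.listen(1)
    conn, addr = server_socket.accept()       # blocks until a client connects
    data = conn.recv(BUFFER_SIZE).decode()    # receive the client's message
    conn.send(bytes("pong", 'UTF-8'))         # answer it
    conn.close()
    server_socket.close()

def run_client():
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.connect((IP, PORT))
    client_socket.send(bytes("ping", 'UTF-8'))
    data = client_socket.recv(BUFFER_SIZE).decode()
    client_socket.close()
</pre>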
---