Technical components

Version 22 (Paul Carensac, 08/08/2016 11:20 am)

1 1 Paul Carensac
h1. Technical components
2 2 Paul Carensac
3 8 Paul Carensac
Explanations about the technical components of the project : the ones we have created (internal), and the imported ones (external).
4 2 Paul Carensac
5 14 Paul Carensac
{{>toc}}
6 14 Paul Carensac
7 2 Paul Carensac
---
8 2 Paul Carensac
9 2 Paul Carensac
h2. %{margin-left:0px; font-weight:bold; font-size:25px;  display:block; color:red;}Internal components%
10 2 Paul Carensac
11 22 Paul Carensac
h3. Simulators
12 22 Paul Carensac
13 22 Paul Carensac
# TODO: à renseigner quand ils serotn plus génériques + bien commenter le code
14 22 Paul Carensac
15 18 Paul Carensac
h3. Celery tasks list
16 18 Paul Carensac
17 18 Paul Carensac
See on this google document : https://docs.google.com/spreadsheets/d/15fu0BQm0VYx07qyAl5YiP_OwARTJZdd_JEmK4uoteKU/edit?usp=sharing
18 18 Paul Carensac
19 18 Paul Carensac
h3. Pyros grammar (for instruments)
20 18 Paul Carensac
21 18 Paul Carensac
See on this google doc : https://docs.google.com/spreadsheets/d/1rDWRI2FCyFLhu-9HEGVtSUtgD4vUY7FHG977ilPVkdU/edit?usp=sharing
22 18 Paul Carensac
23 18 Paul Carensac
h3. THE AGENT AND SENDER COMPONENTS ARE NOT IMPLEMENTED ANYMORE
24 18 Paul Carensac
25 2 Paul Carensac
h3. Agent
26 1 Paul Carensac
27 4 Paul Carensac
The Agent class is in the common.agent.py file.
28 4 Paul Carensac
29 4 Paul Carensac
 * *I - Purpose*
30 4 Paul Carensac
31 4 Paul Carensac
    * Generically handles and creates the asynchronous modules
32 4 Paul Carensac
    * Uses the threading library (see below in External components) to make all modules independent
33 4 Paul Carensac
    * Provides an abstract class to be inherited
34 4 Paul Carensac
35 4 Paul Carensac
 * *II - Features*
36 4 Paul Carensac
37 4 Paul Carensac
    * Uses a config file (pyros_agent_config.ini) to set the network communication interface of all agents
38 4 Paul Carensac
    * Provides a 'work' method to override : this is the entry method of the newly created thread (see 'How to use it' section below)
39 4 Paul Carensac
    * Provides the 'receive' and 'analyze_message' methods to generically receive messages from network and analyze them 
40 4 Paul Carensac
41 4 Paul Carensac
 * *III - How to use it ?*
42 4 Paul Carensac
43 4 Paul Carensac
    Each of these points are +NECESSARY+
44 4 Paul Carensac
45 4 Paul Carensac
    * Create a new class that inherits from Agent
46 4 Paul Carensac
    * In the __init__ method, first call the __init__ method of Agent, passing the name of the agent as second parameter (they are defined in the Agent class, eg: Agent.SCHEDULER)
47 4 Paul Carensac
    * Inside the class, define the messages your agent can receive (eg: MSG_OBS_FINISHED = "Observation finished")
48 4 Paul Carensac
    * Create a method to be called for every message you created
49 4 Paul Carensac
    * In the __init__, after calling the Agent's __init__, associate each message to its associated function in the 'self.actions_by_message' dictionary (eg: self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished)
50 4 Paul Carensac
    * Override the method work : this will be the entry function of the new thread, so do whatever you need. This MUST NOT be an infinite loop, because Agent's receive method will be called after this one
51 4 Paul Carensac
    * If ever needed, override the 'shutdown' method, it will be called when your agent receive the Agent.SHUTDOWN message (eg: if you created another thread in the 'work' method, you need to close it)
52 5 Paul Carensac
    * To start the agent, just instantiate your class and do MyClass.start() (the 'work' method will be called)
53 1 Paul Carensac
54 5 Paul Carensac
    The main points to understand are that you can do whatever you want (but non-blocking) in work method (like creating new threads or variables' initialization), then the only entry points are the message-associated methods
55 4 Paul Carensac
56 7 Paul Carensac
 * *IV - Important : pyros agents launching*
57 7 Paul Carensac
58 7 Paul Carensac
    * In pyros, there is maximum 1 agent per application
59 7 Paul Carensac
    * The agent must be started at application start :
60 7 Paul Carensac
        
61 7 Paul Carensac
        * In the MyApp.apps.py file, create a class inheriting from django.apps.AppConfig
62 7 Paul Carensac
        * Define the 'name' attribute in it, giving it the name of the agent
63 7 Paul Carensac
        * Create a 'ready(self)' method
64 7 Paul Carensac
        * in the ready method, import your agent implementation, instantiate it and start it
65 7 Paul Carensac
<pre>
66 7 Paul Carensac
from django.apps import AppConfig
67 7 Paul Carensac
68 7 Paul Carensac
69 7 Paul Carensac
class AlertManagerConfig(AppConfig):
70 7 Paul Carensac
    name = 'alert_manager'
71 7 Paul Carensac
    
72 7 Paul Carensac
    def ready(self):
73 7 Paul Carensac
        from alert_manager.agent import AlertManagerAgent
74 7 Paul Carensac
        self.agent = AlertManagerAgent()
75 7 Paul Carensac
        self.agent.start()
76 7 Paul Carensac
</pre>
77 7 Paul Carensac
78 2 Paul Carensac
h3. Sender
79 1 Paul Carensac
80 4 Paul Carensac
The Sender class is in the common.sender.py file
81 4 Paul Carensac
82 4 Paul Carensac
 * *I - Purpose*
83 4 Paul Carensac
84 4 Paul Carensac
    * Send a given message to an agent
85 4 Paul Carensac
86 4 Paul Carensac
 * *II - Features*
87 4 Paul Carensac
88 4 Paul Carensac
    * Uses the 'pyros_agent_config.ini' file to get the agents' network interface configuration (ip and port)
89 4 Paul Carensac
    * Provide a 'send_to' static method to send the messages
90 4 Paul Carensac
91 4 Paul Carensac
 * *III - How to use it ?*
92 4 Paul Carensac
93 4 Paul Carensac
    * The targeted agent must be described in 'pyros_agent_config.ini'
94 4 Paul Carensac
    * Use Sender.send_to method, giving as first parameter the name of the targeted agent (eg: Agent.SCHEDULER), and as second parameter the message (eg: Agent.SHUTDOWN)
95 4 Paul Carensac
    * /!\ send_to is a static method, you don't need to instantiate a Sender (just do Sender.send_to(...))
96 9 Paul Carensac
97 2 Paul Carensac
---
98 2 Paul Carensac
99 2 Paul Carensac
h2. %{margin-left:0px; font-weight:bold; font-size:25px;  display:block; color:red;}External components%
100 2 Paul Carensac
101 10 Paul Carensac
h3. Celery
102 10 Paul Carensac
103 10 Paul Carensac
 * *I - Purpose*
104 11 Paul Carensac
    Celery is used to create and handle tasks in different processes/threads (*called workers*).
105 10 Paul Carensac
    Its use is very easy.
106 10 Paul Carensac
107 10 Paul Carensac
 * *II - Features*
108 10 Paul Carensac
109 10 Paul Carensac
    * Create personalized tasks asynchronously
110 10 Paul Carensac
    * Has ETA and countdowns
111 10 Paul Carensac
    * Lots of configurations are possible
112 10 Paul Carensac
113 10 Paul Carensac
 * *III - How to use it ?*
114 1 Paul Carensac
115 12 Paul Carensac
    With Django, you first need to create a *celery.py* in project's folder (the one containing settings.py) :
116 11 Paul Carensac
<pre>
117 11 Paul Carensac
from __future__ import absolute_import
118 11 Paul Carensac
import os
119 11 Paul Carensac
from celery import Celery
120 1 Paul Carensac
121 11 Paul Carensac
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PROJECT_NAME.settings')
122 11 Paul Carensac
123 11 Paul Carensac
from django.conf import settings
124 11 Paul Carensac
125 11 Paul Carensac
app = Celery('PROJECT_NAME')
126 11 Paul Carensac
127 11 Paul Carensac
app.config_from_object('django.conf:settings')
128 11 Paul Carensac
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
129 11 Paul Carensac
130 11 Paul Carensac
@app.task(bind=True)
131 11 Paul Carensac
def debug_task(self):
132 11 Paul Carensac
    print("Request: {0!r}".format(self.request))
133 11 Paul Carensac
</pre>
134 11 Paul Carensac
135 11 Paul Carensac
    /!\ Replace PROJECT_NAME by your project's name (pyros in our case)
136 11 Paul Carensac
137 12 Paul Carensac
    In the project's *__init__.py* (pyros/__init__.py), add this code :
138 11 Paul Carensac
<pre>
139 11 Paul Carensac
from __future__ import absolute_import
140 11 Paul Carensac
141 11 Paul Carensac
from .celery import app as celery_app
142 11 Paul Carensac
</pre>
143 11 Paul Carensac
144 12 Paul Carensac
    Now you can *declare tasks* and configure them.
145 12 Paul Carensac
    There are two ways of declaring tasks, but the only one interesting us is the *'class form'* :
146 11 Paul Carensac
<pre>
147 11 Paul Carensac
# my_app/operations.py
148 11 Paul Carensac
149 11 Paul Carensac
from __future__ import absolute_import
150 11 Paul Carensac
151 11 Paul Carensac
from celery import shared_task
152 11 Paul Carensac
153 11 Paul Carensac
from time import sleep
154 11 Paul Carensac
155 11 Paul Carensac
@shared_task
156 11 Paul Carensac
def mul(x, y):
157 11 Paul Carensac
    sleep(3)
158 11 Paul Carensac
    return x * y
159 11 Paul Carensac
</pre>
160 11 Paul Carensac
161 12 Paul Carensac
    Then you need to *register the task in settings.py*.
162 11 Paul Carensac
    To do this, you just need to indicate the file containing the task in the CELERY_IMPORTS tuple :
163 11 Paul Carensac
<pre>
164 11 Paul Carensac
    CELERY_IMPORTS = ("my_app.operations",)
165 11 Paul Carensac
</pre>
166 11 Paul Carensac
167 11 Paul Carensac
    Task are registered in queues when created (task creation is explained below).
168 12 Paul Carensac
    You will want to *specify in which queue a task is registered* (routed), in settings.py :
169 11 Paul Carensac
<pre>
170 11 Paul Carensac
CELERY_ROUTES = {
171 11 Paul Carensac
    "my_app.operations.mul": {"queue": "my_operations_queue"},
172 11 Paul Carensac
    "app2.scheduler.Scheduler": {"queue": "scheduling_q"},
173 11 Paul Carensac
}
174 11 Paul Carensac
</pre>
175 11 Paul Carensac
176 12 Paul Carensac
    In pyros, we want only one process by queue, to avoid several scheduling at the same time for example. There are also other *useful configurations* :
177 11 Paul Carensac
<pre>
178 11 Paul Carensac
# pyros/settings.py
179 11 Paul Carensac
180 11 Paul Carensac
''' These settings is for the worker to take only 1 task at the same time '''
181 11 Paul Carensac
CELERY_ACKS_LATE = False
182 11 Paul Carensac
CELERYD_PREFETCH_MULTIPLIER = 1
183 11 Paul Carensac
184 11 Paul Carensac
''' Removes pickle warning '''
185 11 Paul Carensac
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
186 11 Paul Carensac
187 11 Paul Carensac
''' The way the tasks result are retrieved '''
188 11 Paul Carensac
CELERY_RESULT_BACKEND = 'rpc://'
189 11 Paul Carensac
190 11 Paul Carensac
</pre>
191 11 Paul Carensac
192 12 Paul Carensac
    There we are ! We can now start our workers and create tasks.
193 1 Paul Carensac
    To *start the worker* :
194 1 Paul Carensac
<pre>
195 19 Paul Carensac
$ celery worker -A pyros -Q my_operations_queue -n pyros@task_name -c 1
196 19 Paul Carensac
# The -c option is to set the number of process in this worker.
197 19 Paul Carensac
# The -n option sets the name of the queue.
198 1 Paul Carensac
</pre>
199 1 Paul Carensac
200 12 Paul Carensac
    To *create a task* (in python) :
201 1 Paul Carensac
<pre>
202 1 Paul Carensac
from my_app.oparations import mul
203 1 Paul Carensac
mul.delay(4, 5)
204 1 Paul Carensac
</pre>
205 1 Paul Carensac
206 1 Paul Carensac
    In the terminal where you started your worker, you can see the task and its result !
207 12 Paul Carensac
208 12 Paul Carensac
    In your code, you can *wait for a task to be finished*, and retrieve its result :
209 12 Paul Carensac
<pre>
210 12 Paul Carensac
from app.tasks import my_task
211 12 Paul Carensac
result = my_task.delay(4, 4)
212 12 Paul Carensac
result.get() # blocking if the task is not finished
213 12 Paul Carensac
</pre>
214 12 Paul Carensac
215 12 Paul Carensac
    You will also want to *stop a task*, given a task id :
216 12 Paul Carensac
<pre>
217 12 Paul Carensac
from celery.task.control import revoke
218 12 Paul Carensac
219 12 Paul Carensac
# To delete a pending task (waiting in queue)
220 12 Paul Carensac
revoke(task_id)
221 12 Paul Carensac
# OR, if you want the task to be aborted even during its execution
222 12 Paul Carensac
revoke(task_id, terminate=True)
223 12 Paul Carensac
# You can retrieve task_id with the 'result' (see above) ==> result.id
224 12 Paul Carensac
</pre>
225 12 Paul Carensac
226 10 Paul Carensac
---
227 10 Paul Carensac
228 15 Paul Carensac
h3. Comet
229 15 Paul Carensac
230 15 Paul Carensac
 * *I - Purpose*
231 15 Paul Carensac
    Comet is used to receive and send VOEvents. It is called a voevent broker.
232 15 Paul Carensac
    It is *very* easy to use
233 15 Paul Carensac
 
234 15 Paul Carensac
 * *II - Features*
235 15 Paul Carensac
236 15 Paul Carensac
    * Send a voevent, giving a port, a host and a XML file
237 15 Paul Carensac
    * Listen for voevent reception, and store voevents in a given directory
238 15 Paul Carensac
239 15 Paul Carensac
 * *III - How to use it ?*
240 15 Paul Carensac
241 15 Paul Carensac
    * To send a voevent :
242 15 Paul Carensac
<pre>
243 15 Paul Carensac
# Options --host and --port are optionnal. Default values : 'localhost' and '8089'
244 15 Paul Carensac
$ comet-sendvo --host=localhost --port=8098 -f voevent_to_publish.xml
245 15 Paul Carensac
</pre>
246 15 Paul Carensac
    * To receive voevents :
247 15 Paul Carensac
<pre>
248 21 Paul Carensac
$ twistd -n comet --receive --save-event --save-event-directory=some/directory --remote=ip:port --local-ivo=ivo://irap/pyros
249 1 Paul Carensac
</pre>
250 20 Paul Carensac
Possible error : 
251 20 Paul Carensac
<pre>
252 20 Paul Carensac
TypeError: 'ProcessLookupError' object is not subscriptable
253 20 Paul Carensac
</pre>
254 20 Paul Carensac
You need to delete the twistd.log and the twistd.pid files. This means that you forgot the '-n' option
255 20 Paul Carensac
256 20 Paul Carensac
    * Different available brokers : http://wiki.ivoa.net/twiki/bin/view/IVOA/IvoaVOEvent#VOEvent_Transport
257 20 Paul Carensac
258 16 Paul Carensac
259 15 Paul Carensac
---
260 15 Paul Carensac
261 1 Paul Carensac
h3. Threading library
262 5 Paul Carensac
263 5 Paul Carensac
 * *I - Purpose*
264 5 Paul Carensac
265 5 Paul Carensac
    * Simply create threads with basic communication
266 5 Paul Carensac
    * Allows to handle concurrent access
267 5 Paul Carensac
268 5 Paul Carensac
 * *II - Features*
269 5 Paul Carensac
270 5 Paul Carensac
    Provides :
271 5 Paul Carensac
272 5 Paul Carensac
    * A Thread class to inherit from, with a run() method that will be called when the thread starts
273 5 Paul Carensac
    * An Event class to set/unset a boolean in order to transmit message to the thread
274 5 Paul Carensac
    * Lock and RLock object to handle concurrent access
275 5 Paul Carensac
276 5 Paul Carensac
 * *III - How to use it ?*
277 5 Paul Carensac
278 5 Paul Carensac
    <pre>from threading import Thread, Event</pre>
279 5 Paul Carensac
280 5 Paul Carensac
    * Thread
281 5 Paul Carensac
282 5 Paul Carensac
        * Create a class inheriting from Thread
283 5 Paul Carensac
        * Override 'run' method, that will be called at thread start
284 5 Paul Carensac
        * Instantiate your class, and do MyClass.start() to create the thread
285 5 Paul Carensac
286 5 Paul Carensac
    * Event
287 5 Paul Carensac
288 5 Paul Carensac
        * Create an Event variable in your Thread-inheriting class (eg: 'self.stop_event = Event()')
289 5 Paul Carensac
        * After thread starts, you can set/unset the event by doing MyClass.stop_event.set() / .clear()
290 5 Paul Carensac
        * There are a few useful methods, see this link for further information : https://docs.python.org/3/library/threading.html#threading.Event
291 5 Paul Carensac
    * Lock / RLock
292 5 Paul Carensac
293 5 Paul Carensac
        * Still not used, see online documentation : https://docs.python.org/3/library/threading.html#lock-objects
294 10 Paul Carensac
295 10 Paul Carensac
---
296 2 Paul Carensac
297 2 Paul Carensac
h3. Socket library
298 2 Paul Carensac
299 6 Paul Carensac
 * *I - Purpose*
300 6 Paul Carensac
301 6 Paul Carensac
    * Handle network communication, just giving IP and Port of the interlocutors
302 6 Paul Carensac
303 6 Paul Carensac
 * *II - Features*
304 6 Paul Carensac
305 6 Paul Carensac
    * 'server' system to create an interface, waiting for client connections and sending / receiving data from them
306 6 Paul Carensac
    * 'client' system to connect to a server, and send/receive data from it
307 6 Paul Carensac
308 6 Paul Carensac
 * *III - How to use it ?*
309 6 Paul Carensac
310 6 Paul Carensac
    * Server
311 6 Paul Carensac
312 6 Paul Carensac
        * Instantiate socket and wait for connections
313 6 Paul Carensac
<pre>
314 6 Paul Carensac
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # create the socket
315 6 Paul Carensac
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # for the port to be immediately re-usable after closing the socket
316 6 Paul Carensac
self.server_socket.bind((self.ip, self.receive_port))                    # associate the socket to an ip and a port
317 6 Paul Carensac
self.server_socket.listen(12)                                            # wait for connections (here, 12 connections can be simultaneously waiting for acceptance)
318 6 Paul Carensac
</pre>
319 6 Paul Carensac
        * Accept connections
320 6 Paul Carensac
<pre>
321 6 Paul Carensac
conn, addr = self.server_socket.accept() # conn is a new socket created at the connection
322 6 Paul Carensac
</pre>
323 6 Paul Carensac
        * Exchanging messages
324 6 Paul Carensac
<pre>
325 6 Paul Carensac
conn.send(bytes(message, 'UTF-8'))          # sending
326 6 Paul Carensac
data = conn.recv(self.buffer_size).decode() # receiving
327 6 Paul Carensac
</pre>
328 6 Paul Carensac
        * Closing sockets when you're done with them
329 6 Paul Carensac
<pre>
330 6 Paul Carensac
conn.close()
331 6 Paul Carensac
...
332 6 Paul Carensac
server_socket.close()
333 6 Paul Carensac
</pre>
334 6 Paul Carensac
335 6 Paul Carensac
    * Client
336 6 Paul Carensac
337 6 Paul Carensac
        * Instantiate the socket and connect to a server
338 6 Paul Carensac
<pre>
339 6 Paul Carensac
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
340 6 Paul Carensac
client_socket.connect((dest_ip, dest_receive_port))
341 6 Paul Carensac
</pre>
342 6 Paul Carensac
        * Exchanging messages
343 6 Paul Carensac
<pre>
344 6 Paul Carensac
client_socket.send(bytes(message, 'UTF-8'))          # sending
345 6 Paul Carensac
data = client_socket.recv(self.buffer_size).decode() # receiving
346 6 Paul Carensac
</pre>
347 6 Paul Carensac
        * Closing sockets when you're done with them
348 6 Paul Carensac
<pre>
349 6 Paul Carensac
client_socket.close()
350 6 Paul Carensac
</pre>
351 6 Paul Carensac
352 2 Paul Carensac
---