Technical components

Version 25 (Paul Carensac, 08/08/2016 05:16 pm)

1 1 Paul Carensac
h1. Technical components
2 2 Paul Carensac
3 8 Paul Carensac
Explanations about the technical components of the project : the ones we have created (internal), and the imported ones (external).
4 2 Paul Carensac
5 14 Paul Carensac
{{>toc}}
6 14 Paul Carensac
7 2 Paul Carensac
---
8 2 Paul Carensac
9 2 Paul Carensac
h2. %{margin-left:0px; font-weight:bold; font-size:25px;  display:block; color:red;}Internal components%
10 2 Paul Carensac
11 23 Paul Carensac
h3. Fixtures
12 23 Paul Carensac
13 23 Paul Carensac
The fixtures are JSON files representing DB objects. The fixtures allow us to preserve some useful data (as the devices), and load them when the DB is purged / installed, in order to not re-type them manually.
14 23 Paul Carensac
15 23 Paul Carensac
 * Django Commands :
16 23 Paul Carensac
17 23 Paul Carensac
  * To create a fixture : python manage.py dumpdata --indent=4 pyrosapp auth.User > fixtures/initial_fixture.json
18 23 Paul Carensac
    "pyrosapp" and "auth.User" are the django app from which we want to save the models
19 23 Paul Carensac
  * To load a fixture in DB : python manage.py loaddata fixture/initial_fixture.json
20 23 Paul Carensac
21 23 Paul Carensac
We also use fixture for tests : at test laucnhing, the fixture is loaded in the test DB to set up the good conditions for the test.
22 23 Paul Carensac
The fixtures are located in the *src/fixtures* folder.
23 23 Paul Carensac
24 22 Paul Carensac
h3. Simulators
25 22 Paul Carensac
26 22 Paul Carensac
# TODO: à renseigner quand ils serotn plus génériques + bien commenter le code
27 22 Paul Carensac
28 18 Paul Carensac
h3. Celery tasks list
29 18 Paul Carensac
30 18 Paul Carensac
See on this google document : https://docs.google.com/spreadsheets/d/15fu0BQm0VYx07qyAl5YiP_OwARTJZdd_JEmK4uoteKU/edit?usp=sharing
31 18 Paul Carensac
32 18 Paul Carensac
h3. Pyros grammar (for instruments)
33 18 Paul Carensac
34 18 Paul Carensac
See on this google doc : https://docs.google.com/spreadsheets/d/1rDWRI2FCyFLhu-9HEGVtSUtgD4vUY7FHG977ilPVkdU/edit?usp=sharing
35 18 Paul Carensac
36 18 Paul Carensac
h3. THE AGENT AND SENDER COMPONENTS ARE NOT IMPLEMENTED ANYMORE
37 18 Paul Carensac
38 2 Paul Carensac
h3. Agent
39 1 Paul Carensac
40 4 Paul Carensac
The Agent class is in the common.agent.py file.
41 4 Paul Carensac
42 4 Paul Carensac
 * *I - Purpose*
43 4 Paul Carensac
44 4 Paul Carensac
    * Generically handles and creates the asynchronous modules
45 4 Paul Carensac
    * Uses the threading library (see below in External components) to make all modules independent
46 4 Paul Carensac
    * Provides an abstract class to be inherited
47 4 Paul Carensac
48 4 Paul Carensac
 * *II - Features*
49 4 Paul Carensac
50 4 Paul Carensac
    * Uses a config file (pyros_agent_config.ini) to set the network communication interface of all agents
51 4 Paul Carensac
    * Provides a 'work' method to override : this is the entry method of the newly created thread (see 'How to use it' section below)
52 4 Paul Carensac
    * Provides the 'receive' and 'analyze_message' methods to generically receive messages from network and analyze them 
53 4 Paul Carensac
54 4 Paul Carensac
 * *III - How to use it ?*
55 4 Paul Carensac
56 4 Paul Carensac
    Each of these points are +NECESSARY+
57 4 Paul Carensac
58 4 Paul Carensac
    * Create a new class that inherits from Agent
59 4 Paul Carensac
    * In the __init__ method, first call the __init__ method of Agent, passing the name of the agent as second parameter (they are defined in the Agent class, eg: Agent.SCHEDULER)
60 4 Paul Carensac
    * Inside the class, define the messages your agent can receive (eg: MSG_OBS_FINISHED = "Observation finished")
61 4 Paul Carensac
    * Create a method to be called for every message you created
62 4 Paul Carensac
    * In the __init__, after calling the Agent's __init__, associate each message to its associated function in the 'self.actions_by_message' dictionary (eg: self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished)
63 4 Paul Carensac
    * Override the method work : this will be the entry function of the new thread, so do whatever you need. This MUST NOT be an infinite loop, because Agent's receive method will be called after this one
64 4 Paul Carensac
    * If ever needed, override the 'shutdown' method, it will be called when your agent receive the Agent.SHUTDOWN message (eg: if you created another thread in the 'work' method, you need to close it)
65 5 Paul Carensac
    * To start the agent, just instantiate your class and do MyClass.start() (the 'work' method will be called)
66 1 Paul Carensac
67 5 Paul Carensac
    The main points to understand are that you can do whatever you want (but non-blocking) in work method (like creating new threads or variables' initialization), then the only entry points are the message-associated methods
68 4 Paul Carensac
69 7 Paul Carensac
 * *IV - Important : pyros agents launching*
70 7 Paul Carensac
71 7 Paul Carensac
    * In pyros, there is maximum 1 agent per application
72 7 Paul Carensac
    * The agent must be started at application start :
73 7 Paul Carensac
        
74 7 Paul Carensac
        * In the MyApp.apps.py file, create a class inheriting from django.apps.AppConfig
75 7 Paul Carensac
        * Define the 'name' attribute in it, giving it the name of the agent
76 7 Paul Carensac
        * Create a 'ready(self)' method
77 7 Paul Carensac
        * in the ready method, import your agent implementation, instantiate it and start it
78 7 Paul Carensac
<pre>
79 7 Paul Carensac
from django.apps import AppConfig
80 7 Paul Carensac
81 7 Paul Carensac
82 7 Paul Carensac
class AlertManagerConfig(AppConfig):
83 7 Paul Carensac
    name = 'alert_manager'
84 7 Paul Carensac
    
85 7 Paul Carensac
    def ready(self):
86 7 Paul Carensac
        from alert_manager.agent import AlertManagerAgent
87 7 Paul Carensac
        self.agent = AlertManagerAgent()
88 7 Paul Carensac
        self.agent.start()
89 7 Paul Carensac
</pre>
90 7 Paul Carensac
91 2 Paul Carensac
h3. Sender
92 1 Paul Carensac
93 4 Paul Carensac
The Sender class is in the common.sender.py file
94 4 Paul Carensac
95 4 Paul Carensac
 * *I - Purpose*
96 4 Paul Carensac
97 4 Paul Carensac
    * Send a given message to an agent
98 4 Paul Carensac
99 4 Paul Carensac
 * *II - Features*
100 4 Paul Carensac
101 4 Paul Carensac
    * Uses the 'pyros_agent_config.ini' file to get the agents' network interface configuration (ip and port)
102 4 Paul Carensac
    * Provide a 'send_to' static method to send the messages
103 4 Paul Carensac
104 4 Paul Carensac
 * *III - How to use it ?*
105 4 Paul Carensac
106 4 Paul Carensac
    * The targeted agent must be described in 'pyros_agent_config.ini'
107 4 Paul Carensac
    * Use Sender.send_to method, giving as first parameter the name of the targeted agent (eg: Agent.SCHEDULER), and as second parameter the message (eg: Agent.SHUTDOWN)
108 4 Paul Carensac
    * /!\ send_to is a static method, you don't need to instantiate a Sender (just do Sender.send_to(...))
109 9 Paul Carensac
110 2 Paul Carensac
---
111 2 Paul Carensac
112 2 Paul Carensac
h2. %{margin-left:0px; font-weight:bold; font-size:25px;  display:block; color:red;}External components%
113 2 Paul Carensac
114 10 Paul Carensac
h3. Celery
115 10 Paul Carensac
116 10 Paul Carensac
 * *I - Purpose*
117 11 Paul Carensac
    Celery is used to create and handle tasks in different processes/threads (*called workers*).
118 10 Paul Carensac
    Its use is very easy.
119 10 Paul Carensac
120 10 Paul Carensac
 * *II - Features*
121 10 Paul Carensac
122 10 Paul Carensac
    * Create personalized tasks asynchronously
123 10 Paul Carensac
    * Has ETA and countdowns
124 10 Paul Carensac
    * Lots of configurations are possible
125 10 Paul Carensac
126 10 Paul Carensac
 * *III - How to use it ?*
127 1 Paul Carensac
128 12 Paul Carensac
    With Django, you first need to create a *celery.py* in project's folder (the one containing settings.py) :
129 11 Paul Carensac
<pre>
130 11 Paul Carensac
from __future__ import absolute_import
131 11 Paul Carensac
import os
132 11 Paul Carensac
from celery import Celery
133 1 Paul Carensac
134 11 Paul Carensac
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PROJECT_NAME.settings')
135 11 Paul Carensac
136 11 Paul Carensac
from django.conf import settings
137 11 Paul Carensac
138 11 Paul Carensac
app = Celery('PROJECT_NAME')
139 11 Paul Carensac
140 11 Paul Carensac
app.config_from_object('django.conf:settings')
141 11 Paul Carensac
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
142 11 Paul Carensac
143 11 Paul Carensac
@app.task(bind=True)
144 11 Paul Carensac
def debug_task(self):
145 11 Paul Carensac
    print("Request: {0!r}".format(self.request))
146 11 Paul Carensac
</pre>
147 11 Paul Carensac
148 11 Paul Carensac
    /!\ Replace PROJECT_NAME by your project's name (pyros in our case)
149 11 Paul Carensac
150 12 Paul Carensac
    In the project's *__init__.py* (pyros/__init__.py), add this code :
151 11 Paul Carensac
<pre>
152 11 Paul Carensac
from __future__ import absolute_import
153 11 Paul Carensac
154 11 Paul Carensac
from .celery import app as celery_app
155 11 Paul Carensac
</pre>
156 11 Paul Carensac
157 12 Paul Carensac
    Now you can *declare tasks* and configure them.
158 12 Paul Carensac
    There are two ways of declaring tasks, but the only one interesting us is the *'class form'* :
159 11 Paul Carensac
<pre>
160 11 Paul Carensac
# my_app/operations.py
161 11 Paul Carensac
162 11 Paul Carensac
from __future__ import absolute_import
163 11 Paul Carensac
164 11 Paul Carensac
from celery import shared_task
165 11 Paul Carensac
166 11 Paul Carensac
from time import sleep
167 11 Paul Carensac
168 11 Paul Carensac
@shared_task
169 11 Paul Carensac
def mul(x, y):
170 11 Paul Carensac
    sleep(3)
171 11 Paul Carensac
    return x * y
172 11 Paul Carensac
</pre>
173 11 Paul Carensac
174 12 Paul Carensac
    Then you need to *register the task in settings.py*.
175 11 Paul Carensac
    To do this, you just need to indicate the file containing the task in the CELERY_IMPORTS tuple :
176 11 Paul Carensac
<pre>
177 11 Paul Carensac
    CELERY_IMPORTS = ("my_app.operations",)
178 11 Paul Carensac
</pre>
179 11 Paul Carensac
180 11 Paul Carensac
    Task are registered in queues when created (task creation is explained below).
181 12 Paul Carensac
    You will want to *specify in which queue a task is registered* (routed), in settings.py :
182 11 Paul Carensac
<pre>
183 11 Paul Carensac
CELERY_ROUTES = {
184 11 Paul Carensac
    "my_app.operations.mul": {"queue": "my_operations_queue"},
185 11 Paul Carensac
    "app2.scheduler.Scheduler": {"queue": "scheduling_q"},
186 11 Paul Carensac
}
187 11 Paul Carensac
</pre>
188 11 Paul Carensac
189 12 Paul Carensac
    In pyros, we want only one process by queue, to avoid several scheduling at the same time for example. There are also other *useful configurations* :
190 11 Paul Carensac
<pre>
191 11 Paul Carensac
# pyros/settings.py
192 11 Paul Carensac
193 11 Paul Carensac
''' These settings is for the worker to take only 1 task at the same time '''
194 11 Paul Carensac
CELERY_ACKS_LATE = False
195 11 Paul Carensac
CELERYD_PREFETCH_MULTIPLIER = 1
196 11 Paul Carensac
197 11 Paul Carensac
''' Removes pickle warning '''
198 11 Paul Carensac
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
199 11 Paul Carensac
200 11 Paul Carensac
''' The way the tasks result are retrieved '''
201 11 Paul Carensac
CELERY_RESULT_BACKEND = 'rpc://'
202 11 Paul Carensac
203 11 Paul Carensac
</pre>
204 11 Paul Carensac
205 12 Paul Carensac
    There we are ! We can now start our workers and create tasks.
206 1 Paul Carensac
    To *start the worker* :
207 1 Paul Carensac
<pre>
208 19 Paul Carensac
$ celery worker -A pyros -Q my_operations_queue -n pyros@task_name -c 1
209 19 Paul Carensac
# The -c option is to set the number of process in this worker.
210 19 Paul Carensac
# The -n option sets the name of the queue.
211 1 Paul Carensac
</pre>
212 1 Paul Carensac
213 12 Paul Carensac
    To *create a task* (in python) :
214 1 Paul Carensac
<pre>
215 1 Paul Carensac
from my_app.oparations import mul
216 1 Paul Carensac
mul.delay(4, 5)
217 1 Paul Carensac
</pre>
218 1 Paul Carensac
219 1 Paul Carensac
    In the terminal where you started your worker, you can see the task and its result !
220 12 Paul Carensac
221 12 Paul Carensac
    In your code, you can *wait for a task to be finished*, and retrieve its result :
222 12 Paul Carensac
<pre>
223 12 Paul Carensac
from app.tasks import my_task
224 12 Paul Carensac
result = my_task.delay(4, 4)
225 12 Paul Carensac
result.get() # blocking if the task is not finished
226 12 Paul Carensac
</pre>
227 12 Paul Carensac
228 12 Paul Carensac
    You will also want to *stop a task*, given a task id :
229 12 Paul Carensac
<pre>
230 12 Paul Carensac
from celery.task.control import revoke
231 12 Paul Carensac
232 12 Paul Carensac
# To delete a pending task (waiting in queue)
233 12 Paul Carensac
revoke(task_id)
234 12 Paul Carensac
# OR, if you want the task to be aborted even during its execution
235 12 Paul Carensac
revoke(task_id, terminate=True)
236 12 Paul Carensac
# You can retrieve task_id with the 'result' (see above) ==> result.id
237 12 Paul Carensac
</pre>
238 12 Paul Carensac
239 10 Paul Carensac
---
240 10 Paul Carensac
241 15 Paul Carensac
h3. Comet
242 15 Paul Carensac
243 15 Paul Carensac
 * *I - Purpose*
244 15 Paul Carensac
    Comet is used to receive and send VOEvents. It is called a voevent broker.
245 15 Paul Carensac
    It is *very* easy to use
246 15 Paul Carensac
 
247 15 Paul Carensac
 * *II - Features*
248 15 Paul Carensac
249 15 Paul Carensac
    * Send a voevent, giving a port, a host and a XML file
250 15 Paul Carensac
    * Listen for voevent reception, and store voevents in a given directory
251 15 Paul Carensac
252 15 Paul Carensac
 * *III - How to use it ?*
253 15 Paul Carensac
254 15 Paul Carensac
    * To send a voevent :
255 15 Paul Carensac
<pre>
256 15 Paul Carensac
# Options --host and --port are optionnal. Default values : 'localhost' and '8089'
257 15 Paul Carensac
$ comet-sendvo --host=localhost --port=8098 -f voevent_to_publish.xml
258 15 Paul Carensac
</pre>
259 15 Paul Carensac
    * To receive voevents :
260 15 Paul Carensac
<pre>
261 21 Paul Carensac
$ twistd -n comet --receive --save-event --save-event-directory=some/directory --remote=ip:port --local-ivo=ivo://irap/pyros
262 1 Paul Carensac
</pre>
263 20 Paul Carensac
Possible error : 
264 20 Paul Carensac
<pre>
265 20 Paul Carensac
TypeError: 'ProcessLookupError' object is not subscriptable
266 20 Paul Carensac
</pre>
267 20 Paul Carensac
You need to delete the twistd.log and the twistd.pid files. This means that you forgot the '-n' option
268 20 Paul Carensac
269 20 Paul Carensac
    * Different available brokers : http://wiki.ivoa.net/twiki/bin/view/IVOA/IvoaVOEvent#VOEvent_Transport
270 20 Paul Carensac
271 16 Paul Carensac
272 15 Paul Carensac
---
273 15 Paul Carensac
274 1 Paul Carensac
h3. Threading library
275 5 Paul Carensac
276 24 Paul Carensac
The threading library was used in the agent system.
277 24 Paul Carensac
278 5 Paul Carensac
 * *I - Purpose*
279 5 Paul Carensac
280 5 Paul Carensac
    * Simply create threads with basic communication
281 5 Paul Carensac
    * Allows to handle concurrent access
282 5 Paul Carensac
283 5 Paul Carensac
 * *II - Features*
284 5 Paul Carensac
285 5 Paul Carensac
    Provides :
286 5 Paul Carensac
287 5 Paul Carensac
    * A Thread class to inherit from, with a run() method that will be called when the thread starts
288 5 Paul Carensac
    * An Event class to set/unset a boolean in order to transmit message to the thread
289 5 Paul Carensac
    * Lock and RLock object to handle concurrent access
290 5 Paul Carensac
291 5 Paul Carensac
 * *III - How to use it ?*
292 5 Paul Carensac
293 5 Paul Carensac
    <pre>from threading import Thread, Event</pre>
294 5 Paul Carensac
295 5 Paul Carensac
    * Thread
296 5 Paul Carensac
297 5 Paul Carensac
        * Create a class inheriting from Thread
298 5 Paul Carensac
        * Override 'run' method, that will be called at thread start
299 5 Paul Carensac
        * Instantiate your class, and do MyClass.start() to create the thread
300 5 Paul Carensac
301 5 Paul Carensac
    * Event
302 5 Paul Carensac
303 5 Paul Carensac
        * Create an Event variable in your Thread-inheriting class (eg: 'self.stop_event = Event()')
304 5 Paul Carensac
        * After thread starts, you can set/unset the event by doing MyClass.stop_event.set() / .clear()
305 5 Paul Carensac
        * There are a few useful methods, see this link for further information : https://docs.python.org/3/library/threading.html#threading.Event
306 5 Paul Carensac
    * Lock / RLock
307 5 Paul Carensac
308 5 Paul Carensac
        * Still not used, see online documentation : https://docs.python.org/3/library/threading.html#lock-objects
309 10 Paul Carensac
310 10 Paul Carensac
---
311 2 Paul Carensac
312 2 Paul Carensac
h3. Socket library
313 2 Paul Carensac
314 25 Paul Carensac
The sockets are used in the simulators and the Instruments Command Control.
315 25 Paul Carensac
316 6 Paul Carensac
 * *I - Purpose*
317 6 Paul Carensac
318 6 Paul Carensac
    * Handle network communication, just giving IP and Port of the interlocutors
319 6 Paul Carensac
320 6 Paul Carensac
 * *II - Features*
321 6 Paul Carensac
322 6 Paul Carensac
    * 'server' system to create an interface, waiting for client connections and sending / receiving data from them
323 6 Paul Carensac
    * 'client' system to connect to a server, and send/receive data from it
324 6 Paul Carensac
325 6 Paul Carensac
 * *III - How to use it ?*
326 6 Paul Carensac
327 6 Paul Carensac
    * Server
328 6 Paul Carensac
329 6 Paul Carensac
        * Instantiate socket and wait for connections
330 6 Paul Carensac
<pre>
331 25 Paul Carensac
import socket
332 25 Paul Carensac
333 6 Paul Carensac
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # create the socket
334 6 Paul Carensac
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # for the port to be immediately re-usable after closing the socket
335 6 Paul Carensac
self.server_socket.bind((self.ip, self.receive_port))                    # associate the socket to an ip and a port
336 6 Paul Carensac
self.server_socket.listen(12)                                            # wait for connections (here, 12 connections can be simultaneously waiting for acceptance)
337 6 Paul Carensac
</pre>
338 6 Paul Carensac
        * Accept connections
339 6 Paul Carensac
<pre>
340 6 Paul Carensac
conn, addr = self.server_socket.accept() # conn is a new socket created at the connection
341 6 Paul Carensac
</pre>
342 6 Paul Carensac
        * Exchanging messages
343 6 Paul Carensac
<pre>
344 6 Paul Carensac
conn.send(bytes(message, 'UTF-8'))          # sending
345 6 Paul Carensac
data = conn.recv(self.buffer_size).decode() # receiving
346 6 Paul Carensac
</pre>
347 6 Paul Carensac
        * Closing sockets when you're done with them
348 6 Paul Carensac
<pre>
349 6 Paul Carensac
conn.close()
350 6 Paul Carensac
...
351 6 Paul Carensac
server_socket.close()
352 6 Paul Carensac
</pre>
353 6 Paul Carensac
354 6 Paul Carensac
    * Client
355 6 Paul Carensac
356 6 Paul Carensac
        * Instantiate the socket and connect to a server
357 6 Paul Carensac
<pre>
358 6 Paul Carensac
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
359 6 Paul Carensac
client_socket.connect((dest_ip, dest_receive_port))
360 6 Paul Carensac
</pre>
361 6 Paul Carensac
        * Exchanging messages
362 6 Paul Carensac
<pre>
363 6 Paul Carensac
client_socket.send(bytes(message, 'UTF-8'))          # sending
364 6 Paul Carensac
data = client_socket.recv(self.buffer_size).decode() # receiving
365 6 Paul Carensac
</pre>
366 6 Paul Carensac
        * Closing sockets when you're done with them
367 6 Paul Carensac
<pre>
368 6 Paul Carensac
client_socket.close()
369 6 Paul Carensac
</pre>
370 6 Paul Carensac
371 2 Paul Carensac
---