Technical components

Version 20 (Paul Carensac, 08/02/2016 04:46 pm)

h1. Technical components

This page describes the technical components of the project: the ones we created (internal) and the ones we imported (external).

{{>toc}}

---

h2. %{margin-left:0px; font-weight:bold; font-size:25px;  display:block; color:red;}Internal components%

h3. Celery tasks list

See this Google spreadsheet : https://docs.google.com/spreadsheets/d/15fu0BQm0VYx07qyAl5YiP_OwARTJZdd_JEmK4uoteKU/edit?usp=sharing

h3. Pyros grammar (for instruments)

See this Google spreadsheet : https://docs.google.com/spreadsheets/d/1rDWRI2FCyFLhu-9HEGVtSUtgD4vUY7FHG977ilPVkdU/edit?usp=sharing

h3. THE AGENT AND SENDER COMPONENTS ARE NOT IMPLEMENTED ANYMORE

h3. Agent

The Agent class is in the common.agent.py file.

 * *I - Purpose*

    * Generically creates and handles the asynchronous modules
    * Uses the threading library (see External components below) to make all modules independent
    * Provides an abstract class to inherit from

 * *II - Features*

    * Uses a config file (pyros_agent_config.ini) to set the network communication interface of all agents
    * Provides a 'work' method to override : this is the entry method of the newly created thread (see the 'How to use it' section below)
    * Provides the 'receive' and 'analyze_message' methods to generically receive messages from the network and analyze them

 * *III - How to use it ?*

    Each of these points is +NECESSARY+

    * Create a new class that inherits from Agent
    * In the __init__ method, first call the __init__ method of Agent, passing the name of the agent as second parameter (the names are defined in the Agent class, eg: Agent.SCHEDULER)
    * Inside the class, define the messages your agent can receive (eg: MSG_OBS_FINISHED = "Observation finished")
    * Create a method to be called for each message you defined
    * In the __init__, after calling Agent's __init__, associate each message with its method in the 'self.actions_by_message' dictionary (eg: self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished)
    * Override the 'work' method : it is the entry function of the new thread, so do whatever you need there. It MUST NOT be an infinite loop, because Agent's 'receive' method is called after it returns
    * If needed, override the 'shutdown' method : it is called when your agent receives the Agent.SHUTDOWN message (eg: if you created another thread in the 'work' method, you need to close it)
    * To start the agent, instantiate your class and call start() on the instance (the 'work' method will then be called)

    The main points to understand are that you can do whatever you want in the 'work' method as long as it is non-blocking (creating new threads, initializing variables, ...), and that afterwards the only entry points are the message-associated methods

 * *IV - Important : pyros agents launching*

    * In pyros, there is at most 1 agent per application
    * The agent must be started at application start :
        
        * In the MyApp.apps.py file, create a class inheriting from django.apps.AppConfig
        * Define the 'name' attribute in it, giving it the name of the agent
        * Create a 'ready(self)' method
        * In the 'ready' method, import your agent implementation, instantiate it and start it
<pre>
from django.apps import AppConfig


class AlertManagerConfig(AppConfig):
    name = 'alert_manager'

    def ready(self):
        # Import inside ready(), once the app registry is fully loaded
        from alert_manager.agent import AlertManagerAgent
        self.agent = AlertManagerAgent()
        self.agent.start()
</pre>

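The subclassing steps listed in 'How to use it' can be sketched as follows. This is only an illustration under assumptions: the @Agent@ base class below is a hypothetical stand-in (not the pyros implementation) that reads messages from a local queue instead of the network, and @SchedulerAgent@, @inbox@ and @finished_count@ are made-up names.

```python
import queue
import threading


class Agent(threading.Thread):
    """Hypothetical stand-in for pyros' Agent: messages arrive through a
    local queue instead of the network interface from the config file."""

    SCHEDULER = "Scheduler"
    SHUTDOWN = "Shutdown"

    def __init__(self, name):
        super().__init__(name=name, daemon=True)
        self.inbox = queue.Queue()
        self.actions_by_message = {}

    def work(self):
        """Entry point of the new thread; subclasses override it."""

    def shutdown(self):
        """Cleanup hook called when Agent.SHUTDOWN is received."""

    def run(self):
        self.work()                      # must return (no infinite loop)
        while True:                      # stand-in for 'receive' / 'analyze_message'
            message = self.inbox.get()
            if message == self.SHUTDOWN:
                self.shutdown()
                return
            action = self.actions_by_message.get(message)
            if action is not None:
                action()


class SchedulerAgent(Agent):
    MSG_OBS_FINISHED = "Observation finished"

    def __init__(self):
        super().__init__(Agent.SCHEDULER)          # call Agent's __init__ first
        self.finished_count = 0
        self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished

    def work(self):
        self.initialized = True                    # non-blocking initialization only

    def observation_finished(self):
        self.finished_count += 1


agent = SchedulerAgent()
agent.start()                                      # 'work' runs in the new thread
agent.inbox.put(SchedulerAgent.MSG_OBS_FINISHED)
agent.inbox.put(Agent.SHUTDOWN)
agent.join(timeout=5)
print(agent.finished_count)
```

After 'work' returns, the only way into the agent is through the methods registered in 'actions_by_message', which is the behaviour the section describes.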
h3. Sender

The Sender class is in the common.sender.py file.

 * *I - Purpose*

    * Sends a given message to an agent

 * *II - Features*

    * Uses the 'pyros_agent_config.ini' file to get the agents' network interface configuration (IP and port)
    * Provides a 'send_to' static method to send the messages

 * *III - How to use it ?*

    * The targeted agent must be described in 'pyros_agent_config.ini'
    * Use the Sender.send_to method, giving the name of the targeted agent as first parameter (eg: Agent.SCHEDULER) and the message as second parameter (eg: Agent.SHUTDOWN)
    * /!\ send_to is a static method, so you don't need to instantiate a Sender (just call Sender.send_to(...))

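To illustrate the idea of a static 'send_to' driven by a config file, here is a minimal sketch. It is not the pyros code: the @Sender@ class below is a hypothetical stand-in, and the config layout (one section per agent with 'ip' and 'port' keys) is an assumption, since the real content of 'pyros_agent_config.ini' is not shown on this page. A local listening socket plays the role of the targeted agent.

```python
import configparser
import socket


class Sender:
    """Hypothetical stand-in for pyros' Sender.
    Assumed config layout: one section per agent, with 'ip' and 'port' keys."""

    config = configparser.ConfigParser()

    @staticmethod
    def send_to(agent_name, message):
        ip = Sender.config[agent_name]["ip"]
        port = Sender.config[agent_name].getint("port")
        with socket.create_connection((ip, port), timeout=5) as sock:
            sock.sendall(message.encode("utf-8"))


# Demo: a local listening socket plays the role of the Scheduler agent.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))          # port 0: the OS picks a free port
listener.listen(1)
port = listener.getsockname()[1]

# In pyros this would come from the ini file; here it is built in memory.
Sender.config.read_string(f"[Scheduler]\nip = 127.0.0.1\nport = {port}\n")

Sender.send_to("Scheduler", "Shutdown")  # no Sender instance needed
conn, _addr = listener.accept()
received = conn.recv(1024).decode("utf-8")
conn.close()
listener.close()
print(received)
```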
---

h2. %{margin-left:0px; font-weight:bold; font-size:25px;  display:block; color:red;}External components%

h3. Celery

 * *I - Purpose*
    Celery is used to create and handle tasks in separate processes/threads (*called workers*).
    It is very easy to use.

 * *II - Features*

    * Creates custom tasks that run asynchronously
    * Supports ETAs and countdowns
    * Offers many configuration options

 * *III - How to use it ?*

    With Django, you first need to create a *celery.py* file in the project's folder (the one containing settings.py) :
<pre>
from __future__ import absolute_import
import os
from celery import Celery

# Set the default Django settings module before the Celery app is configured
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PROJECT_NAME.settings')

from django.conf import settings

app = Celery('PROJECT_NAME')

# Read the Celery configuration from the Django settings
app.config_from_object('django.conf:settings')
# Look for tasks in every installed app
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))
</pre>

    /!\ Replace PROJECT_NAME with your project's name (pyros in our case)

    In the project's *__init__.py* (pyros/__init__.py), add this code :
<pre>
from __future__ import absolute_import

# Import the Celery app when Django starts, so that @shared_task uses it
from .celery import app as celery_app
</pre>

    Now you can *declare tasks* and configure them.
    There are two ways of declaring tasks, but the only one of interest to us is the *'class form'* :
<pre>
# my_app/operations.py

from __future__ import absolute_import

from time import sleep

from celery import shared_task


@shared_task
def mul(x, y):
    sleep(3)  # simulate a long-running operation
    return x * y
</pre>

    Then you need to *register the task in settings.py*.
    To do this, just add the module containing the task to the CELERY_IMPORTS tuple :
<pre>
CELERY_IMPORTS = ("my_app.operations",)
</pre>

    Tasks are put in queues when they are created (task creation is explained below).
    You will want to *specify in which queue a task is registered* (routed), in settings.py :
<pre>
CELERY_ROUTES = {
    "my_app.operations.mul": {"queue": "my_operations_queue"},
    "app2.scheduler.Scheduler": {"queue": "scheduling_q"},
}
</pre>

    In pyros, we want only one process per queue, for example to avoid running several schedulings at the same time. There are also other *useful configurations* :
<pre>
# pyros/settings.py

# These settings make the worker take only one task at a time
CELERY_ACKS_LATE = False
CELERYD_PREFETCH_MULTIPLIER = 1

# Removes the pickle warning
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

# The way the task results are retrieved
CELERY_RESULT_BACKEND = 'rpc://'
</pre>

    There we are ! We can now start our workers and create tasks.
    To *start the worker* :
<pre>
$ celery worker -A pyros -Q my_operations_queue -n pyros@task_name -c 1
# The -Q option lists the queue(s) this worker consumes from.
# The -n option sets the name of the worker node.
# The -c option sets the number of processes in this worker.
</pre>

    To *create a task* (in Python) :
<pre>
from my_app.operations import mul
mul.delay(4, 5)
</pre>

    In the terminal where you started your worker, you can see the task and its result !

    In your code, you can *wait for a task to finish* and retrieve its result :
<pre>
from app.tasks import my_task
result = my_task.delay(4, 4)
result.get()  # blocks until the task has finished
</pre>

    You may also want to *stop a task*, given its task id :
<pre>
from celery.task.control import revoke

# To cancel a pending task (still waiting in the queue)
revoke(task_id)
# OR, if you want the task to be aborted even while it is executing
revoke(task_id, terminate=True)
# You can get task_id from the 'result' object (see above) ==> result.id
</pre>

---

h3. Comet

 * *I - Purpose*
    Comet is used to receive and send VOEvents. It is called a VOEvent broker.
    It is *very* easy to use.
 
 * *II - Features*

    * Sends a VOEvent, given a host, a port and an XML file
    * Listens for incoming VOEvents and stores them in a given directory

 * *III - How to use it ?*

    * To send a VOEvent :
<pre>
# Options --host and --port are optional. Default values : 'localhost' and '8098'
$ comet-sendvo --host=localhost --port=8098 -f voevent_to_publish.xml
</pre>
    * To receive VOEvents :
<pre>
$ twistd -n comet --receive --save-event --save-event-directory=some/directory --remote=localhost:5632 --local-ivo=ivo://irap/pyros
</pre>
Possible error : 
<pre>
TypeError: 'ProcessLookupError' object is not subscriptable
</pre>
This error means that you forgot the '-n' option. Delete the twistd.log and twistd.pid files, then run the command again with '-n'.

    * Different available brokers : http://wiki.ivoa.net/twiki/bin/view/IVOA/IvoaVOEvent#VOEvent_Transport

---

h3. Threading library

 * *I - Purpose*

    * Creates threads simply, with basic communication between them
    * Makes it possible to handle concurrent access

 * *II - Features*

    Provides :

    * A Thread class to inherit from, with a run() method that is called when the thread starts
    * An Event class to set/unset a boolean flag in order to send a signal to the thread
    * Lock and RLock objects to handle concurrent access

 * *III - How to use it ?*

    <pre>from threading import Thread, Event</pre>

    * Thread

        * Create a class inheriting from Thread
        * Override the 'run' method, which is called when the thread starts
        * Instantiate your class and call start() on the instance to create the thread

    * Event

        * Create an Event attribute in your Thread-inheriting class (eg: 'self.stop_event = Event()')
        * Once the thread has started, you can set/unset the event by calling stop_event.set() / stop_event.clear() on your instance
        * There are a few other useful methods, see this link for further information : https://docs.python.org/3/library/threading.html#threading.Event

    * Lock / RLock

        * Not used yet, see the online documentation : https://docs.python.org/3/library/threading.html#lock-objects

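The Thread and Event steps above can be put together in a short, self-contained sketch. The @Worker@ class and its @iterations@ counter are hypothetical names chosen for the illustration; only @Thread@, @Event@ and their methods come from the standard library.

```python
import time
from threading import Event, Thread


class Worker(Thread):
    """Hypothetical worker: counts loop iterations until asked to stop."""

    def __init__(self):
        super().__init__()
        self.stop_event = Event()
        self.iterations = 0

    def run(self):
        # Executed in the new thread once start() has been called
        while True:
            self.iterations += 1
            # wait() sleeps up to 10 ms and returns True as soon as the event is set
            if self.stop_event.wait(0.01):
                break


worker = Worker()
worker.start()               # creates the thread, which calls run()
time.sleep(0.05)             # let the worker loop a few times
worker.stop_event.set()      # signal the thread to stop
worker.join(timeout=5)       # wait for run() to return
print(worker.is_alive())
```

Using @stop_event.wait(timeout)@ instead of @time.sleep@ inside the loop lets the thread react to the stop request immediately rather than at the end of the sleep.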
---

h3. Socket library

 * *I - Purpose*

    * Handles network communication, given only the IP and port of the peers

 * *II - Features*

    * 'server' system to create an interface that waits for client connections and sends / receives data to / from them
    * 'client' system to connect to a server and send / receive data to / from it

 * *III - How to use it ?*

    * Server

        * Instantiate the socket and wait for connections
<pre>
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # create the socket
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # for the port to be immediately re-usable after closing the socket
self.server_socket.bind((self.ip, self.receive_port))                    # associate the socket with an ip and a port
self.server_socket.listen(12)                                            # wait for connections (here, 12 connections can be simultaneously waiting for acceptance)
</pre>
        * Accept connections
<pre>
conn, addr = self.server_socket.accept() # conn is a new socket created for this connection
</pre>
        * Exchange messages
<pre>
conn.sendall(bytes(message, 'UTF-8'))       # sending (sendall keeps sending until all bytes are sent)
data = conn.recv(self.buffer_size).decode() # receiving
</pre>
        * Close the sockets when you're done with them
<pre>
conn.close()
...
self.server_socket.close()
</pre>

    * Client

        * Instantiate the socket and connect to a server
<pre>
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((dest_ip, dest_receive_port))
</pre>
        * Exchange messages
<pre>
client_socket.sendall(bytes(message, 'UTF-8'))       # sending (sendall keeps sending until all bytes are sent)
data = client_socket.recv(self.buffer_size).decode() # receiving
</pre>
        * Close the socket when you're done with it
<pre>
client_socket.close()
</pre>

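The server and client fragments above can be combined into one runnable round trip on the local machine. This is a minimal sketch, assuming a single client and short messages that fit in one @recv@ call; the @serve_once@ helper, the "ACK" reply and the use of port 0 (so that the OS picks a free port) are choices made for the example, not something pyros prescribes.

```python
import socket
import threading

BUFFER_SIZE = 1024


def serve_once(server_socket):
    """Accept one client, read its message and answer with an acknowledgement."""
    conn, _addr = server_socket.accept()
    with conn:
        data = conn.recv(BUFFER_SIZE).decode()
        conn.sendall(bytes("ACK " + data, "UTF-8"))


# Server side: create, bind and listen
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("127.0.0.1", 0))      # port 0: the OS picks a free port
server_socket.listen(1)
port = server_socket.getsockname()[1]

server_thread = threading.Thread(target=serve_once, args=(server_socket,))
server_thread.start()

# Client side: connect, send, receive, close
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(("127.0.0.1", port))
client_socket.sendall(bytes("Observation finished", "UTF-8"))
reply = client_socket.recv(BUFFER_SIZE).decode()
client_socket.close()

server_thread.join(timeout=5)
server_socket.close()
print(reply)
```

The server runs in its own thread because @accept()@ and @recv()@ are blocking calls; without it, the client code in the same script would never get a chance to connect.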
---