Technical components

Version 15 (Paul Carensac, 05/13/2016 04:14 pm)

h1. Technical components

This page describes the technical components of the project: the ones we created ourselves (internal) and the ones we imported (external).

{{>toc}}

---

h2. %{margin-left:0px; font-weight:bold; font-size:25px;  display:block; color:red;}Internal components%

h3. Agent

The Agent class is defined in the common/agent.py file.

 * *I - Purpose*

    * Creates and handles the asynchronous modules in a generic way
    * Uses the threading library (see External components below) to make all modules independent
    * Provides an abstract class to inherit from

 * *II - Features*

    * Uses a config file (pyros_agent_config.ini) to set the network communication interface of all agents
    * Provides a 'work' method to override: this is the entry method of the newly created thread (see the 'How to use it' section below)
    * Provides the 'receive' and 'analyze_message' methods to generically receive messages from the network and analyze them

 * *III - How to use it?*

    Each of these points is +NECESSARY+

    * Create a new class that inherits from Agent
    * In the __init__ method, first call the __init__ method of Agent, passing the name of the agent as second parameter (the names are defined in the Agent class, eg: Agent.SCHEDULER)
    * Inside the class, define the messages your agent can receive (eg: MSG_OBS_FINISHED = "Observation finished")
    * Create a method to be called for each message you defined
    * In the __init__, after calling the Agent's __init__, associate each message with its method in the 'self.actions_by_message' dictionary (eg: self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished)
    * Override the 'work' method: this is the entry function of the new thread, so do whatever you need there. It MUST NOT be an infinite loop, because Agent's 'receive' method is called after it returns
    * If needed, override the 'shutdown' method; it is called when your agent receives the Agent.SHUTDOWN message (eg: if you created another thread in the 'work' method, you need to stop it there)
    * To start the agent, instantiate your class and call start() on the instance (the 'work' method will be called)

    The main points to understand are that you can do whatever you want in the 'work' method as long as it is non-blocking (such as creating new threads or initializing variables), and that afterwards the only entry points are the message-associated methods
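
The steps above can be sketched as follows. This is only an illustration: the Agent class in this block is a simplified stand-in written for the example (it is NOT the real common/agent.py) and it receives messages through an in-memory queue instead of the network. The 'inbox' attribute, the SchedulerAgent class and the run_demo function are hypothetical names.

```python
import queue
import threading


class Agent(threading.Thread):
    """Simplified stand-in for the real Agent class (assumption, not the pyros code)."""

    SCHEDULER = "Scheduler"
    SHUTDOWN = "Shutdown"

    def __init__(self, name):
        super().__init__(name=name)
        self.inbox = queue.Queue()        # hypothetical replacement for the network socket
        self.actions_by_message = {}

    def run(self):
        self.work()                       # entry point of the thread, must return
        self.receive()                    # then wait for incoming messages

    def work(self):
        raise NotImplementedError

    def shutdown(self):
        pass

    def receive(self):
        while True:
            message = self.inbox.get()
            if message == self.SHUTDOWN:
                self.shutdown()
                break
            self.analyze_message(message)

    def analyze_message(self, message):
        # Call the method associated with the message, if there is one
        action = self.actions_by_message.get(message)
        if action is not None:
            action()


class SchedulerAgent(Agent):
    # Messages this agent can receive
    MSG_OBS_FINISHED = "Observation finished"

    def __init__(self):
        # First call Agent's __init__, giving the agent name
        Agent.__init__(self, Agent.SCHEDULER)
        self.finished_observations = 0
        # Then associate each message with its method
        self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished

    def work(self):
        # Non-blocking initialization only (no infinite loop here)
        self.finished_observations = 0

    def observation_finished(self):
        self.finished_observations += 1


def run_demo():
    agent = SchedulerAgent()
    agent.start()
    agent.inbox.put(SchedulerAgent.MSG_OBS_FINISHED)
    agent.inbox.put(Agent.SHUTDOWN)
    agent.join(timeout=5)
    return agent.finished_observations
```

In the real project the messages would arrive through the network interface described in pyros_agent_config.ini; the queue is only there to keep the sketch self-contained.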

 * *IV - Important: launching pyros agents*

    * In pyros, there is at most one agent per application
    * The agent must be started when the application starts:

        * In the MyApp/apps.py file, create a class inheriting from django.apps.AppConfig
        * Define the 'name' attribute in it, giving it the name of the application that hosts the agent (eg: 'alert_manager')
        * Create a 'ready(self)' method
        * In the ready method, import your agent implementation, instantiate it and start it

<pre>
from django.apps import AppConfig


class AlertManagerConfig(AppConfig):
    name = 'alert_manager'

    def ready(self):
        # Import here, once the application registry is fully loaded
        from alert_manager.agent import AlertManagerAgent
        self.agent = AlertManagerAgent()
        self.agent.start()
</pre>

h3. Sender

The Sender class is defined in the common/sender.py file.

 * *I - Purpose*

    * Sends a given message to an agent

 * *II - Features*

    * Uses the 'pyros_agent_config.ini' file to get the agents' network interface configuration (IP and port)
    * Provides a 'send_to' static method to send the messages

 * *III - How to use it?*

    * The targeted agent must be described in 'pyros_agent_config.ini'
    * Call the Sender.send_to method, giving the name of the targeted agent as first parameter (eg: Agent.SCHEDULER) and the message as second parameter (eg: Agent.SHUTDOWN)
    * /!\ send_to is a static method, so you don't need to instantiate a Sender (just call Sender.send_to(...))
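
To make the idea of a static 'send_to' more concrete, here is a minimal sketch of how such a method could look. It is an assumption, not the code of common/sender.py: the layout of the ini file (one section per agent, with 'ip' and 'receive_port' keys) and the extra 'config_file' parameter are hypothetical and only serve the example.

```python
import configparser
import socket


class Sender:
    """Illustrative stand-in for the Sender class (assumption, not the pyros code)."""

    CONFIG_FILE = "pyros_agent_config.ini"

    @staticmethod
    def send_to(agent_name, message, config_file=None):
        # Look up the network interface of the targeted agent.
        # Assumed ini layout: [AgentName] / ip = ... / receive_port = ...
        config = configparser.ConfigParser()
        if not config.read(config_file or Sender.CONFIG_FILE):
            raise FileNotFoundError("agent configuration file not found")
        if not config.has_section(agent_name):
            raise ValueError("unknown agent: " + agent_name)
        ip = config.get(agent_name, "ip")
        port = config.getint(agent_name, "receive_port")

        # Open a connection, send the whole message, then close the socket
        with socket.create_connection((ip, port), timeout=5) as client_socket:
            client_socket.sendall(message.encode("utf-8"))
```

With such a method, sending a message is a single call, for example Sender.send_to("Scheduler", "Shutdown"), with no Sender instance involved.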

---

h2. %{margin-left:0px; font-weight:bold; font-size:25px;  display:block; color:red;}External components%

h3. Celery

 * *I - Purpose*
    Celery is used to create and run tasks in separate processes/threads (*called workers*).
    It is very easy to use.

 * *II - Features*

    * Runs user-defined tasks asynchronously
    * Supports ETAs and countdowns to delay the execution of a task
    * Offers many configuration options

 * *III - How to use it?*

    With Django, you first need to create a *celery.py* file in the project's folder (the one containing settings.py):
<pre>
from __future__ import absolute_import
import os
from celery import Celery

# Tell Celery which Django settings module to use
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PROJECT_NAME.settings')

from django.conf import settings

app = Celery('PROJECT_NAME')

# Read the Celery configuration from the Django settings,
# then look for tasks in the installed applications
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))
</pre>

    /!\ Replace PROJECT_NAME with your project's name (pyros in our case)

    In the project's *__init__.py* (pyros/__init__.py), add this code:
<pre>
from __future__ import absolute_import

# Make sure the Celery app is loaded when Django starts
from .celery import app as celery_app
</pre>

    Now you can *declare tasks* and configure them.
    There are two ways of declaring tasks, but the only one that interests us is the *'class form'*:
<pre>
# my_app/operations.py

from __future__ import absolute_import

from celery import shared_task

from time import sleep

@shared_task
def mul(x, y):
    sleep(3)  # simulate a long operation
    return x * y
</pre>

    Then you need to *register the task in settings.py*.
    To do this, just add the module containing the task to the CELERY_IMPORTS tuple:
<pre>
CELERY_IMPORTS = ("my_app.operations",)
</pre>

    Tasks are placed in queues when they are created (task creation is explained below).
    You will want to *specify in which queue a task is registered* (routed), in settings.py:
<pre>
CELERY_ROUTES = {
    "my_app.operations.mul": {"queue": "my_operations_queue"},
    "app2.scheduler.Scheduler": {"queue": "scheduling_q"},
}
</pre>

    In pyros, we want only one process per queue, for example to avoid running several schedulings at the same time. Other *useful configurations* are shown as well:
<pre>
# pyros/settings.py

# Only 1 process per worker, so that each queue is served by a single process
CELERYD_CONCURRENCY = 1

# These settings make the worker take only 1 task at a time
CELERY_ACKS_LATE = False
CELERYD_PREFETCH_MULTIPLIER = 1

# Declaring the accepted content types explicitly removes the pickle warning
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

# The way the task results are retrieved
CELERY_RESULT_BACKEND = 'rpc://'
</pre>

    There we are! We can now start our workers and create tasks.
    To *start the worker*:
<pre>
$ celery worker -A pyros -Q my_operations_queue
</pre>

    To *create a task* (in Python):
<pre>
from my_app.operations import mul
mul.delay(4, 5)
</pre>

    In the terminal where you started your worker, you can see the task and its result!

    In your code, you can *wait for a task to finish* and retrieve its result:
<pre>
from app.tasks import my_task
result = my_task.delay(4, 4)
result.get() # blocks until the task is finished
</pre>

    You may also want to *stop a task*, given its task id:
<pre>
from celery.task.control import revoke

# To cancel a pending task (still waiting in the queue)
revoke(task_id)
# OR, to abort the task even if it is already running
revoke(task_id, terminate=True)
# The task id is available from the 'result' object (see above): result.id
</pre>

---

h3. Comet

 * *I - Purpose*
    Comet is used to receive and send VOEvents. It is what is called a VOEvent broker.
    It is *very* easy to use.

 * *II - Features*

    * Sends a VOEvent, given a host, a port and an XML file
    * Listens for incoming VOEvents and stores them in a given directory

 * *III - How to use it?*

    * To send a VOEvent:
<pre>
# The --host and --port options are optional. Default values: 'localhost' and '8098'
$ comet-sendvo --host=localhost --port=8098 -f voevent_to_publish.xml
</pre>
    * To receive VOEvents:
<pre>
twistd comet --receive --save-event --save-event-directory=some/directory --remote=localhost:5632 --local-ivo=ivo://irap/pyros
</pre>
---

h3. Threading library

 * *I - Purpose*

    * Creates threads simply, with basic communication between them
    * Allows handling concurrent access

 * *II - Features*

    Provides:

    * A Thread class to inherit from, with a run() method that is called when the thread starts
    * An Event class wrapping a boolean flag that can be set/cleared to signal something to a thread
    * Lock and RLock objects to handle concurrent access

 * *III - How to use it?*

    <pre>from threading import Thread, Event</pre>

    * Thread

        * Create a class inheriting from Thread
        * Override the 'run' method, which is called when the thread starts
        * Instantiate your class and call start() on the instance to create the thread

    * Event

        * Create an Event attribute in your Thread-inheriting class (eg: 'self.stop_event = Event()')
        * After the thread has started, you can set/clear the event through your instance: stop_event.set() / stop_event.clear()
        * There are a few other useful methods, see this link for further information: https://docs.python.org/3/library/threading.html#threading.Event
    * Lock / RLock

        * Not used in the project yet, see the online documentation: https://docs.python.org/3/library/threading.html#lock-objects
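
As an illustration of the Thread and Event points above, here is a small self-contained sketch. The Worker class, its 'iterations' counter and the run_worker helper are names invented for the example; they do not come from the project.

```python
from threading import Thread, Event
import time


class Worker(Thread):
    def __init__(self):
        super().__init__()
        self.stop_event = Event()   # flag used to ask the thread to stop
        self.iterations = 0

    def run(self):
        # Called in the new thread once start() has been called
        while not self.stop_event.is_set():
            self.iterations += 1
            # wait() sleeps up to 10 ms but returns early once the event is set
            self.stop_event.wait(0.01)


def run_worker(duration=0.05):
    worker = Worker()
    worker.start()                  # creates the thread, which executes run()
    time.sleep(duration)
    worker.stop_event.set()         # signal the thread to leave its loop
    worker.join(timeout=5)          # wait for the thread to finish
    return worker
```

Using the event as the loop condition lets the thread end cleanly: the caller only sets the flag and then joins the thread.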

---

h3. Socket library

 * *I - Purpose*

    * Handles network communication, given only the IP and port of the peers

 * *II - Features*

    * A 'server' side that creates an interface, waits for client connections and sends/receives data to/from them
    * A 'client' side that connects to a server and sends/receives data to/from it

 * *III - How to use it?*

    * Server

        * Instantiate the socket and listen for connections
<pre>
import socket

self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # create the socket
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # for the port to be immediately re-usable after closing the socket
self.server_socket.bind((self.ip, self.receive_port))                    # associate the socket with an ip and a port
self.server_socket.listen(12)                                            # start listening (up to 12 connections can wait in the queue before being accepted)
</pre>
        * Accept connections
<pre>
conn, addr = self.server_socket.accept() # blocks until a client connects; conn is a new socket created for that connection
</pre>
        * Exchange messages
<pre>
conn.send(bytes(message, 'UTF-8'))          # sending
data = conn.recv(self.buffer_size).decode() # receiving
</pre>
        * Close the sockets when you're done with them
<pre>
conn.close()
...
self.server_socket.close()
</pre>

    * Client

        * Instantiate the socket and connect to a server
<pre>
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((dest_ip, dest_receive_port))
</pre>
        * Exchange messages
<pre>
client_socket.send(bytes(message, 'UTF-8'))          # sending
data = client_socket.recv(self.buffer_size).decode() # receiving
</pre>
        * Close the socket when you're done with it
<pre>
client_socket.close()
</pre>
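
The server and client snippets above can be put together in one runnable sketch. The function names (serve_once, exchange), the 'ACK' reply and the use of port 0 to let the system pick a free port are choices made for this example only; they are not taken from the project.

```python
import socket
import threading

BUFFER_SIZE = 1024


def serve_once(server_socket):
    """Accept one client, read its message and answer with an acknowledgement."""
    conn, addr = server_socket.accept()
    with conn:
        data = conn.recv(BUFFER_SIZE).decode('utf-8')
        conn.sendall(('ACK ' + data).encode('utf-8'))


def exchange(message, ip='127.0.0.1'):
    # Server side: create the socket, bind it and start listening
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((ip, 0))             # port 0: the system picks a free port
    server_socket.listen(1)
    port = server_socket.getsockname()[1]

    # Handle the single expected connection in a separate thread
    server_thread = threading.Thread(target=serve_once, args=(server_socket,))
    server_thread.start()

    # Client side: connect, send the message and wait for the answer
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client_socket.connect((ip, port))
        client_socket.sendall(message.encode('utf-8'))
        answer = client_socket.recv(BUFFER_SIZE).decode('utf-8')
    finally:
        client_socket.close()
        server_thread.join(timeout=5)
        server_socket.close()
    return answer
```

Calling exchange('Observation finished') sends the text to the local server and returns its reply. The sketch uses sendall() rather than send() so that the whole message is transmitted even if the system splits it.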

---