Technical components
Version 18 (Paul Carensac, 07/08/2016 12:42 pm)
1 | 1 | Paul Carensac | h1. Technical components |
---|---|---|---|
2 | 2 | Paul Carensac | |
3 | 8 | Paul Carensac | This page describes the technical components of the project: the ones we created (internal) and the ones we imported (external). |
4 | 2 | Paul Carensac | |
5 | 14 | Paul Carensac | {{>toc}} |
6 | 14 | Paul Carensac | |
7 | 2 | Paul Carensac | --- |
8 | 2 | Paul Carensac | |
9 | 2 | Paul Carensac | h2. %{margin-left:0px; font-weight:bold; font-size:25px; display:block; color:red;}Internal components% |
10 | 2 | Paul Carensac | |
11 | 18 | Paul Carensac | h3. Celery tasks list |
12 | 18 | Paul Carensac | |
13 | 18 | Paul Carensac | See this Google spreadsheet: https://docs.google.com/spreadsheets/d/15fu0BQm0VYx07qyAl5YiP_OwARTJZdd_JEmK4uoteKU/edit?usp=sharing |
14 | 18 | Paul Carensac | |
15 | 18 | Paul Carensac | h3. Pyros grammar (for instruments) |
16 | 18 | Paul Carensac | |
17 | 18 | Paul Carensac | See this Google spreadsheet: https://docs.google.com/spreadsheets/d/1rDWRI2FCyFLhu-9HEGVtSUtgD4vUY7FHG977ilPVkdU/edit?usp=sharing |
18 | 18 | Paul Carensac | |
19 | 18 | Paul Carensac | h3. THE AGENT AND SENDER COMPONENTS ARE NO LONGER IMPLEMENTED |
20 | 18 | Paul Carensac | |
21 | 2 | Paul Carensac | h3. Agent |
22 | 1 | Paul Carensac | |
23 | 4 | Paul Carensac | The Agent class is in the common.agent.py file. |
24 | 4 | Paul Carensac | |
25 | 4 | Paul Carensac | * *I - Purpose* |
26 | 4 | Paul Carensac | |
27 | 4 | Paul Carensac | * Generically handles and creates the asynchronous modules |
28 | 4 | Paul Carensac | * Uses the threading library (see below in External components) to make all modules independent |
29 | 4 | Paul Carensac | * Provides an abstract class to be inherited |
30 | 4 | Paul Carensac | |
31 | 4 | Paul Carensac | * *II - Features* |
32 | 4 | Paul Carensac | |
33 | 4 | Paul Carensac | * Uses a config file (pyros_agent_config.ini) to set the network communication interface of all agents |
34 | 4 | Paul Carensac | * Provides a 'work' method to override : this is the entry method of the newly created thread (see 'How to use it' section below) |
35 | 4 | Paul Carensac | * Provides the 'receive' and 'analyze_message' methods to generically receive messages from network and analyze them |
36 | 4 | Paul Carensac | |
37 | 4 | Paul Carensac | * *III - How to use it ?* |
38 | 4 | Paul Carensac | |
39 | 4 | Paul Carensac | Each of these points is +NECESSARY+ |
40 | 4 | Paul Carensac | |
41 | 4 | Paul Carensac | * Create a new class that inherits from Agent |
42 | 4 | Paul Carensac | * In the __init__ method, first call the __init__ method of Agent, passing the name of the agent as second parameter (agent names are defined in the Agent class, eg: Agent.SCHEDULER) |
43 | 4 | Paul Carensac | * Inside the class, define the messages your agent can receive (eg: MSG_OBS_FINISHED = "Observation finished") |
44 | 4 | Paul Carensac | * For every message you defined, create a method to be called when that message is received |
45 | 4 | Paul Carensac | * In the __init__, after calling the Agent's __init__, map each message to its handler method in the 'self.actions_by_message' dictionary (eg: self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished) |
46 | 4 | Paul Carensac | * Override the 'work' method : this is the entry function of the new thread, so do whatever you need there. It MUST NOT be an infinite loop, because Agent's 'receive' method is only called after 'work' returns |
47 | 4 | Paul Carensac | * If needed, override the 'shutdown' method : it is called when your agent receives the Agent.SHUTDOWN message (eg: if you created another thread in the 'work' method, you need to stop it there) |
48 | 5 | Paul Carensac | * To start the agent, instantiate your class and call start() on the instance (the 'work' method will be called) |
49 | 1 | Paul Carensac | |
50 | 5 | Paul Carensac | The main point to understand is that you can do whatever you want in the 'work' method (such as creating new threads or initializing variables) as long as it does not block. After 'work' returns, the only entry points are the methods associated with messages. |
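The steps above can be sketched as follows. The real Agent class is not shown on this page, so this example defines a small stand-in base class (@StandInAgent@, hypothetical) that reads messages from an in-memory queue instead of the network. Only the names Agent.SCHEDULER, Agent.SHUTDOWN, MSG_OBS_FINISHED, actions_by_message, work and shutdown come from the description above; everything else is an assumption made for illustration.

```python
import queue
import threading


class StandInAgent(threading.Thread):
    """Hypothetical stand-in for the pyros Agent class (not the real one)."""

    SCHEDULER = "Scheduler"
    SHUTDOWN = "Shutdown"

    def __init__(self, name):
        super().__init__(name=name)
        self.inbox = queue.Queue()       # assumption: replaces the network interface
        self.actions_by_message = {}     # message -> method to call

    def work(self):
        """Entry point of the new thread; meant to be overridden."""

    def shutdown(self):
        """Called when SHUTDOWN is received; may be overridden."""

    def run(self):
        self.work()                      # must return, otherwise no message is ever read
        while True:
            message = self.inbox.get()   # blocks until a message arrives
            if message == self.SHUTDOWN:
                self.shutdown()
                break
            action = self.actions_by_message.get(message)
            if action is not None:
                action()


class SchedulerAgent(StandInAgent):
    MSG_OBS_FINISHED = "Observation finished"

    def __init__(self):
        # Call the parent __init__ first, giving the agent name
        super().__init__(StandInAgent.SCHEDULER)
        self.finished_observations = 0
        self.ready = False
        self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished

    def work(self):
        # Non-blocking initialization only
        self.ready = True

    def observation_finished(self):
        self.finished_observations += 1
```

To try it, instantiate the class, call start() on the instance, put SchedulerAgent.MSG_OBS_FINISHED and then StandInAgent.SHUTDOWN in its inbox, and join() the thread.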
51 | 4 | Paul Carensac | |
52 | 7 | Paul Carensac | * *IV - Important : pyros agents launching* |
53 | 7 | Paul Carensac | |
54 | 7 | Paul Carensac | * In pyros, there is at most 1 agent per application |
55 | 7 | Paul Carensac | * The agent must be started at application start : |
56 | 7 | Paul Carensac | |
57 | 7 | Paul Carensac | * In the MyApp.apps.py file, create a class inheriting from django.apps.AppConfig |
58 | 7 | Paul Carensac | * Define the 'name' attribute in it, giving it the name of the agent |
59 | 7 | Paul Carensac | * Create a 'ready(self)' method |
60 | 7 | Paul Carensac | * In the 'ready' method, import your agent implementation, instantiate it and start it |
61 | 7 | Paul Carensac | <pre> |
62 | 7 | Paul Carensac | from django.apps import AppConfig |
63 | 7 | Paul Carensac | |
64 | 7 | Paul Carensac | |
65 | 7 | Paul Carensac | class AlertManagerConfig(AppConfig): |
66 | 7 | Paul Carensac |     name = 'alert_manager' |
67 | 7 | Paul Carensac | |
68 | 7 | Paul Carensac |     def ready(self): |
69 | 7 | Paul Carensac |         from alert_manager.agent import AlertManagerAgent |
70 | 7 | Paul Carensac |         self.agent = AlertManagerAgent() |
71 | 7 | Paul Carensac |         self.agent.start() |
72 | 7 | Paul Carensac | </pre> |
73 | 7 | Paul Carensac | |
74 | 2 | Paul Carensac | h3. Sender |
75 | 1 | Paul Carensac | |
76 | 4 | Paul Carensac | The Sender class is in the common.sender.py file. |
77 | 4 | Paul Carensac | |
78 | 4 | Paul Carensac | * *I - Purpose* |
79 | 4 | Paul Carensac | |
80 | 4 | Paul Carensac | * Send a given message to an agent |
81 | 4 | Paul Carensac | |
82 | 4 | Paul Carensac | * *II - Features* |
83 | 4 | Paul Carensac | |
84 | 4 | Paul Carensac | * Uses the 'pyros_agent_config.ini' file to get the agents' network interface configuration (ip and port) |
85 | 4 | Paul Carensac | * Provides a 'send_to' static method to send the messages |
86 | 4 | Paul Carensac | |
87 | 4 | Paul Carensac | * *III - How to use it ?* |
88 | 4 | Paul Carensac | |
89 | 4 | Paul Carensac | * The targeted agent must be described in 'pyros_agent_config.ini' |
90 | 4 | Paul Carensac | * Use the Sender.send_to method, giving the name of the targeted agent as first parameter (eg: Agent.SCHEDULER) and the message as second parameter (eg: Agent.SHUTDOWN) |
91 | 4 | Paul Carensac | * /!\ send_to is a static method, you don't need to instantiate a Sender (just do Sender.send_to(...)) |
92 | 9 | Paul Carensac | |
93 | 2 | Paul Carensac | --- |
94 | 2 | Paul Carensac | |
95 | 2 | Paul Carensac | h2. %{margin-left:0px; font-weight:bold; font-size:25px; display:block; color:red;}External components% |
96 | 2 | Paul Carensac | |
97 | 10 | Paul Carensac | h3. Celery |
98 | 10 | Paul Carensac | |
99 | 10 | Paul Carensac | * *I - Purpose* |
100 | 11 | Paul Carensac | Celery is used to create and run tasks in separate processes/threads (*called workers*). |
101 | 10 | Paul Carensac | It is easy to use. |
102 | 10 | Paul Carensac | |
103 | 10 | Paul Carensac | * *II - Features* |
104 | 10 | Paul Carensac | |
105 | 10 | Paul Carensac | * Runs custom tasks asynchronously |
106 | 10 | Paul Carensac | * Supports ETAs and countdowns to delay the execution of a task |
107 | 10 | Paul Carensac | * Offers many configuration options |
108 | 10 | Paul Carensac | |
109 | 10 | Paul Carensac | * *III - How to use it ?* |
110 | 1 | Paul Carensac | |
111 | 12 | Paul Carensac | With Django, you first need to create a *celery.py* file in the project's folder (the one containing settings.py) : |
112 | 11 | Paul Carensac | <pre> |
113 | 11 | Paul Carensac | from __future__ import absolute_import |
114 | 11 | Paul Carensac | import os |
115 | 11 | Paul Carensac | from celery import Celery |
116 | 1 | Paul Carensac | |
117 | 11 | Paul Carensac | os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PROJECT_NAME.settings') |
118 | 11 | Paul Carensac | |
119 | 11 | Paul Carensac | from django.conf import settings |
120 | 11 | Paul Carensac | |
121 | 11 | Paul Carensac | app = Celery('PROJECT_NAME') |
122 | 11 | Paul Carensac | |
123 | 11 | Paul Carensac | app.config_from_object('django.conf:settings') |
124 | 11 | Paul Carensac | app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) |
125 | 11 | Paul Carensac | |
126 | 11 | Paul Carensac | @app.task(bind=True) |
127 | 11 | Paul Carensac | def debug_task(self): |
128 | 11 | Paul Carensac |     print("Request: {0!r}".format(self.request)) |
129 | 11 | Paul Carensac | </pre> |
130 | 11 | Paul Carensac | |
131 | 11 | Paul Carensac | /!\ Replace PROJECT_NAME with your project's name (pyros in our case) |
132 | 11 | Paul Carensac | |
133 | 12 | Paul Carensac | In the project's *__init__.py* (pyros/__init__.py), add this code : |
134 | 11 | Paul Carensac | <pre> |
135 | 11 | Paul Carensac | from __future__ import absolute_import |
136 | 11 | Paul Carensac | |
137 | 11 | Paul Carensac | from .celery import app as celery_app |
138 | 11 | Paul Carensac | </pre> |
139 | 11 | Paul Carensac | |
140 | 12 | Paul Carensac | Now you can *declare tasks* and configure them. |
141 | 12 | Paul Carensac | There are two ways of declaring tasks, but the only one of interest to us is the *'class form'* : |
142 | 11 | Paul Carensac | <pre> |
143 | 11 | Paul Carensac | # my_app/operations.py |
144 | 11 | Paul Carensac | |
145 | 11 | Paul Carensac | from __future__ import absolute_import |
146 | 11 | Paul Carensac | |
147 | 11 | Paul Carensac | from celery import shared_task |
148 | 11 | Paul Carensac | |
149 | 11 | Paul Carensac | from time import sleep |
150 | 11 | Paul Carensac | |
151 | 11 | Paul Carensac | @shared_task |
152 | 11 | Paul Carensac | def mul(x, y): |
153 | 11 | Paul Carensac |     sleep(3) |
154 | 11 | Paul Carensac |     return x * y |
155 | 11 | Paul Carensac | </pre> |
156 | 11 | Paul Carensac | |
157 | 12 | Paul Carensac | Then you need to *register the task in settings.py*. |
158 | 11 | Paul Carensac | To do this, list the module containing the task in the CELERY_IMPORTS tuple : |
159 | 11 | Paul Carensac | <pre> |
160 | 11 | Paul Carensac | CELERY_IMPORTS = ("my_app.operations",) |
161 | 11 | Paul Carensac | </pre> |
162 | 11 | Paul Carensac | |
163 | 11 | Paul Carensac | Tasks are placed in queues when they are created (task creation is explained below). |
164 | 12 | Paul Carensac | You will want to *specify in which queue a task is registered* (routed), in settings.py : |
165 | 11 | Paul Carensac | <pre> |
166 | 11 | Paul Carensac | CELERY_ROUTES = { |
167 | 11 | Paul Carensac |     "my_app.operations.mul": {"queue": "my_operations_queue"}, |
168 | 11 | Paul Carensac |     "app2.scheduler.Scheduler": {"queue": "scheduling_q"}, |
169 | 11 | Paul Carensac | } |
170 | 11 | Paul Carensac | </pre> |
171 | 11 | Paul Carensac | |
172 | 12 | Paul Carensac | In pyros, we want only one process per queue, for example to avoid running several schedulings at the same time. There are also other *useful configurations* : |
173 | 11 | Paul Carensac | <pre> |
174 | 11 | Paul Carensac | # pyros/settings.py |
175 | 11 | Paul Carensac | ''' This setting allows only 1 process (worker) per queue ''' |
176 | 11 | Paul Carensac | CELERYD_CONCURRENCY = 1 |
177 | 11 | Paul Carensac | |
178 | 11 | Paul Carensac | ''' These settings make the worker take only 1 task at a time ''' |
179 | 11 | Paul Carensac | CELERY_ACKS_LATE = False |
180 | 11 | Paul Carensac | CELERYD_PREFETCH_MULTIPLIER = 1 |
181 | 11 | Paul Carensac | |
182 | 11 | Paul Carensac | ''' Removes pickle warning ''' |
183 | 11 | Paul Carensac | CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] |
184 | 11 | Paul Carensac | |
185 | 11 | Paul Carensac | ''' How the task results are retrieved ''' |
186 | 11 | Paul Carensac | CELERY_RESULT_BACKEND = 'rpc://' |
187 | 11 | Paul Carensac | |
188 | 11 | Paul Carensac | </pre> |
189 | 11 | Paul Carensac | |
190 | 12 | Paul Carensac | There we are ! We can now start our workers and create tasks. |
191 | 12 | Paul Carensac | To *start the worker* : |
192 | 11 | Paul Carensac | <pre> |
193 | 1 | Paul Carensac | $ celery worker -A pyros -Q my_operations_queue |
194 | 1 | Paul Carensac | </pre> |
195 | 1 | Paul Carensac | |
196 | 12 | Paul Carensac | To *create a task* (in python) : |
197 | 1 | Paul Carensac | <pre> |
198 | 1 | Paul Carensac | from my_app.operations import mul |
199 | 1 | Paul Carensac | mul.delay(4, 5) |
200 | 1 | Paul Carensac | </pre> |
201 | 1 | Paul Carensac | |
202 | 1 | Paul Carensac | In the terminal where you started your worker, you can see the task and its result ! |
203 | 12 | Paul Carensac | |
204 | 12 | Paul Carensac | In your code, you can *wait for a task to be finished*, and retrieve its result : |
205 | 12 | Paul Carensac | <pre> |
206 | 12 | Paul Carensac | from app.tasks import my_task |
207 | 12 | Paul Carensac | result = my_task.delay(4, 4) |
208 | 12 | Paul Carensac | result.get() # blocking if the task is not finished |
209 | 12 | Paul Carensac | </pre> |
210 | 12 | Paul Carensac | |
211 | 12 | Paul Carensac | You will also want to *stop a task*, given a task id : |
212 | 12 | Paul Carensac | <pre> |
213 | 12 | Paul Carensac | from celery.task.control import revoke |
214 | 12 | Paul Carensac | |
215 | 12 | Paul Carensac | # To delete a pending task (waiting in queue) |
216 | 12 | Paul Carensac | revoke(task_id) |
217 | 12 | Paul Carensac | # OR, if you want the task to be aborted even during its execution |
218 | 12 | Paul Carensac | revoke(task_id, terminate=True) |
219 | 12 | Paul Carensac | # You can retrieve task_id with the 'result' (see above) ==> result.id |
220 | 12 | Paul Carensac | </pre> |
221 | 12 | Paul Carensac | |
222 | 10 | Paul Carensac | --- |
223 | 10 | Paul Carensac | |
224 | 15 | Paul Carensac | h3. Comet |
225 | 15 | Paul Carensac | |
226 | 15 | Paul Carensac | * *I - Purpose* |
227 | 15 | Paul Carensac | Comet is a VOEvent broker: it is used to receive and send VOEvents. |
228 | 15 | Paul Carensac | It is *very* easy to use. |
229 | 15 | Paul Carensac | |
230 | 15 | Paul Carensac | * *II - Features* |
231 | 15 | Paul Carensac | |
232 | 15 | Paul Carensac | * Send a voevent, giving a port, a host and an XML file |
233 | 15 | Paul Carensac | * Listen for voevent reception, and store voevents in a given directory |
234 | 15 | Paul Carensac | |
235 | 15 | Paul Carensac | * *III - How to use it ?* |
236 | 15 | Paul Carensac | |
237 | 15 | Paul Carensac | * To send a voevent : |
238 | 15 | Paul Carensac | <pre> |
239 | 15 | Paul Carensac | # Options --host and --port are optional. Default values : 'localhost' and '8098' |
240 | 15 | Paul Carensac | $ comet-sendvo --host=localhost --port=8098 -f voevent_to_publish.xml |
241 | 15 | Paul Carensac | </pre> |
242 | 15 | Paul Carensac | * To receive voevents : |
243 | 15 | Paul Carensac | <pre> |
244 | 17 | Paul Carensac | $ twistd comet --receive --save-event --save-event-directory=some/directory --remote=localhost:5632 --local-ivo=ivo://irap/pyros |
245 | 1 | Paul Carensac | </pre> |
246 | 16 | Paul Carensac | |
247 | 15 | Paul Carensac | --- |
248 | 15 | Paul Carensac | |
249 | 1 | Paul Carensac | h3. Threading library |
250 | 5 | Paul Carensac | |
251 | 5 | Paul Carensac | * *I - Purpose* |
252 | 5 | Paul Carensac | |
253 | 5 | Paul Carensac | * Easily create threads with basic communication between them |
254 | 5 | Paul Carensac | * Handle concurrent access to shared data |
255 | 5 | Paul Carensac | |
256 | 5 | Paul Carensac | * *II - Features* |
257 | 5 | Paul Carensac | |
258 | 5 | Paul Carensac | Provides : |
259 | 5 | Paul Carensac | |
260 | 5 | Paul Carensac | * A Thread class to inherit from, with a run() method that will be called when the thread starts |
261 | 5 | Paul Carensac | * An Event class to set/unset a boolean flag in order to signal the thread |
262 | 5 | Paul Carensac | * Lock and RLock objects to handle concurrent access |
263 | 5 | Paul Carensac | |
264 | 5 | Paul Carensac | * *III - How to use it ?* |
265 | 5 | Paul Carensac | |
266 | 5 | Paul Carensac | <pre>from threading import Thread, Event</pre> |
267 | 5 | Paul Carensac | |
268 | 5 | Paul Carensac | * Thread |
269 | 5 | Paul Carensac | |
270 | 5 | Paul Carensac | * Create a class inheriting from Thread |
271 | 5 | Paul Carensac | * Override the 'run' method, which is called when the thread starts |
272 | 5 | Paul Carensac | * Instantiate your class, and call start() on the instance to create the thread |
273 | 5 | Paul Carensac | |
274 | 5 | Paul Carensac | * Event |
275 | 5 | Paul Carensac | |
276 | 5 | Paul Carensac | * Create an Event variable in your Thread-inheriting class (eg: 'self.stop_event = Event()') |
277 | 5 | Paul Carensac | * After the thread starts, you can set/unset the event on the instance by calling my_thread.stop_event.set() / .clear() |
278 | 5 | Paul Carensac | * There are a few useful methods, see this link for further information : https://docs.python.org/3/library/threading.html#threading.Event |
279 | 5 | Paul Carensac | * Lock / RLock |
280 | 5 | Paul Carensac | |
281 | 5 | Paul Carensac | * Not used yet, see the online documentation : https://docs.python.org/3/library/threading.html#lock-objects |
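As an illustration of the three items above, here is a minimal sketch combining a Thread subclass, an Event used as a stop flag and a Lock protecting a shared counter. The class name @Worker@, the helper @run_worker@ and the counter are hypothetical and only serve the example.

```python
import threading
import time


class Worker(threading.Thread):
    """Hypothetical worker thread that counts iterations until asked to stop."""

    def __init__(self, counter_lock):
        super().__init__()
        self.stop_event = threading.Event()  # set by the caller to request a stop
        self.counter_lock = counter_lock     # protects 'iterations'
        self.iterations = 0

    def run(self):
        # Executed in the new thread once start() has been called
        while True:
            with self.counter_lock:
                self.iterations += 1
            # wait() returns True as soon as the event is set
            if self.stop_event.wait(0.01):
                break


def run_worker(duration=0.05):
    """Start a worker, let it run for 'duration' seconds, then stop it."""
    lock = threading.Lock()
    worker = Worker(lock)
    worker.start()
    time.sleep(duration)
    worker.stop_event.set()   # ask the thread to stop
    worker.join()             # wait until run() has returned
    with lock:
        return worker.iterations, worker.is_alive()
```

Using Event.wait() with a timeout instead of time.sleep() lets the thread react to the stop request immediately rather than after the end of the pause.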
282 | 10 | Paul Carensac | |
283 | 10 | Paul Carensac | --- |
284 | 2 | Paul Carensac | |
285 | 2 | Paul Carensac | h3. Socket library |
286 | 2 | Paul Carensac | |
287 | 6 | Paul Carensac | * *I - Purpose* |
288 | 6 | Paul Carensac | |
289 | 6 | Paul Carensac | * Handles network communication, given only the IP address and port of the peers |
290 | 6 | Paul Carensac | |
291 | 6 | Paul Carensac | * *II - Features* |
292 | 6 | Paul Carensac | |
293 | 6 | Paul Carensac | * 'server' system to create an interface, waiting for client connections and sending / receiving data from them |
294 | 6 | Paul Carensac | * 'client' system to connect to a server, and send/receive data from it |
295 | 6 | Paul Carensac | |
296 | 6 | Paul Carensac | * *III - How to use it ?* |
297 | 6 | Paul Carensac | |
298 | 6 | Paul Carensac | * Server |
299 | 6 | Paul Carensac | |
300 | 6 | Paul Carensac | * Instantiate socket and wait for connections |
301 | 6 | Paul Carensac | <pre> |
302 | 6 | Paul Carensac | self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # create the socket |
303 | 6 | Paul Carensac | self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # for the port to be immediately re-usable after closing the socket |
304 | 6 | Paul Carensac | self.server_socket.bind((self.ip, self.receive_port)) # associate the socket to an ip and a port |
305 | 6 | Paul Carensac | self.server_socket.listen(12) # wait for connections (here, 12 connections can be simultaneously waiting for acceptance) |
306 | 6 | Paul Carensac | </pre> |
307 | 6 | Paul Carensac | * Accept connections |
308 | 6 | Paul Carensac | <pre> |
309 | 6 | Paul Carensac | conn, addr = self.server_socket.accept() # conn is a new socket created at the connection |
310 | 6 | Paul Carensac | </pre> |
311 | 6 | Paul Carensac | * Exchanging messages |
312 | 6 | Paul Carensac | <pre> |
313 | 6 | Paul Carensac | conn.send(bytes(message, 'UTF-8')) # sending |
314 | 6 | Paul Carensac | data = conn.recv(self.buffer_size).decode() # receiving |
315 | 6 | Paul Carensac | </pre> |
316 | 6 | Paul Carensac | * Closing sockets when you're done with them |
317 | 6 | Paul Carensac | <pre> |
318 | 6 | Paul Carensac | conn.close() |
319 | 6 | Paul Carensac | ... |
320 | 6 | Paul Carensac | self.server_socket.close() |
321 | 6 | Paul Carensac | </pre> |
322 | 6 | Paul Carensac | |
323 | 6 | Paul Carensac | * Client |
324 | 6 | Paul Carensac | |
325 | 6 | Paul Carensac | * Instantiate the socket and connect to a server |
326 | 6 | Paul Carensac | <pre> |
327 | 6 | Paul Carensac | client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
328 | 6 | Paul Carensac | client_socket.connect((dest_ip, dest_receive_port)) |
329 | 6 | Paul Carensac | </pre> |
330 | 6 | Paul Carensac | * Exchanging messages |
331 | 6 | Paul Carensac | <pre> |
332 | 6 | Paul Carensac | client_socket.send(bytes(message, 'UTF-8')) # sending |
333 | 6 | Paul Carensac | data = client_socket.recv(self.buffer_size).decode() # receiving |
334 | 6 | Paul Carensac | </pre> |
335 | 6 | Paul Carensac | * Closing sockets when you're done with them |
336 | 6 | Paul Carensac | <pre> |
337 | 6 | Paul Carensac | client_socket.close() |
338 | 6 | Paul Carensac | </pre> |
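The server and client fragments above can be combined into one runnable sketch. In this example the server runs in a thread on the local machine, accepts a single client and sends its message back in upper case. The function names @serve_once@ and @round_trip@, the upper-case reply, the 5-second timeout and the use of port 0 (to let the system pick a free port) are assumptions made for the example.

```python
import socket
import threading


def serve_once(server_socket, buffer_size=1024):
    """Accept one client, send its message back in upper case, then close."""
    conn, _addr = server_socket.accept()   # conn is a new socket for this client
    with conn:
        data = conn.recv(buffer_size).decode("utf-8")
        conn.sendall(data.upper().encode("utf-8"))


def round_trip(message, host="127.0.0.1"):
    """Send 'message' to a local one-shot server and return its reply."""
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.settimeout(5.0)          # do not wait forever in accept()
    server_socket.bind((host, 0))          # port 0: the system picks a free port
    server_socket.listen(1)
    port = server_socket.getsockname()[1]

    server_thread = threading.Thread(target=serve_once, args=(server_socket,))
    server_thread.start()

    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client_socket.connect((host, port))
        client_socket.sendall(message.encode("utf-8"))
        reply = client_socket.recv(1024).decode("utf-8")
    finally:
        client_socket.close()
        server_thread.join()
        server_socket.close()
    return reply
```

sendall() is used instead of send() because send() may transmit only part of the data. A single recv() is enough here only because the messages are short; longer messages need a loop or a length prefix.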
339 | 6 | Paul Carensac | |
340 | 2 | Paul Carensac | --- |