Technical components

Version 12 (Paul Carensac, 05/04/2016 05:06 pm)

1 1 Paul Carensac
h1. Technical components
2 2 Paul Carensac
3 8 Paul Carensac
Explanations about the technical components of the project : the ones we have created (internal), and the imported ones (external).
4 2 Paul Carensac
5 2 Paul Carensac
---
6 2 Paul Carensac
7 2 Paul Carensac
h2. %{margin-left:0px; font-weight:bold; font-size:25px;  display:block; color:red;}Internal components%
8 2 Paul Carensac
9 2 Paul Carensac
h3. Agent
10 1 Paul Carensac
11 4 Paul Carensac
The Agent class is in the common.agent.py file.
12 4 Paul Carensac
13 4 Paul Carensac
 * *I - Purpose*
14 4 Paul Carensac
15 4 Paul Carensac
    * Generically handles and creates the asynchronous modules
16 4 Paul Carensac
    * Uses the threading library (see below in External components) to make all modules independent
17 4 Paul Carensac
    * Provides an abstract class to be inherited
18 4 Paul Carensac
19 4 Paul Carensac
 * *II - Features*
20 4 Paul Carensac
21 4 Paul Carensac
    * Uses a config file (pyros_agent_config.ini) to set the network communication interface of all agents
22 4 Paul Carensac
    * Provides a 'work' method to override : this is the entry method of the newly created thread (see 'How to use it' section below)
23 4 Paul Carensac
    * Provides the 'receive' and 'analyze_message' methods to generically receive messages from network and analyze them 
24 4 Paul Carensac
25 4 Paul Carensac
 * *III - How to use it ?*
26 4 Paul Carensac
27 4 Paul Carensac
    Each of these points are +NECESSARY+
28 4 Paul Carensac
29 4 Paul Carensac
    * Create a new class that inherits from Agent
30 4 Paul Carensac
    * In the __init__ method, first call the __init__ method of Agent, passing the name of the agent as second parameter (they are defined in the Agent class, eg: Agent.SCHEDULER)
31 4 Paul Carensac
    * Inside the class, define the messages your agent can receive (eg: MSG_OBS_FINISHED = "Observation finished")
32 4 Paul Carensac
    * Create a method to be called for every message you created
33 4 Paul Carensac
    * In the __init__, after calling the Agent's __init__, associate each message to its associated function in the 'self.actions_by_message' dictionary (eg: self.actions_by_message[self.MSG_OBS_FINISHED] = self.observation_finished)
34 4 Paul Carensac
    * Override the method work : this will be the entry function of the new thread, so do whatever you need. This MUST NOT be an infinite loop, because Agent's receive method will be called after this one
35 4 Paul Carensac
    * If ever needed, override the 'shutdown' method, it will be called when your agent receive the Agent.SHUTDOWN message (eg: if you created another thread in the 'work' method, you need to close it)
36 5 Paul Carensac
    * To start the agent, just instantiate your class and do MyClass.start() (the 'work' method will be called)
37 1 Paul Carensac
38 5 Paul Carensac
    The main points to understand are that you can do whatever you want (but non-blocking) in work method (like creating new threads or variables' initialization), then the only entry points are the message-associated methods
39 4 Paul Carensac
40 7 Paul Carensac
 * *IV - Important : pyros agents launching*
41 7 Paul Carensac
42 7 Paul Carensac
    * In pyros, there is maximum 1 agent per application
43 7 Paul Carensac
    * The agent must be started at application start :
44 7 Paul Carensac
        
45 7 Paul Carensac
        * In the MyApp.apps.py file, create a class inheriting from django.apps.AppConfig
46 7 Paul Carensac
        * Define the 'name' attribute in it, giving it the name of the agent
47 7 Paul Carensac
        * Create a 'ready(self)' method
48 7 Paul Carensac
        * in the ready method, import your agent implementation, instantiate it and start it
49 7 Paul Carensac
<pre>
50 7 Paul Carensac
from django.apps import AppConfig
51 7 Paul Carensac
52 7 Paul Carensac
53 7 Paul Carensac
class AlertManagerConfig(AppConfig):
54 7 Paul Carensac
    name = 'alert_manager'
55 7 Paul Carensac
    
56 7 Paul Carensac
    def ready(self):
57 7 Paul Carensac
        from alert_manager.agent import AlertManagerAgent
58 7 Paul Carensac
        self.agent = AlertManagerAgent()
59 7 Paul Carensac
        self.agent.start()
60 7 Paul Carensac
</pre>
61 7 Paul Carensac
62 2 Paul Carensac
h3. Sender
63 1 Paul Carensac
64 4 Paul Carensac
The Sender class is in the common.sender.py file
65 4 Paul Carensac
66 4 Paul Carensac
 * *I - Purpose*
67 4 Paul Carensac
68 4 Paul Carensac
    * Send a given message to an agent
69 4 Paul Carensac
70 4 Paul Carensac
 * *II - Features*
71 4 Paul Carensac
72 4 Paul Carensac
    * Uses the 'pyros_agent_config.ini' file to get the agents' network interface configuration (ip and port)
73 4 Paul Carensac
    * Provide a 'send_to' static method to send the messages
74 4 Paul Carensac
75 4 Paul Carensac
 * *III - How to use it ?*
76 4 Paul Carensac
77 4 Paul Carensac
    * The targeted agent must be described in 'pyros_agent_config.ini'
78 4 Paul Carensac
    * Use Sender.send_to method, giving as first parameter the name of the targeted agent (eg: Agent.SCHEDULER), and as second parameter the message (eg: Agent.SHUTDOWN)
79 4 Paul Carensac
    * /!\ send_to is a static method, you don't need to instantiate a Sender (just do Sender.send_to(...))
80 9 Paul Carensac
81 2 Paul Carensac
---
82 2 Paul Carensac
83 2 Paul Carensac
h2. %{margin-left:0px; font-weight:bold; font-size:25px;  display:block; color:red;}External components%
84 2 Paul Carensac
85 10 Paul Carensac
h3. Celery
86 10 Paul Carensac
87 10 Paul Carensac
 * *I - Purpose*
88 11 Paul Carensac
    Celery is used to create and handle tasks in different processes/threads (*called workers*).
89 10 Paul Carensac
    Its use is very easy.
90 10 Paul Carensac
91 10 Paul Carensac
 * *II - Features*
92 10 Paul Carensac
93 10 Paul Carensac
    * Create personalized tasks asynchronously
94 10 Paul Carensac
    * Has ETA and countdowns
95 10 Paul Carensac
    * Lots of configurations are possible
96 10 Paul Carensac
97 10 Paul Carensac
 * *III - How to use it ?*
98 1 Paul Carensac
99 12 Paul Carensac
    With Django, you first need to create a *celery.py* in project's folder (the one containing settings.py) :
100 11 Paul Carensac
<pre>
101 11 Paul Carensac
from __future__ import absolute_import
102 11 Paul Carensac
import os
103 11 Paul Carensac
from celery import Celery
104 1 Paul Carensac
105 11 Paul Carensac
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PROJECT_NAME.settings')
106 11 Paul Carensac
107 11 Paul Carensac
from django.conf import settings
108 11 Paul Carensac
109 11 Paul Carensac
app = Celery('PROJECT_NAME')
110 11 Paul Carensac
111 11 Paul Carensac
app.config_from_object('django.conf:settings')
112 11 Paul Carensac
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
113 11 Paul Carensac
114 11 Paul Carensac
@app.task(bind=True)
115 11 Paul Carensac
def debug_task(self):
116 11 Paul Carensac
    print("Request: {0!r}".format(self.request))
117 11 Paul Carensac
</pre>
118 11 Paul Carensac
119 11 Paul Carensac
    /!\ Replace PROJECT_NAME by your project's name (pyros in our case)
120 11 Paul Carensac
121 12 Paul Carensac
    In the project's *__init__.py* (pyros/__init__.py), add this code :
122 11 Paul Carensac
<pre>
123 11 Paul Carensac
from __future__ import absolute_import
124 11 Paul Carensac
125 11 Paul Carensac
from .celery import app as celery_app
126 11 Paul Carensac
</pre>
127 11 Paul Carensac
128 12 Paul Carensac
    Now you can *declare tasks* and configure them.
129 12 Paul Carensac
    There are two ways of declaring tasks, but the only one interesting us is the *'class form'* :
130 11 Paul Carensac
<pre>
131 11 Paul Carensac
# my_app/operations.py
132 11 Paul Carensac
133 11 Paul Carensac
from __future__ import absolute_import
134 11 Paul Carensac
135 11 Paul Carensac
from celery import shared_task
136 11 Paul Carensac
137 11 Paul Carensac
from time import sleep
138 11 Paul Carensac
139 11 Paul Carensac
@shared_task
140 11 Paul Carensac
def mul(x, y):
141 11 Paul Carensac
    sleep(3)
142 11 Paul Carensac
    return x * y
143 11 Paul Carensac
</pre>
144 11 Paul Carensac
145 12 Paul Carensac
    Then you need to *register the task in settings.py*.
146 11 Paul Carensac
    To do this, you just need to indicate the file containing the task in the CELERY_IMPORTS tuple :
147 11 Paul Carensac
<pre>
148 11 Paul Carensac
    CELERY_IMPORTS = ("my_app.operations",)
149 11 Paul Carensac
</pre>
150 11 Paul Carensac
151 11 Paul Carensac
    Task are registered in queues when created (task creation is explained below).
152 12 Paul Carensac
    You will want to *specify in which queue a task is registered* (routed), in settings.py :
153 11 Paul Carensac
<pre>
154 11 Paul Carensac
CELERY_ROUTES = {
155 11 Paul Carensac
    "my_app.operations.mul": {"queue": "my_operations_queue"},
156 11 Paul Carensac
    "app2.scheduler.Scheduler": {"queue": "scheduling_q"},
157 11 Paul Carensac
}
158 11 Paul Carensac
</pre>
159 11 Paul Carensac
160 12 Paul Carensac
    In pyros, we want only one process by queue, to avoid several scheduling at the same time for example. There are also other *useful configurations* :
161 11 Paul Carensac
<pre>
162 11 Paul Carensac
# pyros/settings.py
163 11 Paul Carensac
''' This settings is for having only 1 process (worker) by queue '''
164 11 Paul Carensac
CELERYD_CONCURRENCY = 1
165 11 Paul Carensac
166 11 Paul Carensac
''' These settings is for the worker to take only 1 task at the same time '''
167 11 Paul Carensac
CELERY_ACKS_LATE = False
168 11 Paul Carensac
CELERYD_PREFETCH_MULTIPLIER = 1
169 11 Paul Carensac
170 11 Paul Carensac
''' Removes pickle warning '''
171 11 Paul Carensac
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
172 11 Paul Carensac
173 11 Paul Carensac
''' The way the tasks result are retrieved '''
174 11 Paul Carensac
CELERY_RESULT_BACKEND = 'rpc://'
175 11 Paul Carensac
176 11 Paul Carensac
</pre>
177 11 Paul Carensac
178 12 Paul Carensac
    There we are ! We can now start our workers and create tasks.
179 12 Paul Carensac
    To *start the worker* :
180 11 Paul Carensac
<pre>
181 1 Paul Carensac
$ celery worker -A pyros -Q my_operations_queue
182 1 Paul Carensac
</pre>
183 1 Paul Carensac
184 12 Paul Carensac
    To *create a task* (in python) :
185 1 Paul Carensac
<pre>
186 1 Paul Carensac
from my_app.oparations import mul
187 1 Paul Carensac
mul.delay(4, 5)
188 1 Paul Carensac
</pre>
189 1 Paul Carensac
190 1 Paul Carensac
    In the terminal where you started your worker, you can see the task and its result !
191 12 Paul Carensac
192 12 Paul Carensac
    In your code, you can *wait for a task to be finished*, and retrieve its result :
193 12 Paul Carensac
<pre>
194 12 Paul Carensac
from app.tasks import my_task
195 12 Paul Carensac
result = my_task.delay(4, 4)
196 12 Paul Carensac
result.get() # blocking if the task is not finished
197 12 Paul Carensac
</pre>
198 12 Paul Carensac
199 12 Paul Carensac
    You will also want to *stop a task*, given a task id :
200 12 Paul Carensac
<pre>
201 12 Paul Carensac
from celery.task.control import revoke
202 12 Paul Carensac
203 12 Paul Carensac
# To delete a pending task (waiting in queue)
204 12 Paul Carensac
revoke(task_id)
205 12 Paul Carensac
# OR, if you want the task to be aborted even during its execution
206 12 Paul Carensac
revoke(task_id, terminate=True)
207 12 Paul Carensac
# You can retrieve task_id with the 'result' (see above) ==> result.id
208 12 Paul Carensac
</pre>
209 12 Paul Carensac
210 10 Paul Carensac
---
211 10 Paul Carensac
212 1 Paul Carensac
h3. Threading library
213 5 Paul Carensac
214 5 Paul Carensac
 * *I - Purpose*
215 5 Paul Carensac
216 5 Paul Carensac
    * Simply create threads with basic communication
217 5 Paul Carensac
    * Allows to handle concurrent access
218 5 Paul Carensac
219 5 Paul Carensac
 * *II - Features*
220 5 Paul Carensac
221 5 Paul Carensac
    Provides :
222 5 Paul Carensac
223 5 Paul Carensac
    * A Thread class to inherit from, with a run() method that will be called when the thread starts
224 5 Paul Carensac
    * An Event class to set/unset a boolean in order to transmit message to the thread
225 5 Paul Carensac
    * Lock and RLock object to handle concurrent access
226 5 Paul Carensac
227 5 Paul Carensac
 * *III - How to use it ?*
228 5 Paul Carensac
229 5 Paul Carensac
    <pre>from threading import Thread, Event</pre>
230 5 Paul Carensac
231 5 Paul Carensac
    * Thread
232 5 Paul Carensac
233 5 Paul Carensac
        * Create a class inheriting from Thread
234 5 Paul Carensac
        * Override 'run' method, that will be called at thread start
235 5 Paul Carensac
        * Instantiate your class, and do MyClass.start() to create the thread
236 5 Paul Carensac
237 5 Paul Carensac
    * Event
238 5 Paul Carensac
239 5 Paul Carensac
        * Create an Event variable in your Thread-inheriting class (eg: 'self.stop_event = Event()')
240 5 Paul Carensac
        * After thread starts, you can set/unset the event by doing MyClass.stop_event.set() / .clear()
241 5 Paul Carensac
        * There are a few useful methods, see this link for further information : https://docs.python.org/3/library/threading.html#threading.Event
242 5 Paul Carensac
    * Lock / RLock
243 5 Paul Carensac
244 5 Paul Carensac
        * Still not used, see online documentation : https://docs.python.org/3/library/threading.html#lock-objects
245 10 Paul Carensac
246 10 Paul Carensac
---
247 2 Paul Carensac
248 2 Paul Carensac
h3. Socket library
249 2 Paul Carensac
250 6 Paul Carensac
 * *I - Purpose*
251 6 Paul Carensac
252 6 Paul Carensac
    * Handle network communication, just giving IP and Port of the interlocutors
253 6 Paul Carensac
254 6 Paul Carensac
 * *II - Features*
255 6 Paul Carensac
256 6 Paul Carensac
    * 'server' system to create an interface, waiting for client connections and sending / receiving data from them
257 6 Paul Carensac
    * 'client' system to connect to a server, and send/receive data from it
258 6 Paul Carensac
259 6 Paul Carensac
 * *III - How to use it ?*
260 6 Paul Carensac
261 6 Paul Carensac
    * Server
262 6 Paul Carensac
263 6 Paul Carensac
        * Instantiate socket and wait for connections
264 6 Paul Carensac
<pre>
265 6 Paul Carensac
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # create the socket
266 6 Paul Carensac
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # for the port to be immediately re-usable after closing the socket
267 6 Paul Carensac
self.server_socket.bind((self.ip, self.receive_port))                    # associate the socket to an ip and a port
268 6 Paul Carensac
self.server_socket.listen(12)                                            # wait for connections (here, 12 connections can be simultaneously waiting for acceptance)
269 6 Paul Carensac
</pre>
270 6 Paul Carensac
        * Accept connections
271 6 Paul Carensac
<pre>
272 6 Paul Carensac
conn, addr = self.server_socket.accept() # conn is a new socket created at the connection
273 6 Paul Carensac
</pre>
274 6 Paul Carensac
        * Exchanging messages
275 6 Paul Carensac
<pre>
276 6 Paul Carensac
conn.send(bytes(message, 'UTF-8'))          # sending
277 6 Paul Carensac
data = conn.recv(self.buffer_size).decode() # receiving
278 6 Paul Carensac
</pre>
279 6 Paul Carensac
        * Closing sockets when you're done with them
280 6 Paul Carensac
<pre>
281 6 Paul Carensac
conn.close()
282 6 Paul Carensac
...
283 6 Paul Carensac
server_socket.close()
284 6 Paul Carensac
</pre>
285 6 Paul Carensac
286 6 Paul Carensac
    * Client
287 6 Paul Carensac
288 6 Paul Carensac
        * Instantiate the socket and connect to a server
289 6 Paul Carensac
<pre>
290 6 Paul Carensac
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
291 6 Paul Carensac
client_socket.connect((dest_ip, dest_receive_port))
292 6 Paul Carensac
</pre>
293 6 Paul Carensac
        * Exchanging messages
294 6 Paul Carensac
<pre>
295 6 Paul Carensac
client_socket.send(bytes(message, 'UTF-8'))          # sending
296 6 Paul Carensac
data = client_socket.recv(self.buffer_size).decode() # receiving
297 6 Paul Carensac
</pre>
298 6 Paul Carensac
        * Closing sockets when you're done with them
299 6 Paul Carensac
<pre>
300 6 Paul Carensac
client_socket.close()
301 6 Paul Carensac
</pre>
302 6 Paul Carensac
303 2 Paul Carensac
---