Source code for panoramisk.fast_agi
import logging
import asyncio
from collections import OrderedDict
from .utils import parse_agi_result
log = logging.getLogger(__name__)
class Request:
def __init__(self, app, headers, reader, writer, encoding='utf-8'):
self.app = app
self.headers = headers
self.reader = reader
self.writer = writer
self.encoding = encoding
async def send_command(self, command):
"""Send a command for FastAGI request:
:param command: Command to launch on FastAGI request. Ex: 'EXEC StartMusicOnHolds'
:type command: String
:Example:
::
async def call_waiting(request):
print(['AGI variables:', request.headers])
await request.send_command('ANSWER')
await request.send_command('EXEC StartMusicOnHold')
await request.send_command('EXEC Wait 10')
"""
command += '\n'
self.writer.write(command.encode(self.encoding))
await self.writer.drain()
agi_result = await self._read_result()
# If Asterisk returns `100 Trying...`, wait for next the response.
while agi_result.get('status_code') == 100:
agi_result = await self._read_result()
# when we got AGIUsageError the following line contains some indication
if 'error' in agi_result and agi_result['error'] == 'AGIUsageError':
buff_usage_error = await self.reader.readline()
agi_result['msg'] += buff_usage_error.decode(self.encoding)
return agi_result
async def _read_result(self):
"""Parse read a response from the AGI and parse it.
:return dict: The AGI response parsed into a dict.
"""
response = await self.reader.readline()
return parse_agi_result(response.decode(self.encoding)[:-1])
[docs]class Application(dict):
"""Main object:
.. code-block:: python
>>> fa_app = Application()
"""
buf_size = 100
def __init__(self, default_encoding='utf-8', loop=None):
super(Application, self).__init__()
self.default_encoding = default_encoding
if loop is None:
loop = asyncio.get_event_loop()
self.loop = loop
self._route = OrderedDict()
[docs] def add_route(self, path, endpoint):
"""Add a route for FastAGI requests:
:param path: URI to answer. Ex: 'calls/start'
:type path: String
:param endpoint: command to launch. Ex: start
:type endpoint: callable
:Example:
::
async def start(request):
print('Receive a FastAGI request')
print(['AGI variables:', request.headers])
fa_app = Application()
fa_app.add_route('calls/start', start)
"""
assert callable(endpoint), endpoint
if path in self._route:
raise ValueError('A route already exists.')
if not asyncio.iscoroutinefunction(endpoint):
endpoint = asyncio.coroutine(endpoint)
self._route[path] = endpoint
[docs] def del_route(self, path):
"""Delete a route for FastAGI requests:
:param path: URI to answer. Ex: 'calls/start'
:type path: String
:Example:
::
async def start(request):
print('Receive a FastAGI request')
print(['AGI variables:', request.headers])
fa_app = Application()
fa_app.add_route('calls/start', start)
fa_app.del_route('calls/start')
"""
if path not in self._route:
raise ValueError('This route doesn\'t exist.')
del(self._route[path])
[docs] async def handler(self, reader, writer):
"""AsyncIO coroutine handler to launch socket listening.
:Example:
::
async def start(request):
print('Receive a FastAGI request')
print(['AGI variables:', request.headers])
fa_app = Application()
fa_app.add_route('calls/start', start)
coro = asyncio.start_server(fa_app.handler, '0.0.0.0', 4574)
server = loop.run_until_complete(coro)
See https://docs.python.org/3/library/asyncio-stream.html
"""
buffer = b''
while b'\n\n' not in buffer:
buffer += await reader.read(self.buf_size)
lines = buffer[:-2].decode(self.default_encoding).split('\n')
headers = OrderedDict([
line.split(': ', 1) for line in lines if ': ' in line
])
agi_network_script = headers.get('agi_network_script')
log.info('Received FastAGI request from %r for "%s" route',
writer.get_extra_info('peername'), agi_network_script)
log.debug("Asterisk Headers: %r", headers)
if agi_network_script is not None:
route = self._route.get(agi_network_script)
if route is not None:
request = Request(app=self,
headers=headers,
reader=reader, writer=writer,
encoding=self.default_encoding)
try:
await route(request)
except BaseException:
log.exception(
'An exception has been raised for the request "%s"',
agi_network_script
)
else:
log.error('No route for the request "%s"', agi_network_script)
else:
log.error('No agi_network_script header for the request')
log.debug("Closing client socket")
writer.close()