"""
Comming soon
"""
from urllib.parse import unquote
from copy import copy, deepcopy
from rest_framework.views import APIView
from django.http import HttpResponse
from structlog import get_logger
import staticconf
from django.conf import settings
from importlib import import_module
from django.contrib.sessions.backends import signed_cookies
from django.contrib.sessions.backends.base import CreateError
from jinja2 import Template, Environment, TemplateSyntaxError
from .screens.serializers import UssdBaseSerializer
from rest_framework.serializers import SerializerMetaclass
import re
import json
import os
from configure import Configuration
from datetime import datetime
from ussd.models import SessionLookup
from annoying.functions import get_object_or_None
from ussd import defaults as ussd_airflow_variables
from django.utils import timezone
import requests
import inspect
from ussd.tasks import report_session
from ussd import utilities
_registered_ussd_handlers = {}
_registered_filters = {}
_customer_journey_files = []
_built_in_functions = {}
# initialize jinja2 environment
env = Environment(keep_trailing_newline=True)
env.filters.update(_registered_filters)
class MissingAttribute(Exception):
pass
class InvalidAttribute(Exception):
pass
class DuplicateSessionId(Exception):
pass
def register_filter(func_name, *args, **kwargs):
filter_name = func_name.__name__
_registered_filters[filter_name] = func_name
def register_function(func_name, *args, **kwargs):
function_name = func_name.__name__
_built_in_functions[function_name] = func_name
def get_session_engine():
session_engine = import_module(getattr(settings, "USSD_SESSION_ENGINE",
settings.SESSION_ENGINE))
if session_engine is signed_cookies:
raise ValueError("You cannot use channels session "
"functionality with signed cookie sessions!")
return session_engine
def ussd_session(session_id):
session = get_session_engine().SessionStore(session_key=session_id)
session._session.keys()
session._session_key = session_id
# If the session does not already exist, save to force our
# session key to be valid.
if not session.exists(session.session_key):
try:
session.save(must_create=True)
except CreateError:
# Session wasn't unique, so another consumer is doing the same thing
raise DuplicateSessionId("another sever is working"
"on this session id")
return session
def generate_session_id():
session_store = get_session_engine().SessionStore()
session_store.save() # generate session_key
return session_store.session_key
def load_yaml(file_path, namespace):
file_path = Template(file_path).render(os.environ)
yaml_dict = Configuration.from_file(
os.path.abspath(file_path),
configure=False
)
staticconf.DictConfiguration(
yaml_dict,
namespace=namespace,
flatten=False)
class UssdRequest(object):
"""
:param session_id:
used to get session or create session if does not
exits.
If session is less than 8 we add *s* to make the session
equal to 8
:param phone_number:
This the user identifier
:param input:
This ussd input the user has entered.
:param language:
Language to use to display ussd
:param kwargs:
Extra arguments.
All the extra arguments will be set to the self attribute
For instance:
.. code-block:: python
from ussd.core import UssdRequest
ussdRequest = UssdRequest(
'12345678', '702729654', '1', 'en',
name='mwas'
)
# accessing kwarg argument
ussdRequest.name
"""
def __init__(self, session_id, phone_number,
ussd_input, language, default_language=None,
use_built_in_session_management=False,
expiry=180,
**kwargs):
"""
:param session_id: Used to maintain session
:param phone_number: user dialing in
:param ussd_input: input entered by user
:param language: language to be used
:param default_language: language to used
:param use_built_in_session_management: Used to enable ussd_airflow to
manage its own session, by default its set to False, is set to true
then the session_id should be None and expiry can't be None.
:param expiry: Its only used if use_built_in_session_management has
been enabled.
:param kwargs: All other extra arguments
"""
self.expiry = expiry
# A bit of defensive programming to make sure
# session_built_in_management has been initiated
if use_built_in_session_management and session_id is not None:
raise InvalidAttribute("When using built_in_session_management "
"has been enabled session_id should "
"be None")
if use_built_in_session_management and expiry is None:
raise InvalidAttribute("When built_in_session_management has been"
"enabled expiry should not be None")
# session id should not be None if built in session management
# has not been enabled
if session_id is None and not use_built_in_session_management:
raise InvalidAttribute(
"Session id should not be None if built in session management "
"has not been enabled"
)
if use_built_in_session_management:
session_id = self.get_or_create_session_id(phone_number)
else:
# if session id is less than 8 should provide the
# suplimentary characters with 's'
if len(str(
session_id)) < 8 and not use_built_in_session_management:
session_id = 's' * (8 - len(str(session_id))) + session_id
self.phone_number = phone_number
self.input = unquote(ussd_input)
self.language = language
self.default_language = default_language or 'en'
self.session_id = session_id
self.session = ussd_session(self.session_id)
for key, value in kwargs.items():
setattr(self, key, value)
def forward(self, handler_name):
"""
Forwards a copy of the current request to a new
handler. Clears any input, as it is assumed this was meant for
the previous handler. If you need to pass info between
handlers, do it through the USSD session.
"""
new_request = copy(self)
new_request.input = ''
return new_request, handler_name
def all_variables(self):
all_variables = copy(self.__dict__)
# delete session if it exist
all_variables.pop("session", None)
return all_variables
def get_or_create_session_id(self, user_id):
session_mapping = get_object_or_None(SessionLookup, user_id=user_id)
# if its missing create a new one.
if session_mapping is None:
session_mapping = SessionLookup.objects.create(
user_id=user_id,
session_id=generate_session_id()
)
else:
session = ussd_session(session_mapping.session_id)
# get last time session was updated
if session.get(ussd_airflow_variables.last_update):
last_updated = utilities.string_to_datetime(
session[ussd_airflow_variables.last_update])
else:
last_updated = timezone.make_naive(session_mapping.updated_at)
# check inactivity or if session has been closed
inactivity_duration = (datetime.now() - last_updated).total_seconds()
if inactivity_duration > self.expiry or \
session.get(ussd_airflow_variables.expiry):
# update session_mapping with the new session_id
session_mapping.session_id = generate_session_id()
session_mapping.save()
return session_mapping.session_id
[docs]class UssdResponse(object):
"""
:param text:
This is the ussd text to display to the user
:param status:
This shows the status of ussd session.
True -> to continue with the session
False -> to end the session
:param session:
This is the session object of the ussd session
"""
def __init__(self, text, status=True, session=None):
self.text = text
self.status = status
self.session = session
def dumps(self):
return self.text
def __str__(self):
return self.dumps()
class UssdViewMetaClass(type):
def __init__(cls,name,bases,attr,**kwargs):
super(UssdViewMetaClass,cls).__init__(
name,bases,attr)
path = getattr(cls,'customer_journey_conf')
if path is not None:
_customer_journey_files.append(getattr(cls,'customer_journey_conf'))
class UssdHandlerMetaClass(type):
def __init__(cls, name, bases, attr, **kwargs):
super(UssdHandlerMetaClass, cls).__init__(
name, bases, attr)
abstract = attr.get('abstract', False)
if not abstract or attr.get('screen_type', '') == 'custom_screen':
required_attributes = ('screen_type', 'serializer', 'handle')
# check all attributes have been defined
for attribute in required_attributes:
if attribute not in attr and not hasattr(cls, attribute):
raise MissingAttribute(
"{0} is required in class {1}".format(
attribute, name)
)
if not isinstance(attr['serializer'], SerializerMetaclass):
raise InvalidAttribute(
"serializer should be a "
"instance of {serializer}".format(
serializer=SerializerMetaclass)
)
_registered_ussd_handlers[attr['screen_type']] = cls
class UssdHandlerAbstract(object, metaclass=UssdHandlerMetaClass):
abstract = True
def __init__(self, ussd_request: UssdRequest,
handler: str, screen_content: dict,
initial_screen: dict, logger=None):
self.ussd_request = ussd_request
self.handler = handler
self.screen_content = screen_content
self.SINGLE_VAR = re.compile(r"^%s\s*(\w*)\s*%s$" % (
'{{', '}}'))
self.clean_regex = re.compile(r'^{{\s*(\S*)\s*}}$')
self.logger = logger or get_logger(__name__).bind(
handler=self.handler,
screen_type=getattr(self, 'screen_type', 'custom_screen'),
**ussd_request.all_variables(),
)
self.initial_screen = initial_screen
self.pagination_config = self.initial_screen.get('pagination_config',
{})
self.pagination_more_option = self._add_end_line(
self.get_text(
self.pagination_config.get('more_option', "more\n")
)
)
self.pagination_back_option = self._add_end_line(
self.get_text(
self.pagination_config.get('back_option', "back\n")
)
)
self.ussd_text_limit = self.pagination_config.\
get("ussd_text_limit", ussd_airflow_variables.ussd_text_limit)
def handle(self):
if not self.ussd_request.input:
ussd_response = self.show_ussd_content()
return ussd_response if isinstance(ussd_response, UssdResponse) \
else UssdResponse(str(ussd_response))
return self.handle_ussd_input(self.ussd_request.input)
def get_text_limit(self):
return self.ussd_text_limit
def show_ussd_content(self, **kwargs):
raise NotImplementedError
def handle_ussd_input(self, ussd_input):
raise NotImplementedError
def route_options(self, route_options=None):
"""
iterates all the options executing expression comand.
"""
if route_options is None:
route_options = self.screen_content["next_screen"]
if isinstance(route_options, str):
return self.ussd_request.forward(route_options)
loop_items = [0]
if self.screen_content.get("with_items"):
loop_items = self.evaluate_jija_expression(
self.screen_content["with_items"],
session=self.ussd_request.session
) or loop_items
for item in loop_items:
extra_context = {
"item": item
}
if isinstance(loop_items, dict):
extra_context.update(
dict(
key=item,
value=loop_items[item],
item={item: loop_items[item]}
)
)
for option in route_options:
if self.evaluate_jija_expression(
option.get('expression') or option['condition'],
session=self.ussd_request.session,
extra_context=extra_context
):
return self.ussd_request.forward(option['next_screen'])
return self.ussd_request.forward(
self.screen_content['default_next_screen']
)
@staticmethod
def get_session_items(session) -> dict:
return dict(iter(session.items()))
@classmethod
def get_context(cls, session, extra_context=None):
context = cls.get_session_items(session)
context.update(
dict(os.environ)
)
if extra_context is not None:
context.update(extra_context)
# add timestamp in the context
context.update(
dict(now=datetime.now())
)
# add all built in functions
context.update(
_built_in_functions
)
return context
@staticmethod
def render_text(session, text, context=None, extra=None, encode=None):
if context is None:
context = UssdHandlerAbstract.get_context(
session
)
if extra:
context.update(extra)
template = env.from_string(text or '')
text = template.render(context)
return json.dumps(text) if encode is 'json' else text
def get_text(self, text_context=None):
text_context = self.screen_content.get('text')\
if text_context is None \
else text_context
if isinstance(text_context, dict):
language = (self.ussd_request.session.get('override_language') or self.ussd_request.language) \
if self.ussd_request.language \
in text_context.keys() \
else self.ussd_request.default_language
text_context = text_context[language]
return self.render_text(
self.ussd_request.session,
text_context
)
@classmethod
def evaluate_jija_expression(cls, expression, session,
extra_context=None,
lazy_evaluating=False,
default=None):
if not isinstance(expression, str) or \
(lazy_evaluating and not cls._contains_vars(
expression)):
return expression
context = cls.get_context(
session, extra_context=extra_context)
try:
expr = env.compile_expression(
expression.replace("{{", "").replace("}}", "")
)
return expr(context)
except Exception:
try:
return env.from_string(expression or '').render(context)
except Exception:
return default
@classmethod
def validate(cls, screen_name: str, ussd_content: dict) -> (bool, dict):
screen_content = ussd_content[screen_name]
# adding screen name in context might be needed by validator
ussd_content['screen_name'] = screen_name
validation = cls.serializer(data=screen_content,
context=ussd_content)
del ussd_content['screen_name']
if validation.is_valid():
return True, {}
return False, validation.errors
@staticmethod
def _contains_vars(data):
'''
returns True if the data contains a variable pattern
'''
if isinstance(data, str):
for marker in ('{%', '{{', '{#'):
if marker in data:
return True
return False
@staticmethod
def _add_end_line(text):
if text and '\n' not in text:
text += '\n'
return text
def get_loop_items(self):
loop_items = self.evaluate_jija_expression(
self.screen_content["with_items"],
session=self.ussd_request.session
) if self.screen_content.get("with_items") else [0] or [0]
return loop_items
@classmethod
def render_request_conf(cls, session, data):
if isinstance(data, str):
jinja_results = cls.evaluate_jija_expression(data, session)
return data if jinja_results is None else jinja_results
elif isinstance(data, list):
list_data = []
for i in data:
list_data.append(cls.render_request_conf(
session, i))
return list_data
elif isinstance(data, dict):
dict_data = {}
for key, value in data.items():
dict_data.update(
{key: cls.render_request_conf(
session, value)}
)
return dict_data
else:
return data
@staticmethod
def get_variables_from_response_obj(response):
response_varialbes = {}
for i in inspect.getmembers(response):
# Ignores anything starting with underscore
# (that is, private and protected attributes)
if not i[0].startswith('_'):
# Ignores methods
if not inspect.ismethod(i[1]) and \
type(i[1]) in \
(str, dict, int, dict, float, list, tuple):
if len(i) == 2:
response_varialbes.update(
{i[0]: i[1]}
)
try:
response_content = json.loads(response.content.decode())
except json.JSONDecodeError:
response_content = response.content.decode()
if isinstance(response_content, dict):
response_varialbes.update(
response_content
)
# update content to save the one that has been decoded
response_varialbes.update(
{"content": response_content}
)
return response_varialbes
@classmethod
def make_request(cls, http_request_conf, response_session_key_save,
session, logger=None
):
logger = logger or get_logger(__name__).bind(
action="make_request",
session_id=session.session_key
)
logger.info("sending_request", **http_request_conf)
response = requests.request(**http_request_conf)
logger.info("response", status_code=response.status_code,
content=response.content)
response_to_save = cls.get_variables_from_response_obj(response)
# save response in session
session[response_session_key_save] = response_to_save
return response
@staticmethod
def fire_ussd_report_session_task(initial_screen: dict, session_id: str,
support_countdown=True):
ussd_report_session = initial_screen['ussd_report_session']
args = (session_id,)
kwargs = {'screen_content': initial_screen}
keyword_args = ussd_report_session.get("async_parameters",
{"countdown": 900}).copy()
if not support_countdown and keyword_args.get('countdown'):
del keyword_args['countdown']
report_session.apply_async(
args=args,
kwargs=kwargs,
**keyword_args
)
[docs]class UssdView(APIView, metaclass=UssdViewMetaClass):
"""
To create Ussd View requires the following things:
- Inherit from **UssdView** (Mandatory)
.. code-block:: python
from ussd.core import UssdView
- Define Http method either **get** or **post** (Mandatory)
The http method should return Ussd Request
.. autoclass:: ussd.core.UssdRequest
- define this varialbe *customer_journey_conf*
This is the path of the file that has ussd screens
If you want your file to be dynamic implement the
following method **get_customer_journey_conf** it
will be called by request object
- define this variable *customer_journey_namespace*
Ussd_airflow uses this namespace to save the
customer journey content in memory. If you want
customer_journey_namespace to be dynamic implement
this method **get_customer_journey_namespace** it
will be called with request object
- override HttpResponse
In ussd airflow the http method return UssdRequest object
not Http response. Then ussd view gets UssdResponse object
and convert it to HttpResponse. The default HttpResponse
returned is a normal HttpResponse with body being ussd text
To override HttpResponse returned define this method.
**ussd_response_handler** it will be called with
**UssdResponse** object.
.. autoclass:: ussd.core.UssdResponse
Example of Ussd view
.. code-block:: python
from ussd.core import UssdView, UssdRequest
class SampleOne(UssdView):
def get(self, req):
return UssdRequest(
phone_number=req.data['phoneNumber'].strip('+'),
session_id=req.data['sessionId'],
ussd_input=text,
service_code=req.data['serviceCode'],
language=req.data.get('language', 'en')
)
Example of Ussd View that defines its own HttpResponse.
.. code-block:: python
from ussd.core import UssdView, UssdRequest
class SampleOne(UssdView):
def get(self, req):
return UssdRequest(
phone_number=req.data['phoneNumber'].strip('+'),
session_id=req.data['sessionId'],
ussd_input=text,
service_code=req.data['serviceCode'],
language=req.data.get('language', 'en')
)
def ussd_response_handler(self, ussd_response):
if ussd_response.status:
res = 'CON' + ' ' + str(ussd_response)
response = HttpResponse(res)
else:
res = 'END' + ' ' + str(ussd_response)
response = HttpResponse(res)
return response
"""
customer_journey_conf = None
customer_journey_namespace = None
def initial(self, request, *args, **kwargs):
# initialize restframework
super(UssdView, self).initial(request, args, kwargs)
# initialize ussd
self.ussd_initial(request)
def ussd_initial(self, request, *args, **kwargs):
if hasattr(self, 'get_customer_journey_conf'):
self.customer_journey_conf = self.get_customer_journey_conf(
request
)
if hasattr(self, 'get_customer_journey_namespace'):
self.customer_journey_namespace = \
self.get_customer_journey_namespace(request)
if self.customer_journey_conf is None \
or self.customer_journey_namespace is None:
raise MissingAttribute("attribute customer_journey_conf and "
"customer_journey_namespace are required")
if not self.customer_journey_namespace in \
staticconf.config.configuration_namespaces:
load_yaml(
self.customer_journey_conf,
self.customer_journey_namespace
)
# confirm variable template has been loaded
# get initial screen
initial_screen = staticconf.read(
"initial_screen",
namespace=self.customer_journey_namespace)
if isinstance(initial_screen, dict) and \
initial_screen.get('variables'):
variable_conf = initial_screen['variables']
file_path = variable_conf['file']
namespace = variable_conf['namespace']
if not namespace in \
staticconf.config.configuration_namespaces:
load_yaml(file_path, namespace)
self.initial_screen = initial_screen \
if isinstance(initial_screen, dict) \
else {"initial_screen": initial_screen}
def finalize_response(self, request, response, *args, **kwargs):
if isinstance(response, UssdRequest):
self.logger = get_logger(__name__).bind(**response.all_variables())
try:
ussd_response = self.ussd_dispatcher(response)
except Exception as e:
if settings.DEBUG:
ussd_response = UssdResponse(str(e))
return self.ussd_response_handler(ussd_response)
return super(UssdView, self).finalize_response(
request, response, args, kwargs)
def ussd_response_handler(self, ussd_response):
return HttpResponse(str(ussd_response))
def ussd_dispatcher(self, ussd_request):
# Clear input and initialize session if we are starting up
if '_ussd_state' not in ussd_request.session:
ussd_request.input = ''
ussd_request.session['_ussd_state'] = {'next_screen': ''}
ussd_request.session['ussd_interaction'] = []
ussd_request.session['posted'] = False
ussd_request.session['submit_data'] = {}
ussd_request.session['session_id'] = ussd_request.session_id
ussd_request.session['phone_number'] = ussd_request.phone_number
# update ussd_request variable to session and template variables
# to be used later for jinja2 evaluation
ussd_request.session.update(ussd_request.all_variables())
# for backward compatibility
# there are some jinja template using ussd_request
# eg. {{ussd_request.session_id}}
ussd_request.session.update(
{"ussd_request": ussd_request.all_variables()}
)
self.logger.debug('gateway_request', text=ussd_request.input)
# Invoke handlers
ussd_response = self.run_handlers(ussd_request)
ussd_request.session[ussd_airflow_variables.last_update] = \
utilities.datetime_to_string(datetime.now())
# Save session
ussd_request.session.save()
self.logger.debug('gateway_response', text=ussd_response.dumps(),
input="{redacted}")
return ussd_response
def run_handlers(self, ussd_request):
handler = ussd_request.session['_ussd_state']['next_screen'] \
if ussd_request.session.get('_ussd_state', {}).get('next_screen') \
else "initial_screen"
ussd_response = (ussd_request, handler)
if handler != "initial_screen":
# get start time
start_time = utilities.string_to_datetime(
ussd_request.session["ussd_interaction"][-1]["start_time"])
end_time = datetime.now()
# Report in milliseconds
duration = (end_time - start_time).total_seconds() * 1000
ussd_request.session["ussd_interaction"][-1].update(
{
"input": ussd_request.input,
"end_time": utilities.datetime_to_string(end_time),
"duration": duration
}
)
# Handle any forwarded Requests; loop until a Response is
# eventually returned.
while not isinstance(ussd_response, UssdResponse):
ussd_request, handler = ussd_response
screen_content = staticconf.read(
handler,
namespace=self.customer_journey_namespace)
screen_type = 'initial_screen' \
if handler == "initial_screen" and \
isinstance(screen_content, str) \
else screen_content['type']
ussd_response = _registered_ussd_handlers[screen_type](
ussd_request,
handler,
screen_content,
initial_screen=self.initial_screen,
logger=self.logger
).handle()
ussd_request.session['_ussd_state']['next_screen'] = handler
ussd_request.session['ussd_interaction'].append(
{
"screen_name": handler,
"screen_text": str(ussd_response),
"input": ussd_request.input,
"start_time": utilities.datetime_to_string(datetime.now())
}
)
# Attach session to outgoing response
ussd_response.session = ussd_request.session
return ussd_response
@staticmethod
def validate_ussd_journey(ussd_content: dict) -> (bool, dict):
errors = {}
is_valid = True
# should define initial screen
if not 'initial_screen' in ussd_content.keys():
is_valid = False
errors.update(
{'hidden_fields': {
"initial_screen": ["This field is required."]
}}
)
for screen_name, screen_content in ussd_content.items():
# all screens should have type attribute
if screen_name == "initial_screen" and \
isinstance(screen_content, str):
if not screen_content in ussd_content.keys():
is_valid = False
errors.update(
dict(
screen_name="Screen not available"
)
)
continue
screen_type = screen_content.get('type')
# all screen should have type field.
serialize = UssdBaseSerializer(data=screen_content,
context=ussd_content)
base_validation = serialize.is_valid()
if serialize.errors:
errors.update(
{screen_name: serialize.errors}
)
if not base_validation:
is_valid = False
continue
# all screen type have their handlers
handlers = _registered_ussd_handlers[screen_type]
screen_validation, screen_errors = handlers.validate(
screen_name,
ussd_content
)
if screen_errors:
errors.update(
{screen_name: screen_errors}
)
if not screen_validation:
is_valid = screen_validation
return is_valid, errors