import asyncore
import time
import logging
import sys
import traceback
from yowsup import stacks
from yowsup_celery.layer import CeleryLayer
from yowsup_celery import exceptions
from yowsup.layers import YowLayerEvent
from yowsup.layers.auth import AuthError
from yowsup.layers.network import YowNetworkLayer
try:
import Queue
except ImportError:
import queue as Queue
logger = logging.getLogger(__name__)
[docs]class YowsupStack(stacks.YowStack):
"""
Gateway for Yowsup in a client API way
:ivar bool listening: asyncore loop task in execution
:ivar Queue detached_queue: Queue with callbacks to execute after
:ivar YowLayerInterface facade:layer interface on top of stack
disconnection
"""
def __init__(self, credentials, encryption=False, top_layers=None):
"""
:param credentials: number and registed password
:param bool encryptionEnabled: E2E encryption enabled/ disabled
:params top_layers: tuple of layer between :class:`yowsup_gateway.layer.CeleryLayer`
and Yowsup Core Layers
"""
top_layers = top_layers + (CeleryLayer,) if top_layers else (CeleryLayer,)
layers = stacks.YowStackBuilder.getDefaultLayers(axolotl=encryption) + top_layers
try:
super(YowsupStack, self).__init__(layers, reversed=False)
except ValueError as e:
raise exceptions.ConfigurationError(e.args[0])
self.setCredentials(credentials)
self.detached_queue = Queue.Queue()
self.facade = self.getLayerInterface(CeleryLayer)
self.listening = False
[docs] def execDetached(self, fn):
return self.detached_queue.put(fn)
[docs] def cleanup(self):
self.listening = False
self.detached_queue = Queue.Queue()
[docs] def asynloop(self, auto_connect=False, timeout=10, detached_delay=0.2):
"""
Non-blocking event loop consuming messages until connection is lost,
or shutdown is requested.
:param int timeout: number of secs for asyncore timeout
:param float detached_delay: float secs to sleep when exiting asyncore loop and execution detached queue
callbacks
"""
if auto_connect:
self.broadcastEvent(YowLayerEvent(YowNetworkLayer.EVENT_STATE_CONNECT))
try:
self.listening = True
start = int(time.time())
while True:
asyncore.loop(timeout)
time.sleep(detached_delay)
try:
# Execute from detached queue callback
callback = self.detached_queue.get(False)
callback()
except Queue.Empty:
pass
if int(time.time()) - start > timeout:
logger.info("Asynloop : Timeout")
# defensive code should be already disconneted
if self.facade.connected():
self.broadcastEvent(YowLayerEvent(YowNetworkLayer.EVENT_STATE_DISCONNECT))
break
self.cleanup()
except AuthError as e:
self.cleanup()
raise exceptions.AuthenticationError("Authentication Error: {0}".format(e))
except:
self.cleanup()
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
raise exceptions.UnexpectedError(str(exc_info[0]))