Source code for rasahub.plugin

from __future__ import unicode_literals

from rasahub.message import RasahubMessage

import threading
import sys
import time
import json

is_py2 = sys.version[0] == '2'
if is_py2:
    import Queue as queue
else:
    import queue as queue

[docs]class RasahubPlugin(object): """Main class for a plugin. """ def __init__(self): """Creates message sending queue. """ self.outputqueue = queue.Queue() self.in_event = threading.Event() self.out_event = threading.Event() self.target = '' self.name = ''
[docs] def start(self, main_queue): """Starts sending and receiving threads. Args: main_queue: Main message queue of handler """ self.main_queue = main_queue self.receiving = threading.Thread(target = self.in_thread, args = (self.main_queue, self.in_event,)) self.sending = threading.Thread(target = self.out_thread, args = (self.outputqueue, self.main_queue, self.out_event,)) self.receiving.start() self.sending.start()
return True
[docs] def end_process(self): """Safely closes threads. """ self.outputqueue.join() print(self.name + " queue joined..") self.in_event.set() print(self.name + " in threads closed..") self.out_event.set() print(self.name + " out threads closed..") self.end()
return True
[docs] def add_target(self, classname): """Adds target to plugin. Args: classname: Classname of target plugin """
self.target = classname
[docs] def set_name(self, pluginname): """Sets name of plugin. Args: pluginname: Name of the plugin """
self.name = pluginname
[docs] def in_thread(self, main_queue, run_event): """Input message thread. Args: main_queue: Main message queue of Handler run_event: Thread run event """ while (not run_event.is_set()): in_message = self.receive() if in_message is not None: # add source and target to message message = RasahubMessage(message = in_message['message'], message_id = in_message['message_id'], source = self.name, target = self.target)
main_queue.put(message) #time.sleep(0.5)
[docs] def out_thread(self, outputqueue, main_queue, run_event): """Output message thread. Args: outputqueue: Output queue of plugin main_queue: Main message queue of Handler run_event: Thread run event """ while (not run_event.is_set()): try: out_message = outputqueue.get(False) if len(out_message.message) > 0 and out_message.message[0] == '$': # find escape characters in message string first_index = out_message.message.find('$') second_index = out_message.message[first_index+1:].find('$') command = out_message.message[first_index+1:second_index+1] payload = {} if len(out_message.message[second_index+2:]) > 0: payload['args'] = json.loads(out_message.message[second_index+2:]) payload['message_id'] = out_message.message_id payload['message_source'] = out_message.source payload['message_target'] = out_message.target out_message = self.process_command(command, payload, out_message) # check target after processing if out_message is not None: if out_message.target == self.name: self.send(out_message, main_queue) # free space del out_message else: main_queue.put(out_message) outputqueue.task_done() except queue.Empty:
pass
[docs] def send(self, messagedata): """Sending function, to be implemented by plugin. """
raise NotImplementedError
[docs] def receive(self): """Receiving function, to be implemented by plugin. """
raise NotImplementedError
[docs] def process_command(self, message, out_message): """Output message hook, to be implemented by plugin. """
raise NotImplementedError
[docs] def end(self): """Function to close connections etc., to be implemented by plugin. """
raise NotImplementedError