Stomp.py Reconnect to activemq after disconnect

Сообщение об ошибке

Deprecated function: The each() function is deprecated. This message will be suppressed on further calls в функции menu_set_active_trail() (строка 2405 в файле /var/www/notes7/includes/menu.inc).

Менеджер очередей activemq, брокер stomp.py

Была задача создать клиента, спраляющегося с дисконектами.

Пример описанный тут: https://jasonrbriggs.github.io/stomp.py/api.html#dealing-with-disconnects , не сказать, что рабочий.

После некоторых правок, столкнулся с проблемой, указанной здесь.

В итоге получилось создать работающий вариант:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
import time
import stomp
import logging
import threading
import signal

reload(sys)
sys.setdefaultencoding('utf-8')

console = logging.StreamHandler()
console.setFormatter(logging.Formatter('[%(asctime)s] %(name)-12s %(levelname)-8s %(message)s'))
logging.getLogger().addHandler(console)
logging.getLogger().setLevel(logging.DEBUG)
LOGGER = logging.getLogger('vstp')


class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn


    def on_disconnected(self):
        logging.info("stomp disconnected, try to reconnect")
        with self.conn.need_reconnect:
            self.conn.need_reconnect.notify()


class ConnectThread(threading.Thread):
    def __init__(self,conn):
        threading.Thread.__init__(self)
        self.daemon = True
        self.conn = conn
    def run(self):
        while True:
            try:
                if not self.conn.is_connected():
                    logging.info("I AM HERE TO CONNECT")
                    self.conn.start()
                    self.conn.connect(wait=True)
                    self.conn.subscribe(destination="/queue/test", id="foo-1", ack='client-individual')
                    with self.conn.need_reconnect:
                        self.conn.need_reconnect.wait()
            except Exception as ex:
                template = "An exception of type {0} occurred. Arguments:\n{1!r}"
                message = template.format(type(ex).__name__, ex.args)
                logging.warning(message)
                time.sleep(5)



class ActiveDaemon(object):
    def __init__(self):
        self.stop = False
        signal.signal(signal.SIGINT, self.go_down)

        self.conn = stomp.Connection([('localhost', 61613)], keepalive=True, reconnect_attempts_max=1, heartbeats=(4000, 4000))
        self.conn.need_reconnect = threading.Condition()
        self.conn.set_listener('', MyListener(self.conn))

    def run(self):
        c = ConnectThread(self.conn)
        c.start()
        
        while not self.stop:
            time.sleep(2)
        
        self.conn.unsubscribe(destination="/queue/test", id="foo-1")
        self.conn.disconnect(receipt=None)

    def go_down(self, signum, frame):
        logging.info("got exit command")
        self.stop = True
      
if __name__ == "__main__":
    daemon = ActiveDaemon()
    daemon.run()

Комментарии

Thanks a lot, it really worked and handles the disconnects due to network failures.

Добавить комментарий

CAPTCHA