JGroups не формируют кластер с UDP

Я пытаюсь создать протокол выбора лидера, используя JGroups, чтобы N экземпляров моей программы могли выбрать мастера, а все клиенты получили IP-адреса этого мастера. Более или менее текущая реализация опирается на каждый экземпляр, пытающийся получить блокировку на канале блокировки, и когда он успешно получает этот канал, он становится главным, а все остальные переключаются на клиентов.

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.*;
import org.jgroups.*;
import org.jgroups.blocks.locking.LockService;

public class AutoDiscovery
{
    static org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AutoDiscovery.class); //used for logging purposes (see log4j library)
    /* this variable indicates whether I have become the master or I'm just a client*/
    public volatile AtomicBoolean becomeMaster = new AtomicBoolean(false);
    /* The address of the server if we are a client or of ourself if we are
     * server */
    public String serverAddress;
    /* A channel on which to acquire a lock, so that only one can become server */
    private JChannel lockChannel;
    /* A shared channel ffor communication between client and master*/
    private JChannel communicationChannel;
    private LockService lockService;
    /* A thread which tries to acquire a lock */
    private Thread acquiringThread;
    /* A thread which listens for the server ip which may change */
    private Thread listeningThread;
    /* A thread which lists the status and initializes the acquiring thread*/
    private Thread statusThread;
    private String name;
    /* If we pass from being a client to being a server we must stop the listening
     * thread however we cannot call listeningThread.stop() but instead we change
     * the stopListening boolean to true */
    private boolean stopListening = false;
    /* This lock communicates I have finally become either master or client so
     * the serverAddress and becomeMaster variables are correctly set */
    public final Object finishedLock = new Object();

    public static void main(String[] args) throws Exception
    {
        Thread.currentThread().setName("MyMainThread");
        Random rand = new Random();

        AutoDiscovery master = new AutoDiscovery("Node" + rand.nextInt(10));

        master.lockChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
        master.lockChannel.connect("lock-channel");

        master.communicationChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
        master.communicationChannel.connect("communication-channel");

        master.lockService = new LockService(master.lockChannel);
        master.startStatusPrinterThread();
    }

    public AutoDiscovery(String name)
    {
        this.name = name;
    }

    public AutoDiscovery()
    {
        try
        {
            Thread.currentThread().setName("MyMainThread");
            Random rand = new Random();

            this.name = ("Node" + rand.nextInt(10));

            lockChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
            lockChannel.connect("lock-channel");

            communicationChannel = new JChannel(AutoDiscovery.class.getResource("/resource/udp.xml"));
            communicationChannel.connect("communication-channel");

            lockService = new LockService(lockChannel);
            startStatusPrinterThread();
        }
        catch (Exception ex)
        {
            Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void startAcquiringThread()
    {
        acquiringThread = new Thread()
        {
            @Override
            public void run()
            {
                while (true)
                {
                    //if you have become Master send your ip every now and then
                    if (becomeMaster.get())
                    {
                        try
                        {
                            communicationChannel.send(new Message(null, null, "serverip " + serverAddress));
                        }
                        catch (Exception ex)
                        {
                            Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
                        }
                    }
                    else
                    {
                        try
                        {
                            Thread.currentThread().setName(name + "AcquiringThread");
                            Lock lock = lockService.getLock("serverLock");
                            if (lock.tryLock(4, TimeUnit.SECONDS))
                            {
                                becomeMaster.set(true);
                                stopListening = true;
                                /* Now that I'm server I must find out my own ip address on which to listen */
                                Enumeration<NetworkInterface> networkInterfaces;
                                try
                                {
                                    networkInterfaces = NetworkInterface.getNetworkInterfaces();
                                    for (NetworkInterface netint : Collections.list(networkInterfaces))
                                    {
                                        Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
                                        for (InetAddress inetAddress : Collections.list(inetAddresses))
                                        {
                                            if (isIPAddress(inetAddress.getHostAddress())
                                                    && !inetAddress.getHostAddress().equals("127.0.0.1"))
                                            {
                                                serverAddress = inetAddress.getHostAddress();
                                            }
                                        }
                                    }
                                    /* I notify to the rest of the program I have correctly initialized 
                                     * becomeMaster and serverAddress */
                                    synchronized (finishedLock)
                                    {
                                        finishedLock.notify();
                                    }
                                }
                                catch (Exception e)
                                {
                                    Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, e);
                                    System.exit(0);
                                }
                                log.info(Thread.currentThread().getName() + ": I acquired lock! will become master! my ip is " + serverAddress);
                            }
                            else
                            {
                                becomeMaster.set(false);
                                stopListening = false;
                                if (listeningThread == null || !listeningThread.isAlive())
                                {
                                    if (!stopListening) //??? this codnition might be useless
                                    {
                                        startListeningThread();
                                    }
                                }
                            }
                        }
                        catch (Exception e)
                        {
                            e.printStackTrace();
                        }
                    }
                    try
                    {
                        sleep(5000L);
                    }
                    catch (InterruptedException ex)
                    {
                        Logger.getLogger(AutoDiscovery.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
            }
        };
        acquiringThread.setDaemon(true);
        acquiringThread.start();
    }

    public void startListeningThread()
    {
        listeningThread = new Thread()
        {
            @Override
            public void run()
            {
                try
                {
                    while (true)
                    {
                        Thread.currentThread().setName(name + "ListeningThread");
                        communicationChannel.setReceiver(new ReceiverAdapter()
                        {
                            @Override
                            public void receive(Message msg)
                            {
                                if (msg.getObject() != null)
                                {
                                    String leaderServerAddress = (msg.getObject().toString().substring(9));
                                    if (isIPAddress(leaderServerAddress))
                                    {
                                        serverAddress = leaderServerAddress;
                                        log.info(name + " Master server has ip" + serverAddress);
                                        /* I notify to the rest of the program I have correctly initialized 
                                         * becomeMaster and serverAddress */
                                        synchronized (finishedLock)
                                        {
                                            finishedLock.notify();
                                        }
                                    }
                                    else
                                    {
                                        log.info(name + ": discarded message " + msg.getObject().toString());
                                    }
                                }
                            }
                        });
                        sleep(10000L);
                        if (stopListening)
                        {
                            return;
                        }
                    }
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
            }
        };
        listeningThread.setDaemon(true);
        listeningThread.start();
    }

    private void startStatusPrinterThread()
    {
        statusThread = new Thread()
        {
            @Override
            public void run()
            {
                Thread.currentThread().setName(name + "StatusPrinterThread");
                startAcquiringThread();
                while (true)
                {
                    try
                    {
                        if (becomeMaster.get())
                        {
                            log.info(name + " startStatusPrinterThread(): I am happily a Master!");
                        }
                        else
                        {
                            if (!acquiringThread.isAlive())
                            {
                                startAcquiringThread();
                            }
                        }
                        sleep(5000L);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        };
        statusThread.setDaemon(true);
        statusThread.start();
    }

    private static boolean isIPAddress(String str)
    {
        Pattern ipPattern = Pattern.compile("^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
                + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
                + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
                + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
        return ipPattern.matcher(str).matches();
    }
}

теперь мой текущий udp.xml

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
    <UDP
        mcast_port="${jgroups.udp.mcast_port:45588}"
         tos="8"
         ucast_recv_buf_size="20M"
         ucast_send_buf_size="640K"
         mcast_recv_buf_size="25M"
         mcast_send_buf_size="640K"
         loopback="true"
         level="WARN"
         log_discard_msgs="false"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         ip_ttl="${jgroups.udp.ip_ttl:8}"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         timer_type="new"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"

         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="8"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="10000"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>

    <PING timeout="2000"
            num_initial_members="3"/>
    <MERGE2 max_interval="30000"
            min_interval="10000"/>
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK exponential_backoff="300"
                   xmit_stagger_timeout="200"
                   use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>
    <UNICAST />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                view_bundling="true"/>
    <UFC max_credits="2M"
         min_threshold="0.4"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <pbcast.STATE_TRANSFER />
    <CENTRAL_LOCK />
    <!-- pbcast.FLUSH  /-->
</config> 

Теперь вышесказанное работает, когда я запускаю N экземпляров программы на одной и той же машине (N-1 участники становятся клиентами, а 1 - основными). При запуске на двух разных компьютерах, подключенных к одной и той же локальной сети, по-видимому, после вызова JChannel.connect() с одинаковым именем кластера в каждом члене, каждый участник создает свой канал, и общий кластер не создается. В результате при отправке сообщений клиентам другой мастер видит другой физический адрес для того же имени кластера, и все сообщения отбрасываются.

Поэтому я получаю такие предупреждения, как:

7683 [Incoming-1,communication-channel,pc-home-41714] WARN org.jgroups.protocols.pbcast.NAKACK  - [JGRP00011] pc-home-41714: dropped message 293 from non-member cf8b4ea6-8cc8-cb21-538f-b03f3fa7413d (view=[pc-home-41714|0] [pc-home-41714])

1207996 [TransferQueueBundler,communication-channel,pc-home-5280] WARN org.jgroups.protocols.UDP  - pc-home-5280: no physical address for cf8b4ea6-8cc8-cb21-538f-b03f3fa7413d, dropping message
1209526 [TransferQueueBundler,lock-channel,pc-home-59082] WARN org.jgroups.protocols.UDP  - pc-home-59082: no physical address for efbe6408-0e21-d119-e2b8-f1d5762d9b45, dropping message

Если я изменяю udp.xml loopback="true" на loopback="false", то происходит то, что они оба подключаются к одному кластеру, но затем выдают ошибку вроде:

55539 [Node0StatusPrinterThread] INFO plarz.net.planningig.autodiscovery.AutoDiscovery  - Node0 startStatusPrinterThread(): I am happily a Master!
59077 [TransferQueueBundler,lock-channel,pc-test-6919] ERROR org.jgroups.protocols.UDP  - pc-test-6919: exception sending bundled msgs: java.lang.Exception: dest=/fe80:0:0:0:226:18ff:fece:6ccc%2:43109 (130 bytes):, cause: java.io.IOException: Network is unreachable
59505 [TransferQueueBundler,communication-channel,pc-test-35303] ERROR org.jgroups.protocols.UDP  - pc-test-35303: exception sending bundled msgs: java.lang.Exception: dest=/fe80:0:0:0:226:18ff:fece:6ccc%2:55053 (139 bytes):, cause: java.io.IOException: Network is unreachable

1 ответ

Ошибка:

[Сервер:ha-server-3] 13:59:13,122 ПРЕДУПРЕЖДЕНИЕ [org.jgroups.protocols.UDP] (OOB-15, null) null: нет физического адреса для 766de5c9-8ac2-6d30-89ef-78d39aa5f7eb, удаление сообщения

В моем случае это было связано с наличием нескольких кластеров jboss в одной сети, и каждый из них имел одинаковое имя. Например, ha-server-1 и ha-server-2 существовали в двух разных кластерах на разных машинах.

Кластер-1(10.10.10.10): | +- ха-сервер-1 +- ха-сервер-2

Кластер-2(10.10.10.20): | +- ха-сервер-1 +- ха-сервер-2

Я решил эту проблему, изменив имена ha-сервера. Примечание: оба были независимым кластером. Я предполагаю, что это произошло из-за многоадресной проблемы JGroups. Любое дальнейшее объяснение от эксперта, как вы, будет приятно.

обратитесь к http://icfun.blogspot.com/2013/10/no-physical-address-for-766de5c9-8ac2.html

Другие вопросы по тегам