001/*
002 * JPPF.
003 * Copyright (C) 2005-2019 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.management;
020
021import java.net.*;
022import java.util.*;
023import java.util.concurrent.CopyOnWriteArrayList;
024import java.util.concurrent.atomic.*;
025
026import javax.management.MBeanServerConnection;
027import javax.management.remote.*;
028
029import org.jppf.jmx.*;
030import org.jppf.ssl.SSLHelper;
031import org.jppf.utils.*;
032import org.jppf.utils.concurrent.ThreadSynchronization;
033import org.jppf.utils.configuration.JPPFProperties;
034import org.slf4j.*;
035
036/**
037 * Wrapper around a JMX connection, providing a thread-safe way of handling disconnections and recovery.
038 * @author Laurent Cohen
039 */
040public abstract class AbstractJMXConnectionWrapper extends ThreadSynchronization implements JPPFAdminMBean, AutoCloseable {
041  /**
042   * Explicit serialVersionUID.
043   */
044  private static final long serialVersionUID = 1L;
045  /** Logger for this class. */
046  private static Logger log = LoggerFactory.getLogger(AbstractJMXConnectionWrapper.class);
047  /** Determines whether debug log statements are enabled. */
048  private static boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
049  /** Prefix for the name given to the connection thread. */
050  public static String CONNECTION_NAME_PREFIX = "jmx@";
051  /** The timeout in millis for JMX connection attempts. A value of 0 or less means no timeout. */
052  static final long CONNECTION_TIMEOUT = JPPFConfiguration.get(JPPFProperties.MANAGEMENT_CONNECTION_TIMEOUT);
053  /** URL of the MBean server, in a JMX-compliant format. */
054  protected JMXServiceURL url;
055  /** The JMX client. */
056  protected JMXConnector jmxc;
057  /** A connection to the MBean server. */
058  protected AtomicReference<MBeanServerConnection> mbeanConnection = new AtomicReference<>(null);
059  /** The host the server is running on. */
060  protected String host;
061  /** The RMI port used by the server. */
062  protected int port;
063  /** The connection thread that performs the connection to the management server. */
064  protected AtomicReference<JMXConnectionThread> connectionThread = new AtomicReference<>(null);
065  /** A string representing this connection, used for logging purposes. */
066  protected String idString;
067  /** A string representing this connection, used for displaying in the admin conosle. */
068  protected String displayName;
069  /** Determines whether the connection to the JMX server has been established. */
070  protected AtomicBoolean connected = new AtomicBoolean(false);
071  /** Determines whether this connection has been closed by a all to the {@link #close()} method. */
072  protected AtomicBoolean closed = new AtomicBoolean(false);
073  /** Determines whether the connection to the JMX server has been established. */
074  protected boolean local;
075  /** JMX properties used for establishing the connection. */
076  protected Map<String, Object> env = new HashMap<>();
077  /** Determines whether the JMX connection should be secure or not. */
078  protected boolean sslEnabled;
079  /** Used to synchronize during the connection process. */
080  final Object connectionLock = new Object();
081  /** The list of listeners to this connection wrapper. */
082  final List<JMXWrapperListener> listeners = new CopyOnWriteArrayList<>();
083  /** The time at which connection attempts started. */
084  long connectionStart;
085  /** Whether to try to reconnect upon error. */
086  boolean reconnectOnError = true;
087  /**
088   * The JMX remote protocol.
089   */
090  private final String protocol;
091
092  /**
093   * Initialize a local connection (same JVM) to the MBean server.
094   */
095  public AbstractJMXConnectionWrapper() {
096    local = true;
097    idString = displayName = "local";
098    host = "local";
099    this.protocol = JMXHelper.LOCAL_PROTOCOL;
100  }
101
102  /**
103   * Initialize the connection to the remote MBean server.
104   * @param host the host the server is running on.
105   * @param port the port used by the server.
106   * @param sslEnabled specifies whether the jmx connection should be secure or not.
107   */
108  public AbstractJMXConnectionWrapper(final String host, final int port, final boolean sslEnabled) {
109    this(JPPFConfiguration.get(JPPFProperties.JMX_REMOTE_PROTOCOL), host, port, sslEnabled);
110  }
111
112  /**
113   * Initialize the connection to the remote MBean server.
114   * @param protocol the JMX remote protocol to use.
115   * @param host the host the server is running on.
116   * @param port the port used by the server.
117   * @param sslEnabled specifies whether the jmx connection should be secure or not.
118   */
119  public AbstractJMXConnectionWrapper(final String protocol, final String host, final int port, final boolean sslEnabled) {
120    this.protocol = protocol;
121    try {
122      this.host = (NetworkUtils.isIPv6Address(host)) ? "[" + host + "]" : host;
123      this.port = port;
124      this.sslEnabled = sslEnabled;
125      idString = this.host + ':' + this.port;
126      this.displayName = this.idString;
127      //url = new JMXServiceURL("service:jmx:jmxmp://" + idString);
128      url = new JMXServiceURL(protocol, host, port);
129      if (sslEnabled) SSLHelper.configureJMXProperties(protocol, env);
130      if (JMXHelper.JMXMP_PROTOCOL.equals(protocol)) initJMXMP();
131      else initJPPF();
132      ClassLoader cl = Thread.currentThread().getContextClassLoader();
133      if (cl == null) cl = getClass().getClassLoader();
134      env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_CLASS_LOADER, cl);
135      env.put(JMXConnectorFactory.DEFAULT_CLASS_LOADER, cl);
136      if (debugEnabled) log.debug("created {} with sslEnabled={}, url={}, env={}", getClass().getSimpleName(), this.sslEnabled, url, env);
137    } catch(final Exception e) {
138      log.error(e.getMessage(), e);
139    }
140    local = false;
141  }
142
143  /**
144   * Initialize the environment for the JMXMP protocol.
145   * @throws Exception if any error occcurs.
146   */
147  private void initJMXMP() throws Exception {
148    env.put("jmx.remote.object.wrapping", JMXMPServer.newObjectWrapping());
149    env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES, "com.sun.jmx.remote.protocol");
150    env.put("jmx.remote.x.server.max.threads", 1);
151    env.put("jmx.remote.x.client.connection.check.period", 0);
152    env.put("jmx.remote.x.request.timeout", JPPFConfiguration.get(JPPFProperties.JMX_REMOTE_REQUEST_TIMEOUT));
153  }
154
155  /**
156   * Initialize the environment for the JPPF JMX remote protocol.
157   * @throws Exception if any error occcurs.
158   */
159  private void initJPPF() throws Exception {
160    env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES, "org.jppf.jmxremote.protocol");
161    env.put(JPPFJMXProperties.REQUEST_TIMEOUT.getName(), JPPFConfiguration.get(JPPFJMXProperties.REQUEST_TIMEOUT));
162    env.put(JPPFJMXProperties.TLS_ENABLED.getName(), Boolean.valueOf(sslEnabled).toString());
163  }
164
165  /**
166   * Initialize the connection to the remote MBean server.
167   */
168  public abstract void connect();
169
170  /**
171   * Initiate the connection and wait until the connection is established or the timeout has expired, whichever comes first.
172   * @param timeout the maximum time to wait for, a value of zero means no timeout and
173   * this method just waits until the connection is established.
174   * @return {@code true} if the connection was established in the specified time, {@code false} otherwise.
175   */
176  public abstract boolean connectAndWait(final long timeout);
177
178  /**
179   * Initialize the connection to the remote MBean server.
180   * @throws Exception if the connection could not be established.
181   */
182  void performConnection() throws Exception {
183    connected.set(false);
184    final long elapsed;
185    synchronized(this) {
186      elapsed = (System.nanoTime() - connectionStart) / 1_000_000L;
187    }
188    if ((CONNECTION_TIMEOUT > 0L) && (elapsed >= CONNECTION_TIMEOUT)) {
189      fireTimeout();
190      close();
191      return;
192    }
193    synchronized(connectionLock) {
194      if (jmxc == null) {
195        jmxc = JMXConnectorFactory.newJMXConnector(url, env);
196        jmxc.addConnectionNotificationListener((notification, handback) -> {
197          if (JMXConnectionNotification.FAILED.equals(notification.getType())) reset();
198        }, null, null);
199      }
200      jmxc.connect();
201      //connectionThread.get().close();
202      connectionThread.get().setStopped(true);
203      connectionThread.set(null);
204    }
205    synchronized(this) {
206      mbeanConnection.set(jmxc.getMBeanServerConnection());
207      try {
208        setHost(InetAddress.getByName(host).getHostName());
209      } catch (@SuppressWarnings("unused") final UnknownHostException e) {
210      }
211    }
212    connected.set(true);
213    wakeUp();
214    fireConnected();
215    if (debugEnabled) log.debug(getId() + " JMX connection successfully established");
216  }
217
218  /**
219   * Reset the JMX connection and attempt to reconnect.
220   */
221  void reset() {
222    connected.set(false);
223    if (jmxc != null) {
224      try {
225        jmxc.close();
226      } catch(final Exception e2) {
227        if (debugEnabled) log.debug(e2.getMessage(), e2);
228      }
229      jmxc = null;
230    }
231    if (isReconnectOnError()) connect();
232  }
233
234  /**
235   * Get the host the server is running on.
236   * @return the host as a string.
237   */
238  public String getHost() {
239    return host;
240  }
241
242  /**
243   * Get the host the server is running on.
244   * @param host the host as a string.
245   */
246  public void setHost(final String host) {
247    this.host = host;
248    this.displayName = this.host + ':' + this.port;
249  }
250
251  /**
252   * Get a string describing this connection.
253   * @return a string in the format host:port.
254   */
255  public String getId() {
256    return idString;
257  }
258
259  /**
260   * Get the string representing this connection, used for displaying in the admin conosle.
261   * @return the display name as a string.
262   */
263  public String getDisplayName() {
264    return displayName;
265  }
266
267  @Override
268  public String toString() {
269    return  new StringBuilder(getClass().getSimpleName()).append('[').append("url=").append(url).append(", connected=").append(connected)
270      .append(", local=").append(local).append(", secure=").append(sslEnabled).append(']').toString();
271  }
272
273  /**
274   * Add a listener to this connection wrapper.
275   * @param listener the listener to add.
276   */
277  public void addJMXWrapperListener(final JMXWrapperListener listener) {
278    listeners.add(listener);
279  }
280
281  /**
282   * Remove a listener from this connection wrapper.
283   * @param listener the listener to add.
284   */
285  public void removeJMXWrapperListener(final JMXWrapperListener listener) {
286    listeners.remove(listener);
287  }
288
289  /**
290   * Notify all listeners that the connection was successful.
291   */
292  protected void fireConnected() {
293    final JMXWrapperEvent event = new JMXWrapperEvent(this);
294    final Runnable r = new Runnable() {
295      @Override
296      public void run() {
297        for (JMXWrapperListener listener: listeners) listener.jmxWrapperConnected(event);
298      }
299    };
300    //new Thread(r, getDisplayName() + " connection notifier").start();
301    r.run();
302  }
303
304  /**
305   * Notify all listeners that the connection could not be established before reaching the timeout.
306   */
307  protected void fireTimeout() {
308    final JMXWrapperEvent event = new JMXWrapperEvent(this);
309    for (final JMXWrapperListener listener: listeners) listener.jmxWrapperTimeout(event);
310  }
311
312  /**
313   * @return Whether this connection wrapper reconnects on error.
314   * @exclude
315   */
316  public synchronized boolean isReconnectOnError() {
317    return reconnectOnError;
318  }
319
320  /**
321   * Specifiy whether this connection wrapper reconnects on error.
322   * @param reconnectOnError {@code true} to reconnect, {@code false} otherwise.
323   * @exclude
324   */
325  public synchronized void setReconnectOnError(final boolean reconnectOnError) {
326    this.reconnectOnError = reconnectOnError;
327  }
328
329  /**
330   * Get the JMX remote protocol used.
331   * @return the JMX remote protocol string.
332   */
333  public String getProtocol() {
334    return protocol;
335  }
336}