Connection pools
From JPPF 6.3 Documentation
|
Main Page > Development guide > Connection pools |
All server connections in a JPPF client are organized into connection pools, whose number is determined by the client configuration properties. Their size is also based on the configuration and can also be changed dynamically via the JPPF APIs. In the next sections, we will see how connection pools can be configured, explored and programmatically accessed.
1 Connection pools class hierarchy
The class hierarchy for the client-side JPPF connections is as follows:
2 The JPPFConnectionPool API
The ConnectionPool interface and its direct abstract implementation AbstractConnectionPool provide the base methods to explore and access the connections in the pool:
public interface ConnectionPool<E extends AutoCloseable> extends Iterable<E>, AutoCloseable { // Get the next connection that is connected and available E getConnection(); // Determine whether this pool is empty boolean isEmpty(); // Get the current size of this pool int connectionCount(); // Get the maximum size of this connection pool int getSize(); // Set the maximum size of this pool, starting or stopping connections as needed int setSize(int size); // Get a list of connections held by this pool List<E> getConnections(); }
AbstractClientConnectionPool has the accessors to attributes specific to JPPF client connections to a driver:
public abstract class AbstractClientConnectionPool extends AbstractConnectionPool<JPPFClientConnection> { // Get the number of connections in this pool that have one of the specified statuses public int connectionCount(JPPFClientConnectionStatus...statuses) // Get a list of connections whose status is one of the specified statuses public List<JPPFClientConnection> getConnections(JPPFClientConnectionStatus...statuses) // Get the id of this pool public int getId() // Get the priority associated with this pool public int getPriority() // Check whether this pool is for SSL connections public boolean isSslEnabled() // Get the name of this pool. public String getName() // Get the uuid of the driver to which connections in this pool are connected public String getDriverUuid() // Get the host name of the remote driver public String getDriverHost() // Get the ip address of the remote driver public String getDriverIPAddress() // Get the port to use on the remote driver public int getDriverPort() // Get the JPPF client which holds this pool public JPPFClient getClient() // Get the driver's system information public JPPFSystemInformation getSystemInfo() // Get the jmx port to use on the remote driver public int getJmxPort() // Get the maximum number of jobs each connection can process concurrently public int getMaxJobs() // Set the maximum number of jobs each connection can process concurrently public int setMaxJobs(int maxJobs) }
The name of the pool, accessed via the getName() method, can be computed in different ways:
- when the pool is created from a driver discovery plugin, its name is provided directly
- when UDP discovery is disabled, the pools names are read from the jppf.drivers configuration property:
jppf.drivers = <driver_name_1> ... <driver_name_N>
- when UDP discovery is enabled, the names are computed automatically in the form jppf_discovery-n, where n is the order of discovery of the pool, without any other meaning
The same mechanism applies to getDriverHost(), getDriverPort(), getPriority(), isSslEnabled(), getMaxJobs() and getSize().
Note that some of these attributes also have a setter method, which means that they can be changed dynamically y API.
The pool's actual size can be grown or shrunk dynamically, using the setSize(int) method. The JPPF client will create or close connections accordingly. An attempt to set a size equal to the current size will have no effect whatsoever. In some cases, when trying to reduce the connection pool's size, there may be too many connections in the pool that are busy executing jobs and the client will not be able to close all the requested connections. In this case, setSize() will return the new actual size, which may be smaller than the requested size.
Similarly, the number of jobs each connection can handle concurrently can be dynamically changed by calling setMaxJobs(int). By default, this number of jobs is set to Integer.MAX_VALUE, that is, 231 - 1, or 2,147,483,647.
3 Client connections
3.1 The JPPFClientConnection interface
As we have seen, connection pools contain and manage a set of connections from a client to a driver. These connections are represented by the JPPFClientConnection interface, defined as follows:
public interface JPPFClientConnection extends ClientConnectionStatusHandler, AutoCloseable { // Get the priority assigned to this connection int getPriority(); // Shutdown this connection and release all the resources it is using void close(); // Determine whether this connection was closed boolean isClosed(); // Get the name assigned to this client connection String getName(); // Determines if this connection is over SSL boolean isSSLEnabled(); // Get the driver's host name or ip address String getHost(); // Get the port number on which the dirver is listeneing for connections int getPort(); // Get the unique identifier of the remote driver String getDriverUuid(); // Get the system information for the remote driver this connection refers to JPPFSystemInformation getSystemInfo(); // Get the unique ID for this connection and its two channels String getConnectionUuid(); // Get the pool this connection belongs to JPPFConnectionPool getConnectionPool(); // Whether the connection represents a local executor boolean isLocal(); }
Note that most of these methods, except for close(), isClosed(), getConnectionUuid() and getName(), actually delegate to the JPPFConnectionPool to which the connection belongs.
3.2 Status notifications for existing connections
Each server connection has a status that depends on the state of its network connection to the server and whether it is executing a job request. A connection status is represented by the enum JPPFClientConnectionStatus, and has the following possible values: NEW, DISCONNECTED, CONNECTING, ACTIVE, EXECUTING, FAILED or CLOSED.
JPPFClientConnection extends the interface ClientConnectionStatusHandler, which provides the following methods to handle the connection status and register or remove listeners:
public interface ClientConnectionStatusHandler { // Get the status of this connection JPPFClientConnectionStatus getStatus(); // Register a connection status listener with this connection void addClientConnectionStatusListener(ClientConnectionStatusListener listener); // Remove a connection status listener from the registered listeners void removeClientConnectionStatusListener(ClientConnectionStatusListener listener); }
Implementations of ClientConnectionStatusHandler can emit events when the status of the connection changes. To be notified of these events, you need to implement and register a ClientConnectionStatusListener, defined as follows:
public interface ClientConnectionStatusListener extends EventListener { // Called to notify of a status change event on a client connection void statusChanged(ClientConnectionStatusEvent event); }
The events are of type ClientConnectionStatusEvent:
public class ClientConnectionStatusEvent extends EventObject { // Get the connection that is the source of this event public JPPFClientConnection getClientConnection() // Get the connection status before the change public JPPFClientConnectionStatus getOldStatus() }
Here is a sample status listener implementation:
ClientConnectionStatusListener myStatusListener = event -> { // obtain the client connection from the event JPPFClientConnection connection = event.getClientConnection(); // get the new status JPPFClientConnectionStatus status = connection.getStatus(); System.out.println("Connection " + connection.getName() + " status changed to " + status); } // register the listener with an existing connection JPPFClientConnection connection = ...; connection.addClientConnectionStatusListener(myStatusListener);
4 Exploring the connections in a pool
The JPPFConnectionPool class has higher-level methods to explore the connections with filtering and wait for them to be in a specified state:
public class JPPFConnectionPool extends AbstractClientConnectionPool { // Wait for the specified number of connections to be in the ACTIVE status public List<JPPFClientConnection> awaitActiveConnections(ComparisonOperator operator, int nbConnections) // Wait for the a connection to be in the ACTIVE state public JPPFClientConnection awaitActiveConnection() // Wait for the specified number of connections to be in theACTIVE or EXECUTING state public List<JPPFClientConnection> awaitWorkingConnections(ComparisonOperator operator, int nbConnections) // Wait for a connection to be in the ACTIVE or EXECUTING} state public JPPFClientConnection awaitWorkingConnection() // Wait for the given number of connections to be in one of the given states public List<JPPFClientConnection> awaitConnections(ComparisonOperator operator, int nbConnections, JPPFClientConnectionStatus...statuses) // Wait for a connection to be in one of the specified states public JPPFClientConnection awaitConnection(JPPFClientConnectionStatus...statuses) // Wait for the given number of connections to be in one of the given states, // or for the given timeout to expire, whichever happens first public List<JPPFClientConnection> awaitConnections(ComparisonOperator operator, int nbConnections, long timeout, JPPFClientConnectionStatus...statuses) }
The ComparisonOperator parameter is typically, but not necessarily, an element of the Operator enum, which defines the most common conditions that the number of connections must satisfy:
public enum Operator implements ComparisonOperator { // The number of connections is equal to the expected number EQUAL, // The number of connections is different from the expected number NOT_EQUAL, // The number of connections is at least to the expected number AT_LEAST, // The number of connections is at most the expected number AT_MOST, // The number of connections is strictly greater than the expected number MORE_THAN, // The number of connections is strictly less than the expected number LESS_THAN }
Examples:
JPPFConnectionPool pool = ...; // wait until more than 2 connections in the pool are active List<JPPFClientConnection> list = pool.awaitActiveConnections(Operator.MORE_THAN, 2); // with the same condition defined as a lambda expression list = pool.awaitActiveConnections((actual, expected) -> actual > expected, 2); // wait for no more than 5 seconds or until at least one connection is closed list = pool.awaitConnections(Operator.AT_LEAST, 1, 5000L, JPPFClientConnectionStatus.CLOSED);
5 Associated JMX connection pool
Each connection pool has an associated pool of JMX connections to the same remote driver. To access and manipulate this JMX pool, the AbstractClientConnectionPool class provides the following API:
public abstract class AbstractClientConnectionPool extends AbstractConnectionPool<JPPFClientConnection> implements Comparable<AbstractClientConnectionPool> { // Get a <i>connected</i> JMX connection among those in the JMX pool public JMXDriverConnectionWrapper getJmxConnection() // Get a JMX connection among those in the JMX pool public JMXDriverConnectionWrapper getJmxConnection(boolean connectedOnly) // Get the current maximum size of the associated JMX connection pool public int getJMXPoolSize() // Set a new size for the associated pool of JMX connections, //adding new or closing existing connections as needed public int setJMXPoolSize(int maxSize) }
As we can see, the JMX pool handles connections of type JMXDriverConnectionWrapper, which is a wrapper around an MBeanServerConnection, connected to the MBean server of a remote JPPF driver.
Please note that the JMX pool size, when left unspecified, defaults to 1.
JPPFConnectionPool has higher-level methods to explore the JMX connections with filtering, and wait for them to be in a specified state:
public class JPPFConnectionPool extends AbstractClientConnectionPool { // Wait for the specified number of JMX connections to be in the specified state public List<JMXDriverConnectionWrapper> awaitJMXConnections( ComparisonOperator operator, int nbConnections, boolean connectedOnly) // Wait for the specified number of JMX connections to be in the specified state, // or the specified timeout to expire, whichever happens first public List<JMXDriverConnectionWrapper> awaitJMXConnections( ComparisonOperator operator, int nbConnections, long timeout, boolean connectedOnly) // Wait for a JMX connection to be in the specified state public JMXDriverConnectionWrapper awaitJMXConnection(boolean connectedOnly) }
6 Exploring the connection pools
The JPPFClient class, or more exactly its super-super class AbstractJPPFClient, provides a number of methods to discover and explore the connection pools currently handled by the client:
public class JPPFClient extends AbstractGenericClient { ... } public abstract class AbstractGenericClient extends AbstractJPPFClient { ... } public abstract class AbstractJPPFClient implements ClientConnectionStatusListener, AutoCloseable { // Find the connection pool with the specified priority and id public JPPFConnectionPool findConnectionPool(int priority, int poolId) // Find the connection pool with the specified id public JPPFConnectionPool findConnectionPool(int poolId) // Find the connection pool with the specified name public JPPFConnectionPool findConnectionPool(String name) // Find the connection pools whose name matches the specified regular expression public List<JPPFConnectionPool> findConnectionPools(String name) // Find the connection pools that have at least one connection matching // one of the specified statuses public List<JPPFConnectionPool> findConnectionPools(JPPFClientConnectionStatus...statuses) // Get a set of existing connection pools with the specified priority public List<JPPFConnectionPool> getConnectionPools(int priority) // Get a list of all priorities for the currently existing pools in descending order public List<Integer> getPoolPriorities() // Get a list of existing connection pools, ordered by descending priority public List<JPPFConnectionPool> getConnectionPools() // Get a pool with the highest possible priority that has at least 1 active connection public JPPFConnectionPool getConnectionPool() // Get the connection pools that pass the specified filter public List<JPPFConnectionPool> findConnectionPools(ConnectionPoolFilter<JPPFConnectionPool> filter) }
Note that the connection pools are held in a multimap-like data structure, where the key is the pool priority sorted in descending order (highest priority first). Consequently, all getXXX() and findXXX() methods which return a list of connection pools are guaranteed to have the resulting elements od the list sorted by order of descending priority.
The last findConnectionPools() method provides a generic way of filtering the existing connection pools, by making use of a ConnectionPoolFilter, defined as follows:
public interface ConnectionPoolFilter<E extends ConnectionPool> { // Determine whether this filter accepts the specified connection pool boolean accepts(E pool); }
In addition to this, JPPFClient provides a set of methods which wait until one or more conenction pools fulfill a specified condition, and return a list of pools which satisfy the condition:
public class JPPFClient extends AbstractGenericClient // Wait for at least one connection pool with at least one connection in ACTIVE status public JPPFConnectionPool awaitActiveConnectionPool() // Wait for at least one connection pool with at least one connection // in ACTIVE or EXECUTING status public JPPFConnectionPool awaitWorkingConnectionPool() // Wait for at least one connection pool with at least one connection // in one of the specified statuses public JPPFConnectionPool awaitConnectionPool(JPPFClientConnectionStatus...statuses) // Wait for at least one connection pool with at least one connection in one of the // specified statuses or the specified timeout (in ms) expires, whichever happens first public JPPFConnectionPool awaitConnectionPool( long timeout, JPPFClientConnectionStatus...statuses) // Wait for at least one connection pool with at least one connection in one of the // specified statuses or the specified timeout (in ms) expires, whichever happens first public List<JPPFConnectionPool> awaitConnectionPools( long timeout, JPPFClientConnectionStatus...statuses) }
7 Connection Pool Events
The JPPF client API allows the registration or unregistration of listeners to connection pool events: connection pools added to or removed from the client, or connections added to or removed from a connection pool. This can be done in two ways:
1) From a JPPFClient constructor:
public class JPPFClient extends AbstractGenericClient { // Initialize this client with an automatically generated application UUID public JPPFClient(final ConnectionPoolListener... listeners) // Initialize this client with the specified UUID and listeners public JPPFClient(final String uuid, final ConnectionPoolListener... listeners) { }
2) Using the related add and remove methods in the grand parent of JPPFClient:, AbstractJPPFClient:
public abstract class AbstractJPPFClient implements ClientConnectionStatusListener, AutoCloseable { // Add a listener to the list of listeners to this client public void addConnectionPoolListener(final ConnectionPoolListener listener) // Remove a listener from the list of listeners to this client public void removeConnectionPoolListener(final ConnectionPoolListener listener) }
As we can see in the methods signatures, the listeners implement the interface ConnectionPoolListener, is defined as:
public interface ConnectionPoolListener extends EventListener { // Called when a new connection pool is created void connectionPoolAdded(ConnectionPoolEvent event); // Called when a connection pool removed void connectionPoolRemoved(ConnectionPoolEvent event); // Called when a new connection is created void connectionAdded(ConnectionPoolEvent event); // Called when a connection pool is removed void connectionRemoved(ConnectionPoolEvent event); }
Note that, if you do not wish to implement all the methods in ConnectionPoolListener, you can instead extend the adapter class ConnectionPoolListenerAdapter, which implments each method of the interface as an empty method.
All notification methods receive an event of type ConnectionPoolEvent, defined as follows:
public class ConnectionPoolEvent extends EventObject { // Get the source of this event public JPPFConnectionPool getConnectionPool() // Get the connection that triggered this event, if any public JPPFClientConnection getConnection() }
Please note that, in the case of a connectionPoolAdded() or connectionPoolAdded() notification, the method getConnection() will return nulll, since no connection is involved.
8 Putting it all together
We will illustrate how client, connection pool events and connection events fit together, with an example which prints the status of all connections created by the client:
// this status listener prints the old and new status of each connection ClientConnectionStatusListener myStatusListener = event -> { // obtain the client connection from the event JPPFClientConnection connection = (JPPFClientConnection) event.getClientConnectionStatusHandler(); // get the new and old status JPPFClientConnectionStatus newStatus = connection.getStatus(); JPPFClientConnectionStatus oldStatus = event.getOldStatus(); // print the connection name, old status and new status System.out.println(connection.getName() + " status changed from " + oldStatus + " to " + newStatus); }; // create a connection pool listener which registers // a status listener on new connections ConnectionPoolListener myPoolListener = new ConnectionPoolListenerAdapter() { @Override public void connectionAdded(final ConnectionPoolEvent event) { // obtain the connection pool and client connection from the event JPPFConnectionPool pool = event.getConnectionPool(); JPPFClientConnection connection = event.getConnection(); System.out.println("connection " + connection + " added to pool " + pool); // add the status listener to the connection connection.addClientConnectionStatusListener(myListener); } @Override public void connectionRemoved(final ConnectionPoolEvent event) { // obtain the connection pool and client connection from the event JPPFConnectionPool pool = event.getConnectionPool(); JPPFClientConnection connection = event.getConnection(); System.out.println("connection " + connection + " removed from pool " + pool); // remove the status listener from the connection connection.removeClientConnectionStatusListener(myListener); } } // create a JPPFClient with our connection pool listener JPPFClient client = new JPPFClient(myPoolListener);
Main Page > Development guide > Connection pools |