001    // Copyright (c) 2001 Hursh Jain (http://www.mollypages.org) 
002    // The Molly framework is freely distributable under the terms of an
003    // MIT-style license. For details, see the molly pages web site at:
004    // http://www.mollypages.org/. Use, modify, have fun !
005    
006    package fc.jdbc;
007    
008    import java.sql.*;
009    import java.io.*;
010    import java.util.*;
011    import java.util.concurrent.*;
012    
013    import fc.io.*;
014    import fc.util.*;
015    
016    //import EDU.oswego.cs.dl.util.concurrent.*;
017    
018    /**  
019    A pooled connection manager. Connections must be closed when done
020    with, which automatically returns them back to the pool. 
021    
022    @author hursh jain
023    **/
024    public class PooledConnectionMgr extends ConnectionMgr
025    {
026    /*
027    we don't really have any way to get all connections in
028    the buffer and close them all - the bounderbuffer class
029    doesn't allow this. So we need this class
030    */
031    class MyBoundedBuffer extends ArrayBlockingQueue
032      {
033      int bufsize;
034      
035      MyBoundedBuffer(int size) {
036        super(size);
037        this.bufsize = size;
038        }
039        
040      //closes all connections, even ones that are checked out
041      void closeAllConnections() 
042        {
043        Object[] items = toArray();
044        for (int n = 0; n < items.length; n++)
045          {
046          try {
047            PooledConnection con = (PooledConnection) items[n];
048            if (con != null) {
049              if (! con.isClosed() ) {  
050                log.warn("PooledConnectionMgr shutdown: closing open database connection #" + con.id);
051                }
052              con.closeReally();
053              }
054            }
055          catch (SQLException e)
056            {
057            log.warn(e);
058            }
059          }
060        }
061    }
062    
/** 
A pool size constant. It is not referenced anywhere inside this 
class; presumably meant as a default for callers (TODO confirm).
*/
public static final int defaultSize = 100;

//the idle connections: filled by init, drained by getConImpl and
//refilled by put
MyBoundedBuffer   buffer;
//true while checkout tracking is switched on (see setDebugCheckout)
boolean       dbgCheckOut;
//connection id (Integer) -> Exception created at checkout time
Map         checkedOutInfo; //useful for debugging pool leaks

/** If a connection cannot be obtained within this time,
then a warning will be logged. The value is in milliseconds.
*/
public static final long ConnectionTimeoutWarning = 1500;
073    
/** 
Constructs a new connection pool with the specified size. The 
JDBC arguments are handed unchanged to the {@link ConnectionMgr} 
constructor. All <tt>size</tt> connections are opened before this
constructor returns.

@param  jdbc_url      passed to the superclass constructor
@param  jdbc_driver     passed to the superclass constructor
@param  jdbc_user     passed to the superclass constructor
@param  jdbc_password   passed to the superclass constructor
@param  jdbc_catalog    passed to the superclass constructor
@param  size        the number of connections that this pool
              holds, must be greater than 0
@throws IllegalArgumentException  if size is not greater than 0
@throws Exception   if the superclass constructor fails or a 
            database connection cannot be opened
@see ConnectionMgr
**/
public PooledConnectionMgr(
    String jdbc_url, String jdbc_driver, 
    String jdbc_user, String jdbc_password, 
    String jdbc_catalog,
    int size) 
throws Exception
  {
  super(jdbc_url, jdbc_driver, jdbc_user, jdbc_password, jdbc_catalog);
  init(size);
  }
088    
/** 
Constructs a new connection pool with the specified size. The
size is the number of connections that this pool holds. All of
them are opened before this constructor returns.

@param  props   passed unchanged to the {@link ConnectionMgr} 
          constructor
@param  size    the number of connections in the pool, must be 
          greater than 0
@throws IllegalArgumentException  if size is not greater than 0
@throws Exception   if the superclass constructor fails or a 
            database connection cannot be opened
**/
public PooledConnectionMgr(PropertyMgr props, int size) 
throws Exception
  { 
  super(props);
  init(size);
  }
099    
/** 
Constructs a new connection pool with the specified size. The
size is the number of connections that this pool holds. All of
them are opened before this constructor returns.

@param  props   passed unchanged to the {@link ConnectionMgr} 
          constructor
@param  prefix  passed unchanged to the {@link ConnectionMgr} 
          constructor
@param  size    the number of connections in the pool, must be 
          greater than 0
@throws IllegalArgumentException  if size is not greater than 0
@throws Exception   if the superclass constructor fails or a 
            database connection cannot be opened
**/
public PooledConnectionMgr(PropertyMgr props, String prefix, int size) 
throws Exception
  { 
  super(props, prefix);
  init(size);
  }
110      
/** 
Constructs a new connection pool with the specified size. The
size is the number of connections that this pool holds. All of
them are opened before this constructor returns.

@param  log     passed unchanged to the {@link ConnectionMgr} 
          constructor
@param  props   passed unchanged to the {@link ConnectionMgr} 
          constructor
@param  size    the number of connections in the pool, must be 
          greater than 0
@throws IllegalArgumentException  if size is not greater than 0
@throws Exception   if the superclass constructor fails or a 
            database connection cannot be opened
**/
public PooledConnectionMgr(
    SystemLog log, PropertyMgr props, int size) 
throws Exception
  { 
  super(log, props);
  init(size);
  }
122    
/** 
Constructs a new connection pool with the specified size. The
size is the number of connections that this pool holds. All of
them are opened before this constructor returns.

@param  log     passed unchanged to the {@link ConnectionMgr} 
          constructor
@param  props   passed unchanged to the {@link ConnectionMgr} 
          constructor
@param  prefix  passed unchanged to the {@link ConnectionMgr} 
          constructor
@param  size    the number of connections in the pool, must be 
          greater than 0
@throws IllegalArgumentException  if size is not greater than 0
@throws Exception   if the superclass constructor fails or a 
            database connection cannot be opened
**/
public PooledConnectionMgr(
    SystemLog log, PropertyMgr props, String prefix, int size) 
throws Exception
  { 
  super(log, props, prefix);
  init(size);
  }
134      
135    private void init(int size) throws Exception
136      {
137      if (size <= 0) {
138        throw new IllegalArgumentException("size must be greater than 0");
139        }
140      
141      buffer = new MyBoundedBuffer(size);
142      log.bug("PooledConnectionMgr.constructor: Constructed new connection pool buffer: ", buffer);
143    
144      for (int n = 0; n < size; n++) 
145        {
146        Connection con = DriverManager.getConnection(url,user,password);
147        if (con == null) {
148          throw new Exception("PooledConnectionMgr: Cannot connect to database [" + url + "]");
149          }
150        PooledConnection pooled_con = new PooledConnection(this, con);
151        buffer.put(pooled_con);
152        }
153    
154      checkedOutInfo = new Hashtable(); 
155      log.bug("PooledConnectionMgr.constructor: added ", buffer.size(), " connections to the connection pool");
156      }
157    
158    /** 
159    Helps to debug connection checkouts. By default, this is 
160    <tt>false</tt> (since there is a stack trace overhead per
161    connection when debugging is enabled).
162    
163    @param  dbg   <tt>true</tt> to enable tracking of checked out
164            connections from this pool. <tt>false</tt>
165            stops tracking and clears any existing tracking
166            info.
167    **/
168    public void setDebugCheckout(boolean dbg) {
169      dbgCheckOut = dbg;
170      if (! dbg) {
171        checkedOutInfo.clear();
172        }
173      }
174    
175    /** 
176    Returns a iterator over a collection of Exception objects, each
177    containing a stack trace for code that has currently checked out
178    a connection from this pool. For this to work, make sure that
179    {@link #setDebugCheckout} has first been invoked with 
180    <tt>true</tt>.
181    **/
182    public Iterator getCheckoutInfo() {
183      return checkedOutInfo.values().iterator();
184      }
185    
186    /**
187    Gets a list of connections in the pool such that each connection has a
188    transaction that was started more than the specified milliseconds ago but
189    has not yet been aborted or commited (this is a good way to see if there
190    is a transaction leak somewhere).
191    */
192    public List getHungTransactions(long milliseconds) 
193      {
194      long now = System.currentTimeMillis();
195      Object[] items = buffer.toArray();
196      List list = new ArrayList();
197      for (int n = 0; n < items.length; n++) {
198        PooledConnection con = (PooledConnection) items[n];
199        if (con.transaction_start_time == 0 
200          || (con.transaction_start_time + milliseconds) > now) {
201          continue;
202          }
203        list.add(con);
204        }
205      return list;
206      }
207    
208    /** 
209    Returns a connection from the pool possibly waiting
210    indefinitely until a connection is available. However, if a
211    connection is not obtained in {@link
212    #ConnectionTimeoutWarning} milliseconds, then a warning is
213    logged.
214    **/
215    protected Connection getConImpl() throws SQLException 
216      {
217      try {
218        PooledConnection con = null;
219        con = (PooledConnection) buffer.poll(ConnectionTimeoutWarning, 
220                            TimeUnit.MILLISECONDS);
221      
222        if (con == null) 
223          {
224          log.warn(this, 
225            "Could not obtain connection in ", 
226            String.valueOf(ConnectionTimeoutWarning), 
227            " milliseconds..\n", 
228            "Possibly waiting forever for connection here...\n",
229            IOUtil.throwableToString(new Exception("Debug Stack Trace"))
230            );
231      
232          con = (PooledConnection) buffer.take();
233          log.info(this, "...Connection obtained successfully.");
234          }
235          
236        con.setClosed(false);
237        
238        if (dbgCheckOut) {
239          checkedOutInfo.put(new Integer(con.getID()), new Exception("==PooledConnectionMgr/Checkout Debug Info=="));
240          }
241          
242        return con;
243        }
244      catch (InterruptedException e) {
245        e.printStackTrace();
246        throw new SQLException(e.getMessage());
247        }
248      }
249    
/**
Really closes the connections that are currently in the pool buffer.
Presumably invoked by {@link ConnectionMgr} when this manager is 
closed (the caller is not visible in this file -- confirm).

@return always <tt>true</tt>
**/
protected boolean handleMgrShutdown() {
  buffer.closeAllConnections();
  return true;
  } 
254      
255    void put(PooledConnection con) throws SQLException
256      {
257      //we know this won't block because our pool only gets stuff
258      //put in it that was previously taken from it
259      try { 
260          buffer.put(con);
261        if (dbgCheckOut) {
262          checkedOutInfo.remove( new Integer(con.getID()) );
263          }
264        }
265      catch (InterruptedException e) 
266        {
267        e.printStackTrace();
268        throw new SQLException(e.getMessage());
269        }
270      }
271    
272    public String toString() {
273      return myname + " [capacity=" + buffer.bufsize + "; available=" + buffer.size() + "]";
274      }
275      
276    //== testing =================================================  
277      
278    public static void main(String[] args) throws Exception
279      {
280      Args myargs = new Args(args);
281      Log.getDefault().setLevel(SystemLog.DEBUG);
282      myargs.setUsage("java fc.jdbc.PooledConnectionMgr -conf conf-file -query query-to-execute");
283      
284      String query    = myargs.getRequired("query");
285      String propfile = myargs.getRequired("conf");
286      final int size = 5;
287      
288      //test 1
289      FilePropertyMgr fmgr = new FilePropertyMgr(new File(propfile));
290    
291      PooledConnectionMgr pool = new PooledConnectionMgr(fmgr, size);
292    
293      pool.setDebugCheckout(true);
294      
295      List list = new ArrayList();
296      for (int n = 0; n < size/2; n++) {
297        Connection con = pool.getConnection();
298        list.add(con);  
299        }
300        
301      System.out.println("Got connections, connection pool="+pool);
302      System.out.println("Now using connections...");
303      
304      int num = 1;
305      for (Iterator it = list.iterator(); it.hasNext(); /*empty*/) {
306        Connection item = (Connection) it.next();
307        Statement stmt = item.createStatement();
308        ResultSet rs = stmt.executeQuery(query);
309        System.out.println("=== query using connection [# " + num++ + "]" + item + "=====");
310        QueryUtil.printRS(rs);
311        System.out.println("=============================================================");
312        item.close();   
313        } 
314      
315      System.out.println("Used and closed all connections in pool.");
316      Connection con = pool.getConnection();
317      System.out.println("Got new connection:" + con);
318      
319      System.out.println("==== [debug] Checkout Info: ====");
320      for (Iterator it = pool.getCheckoutInfo(); it.hasNext(); /*empty*/) {
321        Throwable item = (Throwable) it.next();
322        item.printStackTrace();
323        } 
324      System.out.println("==== [debug end] Checkout Info: ====");
325          
326      System.out.println("Closed all connections, connection pool="+pool);  
327      
328      pool.close();
329      
330      System.out.println("Trying to get connection after closing connection mgr");
331      try {
332        con = pool.getConnection();
333        }
334      catch (Exception e) {
335        e.printStackTrace();    
336        }
337        
338      //test 2
339      System.out.println("TEST 2....");
340      
341      final PooledConnectionMgr pool2 = 
342              new PooledConnectionMgr(fmgr, size);
343    
344      Thread t = new Thread() 
345        {
346        public void run() 
347          {
348          try {
349            List list = new ArrayList();
350            for (int n = 0; n < size; n++) {
351              Connection con = pool2.getConnection();
352              list.add(con);  
353              System.out.println("Got connection: " + con);
354              }
355            System.out.println("sleeping 10 seconds");
356            Thread.currentThread().sleep(5000);
357            for (int n = 0; n < list.size(); n++)
358              {
359              System.out.println(
360                "Releasing connection: " + 
361                 (n+1) + " " + list.get(n) );
362              ((Connection) list.get(n)).close();
363              }
364            }
365          catch (Exception e) {
366            e.printStackTrace();
367            }
368          }
369        };
370        
371      t.start();
372      
373      for (int n = 0; n < size; n++) {
374        System.out.println("Main Thread: Trying to get connection..." + (n+1));
375        con = pool2.getConnection();
376        System.out.println("Main Thread: Got connection: " + con);
377        con.close();
378        }
379    
380      pool2.close();  
381      } //main
382    }          //~class PooledConnectionMgr