View Javadoc

1   /*
2    * $HeadURL: $
3    * $Date: $
4    * $Revision: $
5    * $Author: $
6    * 
7    * Copyright (c) 2005 MindTree Consulting Ltd. 
8    * 
9    * This file is part of Insight.
10   * 
11   * Insight is free software: you can redistribute it and/or modify it under the 
12   * terms of the GNU General Public License as published by the Free Software 
13   * Foundation, either version 3 of the License, or (at your option) any later 
14   * version.
15   * 
16   * Insight is distributed in the hope that it will be useful, but 
17   * WITHOUT ANY WARRANTY; without even the implied warranty of 
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General 
19   * Public License for more details.
20   * 
21   * You should have received a copy of the GNU General Public License along with 
22   * Insight.  If not, see <http://www.gnu.org/licenses/>.
23   */
24  
25  package com.mindtree.techworks.insight.receiver;
26  
27  import java.util.ArrayList;
28  import java.util.Iterator;
29  import java.util.List;
30  
31  import org.apache.log4j.Appender;
32  
33  import com.mindtree.techworks.insight.model.ILogEventModelMutator;
34  import com.mindtree.techworks.insight.model.IMutatorListener;
35  import com.mindtree.techworks.insight.spi.LogEvent;
36  import com.mindtree.techworks.insight.spi.LogNamespace;
37  
38  /**
39  *
40  * The <code>AbstractReceiver</code> class is an abstraction of functionality common to all Insight
41  * receiver implementations. 
42  * This class doesnot notify registered Log4j Appender instances. Instead it notifies those
43  * appenders that have been registered directly with this class using 
44  * <p>
45  * #addAppender(Appender appender)
46  * <p>    
47  * The appenders are notified for every LoggingEvent successfully parsed/received by this receiver.
48  * This AbstractReceiver processes events in an asynchronous manner using callback
49  * methods on derived implementations for events and their availability. 
50  *
51  * @author  Regunath B
52  * @version 1.0, 05/07/05
53  */
54  
55  public abstract class AbstractReceiver extends Thread implements ReceiverInterpreter, ILogEventModelMutator {
56  	
57  	/**
58  	 * State constants for this Thread - Thread Idle
59  	 */
60  	private static final int IDLE = 0;
61  	
62  	/**
63  	 * State constants for this Thread - Thread Active
64  	 */
65  	private static final int ACTIVE = 1;
66  	
67  	/**
68  	 * State constants for this Thread - Thread Exited
69  	 */
70  	private static final int EXIT = 2;
71  	
72  	/**
73  	 * Sleep duration in milliseconds during tailing 
74  	 */
75  	private static final long SLEEP_DURATION = 250;
76  	
77  	/**
78  	 * State identifier for this Thread
79  	 */
80  	private volatile int state = IDLE;
81  		
82  	/**
83  	 * Count of events interpreted by this Receiver
84  	 */
85  	protected long interpretedEventCount = 0L;
86  	
87  	/**
88  	 * List of Appender instances that have been registered for notification of Log Events.
89  	 */
90  	private List appenders = new ArrayList();
91  
92  	/**
93  	 * List of ReceiverListener instance that have been registered for notification
94  	 */
95  	private List receiverListeners = new ArrayList();
96  	
97  	/**
98  	 * List of MutatorListener instances that have been registered
99  	 */
100 	private List mutatorListeners = new ArrayList();
101 		
102 	/**
103 	 * Adds the specified Appender for notification when a LoggingEvent has been parsed
104 	 * @param appender the Appender instance to be added to the appender list
105 	 */
106 	public void addAppender(Appender appender) {
107 		appenders.add(appender);
108 	}
109 	
110 	/**
111 	 * Removes the specified Appender for notification when a LoggingEvent has been parsed
112 	 * @param appender the Appender instance to be added to the appender list
113 	 */
114 	public void removeAppender(Appender appender) {
115 		appenders.remove(appender);
116 	}
117 
118 	/**
119 	 * Adds the specified ReceiverListener for notification when a ReceiverSpecifc Events will occur.
120 	 * @param receiverListener the ReceiverListener instance to be added to the receiverListener list
121 	 */
122 	public void addReceiverListener(ReceiverListener receiverListener) {
123 		receiverListeners.add(receiverListener);		
124 	}
125 
126 	/**
127 	 * Removes the specified ReceiverListener for notification when a ReceiverSpecifc Events will occur.
128 	 * @param receiverListener the ReceiverListener instance to be added to the receiverListener list
129 	 */
130 	public void removeReceiverListener (ReceiverListener receiverListener) {
131 		receiverListeners.remove(receiverListener);
132 	}
133 
134 	/**
135 	 * Adds the specified MutatorListener
136 	 * @param mutatorListener the IMutatorListener to add
137 	 */
138 	public void addMutatorListener (IMutatorListener mutatorListener) {
139 		mutatorListeners.add(mutatorListener);
140 	}
141 
142 	/**
143 	 * Removes the specified MutatorListener
144 	 * @param mutatorListener the MutatorListener to remove
145 	 */
146 	public void removeMutatorListener (IMutatorListener mutatorListener) {
147 		mutatorListeners.remove(mutatorListener);
148 	}
149 	
150 	/**
151 	 * Informs this receiver to start processing events from the
152 	 * log namespace. 
153 	 */
154 	public void startup() {
155 		if (this.state == ACTIVE) { // this thread has started already
156 			return;
157 		}		
158 		this.state = ACTIVE;
159 	  	// explicitly set the priority to normal. Otherwise
160 	  	// this thread will inherit the priority of the creator thread
161 	  	// i.e the AWT event dispatch thread that runs with max priority
162 		this.setPriority(Thread.NORM_PRIORITY);							  	
163 		this.start();
164 	}
165 	
166 	/**
167 	 * Overriden Thread super class method
168 	 * @see java.lang.Thread#run()
169 	 */
170 	public void run() {
171 		this.notifyStartup();
172 		while(hasMoreEvents() || isTailing()) {
173 			if (this.state == EXIT || this.state == IDLE) {
174 				break;
175 			}
176 			LogEvent[] events = getNextEvents();			
177 			if (events != null) {
178 				for (int i = 0; i < events.length; i++) {
179 					fireEventReceived(events[i]);
180 				}
181 			} else if (isTailing()) {
182 				synchronized (this) {
183 					try {
184 						wait(SLEEP_DURATION);
185 					} catch (InterruptedException ie) {
186 						// do nothing
187 					}
188 				}
189 			}
190 		}
191 		this.notifyShutdown();
192 	}
193 	
194 	/**
195 	 * Informs this receiver to stop processing events from the log
196 	 * namespace. 
197 	 */
198 	public void shutdown() {
199 		if (this.state == EXIT) { // this thread has stopped
200 			return;
201 		}
202 		this.state = EXIT;
203 		synchronized (this) {
204 			notifyAll();
205 		}
206 	}
207 	
208 	/**
209 	 * Returns the LogNamespace that this AbstractReceiver was initialized with
210 	 * @return Returns the namespace.
211 	 */
212 	public abstract LogNamespace [] getNamespaces();
213 	
214 	/**
215 	 * Returns true if this AbstractReceiver tails the LogNamespace
216 	 * @return the isTailing value specified during initialization
217 	 * @see #initialize(LogNamespace, boolean)
218 	 */
219 	public abstract boolean isTailing();
220 	
221 	/**
222 	 * Informs this AbstractReceiver to initialize itself. This method is called
223 	 * after the no args constructor of the specific derived instance has been
224 	 * called. Tailing receivers require an explicit call to shutdown() to stop
225 	 * receiving events from the specified LogNamespace.
226 	 * @param namespace the LogNamespace that this AbstractReceiver will process events from
227 	 * @param isTailing determines if this AbstractReceiver tails the specified LogNamespace
228 	 * @throws ReceiverInitializationException if any problems occurs during the initialization of receiver. 
229 	 * @see #shutdown()
230 	 */
231 	protected abstract void initialize(LogNamespace namespace, boolean isTailing) throws ReceiverInitializationException;
232 	
233 	/**
234 	 * Informs this AbstractReceiver to reset itself. This method is called
235 	 * post shutdown to let this class cleanup and release resources such
236 	 * socket connections, input streams e.t.c.
237 	 */
238 	protected abstract void deInitialize();
239 	
240 	/**
241 	 * Gets null or the next LoggingEvent(s) processed by this receiver
242 	 * @return null or the LoggingEvent instance
243 	 */
244 	protected abstract LogEvent[] getNextEvents();
245 	
246 	/**
247 	 * Determines if this receiver has any more events to process
248 	 * @return true if events exist in the log namespace for processing
249 	 */
250 	protected abstract boolean hasMoreEvents();
251 	
252 	/**
253 	 * Informs this AbstractReceiver that the specifed event has been received. 
254 	 * Notifies only Appender instances that have been directly registered with 
255 	 * this receiver.
256 	 * @param event the LogEvent that has been interpreted
257 	 */
258 	private void fireEventReceived (LogEvent event) {
259 		Iterator iterator = appenders.iterator();
260 		LogEvent logEvent = event;
261 //		logEvent.setNamespace(this.getNamespace());
262 		while(iterator.hasNext()) {
263 			((Appender)iterator.next()).doAppend(logEvent);
264 		}
265 		this.interpretedEventCount += 1;
266 	}
267 	
268 	/**
269 	 * Notifies thread exceution startup
270 	 */
271 	private void notifyStartup() {
272 		// synchronize on the class intance to prevent multiple
273 		// instances of this thread from being active at the same time
274 		synchronized(AbstractReceiver.class) {
275 		  	Iterator iterator1 = receiverListeners.iterator();
276 		  	//Notify All the Listerners that File Load event has been started
277 		  	while(iterator1.hasNext()){
278 		  		((ReceiverListener)iterator1.next()).startLoadEventNotification(getNamespaces());
279 		  	}		  	
280 		  	Iterator mutatorIterator1 = mutatorListeners.iterator();
281 		  	while(mutatorIterator1.hasNext()){
282 		  		((IMutatorListener)mutatorIterator1.next()).startMutating(this.isTailing() ? 
283 		  				ILogEventModelMutator.TAILING_MUTATOR : ILogEventModelMutator.NON_TAILING_MUTATOR);
284 		  	}
285 		}
286 	}
287 
288 	/**
289 	 * Notifies thread exceution completion and shutdown
290 	 */
291 	private void notifyShutdown() {
292 		// synchronize on the class intance to prevent multiple
293 		// instances of this thread from being active at the same time
294 		synchronized(AbstractReceiver.class) {
295 	        //Send Notification to all the Receiver Listener that 
296 	        //Loading of file has been completed
297 	        Iterator iterator2 = receiverListeners.iterator();
298 	        int receiverInfoFlag = interpretedEventCount == 0L ? ReceiverInterpreter.FAILURE : ReceiverInterpreter.SUCCESS;
299 	        while(iterator2.hasNext()){
300 	        	((ReceiverListener)iterator2.next()).endLoadEventNotification(getNamespaces(), receiverInfoFlag);
301 	        }
302 	        int mutatorInfoFlag = interpretedEventCount == 0L ? ILogEventModelMutator.FAILURE : ILogEventModelMutator.SUCCESS; 
303 	     	Iterator mutatorIterator2 = mutatorListeners.iterator();
304 		  	while(mutatorIterator2.hasNext()){
305 		  		((IMutatorListener)mutatorIterator2.next()).endMutating(mutatorInfoFlag);
306 		  	}
307 		  	this.deInitialize();
308 		}
309 	}
310 }