1 /*
2 * $HeadURL: $
3 * $Date: $
4 * $Revision: $
5 * $Author: $
6 *
7 * Copyright (c) 2005 MindTree Consulting Ltd.
8 *
9 * This file is part of Insight.
10 *
11 * Insight is free software: you can redistribute it and/or modify it under the
12 * terms of the GNU General Public License as published by the Free Software
13 * Foundation, either version 3 of the License, or (at your option) any later
14 * version.
15 *
16 * Insight is distributed in the hope that it will be useful, but
17 * WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
19 * Public License for more details.
20 *
21 * You should have received a copy of the GNU General Public License along with
22 * Insight. If not, see <http://www.gnu.org/licenses/>.
23 */
24
25 package com.mindtree.techworks.insight.receiver;
26
27 import java.util.ArrayList;
28 import java.util.Iterator;
29 import java.util.List;
30
31 import org.apache.log4j.Appender;
32
33 import com.mindtree.techworks.insight.model.ILogEventModelMutator;
34 import com.mindtree.techworks.insight.model.IMutatorListener;
35 import com.mindtree.techworks.insight.spi.LogEvent;
36 import com.mindtree.techworks.insight.spi.LogNamespace;
37
38 /**
39 *
40 * The <code>AbstractReceiver</code> class is an abstraction of functionality common to all Insight
41 * receiver implementations.
42 * This class doesnot notify registered Log4j Appender instances. Instead it notifies those
43 * appenders that have been registered directly with this class using
44 * <p>
45 * #addAppender(Appender appender)
46 * <p>
47 * The appenders are notified for every LoggingEvent successfully parsed/received by this receiver.
48 * This AbstractReceiver processes events in an asynchronous manner using callback
49 * methods on derived implementations for events and their availability.
50 *
51 * @author Regunath B
52 * @version 1.0, 05/07/05
53 */
54
55 public abstract class AbstractReceiver extends Thread implements ReceiverInterpreter, ILogEventModelMutator {
56
57 /**
58 * State constants for this Thread - Thread Idle
59 */
60 private static final int IDLE = 0;
61
62 /**
63 * State constants for this Thread - Thread Active
64 */
65 private static final int ACTIVE = 1;
66
67 /**
68 * State constants for this Thread - Thread Exited
69 */
70 private static final int EXIT = 2;
71
72 /**
73 * Sleep duration in milliseconds during tailing
74 */
75 private static final long SLEEP_DURATION = 250;
76
77 /**
78 * State identifier for this Thread
79 */
80 private volatile int state = IDLE;
81
82 /**
83 * Count of events interpreted by this Receiver
84 */
85 protected long interpretedEventCount = 0L;
86
87 /**
88 * List of Appender instances that have been registered for notification of Log Events.
89 */
90 private List appenders = new ArrayList();
91
92 /**
93 * List of ReceiverListener instance that have been registered for notification
94 */
95 private List receiverListeners = new ArrayList();
96
97 /**
98 * List of MutatorListener instances that have been registered
99 */
100 private List mutatorListeners = new ArrayList();
101
102 /**
103 * Adds the specified Appender for notification when a LoggingEvent has been parsed
104 * @param appender the Appender instance to be added to the appender list
105 */
106 public void addAppender(Appender appender) {
107 appenders.add(appender);
108 }
109
110 /**
111 * Removes the specified Appender for notification when a LoggingEvent has been parsed
112 * @param appender the Appender instance to be added to the appender list
113 */
114 public void removeAppender(Appender appender) {
115 appenders.remove(appender);
116 }
117
118 /**
119 * Adds the specified ReceiverListener for notification when a ReceiverSpecifc Events will occur.
120 * @param receiverListener the ReceiverListener instance to be added to the receiverListener list
121 */
122 public void addReceiverListener(ReceiverListener receiverListener) {
123 receiverListeners.add(receiverListener);
124 }
125
126 /**
127 * Removes the specified ReceiverListener for notification when a ReceiverSpecifc Events will occur.
128 * @param receiverListener the ReceiverListener instance to be added to the receiverListener list
129 */
130 public void removeReceiverListener (ReceiverListener receiverListener) {
131 receiverListeners.remove(receiverListener);
132 }
133
134 /**
135 * Adds the specified MutatorListener
136 * @param mutatorListener the IMutatorListener to add
137 */
138 public void addMutatorListener (IMutatorListener mutatorListener) {
139 mutatorListeners.add(mutatorListener);
140 }
141
142 /**
143 * Removes the specified MutatorListener
144 * @param mutatorListener the MutatorListener to remove
145 */
146 public void removeMutatorListener (IMutatorListener mutatorListener) {
147 mutatorListeners.remove(mutatorListener);
148 }
149
150 /**
151 * Informs this receiver to start processing events from the
152 * log namespace.
153 */
154 public void startup() {
155 if (this.state == ACTIVE) { // this thread has started already
156 return;
157 }
158 this.state = ACTIVE;
159 // explicitly set the priority to normal. Otherwise
160 // this thread will inherit the priority of the creator thread
161 // i.e the AWT event dispatch thread that runs with max priority
162 this.setPriority(Thread.NORM_PRIORITY);
163 this.start();
164 }
165
166 /**
167 * Overriden Thread super class method
168 * @see java.lang.Thread#run()
169 */
170 public void run() {
171 this.notifyStartup();
172 while(hasMoreEvents() || isTailing()) {
173 if (this.state == EXIT || this.state == IDLE) {
174 break;
175 }
176 LogEvent[] events = getNextEvents();
177 if (events != null) {
178 for (int i = 0; i < events.length; i++) {
179 fireEventReceived(events[i]);
180 }
181 } else if (isTailing()) {
182 synchronized (this) {
183 try {
184 wait(SLEEP_DURATION);
185 } catch (InterruptedException ie) {
186 // do nothing
187 }
188 }
189 }
190 }
191 this.notifyShutdown();
192 }
193
194 /**
195 * Informs this receiver to stop processing events from the log
196 * namespace.
197 */
198 public void shutdown() {
199 if (this.state == EXIT) { // this thread has stopped
200 return;
201 }
202 this.state = EXIT;
203 synchronized (this) {
204 notifyAll();
205 }
206 }
207
208 /**
209 * Returns the LogNamespace that this AbstractReceiver was initialized with
210 * @return Returns the namespace.
211 */
212 public abstract LogNamespace [] getNamespaces();
213
214 /**
215 * Returns true if this AbstractReceiver tails the LogNamespace
216 * @return the isTailing value specified during initialization
217 * @see #initialize(LogNamespace, boolean)
218 */
219 public abstract boolean isTailing();
220
221 /**
222 * Informs this AbstractReceiver to initialize itself. This method is called
223 * after the no args constructor of the specific derived instance has been
224 * called. Tailing receivers require an explicit call to shutdown() to stop
225 * receiving events from the specified LogNamespace.
226 * @param namespace the LogNamespace that this AbstractReceiver will process events from
227 * @param isTailing determines if this AbstractReceiver tails the specified LogNamespace
228 * @throws ReceiverInitializationException if any problems occurs during the initialization of receiver.
229 * @see #shutdown()
230 */
231 protected abstract void initialize(LogNamespace namespace, boolean isTailing) throws ReceiverInitializationException;
232
233 /**
234 * Informs this AbstractReceiver to reset itself. This method is called
235 * post shutdown to let this class cleanup and release resources such
236 * socket connections, input streams e.t.c.
237 */
238 protected abstract void deInitialize();
239
240 /**
241 * Gets null or the next LoggingEvent(s) processed by this receiver
242 * @return null or the LoggingEvent instance
243 */
244 protected abstract LogEvent[] getNextEvents();
245
246 /**
247 * Determines if this receiver has any more events to process
248 * @return true if events exist in the log namespace for processing
249 */
250 protected abstract boolean hasMoreEvents();
251
252 /**
253 * Informs this AbstractReceiver that the specifed event has been received.
254 * Notifies only Appender instances that have been directly registered with
255 * this receiver.
256 * @param event the LogEvent that has been interpreted
257 */
258 private void fireEventReceived (LogEvent event) {
259 Iterator iterator = appenders.iterator();
260 LogEvent logEvent = event;
261 // logEvent.setNamespace(this.getNamespace());
262 while(iterator.hasNext()) {
263 ((Appender)iterator.next()).doAppend(logEvent);
264 }
265 this.interpretedEventCount += 1;
266 }
267
268 /**
269 * Notifies thread exceution startup
270 */
271 private void notifyStartup() {
272 // synchronize on the class intance to prevent multiple
273 // instances of this thread from being active at the same time
274 synchronized(AbstractReceiver.class) {
275 Iterator iterator1 = receiverListeners.iterator();
276 //Notify All the Listerners that File Load event has been started
277 while(iterator1.hasNext()){
278 ((ReceiverListener)iterator1.next()).startLoadEventNotification(getNamespaces());
279 }
280 Iterator mutatorIterator1 = mutatorListeners.iterator();
281 while(mutatorIterator1.hasNext()){
282 ((IMutatorListener)mutatorIterator1.next()).startMutating(this.isTailing() ?
283 ILogEventModelMutator.TAILING_MUTATOR : ILogEventModelMutator.NON_TAILING_MUTATOR);
284 }
285 }
286 }
287
288 /**
289 * Notifies thread exceution completion and shutdown
290 */
291 private void notifyShutdown() {
292 // synchronize on the class intance to prevent multiple
293 // instances of this thread from being active at the same time
294 synchronized(AbstractReceiver.class) {
295 //Send Notification to all the Receiver Listener that
296 //Loading of file has been completed
297 Iterator iterator2 = receiverListeners.iterator();
298 int receiverInfoFlag = interpretedEventCount == 0L ? ReceiverInterpreter.FAILURE : ReceiverInterpreter.SUCCESS;
299 while(iterator2.hasNext()){
300 ((ReceiverListener)iterator2.next()).endLoadEventNotification(getNamespaces(), receiverInfoFlag);
301 }
302 int mutatorInfoFlag = interpretedEventCount == 0L ? ILogEventModelMutator.FAILURE : ILogEventModelMutator.SUCCESS;
303 Iterator mutatorIterator2 = mutatorListeners.iterator();
304 while(mutatorIterator2.hasNext()){
305 ((IMutatorListener)mutatorIterator2.next()).endMutating(mutatorInfoFlag);
306 }
307 this.deInitialize();
308 }
309 }
310 }