1 /* 2 * $HeadURL: $ 3 * $Date: $ 4 * $Revision: $ 5 * $Author: $ 6 * 7 * Copyright (c) 2005 MindTree Consulting Ltd. 8 * 9 * This file is part of Insight. 10 * 11 * Insight is free software: you can redistribute it and/or modify it under the 12 * terms of the GNU General Public License as published by the Free Software 13 * Foundation, either version 3 of the License, or (at your option) any later 14 * version. 15 * 16 * Insight is distributed in the hope that it will be useful, but 17 * WITHOUT ANY WARRANTY; without even the implied warranty of 18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General 19 * Public License for more details. 20 * 21 * You should have received a copy of the GNU General Public License along with 22 * Insight. If not, see <http://www.gnu.org/licenses/>. 23 */ 24 25 package com.mindtree.techworks.insight.receiver; 26 27 import java.util.ArrayList; 28 import java.util.Iterator; 29 import java.util.List; 30 31 import org.apache.log4j.Appender; 32 33 import com.mindtree.techworks.insight.model.ILogEventModelMutator; 34 import com.mindtree.techworks.insight.model.IMutatorListener; 35 import com.mindtree.techworks.insight.spi.LogEvent; 36 import com.mindtree.techworks.insight.spi.LogNamespace; 37 38 /** 39 * 40 * The <code>AbstractReceiver</code> class is an abstraction of functionality common to all Insight 41 * receiver implementations. 42 * This class doesnot notify registered Log4j Appender instances. Instead it notifies those 43 * appenders that have been registered directly with this class using 44 * <p> 45 * #addAppender(Appender appender) 46 * <p> 47 * The appenders are notified for every LoggingEvent successfully parsed/received by this receiver. 48 * This AbstractReceiver processes events in an asynchronous manner using callback 49 * methods on derived implementations for events and their availability. 50 * 51 * @author Regunath B 52 * @version 1.0, 05/07/05 53 */ 54 55 public abstract class AbstractReceiver extends Thread implements ReceiverInterpreter, ILogEventModelMutator { 56 57 /** 58 * State constants for this Thread - Thread Idle 59 */ 60 private static final int IDLE = 0; 61 62 /** 63 * State constants for this Thread - Thread Active 64 */ 65 private static final int ACTIVE = 1; 66 67 /** 68 * State constants for this Thread - Thread Exited 69 */ 70 private static final int EXIT = 2; 71 72 /** 73 * Sleep duration in milliseconds during tailing 74 */ 75 private static final long SLEEP_DURATION = 250; 76 77 /** 78 * State identifier for this Thread 79 */ 80 private volatile int state = IDLE; 81 82 /** 83 * Count of events interpreted by this Receiver 84 */ 85 protected long interpretedEventCount = 0L; 86 87 /** 88 * List of Appender instances that have been registered for notification of Log Events. 89 */ 90 private List appenders = new ArrayList(); 91 92 /** 93 * List of ReceiverListener instance that have been registered for notification 94 */ 95 private List receiverListeners = new ArrayList(); 96 97 /** 98 * List of MutatorListener instances that have been registered 99 */ 100 private List mutatorListeners = new ArrayList(); 101 102 /** 103 * Adds the specified Appender for notification when a LoggingEvent has been parsed 104 * @param appender the Appender instance to be added to the appender list 105 */ 106 public void addAppender(Appender appender) { 107 appenders.add(appender); 108 } 109 110 /** 111 * Removes the specified Appender for notification when a LoggingEvent has been parsed 112 * @param appender the Appender instance to be added to the appender list 113 */ 114 public void removeAppender(Appender appender) { 115 appenders.remove(appender); 116 } 117 118 /** 119 * Adds the specified ReceiverListener for notification when a ReceiverSpecifc Events will occur. 120 * @param receiverListener the ReceiverListener instance to be added to the receiverListener list 121 */ 122 public void addReceiverListener(ReceiverListener receiverListener) { 123 receiverListeners.add(receiverListener); 124 } 125 126 /** 127 * Removes the specified ReceiverListener for notification when a ReceiverSpecifc Events will occur. 128 * @param receiverListener the ReceiverListener instance to be added to the receiverListener list 129 */ 130 public void removeReceiverListener (ReceiverListener receiverListener) { 131 receiverListeners.remove(receiverListener); 132 } 133 134 /** 135 * Adds the specified MutatorListener 136 * @param mutatorListener the IMutatorListener to add 137 */ 138 public void addMutatorListener (IMutatorListener mutatorListener) { 139 mutatorListeners.add(mutatorListener); 140 } 141 142 /** 143 * Removes the specified MutatorListener 144 * @param mutatorListener the MutatorListener to remove 145 */ 146 public void removeMutatorListener (IMutatorListener mutatorListener) { 147 mutatorListeners.remove(mutatorListener); 148 } 149 150 /** 151 * Informs this receiver to start processing events from the 152 * log namespace. 153 */ 154 public void startup() { 155 if (this.state == ACTIVE) { // this thread has started already 156 return; 157 } 158 this.state = ACTIVE; 159 // explicitly set the priority to normal. Otherwise 160 // this thread will inherit the priority of the creator thread 161 // i.e the AWT event dispatch thread that runs with max priority 162 this.setPriority(Thread.NORM_PRIORITY); 163 this.start(); 164 } 165 166 /** 167 * Overriden Thread super class method 168 * @see java.lang.Thread#run() 169 */ 170 public void run() { 171 this.notifyStartup(); 172 while(hasMoreEvents() || isTailing()) { 173 if (this.state == EXIT || this.state == IDLE) { 174 break; 175 } 176 LogEvent[] events = getNextEvents(); 177 if (events != null) { 178 for (int i = 0; i < events.length; i++) { 179 fireEventReceived(events[i]); 180 } 181 } else if (isTailing()) { 182 synchronized (this) { 183 try { 184 wait(SLEEP_DURATION); 185 } catch (InterruptedException ie) { 186 // do nothing 187 } 188 } 189 } 190 } 191 this.notifyShutdown(); 192 } 193 194 /** 195 * Informs this receiver to stop processing events from the log 196 * namespace. 197 */ 198 public void shutdown() { 199 if (this.state == EXIT) { // this thread has stopped 200 return; 201 } 202 this.state = EXIT; 203 synchronized (this) { 204 notifyAll(); 205 } 206 } 207 208 /** 209 * Returns the LogNamespace that this AbstractReceiver was initialized with 210 * @return Returns the namespace. 211 */ 212 public abstract LogNamespace [] getNamespaces(); 213 214 /** 215 * Returns true if this AbstractReceiver tails the LogNamespace 216 * @return the isTailing value specified during initialization 217 * @see #initialize(LogNamespace, boolean) 218 */ 219 public abstract boolean isTailing(); 220 221 /** 222 * Informs this AbstractReceiver to initialize itself. This method is called 223 * after the no args constructor of the specific derived instance has been 224 * called. Tailing receivers require an explicit call to shutdown() to stop 225 * receiving events from the specified LogNamespace. 226 * @param namespace the LogNamespace that this AbstractReceiver will process events from 227 * @param isTailing determines if this AbstractReceiver tails the specified LogNamespace 228 * @throws ReceiverInitializationException if any problems occurs during the initialization of receiver. 229 * @see #shutdown() 230 */ 231 protected abstract void initialize(LogNamespace namespace, boolean isTailing) throws ReceiverInitializationException; 232 233 /** 234 * Informs this AbstractReceiver to reset itself. This method is called 235 * post shutdown to let this class cleanup and release resources such 236 * socket connections, input streams e.t.c. 237 */ 238 protected abstract void deInitialize(); 239 240 /** 241 * Gets null or the next LoggingEvent(s) processed by this receiver 242 * @return null or the LoggingEvent instance 243 */ 244 protected abstract LogEvent[] getNextEvents(); 245 246 /** 247 * Determines if this receiver has any more events to process 248 * @return true if events exist in the log namespace for processing 249 */ 250 protected abstract boolean hasMoreEvents(); 251 252 /** 253 * Informs this AbstractReceiver that the specifed event has been received. 254 * Notifies only Appender instances that have been directly registered with 255 * this receiver. 256 * @param event the LogEvent that has been interpreted 257 */ 258 private void fireEventReceived (LogEvent event) { 259 Iterator iterator = appenders.iterator(); 260 LogEvent logEvent = event; 261 // logEvent.setNamespace(this.getNamespace()); 262 while(iterator.hasNext()) { 263 ((Appender)iterator.next()).doAppend(logEvent); 264 } 265 this.interpretedEventCount += 1; 266 } 267 268 /** 269 * Notifies thread exceution startup 270 */ 271 private void notifyStartup() { 272 // synchronize on the class intance to prevent multiple 273 // instances of this thread from being active at the same time 274 synchronized(AbstractReceiver.class) { 275 Iterator iterator1 = receiverListeners.iterator(); 276 //Notify All the Listerners that File Load event has been started 277 while(iterator1.hasNext()){ 278 ((ReceiverListener)iterator1.next()).startLoadEventNotification(getNamespaces()); 279 } 280 Iterator mutatorIterator1 = mutatorListeners.iterator(); 281 while(mutatorIterator1.hasNext()){ 282 ((IMutatorListener)mutatorIterator1.next()).startMutating(this.isTailing() ? 283 ILogEventModelMutator.TAILING_MUTATOR : ILogEventModelMutator.NON_TAILING_MUTATOR); 284 } 285 } 286 } 287 288 /** 289 * Notifies thread exceution completion and shutdown 290 */ 291 private void notifyShutdown() { 292 // synchronize on the class intance to prevent multiple 293 // instances of this thread from being active at the same time 294 synchronized(AbstractReceiver.class) { 295 //Send Notification to all the Receiver Listener that 296 //Loading of file has been completed 297 Iterator iterator2 = receiverListeners.iterator(); 298 int receiverInfoFlag = interpretedEventCount == 0L ? ReceiverInterpreter.FAILURE : ReceiverInterpreter.SUCCESS; 299 while(iterator2.hasNext()){ 300 ((ReceiverListener)iterator2.next()).endLoadEventNotification(getNamespaces(), receiverInfoFlag); 301 } 302 int mutatorInfoFlag = interpretedEventCount == 0L ? ILogEventModelMutator.FAILURE : ILogEventModelMutator.SUCCESS; 303 Iterator mutatorIterator2 = mutatorListeners.iterator(); 304 while(mutatorIterator2.hasNext()){ 305 ((IMutatorListener)mutatorIterator2.next()).endMutating(mutatorInfoFlag); 306 } 307 this.deInitialize(); 308 } 309 } 310 }