1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package com.mindtree.techworks.insight.receiver;
25
26 import java.io.BufferedReader;
27 import java.io.IOException;
28 import java.io.Serializable;
29 import java.io.StringReader;
30 import java.net.DatagramPacket;
31 import java.net.DatagramSocket;
32 import java.net.SocketException;
33 import java.net.SocketTimeoutException;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.HashMap;
37 import java.util.Iterator;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.Map;
41
42 import com.mindtree.techworks.insight.model.ReceiverFormat;
43 import com.mindtree.techworks.insight.remoteprotocol.RemoteProtocolException;
44 import com.mindtree.techworks.insight.remoteprotocol.spi.DataPacket;
45 import com.mindtree.techworks.insight.remoteprotocol.spi.LogMessageDataContent;
46 import com.mindtree.techworks.insight.remoteprotocol.spi.MessageType;
47 import com.mindtree.techworks.insight.remoteprotocol.spi.NullMessageDataContent;
48 import com.mindtree.techworks.insight.remoteprotocol.spi.PacketDataContent;
49 import com.mindtree.techworks.insight.remoteprotocol.spi.PacketHeader;
50 import com.mindtree.techworks.insight.remoteprotocol.spi.StartupMessageDataContent;
51 import com.mindtree.techworks.insight.spi.LogEvent;
52 import com.mindtree.techworks.insight.spi.LogNamespace;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public class RemoteProtocolStreamReceiver extends AbstractReceiver {
72
73
74
75
76 private static final int SOCKET_TIMEOUT = 2000;
77
78
79
80
81
82
83
84
85 protected LinkedList packetQueue = new LinkedList ();
86
87
88
89
90 protected int portListeningOn;
91
92
93
94
95 private Map knownSources = new HashMap (4);
96
97
98
99
100 private Map unknownSources = new HashMap (2);
101
102
103
104
105 private List loadedNamespaces = new ArrayList(2);
106
107
108
109
110 private ReceiverFormat[] receiverFormat;
111
112
113
114
115 private LogNamespace unknownNamespace;
116
117
118
119
120 private DatagramSocket socket;
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136 public RemoteProtocolStreamReceiver (ReceiverFormat[] receiverFormat,
137 int portListeningOn) throws ReceiverInitializationException {
138
139 this.receiverFormat = receiverFormat;
140 this.portListeningOn = portListeningOn;
141 this.unknownNamespace = new LogNamespace ("unknown", "unknown",
142 receiverFormat, "unknown");
143 this.initialize (null, true);
144 }
145
146
147
148
149
150
151
152
153 public LogNamespace[] getNamespaces () {
154
155 if (loadedNamespaces.size() > 0) {
156 LogNamespace[] namespaces = new LogNamespace[loadedNamespaces.size()];
157 return (LogNamespace []) loadedNamespaces.toArray(namespaces);
158 }
159 return new LogNamespace[] {unknownNamespace};
160 }
161
162
163
164
165
166
167
168 public boolean isTailing () {
169 return true;
170 }
171
172
173
174
175
176
177
178 protected void initialize (LogNamespace namespace, boolean isTailing)
179 throws ReceiverInitializationException {
180 try {
181 socket = new DatagramSocket (this.portListeningOn);
182 socket.setSoTimeout(SOCKET_TIMEOUT);
183 } catch (SocketException e) {
184 throw new ReceiverInitializationException (
185 "Could not open socket.", e);
186 }
187 }
188
189
190
191
192 protected void deInitialize () {
193 this.socket.close();
194 }
195
196
197
198
199 protected LogEvent [] getNextEvents () {
200
201 populatePacketQueue();
202
203
204
205
206 int packetCountToProcess = packetQueue.size ();
207 ArrayList processedEvents = new ArrayList();
208 for (int i = 0; i < packetCountToProcess; i++ ) {
209 DataPacket currentPacket = null;
210 synchronized (packetQueue) {
211 currentPacket = (DataPacket) packetQueue.removeFirst ();
212 }
213 switch (currentPacket.getPacketHeader ().getMessageType ()
214 .getType ()) {
215 case MessageType.STARTUP_MESSAGE_TYPE:
216 processStartupPacket (currentPacket);
217 break;
218 case MessageType.LOG_DATA_MESSAGE_TYPE:
219 processedEvents.addAll (processLogPacket (currentPacket));
220 break;
221 case MessageType.SHUTDOWN_MESSAGE_TYPE:
222 Short sourceId = new Short (currentPacket
223 .getPacketHeader ().getSourceIdentifier ());
224 knownSources.remove (sourceId);
225 break;
226 default:
227
228 }
229 }
230
231 if (null != processedEvents) {
232 LogEvent [] loggingEvents = new LogEvent [processedEvents.size ()];
233 return (LogEvent []) processedEvents.toArray (loggingEvents);
234 }
235 return null;
236 }
237
238
239
240
241
242 protected boolean hasMoreEvents () {
243
244 if ( !this.packetQueue.isEmpty ()) {
245 for (Iterator packetItr = packetQueue.iterator (); packetItr
246 .hasNext ();) {
247 if (MessageType.LOG_DATA_MESSAGE
248 .equals (((DataPacket) packetItr.next ())
249 .getPacketHeader ().getMessageType ())) {
250 return true;
251 }
252 }
253 return false;
254 }
255 return false;
256 }
257
258
259
260
261
262
263
264
265
266
267
268 private void processStartupPacket (DataPacket currentPacket) {
269
270
271 if ( !MessageType.STARTUP_MESSAGE.equals (currentPacket
272 .getPacketDataContent ().getMessageType ())) {
273 return;
274 }
275
276
277 Short sourceId = new Short (currentPacket.getPacketHeader ()
278 .getSourceIdentifier ());
279 SourceDetails sourceDetails = null;
280 if ( !knownSources.containsKey (sourceId)) {
281
282 if (unknownSources.containsKey (sourceId)) {
283 sourceDetails = (SourceDetails) unknownSources.get (sourceId);
284 unknownSources.remove (sourceId);
285 } else {
286 sourceDetails = new SourceDetails (sourceId.shortValue ());
287 }
288 } else {
289 sourceDetails = (SourceDetails) knownSources.get (sourceId);
290 }
291
292
293 if ( !sourceDetails.isLogNamespaceSet ()) {
294 StartupMessageDataContent messageData = (StartupMessageDataContent) currentPacket
295 .getPacketDataContent ();
296 String namespace = messageData.getNamespace ();
297 String nodeId = namespace.split ("/")[2];
298
299 LogNamespace logNamespace = new LogNamespace (namespace, null,
300 receiverFormat, nodeId);
301 sourceDetails.setLogNamespace (logNamespace);
302
303 LogInterpreter logInterpreter = new LogInterpreter (logNamespace);
304 sourceDetails.setLogInterpreter (logInterpreter);
305
306 knownSources.put (sourceId, sourceDetails);
307 }
308
309 }
310
311
312
313
314
315
316
317
318 private ArrayList processLogPacket (DataPacket currentPacket) {
319
320
321 if ( !MessageType.LOG_DATA_MESSAGE.equals (currentPacket
322 .getPacketDataContent ().getMessageType ())) {
323 return new ArrayList (0);
324 }
325
326
327 Short sourceId = new Short (currentPacket.getPacketHeader ()
328 .getSourceIdentifier ());
329 SourceDetails sourceDetails = null;
330 if ( !knownSources.containsKey (sourceId)) {
331
332 if (unknownSources.containsKey (sourceId)) {
333 sourceDetails = (SourceDetails) unknownSources.get (sourceId);
334 } else {
335 sourceDetails = new SourceDetails (sourceId.shortValue ());
336 sourceDetails.setLogNamespace (unknownNamespace);
337 sourceDetails.setLogInterpreter (new LogInterpreter (unknownNamespace));
338 sourceDetails.setLogNamespaceSet (false);
339
340 unknownSources.put (sourceId, sourceDetails);
341 }
342 } else {
343 sourceDetails = (SourceDetails) knownSources.get (sourceId);
344 }
345
346
347 LogInterpreter logInterpreter = sourceDetails.getLogInterpreter ();
348 LogMessageDataContent logMessageDataContent = (LogMessageDataContent) currentPacket
349 .getPacketDataContent ();
350 ArrayList logMessages = new ArrayList ();
351 String logData = null;
352
353 try {
354 logData = logMessageDataContent.getLogMessage ();
355 } catch (RemoteProtocolException e) {
356
357 logData = "";
358 }
359
360
361 if (!loadedNamespaces.contains(sourceDetails.getLogNamespace())) {
362 loadedNamespaces.add(sourceDetails.getLogNamespace());
363 }
364
365 BufferedReader logReader = new BufferedReader (new StringReader (
366 logData));
367 try {
368 while (logReader.ready ()) {
369 String line = logReader.readLine ();
370 if (null != line) {
371 LogEvent [] logEvents = logInterpreter
372 .parseLogMessage(line);
373 logMessages.addAll (Arrays.asList (logEvents));
374 } else {
375 break;
376 }
377 }
378 } catch (IOException ioe) {
379
380 }
381
382 return logMessages;
383 }
384
385
386
387
388 private void populatePacketQueue () {
389 DatagramPacket packet = new DatagramPacket (
390 new byte [DataPacket.MAX_PACKET_SIZE],
391 DataPacket.MAX_PACKET_SIZE);
392 try {
393 socket.receive (packet);
394
395 try {
396 PacketHeader header = new PacketHeader (packet
397 .getData ());
398
399
400 byte [] messageData = new byte [packet.getLength ()];
401 System.arraycopy (packet.getData (), 0, messageData, 0,
402 packet.getLength ());
403
404 PacketDataContent content = null;
405
406 switch (header.getMessageType ().getType ()) {
407 case MessageType.STARTUP_MESSAGE_TYPE:
408 content = new StartupMessageDataContent (
409 messageData, header);
410 break;
411 case MessageType.LOG_DATA_MESSAGE_TYPE:
412 content = new LogMessageDataContent (
413 messageData, header);
414 break;
415 case MessageType.SHUTDOWN_MESSAGE_TYPE:
416 content = new NullMessageDataContent ();
417 content
418 .setMessageType (MessageType.SHUTDOWN_MESSAGE);
419 break;
420 default:
421
422 }
423
424 if (null != content) {
425 DataPacket dataPacket = new DataPacket (content,
426 header);
427 synchronized (packetQueue) {
428 packetQueue.addLast (dataPacket);
429 }
430 }
431 } catch (RemoteProtocolException e) {
432
433
434 }
435 } catch (SocketTimeoutException ste) {
436
437 } catch (IOException ioex) {
438
439 ioex.printStackTrace ();
440 } catch (Exception ie) {
441
442 ie.printStackTrace();
443 }
444 }
445
446
447
448
449
450
451
452
453
454
455
456 protected class SourceDetails implements Serializable {
457
458
459
460
461
462
463
464
465 private static final long serialVersionUID = -4363523350633442160L;
466
467
468
469
470
471
472
473
474 private LogNamespace logNamespace;
475
476
477
478
479 private short sourceId;
480
481
482
483
484 private boolean isLogNamespaceSet = false;
485
486
487
488
489 private LogInterpreter logInterpreter;
490
491
492
493
494
495
496
497
498
499
500
501 public SourceDetails (short sourceId) {
502
503 this.sourceId = sourceId;
504 }
505
506
507
508
509
510
511
512
513
514
515 public boolean isLogNamespaceSet () {
516
517 return isLogNamespaceSet;
518 }
519
520
521
522
523
524
525
526
527 public void setLogNamespaceSet (boolean isLogNamespaceSet) {
528
529 this.isLogNamespaceSet = isLogNamespaceSet;
530 }
531
532
533
534
535
536
537
538 public LogNamespace getLogNamespace () {
539
540 return logNamespace;
541 }
542
543
544
545
546
547
548
549
550 public void setLogNamespace (LogNamespace logNamespace) {
551
552 this.logNamespace = logNamespace;
553 }
554
555
556
557
558
559
560
561 public short getSourceId () {
562
563 return sourceId;
564 }
565
566
567
568
569
570
571
572 public LogInterpreter getLogInterpreter () {
573
574 return logInterpreter;
575 }
576
577
578
579
580
581
582
583
584 public void setLogInterpreter (LogInterpreter logInterpreter) {
585
586 this.logInterpreter = logInterpreter;
587 }
588 }
589 }