001    package org.hackystat.telemetry.analyzer.function.impl;
002    
003    import java.util.ArrayList;
004    import java.util.List;
005    import java.util.Map;
006    import java.util.TreeMap;
007    
008    import org.hackystat.telemetry.analyzer.function.TelemetryFunction;
009    import org.hackystat.telemetry.analyzer.function.TelemetryFunctionException;
010    import org.hackystat.telemetry.analyzer.model.TelemetryDataPoint;
011    import org.hackystat.telemetry.analyzer.model.TelemetryStream;
012    import org.hackystat.telemetry.analyzer.model.TelemetryStreamCollection;
013    
014    /**
015     * Filters telemetry streams in a <code>TelemetryStreamCollection</code> object by applying a 
016     * ranking function.
017     * 
018     * @author (Cedric) Qin ZHANG
019     */
020    public class FilterFunction extends TelemetryFunction {
021    
022      private static Map<String, RankFunction> RANK_FUNCTIONS = new TreeMap<String, RankFunction>();
023      
024      /**
025       * Constructs this instance.
026       * 
027       */
028      public FilterFunction() {
029        super("Filter");
030        RANK_FUNCTIONS.put("avg", new AverageRankFunction());
031        RANK_FUNCTIONS.put("max", new MaxRankFunction());
032        RANK_FUNCTIONS.put("min", new MinRankFunction());
033        RANK_FUNCTIONS.put("last", new LastRankFunction());
034        RANK_FUNCTIONS.put("delta", new DeltaRankFunction());
035        RANK_FUNCTIONS.put("simpledelta", new SimpleDeltaRankFunction());
036      }
037      
038      /**
039       * Performs filter operation. The returned <code>TelemetryStreamCollection</code> only contains
040       * telemetry streams that satisfies the specified criteria.
041       * 
042       * @param parameters An array of 3 objects.
043       *        <ul>
044       *          <li>1st element: A <code>TelemetryStreamCollection</code> instance.
045       *          <li>2nd element: A <code>String</code> representing ranking function name.
046       *              It must be one of "Sum|Max|Min|Last|Delta".
047       *          <li>3rd element: A <code>String</code> specifying how to apply cutoff value.
048       *              It must be one of "Above|Below|Top|Bottom|TopPercent|BottomPercent".
049       *          <li>4th element: A <code>Number</code> instance representing cutoff value.
050       *        </ul> 
051       * 
052       * @return A new <code>TelemetryStreamCollection</code> instance after filtering. Note
053       *         that it may contain no telemetry stream at all if no stream satisfies the criteria.
054       * 
055       * @throws TelemetryFunctionException If anything is wrong.
056       */
057      @Override
058      public Object compute(Object[] parameters) throws TelemetryFunctionException {
059        if (parameters.length != 4 || ! (parameters[0] instanceof TelemetryStreamCollection)
060             || ! (parameters[1] instanceof String) || ! (parameters[2] instanceof String)
061             || ! (parameters[3] instanceof Number || parameters[3] instanceof String)) {
062           throw new TelemetryFunctionException("Telemetry function " + this.getName()
063             + " accept 4 parameters: TelemetryStreamCollection, Avg|Max|Min|Last|Delta|SimpleDelta,"
064             + " Above|Below|Top|Bottom|TopPercent|BottomPercent, Number.");
065        }
066        
067        if (parameters[3] instanceof String) {
068          try {
069            parameters[3] = new Double((String) parameters[3]);
070          }
071          catch (NumberFormatException ex) {
072            throw new TelemetryFunctionException("The 4th parameter of telemetry function " 
073                + this.getName() + " is not a valid number.", ex);     
074          }
075        }
076          
077        TelemetryStreamCollection streams = (TelemetryStreamCollection) parameters[0];
078         
079        String rankFunctionName = (String) parameters[1];
080        RankFunction rankFunction = RANK_FUNCTIONS.get(rankFunctionName.toLowerCase());
081        if (rankFunction == null) {
082          throw new TelemetryFunctionException("Rank function '" + rankFunctionName 
083              + "' does not exist.");
084        }
085         
086        String opMode = (String) parameters[2];
087        Number cutoff = (Number) parameters[3];
088       
089        try {
090          if ("Above".equalsIgnoreCase(opMode)) {
091            return this.applyAbsoluteCutoff(streams, rankFunction, true, cutoff.doubleValue());
092          }
093          else if ("Below".equals(opMode)) {
094            return this.applyAbsoluteCutoff(streams, rankFunction, false, cutoff.doubleValue());
095          }
096          else if ("TopPercent".equalsIgnoreCase(opMode) || "BottomPercent".equalsIgnoreCase(opMode)
097              || "Top".equalsIgnoreCase(opMode) || "Bottom".equalsIgnoreCase(opMode)) {
098            return this.applyRelativeCutoff(streams, rankFunction, opMode, cutoff.intValue());
099          }
100          else {
101            throw new TelemetryFunctionException("The 3rd parameter of telemetry function " 
102                + this.getName() + " must be one of Above|Below|TopPercent|BottomPercent|Top|Bottom.");
103            }
104        }
105        catch (Exception ex) {
106          throw new TelemetryFunctionException(ex);
107        }
108      }
109      
110      /**
111       * Applies filter and returns only the telemetry streams meeting the criteria.
112       * 
113       * @param streams Telemetry stream collection.
114       * @param rankFunction The rank function.
115       * @param isAbove True if the filter is to return the streams above the cutoff value.
116       *                False if the filter is to return the streams below the cutoff value.
117       * @param cutoff The cutoff value.
118       * 
119       * @return A new <code>TelemetryStreamCollection</code> instance after filtering. 
120       * @throws Exception If there is anything wrong.
121       */
122      private TelemetryStreamCollection applyAbsoluteCutoff(TelemetryStreamCollection streams,
123          RankFunction rankFunction, boolean isAbove, double cutoff) throws Exception {
124        
125        TelemetryStreamCollection target = new TelemetryStreamCollection(
126            streams.getName(), streams.getProject(), streams.getInterval());   
127        for (TelemetryStream stream : streams) {
128          double rank = rankFunction.getRank(stream);
129          if ((isAbove && rank > cutoff) || (! isAbove && rank < cutoff)) {
130            target.add(stream);
131          }
132        }
133        return target;
134      }
135      
136      /**
137       * Applies filter and returns the telemetry streams at the top or bottom.
138       * 
139       * @param streams Telemetry stream collection.
140       * @param rankFunction The rank function.
141       * @param opMode Operation mode.
142       * @param cutoff The cutoff value.
143       * 
144       * @return A new <code>TelemetryStreamCollection</code> instance after filtering. 
145       * @throws Exception If there is anything wrong.
146       */
147      @SuppressWarnings("cast")
148      private TelemetryStreamCollection applyRelativeCutoff(TelemetryStreamCollection streams, 
149          RankFunction rankFunction, String opMode, int cutoff) throws Exception {
150        
151        TelemetryStreamCollection target = new TelemetryStreamCollection(
152            streams.getName(), streams.getProject(), streams.getInterval());
153        ArrayList<TelemetryStream> orderedList = this.sort(rankFunction, streams);
154        
155        if ("TopPercent".equalsIgnoreCase(opMode)) {
156          if (cutoff < 0 || cutoff > 100) {
157            throw new TelemetryFunctionException("You must supply a cutoff value from 0 to 100 for " +
158                "'TopPercent' operation mode.");
159          }
160          int start = (int) Math.floor(
161              (double) orderedList.size() - (double) orderedList.size() * cutoff / 100);
162          if (start < 0) {
163            start = 0;
164          }
165          for (int i = start; i < orderedList.size(); i++) {
166            target.add(orderedList.get(i));
167          }
168        }
169        else if ("BottomPercent".equalsIgnoreCase(opMode)) {
170          if (cutoff < 0 || cutoff > 100) {
171            throw new TelemetryFunctionException("You must supply a cutoff value from 0 to 100 for " +
172                "'BottomPercent' operation mode.");
173          }
174          int count = (int) Math.ceil((double) orderedList.size() * cutoff / 100);
175          for (int i = 0; i < count && i < orderedList.size(); i++) {
176            target.add(orderedList.get(i));
177          }
178        }
179        else if ("Top".equalsIgnoreCase(opMode)) {
180          int start = orderedList.size() - cutoff;
181          if (start < 0) {
182            start = 0;
183          }
184          for (int i = start; i < orderedList.size(); i++) {
185            target.add(orderedList.get(i));
186          }
187        }
188        else if ("Bottom".equalsIgnoreCase(opMode)) {
189          for (int i = 0; i < cutoff && i < orderedList.size(); i++) {
190            target.add(orderedList.get(i));
191          }
192        }
193        else {
194          throw new TelemetryFunctionException("Unsupported op mode '" + opMode + "'.");
195        }
196        
197        return target;
198      }
199      
200      /**
201       * Sorts the telemetry streams according to a rank function.
202       * 
203       * @param rankFunction The rank function.
204       * @param streams Telemetry stream collection.
205       * 
206       * @return A sorted list of telemetry streams, from the smallest rank value to the largest.
207       * 
208       * @throws TelemetryFunctionException If the rank function does not exist.
209       */
210      private ArrayList<TelemetryStream> sort(RankFunction rankFunction, 
211          TelemetryStreamCollection streams) 
212          throws TelemetryFunctionException {
213    
214        TreeMap<Double, List<TelemetryStream>> map = new TreeMap<Double, List<TelemetryStream>>();
215        for (TelemetryStream stream : streams.getTelemetryStreams()) {
216          Double rank = new Double(rankFunction.getRank(stream));
217          List<TelemetryStream> list = map.get(rank);
218          if (list == null) {
219            list = new ArrayList<TelemetryStream>(1);
220            map.put(rank, list);
221          }
222          list.add(stream);
223        }
224        
225        ArrayList<TelemetryStream> sortedList = 
226          new ArrayList<TelemetryStream>(streams.getTelemetryStreams().size());
227        for (List<TelemetryStream> list : map.values()) {
228          sortedList.addAll(list);
229        }
230        return sortedList;
231      }
232      
233      //===================== Rank Function ==============================
234      
235      /**
236       * Rank function.
237       */
238      interface RankFunction {
239        
240        /**
241         * Gets a rank value for a telemetry stream. Note that any implmentation MUST observe the
242         * rules: The function should never return Double.POSITIVE_INFINITY, 
243         *       Double.NEGATIVE_INFINITY, or Double.NaN.
244         *
245         * @param stream A telemetry stream.
246         * @return A rank value.
247         */
248        double getRank(TelemetryStream stream);
249      }
250      
251      //NOTE: Sum function is bad, it ignores the data points with no value, which makes the sum
252      //      of different telemetry streams not comparable if there is a missing point.
253      //      There is no easy way around this. Use Average instead.
254      
255    //  /**
256    //   * A rank function using sum as ranking criteria.
257    //   * 
258    //   * @author (Cedric) Qin ZHANG
259    //   * @version $Id$
260    //   */
261    //  static class SumRankFunction implements RankFunction {
262    //    
263    //    /**
264    //     * Computes the sum of a telemetry stream.
265    //     * @param stream A telemetry stream.
266    //     * @return The sum.
267    //     */
268    //    public double getRank(TelemetryStream stream) {
269    //      double sum = 0;
270    //      for (Iterator i = stream.getDataPoints().iterator(); i.hasNext(); ) {
271    //        TelemetryDataPoint dp = (TelemetryDataPoint) i.next();
272    //        Number number = dp.getValue();
273    //        if (number != null) {
274    //          double numberValue = number.doubleValue();
275    //          if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) {
276    //            //Cannot use infinity, because Positive infinity + negative infinity = NaN
277    //            sum += numberValue;
278    //          }
279    //        }
280    //      }
281    //      return sum;
282    //    }
283    //  }
284      
285      /**
286       * A rank function using average as ranking criteria.
287       * 
288       * @author (Cedric) Qin ZHANG
289       */
290      static class AverageRankFunction implements RankFunction {
291        
292        /**
293         * Computes the average of a telemetry stream.
294         * @param stream A telemetry stream.
295         * @return The average.
296         */
297        public double getRank(TelemetryStream stream) {
298          int count = 0;
299          double sum = 0;
300          for (TelemetryDataPoint dp : stream.getDataPoints()) {
301            Number number = dp.getValue();
302            if (number != null) {
303              double numberValue = number.doubleValue();
304              if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) {
305                //Cannot use infinity, because Positive infinity + negative infinity = NaN
306                count ++;
307                sum += numberValue;
308              }
309            }
310          }
311          return count == 0 ? 0 : sum / count;
312        }
313      }
314      
315      /**
316       * A rank function using max as ranking criteria.
317       * 
318       * @author (Cedric) Qin ZHANG
319       */
320      static class MaxRankFunction implements RankFunction {
321        
322        /**
323         * Computes the max value of a telemetry stream.
324         * @param stream A telemetry stream.
325         * @return The max value.
326         */
327        public double getRank(TelemetryStream stream) {
328          double max = Double.MIN_VALUE;
329          for (TelemetryDataPoint dp : stream.getDataPoints()) {
330            Number number = dp.getValue();
331            if (number != null) {
332              double numberValue = number.doubleValue();
333              if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) {
334                if (max < numberValue) { //NOPMD
335                  max = numberValue;
336                }
337              }
338            }
339          }
340          return max;
341        }
342      }
343      
344      /**
345       * A rank function using min as ranking criteria.
346       * 
347       * @author (Cedric) Qin ZHANG
348       */
349      static class MinRankFunction implements RankFunction {
350        
351        /**
352         * Computes the min value of a telemetry stream.
353         * @param stream A telemetry stream.
354         * @return The min value.
355         */
356        public double getRank(TelemetryStream stream) {
357          double min = Double.MAX_VALUE;
358          for (TelemetryDataPoint dp : stream.getDataPoints()) {
359            Number number = dp.getValue();
360            if (number != null) {
361              double numberValue = number.doubleValue();
362              if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) {
363                if (min > numberValue) { //NOPMD
364                  min = numberValue;
365                }
366              }
367            }
368          }
369          return min;
370        }
371      }
372      
373      /**
374       * A rank function using the lastest data point value as ranking criteria.
375       * 
376       * @author (Cedric) Qin ZHANG
377       */
378      static class LastRankFunction implements RankFunction {
379        
380        /**
381         * Computes the last value of a telemetry stream.
382         * @param stream A telemetry stream.
383         * @return The last value.
384         */
385        public double getRank(TelemetryStream stream) {
386          List<TelemetryDataPoint> list = stream.getDataPoints();
387          for (int i = list.size() - 1; i >= 0; i--) {
388            TelemetryDataPoint dp = list.get(i);
389            Number number = dp.getValue();
390            if (number != null) {
391              double numberValue = number.doubleValue();
392              if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) {
393                return numberValue;
394              }
395            }
396          }
397          return 0;
398        }
399      }
400      
401      /**
402       * A rank function using delta as ranking criteria.
403       * 
404       * @author (Cedric) Qin ZHANG
405       */
406      static class DeltaRankFunction implements RankFunction {
407        
408        /**
409         * Computes the sum of delta of a telemetry stream.
410         * @param stream A telemetry stream.
411         * @return The delta sum.
412         */
413        public double getRank(TelemetryStream stream) {
414          double sum = 0;
415          double lastValue = Double.NaN;
416          for (TelemetryDataPoint dp : stream.getDataPoints()) {
417            Number number = dp.getValue();
418            if (number != null) {
419              double numberValue = number.doubleValue();
420              if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) {
421                if (! Double.isNaN(lastValue)) {
422                  sum += Math.abs(numberValue - lastValue);
423                }
424                lastValue = numberValue;
425              }
426            }
427          }
428          return sum;
429        }
430      }
431      
432      /**
433       * A rank function using the different between the last data point and the first data point
434       * as criteria. Note that this rank value may be negative if the last 
435       * 
436       * @author (Cedric) Qin ZHANG
437       */
438      static class SimpleDeltaRankFunction implements RankFunction {
439        
440        /**
441         * Computes the sum of delta of a telemetry stream.
442         * @param stream A telemetry stream.
443         * @return The delta sum.
444         */
445        public double getRank(TelemetryStream stream) {
446          double firstValue = Double.NaN;
447          double lastValue = Double.NaN;
448          for (TelemetryDataPoint dp : stream.getDataPoints()) {
449            Number number = dp.getValue();
450            if (number != null) {
451              double numberValue = number.doubleValue();
452              if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) {
453                if (Double.isNaN(firstValue)) {
454                  firstValue = numberValue;
455                }
456                lastValue = numberValue;
457              }
458            }
459          }
460          return (Double.isNaN(firstValue) || Double.isNaN(lastValue)) ? 0 : lastValue - firstValue;
461        }
462      }
463    }