001 package org.hackystat.telemetry.analyzer.function.impl; 002 003 import java.util.ArrayList; 004 import java.util.List; 005 import java.util.Map; 006 import java.util.TreeMap; 007 008 import org.hackystat.telemetry.analyzer.function.TelemetryFunction; 009 import org.hackystat.telemetry.analyzer.function.TelemetryFunctionException; 010 import org.hackystat.telemetry.analyzer.model.TelemetryDataPoint; 011 import org.hackystat.telemetry.analyzer.model.TelemetryStream; 012 import org.hackystat.telemetry.analyzer.model.TelemetryStreamCollection; 013 014 /** 015 * Filters telemetry streams in a <code>TelemetryStreamCollection</code> object by applying a 016 * ranking function. 017 * 018 * @author (Cedric) Qin ZHANG 019 */ 020 public class FilterFunction extends TelemetryFunction { 021 022 private static Map<String, RankFunction> RANK_FUNCTIONS = new TreeMap<String, RankFunction>(); 023 024 /** 025 * Constructs this instance. 026 * 027 */ 028 public FilterFunction() { 029 super("Filter"); 030 RANK_FUNCTIONS.put("avg", new AverageRankFunction()); 031 RANK_FUNCTIONS.put("max", new MaxRankFunction()); 032 RANK_FUNCTIONS.put("min", new MinRankFunction()); 033 RANK_FUNCTIONS.put("last", new LastRankFunction()); 034 RANK_FUNCTIONS.put("delta", new DeltaRankFunction()); 035 RANK_FUNCTIONS.put("simpledelta", new SimpleDeltaRankFunction()); 036 } 037 038 /** 039 * Performs filter operation. The returned <code>TelemetryStreamCollection</code> only contains 040 * telemetry streams that satisfies the specified criteria. 041 * 042 * @param parameters An array of 3 objects. 043 * <ul> 044 * <li>1st element: A <code>TelemetryStreamCollection</code> instance. 045 * <li>2nd element: A <code>String</code> representing ranking function name. 046 * It must be one of "Sum|Max|Min|Last|Delta". 047 * <li>3rd element: A <code>String</code> specifying how to apply cutoff value. 048 * It must be one of "Above|Below|Top|Bottom|TopPercent|BottomPercent". 049 * <li>4th element: A <code>Number</code> instance representing cutoff value. 050 * </ul> 051 * 052 * @return A new <code>TelemetryStreamCollection</code> instance after filtering. Note 053 * that it may contain no telemetry stream at all if no stream satisfies the criteria. 054 * 055 * @throws TelemetryFunctionException If anything is wrong. 056 */ 057 @Override 058 public Object compute(Object[] parameters) throws TelemetryFunctionException { 059 if (parameters.length != 4 || ! (parameters[0] instanceof TelemetryStreamCollection) 060 || ! (parameters[1] instanceof String) || ! (parameters[2] instanceof String) 061 || ! (parameters[3] instanceof Number || parameters[3] instanceof String)) { 062 throw new TelemetryFunctionException("Telemetry function " + this.getName() 063 + " accept 4 parameters: TelemetryStreamCollection, Avg|Max|Min|Last|Delta|SimpleDelta," 064 + " Above|Below|Top|Bottom|TopPercent|BottomPercent, Number."); 065 } 066 067 if (parameters[3] instanceof String) { 068 try { 069 parameters[3] = new Double((String) parameters[3]); 070 } 071 catch (NumberFormatException ex) { 072 throw new TelemetryFunctionException("The 4th parameter of telemetry function " 073 + this.getName() + " is not a valid number.", ex); 074 } 075 } 076 077 TelemetryStreamCollection streams = (TelemetryStreamCollection) parameters[0]; 078 079 String rankFunctionName = (String) parameters[1]; 080 RankFunction rankFunction = RANK_FUNCTIONS.get(rankFunctionName.toLowerCase()); 081 if (rankFunction == null) { 082 throw new TelemetryFunctionException("Rank function '" + rankFunctionName 083 + "' does not exist."); 084 } 085 086 String opMode = (String) parameters[2]; 087 Number cutoff = (Number) parameters[3]; 088 089 try { 090 if ("Above".equalsIgnoreCase(opMode)) { 091 return this.applyAbsoluteCutoff(streams, rankFunction, true, cutoff.doubleValue()); 092 } 093 else if ("Below".equals(opMode)) { 094 return this.applyAbsoluteCutoff(streams, rankFunction, false, cutoff.doubleValue()); 095 } 096 else if ("TopPercent".equalsIgnoreCase(opMode) || "BottomPercent".equalsIgnoreCase(opMode) 097 || "Top".equalsIgnoreCase(opMode) || "Bottom".equalsIgnoreCase(opMode)) { 098 return this.applyRelativeCutoff(streams, rankFunction, opMode, cutoff.intValue()); 099 } 100 else { 101 throw new TelemetryFunctionException("The 3rd parameter of telemetry function " 102 + this.getName() + " must be one of Above|Below|TopPercent|BottomPercent|Top|Bottom."); 103 } 104 } 105 catch (Exception ex) { 106 throw new TelemetryFunctionException(ex); 107 } 108 } 109 110 /** 111 * Applies filter and returns only the telemetry streams meeting the criteria. 112 * 113 * @param streams Telemetry stream collection. 114 * @param rankFunction The rank function. 115 * @param isAbove True if the filter is to return the streams above the cutoff value. 116 * False if the filter is to return the streams below the cutoff value. 117 * @param cutoff The cutoff value. 118 * 119 * @return A new <code>TelemetryStreamCollection</code> instance after filtering. 120 * @throws Exception If there is anything wrong. 121 */ 122 private TelemetryStreamCollection applyAbsoluteCutoff(TelemetryStreamCollection streams, 123 RankFunction rankFunction, boolean isAbove, double cutoff) throws Exception { 124 125 TelemetryStreamCollection target = new TelemetryStreamCollection( 126 streams.getName(), streams.getProject(), streams.getInterval()); 127 for (TelemetryStream stream : streams) { 128 double rank = rankFunction.getRank(stream); 129 if ((isAbove && rank > cutoff) || (! isAbove && rank < cutoff)) { 130 target.add(stream); 131 } 132 } 133 return target; 134 } 135 136 /** 137 * Applies filter and returns the telemetry streams at the top or bottom. 138 * 139 * @param streams Telemetry stream collection. 140 * @param rankFunction The rank function. 141 * @param opMode Operation mode. 142 * @param cutoff The cutoff value. 143 * 144 * @return A new <code>TelemetryStreamCollection</code> instance after filtering. 145 * @throws Exception If there is anything wrong. 146 */ 147 @SuppressWarnings("cast") 148 private TelemetryStreamCollection applyRelativeCutoff(TelemetryStreamCollection streams, 149 RankFunction rankFunction, String opMode, int cutoff) throws Exception { 150 151 TelemetryStreamCollection target = new TelemetryStreamCollection( 152 streams.getName(), streams.getProject(), streams.getInterval()); 153 ArrayList<TelemetryStream> orderedList = this.sort(rankFunction, streams); 154 155 if ("TopPercent".equalsIgnoreCase(opMode)) { 156 if (cutoff < 0 || cutoff > 100) { 157 throw new TelemetryFunctionException("You must supply a cutoff value from 0 to 100 for " + 158 "'TopPercent' operation mode."); 159 } 160 int start = (int) Math.floor( 161 (double) orderedList.size() - (double) orderedList.size() * cutoff / 100); 162 if (start < 0) { 163 start = 0; 164 } 165 for (int i = start; i < orderedList.size(); i++) { 166 target.add(orderedList.get(i)); 167 } 168 } 169 else if ("BottomPercent".equalsIgnoreCase(opMode)) { 170 if (cutoff < 0 || cutoff > 100) { 171 throw new TelemetryFunctionException("You must supply a cutoff value from 0 to 100 for " + 172 "'BottomPercent' operation mode."); 173 } 174 int count = (int) Math.ceil((double) orderedList.size() * cutoff / 100); 175 for (int i = 0; i < count && i < orderedList.size(); i++) { 176 target.add(orderedList.get(i)); 177 } 178 } 179 else if ("Top".equalsIgnoreCase(opMode)) { 180 int start = orderedList.size() - cutoff; 181 if (start < 0) { 182 start = 0; 183 } 184 for (int i = start; i < orderedList.size(); i++) { 185 target.add(orderedList.get(i)); 186 } 187 } 188 else if ("Bottom".equalsIgnoreCase(opMode)) { 189 for (int i = 0; i < cutoff && i < orderedList.size(); i++) { 190 target.add(orderedList.get(i)); 191 } 192 } 193 else { 194 throw new TelemetryFunctionException("Unsupported op mode '" + opMode + "'."); 195 } 196 197 return target; 198 } 199 200 /** 201 * Sorts the telemetry streams according to a rank function. 202 * 203 * @param rankFunction The rank function. 204 * @param streams Telemetry stream collection. 205 * 206 * @return A sorted list of telemetry streams, from the smallest rank value to the largest. 207 * 208 * @throws TelemetryFunctionException If the rank function does not exist. 209 */ 210 private ArrayList<TelemetryStream> sort(RankFunction rankFunction, 211 TelemetryStreamCollection streams) 212 throws TelemetryFunctionException { 213 214 TreeMap<Double, List<TelemetryStream>> map = new TreeMap<Double, List<TelemetryStream>>(); 215 for (TelemetryStream stream : streams.getTelemetryStreams()) { 216 Double rank = new Double(rankFunction.getRank(stream)); 217 List<TelemetryStream> list = map.get(rank); 218 if (list == null) { 219 list = new ArrayList<TelemetryStream>(1); 220 map.put(rank, list); 221 } 222 list.add(stream); 223 } 224 225 ArrayList<TelemetryStream> sortedList = 226 new ArrayList<TelemetryStream>(streams.getTelemetryStreams().size()); 227 for (List<TelemetryStream> list : map.values()) { 228 sortedList.addAll(list); 229 } 230 return sortedList; 231 } 232 233 //===================== Rank Function ============================== 234 235 /** 236 * Rank function. 237 */ 238 interface RankFunction { 239 240 /** 241 * Gets a rank value for a telemetry stream. Note that any implmentation MUST observe the 242 * rules: The function should never return Double.POSITIVE_INFINITY, 243 * Double.NEGATIVE_INFINITY, or Double.NaN. 244 * 245 * @param stream A telemetry stream. 246 * @return A rank value. 247 */ 248 double getRank(TelemetryStream stream); 249 } 250 251 //NOTE: Sum function is bad, it ignores the data points with no value, which makes the sum 252 // of different telemetry streams not comparable if there is a missing point. 253 // There is no easy way around this. Use Average instead. 254 255 // /** 256 // * A rank function using sum as ranking criteria. 257 // * 258 // * @author (Cedric) Qin ZHANG 259 // * @version $Id$ 260 // */ 261 // static class SumRankFunction implements RankFunction { 262 // 263 // /** 264 // * Computes the sum of a telemetry stream. 265 // * @param stream A telemetry stream. 266 // * @return The sum. 267 // */ 268 // public double getRank(TelemetryStream stream) { 269 // double sum = 0; 270 // for (Iterator i = stream.getDataPoints().iterator(); i.hasNext(); ) { 271 // TelemetryDataPoint dp = (TelemetryDataPoint) i.next(); 272 // Number number = dp.getValue(); 273 // if (number != null) { 274 // double numberValue = number.doubleValue(); 275 // if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) { 276 // //Cannot use infinity, because Positive infinity + negative infinity = NaN 277 // sum += numberValue; 278 // } 279 // } 280 // } 281 // return sum; 282 // } 283 // } 284 285 /** 286 * A rank function using average as ranking criteria. 287 * 288 * @author (Cedric) Qin ZHANG 289 */ 290 static class AverageRankFunction implements RankFunction { 291 292 /** 293 * Computes the average of a telemetry stream. 294 * @param stream A telemetry stream. 295 * @return The average. 296 */ 297 public double getRank(TelemetryStream stream) { 298 int count = 0; 299 double sum = 0; 300 for (TelemetryDataPoint dp : stream.getDataPoints()) { 301 Number number = dp.getValue(); 302 if (number != null) { 303 double numberValue = number.doubleValue(); 304 if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) { 305 //Cannot use infinity, because Positive infinity + negative infinity = NaN 306 count ++; 307 sum += numberValue; 308 } 309 } 310 } 311 return count == 0 ? 0 : sum / count; 312 } 313 } 314 315 /** 316 * A rank function using max as ranking criteria. 317 * 318 * @author (Cedric) Qin ZHANG 319 */ 320 static class MaxRankFunction implements RankFunction { 321 322 /** 323 * Computes the max value of a telemetry stream. 324 * @param stream A telemetry stream. 325 * @return The max value. 326 */ 327 public double getRank(TelemetryStream stream) { 328 double max = Double.MIN_VALUE; 329 for (TelemetryDataPoint dp : stream.getDataPoints()) { 330 Number number = dp.getValue(); 331 if (number != null) { 332 double numberValue = number.doubleValue(); 333 if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) { 334 if (max < numberValue) { //NOPMD 335 max = numberValue; 336 } 337 } 338 } 339 } 340 return max; 341 } 342 } 343 344 /** 345 * A rank function using min as ranking criteria. 346 * 347 * @author (Cedric) Qin ZHANG 348 */ 349 static class MinRankFunction implements RankFunction { 350 351 /** 352 * Computes the min value of a telemetry stream. 353 * @param stream A telemetry stream. 354 * @return The min value. 355 */ 356 public double getRank(TelemetryStream stream) { 357 double min = Double.MAX_VALUE; 358 for (TelemetryDataPoint dp : stream.getDataPoints()) { 359 Number number = dp.getValue(); 360 if (number != null) { 361 double numberValue = number.doubleValue(); 362 if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) { 363 if (min > numberValue) { //NOPMD 364 min = numberValue; 365 } 366 } 367 } 368 } 369 return min; 370 } 371 } 372 373 /** 374 * A rank function using the lastest data point value as ranking criteria. 375 * 376 * @author (Cedric) Qin ZHANG 377 */ 378 static class LastRankFunction implements RankFunction { 379 380 /** 381 * Computes the last value of a telemetry stream. 382 * @param stream A telemetry stream. 383 * @return The last value. 384 */ 385 public double getRank(TelemetryStream stream) { 386 List<TelemetryDataPoint> list = stream.getDataPoints(); 387 for (int i = list.size() - 1; i >= 0; i--) { 388 TelemetryDataPoint dp = list.get(i); 389 Number number = dp.getValue(); 390 if (number != null) { 391 double numberValue = number.doubleValue(); 392 if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) { 393 return numberValue; 394 } 395 } 396 } 397 return 0; 398 } 399 } 400 401 /** 402 * A rank function using delta as ranking criteria. 403 * 404 * @author (Cedric) Qin ZHANG 405 */ 406 static class DeltaRankFunction implements RankFunction { 407 408 /** 409 * Computes the sum of delta of a telemetry stream. 410 * @param stream A telemetry stream. 411 * @return The delta sum. 412 */ 413 public double getRank(TelemetryStream stream) { 414 double sum = 0; 415 double lastValue = Double.NaN; 416 for (TelemetryDataPoint dp : stream.getDataPoints()) { 417 Number number = dp.getValue(); 418 if (number != null) { 419 double numberValue = number.doubleValue(); 420 if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) { 421 if (! Double.isNaN(lastValue)) { 422 sum += Math.abs(numberValue - lastValue); 423 } 424 lastValue = numberValue; 425 } 426 } 427 } 428 return sum; 429 } 430 } 431 432 /** 433 * A rank function using the different between the last data point and the first data point 434 * as criteria. Note that this rank value may be negative if the last 435 * 436 * @author (Cedric) Qin ZHANG 437 */ 438 static class SimpleDeltaRankFunction implements RankFunction { 439 440 /** 441 * Computes the sum of delta of a telemetry stream. 442 * @param stream A telemetry stream. 443 * @return The delta sum. 444 */ 445 public double getRank(TelemetryStream stream) { 446 double firstValue = Double.NaN; 447 double lastValue = Double.NaN; 448 for (TelemetryDataPoint dp : stream.getDataPoints()) { 449 Number number = dp.getValue(); 450 if (number != null) { 451 double numberValue = number.doubleValue(); 452 if (! Double.isInfinite(numberValue) && ! Double.isNaN(numberValue)) { 453 if (Double.isNaN(firstValue)) { 454 firstValue = numberValue; 455 } 456 lastValue = numberValue; 457 } 458 } 459 } 460 return (Double.isNaN(firstValue) || Double.isNaN(lastValue)) ? 0 : lastValue - firstValue; 461 } 462 } 463 }