Tuesday, 18 August 2015

Log Analysis with Spark SQL

In this tutorial, I will explain how to perform log analysis with Spark SQL.

1) Download access logs for analysis
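
The parser used in step 3 expects Apache access logs in Common Log Format, one request per line, for example:

127.0.0.1 - - [21/Jul/2014:9:55:27 -0800] "GET /home.html HTTP/1.1" 200 2048

Any access log in this format will work as input.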

2) Write the Java driver class


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import scala.Tuple2;

import java.util.List;

/**
 * LogAnalyzerSQL shows how to use SQL syntax with Spark.
 *
 * Example command to run:
 * %  ${YOUR_SPARK_HOME}/bin/spark-submit
 *     --class "LogAnalyzerSQL"
 *     --master local[4]
 *     target/simple-project-1.0.jar
 *     ../../data/apache.accesslog
 */
public class LogAnalyzerSQL {

  public static void main(String[] args) {
    // Initialize the Spark context.
    SparkConf conf = new SparkConf().setAppName("Log Analyzer SQL");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaSQLContext sqlContext = new JavaSQLContext(sc);

    if (args.length == 0) {
      System.out.println("Must specify an access logs file.");
      System.exit(-1);
    }
    String logFile = args[0];
    JavaRDD<ApacheAccessLog> accessLogs = sc.textFile(logFile)
        .map(ApacheAccessLog::parseFromLogLine);

    // Spark SQL can infer a schema for a table if given a Java class with getters and setters.
    JavaSchemaRDD schemaRDD = sqlContext.applySchema(accessLogs, ApacheAccessLog.class);
    schemaRDD.registerTempTable("logs");
    sqlContext.sqlContext().cacheTable("logs");

    // Calculate statistics based on the content size.
    Row contentSizeStats =
        sqlContext.sql("SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize) FROM logs")
            .collect()
            .get(0);
    System.out.println(String.format("Content Size Avg: %s, Min: %s, Max: %s",
        contentSizeStats.getLong(0) / contentSizeStats.getLong(1),
        contentSizeStats.getLong(2),
        contentSizeStats.getLong(3)));

    // Compute Response Code to Count.
    List<Tuple2<Integer, Long>> responseCodeToCount = sqlContext
        .sql("SELECT responseCode, COUNT(*) FROM logs GROUP BY responseCode LIMIT 100")
        .mapToPair(row -> new Tuple2<>(row.getInt(0), row.getLong(1)))
        .collect();
    System.out.println(String.format("Response code counts: %s", responseCodeToCount));

    // Find IP addresses that have accessed the server more than 10 times.
    List<String> ipAddresses = sqlContext
        .sql("SELECT ipAddress, COUNT(*) AS total FROM logs GROUP BY ipAddress HAVING total > 10 LIMIT 100")
        .map(row -> row.getString(0))
        .collect();
    System.out.println(String.format("IPAddresses > 10 times: %s", ipAddresses));

    // Top Endpoints.
    List<Tuple2<String, Long>> topEndpoints = sqlContext
        .sql("SELECT endpoint, COUNT(*) AS total FROM logs GROUP BY endpoint ORDER BY total DESC LIMIT 10")
        .map(row -> new Tuple2<>(row.getString(0), row.getLong(1)))
        .collect();
    System.out.println(String.format("Top Endpoints: %s", topEndpoints));

    sc.stop();
  }
}
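
Note: this driver uses the pre-1.3 Spark SQL Java API (JavaSQLContext and JavaSchemaRDD). If you build against Spark 1.3 or later, those classes were replaced by SQLContext and DataFrame; a minimal sketch of the equivalent setup, reusing the variable names from above:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Spark 1.3+ equivalent of the schema-inference block above:
SQLContext sqlContext = new SQLContext(sc);
DataFrame schemaDF = sqlContext.createDataFrame(accessLogs, ApacheAccessLog.class);
schemaDF.registerTempTable("logs");
sqlContext.cacheTable("logs");
// sqlContext.sql(...) then returns a DataFrame; call collectAsList() to fetch rows.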





3) Use the following class to model an access log line


import java.io.Serializable;
import java.lang.String;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * This class represents an Apache access log line.
 * See http://httpd.apache.org/docs/2.2/logs.html for more details.
 */
public class ApacheAccessLog implements Serializable {
  private static final Logger logger = Logger.getLogger("Access");

  private String ipAddress;
  private String clientIdentd;
  private String userID;
  private String dateTimeString;
  private String method;
  private String endpoint;
  private String protocol;
  private int responseCode;
  private long contentSize;

  private ApacheAccessLog(String ipAddress, String clientIdentd, String userID,
                          String dateTime, String method, String endpoint,
                          String protocol, String responseCode,
                          String contentSize) {
    this.ipAddress = ipAddress;
    this.clientIdentd = clientIdentd;
    this.userID = userID;
    this.dateTimeString = dateTime;
    this.method = method;
    this.endpoint = endpoint;
    this.protocol = protocol;
    this.responseCode = Integer.parseInt(responseCode);
    this.contentSize = Long.parseLong(contentSize);
  }

  public String getIpAddress() {
    return ipAddress;
  }

  public String getClientIdentd() {
    return clientIdentd;
  }

  public String getUserID() {
    return userID;
  }

  public String getDateTimeString() {
    return dateTimeString;
  }

  public String getMethod() {
    return method;
  }

  public String getEndpoint() {
    return endpoint;
  }

  public String getProtocol() {
    return protocol;
  }

  public int getResponseCode() {
    return responseCode;
  }

  public long getContentSize() {
    return contentSize;
  }

  public void setIpAddress(String ipAddress) {
    this.ipAddress = ipAddress;
  }

  public void setClientIdentd(String clientIdentd) {
    this.clientIdentd = clientIdentd;
  }

  public void setUserID(String userID) {
    this.userID = userID;
  }

  public void setDateTimeString(String dateTimeString) {
    this.dateTimeString = dateTimeString;
  }

  public void setMethod(String method) {
    this.method = method;
  }

  public void setEndpoint(String endpoint) {
    this.endpoint = endpoint;
  }

  public void setProtocol(String protocol) {
    this.protocol = protocol;
  }

  public void setResponseCode(int responseCode) {
    this.responseCode = responseCode;
  }

  public void setContentSize(long contentSize) {
    this.contentSize = contentSize;
  }

  // Example Apache log line:
  //   127.0.0.1 - - [21/Jul/2014:9:55:27 -0800] "GET /home.html HTTP/1.1" 200 2048
  private static final String LOG_ENTRY_PATTERN =
      // 1:IP  2:client 3:user 4:date time                   5:method 6:req 7:proto   8:respcode 9:size
      "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)";
  private static final Pattern PATTERN = Pattern.compile(LOG_ENTRY_PATTERN);

  public static ApacheAccessLog parseFromLogLine(String logline) {
    Matcher m = PATTERN.matcher(logline);
    if (!m.find()) {
      logger.log(Level.SEVERE, "Cannot parse log line: " + logline);
      throw new RuntimeException("Error parsing log line: " + logline);
    }

    return new ApacheAccessLog(m.group(1), m.group(2), m.group(3), m.group(4),
        m.group(5), m.group(6), m.group(7), m.group(8), m.group(9));
  }

  @Override public String toString() {
    return String.format("%s %s %s [%s] \"%s %s %s\" %s %s",
        ipAddress, clientIdentd, userID, dateTimeString, method, endpoint,
        protocol, responseCode, contentSize);
  }
}
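
To sanity-check the parser in isolation before submitting the job, you can feed it the example line from the comment above (the ApacheAccessLogTest class name is just for illustration):

public class ApacheAccessLogTest {
  public static void main(String[] args) {
    String line =
        "127.0.0.1 - - [21/Jul/2014:9:55:27 -0800] \"GET /home.html HTTP/1.1\" 200 2048";
    ApacheAccessLog log = ApacheAccessLog.parseFromLogLine(line);
    System.out.println(log);                    // echoes the line via toString()
    System.out.println(log.getResponseCode());  // 200
    System.out.println(log.getContentSize());   // 2048
  }
}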


4) Since the code uses Java 8 lambda syntax, add the compiler plugin and the Spark SQL dependency to the POM file

<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>

  <dependencies>
    <!-- Spark core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>

    <!-- Spark SQL; keep this version in sync with spark-core. The
         JavaSQLContext/JavaSchemaRDD API used above exists up to the
         1.2.x line and was replaced by DataFrame in 1.3. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>net.sf.opencsv</groupId>
      <artifactId>opencsv</artifactId>
      <version>2.3</version>
    </dependency>
  </dependencies>

  <!-- Needed only for the Java 8 lambda syntax used above -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>




Now run the mvn package command to generate the jar file, copy the jar into the Spark examples folder, and copy the logs folder into the examples folder as well.
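
For example, assuming Spark is installed at ${YOUR_SPARK_HOME} (these paths are illustrative):

mvn package
cp target/simple-project-1.0.jar ${YOUR_SPARK_HOME}/examples/
cp -r logs ${YOUR_SPARK_HOME}/examples/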

Now run the following spark-submit command from the Spark home directory:

./bin/spark-submit --class "LogAnalyzerSQL" --master local[4] ./examples/simple-project-1.0.jar ./examples/error_log
 
