Shark: SQL and Rich Analytics at Scale
Paper by: Reynold Xin, Josh Rosen, Matei Zaharia, Michael Franklin, Scott Shenker, Ion Stoica
Presented by: Jacob Komarovski
Based on the slides of: Kirti Dighe, Drushti Gawade

Introduction
Modern data analysis faces a confluence of growing challenges:
- Data volumes are expanding dramatically
- Faults and stragglers (slow tasks) are becoming more common, complicating parallel database design
- Users still expect to be able to query data at interactive speeds

Introduction (cont.)
Two major lines of systems have recently been explored:
- MapReduce
- MPP analytic databases

MapReduce
- Offers a fine-grained fault tolerance model suitable for large clusters
- Fairly general: supports unstructured data and schema-on-read
- Has high latencies of tens of seconds to hours
- Has largely been dismissed for interactive-speed queries

MPP analytic databases
- MPP = massively parallel processing
- Employ a coarser-grained recovery model, where an entire query has to be resubmitted if a machine fails
- This works well for short queries where a retry is inexpensive, but faces significant challenges in long queries as clusters scale up
- Lack the rich analytics functions that are easy to implement in MapReduce, such as machine learning and graph algorithms

MPP analytic databases (cont.)
- Possible to implement some of these functions using UDFs (user-defined functions)
- These algorithms are often expensive, furthering the need for fault and straggler recovery for long queries
- Most organizations tend to use other systems alongside MPP databases to perform complex analytics

Conclusion
Processing systems will need to support both SQL and complex analytics efficiently, and to provide fine-grained fault recovery across both types of operations, a.k.a. Shark!

What is Shark?
- A new data analysis system
- Built on top of RDDs and Spark
- Compatible with Apache Hive data, metastores, and queries (HiveQL, UDFs, etc.)
- Speedups of up to 100x
- Supports low-latency, interactive queries through in-memory computation
- Supports both SQL and complex analytics such as machine learning

Spark
Spark is the MapReduce-like cluster computing engine used by Shark.
Features of Spark:
- Supports general computation
- Provides an in-memory storage abstraction: RDD
- Engine is optimized for low latency
Modifications for Shark:
- Support for partial DAG (directed acyclic graph) execution
- Optimization of the join algorithm

RDD
- Spark's main abstraction: RDD (Resilient Distributed Datasets)
- A collection stored in an external storage system such as HDFS (Hadoop Distributed File System), or a derived dataset created by applying operators to other RDDs (sketched below)
- Can contain arbitrary data types (Java objects)

Benefits of RDDs
- Data is returned at the speed of DRAM
- Use of lineage
- Speedy recovery
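Below is a minimal sketch (not from the paper) of these ideas in Spark's Scala API: a base RDD loaded from HDFS, derived RDDs produced by coarse-grained operators, and caching for DRAM-speed reuse. The cluster URL, file path, and record layout are hypothetical.

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local[*]", "rdd-example")   // hypothetical setup

    // Base RDD backed by a file in external storage (HDFS path is hypothetical)
    val lines = sc.textFile("hdfs://namenode/warehouse/visits.log")

    // Derived RDDs, created by applying coarse-grained deterministic operators
    val visits     = lines.map(_.split('\t'))                             // parse each record
    val hitsPerUrl = visits.map(cols => (cols(0), 1L)).reduceByKey(_ + _) // group-by style aggregate

    hitsPerUrl.cache()   // keep the derived dataset in memory for fast reuse
    hitsPerUrl.count()   // the lineage textFile -> map -> reduceByKey lets lost
                         // partitions be recomputed on other nodes after a failure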

RDD in memory
In-memory computing is increasingly important in large-scale analytics for two reasons:
1. Many complex analytics functions, such as machine learning and graph algorithms, are iterative, going over the data multiple times. Thus, the fastest systems deployed for these applications are in-memory.
2. Even traditional SQL warehouse workloads exhibit strong temporal and spatial locality, because more-recent fact table data and small dimension tables are read disproportionately often.

RDD in memory (cont.)
A study of Facebook's Hive warehouse and Microsoft's Bing analytics cluster showed that over 95% of queries in both systems could be served out of memory using just 64 GB/node as a cache, even though each system manages more than 100 PB of total data.

RDD lineage
RDDs restrict the programming interface to coarse-grained deterministic operators that affect multiple data items at once, such as map, group-by, and join, and recover from failures by tracking the lineage of each dataset and recomputing lost data. When a node fails, Shark can recover mid-query by rerunning the deterministic operations used to build lost data partitions on other nodes, similar to MapReduce.

Fault tolerance guarantees
- Shark can tolerate the loss of any set of worker nodes
- Recovery is parallelized across the cluster
- The deterministic nature of RDDs also enables straggler mitigation
- Recovery works even in queries that combine SQL and machine learning UDFs

Executing SQL over RDDs
Process of executing a SQL query (a rough illustration of the last step follows below):
1. Query parsing
2. Logical plan generation
3. Physical plan generation (using Shark's optimizer)
Spark's master then executes this graph using standard MapReduce scheduling techniques.
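As a rough, hedged illustration of that last step, the physical plan for a simple aggregation such as SELECT sourceIP, SUM(adRevenue) FROM uservisits GROUP BY sourceIP boils down to a small graph of RDD operators that Spark's master schedules as tasks. The sketch reuses the SparkContext sc from the RDD example above; the sample rows are made up.

    case class Visit(sourceIP: String, adRevenue: Double)
    val uservisits = sc.parallelize(Seq(Visit("1.2.3.4", 0.5), Visit("1.2.3.4", 1.0)))

    val revenueBySource =
      uservisits
        .map(v => (v.sourceIP, v.adRevenue))   // projection to (key, value) pairs
        .reduceByKey(_ + _)                    // shuffle plus aggregation
    revenueBySource.collect()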

Spark's Engine extensions
Systems like Shark and Hive are frequently used to query fresh data that has not undergone a data loading process. The lack of statistics for fresh data, combined with the prevalent use of UDFs, necessitates dynamic approaches to query optimization.

Partial DAG execution (PDE)
A technique that allows dynamic alteration of query plans based on data statistics collected at run-time.

Spark's Engine extensions (cont.)
Partial DAG execution (PDE):
- Static query optimization
- Dynamic query optimization
- Modification of statistics

Examples of statistics (a sketch of per-partition statistics collection follows below):
- Partition sizes and record counts
- List of "heavy hitters"
- Approximate histogram
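Below is a hedged sketch of the kind of per-partition statistics a map task could report to the master under PDE. This is illustrative Scala, not Shark's internal API; the types and the heavy-hitter cutoff are assumptions.

    case class PartitionStats(bytes: Long, records: Long, heavyHitters: Map[String, Long])

    def collectStats(partition: Iterator[(String, Array[Byte])]): PartitionStats = {
      var bytes   = 0L
      var records = 0L
      val counts = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
      for ((key, value) <- partition) {
        bytes   += value.length
        records += 1
        counts(key) += 1
      }
      // keep only the most frequent keys as an approximate heavy-hitter list
      val heavy = counts.toSeq.sortBy(-_._2).take(10).toMap
      PartitionStats(bytes, records, heavy)
    }
    // Each map task would report its PartitionStats to the master, which aggregates
    // them and alters the remaining stages of the DAG accordingly.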

Join Optimization
Partial DAG execution can be used to perform several run-time optimizations for join queries.
- In a shuffle join, both join tables are hash-partitioned by the join key. Each reducer joins corresponding partitions using a local join algorithm, which is chosen by each reducer based on run-time statistics.
- In a map join (also known as a broadcast join), a small input table is broadcast to all nodes, where it is joined with each partition of a large table. This approach can result in significant cost savings by avoiding an expensive repartitioning and shuffling phase.

Join Optimization (cont.)
Map join is only worthwhile if some of the join inputs are small, so Shark uses partial DAG execution to select the join strategy at run-time based on the exact sizes of its inputs (a sketch of this decision follows below).
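A hedged sketch of that decision: once PDE has the materialized sizes of both join inputs, broadcast the smaller side if it fits under a threshold, otherwise shuffle both sides. The threshold value and the types are illustrative, not Shark's actual code.

    sealed trait JoinStrategy
    case object MapJoin     extends JoinStrategy   // broadcast the small table to all nodes
    case object ShuffleJoin extends JoinStrategy   // hash-partition both tables by the join key

    def chooseJoin(leftBytes: Long, rightBytes: Long,
                   broadcastThreshold: Long = 32L * 1024 * 1024): JoinStrategy =
      if (math.min(leftBytes, rightBytes) <= broadcastThreshold) MapJoin else ShuffleJoin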

Skew handling and degree of parallelism
- Shark uses individual partition sizes to determine the number of reducers at run-time, by coalescing many small partitions into fewer coarse partitions that are used by reduce tasks.
- To mitigate skew, fine-grained partitions are assigned to coalesced partitions using a greedy bin-packing heuristic that attempts to equalize the sizes of the coalesced partitions (sketched below).
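A hedged sketch of such a greedy bin-packing pass: each fine-grained map-output partition goes to the currently least-loaded coalesced partition, largest first, so reduce inputs end up roughly equal in size. This is illustrative, not Shark's implementation.

    def coalescePartitions(partitionSizes: Seq[Long], numReducers: Int): Array[Seq[Int]] = {
      val bins     = Array.fill(numReducers)(scala.collection.mutable.Buffer.empty[Int])
      val binSizes = Array.fill(numReducers)(0L)
      // place the largest partitions first, each into the currently smallest bin
      for ((size, idx) <- partitionSizes.zipWithIndex.sortBy(-_._1)) {
        val target = binSizes.indices.minBy(i => binSizes(i))
        bins(target) += idx
        binSizes(target) += size
      }
      bins.map(_.toSeq)
    }

    // e.g. 6 fine-grained map outputs coalesced into 2 reduce partitions
    coalescePartitions(Seq(90L, 10L, 40L, 35L, 20L, 5L), numReducers = 2)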

Columnar Memory Store
- Simply caching records as JVM objects is insufficient
- Shark employs column-oriented storage: a partition of columns is stored as one MapReduce record (illustrated below)
- Benefits: compact representation, CPU-efficient compression, cache locality

Columnar Memory Store (cont.)
A more serious implication, however, is the effect on garbage collection (GC). JVM garbage collection time correlates linearly with the number of objects in the heap, so it could take minutes to perform a full GC on a large heap. These unpredictable, expensive garbage collections cause large variability in worker response times.
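A hedged illustration of the difference: caching each row as its own JVM object versus packing a partition's rows into one primitive array per column, so the garbage collector sees a handful of objects per partition rather than one per record. The row type and values are made up.

    case class RowOriented(pageUrl: String, pageRank: Int, avgDuration: Double)

    // Row-oriented cache: one JVM object (plus boxed fields) per record
    val rowCache: Array[RowOriented] =
      Array(RowOriented("a.com", 10, 1.5), RowOriented("b.com", 3, 0.7))

    // Column-oriented cache for the same partition: one array per column
    class ColumnarPartition(rows: Seq[RowOriented]) {
      val pageUrls:     Array[String] = rows.map(_.pageUrl).toArray
      val pageRanks:    Array[Int]    = rows.map(_.pageRank).toArray
      val avgDurations: Array[Double] = rows.map(_.avgDuration).toArray
    }
    val columnar = new ColumnarPartition(rowCache)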

Map pruning
- Map pruning is the process of pruning data partitions based on their natural clustering columns
- Shark can avoid scanning certain blocks of data if their values fall outside the query's filter range
- Shark's memory store on each worker piggybacks on the data loading process to collect statistics
- The collected statistics are sent back to the master program and kept in memory for pruning partitions during query execution (a small example follows below)
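A hedged example of the idea: if per-partition min/max ranges of a clustering column (say, visitDate, as in the join query later in this deck) are recorded at load time, a filter such as BETWEEN can rule out whole partitions without scanning them. The numbers are made up.

    case class RangeStat(min: Int, max: Int)

    // per-partition min/max of visitDate, collected while loading the table
    val partitionRanges: Map[Int, RangeStat] =
      Map(0 -> RangeStat(20000101, 20000114), 1 -> RangeStat(20000115, 20000131))

    // WHERE visitDate BETWEEN 20000115 AND 20000122 only needs partition 1
    def survivesFilter(r: RangeStat, lo: Int, hi: Int): Boolean = r.max >= lo && r.min <= hi
    val partitionsToScan =
      partitionRanges.filter { case (_, r) => survivesFilter(r, 20000115, 20000122) }.keys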

Machine learning support
Shark supports machine learning as a first-class citizen. Two programming model design decisions make it possible to express machine learning algorithms:

1. Language integration
- Shark allows queries to return the RDD representing the query plan
- Callers of Shark can then invoke distributed computation over the query result using the returned RDD
- Example: a data analysis pipeline that performs logistic regression over a user database (sketched below)
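A minimal sketch of this language integration, close to the example in the Shark paper: a HiveQL query returns an RDD, which is then fed into distributed Scala code. It assumes a Shark context sc (construction omitted) and user-supplied extractFeatures and logisticRegression functions, which are hypothetical here.

    val users = sc.sql2rdd("SELECT * FROM users WHERE age < 20")
    println(users.count)                               // an RDD action, executed on the cluster
    val featureMatrix = users.map(row => extractFeatures(row)).cache()
    val model = logisticRegression(featureMatrix)      // iterative job, sketched later in this deck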

2. Execution engine integration
- A common abstraction allows machine learning computations and SQL queries to share workers and cached data without the overhead of data movement
- Enables end-to-end fault tolerance (thanks to lineage)

Implementation
How can we improve query processing speed?
- Minimize tail latency (the longest-running events in the system)
- Minimize the CPU cost of processing each row
Examples: memory-based shuffle, temporary object creation, bytecode compilation of expression evaluation.

Experiments
Shark was evaluated using these datasets:
- Pavlo et al. benchmark: 2.1 TB of data reproducing Pavlo et al.'s comparison of MapReduce vs. analytical DBMSs
- TPC-H dataset: 100 GB and 1 TB datasets generated by the DBGEN program
- Real Hive warehouse: 1.7 TB of sampled Hive warehouse data from an early industrial user of Shark
- Machine learning dataset: 100 GB synthetic dataset to measure the performance of machine learning algorithms

Methodology and cluster setup
- Amazon EC2 with 100 m2.4xlarge nodes: 8 virtual cores, 68 GB of memory, and 1.6 TB of local storage per node
- The Pavlo et al. benchmark used these 2 tables: a 1 GB/node rankings table and a 20 GB/node uservisits table

Selection query (clustered index)
SELECT pageURL, pageRank FROM rankings WHERE pageRank > X;

Aggregation queries
SELECT sourceIP, SUM(adRevenue) FROM uservisits GROUP BY sourceIP;
SELECT SUBSTR(sourceIP, 1, 7), SUM(adRevenue) FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 7);

Join query
SELECT INTO Temp sourceIP, AVG(pageRank), SUM(adRevenue) AS totalRevenue
FROM rankings AS R, uservisits AS UV
WHERE R.pageURL = UV.destURL AND UV.visitDate BETWEEN Date('2000-01-15') AND Date('2000-01-22')
GROUP BY UV.sourceIP;

[Figure: join query runtime under different join strategies, and the join strategy chosen by the optimizers, Pavlo benchmark]

Data loading
Shark can query data in HDFS directly, which means its data ingress rate is at least as fast as Hadoop's.

Micro-benchmarks
- Aggregation performance:
  SELECT [GROUP_BY_COLUMN], COUNT(*) FROM lineitem GROUP BY [GROUP_BY_COLUMN];
- Join selection at run-time

Fault tolerance
To measure Shark's performance in the presence of node failures, failures are simulated and query performance is measured before, during, and after failure recovery.

Real Hive warehouse
1. Query 1 computes summary statistics in 12 dimensions for users of a specific customer on a specific day.
2. Query 2 counts the number of sessions and distinct customer/client combinations, grouped by countries, with filter predicates on eight columns.
3. Query 3 counts the number of sessions and distinct users for all but 2 countries.
4. Query 4 computes summary statistics in 7 dimensions, grouping by a column and showing the top groups sorted in descending order.

Machine learning algorithms
The performance of Shark is compared against the same workflow running in Hive and Hadoop. The workflow consisted of three steps:
1. Selecting the data of interest from the warehouse using SQL
2. Extracting features
3. Applying iterative algorithms (see the logistic regression sketch below)

The iterative algorithms used were logistic regression and k-means clustering.
[Figures: per-iteration runtime (seconds) for logistic regression and k-means clustering]
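For step 3 of the workflow, below is a minimal sketch of logistic regression run as iterative Spark jobs over a cached RDD. This is not Shark's benchmark code; the input path, record format, and dimensionality are hypothetical.

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local[*]", "logistic-regression")   // hypothetical setup
    val numIterations = 10
    val dims = 10

    // Each input line: "label f1 f2 ... fD"; cache the parsed points in memory
    val points = sc.textFile("hdfs://namenode/warehouse/features.txt").map { line =>
      val parts = line.split(' ').map(_.toDouble)
      (parts(0), parts.drop(1))            // (label, feature vector)
    }.cache()

    var w = Array.fill(dims)(0.0)          // weight vector, updated each iteration
    for (_ <- 1 to numIterations) {
      val gradient = points.map { case (y, x) =>
        val margin = (w, x).zipped.map(_ * _).sum
        val scale  = (1.0 / (1.0 + math.exp(-y * margin)) - 1.0) * y
        x.map(_ * scale)                   // per-point gradient contribution
      }.reduce((a, b) => (a, b).zipped.map(_ + _))
      w = (w, gradient).zipped.map(_ - _)  // gradient step
    }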

Why are Previous MapReduce-Based Systems Slow?
MapReduce is slower than MPP databases for several reasons:
- Expensive data materialization for fault tolerance
- Inferior data layout
- Costlier execution strategies
Research shows that a combination of conceptually simple engineering changes to the engine (e.g. in-memory storage) and more involved architectural changes (e.g. partial DAG execution) can alleviate these problems. Task scheduling overhead actually has a dramatic effect on performance, and minimizing it greatly improves load balancing.

Other Benefits of the Fine-Grained Task Model
Elasticity: In traditional MPP databases, a distributed query plan is selected once and the system needs to run at that level of parallelism for the whole duration of the query. In a fine-grained task system, however, nodes can appear or go away during a query, and pending work will automatically be spread onto them. This enables the database engine to be naturally elastic.

Other Benefits of the Fine-Grained Task Model (cont.)
Multitenancy: This elasticity enables dynamic resource sharing between users. In a traditional MPP database, if an important query arrives while another large query is using most of the cluster, there are few options beyond canceling the earlier query. In systems based on fine-grained tasks, one can simply wait a few seconds for the current tasks from the first query to finish, and start giving the nodes tasks from the second query.

Shark: Conclusion
- Shark runs fast relational queries in a fault-tolerant manner using the fine-grained deterministic task model introduced by MapReduce.

- It is an effective way to scale query processing to ever-larger workloads, and to combine it with rich analytics.

Questions? That's it!
