Why The World Needs Flarion. Read More

Apache Spark Resource Configuration

From Theory to Practice
By
Ran Reichman
read time
February 5, 2025

Apache Spark's resource configuration remains one of the most challenging aspects of operating data pipelines at scale. Theoretical best practices are widely available, but production deployments often require adjustments to accommodate real-world constraints. This guide bridges that gap, exploring how to properly size Spark resources—from executors to partitions—while identifying common failure patterns and strategies to address them in production.

The Baseline Configuration

Consider a typical Spark job processing 1TB of data. A standard recommended setup might include:

  • A cluster of 20 nodes, each with 32 cores and 256GB RAM
  • Effective capacity of 28 cores and 240GB RAM per node after system overhead
  • 4 executors per node (80 total executors)
  • 7 cores per executor (with 1 core reserved for overhead)
  • 56GB RAM per executor
  • ~128MB partition sizes for optimal parallelism

While this configuration serves as a solid starting point, production workloads rarely conform to such clean boundaries. Let's examine some common failure patterns and mitigation strategies.When Reality Hits: Failure Patterns and Solutions

Failure Pattern #1: Workload Evolution Requiring Infrastructure Changes

A typical scenario: A job that previously ran efficiently on 20 nodes begins to experience increasing memory pressure or extended runtimes, despite configuration adjustments. Signs of resource constraints include:

  • Consistently high GC time across executors (>15% of executor runtime)
  • Storage fraction frequently dropping below 0.3
  • Executor memory usage consistently above 85%
  • Stage attempts failing despite conservative memory settings

Root cause analysis approach:

  1. Analyze growth patterns in your data volume and complexity.
  2. Profile representative jobs to understand resource bottlenecks.

Key scaling triggers:

  • CPU-bound: When average CPU utilization stays above 80% for most of the job duration.
  • Memory-bound: When GC time exceeds 15% or OOM errors occur despite tuning.
  • I/O-bound: When shuffle spill exceeds 20% of executor memory.

If CPU-bound (high CPU utilization, low wait times):

  • First try increasing cores per executor.
  • If insufficient, add nodes while maintaining a similar cores/node ratio.

If memory-bound (Out Of Memory - OOM):

  • First try reducing executors per node to allocate more memory per executor.
  • If insufficient, add nodes with higher memory configurations.

Failure Pattern #2: Memory Exhaustion In Compute Heavy Operations

A typical scenario: Your job runs fine for many days but then suddenly fails with Out Of Memory (OOM) errors. Investigation reveals that during month-end processing, certain joins produce intermediate results 5-10x larger than your input data. The executor memory gets exhausted trying to handle these large shuffles.A possible solution would be to update the configuration to:

  • spark.executor.memoryOverhead: 25% (increased from default 10%)
  • spark.memory.fraction: 0.75 (decreased from default 0.6)

These settings help because they:- Reserve more memory for off-heap operations (shuffles, network buffers)- Reduce the fraction of memory used for caching, giving more to execution- Allow GC to reclaim memory more aggressively

Failure Pattern #3: Data Skew, The Silent Killer

A typical scenario: Your daily aggregation job suddenly takes 4 hours instead of 1 hour. Investigation shows that 90% of the data is going to 10% of the partitions. Common culprits:- Timestamp-based keys clustering around business hours- Geographic data concentrated in major cities- Business IDs with vastly different activity levelsBefore implementing solutions, quantify your skew:

  1. Monitor partition sizes through the Spark UI
  2. Track duration variation across tasks within the same stage
  3. Look for orders of magnitude differences in partition sizes

A possible solution would be to analyze your key distribution and for known skewed keys, implement pre-processing like so:// For timestamp skewval smoothed_key = concat(date_col, hash(minute_col) % 10)// For business ID skewval salted_key = concat(business_id, hash(row_number) % 5)Using Spark’s built-in skew handling helps, but understanding the specific skew of your data is more robust and lasting. Spark’s skew handling configurations:

  • spark.sql.adaptive.enabled: true
  • spark.sql.adaptive.skewJoin.enabled: true

Failure Pattern #4: Resource Starvation in Mixed Workloads

A typical scenario: A seemingly well-configured job starts showing erratic behavior—some stages complete quickly while others seem stuck, executors appear underutilized despite high load, and the overall job progress becomes unpredictable. This is a typical case of resource starvation occurring within a single application.

  1. Late stages in complex DAGs struggle to get resources
  2. Shuffle operations become bottlenecks
  3. Some executors are overwhelmed while others sit idle
  4. Task attempts timeout and retry repeatedly

The root cause often lies in complex transformation chains: sqlCopydata.join(lookup1).groupBy("key1").agg(...).join(lookup2).groupBy("key2").agg(...)Each transformation creates intermediate results that compete for resources. Without proper management, earlier stages can hog resources, starving later stages.Possible solutions include:

  1. Dividing compute-intensive jobs into smaller jobs that use resources more predictably.
  2. If splitting a large job isn’t possible, using checkpoints and persist methods to better divide a single job into distinct parts. (expect a future blog post on these methods)
  3. Applying Spark Shuffle management - setting spark.dynamicAllocation.shuffleTracking.enabled and spark.shuffle.service.enabled to true.

Conclusions & The Path Forward

We've found that most Spark issues manifest first as performance degradation before becoming outright failures. The goal of a data engineering team isn't to prevent all issues but to catch and address them before they impact production stability. While adding resources can sometimes help, precise optimization and proper monitoring often provide more sustainable solutions. Spark offers a robust set of job management tools and settings, but addressing problems through standard Spark configurations alone often proves insufficient.The Flarion platform transforms this landscape in two key ways: through significant workload acceleration that reduces resource requirements and minimizes garbage collection overhead, and by providing enhanced visibility into Spark deployments. This combination of speed and improved observability enables engineering teams to identify potential issues before they escalate into failures, shifting from reactive troubleshooting to proactive optimization. As a result, data engineering teams experience both reduced failure rates and decreased operational burden, creating a more stable and efficient production environment.

Related Posts

At the recent Open Lakehouse + AI Summit, OpenAI's data platform team gave a detailed account of how they run Spark internally. It's a revealing look at the operational reality of serving over a thousand internal customers across model training, analytics, safety research, and finance.

Their setup is representative of large-scale data platforms. They run both Databricks and a self-hosted "OpenAI Spark" on Kubernetes, unified through a shared Unity Catalog. Users switch between engines by changing a single configuration parameter. This hybrid pattern has become the norm for organizations processing data at serious volume, and OpenAI's experience illuminates why.

The Hybrid Reality

Three forces push enterprises toward running their own Spark alongside managed services. First, data security requirements often mandate that sensitive workloads stay within controlled infrastructure - no amount of compliance certifications fully satisfies some internal security teams. Second, the economics shift at scale: organizations processing petabytes daily often find that self-hosted deployments dramatically reduce costs for predictable, high-volume workloads. Third, operating your own stack means you can debug it. Full source code access and the ability to implement workload-specific optimizations matter when you're troubleshooting production incidents.

Building the Infrastructure Layer

The OpenAI team's account of scaling self-hosted Spark follows a familiar trajectory. Initial deployment is straightforward - Spark on Kubernetes, Airflow integration, jobs start flowing - and then usage grows.

Kubernetes control plane limits surface first - API servers buckling under listing operations from thousands of concurrent jobs. The response is multiple clusters, which immediately creates routing problems. Static routing (annotating jobs with target clusters) proves operationally painful. The solution is a gateway service that handles dynamic routing, access control, quota tracking, and auto-tuning based on historical patterns. This is infrastructure that managed services provide invisibly, and that self-hosted deployments must build explicitly.

Catalog integration across both managed and self-hosted environments requires careful coordination: permission verification, scoped credentials, distribution to executors. These are solved problems, but solving them yourself takes engineering time.

Performance at Petabyte Scale

OpenAI's talk gets more interesting when it turns to optimizations that don't appear in Spark documentation. Their CDC ingestion example is illustrative: at petabyte scale, Spark's default merge operation breaks down because mixed event types require outer joins that can't be broadcast. Their solution - splitting merges into separate operations for updates/deletes versus inserts - is the kind of pattern that emerges only from production experience.

Cloud storage API limits create another class of problems. Transaction-per-second caps become bottlenecks when scanning tables with extensive metadata. The optimizations are straightforward once you know to look: listing only from the last known commit, caching metadata, eliminating redundant status checks.

The most impactful optimization they described involves recognizing what data doesn't need to be read at all. Merge operations that update rows based on key matching don't need to scan target table columns if the CDC payload already contains the necessary data. This column pruning yielded 98% reductions in data scanned for some of their workloads.

The Architectural Ceiling

Even with these optimizations, OpenAI's team acknowledged limitations that configuration tuning can't address. PySpark's architecture creates both performance overhead and debugging complexity. JSON processing remains expensive. These are consequences of Spark's JVM-based architecture, and the industry is responding.

Remote shuffle services decouple shuffle data from executor lifecycles. Native acceleration engines process data in columnar format with SIMD instructions. This is the problem Flarion addresses directly - accelerating Spark workloads natively without requiring pipeline changes, targeting exactly the architectural constraints OpenAI describes. Organizations facing similar ceilings can evaluate whether native acceleration closes the gap before committing to the engineering investment of building their own optimization layer.

OpenAI's scale is unique, but its challenges are common. Hybrid deployments, control plane scaling, storage API limits, the performance ceiling of JVM-based processing - these are what enterprises running Spark at scale consistently encounter. Their solutions represent current best practice. The question for most organizations is when they'll face these problems, and whether they'll be ready.

Modern distributed processing engines achieve remarkable scale and reliability, yet they share a fundamental architectural constraint: rigid storage format requirements. This constraint forces organizations to accept significant performance penalties, sometimes 10-100x slower queries than technically possible, simply because their processing engine cannot adapt to optimized storage formats.

Why Parquet Falls Short for Selective Analytics

Parquet excels as a universal columnar format, but its design prioritizes compatibility and compression over query performance. Understanding its limitations requires examining how analytical queries actually execute.

The Row Group Problem

Parquet organizes data into row groups, typically 100,000 to 1 million rows each. This coarse granularity creates a fundamental problem: if even a single row in a row group matches your query filter, you must read ALL requested columns for the ENTIRE row group.

Consider a query filtering for a specific customer ID in a billion-row table. That customer's 100 transactions might be scattered across 100 different row groups. Parquet must read 100 row groups × 100,000 rows × all requested columns, processing 10 million rows to return 100.

Single-Phase Execution

Parquet readers process queries in a single phase:

1. Read all requested columns for relevant row groups

2. Decompress the data

3. Apply filters

4. Return matching rows

This means reading and decompressing massive amounts of data that will immediately be discarded. There's no mechanism to filter first, then read only what's needed.

Limited Predicate Pushdown

While Parquet stores min/max statistics per row group and optional Bloom filters, these only help skip entire row groups. For analytical queries where some matching rows exist in many row groups, this provides minimal benefit. You still read entire 100,000-row chunks to extract perhaps 10 rows from each.

How Query-Optimized Formats Solve These Problems

Formats like ClickHouse's MergeTree take a fundamentally different approach:

Granular Storage

Instead of 100,000-row groups, data is organized into 8,192-row granules. This 12x finer granularity means reading much less unnecessary data when matches are sparse. Finding those same 100 customer transactions requires reading ~12x less data just from granularity alone.

Two-Phase Execution with PREWHERE

Query-optimized formats implement two-phase execution:

1. Phase 1: Read ONLY filter columns for candidate granules

2. Phase 2: For rows that pass filters, read the remaining requested columns

This seemingly simple change has a profound impact. Instead of reading 20 columns for millions of rows, you read 1-2 filter columns first, identify the 100 matching rows, then read the other 18 columns for just those 100 rows.

Sparse Indexing

A sparse primary index stores one entry per granule (every 8,192 rows), creating a tiny index that fits entirely in memory even for billion-row tables. Binary search on this index instantly identifies which granules to read, eliminating 99%+ of data before any I/O occurs.

Lazy Materialization

Column reads are deferred until absolutely necessary. If a query has multiple filters, the engine applies them progressively, reading additional columns only for rows that survive each filter. This minimizes decompression work and memory bandwidth usage.

The Performance Gap: A First-Principles Calculation

Let's calculate the actual performance difference using a realistic analytical query on data stored in S3:

Scenario:

  • 100 million rows, 100 columns
  • Query with 0.01% selectivity returning 10,000 rows
  • Projects 20 columns
  • All data in S3 (network I/O is the same for both formats)

Parquet Execution:

  • Data organized in 100,000-row groups
  • 10,000 matching rows spread across ~100 row groups
  • Must read all 20 requested columns for entire row groups containing any match
  • Compressed data transferred from S3: 160 MB (with 10:1 compression)
  • Data decompressed and processed: 1.6 GB

Query-Optimized Format (like ClickHouse MergeTree):

  • Data organized in 8,192-row granules with sparse index
  • Two-phase execution: read filter columns first, identify matches, then read other columns
  • Compressed data transferred from S3: 46 MB (with 3.5:1 compression)
  • Data decompressed and processed: 162 MB

Despite storing data 2.9x larger on S3 due to less aggressive compression, the query-optimized format transfers 3.5x less data over the network and processes 10x less decompressed data. Processing 1.6 GB vs 162 MB of decompressed data is the difference between 100ms and 10ms query time

Why Processing Engines Don't Support Alternative Formats

While Spark and Ray technically could support arbitrary storage formats through extension APIs, in practice they don't. The ecosystem has converged on Parquet, ORC, and Avro, leaving significant performance opportunities unexplored.

The Spark Reality

Spark provides a DataSource V2 API for custom formats, yet virtually no production deployments use them. The practical barriers:

  • Format implementations must handle Spark's complex internal row representation
  • Custom formats cannot leverage Spark's vectorized execution optimizations
  • The Catalyst optimizer cannot reason about custom format capabilities
  • Maintaining custom format readers requires deep Spark internals knowledge

Ray's Format Limitations

Ray delegates data loading to Pandas or PyArrow:

python

@ray.remote

def process_partition(file_path):

    # Limited to formats Pandas supports

    df = pd.read_parquet(file_path)

    return df.groupby('device_id').mean()

Adding optimized formats would require writing C++ extensions, ensuring serialization compatibility, and maintaining format-specific optimizations Ray doesn't understand.

The Architectural Impedance Mismatch

Even when custom format support exists, fundamental architectural assumptions prevent leveraging format-specific optimizations. Standard engines read all requested columns before filtering and lack the execution primitives for two-phase execution. The granularity mismatch between coarse row groups and fine granules cannot be bridged through format plugins.

Real-World Impact on S3-Based Data Lakes

Production systems using S3 data lakes demonstrate concrete impact:

E-commerce Recommendation Engine

  • Dataset in S3: 500M products × 800 attributes
  • Parquet in Spark: 8.2 seconds average latency
  • Possible with optimized format: 100ms
  • Impact: Real-time recommendations impossible

Financial Risk Analytics

  • Dataset in S3: 10 years of trades, 2,000 columns
  • Parquet in Spark: 45 seconds per portfolio
  • Possible with optimized format: 500ms
  • Impact: Risk managers wait minutes for updates that could take seconds

IoT Monitoring

  • Dataset in S3: 100,000 devices × 1,000 metrics
  • Parquet in Ray: 12 seconds per device group
  • Possible with optimized format: 50ms
  • Impact: Alert latency prevents real-time response

The Compression-Performance Trade-off

Parquet achieves superior compression, typically 2-3x better than query-optimized formats. A 100GB dataset might compress to 10GB in Parquet versus 30GB in an optimized format, translating to $2.30/month versus $6.90/month on S3.

However, compute costs dominate this equation. When queries run 10-100x faster with optimized formats, the required infrastructure shrinks proportionally. A workload requiring a 16-node Spark cluster at $8,000/month might run on 2 nodes at $1,000/month with format optimization. The $7,000/month compute savings overwhelms the $4.60/month additional S3 storage cost by a factor of 1,500x.

Performance Patterns by Query Type

Different query patterns show varying sensitivity to format selection:

Highly Selective Queries (<0.1% of rows returned)

  • Performance difference: 20-100x
  • Parquet's coarse row groups create massive read amplification

Wide Table Projections (many columns, few rows)

  • Performance difference: 10-50x
  • Single-phase execution forces reading all columns upfront

Aggregation Queries

  • Performance difference: 5-20x
  • Processing unnecessary rows dominates computation

Full Table Scans

  • Performance difference: 0.7-1.4x (Parquet often faster)
  • Parquet's superior compression provides advantage when reading everything

The Hidden Costs of Format Lock-in

Beyond direct performance impact, format rigidity creates cascading inefficiencies:

Infrastructure Over-provisioning: Organizations scale clusters to compensate for format inefficiency. A workload naturally requiring 2 nodes might run on 20 nodes to achieve acceptable latency.

Architectural Workarounds: Teams implement pre-aggregation pipelines, materialized views, and caching layers, each adding operational complexity without addressing the root cause.

Opportunity Costs: When queries take 30 seconds instead of 300ms, entire categories of applications become infeasible.

The Path Forward

Parquet remains excellent for data interchange, archival storage, and full-scan workloads. The opportunity lies in enabling processing engines to leverage format diversity based on workload requirements.

The ideal architecture would support understanding the profile of the workloads, adjusting compaction accordingly, and using the best data format for the job. Current processing engines cannot implement this strategy due to their format rigidity. They lack the flexibility to transparently read different formats while applying format-specific optimizations.

This limitation is beginning to change. Next-generation execution engines recognize that format flexibility is essential for modern analytical workloads. By decoupling query execution from storage assumptions, systems like Flarion can deliver the performance that has always been technically possible but practically unreachable.

Organizations no longer need to accept 10x performance penalties as the price of distributed processing. The ability to match storage formats to query patterns represents the difference between queries that frustrate users and analytics that drive real-time decisions.

Oops! Something went wrong while submitting the form.