当前位置:Java -> 降低大数据成本:使用Apache Spark进行高效数据处理
在当今以数据驱动的世界中,高效的数据处理在任何项目的成功中起着至关重要的作用。Apache Spark,一个强大的开源数据处理框架,在该领域已经成为一种革新性的技术。在本文中,我们将深入探讨确保数据流水线资源高效、成本有效和时间高效的策略。这份指南对数据初学者和专业人士都具有价值,可以作为优化机会的清单。尽管这里的大多数建议对Spark的使用具有普遍适用性,但我们将指出当使用Scala或Python API或编写Spark SQL时可能会影响应用程序行为的具体差异。
在大多数情况下,被处理的数据是以列格式存储的。当你仅需要从大分区中检索少量行时,这种格式可能并不理想,但在分析使用案例中表现卓越。特别是当你不得不检查大部分输入行,但只对特定的列子集感兴趣时,这种格式特别有益。Spark通过专门读取参与后续计算的列充分利用了这种存储特性。
为了确保效率,审查执行计划并验证最早的项目语句是否仅选择了你将来需要的列非常重要。这种做法避免了Spark读取数百列然后在处理过程中将它们丢弃的不必要负担。
现在我们已经优化了列选择,让我们深入讨论如何高效检索必要的行。正如之前提到的,当你只需要访问大数据集中的少量行时,列式存储并不是最有效的选择,这一点仍然成立。然而,有一些策略可以通过巧妙地组织你的数据来增强这种情况。
想象一下,你正在处理来自多个城市超市的按小时间隔收集的销售数据。在你的数据流水线中,你避免混合来自不同商店和小时的数据,这是一个很好的做法。在流水线的早期阶段,你很可能进行筛选,但你可以进一步防止Spark完全读取多余的数据。通过基于hour
和supermarket_id
对数据进行分区,Spark可以通过对元数据存储的简单检查跳过无关的数据。
然而,要小心分区列的基数。如果可能的话,使你的分区列的基数保持在一位数或两位数,这使它成为非常适合分区的候选列。但是,如果它可能达到数万个,需要考虑制定有效的聚合策略。回到之前的例子,想象一个像沃尔玛这样的超市连锁店,在这种情况下,使用supermarket_id
进行分区可能导致超过10,000个分区。在这种情况下,选择不同的分区方法,例如使用州作为分区可能更有效。例如,在美国,你可以合理地假设基数不会超过50。
为了确保过滤器推送操作按预期运行,审查你的执行计划。在数据集加载的初始行中,你会发现像PushedFilters: [*EqualTo(state, CA)]
这样的信息。这确认了你的过滤优化正在按预期实现。
Shuffle操作是昂贵的,因此要尽量减少它们的影响。在洗牌之前减小数据大小,应用筛选操作,并考虑使用"byKey
"方法,比如reduceByKey
或aggregateByKey
,尽可能避免Shuffle操作。
你可能无法完全摆脱所有的洗牌,所以考虑智能分区 - 它可以节省未来操作中的资源。
在我们继续讨论关于洗牌的问题时,考虑广播你的数据集中的一个数据,如果它足够小的话是至关重要的。将它广播到所有的worker可以消除与大部分"大数据"部分进行联接时昂贵的洗牌需求。然而,在该背景下,理解Spark SQL和Spark数据集API之间的微妙差异非常重要。
在数据集API中,使用广播方法会尝试广播你的变量,但如果它不适合内存,会引发异常。另一方面,在Spark SQL中,/* BROADCAST(dataset) */
的表示作为一个提示。如果广播无法适应内存,Spark会退回到常规联接。在你期望的小广播数据集意外增长的情况下,两种方法之间的结果是不同的。
使用数据集API时,你会注意到Spark中的作业失败,这虽然会破坏稳定性,但清楚地突出了问题。相比之下,Spark SQL中的作业可能会变慢,提供更大的稳定性,但可能会在一段时间内掩盖问题。因此,在决定特定用例的广播策略时,理解这些差异非常重要。
现在,让我们深入讨论数据大小的重要方面,并解决数据倾斜的问题。直观地识别它们的最明显的方法是查看作业处理统计信息。如果你看到中位数和最大时间或输入大小之间存在巨大差异,或者在1000个作业中有998个在几分钟内完成,但有2个余下的作业要花一个多小时那就是一个数据倾斜的很好指标。
下面是一个数据倾斜的示例:最耗时的任务需要惊人的7秒,并读取了116 MB的输入,而中位数任务仅需要处理50毫秒,处理了仅有4 KB的数据。
这是一个“修复”的数据流水线示例 - 现在最大的时间只有2秒和10 MB - 仍然比中位数大得多,但远远没有之前那么大规模:
数据倾斜是指在进行洗牌时,一个组的数据量远大于其他组,这可能导致问题,比如当一个工作节点无法在内存中容纳整个数据块时,会出现内存溢出(OOM)错误。更普遍的情况是数据溢出到磁盘,导致序列化时间延长,并且垃圾回收(GC)花费过多的时间。
为了解决数据倾斜问题,可以考虑使用键盐化等技术。我们不在这里深入讨论细节,键盐化的含义是对有问题的键引入随机组件,并基于修改后的键重新分配数据。这可以有效地减轻由数据倾斜引起的性能瓶颈。
注意:Spark 3.2.0默认启用了自适应查询执行(AQE),这应该有助于处理倾斜现象,但是如果您发现任务统计信息奇怪,请尝试手动盐化。
重要的是要明白,当您与Spark API交互时,创建新数据集并将其保存到变量并没有实际保存任何状态;相反,您正在存储用于获取特定结果所需的计算的有向无环图(DAG)。因此,当您重复使用相同的变量时,Spark将会冗余地多次重新计算相同的数据。
为了解决这种冗余,建议在观察到这种重新计算模式时对您的数据集进行缓存。Spark提供各种缓存选项,允许您将数据集存储在内存中或将其序列化到磁盘。虽然内存中的缓存提供最快的访问速度,但请记住,Spark会监视内存消耗,并且如有必要可以将数据集从内存中清除,因为它们在稍后需要时可以重新计算。
因此,重要的是要把握平衡,避免试图一直将一切都缓存,因为这样做可能不会产生期望的性能改进。相反,明智地将缓存应用于那些频繁重复使用的特定数据集,以优化您的Spark应用的效率。
尽管最初采用“灵活”的模式(如将复杂对象存储为JSON字符串)可能看起来很诱人,但在长期稳定下来的数据方案上,考虑最合适的数据格式至关重要。此时,选择最适合的数据格式变得至关重要。
选择原生数组或映射始终提供最佳性能。这种选择具有诸多优势,包括通过消除重复的反序列化来节省大量资源。此外,它避免了在用户定义函数(UDF)中实例化代价高昂的序列化/反序列化(SerDe)库所带来的开销。
此外,采用原生数组或映射还可以为未来提供额外的好处。随着Spark不断发展并集成各种数据格式,您可能会解锁更多的优化。例如,这可能涉及到读取映射中特定键的下推,提高应用的效率并减少处理开销。通过选择最佳的数据格式,您不仅可以改善当前性能,还可以为未来Spark生态系统中的潜在性能提升做好准备。
我强烈建议在可能的情况下尽量减少使用UDF,特别是如果可以使用原生Spark SQL函数完成相同的功能。重要的是要了解,原生Spark函数虽然在功能上有所限制,但应该是您的首选,而使用UDF应该是最后的选择。
这一建议有两个关键原因:表达和潜在的优化。Spark使用其专有的原生格式来管理内存数据。当调用UDF时,每行数据都必须传递到Java虚拟机(JVM)以执行函数,然后将结果序列化回Spark的原生格式。可以想象,这个过程会带来相当大的计算成本和资源开销。
此外,UDF对于Spark的优化器来说也是一个挑战。这些优化器无法看到UDF的内部工作情况,使它们变得不透明。因此,Spark内置的优化技术无法应用于UDF,从而限制了性能改进的潜力。通过优先考虑使用原生Spark SQL函数,并最大限度地减少对UDF的依赖,您不仅可以减少计算开销,还可以利用Spark的优化能力来增强数据处理工作流的效率。
最后一点,我建议考虑最适合您需求的执行模式:批处理还是流处理。这可能一开始看起来像一个重大转变,确实如此,但Spark API旨在抽象出与这个选择相关的许多复杂性。虽然Spark天生适用于批处理数据集(以至于即使流处理基本上也是微批处理),但通常是默认选择。
然而,如果您的数据处理主要涉及小的聚合,其中大部分处理可以容纳在一行的内存空间中,那么值得探索流处理。此外,如果您需要与之联接的“边”数据集相对较小,流处理可能是一个可行的选择。批处理和流处理之间的决定应考虑用户对数据输出的期望以及数据到达时间的重要性。
本文简要总结了优化Spark应用的不同方式,减少成本和处理时间。虽然这并非是所有可能的问题和缓解措施的详尽清单,但它为您提供了一个很好的起点来寻找问题所在。在开始优化之前,我想给出最后的建议,那就是不要凭直觉,而是对您的应用程序进行基准测试,并查看统计数据和指标,以了解实际问题所在。
推荐阅读: 36.drop、delete和truncate的区别?
本文链接: 降低大数据成本:使用Apache Spark进行高效数据处理