滑动窗口的最大值问题
给出一个序列,要求找出滑动窗口中的最大值,比如:
# 序列: 2, 6, 1, 5, 3, 9, 7, 4
# 窗口大小: 4
[2, 6, 1, 5], 3, 9, 7, 4 => 6
2, [6, 1, 5, 3], 9, 7, 4 => 6
2, 6, [1, 5, 3, 9], 7, 4 => 9
2, 6, 1, [5, 3, 9, 7], 4 => 9
2, 6, 1, 5, [3, 9, 7, 4] => 9
# 期望结果: [6, 6, 9, 9, 9]
并要求算法的时间复杂度为 O(n)
。
稍加观察便能发现滑动窗口其实就是一个队列:窗口每滑动一次,相当于出列一个元素,并入列一个元素。因此这个问题实际上也可以看作是要求设计一个 pop(), push(), max()
均为 O(1) 的队列。
pop()
和 push()
做到 O(1)
很简单,max()
就没那么容易了。随着元素的进队,我们可以记录元素之间的大小关系,维护一个最大值记录,但当队首元素弹出时,已有的大小关系就会被破坏——被弹出的元素可能就是最大值,这样就需要重新开始评估新的最大值。但若我们只从队末弹出呢?这样便不会破坏已记录的剩余元素的最大值。这种只在一端进出的数据结构就是栈。只要在进栈的同时维护一个最大值栈,我们就可以轻松得到一个 pop(), push(), max()
均为 O(1)
的栈。比如令 2, 7, 4
依次进栈,并同时维护一个当前时刻的最大值栈 2, 7, 7
,弹出一个元素的时候也同时弹出最大值栈中的元素,这样我们就可以在 O(1)
的时间内找到一个栈的 max
。
我们知道,使用两个栈可以构造一个队列,即一个栈用于 push
,一个栈用于 pop
,因此我们可以使用两个 max()
为 O(1)
操作的栈来构造一个 max()
为 O(1)
的队列。
这是因为一个滑动窗口中的元素要么全在一个栈中,此种情况下只需 O(1)
的时间便可得到该滑动窗口的最大值;要么一部分在一个栈中,一部分在另一个栈中,而从两个栈中找到各自的最大值只需要 O(1)
的时间,再比较两个部分各自的最大值,便可以得到该滑动窗口的最大值,因此此种情况下也只需要 O(1)
的操作就可以得到该滑动窗口的最大值。
以序列 2, 6, 1, 5, 3, 9, 7, 4
为例,设其滑动窗口的大小为 4,记用于出列的栈为 stack_out
,用于入列的栈为 stack_in
。首先得到第一个滑动窗口,即入列 4 个元素:
stack_out: stack_in:
(5, 6) <- Top
None (1, 6)
(6, 6)
(2, 2) <- Bottom
使用 (value, max)
表示当前要入栈的元素 value
以及当前的最大值 max
。此时只需要读出 stack_in
栈顶元素的最大值即为当前滑动窗口的最大值。
向右滑动一格即表示将 2
出列,将 3
入列:
stack_out: stack_in:
<- Top
(6, 6)
(1, 5)
(5, 5) (3, 3) <- Bottom
此时便得到了第二个滑动窗口。它的元素被分置在两个栈中:有 3 个元素在 stack_out
中、 1
个元素在 stack_in
中。而我们可以用 O(1)
的时间从 stack_out
中找到 3 个元素这个部分中的最大值,同时用 O(1)
的时间从 stack_in
中找到另一部分的最大值。因此将 stack_out
与 stack_in
栈顶的最大值相比较即可得到第二个滑动窗口的最大值。也就是说当一个滑动窗口的元素被分散在两个栈中时,我们需要 O(1) + O(1) + O(1) = O(1)
的时间找到该滑动窗口的最大值。三个 O(1)
依次为:从 stack_out
找到第一部分最大值的时间、从 stack_in
中找到另一部分最大值的时间、比较两个最大值得到最终的最大值的时间。
依次处理下去,便可得到我们想要的结果。
P.S.
如果在实现上有疑惑,不妨看看下面给出的这种队列类型的 Python 代码。在该代码中,入队操作被命名为 append
,而不是 push
,其目的是与 Python 标准库中队列的方法名保持一致。
from typing import List
from collections import namedtuple
Node = namedtuple('Node', ['value', 'max'])
class MaxQueue():
def __init__(self, stack_len: int) -> None:
self.stack_in = []
self.stack_out = []
self.stack_len = stack_len
def pop(self) -> int:
if not self.stack_out:
if not self.stack_in:
raise IndexError('pop from an empty queue')
else:
self._move_in_to_out()
return self.stack_out.pop().value
def append(self, value: int) -> None:
if len(self.stack_in) >= self.stack_len:
if self.stack_out:
raise IndexError('the queue is full')
else:
self._move_in_to_out()
self._push_to_stack(self.stack_in, value)
def max(self) -> int:
if self.stack_in and self.stack_out:
return max(self.stack_in[-1].max, self.stack_out[-1].max)
if self.stack_in:
return self.stack_in[-1].max
if self.stack_out:
return self.stack_out[-1].max
def _move_in_to_out(self) -> None:
while self.stack_in:
self._push_to_stack(self.stack_out,
self.stack_in.pop().value)
def _push_to_stack(self, stack: List[Node], value: int) -> None:
if stack:
stack.append(Node(value, max=max(value, stack[-1].max)))
else:
stack.append(Node(value, max=value))