Post

async log实现策略

1 muduo

同步日志的话,在一些文件写入很慢的设备上,可能导致出现一些性能问题,你也可能常见到这种同步日志突然出现日志丢了10s多的情况。所以现代的日志库一般都支持async logging的方式

可能业界很多log都用的spdlog,但是muduo的实现确实非常的简单,而且也是支持异步的,基本可以看作是一个工程化能用的最简单logging。这玩意只支持写文件,不像spd那样有很多的sink,但是大多数情况下,我们只需要写文件就够了。

现存的日志库,看到的都是把日志分成两个部分的

  1. 日志前端,提供程序使用的接口,负责生成 + fmt日志消息
  2. 日志后端,负责将日志消息写入到目的地

单线程的环境下,前后端其实无所谓的,就是一个task stream,但是在多线程环境下,因为线程间的隔离,难点其实是如何把日志数据高效的传输到后端。简单的讲就是一个非常经典的生产者消费者模型。

所以,分成两部分看的话,

  1. 日志的前端,要做到低延迟 + 不阻塞 + cpu开销低
  2. 日志的后端,要做到吞吐量足够大,并且不要占用过多的资源

总体来说,日志前端有两种风格API

  1. printf
  2. stream

stream风格比较易用,而且最大的优点是,在输出级别高于日志级别的时候,可以是一个空操作,不会有任何的开销,但是printf比较难做到这一点

总体来讲,muduo的前后端的核心思路是double buffer,前端是BufferVecA, 后端是BufferVecB,前端写满了,swap A和B,让后端将原来BufferA的数据写到文件里去

这样来说的话,锁的区域只用保护swap的过程就可以了,同时,日志消息也能做一个Buffer填满之后的批处理,较少了唤醒线程的次数

1.1 muduo异步日志策略前端

muduo的前端设计了两个buffer,currentBuffer_ + nextBuffer_

并且,有一个存放BufferPtr的vector缓存, buffers_

1
2
3
4
5
6
typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
typedef BufferVector::value_type BufferPtr;
BufferPtr currentBuffer_ GUARDED_BY(mutex_);
BufferPtr nextBuffer_ GUARDED_BY(mutex_);
BufferVector buffers_ GUARDED_BY(mutex_);

在往前端写日志的时候,会先往currentBuffer_写,如果currentBuffer_写满了,就会把currentBuffer_放到buffers_里面,然后把nextBuffer_的所有权move给currentBuffer_,然后再往currentBuffer_写

currentBuffer_写满了将其放入buffers_,通知后端线程读。

此时考虑一种特殊情况,前端写的太快了,nextBuffer_在后端线程来不及生成,此时就需要currentBuffer_新开辟内存。注释里当然也看到这个是很偶先的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void AsyncLogging::append(const char* logline, int len)
{
  muduo::MutexLockGuard lock(mutex_);
  if (currentBuffer_->avail() > len)
  {
    currentBuffer_->append(logline, len);
  }
  else
  {
    buffers_.push_back(std::move(currentBuffer_));

    if (nextBuffer_)
    {
      currentBuffer_ = std::move(nextBuffer_);
    }
    else
    {
      currentBuffer_.reset(new Buffer); // Rarely happens
    }
    currentBuffer_->append(logline, len);
    cond_.notify();
  }
}

1.2 muduo异步日志策略后端

后端也有2个BufferPtr,分别为newBuffer1和newBuffer2,还有一个写的BufferPtr的vector缓存, buffersToWrite, 这玩意是单独开在一个线程里面的

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void AsyncLogging::threadFunc()
{
  assert(running_ == true);
  latch_.countDown();
  LogFile output(basename_, rollSize_, false);
  BufferPtr newBuffer1(new Buffer);
  BufferPtr newBuffer2(new Buffer);
  newBuffer1->bzero();
  newBuffer2->bzero();
  BufferVector buffersToWrite;
  buffersToWrite.reserve(16);
  while (running_)
  {
    assert(newBuffer1 && newBuffer1->length() == 0);
    assert(newBuffer2 && newBuffer2->length() == 0);
    assert(buffersToWrite.empty());

    {
      muduo::MutexLockGuard lock(mutex_);
      if (buffers_.empty())  // unusual usage!
      {
        cond_.waitForSeconds(flushInterval_);
      }
      buffers_.push_back(std::move(currentBuffer_));
      currentBuffer_ = std::move(newBuffer1);
      buffersToWrite.swap(buffers_);
      if (!nextBuffer_)
      {
        nextBuffer_ = std::move(newBuffer2);
      }
    }

    assert(!buffersToWrite.empty());

    // buffersToWrite size > 25, drop

    // write to file

    // drop non-bzero-ed buffers, remain 2 buffers

    if (!newBuffer1)
    {
      assert(!buffersToWrite.empty());
      newBuffer1 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer1->reset();
    }

    if (!newBuffer2)
    {
      assert(!buffersToWrite.empty());
      newBuffer2 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer2->reset();
    }
    // clear buffersToWrite
    // flush
  }
  // flush
}

可以看到这里的临界区其实不大,主要是在交换buffers_和buffersToWrite的时候,在临界区之外,可以安全的访问buffersToWrite。

触发等待条件有两个

  1. 如果buffers_为空,那么就等待flushInterval_时间,然后再继续,这里的flushInterval_是一个很重要的参数,可以控制日志的flush频率,如果flushInterval_设置的太小,那么会导致日志的flush频率过高,影响性能,如果设置的太大,那么会导致日志的flush频率过低,影响日志的实时性。
  2. 其二是前端写满了一个或多个buffer。

临界区里最后干的一件事情是用newBuffer2替换nextBuffer,这样前端始终有一个预备buffer可供调配。nextBuffer可以减少前端临界区分配内存的概率,缩短前端临界区长度。

会buffersToWrite内的buffer重新填充newBuffer1和newBuffer2,这样下一次执行的时候还有两个空闲buffer可用于替换前端的当前缓冲和预备缓冲。

最后,这四个缓冲在程序启动的时候会全部填充为0,这样可以避免程序热身时page fault引发性能不稳定。

1.3 前端过度刷日志怎么办

可以看到threadFunc里有一个逻辑是buffersToWrite size > 25, drop,这个是为了防止前端过度刷日志导致oom,导致后端来不及处理,这里的25是一个经验值,可以根据实际情况调整。

当然,这玩意其实是4缓的方案,cur + next + new1 + new2,辗转腾挪的其实都是这几片内存,他们之间其实都是move的,所以不会有内存的拷贝,只是移动。

如果你需要更大的缓存方案的话,其实可以开一个emptyBuffers的数组,初始化的时候在emptyBuffers里放足够多的空闲buffer,这个值调的够好的话几乎不会遇到在临界区里分配内存的问题,但同样也要注意,emptyBuffers.size() + fullBufer.size()应该小于一个阈值,不然也容易导致性能问题。

This post is licensed under CC BY 4.0 by the author.

Trending Tags