Postgresql 使用 wal 日志保存每一次的数据修改,这样保证了数据库即使意外宕机,也能利用它准确的恢复数据。wal 日志也叫做 xlog,在 9.4 版本之后作了重大更新,本篇只讲解最新版的格式。wal 日志被用于多个方面,比如修改数据,修改索引等,每种用途的格式都不相同,但是构建方式是相同的。

WAL日志文件

WAL段文件

WAL日志文件存放在sd_wal目录下,每个文件大小默认为16M:

1
2
3
4
5
6
7
-rw-------  1 zhangze  zhangze  16777216 Oct  8 10:57 0000000100000000000000B6
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000B7
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000B8
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000B9
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000BA
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000BB
drwx------ 2 zhangze zhangze 68 Oct 8 10:53 archive_status

文件名由16进制的24个字符组成,每8个字符为一组,每组意义如下:

1
2
00000001 00000000 000000B6
时间线 LogID LogSeg
  • 时间线:时间线ID,取值范围为 0x00000000 -> 0xFFFFFFFF。数据库建好后的第一个WAL日志文件的时间线ID从1开始

  • LogID:逻辑文件ID,取值范围为 0x00000000 -> 0xFFFFFFFF

  • LogSeg:物理文件ID,取值范围为 0x00000000 -> 0x000000FF。数据库建好后的第一个WAL日志文件的LogSeg从1开始,达到最大值(0xFF)后从0开始。

LSN即日志序列号,表示XLog记录在事务日志文件中的偏移,为uint64值。LSN由三部分组成,分别是逻辑文件ID,物理文件ID和文件内偏移量。LSN打印出来是两个8位的十六进制数,如16/B374D848。由专门的类型pg_lsn来存放LSN数据

PG WAL文件名字的命名方法是在XLogFileName宏里定义的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define XLogSegmentsPerXLogId(wal_segsz_bytes)  \
(UINT64CONST(0x100000000) / (wal_segsz_bytes))

#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes) \
snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, \
(uint32) ((logSegNo) / XLogSegmentsPerXLogId(wal_segsz_bytes)), \
(uint32) ((logSegNo) % XLogSegmentsPerXLogId(wal_segsz_bytes)))

#define XLogFileNameById(fname, tli, log, seg) \
snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg)

#define IsXLogFileName(fname) \
(strlen(fname) == XLOG_FNAME_LEN && \
strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN)

WAL文件内部结构

每个WAL段文件由多个8kb大小的page组成,每个Page中存放着PageHeader信息,以及多条WAL Record

Page结构

每个page的组织方式如下图:

WAL的page结构

  • PageHeader:在wal page的组成中有两种pageheader结构,XLogPageHeaderDataXLogLongPageHeaderData。每个WAL段的第一个Page的Header应为LongHeader

  • Remain data:存储着上一个page中最后一个Record没有存完的数据,大小为xlp_rem_len,对应page的不完整Record

  • Record:存储具体的WAL Record

  • 无数据区域:一个WAL Record的头部信息不允许跨页,如果剩余空间不够存储头部信息,则舍弃这部分空间

Record结构

每个WAL Record的结构如下图,绿色部分为数据描述结构,黄色部分是实际保存的数据

WAL_Record

  • XLogRecord:一个WAL记录的入口,解析WAL时,从这个结构体入手

  • Block:第一个虚线框称为一个BLOCK,用以描述Buffer相关的数据结构。通过XLogRegisterBuffer()函数注册到wal记录中

    • XLogRecordBlockHeader:一个BLOCK的头部信息
    • XLogRecordBlockImageHeader:如果该WAL是fpw记录,该结构存放fpw相关信息
    • XLogRecordBlockCompressHeader:记录hole的大小
      • hole:数据文件的page中,可能会有一块空白区域,即pointer和tuple之间的区域,称为hole
    • RelFilenode:此结构记录了此block所属的关系
    • BlockNumber:此block记录的page的块号
  • XLogRecordDataHeader(Long/short):当main data的大小大于255时,使用Long Header

  • buffer data:第二个虚线框部分,包括page data和tuple data

    • page data:由XLogRegisterBuffer()函数注册到wal记录,存放buffer page信息
    • tuple data:由XLogRegisterBufData()函数注册到wal记录,存储了实际的buff数据和变更数据。
  • main data:保存非buffer性的数据,由XLogRegisterData()函数到WAL记录,例如特殊结构体,旧元组或key

WAL日志写入实现

当数据库数据发生变更时:

  • change发生时:先要将变更后内容计入wal buffer中,再将变更后的数据写入data buffer;

  • commit发生时:wal buffer中数据刷新到磁盘;

  • checkpoint发生时:将所有data buffer刷新的磁盘。

WAL日志机制就是先将变更内容存放到wal buffer,commit后将wal buffer刷入磁盘的过程。过程中主要的函数如下:

1
2
3
4
5
6
7
8
9
XLogBeginInsert();        // 表示开始构建 xlog
XLogRegisterData(); // 将WAL记录的特殊结构体数据注册到WAL,比如heap_insert中的xl_heap_insert结构体
XLogRegisterBuffer(); // 将涉及到的buf注册到wal记录,比如heap_insert中page页赋予regbuf->page
XLogRegisterBufData(); // 将元组内容注册到WAL记录,比如insert语句的元组数据等
XLogSetRecordFlags();
XLogInsert();
XLogRecordAssemble();
XLogInsertRecord(); // 根据当前的数据库状态,把上述函数注册的数据进行筛选组装,最终形成完整的wal记录并写入到walbuff
PageSetLSN

WAL实现过程

整页写入(Full_Write_Page)

如果数据库系统在写入脏页的过程中出现故障,会导致磁盘上的页面数据损坏,而XLOG是无法在损坏的页面上重放的,需要整页写入来恢复。

如果启用整页写入,PostgreSQL会在每个检查点后,每个页面第一次变更发生前,将整个页面以及Header信息作为一条XLog写入,这个功能默认开启。在数据库恢复过程中,如果检查到一条XLog是一个用来整页写入的备份区块,会使用另一条重放规则:XLog会直接覆盖当前页面,无视页面和XLog记录中的LSN,然后将页面的LSN更新为XLog记录的LSN

具体数据结构

XLog Page

XLogPageHeaderData

XLog日志分为很多逻辑段文件,每个段文件分成许多个页面,每个页面的大小为一个块的大小。每个日志页面都有一个头部信息:

1
2
3
4
5
6
7
8
9
10
11
typedef struct XLogPageHeaderData
{
uint16 xlp_magic; // 校验位,检验WAL的版本信息
uint16 xlp_info; // 标记位
TimeLineID xlp_tli; // 页面第一条记录的时间序列
XLogRecPtr xlp_pageaddr; // XLog页面的地址

// 当前页面没有足够空间用于记录时,继续在下一页记录
// 记录了前一页的剩余字节数,包括备份块数据,即该记录在本页继续存储占用的空间大小
uint32 xlp_rem_len;
} XLogPageHeaderData;
  • 其中,标记位xlp_info只使用最低两位,0表明该页的第一个XLog记录接着上一页的最后一个XLog记录,1表示该页是该XLog文件的首页

XLogLongPageHeaderData

如果页面是该日志文件的首页,那么在原头部信息的基础上会使用一个长的头部信息

1
2
3
4
5
6
7
typedef struct XLogLongPageHeaderData
{
XLogPageHeaderData std; // 标准的头部信息,即 XLogPageHeaderData
uint64 xlp_sysid; // pg_control中的系统标识符
uint32 xlp_seg_size; // 校验位,段的大小
uint32 xlp_XLog_blcksz; // 校验位,块的大小
} XLogLongPageHeaderData;
  • 对于长的XLog日志记录,允许将没有足够空间存储的数据存储到下一个页面,但不允许Record头部信息被分开存储到两个不同页面。如果剩余空间已经不足以存储一个头部信息,那么剩余空间将被舍弃,将这个XLog记录存储到新的下一个页面中

XLog Record

XLogRecord

结构XLogRecord记录了XLog的相关控制信息,一个XLog记录最多可以附3个备份块, 每个块对应一个磁盘大小的数据,长度为8kb

1
2
3
4
5
6
7
8
9
10
typedef struct XLogRecord
{
uint32 xl_tot_len; // 整条记录的长度
TransactionId xl_xid; // 事务ID
XLogRecPtr xl_prev; // 指向日志中的前一个记录
uint8 xl_info; // 信息标记位
RmgrId xl_rmid; // 资源管理器

pg_crc32c xl_crc; // 本记录的CRC校验码
} XLogRecord;
  • 其中,资源管理器号主要用于日志系统中,数据库系统需要将记录的日志数据分类,为它们分配对应的资源管理器号。读取日志记录时,结合资源管理器号和信息标志位,能够直到数据库对源数据做的是哪种操作,从而迅速正确的调用对应的函数。共有16种资源

  • 信息标志位的高4位由资源管理器使用,标识该日志是哪种类型的日志。低4位表示对应的块是否需要备份

XLogRecordBlockHeader

存放block的相关信息

1
2
3
4
5
6
7
8
9
typedef struct XLogRecordBlockHeader
{
uint8 id; // 块引用ID
uint8 fork_flags; // 在关系中使用的fork和flags
uint16 data_length; // payload字节大小

//如果设置了 BKPBLOCK_HAS_IMAGE,后续为XLogRecordBlockImageHeader结构体,否则为RelFileNode
//之后为BlockNumber
} XLogRecordBlockHeader;
XLogRecordBlockImageHeader

存放整页写入的相关信息

1
2
3
4
5
6
typedef struct XLogRecordBlockImageHeader
{
uint16 length; // page大小
uint16 hole_offset; // hole之前的长度
uint8 bimg_info; // 标记位,是否压缩
} XLogRecordBlockImageHeader;
XLogRecordBlockCompressHeader

存放page中的hole大小

1
2
3
4
typedef struct XLogRecordBlockCompressHeader
{
uint16 hole_length; // hole大小
} XLogRecordBlockCompressHeader;

XLog Data

XLogRecordDataHeader

WAL Record的数据部分的header信息

1
2
3
4
5
6
7
8
9
10
typedef struct XLogRecordDataHeaderShort
{
uint8 id;
uint8 data_length;
}XLogRecordDataHeaderShort;

typedef struct XLogRecordDataHeaderLong
{
uint8 id;
}XLogRecordDataHeaderLong;
XLogRecData

XLog日志记录中的数据信息存储在结构XLogRecData

1
2
3
4
5
6
typedef struct XLogRecData
{
struct XLogRecData *next; // 下一个节点
char *data; // 数据
uint32 len; // 数据长度
} XLogRecData;

XLog控制结构

XLogCtlData

在共享内存中用结构XLogCtlData保存XLog信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
typedef struct XLogCtlData
{
XLogCtlInsert Insert; // 插入一条日志后,最新的相关信息

// 以下受info_lck保护
XLogwrtRqst LogwrtRqst; // 将日志写入和同步的位置
XLogRecPtr RedoRecPtr; // 最近的 Insert->RedoRecPtr 副本
FullTransactionId ckptFullXid; // 最新检查点的nextXID
XLogRecPtr asyncXactLSN; // 最新异步提交/中断的LSN
XLogRecPtr replicationSlotMinLSN; // 所有缓冲区所需的最老的LSN

XLogSegNo lastRemovedSegNo; // 最后的删除/回收的XLog段

// 用于不需要记录日志的关系的假的LSN
XLogRecPtr unloggedLSN;
slock_t ulsn_lck;

// 切换后最新的XLog的时间和LSN,受XLogWriteLock保护
pg_time_t lastSegSwitchTime;
XLogRecPtr lastSegSwitchLSN;

// 已经写入和同步的位置,受info_lck或XLogWriteLock保护
XLogwrtResult LogwrtResult;

// 缓存中的最后初始化页面,最后一个字节位置+1
XLogRecPtr InitializedUpTo;

// 这些值在启动后不会修改,尽管指向的页面和xlblocks通常会改变
// xlblocks受 XLogBufMappingLock 保护
char *pages; // 未写入XLog页面的缓冲区
XLogRecPtr *xlblocks; // 缓冲区内容对应的XLog文件的内部指针
int XLogCacheBlck; // XLog最大缓冲区的下标

// ThisTimeLineID 的共享副本,在完成恢复后不要修改
TimeLineID ThisTimeLineID;
TimeLineID PrevTimeLineID;

// 标记我们是否处于崩溃或恢复状态,受info_lock保护
RecoveryState SharedRecoveryState;
bool SharedHotStandbyActive;

// 指示XLog写入是否处于节能模式,受info_lock保护
bool XLogWriterSleeping;

// 等待唤醒,如果出现出发文件,则唤醒启动进程以继续执行XLog重放
Latch recoveryWakeupLatch;

// 在恢复期间保留最后检查点记录的副本,受info_lck保护
XLogRecPtr lastCheckPointRecPtr;
XLogRecPtr lastCheckPointEndPtr;
CheckPoint lastCheckPoint;

XLogRecPtr lastReplayedEndRecPtr; // 指向成功重放的最后一条记录的结尾+1
TimeLineID lastReplayedTLI;
XLogRecPtr replayEndRecPtr; // 如果正处于redo函数回放记录期间,则指向正在恢复记录的结尾+1
// 否则等于lastReplayedEndRecPtr
TimeLineID replayEndTLI;
TimestampTz recoveryLastXTime; // 重放(正在重放)的最后一个COMMIT/ABORT记录的时间戳

// 开始重放当前XLog数据块的时间戳
TimestampTz currentChunkStartTime;

bool recoveryPause; // 是否暂停恢复

// 指向最后重放的XLog_FPW_CHANGE记录的起始点
// 用于禁用full_page_writes
XLogRecPtr lastFpwDisableRecPtr;

slock_t info_lck; // 共享锁
} XLogCtlData;
Register_buffer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct
{
bool in_use; // is this slot in use?
uint8 flags; // REGBUF_* flags
RelFileNode rnode; // 指定所属表的存储目录
ForkNumber forkno; // 哪种文件类型
BlockNumber block; // 块编号
Page page; // 对应的原始数据页

uint32 rdata_len; // 私有数据链表的长度总和
XLogRecData *rdata_head; // 私有数据链表头部节点
XLogRecData *rdata_tail; // 私有数据链表尾部节点

XLogRecData bkp_rdatas[2]; // 存储着压缩后或忽略空闲数据的数据,如果有空闲位置且没有压缩,那么数据会被分成两个部分,存储在两个数组元素里

char compressed_page[PGLZ_MAX_BLCKSZ]; // 如果开启了压缩,那么存储着压缩后的数据
} registered_buffer;

重要全局变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;

/*使用XLogRegisterBuffer注册的数据存储到registered_buffers数组里*/
static registered_buffer *registered_buffers;

/*
* 使用XLogRegisterBufData注册的数据存储到rdatas数组里,并链接为链表,使用registered_buffer结构里的rdata_head和
* rdata_tail作为链表的首尾。
*
* 使用XLogRegisterData注册的数据存储到rdatas数组里,并使用mainrdata_head和mainrdata_lastata注册的数据存储到rdatas数组里,
* 并链接为链表,使用registered_buffer结构里的rdata_head和rdata_tail作为链表的首尾。
*/
static XLogRecData *rdatas;

具体函数代码

XLogBeginInsert

函数主要作用是检验调用环境是否正确,判断当前是否可以执行xlog插入,并设置开始构造WAL记录的标记,标志wal插入开始。

1
2
3
4
5
6
7
8
9
10
11
Assert(max_registered_block_id == 0);
Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
Assert(mainrdata_len == 0);

if (!XLogInsertAllowed())
elog(ERROR, "cannot make new WAL entries during recovery");

if (begininsert_called)
elog(ERROR, "XLogBeginInsert was already called");

begininsert_called = true;

XLogRegisterData

  1. 将本条wal记录的特殊结构体数据注册到wal记录,比如XLOG_HEAP_INSERT子类型的xl_heap_insert结构体。

  2. 将一些旧元组数据注册到wal记录,比如执行update语句的旧元组数据、delete语句的旧元组数据。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 检查是否已经开始构造WAL
Assert(begininsert_called);

// 检查数据量是否超过限制值
if (num_rdatas >= max_rdatas)
elog(ERROR, "too much WAL data");
rdata = &rdatas[num_rdatas++];

rdata->data = data;
rdata->len = len;

// 使用 mainrdata_last 指针跟踪链条的结束点,在这里不需要清除next变量
mainrdata_last->next = rdata;
mainrdata_last = rdata;

mainrdata_len += len;

XLogRegisterBuffer

将涉及到的buff注册到wal记录,比如insert语句的目标buff、update语句的目标buff和源buff

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 找到registered_buffer数组中第一个空的的位置
if (block_id >= max_registered_block_id)
{
if (block_id >= max_registered_buffers)
elog(ERROR, "too many registered buffers");
max_registered_block_id = block_id + 1;
}

// 将这个buffer的数据填充
regbuf = &registered_buffers[block_id];

BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
regbuf->page = BufferGetPage(buffer);
regbuf->flags = flags;
regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
regbuf->rdata_len = 0;

// 将缓冲区标记为已使用
regbuf->in_use = true;

XLogRegisterBufData

函数主要作用是将元组内容注册到WAL记录中。需要参数block id,这个id必须是已经通过XLogRegisterBuffer注册的block

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 读取已经注册的缓冲区结构
regbuf = &registered_buffers[block_id];
if (!regbuf->in_use)
elog(ERROR, "no block with id %d registered with WAL insertion",
block_id);

// 读取buffer数据
if (num_rdatas >= max_rdatas)
elog(ERROR, "too much WAL data");
rdata = &rdatas[num_rdatas++];

rdata->data = data;
rdata->len = len;

regbuf->rdata_tail->next = rdata;
regbuf->rdata_tail = rdata;
regbuf->rdata_len += len;

XLogInsert

插入WAL的操作由函数XLogInsert完成,根据Rdata链表和相应的资源管理器info向WAL日志文件中插入一条WAL记录。事务执行插入,删除,更新,提交,终止或回滚命令时都需要调用此函数

  • 判断调用时是否设置了rmgr标记位:

1
2
3
4
if ((info & ~(XLR_RMGR_INFO_MASK |
XLR_SPECIAL_REL_UPDATE |
XLR_CHECK_CONSISTENCY)) != 0)
elog(PANIC, "invalid xlog info mask %02X", info);
  • 如果处于bootstrap模式,除了XLog资源外,不需要实际记录内容,指向第一个检查点的指针

1
2
3
4
5
6
if (IsBootstrapProcessingMode() && rmid != RM_XLog_ID)
{
XLogResetInsertion();
EndPos = SizeOfXLogLongPHD;
return EndPos;
}
  • 组合成完整的WAL记录并写入WAL日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
do
{
XLogRecPtr RedoRecPtr;
bool doPageWrites;
XLogRecPtr fpw_lsn;
XLogRecData *rdt;

GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

// 调用函数组装注册的数据
rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
&fpw_lsn);

// 将组装好的数据写入到WAL内存中
EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
} while (EndPos == InvalidXLogRecPtr);

XLogRecordAssemble

函数用于将已注册的数据和缓冲区页面数据组装成一条WAL记录,将其写入到XLogRecData链表中。

执行到这个阶段,wal记录的数据存储在:

  1. mainrdata_head

  2. 每一个注册的buff的rdata_head

  3. 每一个注册的buff的page字段中

函数执行过程如下:

  • 保存头部信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
XLogRecData *rdt;		// XLogRecData链表指针
uint32 total_len = 0;
int block_id;
pg_crc32c rdata_crc;
registered_buffer *prev_regbuf = NULL;
XLogRecData *rdt_datas_last;
XLogRecord *rechdr;
char *scratch = hdr_scratch;

rechdr = (XLogRecord *) scratch;
scratch += SizeOfXLogRecord;

hdr_rdt.next = NULL;
rdt_datas_last = &hdr_rdt;
hdr_rdt.data = hdr_scratch;
  • 构造保存所有块公用的数据部分的rdata链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
*fpw_lsn = InvalidXLogRecPtr;
for (block_id = 0; block_id < max_registered_block_id; block_id++)
{
registered_buffer *regbuf = &registered_buffers[block_id];
XLogRecordBlockHeader bkpb;
XLogRecordBlockImageHeader bimg;
XLogRecordBlockCompressHeader cbimg = {0};
bool samerel;
bool is_compressed = false;
bool include_image;

// 设置block头部信息
bkpb.id = block_id;
bkpb.fork_flags = regbuf->forkno;
bkpb.data_length = 0;
// 设置标记位
if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

// 执行full_page_write,写入XLogRecordBlockImageHeader
if (include_image)
{
Page page = regbuf->page; // 获取对应的page
uint16 compressed_len = 0; // 压缩后的大小

// page需要备份,计算空闲空间大小和偏移
// "hole"指的是page中的空白区域
if (regbuf->flags & REGBUF_STANDARD)
{
// 如果页面遵循标准布局,lower和upper指针将被跳过
uint16 lower = ((PageHeader) page)->pd_lower; // 获取lower
uint16 upper = ((PageHeader) page)->pd_upper; // 获取upper

// 判断page中是否存在"hole"
if (lower >= SizeOfPageHeaderData &&
upper > lower &&
upper <= BLCKSZ)
{
// 计算“hole”的长度的偏移量
bimg.hole_offset = lower;
cbimg.hole_length = upper - lower;
}
else
{
// 没有可以移除的"hole"
bimg.hole_offset = 0;
cbimg.hole_length = 0;
}
}
else
{
// 不是标准的页面布局,无法尝试估算"hole"
bimg.hole_offset = 0;
cbimg.hole_length = 0;
}

// 启用WAL压缩
if (wal_compression)
{
is_compressed =
XLogCompressBackupBlock(page, bimg.hole_offset,
cbimg.hole_length,
regbuf->compressed_page,
&compressed_len); // 调用XLogCompressBackupBlock压缩
}

// 填充XLogRecordBlockHeader结构体的剩余字段
bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

// 为page内容构造XLogRecData入口
rdt_datas_last->next = &regbuf->bkp_rdatas[0];
rdt_datas_last = rdt_datas_last->next;
// 设置标记
bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

// 在redo期间,在设置了BKPIMAGE_APPLY标记的情况下full-page才会回放.
if (needs_backup)
bimg.bimg_info |= BKPIMAGE_APPLY;

// 需要压缩
if (is_compressed)
{
bimg.length = compressed_len; // 压缩后的空间
bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED; // 压缩标记

rdt_datas_last->data = regbuf->compressed_page; // 放在registered_buffer中
rdt_datas_last->len = compressed_len;
}
else
{
// 没有压缩,计算image的大小
bimg.length = BLCKSZ - cbimg.hole_length;

// 如果没有"hole"存在
if (cbimg.hole_length == 0)
{
rdt_datas_last->data = page; // 数据指针直接指向page
rdt_datas_last->len = BLCKSZ; // 大小为block size
}
else
{
// 跳过hole
rdt_datas_last->data = page; // 数据指针
rdt_datas_last->len = bimg.hole_offset;// 获取hole的偏移

rdt_datas_last->next = &regbuf->bkp_rdatas[1];// 第2部分
rdt_datas_last = rdt_datas_last->next;

rdt_datas_last->data =
page + (bimg.hole_offset + cbimg.hole_length);// 指针指向第二部分
rdt_datas_last->len =
BLCKSZ - (bimg.hole_offset + cbimg.hole_length);// 设置长度
}
}

total_len += bimg.length; // 调整总长度
}

// 如果需要包含数据
if (needs_data)
{
// 把该缓冲区链接到调用者提供的rdata链中,构成一个整体的链表
bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
bkpb.data_length = regbuf->rdata_len;
total_len += regbuf->rdata_len;

rdt_datas_last->next = regbuf->rdata_head;
rdt_datas_last = regbuf->rdata_tail;
}

// 如果存在上一个注册的buf,而且RefFileNode相同
if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
{
samerel = true;
bkpb.fork_flags |= BKPBLOCK_SAME_REL;
}
else
samerel = false;
// 切换为当前的注册的buf
prev_regbuf = regbuf;

// 拷贝头部信息到scratch缓冲区中
memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
scratch += SizeOfXLogRecordBlockHeader;
if (include_image)
{
memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
scratch += SizeOfXLogRecordBlockImageHeader;
// 压缩存储,追加SizeOfXLogRecordBlockCompressHeader
if (cbimg.hole_length != 0 && is_compressed)
{
memcpy(scratch, &cbimg,
SizeOfXLogRecordBlockCompressHeader);
scratch += SizeOfXLogRecordBlockCompressHeader;
}
}
// 不是同一个REL,追加RelFileNode
if (!samerel)
{
memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
scratch += sizeof(RelFileNode);
}
// 追加BlockNumber
memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
scratch += sizeof(BlockNumber);
}
  • 接下来组装XLog Record origin标记

1
2
3
4
5
6
7
if ((curinsert_flags & XLog_INCLUDE_ORIGIN) &&
replorigin_session_origin != InvalidRepOriginId)
{
*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
scratch += sizeof(replorigin_session_origin);
}
  • 接下来组装数据(main data)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if (mainrdata_len > 0)
{
// 长度超过255,则使用Long格式
if (mainrdata_len > 255)
{
*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
memcpy(scratch, &mainrdata_len, sizeof(uint32));
scratch += sizeof(uint32);
}
else
{
*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
*(scratch++) = (uint8) mainrdata_len;
}
rdt_datas_last->next = mainrdata_head;
rdt_datas_last = mainrdata_last;
total_len += mainrdata_len;
}
rdt_datas_last->next = NULL;

hdr_rdt.len = (scratch - hdr_scratch); // 头部大小
total_len += hdr_rdt.len; // 总长度
  • 计算数据的CRC

1
2
3
4
INIT_CRC32C(rdata_crc);
COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
  • 最后填充记录头部信息的其他域字段

1
2
3
4
5
6
7
8
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = total_len;
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
rechdr->xl_prev = InvalidXLogRecPtr;
rechdr->xl_crc = rdata_crc;

return &hdr_rdt;

XLogInsertRecord

XLogRecordAssemble组装好的记录插入到WAL内存中。过程分两步:

  • 在内存中为WAL记录保留足够的空间

1
2
3
4
5
6
7
8
if (isLogSwitch)
inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
else
{
ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
&rechdr->xl_prev);
inserted = true;
}
  • 将记录复制到WAL内存中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (inserted)
{
// 计算头部的CRC
rdata_crc = rechdr->xl_crc;
COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(rdata_crc);
rechdr->xl_crc = rdata_crc;

// 所有的数据,包括header信息,均可以进行插入
CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
StartPos, EndPos);

// 更新最后一条重要记录的LSN,当持有锁时,只更新第一个
if ((flags & XLog_MARK_UNIMPORTANT) == 0)
{
int lockno = holdingAllLocks ? 0 : MyLockNo;

WALInsertLocks[lockno].l.lastImportantAt = StartPos;
}
}

PageSetLSN

更新被修改的Page LSN