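PostgreSQL uses the write-ahead log (WAL) to record every data modification, so that even after an unexpected crash the database can use it to recover its data accurately. The WAL is also called xlog. Its record format was substantially reworked after version 9.4, and this article covers only the newer format. WAL records serve many purposes, such as changes to table data and changes to indexes. Each kind of record has its own payload layout, but all of them are built the same way.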
WAL log files
WAL segment files
WAL files are stored in the pg_wal directory, and each file is 16 MB by default:
```
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000B6
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000B7
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000B8
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000B9
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000BA
-rw------- 1 zhangze zhangze 16777216 Oct 8 10:57 0000000100000000000000BB
drwx------ 2 zhangze zhangze       68 Oct 8 10:53 archive_status
```
A file name consists of 24 hexadecimal characters in three groups of 8. The groups have the following meaning:
```
00000001 00000000 000000B6
timeline LogID    LogSeg
```
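timeline: the timeline ID, ranging from 0x00000000 to 0xFFFFFFFF. The first WAL file of a newly created database has timeline ID 1.
LogID: the logical file ID, ranging from 0x00000000 to 0xFFFFFFFF.
LogSeg: the physical file ID, ranging from 0x00000000 to 0x000000FF with the default 16 MB segment size. The first WAL file of a newly created database has LogSeg 1; after reaching the maximum (0xFF) it wraps to 0 and LogID is incremented.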
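The three fields can be pulled back out of a file name with a few lines of C. The following is a minimal sketch; the helper name `parse_wal_file_name` is hypothetical and not part of PostgreSQL, and the shape check simply mirrors the `IsXLogFileName` test (24 upper-case hexadecimal digits).

```c
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: split a 24-character WAL segment file name into its
 * three 8-digit hexadecimal fields. Returns 1 on success, 0 otherwise.
 */
static int
parse_wal_file_name(const char *fname, uint32_t *tli,
                    uint32_t *log_id, uint32_t *log_seg)
{
    unsigned int t, l, s;

    /* Same shape check as IsXLogFileName: exactly 24 upper-case hex digits. */
    if (strlen(fname) != 24 || strspn(fname, "0123456789ABCDEF") != 24)
        return 0;

    /* Each %08X consumes at most 8 hex digits. */
    if (sscanf(fname, "%08X%08X%08X", &t, &l, &s) != 3)
        return 0;

    *tli = t;
    *log_id = l;
    *log_seg = s;
    return 1;
}
```

Calling it on `0000000100000000000000B6` from the listing above yields timeline 1, LogID 0 and LogSeg 0xB6, while a name such as `archive_status` is rejected.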
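An LSN (log sequence number) is the offset of an XLog record in the transaction log, stored as a uint64. It is made up of three parts: the logical file ID, the physical file ID, and the offset within the file. An LSN is printed as two hexadecimal numbers of up to 8 digits each, separated by a slash, for example 16/B374D848. The dedicated type pg_lsn stores LSN values.
WAL file names are generated by the XLogFileName macro.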
```c
/* Number of segments that share one LogID (256 for 16 MB segments). */
#define XLogSegmentsPerXLogId(wal_segsz_bytes) \
    (UINT64CONST(0x100000000) / (wal_segsz_bytes))

/* Build the file name from a timeline ID and a segment number. */
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes) \
    snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, \
             (uint32) ((logSegNo) / XLogSegmentsPerXLogId(wal_segsz_bytes)), \
             (uint32) ((logSegNo) % XLogSegmentsPerXLogId(wal_segsz_bytes)))

#define XLogFileNameById(fname, tli, log, seg) \
    snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg)

/* A valid name is XLOG_FNAME_LEN upper-case hexadecimal digits. */
#define IsXLogFileName(fname) \
    (strlen(fname) == XLOG_FNAME_LEN && \
     strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN)
```
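To connect an LSN with the file that holds it, the arithmetic of XLogFileName can be replayed outside the server. The sketch below assumes the default 16 MB segment size; `parse_lsn`, `lsn_to_file_name` and `lsn_offset_in_segment` are illustrative names, not PostgreSQL functions.

```c
#include <stdint.h>
#include <stdio.h>

#define WAL_SEG_SIZE (UINT64_C(16) * 1024 * 1024)   /* assumed default: 16 MB */

/* Parse the textual "hi/lo" form of an LSN, e.g. "16/B374D848". */
static int
parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi, lo;

    if (sscanf(text, "%X/%X", &hi, &lo) != 2)
        return 0;
    *lsn = ((uint64_t) hi << 32) | lo;
    return 1;
}

/* Same arithmetic as XLogFileName; buf must hold at least 25 bytes. */
static void
lsn_to_file_name(char *buf, size_t buflen, uint32_t tli, uint64_t lsn)
{
    uint64_t seg_no = lsn / WAL_SEG_SIZE;
    uint64_t segs_per_id = UINT64_C(0x100000000) / WAL_SEG_SIZE;

    snprintf(buf, buflen, "%08X%08X%08X",
             (unsigned int) tli,
             (unsigned int) (seg_no / segs_per_id),
             (unsigned int) (seg_no % segs_per_id));
}

/* Byte offset of the LSN inside its segment file. */
static uint32_t
lsn_offset_in_segment(uint64_t lsn)
{
    return (uint32_t) (lsn % WAL_SEG_SIZE);
}
```

For the example LSN 16/B374D848 on timeline 1 this gives the file name 0000000100000016000000B3 and the in-file offset 0x74D848: the high 32 bits become LogID, the top byte of the low 32 bits becomes LogSeg, and the remaining 24 bits are the offset.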
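Internal structure of a WAL file
Each WAL segment file consists of many 8 kB pages. Each page holds a page header followed by WAL records.
Page structure
Each page is organized as follows:
PageHeader: a WAL page uses one of two header structures, XLogPageHeaderData or XLogLongPageHeaderData. The first page of every WAL segment must use the long header.
Remain data: the rest of the last record of the previous page that did not fit there, xlp_rem_len bytes long. It is the continuation of that incomplete record.
Record: the WAL records themselves.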
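Unused area: space that holds no record data. Releases before 9.3 did not allow a record header to cross a page boundary and discarded the tail of a page when it was too small to hold one. Newer releases let the header continue on the next page, so only alignment padding (records are MAXALIGNed) and the rest of a segment after an XLOG switch record are left unused.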
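Record structure
Each WAL record has the structure shown in the figure below. The green parts are descriptor structures and the yellow parts are the data actually stored.
XLogRecord: the entry point of a WAL record. Parsing a record starts from this struct.
Block: the first dashed box is called a BLOCK. It holds the structures that describe a buffer, and is registered in the WAL record by XLogRegisterBuffer().
XLogRecordBlockHeader: the header of one BLOCK.
XLogRecordBlockImageHeader: if the record carries a full-page image (fpw), this struct holds the information about that image.
fpw: full_page_writes; see the section on full-page writes.
XLogRecordBlockCompressHeader: records the size of the hole. It is written only when the page image is compressed and the page has a hole.
hole: a page of a data file may contain an empty region between the line pointers and the tuples (from pd_lower to pd_upper); this region is called the hole.
RelFileNode: identifies the relation the block belongs to.
BlockNumber: the block number of the page this block describes.
XLogRecordDataHeader (Long/Short): when the main data is larger than 255 bytes, the long header is used.
buffer data: the second dashed box, consisting of page data and tuple data.
page data: registered in the WAL record by XLogRegisterBuffer(); holds the buffer page.
tuple data: registered in the WAL record by XLogRegisterBufData(); holds the actual buffer data and the changed data.
main data: data not tied to a buffer, registered in the WAL record by XLogRegisterData(), for example rmgr-specific structs, old tuples, or keys.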
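How WAL writes are implemented
When data in the database changes:
On a change: the change is recorded in the WAL buffer and the modified data is written to the data buffer. The WAL record must reach disk before the dirty data page does.
On commit: the contents of the WAL buffer are flushed to disk.
On checkpoint: all dirty data buffers are flushed to disk.
In short, the WAL mechanism stores each change in the WAL buffer first and flushes the WAL buffer to disk at commit. The main functions involved are: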
```c
XLogBeginInsert();
XLogRegisterData();
XLogRegisterBuffer();
XLogRegisterBufData();
XLogSetRecordFlags();
XLogInsert();
    XLogRecordAssemble();
    XLogInsertRecord();
PageSetLSN
```
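Full-page writes (full_page_writes)
If the database system fails while a dirty page is being written, the page on disk can end up corrupted. XLOG records cannot be replayed on a corrupted page, so a full-page image is needed to recover it.
When full-page writes are enabled (the default), PostgreSQL writes the whole page together with its header as part of an XLog record the first time each page is changed after a checkpoint. During recovery, when a record is found to carry such a backup block, a different replay rule applies: the image overwrites the current page regardless of the LSN of the page and of the record, and the page LSN is then set to the LSN of the record.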
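The two rules can be written down as small predicates. This is a simplified sketch, not the server code: the function names and the `RedoAction` enum are hypothetical, `redo_ptr` stands for the redo pointer of the latest checkpoint, and the comparison `page_lsn <= redo_ptr` is the test XLogRecordAssemble applies when no registration flag forces or suppresses the image.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

/*
 * Insert side: a page needs a full image if it has not been modified since
 * the redo pointer of the latest checkpoint.
 */
static bool
needs_full_page_image(bool full_page_writes,
                      XLogRecPtr page_lsn, XLogRecPtr redo_ptr)
{
    return full_page_writes && page_lsn <= redo_ptr;
}

/* Replay side: what to do with one block reference of a record. */
typedef enum
{
    REDO_RESTORE_IMAGE,     /* overwrite the page with the stored image */
    REDO_APPLY_CHANGE,      /* run the normal redo routine on the page */
    REDO_SKIP               /* the page already contains this change */
} RedoAction;

static RedoAction
redo_action(bool record_has_image,
            XLogRecPtr page_lsn, XLogRecPtr record_lsn)
{
    if (record_has_image)
        return REDO_RESTORE_IMAGE;  /* LSNs are ignored in this case */
    if (page_lsn >= record_lsn)
        return REDO_SKIP;
    return REDO_APPLY_CHANGE;
}
```

The LSN comparison in the non-image branch is what makes ordinary replay idempotent; the image branch does not need it because restoring a complete page is safe no matter what state the page on disk is in.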
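Data structures in detail
XLog Page
The XLog is divided into many logical segment files, and each segment file is divided into pages. Each page is one WAL block in size (XLOG_BLCKSZ, 8 kB by default). Every page starts with a header: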
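```c
typedef struct XLogPageHeaderData
{
    uint16      xlp_magic;      /* magic value for correctness checks */
    uint16      xlp_info;       /* flag bits */
    TimeLineID  xlp_tli;        /* timeline ID of the first record on the page */
    XLogRecPtr  xlp_pageaddr;   /* XLOG address of this page */
    uint32      xlp_rem_len;    /* bytes of the previous page's last record
                                 * that continue on this page */
} XLogPageHeaderData;
```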
If the page is the first page of a segment file, a long header that extends the standard one is used:
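```c
typedef struct XLogLongPageHeaderData
{
    XLogPageHeaderData std;         /* standard header fields */
    uint64      xlp_sysid;          /* system identifier */
    uint32      xlp_seg_size;       /* segment size, as a cross-check */
    uint32      xlp_xlog_blcksz;    /* WAL block size, as a cross-check */
} XLogLongPageHeaderData;
```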
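A reader walking through a segment has to know how many bytes of each page belong to the header. The sketch below mirrors the two structs with fixed-width types and picks the header size from the page address. It assumes the default 16 MB segment and 8 kB WAL page, and 8-byte maximum alignment; the type and function names are illustrative only.

```c
#include <stddef.h>
#include <stdint.h>

#define WAL_SEG_SIZE  (UINT64_C(16) * 1024 * 1024)  /* assumed default */
#define WAL_PAGE_SIZE 8192                          /* assumed default */

/* Round up to a multiple of 8, assuming 8-byte maximum alignment. */
#define MAXALIGN8(len) (((size_t) (len) + 7) & ~(size_t) 7)

/* Stand-ins for XLogPageHeaderData and XLogLongPageHeaderData. */
typedef struct
{
    uint16_t xlp_magic;
    uint16_t xlp_info;
    uint32_t xlp_tli;
    uint64_t xlp_pageaddr;
    uint32_t xlp_rem_len;
} ShortPageHeader;

typedef struct
{
    ShortPageHeader std;
    uint64_t xlp_sysid;
    uint32_t xlp_seg_size;
    uint32_t xlp_xlog_blcksz;
} LongPageHeader;

/* Header size of the page that starts at page_addr. */
static size_t
page_header_size(uint64_t page_addr)
{
    if (page_addr % WAL_SEG_SIZE == 0)
        return MAXALIGN8(sizeof(LongPageHeader));   /* first page of a segment */
    return MAXALIGN8(sizeof(ShortPageHeader));
}

/* Bytes of the page that can hold record data. */
static size_t
page_payload_capacity(uint64_t page_addr)
{
    return WAL_PAGE_SIZE - page_header_size(page_addr);
}
```

On common 64-bit platforms this gives 40 bytes for the first page of a segment and 24 bytes for every other page, so an ordinary page carries 8168 bytes of record data.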
XLog Record
XLogRecord
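The XLogRecord struct holds the control information of an XLog record. In the current format a record can reference up to XLR_MAX_BLOCK_ID + 1 (33) blocks, and each full-page image covers one disk block (8 kB by default).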
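```c
typedef struct XLogRecord
{
    uint32      xl_tot_len;     /* total length of the entire record */
    TransactionId xl_xid;       /* transaction ID */
    XLogRecPtr  xl_prev;        /* pointer to the previous record in the log */
    uint8       xl_info;        /* flag bits */
    RmgrId      xl_rmid;        /* resource manager for this record */
    pg_crc32c   xl_crc;         /* CRC of this record */
} XLogRecord;
```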
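Information about a block:
```c
typedef struct XLogRecordBlockHeader
{
    uint8       id;             /* block reference ID */
    uint8       fork_flags;     /* fork within the relation, and flags */
    uint16      data_length;    /* payload bytes, not counting the page image */
} XLogRecordBlockHeader;
```
Information about a full-page image:
```c
typedef struct XLogRecordBlockImageHeader
{
    uint16      length;         /* number of page image bytes */
    uint16      hole_offset;    /* number of bytes before the hole */
    uint8       bimg_info;      /* flag bits */
} XLogRecordBlockImageHeader;
```
Size of the hole in the page:
```c
typedef struct XLogRecordBlockCompressHeader
{
    uint16      hole_length;    /* number of bytes in the hole */
} XLogRecordBlockCompressHeader;
```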
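How hole_offset and hole_length are used can be illustrated with a pair of helpers that drop the hole when an uncompressed image is stored and zero-fill it again on restore. This is a sketch assuming the default 8 kB block size; `pack_page_image` and `unpack_page_image` are hypothetical names, and the restore half follows the same copy, zero-fill, copy pattern the server uses for uncompressed images.

```c
#include <stdint.h>
#include <string.h>

#define BLCKSZ 8192     /* assumed default block size */

/*
 * Copy a page into image, leaving out the hole.
 * Returns the image length, which is BLCKSZ - hole_length.
 */
static uint16_t
pack_page_image(const char *page, uint16_t hole_offset,
                uint16_t hole_length, char *image)
{
    /* Part before the hole. */
    memcpy(image, page, hole_offset);
    /* Part after the hole, moved up to close the gap. */
    memcpy(image + hole_offset,
           page + hole_offset + hole_length,
           BLCKSZ - (hole_offset + hole_length));
    return (uint16_t) (BLCKSZ - hole_length);
}

/* Rebuild the full page from an image, zero-filling the hole. */
static void
unpack_page_image(const char *image, uint16_t hole_offset,
                  uint16_t hole_length, char *page)
{
    memcpy(page, image, hole_offset);
    memset(page + hole_offset, 0, hole_length);
    memcpy(page + hole_offset + hole_length,
           image + hole_offset,
           BLCKSZ - (hole_offset + hole_length));
}
```

For a mostly empty page the saving is large: with 40 bytes before the hole and 192 bytes of tuples at the end, the stored image shrinks from 8192 to 232 bytes.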
XLog Data
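Headers for the main data part of a WAL record:
```c
typedef struct XLogRecordDataHeaderShort
{
    uint8       id;             /* XLR_BLOCK_ID_DATA_SHORT */
    uint8       data_length;    /* number of payload bytes */
} XLogRecordDataHeaderShort;

typedef struct XLogRecordDataHeaderLong
{
    uint8       id;             /* XLR_BLOCK_ID_DATA_LONG */
    /* followed by uint32 data_length, unaligned */
} XLogRecordDataHeaderLong;
```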
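The choice between the two headers depends only on the length of the main data. The sketch below writes and reads such a header in a byte buffer, following the same logic XLogRecordAssemble uses for mainrdata_len. The two ID constants carry the values defined in PostgreSQL's xlogrecord.h; the function names are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define XLR_BLOCK_ID_DATA_SHORT 255
#define XLR_BLOCK_ID_DATA_LONG  254

/* Write the main-data header into scratch; returns the bytes written. */
static size_t
write_main_data_header(char *scratch, uint32_t main_len)
{
    char *p = scratch;

    if (main_len > 255)
    {
        /* Long form: 1-byte ID followed by an unaligned uint32 length. */
        *p++ = (char) XLR_BLOCK_ID_DATA_LONG;
        memcpy(p, &main_len, sizeof(uint32_t));
        p += sizeof(uint32_t);
    }
    else
    {
        /* Short form: 1-byte ID followed by a 1-byte length. */
        *p++ = (char) XLR_BLOCK_ID_DATA_SHORT;
        *p++ = (char) (uint8_t) main_len;
    }
    return (size_t) (p - scratch);
}

/* Read a main-data header; returns its size, or 0 if the ID is something else. */
static size_t
read_main_data_header(const char *buf, uint32_t *main_len)
{
    uint8_t id = (uint8_t) buf[0];

    if (id == XLR_BLOCK_ID_DATA_SHORT)
    {
        *main_len = (uint8_t) buf[1];
        return 2;
    }
    if (id == XLR_BLOCK_ID_DATA_LONG)
    {
        memcpy(main_len, buf + 1, sizeof(uint32_t));
        return 1 + sizeof(uint32_t);
    }
    return 0;
}
```

A 255-byte payload still fits the 2-byte short header, while 256 bytes switches to the 5-byte long header. The length is copied with memcpy because it sits at an unaligned offset.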
XLogRecData
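The data of an XLog record is kept in XLogRecData structs:
```c
typedef struct XLogRecData
{
    struct XLogRecData *next;   /* next struct in the chain, or NULL */
    char       *data;           /* start of the data to include */
    uint32      len;            /* length of the data to include */
} XLogRecData;
```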
XLog control structures
XLogCtlData
XLog state is kept in shared memory in the XLogCtlData struct:
```c
typedef struct XLogCtlData
{
    XLogCtlInsert Insert;

    /* Protected by info_lck: */
    XLogwrtRqst LogwrtRqst;
    XLogRecPtr  RedoRecPtr;
    FullTransactionId ckptFullXid;
    XLogRecPtr  asyncXactLSN;
    XLogRecPtr  replicationSlotMinLSN;

    XLogSegNo   lastRemovedSegNo;

    /* Fake LSN counter for unlogged relations, protected by ulsn_lck. */
    XLogRecPtr  unloggedLSN;
    slock_t     ulsn_lck;

    /* Time and LSN of the last segment switch. */
    pg_time_t   lastSegSwitchTime;
    XLogRecPtr  lastSegSwitchLSN;

    XLogwrtResult LogwrtResult;

    XLogRecPtr  InitializedUpTo;

    /* The WAL buffers. */
    char       *pages;
    XLogRecPtr *xlblocks;
    int         XLogCacheBlck;

    TimeLineID  ThisTimeLineID;
    TimeLineID  PrevTimeLineID;

    RecoveryState SharedRecoveryState;
    bool        SharedHotStandbyActive;
    bool        WalWriterSleeping;
    Latch       recoveryWakeupLatch;

    XLogRecPtr  lastCheckPointRecPtr;
    XLogRecPtr  lastCheckPointEndPtr;
    CheckPoint  lastCheckPoint;

    /* Replay progress. */
    XLogRecPtr  lastReplayedEndRecPtr;
    TimeLineID  lastReplayedTLI;
    XLogRecPtr  replayEndRecPtr;
    TimeLineID  replayEndTLI;
    TimestampTz recoveryLastXTime;
    TimestampTz currentChunkStartTime;
    bool        recoveryPause;
    XLogRecPtr  lastFpwDisableRecPtr;

    slock_t     info_lck;       /* protects the shared fields noted above */
} XLogCtlData;
```
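registered_buffer
```c
typedef struct
{
    bool        in_use;         /* is this slot in use? */
    uint8       flags;          /* REGBUF_* flags */
    RelFileNode rnode;          /* identifies the relation and block */
    ForkNumber  forkno;
    BlockNumber block;
    Page        page;           /* page content */
    uint32      rdata_len;      /* total length of data in the rdata chain */
    XLogRecData *rdata_head;    /* head of the chain of registered data */
    XLogRecData *rdata_tail;    /* last entry in the chain */

    XLogRecData bkp_rdatas[2];  /* used for the page image, two entries
                                 * when the hole is left out */

    /* Scratch buffer for a compressed page image. */
    char        compressed_page[PGLZ_MAX_BLCKSZ];
} registered_buffer;
```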
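Important global variables
```c
/* Chain of main data registered with XLogRegisterData. */
static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;

/* Buffers registered with XLogRegisterBuffer. */
static registered_buffer *registered_buffers;

/* Pool of XLogRecData entries handed out by the Register functions. */
static XLogRecData *rdatas;
```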
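Function code in detail
XLogBeginInsert
This function checks that the calling context is valid and that a WAL insertion is currently allowed, then sets the flag that marks the start of WAL record construction.
```c
/* Nothing may be left over from a previous record. */
Assert(max_registered_block_id == 0);
Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
Assert(mainrdata_len == 0);

/* Cross-check on whether we should be here or not. */
if (!XLogInsertAllowed())
    elog(ERROR, "cannot make new WAL entries during recovery");

if (begininsert_called)
    elog(ERROR, "XLogBeginInsert was already called");

begininsert_called = true;
```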
XLogRegisterData
Registers the rmgr-specific struct of the record, for example the xl_heap_insert struct of the XLOG_HEAP_INSERT record type.
Also registers old tuple data, for example the old tuple of an UPDATE or DELETE statement.
```c
Assert(begininsert_called);

if (num_rdatas >= max_rdatas)
    elog(ERROR, "too much WAL data");
rdata = &rdatas[num_rdatas++];

rdata->data = data;
rdata->len = len;

/* Append to the main data chain and track the total length. */
mainrdata_last->next = rdata;
mainrdata_last = rdata;

mainrdata_len += len;
```
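The append logic can be tried out in isolation with a toy model. PostgreSQL keeps the tail as `mainrdata_last`, initialised to `(XLogRecData *) &mainrdata_head`, which works because `next` is the first member of the struct. The sketch below swaps that cast for a pointer-to-pointer tail, a more conventional way to get the same "append without a special case for the empty list" behaviour. All names and the pool size are illustrative.

```c
#include <stddef.h>
#include <stdint.h>

typedef struct RecData
{
    struct RecData *next;
    const char *data;
    uint32_t    len;
} RecData;

#define MAX_RDATAS 20           /* arbitrary pool size for this sketch */

static RecData rdatas[MAX_RDATAS];
static int     num_rdatas = 0;

static RecData  *main_head = NULL;
static RecData **main_tail = &main_head;    /* where the next entry is linked */
static uint32_t  main_len = 0;

/* Toy version of XLogRegisterData: returns 1 on success, 0 if the pool is full. */
static int
register_data(const char *data, uint32_t len)
{
    RecData *rdata;

    if (num_rdatas >= MAX_RDATAS)
        return 0;
    rdata = &rdatas[num_rdatas++];

    rdata->next = NULL;
    rdata->data = data;
    rdata->len = len;

    /* Link at the tail; the same code handles the empty and non-empty list. */
    *main_tail = rdata;
    main_tail = &rdata->next;

    main_len += len;
    return 1;
}
```

After registering a 3-byte and a 2-byte chunk, `main_head` points at the first chunk, its `next` at the second, and `main_len` is 5. The data is not copied at this stage; only pointers are recorded, so the caller's buffers must stay valid until the record has been inserted.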
XLogRegisterBuffer
Registers each buffer involved in the change, for example the target buffer of an INSERT, or the target and source buffers of an UPDATE.
```c
if (block_id >= max_registered_block_id)
{
    if (block_id >= max_registered_buffers)
        elog(ERROR, "too many registered buffers");
    max_registered_block_id = block_id + 1;
}

regbuf = &registered_buffers[block_id];

/* Remember which relation, fork and block the buffer maps to. */
BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
regbuf->page = BufferGetPage(buffer);
regbuf->flags = flags;
regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
regbuf->rdata_len = 0;

regbuf->in_use = true;
```
XLogRegisterBufData
This function registers tuple contents in the WAL record. It takes a block id, which must belong to a block already registered with XLogRegisterBuffer.
```c
regbuf = &registered_buffers[block_id];
if (!regbuf->in_use)
    elog(ERROR, "no block with id %d registered with WAL insertion",
         block_id);

if (num_rdatas >= max_rdatas)
    elog(ERROR, "too much WAL data");
rdata = &rdatas[num_rdatas++];

rdata->data = data;
rdata->len = len;

/* Append to this buffer's own data chain. */
regbuf->rdata_tail->next = rdata;
regbuf->rdata_tail = rdata;
regbuf->rdata_len += len;
```
XLogInsert
The insertion itself is done by XLogInsert, which uses the rdata chains and the resource manager info to insert one WAL record into the log. It is called whenever a transaction inserts, deletes, or updates data, and when it commits, aborts, or rolls back.
```c
/* The caller may set only the rmgr bits and the two flags below. */
if ((info & ~(XLR_RMGR_INFO_MASK |
              XLR_SPECIAL_REL_UPDATE |
              XLR_CHECK_CONSISTENCY)) != 0)
    elog(PANIC, "invalid xlog info mask %02X", info);
```
```c
/* In bootstrap mode nothing is logged except XLOG resources. */
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
    XLogResetInsertion();
    EndPos = SizeOfXLogLongPHD;
    return EndPos;
}
```
```c
/* Assemble and insert; retry if the full-page-write decision went stale. */
do
{
    XLogRecPtr  RedoRecPtr;
    bool        doPageWrites;
    XLogRecPtr  fpw_lsn;
    XLogRecData *rdt;

    GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

    rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
                             &fpw_lsn);

    EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
} while (EndPos == InvalidXLogRecPtr);
```
XLogRecordAssemble
This function assembles the registered data and buffer pages into one WAL record, expressed as a chain of XLogRecData structs.
At this point the data of the record is held in:
mainrdata_head
the rdata_head of every registered buffer
the page field of every registered buffer
The function proceeds as follows. First it sets up the record header in the scratch buffer:
```c
XLogRecData *rdt;
uint32      total_len = 0;
int         block_id;
pg_crc32c   rdata_crc;
registered_buffer *prev_regbuf = NULL;
XLogRecData *rdt_datas_last;
XLogRecord *rechdr;
char       *scratch = hdr_scratch;

/* The record begins with the fixed-size header. */
rechdr = (XLogRecord *) scratch;
scratch += SizeOfXLogRecord;

hdr_rdt.next = NULL;
rdt_datas_last = &hdr_rdt;
hdr_rdt.data = hdr_scratch;
```
Then it builds the headers and data chain for every registered block:
```c
*fpw_lsn = InvalidXLogRecPtr;
for (block_id = 0; block_id < max_registered_block_id; block_id++)
{
    registered_buffer *regbuf = &registered_buffers[block_id];
    XLogRecordBlockHeader bkpb;
    XLogRecordBlockImageHeader bimg;
    XLogRecordBlockCompressHeader cbimg = {0};
    bool        samerel;
    bool        is_compressed = false;
    bool        include_image;

    bkpb.id = block_id;
    bkpb.fork_flags = regbuf->forkno;
    bkpb.data_length = 0;

    if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
        bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

    /*
     * This excerpt omits the code that computes needs_backup, needs_data
     * and include_image for the block.
     */

    if (include_image)
    {
        Page        page = regbuf->page;
        uint16      compressed_len = 0;

        /* A standard page layout lets us leave out the hole. */
        if (regbuf->flags & REGBUF_STANDARD)
        {
            uint16      lower = ((PageHeader) page)->pd_lower;
            uint16      upper = ((PageHeader) page)->pd_upper;

            if (lower >= SizeOfPageHeaderData &&
                upper > lower &&
                upper <= BLCKSZ)
            {
                bimg.hole_offset = lower;
                cbimg.hole_length = upper - lower;
            }
            else
            {
                /* No hole to remove. */
                bimg.hole_offset = 0;
                cbimg.hole_length = 0;
            }
        }
        else
        {
            /* Not a standard page header, don't try to eliminate the hole. */
            bimg.hole_offset = 0;
            cbimg.hole_length = 0;
        }

        /* Try to compress the block image if wal_compression is enabled. */
        if (wal_compression)
        {
            is_compressed =
                XLogCompressBackupBlock(page, bimg.hole_offset,
                                        cbimg.hole_length,
                                        regbuf->compressed_page,
                                        &compressed_len);
        }

        bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

        /* Construct XLogRecData entries for the page content. */
        rdt_datas_last->next = &regbuf->bkp_rdatas[0];
        rdt_datas_last = rdt_datas_last->next;

        bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

        if (needs_backup)
            bimg.bimg_info |= BKPIMAGE_APPLY;

        if (is_compressed)
        {
            bimg.length = compressed_len;
            bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;

            rdt_datas_last->data = regbuf->compressed_page;
            rdt_datas_last->len = compressed_len;
        }
        else
        {
            bimg.length = BLCKSZ - cbimg.hole_length;

            if (cbimg.hole_length == 0)
            {
                rdt_datas_last->data = page;
                rdt_datas_last->len = BLCKSZ;
            }
            else
            {
                /* Must skip the hole: part before it, then part after it. */
                rdt_datas_last->data = page;
                rdt_datas_last->len = bimg.hole_offset;

                rdt_datas_last->next = &regbuf->bkp_rdatas[1];
                rdt_datas_last = rdt_datas_last->next;

                rdt_datas_last->data =
                    page + (bimg.hole_offset + cbimg.hole_length);
                rdt_datas_last->len =
                    BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
            }
        }

        total_len += bimg.length;
    }

    if (needs_data)
    {
        /* Link the caller-supplied rdata chain for this buffer. */
        bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
        bkpb.data_length = regbuf->rdata_len;
        total_len += regbuf->rdata_len;

        rdt_datas_last->next = regbuf->rdata_head;
        rdt_datas_last = regbuf->rdata_tail;
    }

    /* The RelFileNode is omitted when it matches the previous block's. */
    if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
    {
        samerel = true;
        bkpb.fork_flags |= BKPBLOCK_SAME_REL;
    }
    else
        samerel = false;
    prev_regbuf = regbuf;

    /* Ok, copy the headers to the scratch buffer. */
    memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
    scratch += SizeOfXLogRecordBlockHeader;
    if (include_image)
    {
        memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
        scratch += SizeOfXLogRecordBlockImageHeader;
        if (cbimg.hole_length != 0 && is_compressed)
        {
            memcpy(scratch, &cbimg,
                   SizeOfXLogRecordBlockCompressHeader);
            scratch += SizeOfXLogRecordBlockCompressHeader;
        }
    }
    if (!samerel)
    {
        memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
        scratch += sizeof(RelFileNode);
    }
    memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
    scratch += sizeof(BlockNumber);
}
```
Next comes the replication origin, if one is to be included:
```c
if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
    replorigin_session_origin != InvalidRepOriginId)
{
    *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
    memcpy(scratch, &replorigin_session_origin,
           sizeof(replorigin_session_origin));
    scratch += sizeof(replorigin_session_origin);
}
```
Then the main data header is written and the main data chain is linked in:
```c
if (mainrdata_len > 0)
{
    if (mainrdata_len > 255)
    {
        *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
        memcpy(scratch, &mainrdata_len, sizeof(uint32));
        scratch += sizeof(uint32);
    }
    else
    {
        *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
        *(scratch++) = (uint8) mainrdata_len;
    }
    rdt_datas_last->next = mainrdata_head;
    rdt_datas_last = mainrdata_last;
    total_len += mainrdata_len;
}
rdt_datas_last->next = NULL;

hdr_rdt.len = (scratch - hdr_scratch);
total_len += hdr_rdt.len;
```
The CRC of the data is computed; the record header itself is added to it later:
```c
INIT_CRC32C(rdata_crc);
COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord,
            hdr_rdt.len - SizeOfXLogRecord);
for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
    COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
```
Finally the fixed fields of the record header are filled in; xl_prev is set when the record is inserted:
```c
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = total_len;
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
rechdr->xl_prev = InvalidXLogRecPtr;
rechdr->xl_crc = rdata_crc;

return &hdr_rdt;
```
XLogInsertRecord
Inserts the record assembled by XLogRecordAssemble into the WAL buffers in memory. This takes two steps. First, space for the record is reserved in the WAL:
```c
if (isLogSwitch)
    inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
else
{
    ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
                              &rechdr->xl_prev);
    inserted = true;
}
```
Second, the CRC is completed and the record is copied into the reserved space:
```c
if (inserted)
{
    /* xl_prev is now known, so the header can be added to the CRC. */
    rdata_crc = rechdr->xl_crc;
    COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
    FIN_CRC32C(rdata_crc);
    rechdr->xl_crc = rdata_crc;

    /* Copy the record chain into the WAL buffers. */
    CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
                        StartPos, EndPos);

    /* Remember the position of the last important record. */
    if ((flags & XLOG_MARK_UNIMPORTANT) == 0)
    {
        int         lockno = holdingAllLocks ? 0 : MyLockNo;

        WALInsertLocks[lockno].l.lastImportantAt = StartPos;
    }
}
```
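The CRC is therefore computed in two phases: XLogRecordAssemble covers everything after the fixed header, and XLogInsertRecord adds the header up to (but not including) xl_crc once xl_prev is known. The sketch below reproduces that ordering with a plain bitwise CRC-32C. It is a stand-in for the COMP_CRC32C macros, which may use faster table-driven or hardware implementations; `crc32c_update` and `record_crc` are illustrative names.

```c
#include <stddef.h>
#include <stdint.h>

#define CRC32C_INIT      UINT32_C(0xFFFFFFFF)
#define CRC32C_FIN(crc)  ((crc) ^ UINT32_C(0xFFFFFFFF))

/* Bitwise CRC-32C (Castagnoli), reflected polynomial 0x82F63B78. */
static uint32_t
crc32c_update(uint32_t crc, const void *data, size_t len)
{
    const unsigned char *p = data;

    while (len--)
    {
        crc ^= *p++;
        for (int k = 0; k < 8; k++)
            crc = (crc >> 1) ^ (UINT32_C(0x82F63B78) & (0u - (crc & 1u)));
    }
    return crc;
}

/*
 * CRC of a record in WAL order: the payload first, then the part of the
 * header that precedes the CRC field.
 */
static uint32_t
record_crc(const void *header, size_t header_len_before_crc,
           const void *payload, size_t payload_len)
{
    uint32_t crc = CRC32C_INIT;

    crc = crc32c_update(crc, payload, payload_len);          /* phase 1 */
    crc = crc32c_update(crc, header, header_len_before_crc); /* phase 2 */
    return CRC32C_FIN(crc);
}
```

Because the CRC state is carried from one call to the next, running the two phases separately gives the same value as a single pass over the payload followed by the header. A reader validating a record has to feed the bytes in that same order.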
PageSetLSN
Updates the LSN of the modified page.