元组插入

元组的插入主要由函数heap_insert来完成,主要分为几步:

  1. 初始化元组头 HeapTupleHeader

  2. 获取可用的page

  3. 判断元组可见性和事务冲突

  4. 将元组写入可用的page,并标记page dirty

  5. 写wal

初始化元组的元数据

heap_prepare_insert中完成,接口比较简单,我们直接来看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static HeapTuple
heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
CommandId cid, int options)
{
/* 计算元组的infomask & infomask2 */
tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
tup->t_data->t_infomask |= HEAP_XMAX_INVALID;

/* 设置xmin,也就是元组插入的事务号 */
HeapTupleHeaderSetXmin(tup->t_data, xid);
if (options & HEAP_INSERT_FROZEN)
HeapTupleHeaderSetXminFrozen(tup->t_data);

/* 设置cid、xmax和tableoid,由于是元组插入,则将xmax设置为0 */
HeapTupleHeaderSetCmin(tup->t_data, cid);
HeapTupleHeaderSetXmax(tup->t_data, 0);
tup->t_tableOid = RelationGetRelid(relation);

/* 判断是否为TOAST元组 */
if (relation->rd_rel->relkind != RELKIND_RELATION &&
relation->rd_rel->relkind != RELKIND_MATVIEW)
{
Assert(!HeapTupleHasExternal(tup));
return tup;
}
else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
/* TOAST元组,调用相应的插入接口 */
return heap_toast_insert_or_update(relation, tup, NULL, options);
else
/* 否则直接返回准备好的tup */
return tup;
}

获取可用的page

RelationGetBufferForTuple中完成。

relation是目标表对象,len是tuple插入需要的长度。otherBuffer用于元组update时替换旧的buffer,options是写入的选项,bistate表示批量插入对象的状态,vmbuffer和vmbuffer_other用于可见性映射

1
2
3
4
5
Buffer
RelationGetBufferForTuple(Relation relation, Size len,
Buffer otherBuffer, int options,
BulkInsertState bistate,
Buffer *vmbuffer, Buffer *vmbuffer_other)
  1. 通过填充因子,计算空闲空间。

填充因子FillFactor是一个百分比,限制了我们对于page的使用率

1
2
3
4
5
6
7
8
9
10
11
// 这里使用了默认填充百分比, HEAP_DEFAULT_FILLFACTOR = 100, 也就是完全填充
saveFreeSpace = RelationGetTargetPageFreeSpace(relation, HEAP_DEFAULT_FILLFACTOR);

// 没有tuple的page中也可能有line pointer,所以使用一个近似值
nearlyEmptyFreeSpace = MaxHeapTupleSize -
(MaxHeapTuplesPerPage / 8 * sizeof(ItemIdData));

if (len + saveFreeSpace > nearlyEmptyFreeSpace)
targetFreeSpace = Max(len, nearlyEmptyFreeSpace);
else
targetFreeSpace = len + saveFreeSpace;
  1. 尝试从cache中获取表最近使用的page,如果没有则尝试通过FSM获取满足插入条件的page

1
2
3
4
5
6
7
8
9
10
if (bistate && bistate->current_buf != InvalidBuffer)
targetBlock = BufferGetBlockNumber(bistate->current_buf);
else
targetBlock = RelationGetTargetBlock(relation);

if (targetBlock == InvalidBlockNumber && use_fsm)
{
// cache中没有目标page,让FSM来提供一个初始目标page
targetBlock = GetPageWithFreeSpace(relation, targetFreeSpace);
}
  1. 接下来我们拿着之前获取到的block,去获取对应的buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (otherBuffer == InvalidBuffer)
{
// 没有otherbuffer, 简单插入语句, 获取buffer
buffer = ReadBufferBI(relation, targetBlock, RBM_NORMAL, bistate);
if (PageIsAllVisible(BufferGetPage(buffer)))
visibilitymap_pin(relation, targetBlock, vmbuffer);

if ((options & HEAP_INSERT_FROZEN) &&
(PageGetMaxOffsetNumber(BufferGetPage(buffer)) == 0))
visibilitymap_pin(relation, targetBlock, vmbuffer);

LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
}

page = BufferGetPage(buffer);
  • 如果拿到的page有足够的空闲空间,我们就完成了获取可用page的步骤,直接返回就可以了

1
2
3
4
5
6
7
8
// 检查page是否有足够的空闲空间,如果有则返回当前page
pageFreeSpace = PageGetHeapFreeSpace(page);
if (targetFreeSpace <= pageFreeSpace)
{
/* use this page as future insert target, too */
RelationSetTargetBlock(relation, targetBlock);
return buffer;
}
  • 如果没有足够的空间,我们需要去寻找下一个block

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 如果有正在进行的批量操作, 则还有其他的未使用的page, 我们尝试使用下一个page
if (bistate && bistate->next_free != InvalidBlockNumber)
{
targetBlock = bistate->next_free;
if (bistate->next_free >= bistate->last_free)
{
bistate->next_free = InvalidBlockNumber;
bistate->last_free = InvalidBlockNumber;
}
else
bistate->next_free++;
}

// 如果没有进行批量操作, 这个时候就要问一问FSM的意见
else if (!use_fsm)
{
// 没有FSM, 只能跳出循环去扩页
break;
}
else
{
// 通过FSM寻找下一个合适的page
targetBlock = RecordAndGetPageWithFreeSpace(relation,
targetBlock,
pageFreeSpace,
targetFreeSpace);
}
  1. 如果以上循环中没有找到空闲buffer,我们只能进入扩页的逻辑来新增一个page用来插入

1
2
3
4
5
6
7
// 扩页并返回新增的page, 扩页的具体逻辑我们在这就不详细介绍了
// 简单来说就是给relation加Exclusive锁, 然后初始化一个新的page进去
buffer = RelationAddBlocks(relation, bistate, num_pages, use_fsm,
&unlockedTargetBuffer);

targetBlock = BufferGetBlockNumber(buffer);
page = BufferGetPage(buffer);
  1. 新增的页我们再来检查下是否有足够的空闲空间,如果有的话就能返回使用了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
pageFreeSpace = PageGetHeapFreeSpace(page);
if (len > pageFreeSpace)
{
if (unlockedTargetBuffer)
{
if (otherBuffer != InvalidBuffer)
LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
UnlockReleaseBuffer(buffer);

goto loop;
}
elog(PANIC, "tuple is too big: size %zu", len);
}

RelationSetTargetBlock(relation, targetBlock);

return buffer;

写入数据

写入数据是由接口RelationPutHeapTuple来完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void
RelationPutHeapTuple(Relation relation,
Buffer buffer,
HeapTuple tuple,
bool token)
{
Page pageHeader;
OffsetNumber offnum;

...
pageHeader = BufferGetPage(buffer);

// 将元组插入到page中
offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
tuple->t_len, InvalidOffsetNumber, false, true);

// 记录当前的位置到元组的t_self中
ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);

// 将当前的位置也记录到元组的ctid中
if (!token)
{
ItemId itemId = PageGetItemId(pageHeader, offnum);
HeapTupleHeader item = (HeapTupleHeader) PageGetItem(pageHeader, itemId);

item->t_ctid = tuple->t_self;
}
}

这里主要的工作都有函数PageAddItem来完成,PageAddItem是一个宏,内部调用PageAddItemExtended,我们来详细看下这个接口

page是插入的页面,item的插入的数据指针,size的插入数据的大小。offsetNumber是元组在页面中的偏移量,如果插入成功则会被返回。flags是插入的选项

1
2
3
4
5
6
OffsetNumber
PageAddItemExtended(Page page,
Item item,
Size size,
OffsetNumber offsetNumber,
int flags)
  1. 首先我们在page中寻找下一个slot的位置,如果之后没有找到空余的slot,我们就会使用这个位置来插入元组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));

// 如果page有free line pointer, 也就是pd_flags中包含PD_HAS_FREE_LINES标记
// 则去行指针数组中寻找free slot
if (PageHasFreeLinePointers(page))
{
// 扫描PageHeader的行指针数组,查找标记为 LP_UNUSED的 itemId
for (offsetNumber = FirstOffsetNumber;
offsetNumber < limit;
offsetNumber++)
{
if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
break;
}
}
else
{
// 没有free line pointer, 使用limit
offsetNumber = limit;
}
  1. 接下来我们计算page的pd_lower和pg_upper指针指向的offset

1
2
3
4
5
6
7
8
if (offsetNumber == limit || needshuffle)
lower = phdr->pd_lower + sizeof(ItemIdData);
else
lower = phdr->pd_lower;

alignedSize = MAXALIGN(size);

upper = (int) phdr->pd_upper - (int) alignedSize;
  1. 最后我们就可以把元组插入到page中了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

// 如果已经有了对应的行指针, 则重新设置行指针中的值和标记
itemId = PageGetItemId(page, offsetNumber);

if (needshuffle)
memmove(itemId + 1, itemId,
(limit - offsetNumber) * sizeof(ItemIdData));

ItemIdSetNormal(itemId, upper, size);

// 将数据插入到对应的offset
memcpy((char *) page + upper, item, size);

// 更新PageHeader中的upper和lower指针
phdr->pd_lower = (LocationIndex) lower;
phdr->pd_upper = (LocationIndex) upper;

###WAL log和统计信息

将元组真正插入到page之后,我们会写WAL log并更新相关的统计信息。这两块我们就不在这里详细的描述了