MIT-6.830 Lab4 总结与备忘
实现一个基于锁的事务系统,需要添加加锁和解锁调用,并跟踪每个事务所持有的锁
事务、锁和一致性控制:
- 事务是一组原子执行的数据库操作(插入、读取、删除等),要么操作都完成,要么都没有完成
- ACID属性:
- 两阶段锁定、缓冲区管理确保原子性
- 基于原子性,数据库是事务一致的
- 隔离:严格的两阶段锁定提供隔离
- 持久性:强制的缓冲区管理确保持久性(NO STEAL/FORCE buffer management policy)
- 如果脏页被一个未提交的事务加锁,不应该从缓冲池中淘汰它(更新到磁盘)
- 事务提交时,脏页强制写到磁盘
- Lab4假设数据库不会崩溃
以页为单位锁定
- 需要创建数据结构,跟踪事务持有哪些锁,在事务被请求时检查是否应该授予其锁
- 需要实现共享锁和独占锁:事务读取一个对象前加共享锁,事务写入一个对象前加独占锁(如果事务t在对象o上有唯一的共享锁,则t可以将其升级前为独占锁)
如果一个事务请求的锁不能立刻授予,则应当阻塞
Exercise 1、2
思路整理
- 要在
BufferPool
写获取、释放锁的方法,通过LockingTest单元测试 - 修改
getPage()
,其在返回页面前阻塞并获得所需的锁 - 这里最好还是在getPage()中获得页面的锁,而不是在每个操作中对页面加锁
- 共享锁和独占锁可以根据Permission类的一个实例(Permissions.READ_WRITE或 Permissions.READ_ONLY)进行区分,即作为Lock类的一个属性
- 实现
unsafeReleasePage()
,在事务结束的时候调用 - 实现
holdsLock()
,判断一个页面是否已经被事务加锁 - 要考虑,在writePage的时候,是否存在和其他事务的race
- 注意,在插入tuple的时候,如果事务t发现页面p上没有空槽,那么t对p的锁可以立刻释放
- 因此,需要在bufferpool中定义一个Lock类,拥有事务id属性,以及读写的permission,作为锁类型的区分
- 同时,需要在bufferpool中定义一个LockManager类,并添加一个实例作为属性,用于维护事务和锁的状态,包括分配锁、移除锁
- 这个类里需要有一个页号到锁的集合的concurrentHashMap
- 分配锁:
- 如果当前页号没有锁,说明是第一次获取锁。所以读就直接分配共享锁,写就直接分配独占锁(此时,分配就是指在map里新建一个entry,new一个锁,然后把锁加到entry的value里),返回true
- 如果当前页号已经有锁,则遍历页号对应的所有锁,看锁保持的transactionId和这次的事务id是不是一样:
- 如果和当前的transactionId一样,则判断已有锁的类型
- case1:已有读锁,还想要读锁。直接返回true
- case2:已有读锁,想要写锁。要检查页号对应的锁是不是只有这一个,是则锁升级,不是则返回false
- case3:已有写锁。直接返回
- 如果和当前的transactionId不一样,则
- case1:如果这次想要写锁,直接false
- case2:如果这次要读锁,则看页号对应的锁里有没有写锁。有写锁则失败,全是读锁则可以分配个新的读锁
- 移除锁:即,将锁从concurrentHashMap移除
- 这里为了保证请求的一致性(保证并发的请求具有线性一致性),可以简单地使用java的synchronized关键字,作为函数的修饰符——某个对象实例内,synchronized aMethod(){}可以防止多个线程同时访问这个对象的synchronized方法,并且如果一个对象有多个synchronized方法,只要一个线程访问其中一个synchronized方法,其它线程不能同时访问这个对象中任何一个synchronized方法。但是不同对象实例的 synchronized方法是不相干扰的,其它线程可以同时访问相同类的另一个对象实例中的synchronized方法。由于一个database只有一个bufferpool实例,一个bufferpool只有一个lockmanager实例,因此就保证了一次只有一个线程访问,其他线程阻塞
- (synchronized static aStaticMethod{}防止多个线程同时访问这个类中的synchronized static方法,可以对类的所有对象实例起作用)
具体实现
LockManager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81public class LockManager {
private ConcurrentHashMap<PageId, Vector<Lock>> lockMap;
public LockManager () {
lockMap = new ConcurrentHashMap<>();
}
// 删除一个lock,如果对应的Vector为空,则删除这个entry
public synchronized void removeLock(TransactionId tid, PageId pageId) {
Vector<Lock> curLockVector = this.lockMap.get(pageId);
int index = 0;
boolean found = false;
for (int i = 0; i < curLockVector.size(); i++) {
if (curLockVector.get(i).getTid().equals(tid)) {
found = true;
index = i;
break;
}
}
if (found) {
curLockVector.remove(index);
if (curLockVector.size() == 0) {
this.lockMap.remove(pageId);
}
}
}
// 加锁
public synchronized boolean setLock(PageId pageId, TransactionId tid, Permissions type) {
Vector<Lock> curLockVector = this.lockMap.get(pageId);
if (curLockVector == null) { // 如果这个page没有上锁
curLockVector = new Vector<>();
curLockVector.add(new Lock(tid, type));
this.lockMap.put(pageId, curLockVector);
return true;
} else {
if (type.equals(Permissions.READ_WRITE)) {
// 申请独占锁
if (curLockVector.size() == 1) {
if (curLockVector.get(0).getTid().equals(tid)) {
// 只有一个锁并且锁的事务id一致,看是否升级锁
if (curLockVector.get(0).getPermission().equals(Permissions.READ_ONLY)) {
curLockVector.get(0).setPermission(type);
this.lockMap.put(pageId, curLockVector);
}
return true;
} else{ // 当前页面已经被其他事务共享,暂时不能独占
return false;
}
} else { // 当前页面的锁有多个
return false;
}
} else {
// 申请共享锁
for (Lock curLock : curLockVector) {
if (curLock.getPermission().equals(Permissions.READ_WRITE)) { // 如果这个锁是独占锁,此时只能有一个锁
return curLock.getTid().equals(tid) && curLockVector.size() == 1;
}
if(curLock.getTid().equals(tid)) {
return true;
}
}
curLockVector.add(new Lock(tid, type));
this.lockMap.put(pageId, curLockVector);
return true;
}
}
}
// 查看一个tid是否对pid加了lock
public synchronized boolean holdLock (PageId pid, TransactionId tid) {
Vector<Lock> curLockVector = this.lockMap.get(pid);
for (Lock curLock : curLockVector) {
if (curLock.getTid().equals(tid)) {
return true;
}
}
return false;
}
}在GetPage时,要重复地检查是否获得相应的锁,如果没有,则需要循环等待(设置循环超时时间)
1
2
3
4
5
6
7
8
9
10
11
12
13
14public Page getPage(TransactionId tid, PageId pid, Permissions perm)
throws TransactionAbortedException, DbException {
// some code goes here
boolean hasLock = false;
long startTime = System.currentTimeMillis();
long timeout = 100;
while (!hasLock) {
if (System.currentTimeMillis()-startTime > timeout) {
throw new TransactionAbortedException();
}
hasLock = this.lockManager.setLock(pid, tid, perm);
}
...
}在insert tuple时,如果当前Page的slot没有空的,则当前page去锁
1
2
3
4
5
6
7
8
9
10// public List<Page> insertTuple(TransactionId tid, Tuple t)
...
if (curPage.getNumEmptySlots() > 0) {
curPage.insertTuple(t);
res.add(curPage);
this.writePage(curPage);
return res;
} else {
Database.getBufferPool().unsafeReleasePage(tid, curPage.pid);
}
Exercise 3、4
思路整理
修改
BufferPool
的evictPage()
,使其不驱逐脏页;BufferPool中实现transactionComplete()
,通过TransactionTest单元测试和AbortEvictionTest系统测试一个事务对page的修改,只有在提交之后才被写入磁盘,因此不能淘汰脏页(如果可以淘汰,则可能通过丢弃脏页并从磁盘重读中止一个事务)。如果缓冲池所有Page都是脏页,应该抛出DbException。如果驱逐了一个干净页面,注意事务可能持有被淘汰页面的锁
SimpleDB每次查询都会创建一个TransactionId对象,对象传递给每个参与查询的操作符,而查询完成后BufferPool的
transactionComplete()
被调用。这个方法可以提交或中止事务(由参数commit指定),事务执行时操作符可以抛出一个TransactionAbortedException异常,表明发生了内部错误或死锁。代码里transactionComplete有两个版本:接受额外的布尔提交参数,一个不接受——没有额外参数的方法总是commit,可以简单地调用transactionComplete(tid, true)
来实现提交时,应该将与事务相关的脏页flush到磁盘上;中止时,应该恢复页面为磁盘内容,以恢复事务所做的任何改变。而无论提交还是中止,都应该释放BufferPool保存的关于该事务的任何状态,并释放该事务持有的任何锁
具体实现
- 修改
evictPage()
,倒着遍历,删除最后一个非脏页 - 修改
transactionComplete()
,根据commit判断是否调用public synchronized void restorePages(TransactionId tid)
- 为
LockManager
添加removeTidLocks(tid)
,删除所有关于tid的锁 - 修改
HeapFile.java
,writePage()
方法由flushPage()
调用,以及insertTuple时创建新页时调用(以更新磁盘中HeapFile的page数目),deletePage不调用——这个bug导致AbortEvictionTest
一直没有通过,找了很久。。。 - 修改
lruCache
,之前双向链表的更新与lruCache
的更新混在了一起,要将其分开——否则不能保证lruCache
的size,和双向链表的长度相等 getPage
前增加synchronized
修饰符,保证一次只有一个事务在操作缓冲池
Exercise 5
这里没有更改代码,测试自然通过(其实就是要在getPage
的一开始,增加事务获取锁需要的时间的检测,超过时间则抛出异常)