iradix.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. package iradix
  2. import (
  3. "bytes"
  4. "strings"
  5. "github.com/hashicorp/golang-lru/simplelru"
  6. )
  7. const (
  8. // defaultModifiedCache is the default size of the modified node
  9. // cache used per transaction. This is used to cache the updates
  10. // to the nodes near the root, while the leaves do not need to be
  11. // cached. This is important for very large transactions to prevent
  12. // the modified cache from growing to be enormous. This is also used
  13. // to set the max size of the mutation notify maps since those should
  14. // also be bounded in a similar way.
  15. defaultModifiedCache = 8192
  16. )
// Tree implements an immutable radix tree. This can be treated as a
// Dictionary abstract data type. The main advantage over a standard
// hash map is prefix-based lookups and ordered iteration. The immutability
// means that it is safe to concurrently read from a Tree without any
// coordination.
type Tree struct {
	// root is the top-level node of the tree; lookups start here.
	root *Node
	// size is the number of elements in the tree, as reported by Len.
	size int
}
  26. // New returns an empty Tree
  27. func New() *Tree {
  28. t := &Tree{
  29. root: &Node{
  30. mutateCh: make(chan struct{}),
  31. },
  32. }
  33. return t
  34. }
// Len returns the number of elements in the tree. It reads the stored size
// field and does not walk the tree.
func (t *Tree) Len() int {
	return t.size
}
// Txn is a transaction on the tree. This transaction is applied
// atomically and returns a new tree when committed. A transaction
// is not thread safe, and should only be used by a single goroutine.
type Txn struct {
	// root is the modified root for the transaction.
	root *Node

	// snap is a snapshot of the root node for use if we have to run the
	// slow notify algorithm.
	snap *Node

	// size tracks the size of the tree as it is modified during the
	// transaction.
	size int

	// writable is a cache of writable nodes that have been created during
	// the course of the transaction. This allows us to re-use the same
	// nodes for further writes and avoid unnecessary copies of nodes that
	// have never been exposed outside the transaction. This will only hold
	// up to defaultModifiedCache number of entries.
	writable *simplelru.LRU

	// trackChannels is used to hold channels that need to be notified to
	// signal mutation of the tree. This will only hold up to
	// defaultModifiedCache number of entries, after which we will set the
	// trackOverflow flag, which will cause us to use a more expensive
	// algorithm to perform the notifications. Mutation tracking is only
	// performed if trackMutate is true.
	trackChannels map[chan struct{}]struct{}
	// trackOverflow is set once trackChannels would exceed its limit; from
	// then on no channels are recorded and Notify uses slowNotify instead.
	trackOverflow bool
	// trackMutate enables mutation tracking; it is set via TrackMutate.
	trackMutate bool
}
  67. // Txn starts a new transaction that can be used to mutate the tree
  68. func (t *Tree) Txn() *Txn {
  69. txn := &Txn{
  70. root: t.root,
  71. snap: t.root,
  72. size: t.size,
  73. }
  74. return txn
  75. }
// TrackMutate can be used to toggle if mutations are tracked. If this is enabled
// then notifications will be issued for affected internal nodes and leaves when
// the transaction is committed. The flag is consulted by the write paths and
// by Commit/Notify, so it only affects operations performed after it is set.
func (t *Txn) TrackMutate(track bool) {
	t.trackMutate = track
}
  82. // trackChannel safely attempts to track the given mutation channel, setting the
  83. // overflow flag if we can no longer track any more. This limits the amount of
  84. // state that will accumulate during a transaction and we have a slower algorithm
  85. // to switch to if we overflow.
  86. func (t *Txn) trackChannel(ch chan struct{}) {
  87. // In overflow, make sure we don't store any more objects.
  88. if t.trackOverflow {
  89. return
  90. }
  91. // If this would overflow the state we reject it and set the flag (since
  92. // we aren't tracking everything that's required any longer).
  93. if len(t.trackChannels) >= defaultModifiedCache {
  94. // Mark that we are in the overflow state
  95. t.trackOverflow = true
  96. // Clear the map so that the channels can be garbage collected. It is
  97. // safe to do this since we have already overflowed and will be using
  98. // the slow notify algorithm.
  99. t.trackChannels = nil
  100. return
  101. }
  102. // Create the map on the fly when we need it.
  103. if t.trackChannels == nil {
  104. t.trackChannels = make(map[chan struct{}]struct{})
  105. }
  106. // Otherwise we are good to track it.
  107. t.trackChannels[ch] = struct{}{}
  108. }
  109. // writeNode returns a node to be modified, if the current node has already been
  110. // modified during the course of the transaction, it is used in-place. Set
  111. // forLeafUpdate to true if you are getting a write node to update the leaf,
  112. // which will set leaf mutation tracking appropriately as well.
  113. func (t *Txn) writeNode(n *Node, forLeafUpdate bool) *Node {
  114. // Ensure the writable set exists.
  115. if t.writable == nil {
  116. lru, err := simplelru.NewLRU(defaultModifiedCache, nil)
  117. if err != nil {
  118. panic(err)
  119. }
  120. t.writable = lru
  121. }
  122. // If this node has already been modified, we can continue to use it
  123. // during this transaction. We know that we don't need to track it for
  124. // a node update since the node is writable, but if this is for a leaf
  125. // update we track it, in case the initial write to this node didn't
  126. // update the leaf.
  127. if _, ok := t.writable.Get(n); ok {
  128. if t.trackMutate && forLeafUpdate && n.leaf != nil {
  129. t.trackChannel(n.leaf.mutateCh)
  130. }
  131. return n
  132. }
  133. // Mark this node as being mutated.
  134. if t.trackMutate {
  135. t.trackChannel(n.mutateCh)
  136. }
  137. // Mark its leaf as being mutated, if appropriate.
  138. if t.trackMutate && forLeafUpdate && n.leaf != nil {
  139. t.trackChannel(n.leaf.mutateCh)
  140. }
  141. // Copy the existing node. If you have set forLeafUpdate it will be
  142. // safe to replace this leaf with another after you get your node for
  143. // writing. You MUST replace it, because the channel associated with
  144. // this leaf will be closed when this transaction is committed.
  145. nc := &Node{
  146. mutateCh: make(chan struct{}),
  147. leaf: n.leaf,
  148. }
  149. if n.prefix != nil {
  150. nc.prefix = make([]byte, len(n.prefix))
  151. copy(nc.prefix, n.prefix)
  152. }
  153. if len(n.edges) != 0 {
  154. nc.edges = make([]edge, len(n.edges))
  155. copy(nc.edges, n.edges)
  156. }
  157. // Mark this node as writable.
  158. t.writable.Add(nc, nil)
  159. return nc
  160. }
  161. // Visit all the nodes in the tree under n, and add their mutateChannels to the transaction
  162. // Returns the size of the subtree visited
  163. func (t *Txn) trackChannelsAndCount(n *Node) int {
  164. // Count only leaf nodes
  165. leaves := 0
  166. if n.leaf != nil {
  167. leaves = 1
  168. }
  169. // Mark this node as being mutated.
  170. if t.trackMutate {
  171. t.trackChannel(n.mutateCh)
  172. }
  173. // Mark its leaf as being mutated, if appropriate.
  174. if t.trackMutate && n.leaf != nil {
  175. t.trackChannel(n.leaf.mutateCh)
  176. }
  177. // Recurse on the children
  178. for _, e := range n.edges {
  179. leaves += t.trackChannelsAndCount(e.node)
  180. }
  181. return leaves
  182. }
  183. // mergeChild is called to collapse the given node with its child. This is only
  184. // called when the given node is not a leaf and has a single edge.
  185. func (t *Txn) mergeChild(n *Node) {
  186. // Mark the child node as being mutated since we are about to abandon
  187. // it. We don't need to mark the leaf since we are retaining it if it
  188. // is there.
  189. e := n.edges[0]
  190. child := e.node
  191. if t.trackMutate {
  192. t.trackChannel(child.mutateCh)
  193. }
  194. // Merge the nodes.
  195. n.prefix = concat(n.prefix, child.prefix)
  196. n.leaf = child.leaf
  197. if len(child.edges) != 0 {
  198. n.edges = make([]edge, len(child.edges))
  199. copy(n.edges, child.edges)
  200. } else {
  201. n.edges = nil
  202. }
  203. }
  204. // insert does a recursive insertion
  205. func (t *Txn) insert(n *Node, k, search []byte, v interface{}) (*Node, interface{}, bool) {
  206. // Handle key exhaustion
  207. if len(search) == 0 {
  208. var oldVal interface{}
  209. didUpdate := false
  210. if n.isLeaf() {
  211. oldVal = n.leaf.val
  212. didUpdate = true
  213. }
  214. nc := t.writeNode(n, true)
  215. nc.leaf = &leafNode{
  216. mutateCh: make(chan struct{}),
  217. key: k,
  218. val: v,
  219. }
  220. return nc, oldVal, didUpdate
  221. }
  222. // Look for the edge
  223. idx, child := n.getEdge(search[0])
  224. // No edge, create one
  225. if child == nil {
  226. e := edge{
  227. label: search[0],
  228. node: &Node{
  229. mutateCh: make(chan struct{}),
  230. leaf: &leafNode{
  231. mutateCh: make(chan struct{}),
  232. key: k,
  233. val: v,
  234. },
  235. prefix: search,
  236. },
  237. }
  238. nc := t.writeNode(n, false)
  239. nc.addEdge(e)
  240. return nc, nil, false
  241. }
  242. // Determine longest prefix of the search key on match
  243. commonPrefix := longestPrefix(search, child.prefix)
  244. if commonPrefix == len(child.prefix) {
  245. search = search[commonPrefix:]
  246. newChild, oldVal, didUpdate := t.insert(child, k, search, v)
  247. if newChild != nil {
  248. nc := t.writeNode(n, false)
  249. nc.edges[idx].node = newChild
  250. return nc, oldVal, didUpdate
  251. }
  252. return nil, oldVal, didUpdate
  253. }
  254. // Split the node
  255. nc := t.writeNode(n, false)
  256. splitNode := &Node{
  257. mutateCh: make(chan struct{}),
  258. prefix: search[:commonPrefix],
  259. }
  260. nc.replaceEdge(edge{
  261. label: search[0],
  262. node: splitNode,
  263. })
  264. // Restore the existing child node
  265. modChild := t.writeNode(child, false)
  266. splitNode.addEdge(edge{
  267. label: modChild.prefix[commonPrefix],
  268. node: modChild,
  269. })
  270. modChild.prefix = modChild.prefix[commonPrefix:]
  271. // Create a new leaf node
  272. leaf := &leafNode{
  273. mutateCh: make(chan struct{}),
  274. key: k,
  275. val: v,
  276. }
  277. // If the new key is a subset, add to to this node
  278. search = search[commonPrefix:]
  279. if len(search) == 0 {
  280. splitNode.leaf = leaf
  281. return nc, nil, false
  282. }
  283. // Create a new edge for the node
  284. splitNode.addEdge(edge{
  285. label: search[0],
  286. node: &Node{
  287. mutateCh: make(chan struct{}),
  288. leaf: leaf,
  289. prefix: search,
  290. },
  291. })
  292. return nc, nil, false
  293. }
  294. // delete does a recursive deletion
  295. func (t *Txn) delete(parent, n *Node, search []byte) (*Node, *leafNode) {
  296. // Check for key exhaustion
  297. if len(search) == 0 {
  298. if !n.isLeaf() {
  299. return nil, nil
  300. }
  301. // Copy the pointer in case we are in a transaction that already
  302. // modified this node since the node will be reused. Any changes
  303. // made to the node will not affect returning the original leaf
  304. // value.
  305. oldLeaf := n.leaf
  306. // Remove the leaf node
  307. nc := t.writeNode(n, true)
  308. nc.leaf = nil
  309. // Check if this node should be merged
  310. if n != t.root && len(nc.edges) == 1 {
  311. t.mergeChild(nc)
  312. }
  313. return nc, oldLeaf
  314. }
  315. // Look for an edge
  316. label := search[0]
  317. idx, child := n.getEdge(label)
  318. if child == nil || !bytes.HasPrefix(search, child.prefix) {
  319. return nil, nil
  320. }
  321. // Consume the search prefix
  322. search = search[len(child.prefix):]
  323. newChild, leaf := t.delete(n, child, search)
  324. if newChild == nil {
  325. return nil, nil
  326. }
  327. // Copy this node. WATCH OUT - it's safe to pass "false" here because we
  328. // will only ADD a leaf via nc.mergeChild() if there isn't one due to
  329. // the !nc.isLeaf() check in the logic just below. This is pretty subtle,
  330. // so be careful if you change any of the logic here.
  331. nc := t.writeNode(n, false)
  332. // Delete the edge if the node has no edges
  333. if newChild.leaf == nil && len(newChild.edges) == 0 {
  334. nc.delEdge(label)
  335. if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
  336. t.mergeChild(nc)
  337. }
  338. } else {
  339. nc.edges[idx].node = newChild
  340. }
  341. return nc, leaf
  342. }
  343. // delete does a recursive deletion
  344. func (t *Txn) deletePrefix(parent, n *Node, search []byte) (*Node, int) {
  345. // Check for key exhaustion
  346. if len(search) == 0 {
  347. nc := t.writeNode(n, true)
  348. if n.isLeaf() {
  349. nc.leaf = nil
  350. }
  351. nc.edges = nil
  352. return nc, t.trackChannelsAndCount(n)
  353. }
  354. // Look for an edge
  355. label := search[0]
  356. idx, child := n.getEdge(label)
  357. // We make sure that either the child node's prefix starts with the search term, or the search term starts with the child node's prefix
  358. // Need to do both so that we can delete prefixes that don't correspond to any node in the tree
  359. if child == nil || (!bytes.HasPrefix(child.prefix, search) && !bytes.HasPrefix(search, child.prefix)) {
  360. return nil, 0
  361. }
  362. // Consume the search prefix
  363. if len(child.prefix) > len(search) {
  364. search = []byte("")
  365. } else {
  366. search = search[len(child.prefix):]
  367. }
  368. newChild, numDeletions := t.deletePrefix(n, child, search)
  369. if newChild == nil {
  370. return nil, 0
  371. }
  372. // Copy this node. WATCH OUT - it's safe to pass "false" here because we
  373. // will only ADD a leaf via nc.mergeChild() if there isn't one due to
  374. // the !nc.isLeaf() check in the logic just below. This is pretty subtle,
  375. // so be careful if you change any of the logic here.
  376. nc := t.writeNode(n, false)
  377. // Delete the edge if the node has no edges
  378. if newChild.leaf == nil && len(newChild.edges) == 0 {
  379. nc.delEdge(label)
  380. if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
  381. t.mergeChild(nc)
  382. }
  383. } else {
  384. nc.edges[idx].node = newChild
  385. }
  386. return nc, numDeletions
  387. }
  388. // Insert is used to add or update a given key. The return provides
  389. // the previous value and a bool indicating if any was set.
  390. func (t *Txn) Insert(k []byte, v interface{}) (interface{}, bool) {
  391. newRoot, oldVal, didUpdate := t.insert(t.root, k, k, v)
  392. if newRoot != nil {
  393. t.root = newRoot
  394. }
  395. if !didUpdate {
  396. t.size++
  397. }
  398. return oldVal, didUpdate
  399. }
  400. // Delete is used to delete a given key. Returns the old value if any,
  401. // and a bool indicating if the key was set.
  402. func (t *Txn) Delete(k []byte) (interface{}, bool) {
  403. newRoot, leaf := t.delete(nil, t.root, k)
  404. if newRoot != nil {
  405. t.root = newRoot
  406. }
  407. if leaf != nil {
  408. t.size--
  409. return leaf.val, true
  410. }
  411. return nil, false
  412. }
  413. // DeletePrefix is used to delete an entire subtree that matches the prefix
  414. // This will delete all nodes under that prefix
  415. func (t *Txn) DeletePrefix(prefix []byte) bool {
  416. newRoot, numDeletions := t.deletePrefix(nil, t.root, prefix)
  417. if newRoot != nil {
  418. t.root = newRoot
  419. t.size = t.size - numDeletions
  420. return true
  421. }
  422. return false
  423. }
// Root returns the current root of the radix tree within this
// transaction. The root is not safe across insert and delete operations,
// but can be used to read the current state during a transaction.
// The returned node is the transaction's own root pointer, not a copy.
func (t *Txn) Root() *Node {
	return t.root
}
  430. // Get is used to lookup a specific key, returning
  431. // the value and if it was found
  432. func (t *Txn) Get(k []byte) (interface{}, bool) {
  433. return t.root.Get(k)
  434. }
  435. // GetWatch is used to lookup a specific key, returning
  436. // the watch channel, value and if it was found
  437. func (t *Txn) GetWatch(k []byte) (<-chan struct{}, interface{}, bool) {
  438. return t.root.GetWatch(k)
  439. }
  440. // Commit is used to finalize the transaction and return a new tree. If mutation
  441. // tracking is turned on then notifications will also be issued.
  442. func (t *Txn) Commit() *Tree {
  443. nt := t.CommitOnly()
  444. if t.trackMutate {
  445. t.Notify()
  446. }
  447. return nt
  448. }
  449. // CommitOnly is used to finalize the transaction and return a new tree, but
  450. // does not issue any notifications until Notify is called.
  451. func (t *Txn) CommitOnly() *Tree {
  452. nt := &Tree{t.root, t.size}
  453. t.writable = nil
  454. return nt
  455. }
  456. // slowNotify does a complete comparison of the before and after trees in order
  457. // to trigger notifications. This doesn't require any additional state but it
  458. // is very expensive to compute.
  459. func (t *Txn) slowNotify() {
  460. snapIter := t.snap.rawIterator()
  461. rootIter := t.root.rawIterator()
  462. for snapIter.Front() != nil || rootIter.Front() != nil {
  463. // If we've exhausted the nodes in the old snapshot, we know
  464. // there's nothing remaining to notify.
  465. if snapIter.Front() == nil {
  466. return
  467. }
  468. snapElem := snapIter.Front()
  469. // If we've exhausted the nodes in the new root, we know we need
  470. // to invalidate everything that remains in the old snapshot. We
  471. // know from the loop condition there's something in the old
  472. // snapshot.
  473. if rootIter.Front() == nil {
  474. close(snapElem.mutateCh)
  475. if snapElem.isLeaf() {
  476. close(snapElem.leaf.mutateCh)
  477. }
  478. snapIter.Next()
  479. continue
  480. }
  481. // Do one string compare so we can check the various conditions
  482. // below without repeating the compare.
  483. cmp := strings.Compare(snapIter.Path(), rootIter.Path())
  484. // If the snapshot is behind the root, then we must have deleted
  485. // this node during the transaction.
  486. if cmp < 0 {
  487. close(snapElem.mutateCh)
  488. if snapElem.isLeaf() {
  489. close(snapElem.leaf.mutateCh)
  490. }
  491. snapIter.Next()
  492. continue
  493. }
  494. // If the snapshot is ahead of the root, then we must have added
  495. // this node during the transaction.
  496. if cmp > 0 {
  497. rootIter.Next()
  498. continue
  499. }
  500. // If we have the same path, then we need to see if we mutated a
  501. // node and possibly the leaf.
  502. rootElem := rootIter.Front()
  503. if snapElem != rootElem {
  504. close(snapElem.mutateCh)
  505. if snapElem.leaf != nil && (snapElem.leaf != rootElem.leaf) {
  506. close(snapElem.leaf.mutateCh)
  507. }
  508. }
  509. snapIter.Next()
  510. rootIter.Next()
  511. }
  512. }
  513. // Notify is used along with TrackMutate to trigger notifications. This must
  514. // only be done once a transaction is committed via CommitOnly, and it is called
  515. // automatically by Commit.
  516. func (t *Txn) Notify() {
  517. if !t.trackMutate {
  518. return
  519. }
  520. // If we've overflowed the tracking state we can't use it in any way and
  521. // need to do a full tree compare.
  522. if t.trackOverflow {
  523. t.slowNotify()
  524. } else {
  525. for ch := range t.trackChannels {
  526. close(ch)
  527. }
  528. }
  529. // Clean up the tracking state so that a re-notify is safe (will trigger
  530. // the else clause above which will be a no-op).
  531. t.trackChannels = nil
  532. t.trackOverflow = false
  533. }
  534. // Insert is used to add or update a given key. The return provides
  535. // the new tree, previous value and a bool indicating if any was set.
  536. func (t *Tree) Insert(k []byte, v interface{}) (*Tree, interface{}, bool) {
  537. txn := t.Txn()
  538. old, ok := txn.Insert(k, v)
  539. return txn.Commit(), old, ok
  540. }
  541. // Delete is used to delete a given key. Returns the new tree,
  542. // old value if any, and a bool indicating if the key was set.
  543. func (t *Tree) Delete(k []byte) (*Tree, interface{}, bool) {
  544. txn := t.Txn()
  545. old, ok := txn.Delete(k)
  546. return txn.Commit(), old, ok
  547. }
  548. // DeletePrefix is used to delete all nodes starting with a given prefix. Returns the new tree,
  549. // and a bool indicating if the prefix matched any nodes
  550. func (t *Tree) DeletePrefix(k []byte) (*Tree, bool) {
  551. txn := t.Txn()
  552. ok := txn.DeletePrefix(k)
  553. return txn.Commit(), ok
  554. }
// Root returns the root node of the tree which can be used for richer
// query operations. The returned node is the tree's own root pointer, not
// a copy.
func (t *Tree) Root() *Node {
	return t.root
}
  560. // Get is used to lookup a specific key, returning
  561. // the value and if it was found
  562. func (t *Tree) Get(k []byte) (interface{}, bool) {
  563. return t.root.Get(k)
  564. }
  565. // longestPrefix finds the length of the shared prefix
  566. // of two strings
  567. func longestPrefix(k1, k2 []byte) int {
  568. max := len(k1)
  569. if l := len(k2); l < max {
  570. max = l
  571. }
  572. var i int
  573. for i = 0; i < max; i++ {
  574. if k1[i] != k2[i] {
  575. break
  576. }
  577. }
  578. return i
  579. }
  580. // concat two byte slices, returning a third new copy
  581. func concat(a, b []byte) []byte {
  582. c := make([]byte, len(a)+len(b))
  583. copy(c, a)
  584. copy(c[len(a):], b)
  585. return c
  586. }