mock_consul_client_internal_test.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package multitenant
  2. import (
  3. "sync"
  4. "time"
  5. consul "github.com/hashicorp/consul/api"
  6. )
  7. type mockKV struct {
  8. mtx sync.Mutex
  9. cond *sync.Cond
  10. kvps map[string]*consul.KVPair
  11. next uint64 // the next update will have this 'index in the the log'
  12. }
  13. func newMockConsulClient() ConsulClient {
  14. m := mockKV{
  15. kvps: map[string]*consul.KVPair{},
  16. }
  17. m.cond = sync.NewCond(&m.mtx)
  18. go m.loop()
  19. return &consulClient{&m}
  20. }
  21. func copyKVPair(in *consul.KVPair) *consul.KVPair {
  22. value := make([]byte, len(in.Value))
  23. copy(value, in.Value)
  24. return &consul.KVPair{
  25. Key: in.Key,
  26. CreateIndex: in.CreateIndex,
  27. ModifyIndex: in.ModifyIndex,
  28. LockIndex: in.LockIndex,
  29. Flags: in.Flags,
  30. Value: value,
  31. Session: in.Session,
  32. }
  33. }
  34. // periodic loop to wake people up, so they can honour timeouts
  35. func (m *mockKV) loop() {
  36. for range time.Tick(1 * time.Second) {
  37. m.mtx.Lock()
  38. m.cond.Broadcast()
  39. m.mtx.Unlock()
  40. }
  41. }
  42. func (m *mockKV) CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) {
  43. m.mtx.Lock()
  44. defer m.mtx.Unlock()
  45. existing, ok := m.kvps[p.Key]
  46. if ok && existing.ModifyIndex != p.ModifyIndex {
  47. return false, nil, nil
  48. }
  49. if ok {
  50. existing.Value = p.Value
  51. } else {
  52. m.kvps[p.Key] = copyKVPair(p)
  53. }
  54. m.kvps[p.Key].ModifyIndex++
  55. m.kvps[p.Key].LockIndex = m.next
  56. m.next++
  57. m.cond.Broadcast()
  58. return true, nil, nil
  59. }
  60. func (m *mockKV) Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) {
  61. m.mtx.Lock()
  62. defer m.mtx.Unlock()
  63. value, ok := m.kvps[key]
  64. if !ok {
  65. return nil, nil, nil
  66. }
  67. for q.WaitIndex >= value.ModifyIndex {
  68. m.cond.Wait()
  69. }
  70. return copyKVPair(value), nil, nil
  71. }
  72. func (m *mockKV) List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) {
  73. m.mtx.Lock()
  74. defer m.mtx.Unlock()
  75. deadline := time.Now().Add(q.WaitTime)
  76. for m.next <= q.WaitIndex && time.Now().Before(deadline) {
  77. m.cond.Wait()
  78. }
  79. if time.Now().After(deadline) {
  80. return nil, &consul.QueryMeta{LastIndex: q.WaitIndex}, nil
  81. }
  82. result := consul.KVPairs{}
  83. for _, kvp := range m.kvps {
  84. if kvp.LockIndex >= q.WaitIndex {
  85. result = append(result, copyKVPair(kvp))
  86. }
  87. }
  88. return result, &consul.QueryMeta{LastIndex: m.next}, nil
  89. }