From 3728d998c4d391505e70958699ed0d12ed9c0120 Mon Sep 17 00:00:00 2001 From: yanavlasov Date: Sun, 19 Apr 2026 14:17:03 -0400 Subject: [PATCH 1/4] Send user defined metadata to the client (#1112) Signed-off-by: Yan Avlasov --- src/config/config.go | 2 + src/config/config_impl.go | 74 ++++++++++++++++ src/service/ratelimit.go | 58 ++++++++----- src/service/ratelimit_test.go | 59 ++++++++----- test/config/config_test.go | 53 ++++++++++++ test/config/metadata.yaml | 28 ++++++ test/service/ratelimit_test.go | 154 ++++++++++++++++++++++++++++++--- 7 files changed, 375 insertions(+), 53 deletions(-) create mode 100644 test/config/metadata.yaml diff --git a/src/config/config.go b/src/config/config.go index 6e31883de..fd8619269 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -4,6 +4,7 @@ import ( pb_struct "github.com/envoyproxy/go-control-plane/envoy/extensions/common/ratelimit/v3" pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" "golang.org/x/net/context" + "google.golang.org/protobuf/types/known/structpb" "github.com/envoyproxy/ratelimit/src/stats" ) @@ -29,6 +30,7 @@ type RateLimit struct { // ShareThresholdKeyPattern is a slice of wildcard patterns for descriptor entries // The slice index corresponds to the descriptor entry index. ShareThresholdKeyPattern []string + Metadata *structpb.Struct } // Interface for interacting with a loaded rate limit config. diff --git a/src/config/config_impl.go b/src/config/config_impl.go index dd3c5b5e8..c2b48e3f9 100644 --- a/src/config/config_impl.go +++ b/src/config/config_impl.go @@ -8,6 +8,7 @@ import ( pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" logger "github.com/sirupsen/logrus" "golang.org/x/net/context" + "google.golang.org/protobuf/types/known/structpb" "gopkg.in/yaml.v2" "github.com/envoyproxy/ratelimit/src/stats" @@ -35,6 +36,7 @@ type YamlDescriptor struct { DetailedMetric bool `yaml:"detailed_metric"` ValueToMetric bool `yaml:"value_to_metric"` ShareThreshold bool `yaml:"share_threshold"` + Metadata map[string]interface{} } type YamlRoot struct { @@ -56,6 +58,7 @@ type rateLimitDescriptor struct { valueToMetric bool shareThreshold bool wildcardPattern string // stores the wildcard pattern when share_threshold is true + metadata *structpb.Struct } type rateLimitDomain struct { @@ -84,6 +87,7 @@ var validKeys = map[string]bool{ "detailed_metric": true, "value_to_metric": true, "share_threshold": true, + "metadata": true, } // Create a new rate limit config entry. 
@@ -134,6 +138,64 @@ func newRateLimitConfigError(name string, err string) RateLimitConfigError {
 	return RateLimitConfigError(fmt.Sprintf("%s: %s", name, err))
 }
 
+func convertMap(m map[interface{}]interface{}) map[string]interface{} {
+	res := make(map[string]interface{})
+	for k, v := range m {
+		strKey := fmt.Sprintf("%v", k)
+		switch val := v.(type) {
+		case map[interface{}]interface{}:
+			res[strKey] = convertMap(val)
+		case []interface{}:
+			res[strKey] = convertSlice(val)
+		default:
+			res[strKey] = v
+		}
+	}
+	return res
+}
+
+func convertSlice(s []interface{}) []interface{} {
+	res := make([]interface{}, len(s))
+	for i, v := range s {
+		switch val := v.(type) {
+		case map[interface{}]interface{}:
+			res[i] = convertMap(val)
+		case []interface{}:
+			res[i] = convertSlice(val)
+		default:
+			res[i] = v
+		}
+	}
+	return res
+}
+
+// parseMetadata converts metadata parsed from the config YAML into a protobuf Struct.
+// @param yamlMetadata supplies metadata parsed from config YAML
+func parseMetadata(yamlMetadata map[string]interface{}) (*structpb.Struct, error) {
+	if len(yamlMetadata) == 0 {
+		return nil, nil
+	}
+
+	convertedValue := make(map[string]interface{})
+	for k, v := range yamlMetadata {
+		switch val := v.(type) {
+		case map[interface{}]interface{}:
+			convertedValue[k] = convertMap(val)
+		case []interface{}:
+			convertedValue[k] = convertSlice(val)
+		default:
+			convertedValue[k] = v
+		}
+	}
+
+	pbStruct, err := structpb.NewStruct(convertedValue)
+	if err != nil {
+		return nil, err
+	}
+
+	return pbStruct, nil
+}
+
 // wildcardMatch reports whether value matches a pre-split wildcard pattern.
 // parts is the pattern split on "*" at load time (e.g. ["path_bar", "baz", "qux"]).
 // Each * matches zero or more characters. No allocations are performed per call.
@@ -268,6 +330,11 @@ func (this *rateLimitDescriptor) loadDescriptors(config RateLimitConfigToLoad, p
 		})
 	}
 
+	metadata, err := parseMetadata(descriptorConfig.Metadata)
+	if err != nil {
+		panic(newRateLimitConfigError(config.Name, fmt.Sprintf("error parsing metadata: %s", err.Error())))
+	}
+
 	logger.Debugf(
 		"loading descriptor: key=%s%s", newParentKey, rateLimitDebugString)
 	newDescriptor := &rateLimitDescriptor{
@@ -277,6 +344,7 @@ func (this *rateLimitDescriptor) loadDescriptors(config RateLimitConfigToLoad, p
 		valueToMetric:   descriptorConfig.ValueToMetric,
 		shareThreshold:  descriptorConfig.ShareThreshold,
 		wildcardPattern: wildcardPattern,
+		metadata:        metadata,
 	}
 	newDescriptor.loadDescriptors(config, newParentKey+".", descriptorConfig.Descriptors, statsManager)
 	this.descriptors[finalKey] = newDescriptor
@@ -298,6 +366,11 @@ func validateYamlKeys(fileName string, config_map map[interface{}]interface{}) {
 			logger.Debug(errorText)
 			panic(newRateLimitConfigError(fileName, errorText))
 		}
+		if k.(string) == "metadata" {
+			// Do not validate keys/values in the metadata, since they are arbitrary. If config is invalid the parsing will fail
+			// later when it is converted to protobuf.Struct
+			continue
+		}
 		switch v := v.(type) {
 		case []interface{}:
 			for _, e := range v {
@@ -516,6 +589,7 @@ func (this *rateLimitConfigImpl) GetLimit(
 					DetailedMetric: originalLimit.DetailedMetric,
 					// Initialize ShareThresholdKeyPattern with correct length, empty strings for entries without share_threshold
 					ShareThresholdKeyPattern: nil,
+					Metadata:                 nextDescriptor.metadata,
 				}
 				// Apply all tracked share_threshold patterns when we find the rate_limit
 				// This works whether the rate_limit is at the wildcard level or deeper
diff --git a/src/service/ratelimit.go b/src/service/ratelimit.go
index f6ecd2e24..48e6a59a7 100644
--- a/src/service/ratelimit.go
+++ b/src/service/ratelimit.go
@@ -207,7 +207,7 @@ func (this *service) shouldRateLimitWorker(
 	var minimumDescriptor *pb.RateLimitResponse_DescriptorStatus = nil
 
-	// Track quota mode violations for metadata
-	var quotaModeViolations []int
+	// Track the descriptors that passed, so their metadata can be attached to the response
+	var passedDescriptors []int
 	failedRateLimitDescriptors := 0
 	failedQuotaDescriptors := 0
 	totalQuotaDescriptors := 0
@@ -231,14 +231,15 @@ func (this *service) shouldRateLimitWorker(
 		isQuotaMode := globalQuotaMode || (limitsToCheck[i] != nil && limitsToCheck[i].QuotaMode)
 		if descriptorStatus.Code == pb.RateLimitResponse_OVER_LIMIT {
 			if isQuotaMode {
-				// In quota mode: track the violation for metadata but keep response as OK
-				quotaModeViolations = append(quotaModeViolations, i)
 				failedQuotaDescriptors += 1
 			} else {
 				failedRateLimitDescriptors += 1
 				minimumDescriptor = descriptorStatus
 				minLimitRemaining = 0
 			}
+		} else {
+			// Keep track of the descriptors that have passed
+			passedDescriptors = append(passedDescriptors, i)
 		}
 		if isQuotaMode {
 			totalQuotaDescriptors += 1
@@ -270,14 +271,14 @@ func (this *service) shouldRateLimitWorker(
 	// If response dynamic data enabled, set dynamic data on response.
 	if this.responseDynamicMetadataEnabled {
-		response.DynamicMetadata = ratelimitToMetadata(request, quotaModeViolations, limitsToCheck)
+		response.DynamicMetadata = ratelimitToMetadata(request, passedDescriptors, limitsToCheck)
 	}
 
 	response.OverallCode = finalCode
 	return response
 }
 
-func ratelimitToMetadata(req *pb.RateLimitRequest, quotaModeViolations []int, limitsToCheck []*config.RateLimit) *structpb.Struct {
+func ratelimitToMetadata(req *pb.RateLimitRequest, passedDescriptors []int, limitsToCheck []*config.RateLimit) *structpb.Struct {
 	fields := make(map[string]*structpb.Value)
 
 	// Domain
@@ -301,26 +302,19 @@ func ratelimitToMetadata(req *pb.RateLimitRequest, quotaModeViolations []int, li
 		fields["hitsAddend"] = structpb.NewNumberValue(float64(hitsAddend))
 	}
 
-	// Quota mode information
-	if len(quotaModeViolations) > 0 {
-		violationValues := make([]*structpb.Value, len(quotaModeViolations))
-		for i, violationIndex := range quotaModeViolations {
-			violationValues[i] = structpb.NewNumberValue(float64(violationIndex))
+	passedMetadata := &structpb.Struct{Fields: make(map[string]*structpb.Value)}
+	for _, idx := range passedDescriptors {
+		if idx < len(limitsToCheck) {
+			limit := limitsToCheck[idx]
+			if limit != nil && limit.Metadata != nil {
+				mergeMetadata(passedMetadata, limit.Metadata)
+			}
 		}
-		fields["quotaModeViolations"] = structpb.NewListValue(&structpb.ListValue{
-			Values: violationValues,
-		})
 	}
 
-	// Check if any limits have quota mode enabled
-	quotaModeEnabled := false
-	for _, limit := range limitsToCheck {
-		if limit != nil && limit.QuotaMode {
-			quotaModeEnabled = true
-			break
-		}
+	if len(passedMetadata.GetFields()) > 0 {
+		fields["metadata"] = structpb.NewStructValue(passedMetadata)
 	}
-	fields["quotaModeEnabled"] = structpb.NewBoolValue(quotaModeEnabled)
 
 	return &structpb.Struct{Fields: fields}
 }
@@ -355,6 +349,28 @@ func descriptorToStruct(descriptor *ratelimitv3.RateLimitDescriptor) *structpb.S
 	return &structpb.Struct{Fields: fields}
 }
 
+func mergeMetadata(dest *structpb.Struct, src *structpb.Struct) {
+	if src == nil {
+		return
+	}
+	for k, v := range src.GetFields() {
+		destVal, exists := dest.GetFields()[k]
+		if exists {
+			// If both are structs, merge them recursively
+			if destStruct := destVal.GetStructValue(); destStruct != nil {
+				if srcStruct := v.GetStructValue(); srcStruct != nil {
+					mergeMetadata(destStruct, srcStruct)
+					continue
+				}
+			}
+			// TODO(yanavlasov): add option to overwrite, or to append if the type is a list
+		} else {
+			// Key is not present in the destination: add it
+			dest.GetFields()[k] = v
+		}
+	}
+}
+
 func (this *service) rateLimitLimitHeader(descriptor *pb.RateLimitResponse_DescriptorStatus) *core.HeaderValue {
 	// Limit header only provides the mandatory part from the spec, the actual limit
 	// the optional quota policy is currently not provided
diff --git a/src/service/ratelimit_test.go b/src/service/ratelimit_test.go
index 0f43b6fb0..bb0234d74 100644
--- a/src/service/ratelimit_test.go
+++ b/src/service/ratelimit_test.go
@@ -16,11 +16,11 @@ import (
 
 func TestRatelimitToMetadata(t *testing.T) {
 	cases := []struct {
-		name                string
-		req                 *pb.RateLimitRequest
-		quotaModeViolations []int
-		limitsToCheck       []*config.RateLimit
-		expected            string
+		name              string
+		req               *pb.RateLimitRequest
+		passedDescriptors []int
+		limitsToCheck     []*config.RateLimit
+		expected          string
 	}{
 		{
 			name: "Single descriptor with single entry, no quota violations",
@@ -37,8 +37,8 @@ func TestRatelimitToMetadata(t *testing.T) {
 				},
 			},
-			quotaModeViolations: nil,
-			limitsToCheck:       []*config.RateLimit{nil},
+
passedDescriptors: nil, + limitsToCheck: []*config.RateLimit{nil}, expected: `{ "descriptors": [ { @@ -47,8 +47,7 @@ func TestRatelimitToMetadata(t *testing.T) { ] } ], - "domain": "fake-domain", - "quotaModeEnabled": false + "domain": "fake-domain" }`, }, { @@ -66,10 +65,15 @@ func TestRatelimitToMetadata(t *testing.T) { }, }, }, - quotaModeViolations: []int{0}, + passedDescriptors: []int{0}, limitsToCheck: []*config.RateLimit{ { QuotaMode: true, + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "name": structpb.NewStringValue("service_1"), + }, + }, }, }, expected: `{ @@ -81,8 +85,9 @@ func TestRatelimitToMetadata(t *testing.T) { } ], "domain": "quota-domain", - "quotaModeEnabled": true, - "quotaModeViolations": [0] + "metadata": { + "name": "service_1" + } }`, }, { @@ -116,16 +121,31 @@ func TestRatelimitToMetadata(t *testing.T) { }, }, }, - quotaModeViolations: []int{1, 2}, + passedDescriptors: []int{1, 2}, limitsToCheck: []*config.RateLimit{ { QuotaMode: false, + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "name": structpb.NewStringValue("service_1"), + }, + }, }, { QuotaMode: true, + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "name": structpb.NewStringValue("service_2"), + }, + }, }, { QuotaMode: true, + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "name": structpb.NewStringValue("service_3"), + }, + }, }, }, expected: `{ @@ -147,8 +167,9 @@ func TestRatelimitToMetadata(t *testing.T) { } ], "domain": "mixed-domain", - "quotaModeEnabled": true, - "quotaModeViolations": [1, 2] + "metadata": { + "name": "service_2" + } }`, }, { @@ -167,7 +188,7 @@ func TestRatelimitToMetadata(t *testing.T) { }, }, }, - quotaModeViolations: []int{0}, + passedDescriptors: []int{0}, limitsToCheck: []*config.RateLimit{ { QuotaMode: true, @@ -182,16 +203,14 @@ func TestRatelimitToMetadata(t *testing.T) { } ], "domain": "addend-domain", - "hitsAddend": 5, - "quotaModeEnabled": true, - "quotaModeViolations": [0] + "hitsAddend": 5 }`, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - got := ratelimitToMetadata(tc.req, tc.quotaModeViolations, tc.limitsToCheck) + got := ratelimitToMetadata(tc.req, tc.passedDescriptors, tc.limitsToCheck) expected := &structpb.Struct{} err := protojson.Unmarshal([]byte(tc.expected), expected) require.NoError(t, err) diff --git a/test/config/config_test.go b/test/config/config_test.go index 4b371fb6b..2bd009a99 100644 --- a/test/config/config_test.go +++ b/test/config/config_test.go @@ -2206,5 +2206,58 @@ func TestWildcardStatsBehavior(t *testing.T) { }) } +func TestMetadata(t *testing.T) { + assert := assert.New(t) + stats := stats.NewStore(stats.NewNullSink(), false) + rlConfig := config.NewRateLimitConfigImpl(loadFile("metadata.yaml"), mockstats.NewMockStatManager(stats), false) + rlConfig.Dump() + assert.Equal(rlConfig.IsEmptyDomains(), false) + assert.Nil(rlConfig.GetLimit(context.TODO(), "test-domain", &pb_struct.RateLimitDescriptor{})) + assert.EqualValues(0, stats.NewCounter("test-domain.domain_not_found").Value()) + + rl := rlConfig.GetLimit( + context.TODO(), "test-domain", + &pb_struct.RateLimitDescriptor{ + Entries: []*pb_struct.RateLimitDescriptor_Entry{{Key: "key1", Value: "value1"}, {Key: "subkey1", Value: "something"}}, + }) + rl.Stats.TotalHits.Inc() + rl.Stats.OverLimit.Inc() + rl.Stats.NearLimit.Inc() + rl.Stats.WithinLimit.Inc() + assert.EqualValues(5, rl.Limit.RequestsPerUnit) + assert.Equal(pb.RateLimitResponse_RateLimit_SECOND, rl.Limit.Unit) + 
assert.EqualValues(1, stats.NewCounter("test-domain.key1_value1.subkey1.total_hits").Value()) + assert.EqualValues(1, stats.NewCounter("test-domain.key1_value1.subkey1.over_limit").Value()) + assert.EqualValues(1, stats.NewCounter("test-domain.key1_value1.subkey1.near_limit").Value()) + assert.EqualValues(1, stats.NewCounter("test-domain.key1_value1.subkey1.within_limit").Value()) + + // Verify metadata for key1_value1.subkey1 + assert.NotNil(rl.Metadata) + nameVal, ok := rl.Metadata.GetFields()["name"] + assert.True(ok) + assert.Equal("service_1", nameVal.GetStringValue()) + + rl = rlConfig.GetLimit( + context.TODO(), "test-domain", + &pb_struct.RateLimitDescriptor{ + Entries: []*pb_struct.RateLimitDescriptor_Entry{{Key: "key2", Value: "something"}}, + }) + rl.Stats.TotalHits.Inc() + rl.Stats.OverLimit.Inc() + rl.Stats.NearLimit.Inc() + rl.Stats.WithinLimit.Inc() + assert.EqualValues(20, rl.Limit.RequestsPerUnit) + assert.Equal(pb.RateLimitResponse_RateLimit_MINUTE, rl.Limit.Unit) + assert.EqualValues(1, stats.NewCounter("test-domain.key2.total_hits").Value()) + assert.EqualValues(1, stats.NewCounter("test-domain.key2.over_limit").Value()) + assert.EqualValues(1, stats.NewCounter("test-domain.key2.near_limit").Value()) + assert.EqualValues(1, stats.NewCounter("test-domain.key2.within_limit").Value()) + + assert.NotNil(rl.Metadata) + serviceVal, ok := rl.Metadata.GetFields()["service_with_quota"] + assert.True(ok) + assert.Equal("service_2", serviceVal.GetStructValue().GetFields()["name"].GetStringValue()) +} + // share_threshold parity (middle+trailing wildcards produce the same shared-counter // behaviour as trailing-only) is covered by TestShareThreshold Cases 5-7. diff --git a/test/config/metadata.yaml b/test/config/metadata.yaml new file mode 100644 index 000000000..cb1dadbc6 --- /dev/null +++ b/test/config/metadata.yaml @@ -0,0 +1,28 @@ +domain: test-domain +descriptors: + # Top level key/value with no default rate limit. + - key: key1 + value: value1 + descriptors: + # 2nd level key only with default rate limit. + - key: subkey1 + rate_limit: + unit: second + requests_per_unit: 5 + metadata: + name: service_1 + + # 2nd level key/value with limit. Specific override at 2nd level. 
+      - key: subkey1
+        value: subvalue1
+        rate_limit:
+          unit: second
+          requests_per_unit: 10
+
+  - key: key2
+    rate_limit:
+      unit: minute
+      requests_per_unit: 20
+    metadata:
+      service_with_quota:
+        name: service_2
diff --git a/test/service/ratelimit_test.go b/test/service/ratelimit_test.go
index a949f9a68..c77ba6b87 100644
--- a/test/service/ratelimit_test.go
+++ b/test/service/ratelimit_test.go
@@ -22,6 +22,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/health"
 	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/protobuf/types/known/structpb"
 
 	"github.com/envoyproxy/ratelimit/src/trace"
 
@@ -719,7 +720,7 @@ func TestServiceGlobalQuotaMode(test *testing.T) {
 	t.assert.Nil(err)
 }
 
-func TestServiceQuotaModeWithMetadata(test *testing.T) {
+func TestMetadataReturnedForPassedDescriptors(test *testing.T) {
 	os.Setenv("QUOTA_MODE", "true")
 	os.Setenv("RESPONSE_DYNAMIC_METADATA", "true")
 	defer func() {
@@ -749,6 +750,9 @@
 		config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_MINUTE, t.statsManager.NewStats("key"), false, false, false, "", nil, false),
 		config.NewRateLimit(5, pb.RateLimitResponse_RateLimit_MINUTE, t.statsManager.NewStats("key2"), false, false, true, "", nil, false),
 	}
+	limits[0].Metadata = &structpb.Struct{Fields: map[string]*structpb.Value{"name": structpb.NewStringValue("service_1")}}
+	limits[1].Metadata = &structpb.Struct{Fields: map[string]*structpb.Value{"name": structpb.NewStringValue("service_2")}}
+
 	t.config.EXPECT().GetLimit(context.Background(), "quota-domain", request.Descriptors[0]).Return(limits[0])
 	t.config.EXPECT().GetLimit(context.Background(), "quota-domain", request.Descriptors[1]).Return(limits[1])
 	t.cache.EXPECT().DoLimit(context.Background(), request, limits).Return(
@@ -757,23 +761,149 @@
 			{Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[1].Limit, LimitRemaining: 0},
 		})
 	response, err := service.ShouldRateLimit(context.Background(), request)
+	test.Logf("DynamicMetadata: %+v", response.DynamicMetadata)
+
+	// Verify the response includes metadata from the descriptors that passed
+	t.assert.Nil(err)
+	t.assert.Equal(pb.RateLimitResponse_OK, response.OverallCode)
+	t.assert.NotNil(response.DynamicMetadata)
+
+	// Verify metadata for passed limits
+	passedMetadataVal, ok := response.DynamicMetadata.GetFields()["metadata"]
+	t.assert.True(ok)
+	passedMetadata := passedMetadataVal.GetStructValue()
+	t.assert.NotNil(passedMetadata)
+
+	fields := passedMetadata.GetFields()
+	nameVal, ok := fields["name"]
+	t.assert.True(ok)
+	// Since descriptor 1 passed and descriptor 2 failed, only metadata from the first descriptor should be returned
+	t.assert.Equal("service_1", nameVal.GetStringValue())
+}
+
+func TestMetadataReturnedForAllPassedDescriptors(test *testing.T) {
+	os.Setenv("QUOTA_MODE", "true")
+	os.Setenv("RESPONSE_DYNAMIC_METADATA", "true")
+	defer func() {
+		os.Unsetenv("QUOTA_MODE")
+		os.Unsetenv("RESPONSE_DYNAMIC_METADATA")
+	}()
+
+	t := commonSetup(test)
+	defer t.controller.Finish()
+
+	service := t.setupBasicService()
+
+	// Force a config reload to pick up environment variables.
+	barrier := newBarrier()
+	t.configUpdateEvent.EXPECT().GetConfig().DoAndReturn(func() (config.RateLimitConfig, any) {
+		barrier.signal()
+		return t.config, nil
+	})
+	t.configUpdateEventChan <- t.configUpdateEvent
+	barrier.wait()
+
+	// Make a request.
+	request := common.NewRateLimitRequest(
+		"quota-domain", [][][2]string{{{"regular", "limit"}}, {{"quota", "limit"}}}, 1)
+
+	limits := []*config.RateLimit{
+		config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_MINUTE, t.statsManager.NewStats("key"), false, false, true, "", nil, false),
+		config.NewRateLimit(5, pb.RateLimitResponse_RateLimit_MINUTE, t.statsManager.NewStats("key2"), false, false, true, "", nil, false),
+	}
+	limits[0].Metadata = &structpb.Struct{Fields: map[string]*structpb.Value{"name": structpb.NewStringValue("service_1")}}
+	limits[1].Metadata = &structpb.Struct{Fields: map[string]*structpb.Value{"some_other_name": structpb.NewStringValue("service_2")}}
+
+	t.config.EXPECT().GetLimit(context.Background(), "quota-domain", request.Descriptors[0]).Return(limits[0])
+	t.config.EXPECT().GetLimit(context.Background(), "quota-domain", request.Descriptors[1]).Return(limits[1])
+	t.cache.EXPECT().DoLimit(context.Background(), request, limits).Return(
+		[]*pb.RateLimitResponse_DescriptorStatus{
+			{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 5},
+			{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[1].Limit, LimitRemaining: 6},
+		})
+	response, err := service.ShouldRateLimit(context.Background(), request)
+	test.Logf("DynamicMetadata: %+v", response.DynamicMetadata)
+
+	// Verify the response includes metadata from the descriptors that passed
+	t.assert.Nil(err)
+	t.assert.Equal(pb.RateLimitResponse_OK, response.OverallCode)
+	t.assert.NotNil(response.DynamicMetadata)
+
+	// Verify metadata for passed limits
+	passedMetadataVal, ok := response.DynamicMetadata.GetFields()["metadata"]
+	t.assert.True(ok)
+	passedMetadata := passedMetadataVal.GetStructValue()
+	t.assert.NotNil(passedMetadata)
+
+	fields := passedMetadata.GetFields()
+	nameVal, ok := fields["name"]
+	t.assert.True(ok)
+	// Both descriptors have passed, so the metadata should contain values from both descriptors
+	t.assert.Equal("service_1", nameVal.GetStringValue())
+	nameVal, ok = fields["some_other_name"]
+	t.assert.True(ok)
+	t.assert.Equal("service_2", nameVal.GetStringValue())
+}
+
+func TestOverlappingMetadataReturnsTheFirstValue(test *testing.T) {
+	os.Setenv("QUOTA_MODE", "true")
+	os.Setenv("RESPONSE_DYNAMIC_METADATA", "true")
+	defer func() {
+		os.Unsetenv("QUOTA_MODE")
+		os.Unsetenv("RESPONSE_DYNAMIC_METADATA")
+	}()
+
+	t := commonSetup(test)
+	defer t.controller.Finish()
+
+	service := t.setupBasicService()
+
+	// Force a config reload to pick up environment variables.
+	barrier := newBarrier()
+	t.configUpdateEvent.EXPECT().GetConfig().DoAndReturn(func() (config.RateLimitConfig, any) {
+		barrier.signal()
+		return t.config, nil
+	})
+	t.configUpdateEventChan <- t.configUpdateEvent
+	barrier.wait()
+
+	// Make a request.
+	request := common.NewRateLimitRequest(
+		"quota-domain", [][][2]string{{{"regular", "limit"}}, {{"quota", "limit"}}}, 1)
+
+	limits := []*config.RateLimit{
+		config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_MINUTE, t.statsManager.NewStats("key"), false, false, true, "", nil, false),
+		config.NewRateLimit(5, pb.RateLimitResponse_RateLimit_MINUTE, t.statsManager.NewStats("key2"), false, false, true, "", nil, false),
+	}
+	limits[0].Metadata = &structpb.Struct{Fields: map[string]*structpb.Value{"name": structpb.NewStringValue("service_1")}}
+	limits[1].Metadata = &structpb.Struct{Fields: map[string]*structpb.Value{"name": structpb.NewStringValue("service_2")}}
+
+	t.config.EXPECT().GetLimit(context.Background(), "quota-domain", request.Descriptors[0]).Return(limits[0])
+	t.config.EXPECT().GetLimit(context.Background(), "quota-domain", request.Descriptors[1]).Return(limits[1])
+	t.cache.EXPECT().DoLimit(context.Background(), request, limits).Return(
+		[]*pb.RateLimitResponse_DescriptorStatus{
+			{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 5},
+			{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[1].Limit, LimitRemaining: 6},
+		})
+	response, err := service.ShouldRateLimit(context.Background(), request)
+	test.Logf("DynamicMetadata: %+v", response.DynamicMetadata)
 
-	// Verify response includes metadata about quota violations
+	// Verify the response includes metadata from the descriptors that passed
 	t.assert.Nil(err)
 	t.assert.Equal(pb.RateLimitResponse_OK, response.OverallCode)
 	t.assert.NotNil(response.DynamicMetadata)
 
-	// Check that quota violation is tracked in metadata for descriptor index 1
-	quotaViolations := response.DynamicMetadata.Fields["quotaModeViolations"]
-	t.assert.NotNil(quotaViolations)
-	violations := quotaViolations.GetListValue()
-	t.assert.Len(violations.Values, 1)
-	t.assert.Equal(float64(1), violations.Values[0].GetNumberValue())
-
-	// Check that quotaModeEnabled is true
-	quotaModeEnabled := response.DynamicMetadata.Fields["quotaModeEnabled"]
-	t.assert.NotNil(quotaModeEnabled)
-	t.assert.True(quotaModeEnabled.GetBoolValue())
+	// Verify metadata for passed limits
+	passedMetadataVal, ok := response.DynamicMetadata.GetFields()["metadata"]
+	t.assert.True(ok)
+	passedMetadata := passedMetadataVal.GetStructValue()
+	t.assert.NotNil(passedMetadata)
+
+	fields := passedMetadata.GetFields()
+	nameVal, ok := fields["name"]
+	t.assert.True(ok)
+	// Metadata from the first descriptor takes precedence
+	t.assert.Equal("service_1", nameVal.GetStringValue())
 }
 
 func TestServicePerDescriptorQuotaMode(test *testing.T) {

From caf90daec66b51c8229135e66b63c191363b6aca Mon Sep 17 00:00:00 2001
From: Ian Kerins
Date: Sun, 19 Apr 2026 15:05:17 -0400
Subject: [PATCH 2/4] Dockerfile: add ENTRYPOINT (#1095)

Make the docker image easier to consume.
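With the entrypoint in place the image can be run without naming the binary,
and any arguments after the image name are passed through to /bin/ratelimit.
A minimal sketch of an invocation (the image tag and Redis settings are
illustrative only, not part of this change):

  docker run --rm \
    -e REDIS_SOCKET_TYPE=tcp \
    -e REDIS_URL=redis:6379 \
    envoyproxy/ratelimit:master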
Signed-off-by: Ian Kerins --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index b1171c5d0..cd89973f8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,3 +12,4 @@ RUN CGO_ENABLED=0 GOOS=linux go build -o /go/bin/ratelimit -ldflags="-w -s" -v g FROM gcr.io/distroless/static-debian12:nonroot@sha256:e8a4044e0b4ae4257efa45fc026c0bc30ad320d43bd4c1a7d5271bd241e386d0 COPY --from=build /go/bin/ratelimit /bin/ratelimit +ENTRYPOINT ["/bin/ratelimit"] From 6b266b01ee6b4a232f249c598b19a188dc501795 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Sun, 19 Apr 2026 22:21:24 +0300 Subject: [PATCH 3/4] Fix Prometheus response time units (#1104) --- README.md | 2 + src/service_cmd/runner/runner.go | 3 +- src/settings/settings.go | 25 +++++----- src/settings/settings_test.go | 19 ++++++++ src/stats/prom/default_mapper.yaml | 1 + src/stats/prom/prometheus_sink.go | 24 ++++++++-- src/stats/prom/prometheus_sink_test.go | 64 ++++++++++++++++++++++++++ 7 files changed, 121 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 538b413c1..6cf2e51c2 100644 --- a/README.md +++ b/README.md @@ -1105,6 +1105,7 @@ To enable Prometheus integration set: 2. `PROMETHEUS_ADDR`: The port to listen on for Prometheus metrics. Defaults to `:9090` 3. `PROMETHEUS_PATH`: The path to listen on for Prometheus metrics. Defaults to `/metrics` 4. `PROMETHEUS_MAPPER_YAML`: The path to the YAML file that defines the mapping from statsd to prometheus metrics. +5. `PROMETHEUS_RESPONSE_TIME_AS_MILLISECONDS`: `true` to keep the legacy millisecond behavior for `ratelimit_server.*.response_time` in the built-in mapper. Ignored when `PROMETHEUS_MAPPER_YAML` is set. Define the mapping from statsd to prometheus metrics in a YAML file. Find more information about the mapping in the [Metric Mapping and Configuration](https://github.com/prometheus/statsd_exporter?tab=readme-ov-file#metric-mapping-and-configuration). @@ -1181,6 +1182,7 @@ mappings: # Requires statsd exporter >= v0.6.0 since it uses the "drop" action. 
- match: "ratelimit_server.*.response_time" name: "ratelimit_service_response_time_seconds" timer_type: histogram + scale: 0.001 labels: grpc_method: "$1" diff --git a/src/service_cmd/runner/runner.go b/src/service_cmd/runner/runner.go index f062d15fc..a3889a58d 100644 --- a/src/service_cmd/runner/runner.go +++ b/src/service_cmd/runner/runner.go @@ -68,7 +68,8 @@ func NewRunner(s settings.Settings) Runner { } logger.Info("Stats initialized for Prometheus") store = gostats.NewStore(prom.NewPrometheusSink(prom.WithAddr(s.PrometheusAddr), - prom.WithPath(s.PrometheusPath), prom.WithMapperYamlPath(s.PrometheusMapperYaml)), false) + prom.WithPath(s.PrometheusPath), prom.WithMapperYamlPath(s.PrometheusMapperYaml), + prom.WithResponseTimeAsMilliseconds(s.PrometheusResponseTimeAsMilliseconds)), false) default: logger.Info("Stats initialized for stdout") store = gostats.NewStore(gostats.NewLoggingSink(), false) diff --git a/src/settings/settings.go b/src/settings/settings.go index 7ad2d80df..6a1be618f 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -82,18 +82,19 @@ type Settings struct { XdsClientGrpcOptionsMaxMsgSizeInBytes int `envconfig:"XDS_CLIENT_MAX_MSG_SIZE_IN_BYTES" default:""` // Stats-related settings - UseDogStatsd bool `envconfig:"USE_DOG_STATSD" default:"false"` - UseDogStatsdMogrifiers []string `envconfig:"USE_DOG_STATSD_MOGRIFIERS" default:""` - UseStatsd bool `envconfig:"USE_STATSD" default:"true"` - StatsdHost string `envconfig:"STATSD_HOST" default:"localhost"` - StatsdPort int `envconfig:"STATSD_PORT" default:"8125"` - ExtraTags map[string]string `envconfig:"EXTRA_TAGS" default:""` - StatsFlushInterval time.Duration `envconfig:"STATS_FLUSH_INTERVAL" default:"10s"` - DisableStats bool `envconfig:"DISABLE_STATS" default:"false"` - UsePrometheus bool `envconfig:"USE_PROMETHEUS" default:"false"` - PrometheusAddr string `envconfig:"PROMETHEUS_ADDR" default:":9090"` - PrometheusPath string `envconfig:"PROMETHEUS_PATH" default:"/metrics"` - PrometheusMapperYaml string `envconfig:"PROMETHEUS_MAPPER_YAML" default:""` + UseDogStatsd bool `envconfig:"USE_DOG_STATSD" default:"false"` + UseDogStatsdMogrifiers []string `envconfig:"USE_DOG_STATSD_MOGRIFIERS" default:""` + UseStatsd bool `envconfig:"USE_STATSD" default:"true"` + StatsdHost string `envconfig:"STATSD_HOST" default:"localhost"` + StatsdPort int `envconfig:"STATSD_PORT" default:"8125"` + ExtraTags map[string]string `envconfig:"EXTRA_TAGS" default:""` + StatsFlushInterval time.Duration `envconfig:"STATS_FLUSH_INTERVAL" default:"10s"` + DisableStats bool `envconfig:"DISABLE_STATS" default:"false"` + UsePrometheus bool `envconfig:"USE_PROMETHEUS" default:"false"` + PrometheusAddr string `envconfig:"PROMETHEUS_ADDR" default:":9090"` + PrometheusPath string `envconfig:"PROMETHEUS_PATH" default:"/metrics"` + PrometheusMapperYaml string `envconfig:"PROMETHEUS_MAPPER_YAML" default:""` + PrometheusResponseTimeAsMilliseconds bool `envconfig:"PROMETHEUS_RESPONSE_TIME_AS_MILLISECONDS" default:"false"` // Settings for rate limit configuration RuntimePath string `envconfig:"RUNTIME_ROOT" default:"/srv/runtime_data/current"` diff --git a/src/settings/settings_test.go b/src/settings/settings_test.go index b2c7bc7eb..f15fbea2b 100644 --- a/src/settings/settings_test.go +++ b/src/settings/settings_test.go @@ -7,12 +7,31 @@ import ( "github.com/stretchr/testify/assert" ) +const prometheusResponseTimeAsMillisecondsEnv = "PROMETHEUS_RESPONSE_TIME_AS_MILLISECONDS" + func TestSettingsTlsConfigUnmodified(t *testing.T) { settings := 
NewSettings() assert.NotNil(t, settings.RedisTlsConfig) assert.Nil(t, settings.RedisTlsConfig.RootCAs) } +func TestPrometheusResponseTimeAsMillisecondsDefault(t *testing.T) { + os.Unsetenv(prometheusResponseTimeAsMillisecondsEnv) + + settings := NewSettings() + + assert.False(t, settings.PrometheusResponseTimeAsMilliseconds) +} + +func TestPrometheusResponseTimeAsMillisecondsEnabled(t *testing.T) { + os.Setenv(prometheusResponseTimeAsMillisecondsEnv, "true") + defer os.Unsetenv(prometheusResponseTimeAsMillisecondsEnv) + + settings := NewSettings() + + assert.True(t, settings.PrometheusResponseTimeAsMilliseconds) +} + // Tests for RedisPoolOnEmptyBehavior func TestRedisPoolOnEmptyBehavior_Default(t *testing.T) { os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR") diff --git a/src/stats/prom/default_mapper.yaml b/src/stats/prom/default_mapper.yaml index 6229ce152..e03706e37 100644 --- a/src/stats/prom/default_mapper.yaml +++ b/src/stats/prom/default_mapper.yaml @@ -86,6 +86,7 @@ mappings: - match: "ratelimit_server.*.response_time" name: "ratelimit_service_response_time_seconds" timer_type: histogram + scale: 0.001 labels: grpc_method: "$1" diff --git a/src/stats/prom/prometheus_sink.go b/src/stats/prom/prometheus_sink.go index 2c93c49c8..24ae907f8 100644 --- a/src/stats/prom/prometheus_sink.go +++ b/src/stats/prom/prometheus_sink.go @@ -3,6 +3,7 @@ package prom import ( _ "embed" "net/http" + "strings" "github.com/go-kit/log" gostats "github.com/lyft/gostats" @@ -64,9 +65,10 @@ var ( type prometheusSink struct { config struct { - addr string - path string - mapperYamlPath string + addr string + path string + mapperYamlPath string + responseTimeAsMilliseconds bool } mapper *mapper.MetricMapper events chan event.Events @@ -93,6 +95,20 @@ func WithMapperYamlPath(mapperYamlPath string) prometheusSinkOption { } } +func WithResponseTimeAsMilliseconds(responseTimeAsMilliseconds bool) prometheusSinkOption { + return func(sink *prometheusSink) { + sink.config.responseTimeAsMilliseconds = responseTimeAsMilliseconds + } +} + +func (s *prometheusSink) mapperConfig() string { + if s.config.responseTimeAsMilliseconds { + return strings.Replace(defaultMapper, " scale: 0.001\n", "", 1) + } + + return defaultMapper +} + // NewPrometheusSink returns a Sink that flushes stats to os.StdErr. 
func NewPrometheusSink(opts ...prometheusSinkOption) gostats.Sink { promRegistry := prometheus.DefaultRegisterer @@ -119,7 +135,7 @@ func NewPrometheusSink(opts ...prometheusSinkOption) gostats.Sink { if sink.config.mapperYamlPath != "" { _ = sink.mapper.InitFromFile(sink.config.mapperYamlPath) } else { - _ = sink.mapper.InitFromYAMLString(defaultMapper) + _ = sink.mapper.InitFromYAMLString(sink.mapperConfig()) } sink.exp = exporter.NewExporter(promRegistry, diff --git a/src/stats/prom/prometheus_sink_test.go b/src/stats/prom/prometheus_sink_test.go index d9731eba5..d2d5e890a 100644 --- a/src/stats/prom/prometheus_sink_test.go +++ b/src/stats/prom/prometheus_sink_test.go @@ -121,3 +121,67 @@ func TestFlushTimer(t *testing.T) { *m.Metric[0].Histogram.SampleSum == 1.0 }, time.Second, time.Millisecond) } + +func TestFlushResponseTimeConvertsMillisecondsToSeconds(t *testing.T) { + s.FlushTimer("ratelimit_server.ShouldRateLimit.response_time", 1000) + assert.Eventually(t, func() bool { + metricFamilies, err := prometheus.DefaultGatherer.Gather() + if err != nil { + return false + } + + metrics := make(map[string]*dto.MetricFamily) + for _, metricFamily := range metricFamilies { + metrics[*metricFamily.Name] = metricFamily + } + + m, ok := metrics["ratelimit_service_response_time_seconds"] + if !ok || len(m.Metric) != 1 { + return false + } + + return *m.Metric[0].Histogram.SampleCount == uint64(1) && + reflect.DeepEqual(toMap(m.Metric[0].Label), map[string]string{ + "grpc_method": "ShouldRateLimit", + }) && + *m.Metric[0].Histogram.SampleSum == 1.0 + }, time.Second, time.Millisecond) +} + +func TestFlushResponseTimeCanUseLegacyMilliseconds(t *testing.T) { + oldRegisterer := prometheus.DefaultRegisterer + oldGatherer := prometheus.DefaultGatherer + reg := prometheus.NewRegistry() + prometheus.DefaultRegisterer = reg + prometheus.DefaultGatherer = reg + defer func() { + prometheus.DefaultRegisterer = oldRegisterer + prometheus.DefaultGatherer = oldGatherer + }() + + legacySink := NewPrometheusSink(WithAddr(":0"), WithPath("/metrics-legacy"), WithResponseTimeAsMilliseconds(true)) + legacySink.FlushTimer("ratelimit_server.ShouldRateLimit.response_time", 1000) + + assert.Eventually(t, func() bool { + metricFamilies, err := reg.Gather() + if err != nil { + return false + } + + metrics := make(map[string]*dto.MetricFamily) + for _, metricFamily := range metricFamilies { + metrics[*metricFamily.Name] = metricFamily + } + + m, ok := metrics["ratelimit_service_response_time_seconds"] + if !ok || len(m.Metric) != 1 { + return false + } + + return *m.Metric[0].Histogram.SampleCount == uint64(1) && + reflect.DeepEqual(toMap(m.Metric[0].Label), map[string]string{ + "grpc_method": "ShouldRateLimit", + }) && + *m.Metric[0].Histogram.SampleSum == 1000.0 + }, time.Second, time.Millisecond) +} From 4ffd8a2d13490ee6e1d2d480f8eb32355ca4d6dc Mon Sep 17 00:00:00 2001 From: Ian Kerins Date: Sun, 19 Apr 2026 15:21:49 -0400 Subject: [PATCH 4/4] feat: add zipkin b3 header propagation (#1110) --- go.mod | 1 + go.sum | 2 ++ src/trace/trace.go | 5 +++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index fcc6d6763..18efe1e4c 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 + go.opentelemetry.io/contrib/propagators/b3 v1.40.0 go.opentelemetry.io/otel v1.40.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 diff --git a/go.sum b/go.sum index 02be9a68a..c22f0f884 100644 --- a/go.sum +++ b/go.sum @@ -155,6 +155,8 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 h1:9G6E0TXzGFVfTnawRzrPl83iHOAV7L8NJiR8RSGYV1g= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0/go.mod h1:azvtTADFQJA8mX80jIH/akaE7h+dbm/sVuaHqN13w74= +go.opentelemetry.io/contrib/propagators/b3 v1.40.0 h1:xariChe8OOVF3rNlfzGFgQc61npQmXhzZj/i82mxMfg= +go.opentelemetry.io/contrib/propagators/b3 v1.40.0/go.mod h1:72WvbdxbOfXaELEQfonFfOL6osvcVjI7uJEE8C2nkrs= go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= diff --git a/src/trace/trace.go b/src/trace/trace.go index 6fa49b653..bdf0478f1 100644 --- a/src/trace/trace.go +++ b/src/trace/trace.go @@ -6,6 +6,7 @@ import ( "github.com/google/uuid" logger "github.com/sirupsen/logrus" + "go.opentelemetry.io/contrib/propagators/b3" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" @@ -62,7 +63,7 @@ func InitProductionTraceProvider(protocol string, serviceName string, serviceNam sdktrace.WithResource(resource), ) otel.SetTracerProvider(tp) - otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, b3.New(), propagation.Baggage{})) logger.Infof("TracerProvider initialized with following parameters: protocol: %s, serviceName: %s, serviceNamespace: %s, serviceInstanceId: %s, samplingRate: %f", protocol, serviceName, serviceNamespace, useServiceInstanceId, samplingRate) return tp @@ -100,7 +101,7 @@ func GetTestSpanExporter() *tracetest.InMemoryExporter { sdktrace.WithSyncer(testSpanExporter), ) otel.SetTracerProvider(tp) - otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, b3.New(), propagation.Baggage{})) return testSpanExporter }
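
Registering b3.New() next to the W3C TraceContext propagator means the service
now extracts incoming Zipkin B3 headers and injects them on outgoing calls. A
minimal standalone sketch of the effect (not part of the patch; the carrier
contents and trace IDs are hypothetical):

  package main

  import (
  	"context"
  	"fmt"

  	"go.opentelemetry.io/contrib/propagators/b3"
  	"go.opentelemetry.io/otel/propagation"
  )

  func main() {
  	// The same composite that trace.go installs: TraceContext, B3, Baggage.
  	prop := propagation.NewCompositeTextMapPropagator(
  		propagation.TraceContext{}, b3.New(), propagation.Baggage{})

  	// A request carrying only a Zipkin-style single "b3" header is now
  	// extracted into the trace context (it was ignored before this patch).
  	in := propagation.MapCarrier{
  		"b3": "80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1",
  	}
  	ctx := prop.Extract(context.Background(), in)

  	// Injecting writes both traceparent and b3 headers for downstream hops.
  	out := propagation.MapCarrier{}
  	prop.Inject(ctx, out)
  	fmt.Println(out)
  }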