[ADDED] Introduce PersistMode for configurable persistence settings in streams (#1943)

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This commit is contained in:
Piotr Piotrowski
2025-09-22 10:14:38 +02:00
committed by Piotr Piotrowski
parent 13d3ae2df4
commit c85e27545a

View File

@@ -207,6 +207,9 @@ type (
// AllowAtomicPublish allows atomic batch publishing into the stream.
AllowAtomicPublish bool `json:"allow_atomic,omitempty"`
// PersistMode allows to opt-in to different persistence mode settings.
PersistMode PersistModeType `json:"persist_mode,omitempty"`
}
// StreamSourceInfo shows information about an upstream stream
@@ -428,6 +431,9 @@ type (
// StoreCompression determines how messages are compressed.
StoreCompression uint8
// PersistModeType determines what persistence mode the stream uses.
PersistModeType int
)
const (
@@ -459,6 +465,16 @@ const (
workQueuePolicyString = "workqueue"
)
const (
// DefaultPersistMode specifies the default persist mode. Writes to the stream will immediately be flushed.
// The publish acknowledgement will be sent after the persisting completes.
DefaultPersistMode = PersistModeType(iota)
// AsyncPersistMode specifies writes to the stream will be flushed asynchronously.
// The publish acknowledgement may be sent before the persisting completes.
// This means writes could be lost if they weren't flushed prior to a hard kill of the server.
AsyncPersistMode
)
func (rp RetentionPolicy) String() string {
switch rp {
case LimitsPolicy:
@@ -533,6 +549,40 @@ func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
return nil
}
func (pm PersistModeType) String() string {
switch pm {
case DefaultPersistMode:
return "Default"
case AsyncPersistMode:
return "Async"
default:
return "Unknown Persist Mode"
}
}
func (pm PersistModeType) MarshalJSON() ([]byte, error) {
switch pm {
case DefaultPersistMode:
return json.Marshal("default")
case AsyncPersistMode:
return json.Marshal("async")
default:
return nil, fmt.Errorf("nats: can not marshal %v", pm)
}
}
func (pm *PersistModeType) UnmarshalJSON(data []byte) error {
switch strings.ToLower(string(data)) {
case jsonString("default"):
*pm = DefaultPersistMode
case jsonString("async"):
*pm = AsyncPersistMode
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}
const (
// FileStorage specifies on disk storage. It's the default.
FileStorage StorageType = iota