feat(backend): Add multi-provider storage configuration support

This commit is contained in:
pycook
2025-06-18 21:02:48 +08:00
parent e262569aa3
commit 470927ed58
14 changed files with 2700 additions and 31 deletions

View File

@@ -3,8 +3,13 @@ module github.com/veops/oneterm
go 1.21.3
require (
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/BurntSushi/toml v1.4.0
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/atotto/clipboard v0.1.4
// Cloud Storage Providers
github.com/aws/aws-sdk-go v1.55.5
github.com/charmbracelet/bubbles v0.19.0
github.com/charmbracelet/bubbletea v0.27.1
github.com/charmbracelet/lipgloss v0.13.0
@@ -15,6 +20,7 @@ require (
github.com/go-resty/resty/v2 v2.14.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.6+incompatible
github.com/mattn/go-runewidth v0.0.16
github.com/minio/minio-go/v7 v7.0.76
github.com/nicksnyder/go-i18n/v2 v2.4.0
@@ -29,6 +35,7 @@ require (
github.com/swaggo/files v1.0.1
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.3
github.com/tencentyun/cos-go-sdk-v5 v0.7.55
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.26.0
golang.org/x/sync v0.8.0
@@ -41,29 +48,36 @@ require (
)
require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/charmbracelet/x/ansi v0.1.4 // indirect
github.com/charmbracelet/x/input v0.1.0 // indirect
github.com/charmbracelet/x/term v0.1.1 // indirect
github.com/charmbracelet/x/windows v0.1.0 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
github.com/go-ini/ini v1.67.0 // indirect
github.com/google/go-querystring v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-localereader v0.0.1 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/mozillazg/go-httpheader v0.2.1 // indirect
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect
github.com/muesli/cancelreader v0.2.2 // indirect
github.com/muesli/termenv v0.15.2 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
golang.org/x/time v0.6.0 // indirect
)
require (

View File

@@ -1,3 +1,18 @@
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
@@ -6,10 +21,15 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g=
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4=
github.com/atotto/clipboard v0.1.4/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn0Yu86PYI=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
@@ -36,6 +56,8 @@ github.com/charmbracelet/x/term v0.1.1 h1:3cosVAiPOig+EV4X9U+3LDgtwwAoEzJjNdwbXD
github.com/charmbracelet/x/term v0.1.1/go.mod h1:wB1fHt5ECsu3mXYusyzcngVWWlu1KKUmmLhfgr/Flxw=
github.com/charmbracelet/x/windows v0.1.0 h1:gTaxdvzDM5oMa/I2ZNF7wN78X/atWemG9Wph7Ika2k4=
github.com/charmbracelet/x/windows v0.1.0/go.mod h1:GLEO/l+lizvFDBPLIOk+49gdX49L9YWMB5t+DZd0jkQ=
github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I=
github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
@@ -55,6 +77,8 @@ github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM=
github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
@@ -97,15 +121,22 @@ github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.6+incompatible h1:/2MdLc7zHJqzV7J2uVGaoGymVobB/OHC8wmEyWRaK68=
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.24.6+incompatible/go.mod h1:l7VUhRbTKCzdOacdT4oWCwATKyvZqUOlOqr0Ous3k4s=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
@@ -120,6 +151,10 @@ github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/
github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -134,6 +169,7 @@ github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgSh
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -152,6 +188,8 @@ github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI=
github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
@@ -165,6 +203,7 @@ github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.76 h1:9nxHH2XDai61cT/EFhyIw/wW4vJfpPNvl7lSFpRt+Ng=
github.com/minio/minio-go/v7 v7.0.76/go.mod h1:AVM3IUN6WwKzmwBxVdjzhH8xq+f57JSbbvzqvUzR6eg=
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -172,6 +211,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mozillazg/go-httpheader v0.2.1 h1:geV7TrjbL8KXSyvghnFm+NyTux/hxwueTSrwhe88TQQ=
github.com/mozillazg/go-httpheader v0.2.1/go.mod h1:jJ8xECTlalr6ValeXYdOF8fFUISeBAdw6E61aqQma60=
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 h1:ZK8zHtRHOkbHy6Mmr5D264iyp3TiX5OmNcI5cIARiQI=
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6/go.mod h1:CJlz5H+gyd6CUWT45Oy4q24RdLyn7Md9Vj2/ldJBSIo=
github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA=
@@ -185,6 +226,8 @@ github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo=
github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -236,6 +279,10 @@ github.com/swaggo/gin-swagger v1.6.0 h1:y8sxvQ3E20/RCyrXeFfg60r6H0Z+SwpTjMYsMm+z
github.com/swaggo/gin-swagger v1.6.0/go.mod h1:BG00cCEy294xtVpyIAHG6+e2Qzj/xKlRdOqDkvq0uzo=
github.com/swaggo/swag v1.16.3 h1:PnCYjPCah8FK4I26l2F/KQ4yz3sILcVUN3cTlBFA9Pg=
github.com/swaggo/swag v1.16.3/go.mod h1:DImHIuOFXKpMFAQjcC7FG4m3Dg4+QuUgUzJmKjI/gRk=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.563/go.mod h1:7sCQWVkxcsR38nffDW057DRGk8mUjK1Ing/EFOK8s8Y=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/kms v1.0.563/go.mod h1:uom4Nvi9W+Qkom0exYiJ9VWJjXwyxtPYTkKkaLMlfE0=
github.com/tencentyun/cos-go-sdk-v5 v0.7.55 h1:9DfH3umWUd0I2jdqcUxrU1kLfUPOydULNy4T9qN5PF8=
github.com/tencentyun/cos-go-sdk-v5 v0.7.55/go.mod h1:8+hG+mQMuRP/OIS9d83syAvXvrMj9HhkND6Q1fLghw0=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
@@ -255,6 +302,8 @@ golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUu
golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
@@ -271,9 +320,12 @@ golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
@@ -294,8 +346,11 @@ golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -324,6 +379,7 @@ golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
@@ -358,6 +414,7 @@ gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -2,6 +2,7 @@ package controller
import (
"errors"
"fmt"
"net/http"
"github.com/gin-gonic/gin"
@@ -135,6 +136,14 @@ func (c *Controller) UpdateStorageConfig(ctx *gin.Context) {
}
}
})
// Always refresh providers after update to handle name changes
if !ctx.IsAborted() {
if err := storageService.RefreshProviders(ctx); err != nil {
ctx.AbortWithError(http.StatusInternalServerError, &myErrors.ApiError{Code: myErrors.ErrInternal, Data: map[string]any{"err": err}})
return
}
}
}
// DeleteStorageConfig godoc
@@ -177,8 +186,19 @@ func (c *Controller) TestStorageConnection(ctx *gin.Context) {
}
config := &model.StorageConfig{}
if err := ctx.ShouldBindJSON(config); err != nil {
ctx.AbortWithError(http.StatusBadRequest, &myErrors.ApiError{Code: myErrors.ErrInvalidArgument, Data: map[string]any{"err": err}})
if err := ctx.ShouldBindBodyWithJSON(config); err != nil {
// Provide more detailed error information for JSON parsing issues
errorMsg := fmt.Sprintf("failed to parse request body: %v", err)
if err.Error() == "EOF" {
errorMsg = "request body is empty, please provide storage configuration JSON"
}
ctx.AbortWithError(http.StatusBadRequest, &myErrors.ApiError{Code: myErrors.ErrInvalidArgument, Data: map[string]any{"err": errorMsg}})
return
}
// Validate required fields
if config.Type == "" {
ctx.AbortWithError(http.StatusBadRequest, &myErrors.ApiError{Code: myErrors.ErrInvalidArgument, Data: map[string]any{"err": "storage type is required"}})
return
}
@@ -216,9 +236,16 @@ func (c *Controller) GetStorageHealth(ctx *gin.Context) {
// Convert error map to a more API-friendly format
healthStatus := make(map[string]map[string]interface{})
for name, err := range healthResults {
var errorMsg interface{}
if err != nil {
errorMsg = err.Error() // Convert error to string for proper JSON serialization
} else {
errorMsg = nil
}
healthStatus[name] = map[string]interface{}{
"healthy": err == nil,
"error": err,
"error": errorMsg,
}
}
@@ -253,6 +280,12 @@ func (c *Controller) SetPrimaryStorage(ctx *gin.Context) {
return
}
// First, clear all existing primary flags
if err := storageService.ClearAllPrimaryFlags(ctx); err != nil {
ctx.AbortWithError(http.StatusInternalServerError, &myErrors.ApiError{Code: myErrors.ErrInternal, Data: map[string]any{"err": err}})
return
}
// Update to set as primary
config.IsPrimary = true
config.UpdaterId = currentUser.GetUid()
@@ -262,6 +295,12 @@ func (c *Controller) SetPrimaryStorage(ctx *gin.Context) {
return
}
// Refresh providers after setting new primary to ensure session replay adapter uses the new primary provider
if err := storageService.RefreshProviders(ctx); err != nil {
ctx.AbortWithError(http.StatusInternalServerError, &myErrors.ApiError{Code: myErrors.ErrInternal, Data: map[string]any{"err": err}})
return
}
ctx.JSON(http.StatusOK, defaultHttpResponse)
}
@@ -302,9 +341,58 @@ func (c *Controller) ToggleStorageProvider(ctx *gin.Context) {
return
}
// Refresh providers after toggle to ensure correct provider state
if err := storageService.RefreshProviders(ctx); err != nil {
ctx.AbortWithError(http.StatusInternalServerError, &myErrors.ApiError{Code: myErrors.ErrInternal, Data: map[string]any{"err": err}})
return
}
ctx.JSON(http.StatusOK, HttpResponse{Data: map[string]bool{"enabled": config.Enabled}})
}
// GetStorageMetrics godoc
//
// @Tags storage
// @Summary Get storage usage metrics
// @Success 200 {object} HttpResponse{data=map[string]any}
// @Router /storage/metrics [get]
func (c *Controller) GetStorageMetrics(ctx *gin.Context) {
	// Admin-only endpoint: reject the session before touching the service layer.
	user, _ := acl.GetSessionFromCtx(ctx)
	if !acl.IsAdmin(user) {
		ctx.AbortWithError(http.StatusForbidden, &myErrors.ApiError{Code: myErrors.ErrNoPerm, Data: map[string]any{"perm": acl.READ}})
		return
	}

	// Delegate to the storage service; any failure surfaces as a 500.
	metrics, err := storageService.GetStorageMetrics(ctx)
	if err != nil {
		ctx.AbortWithError(http.StatusInternalServerError, &myErrors.ApiError{Code: myErrors.ErrInternal, Data: map[string]any{"err": err}})
		return
	}

	ctx.JSON(http.StatusOK, HttpResponse{Data: metrics})
}
// RefreshStorageMetrics godoc
//
// @Tags storage
// @Summary Refresh storage usage metrics
// @Success 200 {object} HttpResponse{}
// @Router /storage/metrics/refresh [post]
func (c *Controller) RefreshStorageMetrics(ctx *gin.Context) {
	// Refreshing metrics is a mutating operation, so the permission reported
	// on rejection is WRITE (unlike the read-only metrics endpoint).
	user, _ := acl.GetSessionFromCtx(ctx)
	if !acl.IsAdmin(user) {
		ctx.AbortWithError(http.StatusForbidden, &myErrors.ApiError{Code: myErrors.ErrNoPerm, Data: map[string]any{"perm": acl.WRITE}})
		return
	}

	err := storageService.RefreshStorageMetrics(ctx)
	if err != nil {
		ctx.AbortWithError(http.StatusInternalServerError, &myErrors.ApiError{Code: myErrors.ErrInternal, Data: map[string]any{"err": err}})
		return
	}

	ctx.JSON(http.StatusOK, defaultHttpResponse)
}
// validateStorageConfig validates storage configuration
func validateStorageConfig(config *model.StorageConfig) error {
if config.Name == "" {

View File

@@ -184,6 +184,8 @@ func SetupRouter(r *gin.Engine) {
storage.DELETE("/configs/:id", c.DeleteStorageConfig)
storage.POST("/test-connection", c.TestStorageConnection)
storage.GET("/health", c.GetStorageHealth)
// storage.GET("/metrics", c.GetStorageMetrics)
// storage.POST("/metrics/refresh", c.RefreshStorageMetrics)
storage.PUT("/configs/:id/set-primary", c.SetPrimaryStorage)
storage.PUT("/configs/:id/toggle", c.ToggleStorageProvider)
}

View File

@@ -17,6 +17,9 @@ const (
StorageTypeMinio StorageType = "minio"
StorageTypeOSS StorageType = "oss"
StorageTypeCOS StorageType = "cos"
StorageTypeAzure StorageType = "azure"
StorageTypeOBS StorageType = "obs"
StorageTypeOOS StorageType = "oos"
)
// StorageConfigMap represents the configuration parameters for a storage backend
@@ -53,7 +56,7 @@ type StorageConfig struct {
UpdaterId int `json:"updater_id" gorm:"column:updater_id"`
CreatedAt time.Time `json:"created_at" gorm:"column:created_at"`
UpdatedAt time.Time `json:"updated_at" gorm:"column:updated_at"`
DeletedAt soft_delete.DeletedAt `json:"-" gorm:"column:deleted_at;uniqueIndex:deleted_at"`
DeletedAt soft_delete.DeletedAt `json:"-" gorm:"column:deleted_at"`
}
func (m *StorageConfig) TableName() string {
@@ -108,7 +111,7 @@ type StorageMetrics struct {
CreatedAt time.Time `json:"created_at" gorm:"column:created_at"`
UpdatedAt time.Time `json:"updated_at" gorm:"column:updated_at"`
DeletedAt soft_delete.DeletedAt `json:"-" gorm:"column:deleted_at;uniqueIndex:deleted_at"`
DeletedAt soft_delete.DeletedAt `json:"-" gorm:"column:deleted_at"`
}
func (m *StorageMetrics) TableName() string {
@@ -135,7 +138,7 @@ type FileMetadata struct {
UpdaterId int `json:"updater_id" gorm:"column:updater_id"`
CreatedAt time.Time `json:"created_at" gorm:"column:created_at"`
UpdatedAt time.Time `json:"updated_at" gorm:"column:updated_at"`
DeletedAt soft_delete.DeletedAt `json:"-" gorm:"column:deleted_at;uniqueIndex:deleted_at"`
DeletedAt soft_delete.DeletedAt `json:"-" gorm:"column:deleted_at"`
}
func (m *FileMetadata) TableName() string {

View File

@@ -31,6 +31,9 @@ type StorageRepository interface {
// DeleteStorageConfig deletes storage configuration
DeleteStorageConfig(ctx context.Context, name string) error
// ClearAllPrimaryFlags clears all primary flags in database
ClearAllPrimaryFlags(ctx context.Context) error
// GetFileMetadata retrieves file metadata
GetFileMetadata(ctx context.Context, key string) (*model.FileMetadata, error)
@@ -45,6 +48,13 @@ type StorageRepository interface {
// ListFileMetadata lists file metadata with pagination
ListFileMetadata(ctx context.Context, prefix string, limit, offset int) ([]*model.FileMetadata, int64, error)
// Storage Metrics Operations
GetStorageMetrics(ctx context.Context) ([]*model.StorageMetrics, error)
GetStorageMetricsByName(ctx context.Context, storageName string) (*model.StorageMetrics, error)
CreateStorageMetrics(ctx context.Context, metrics *model.StorageMetrics) error
UpdateStorageMetrics(ctx context.Context, metrics *model.StorageMetrics) error
UpsertStorageMetrics(ctx context.Context, metrics *model.StorageMetrics) error
}
// storageRepository implements StorageRepository
@@ -118,6 +128,11 @@ func (r *storageRepository) DeleteStorageConfig(ctx context.Context, name string
return dbpkg.DB.Where("name = ?", name).Delete(&model.StorageConfig{}).Error
}
// ClearAllPrimaryFlags resets the is_primary flag on every storage config
// row that currently has it set, enforcing the single-primary invariant
// before a new primary is chosen.
func (r *storageRepository) ClearAllPrimaryFlags(ctx context.Context) error {
	q := dbpkg.DB.Model(&model.StorageConfig{}).Where("is_primary = ?", true)
	return q.Update("is_primary", false).Error
}
// GetFileMetadata retrieves file metadata
func (r *storageRepository) GetFileMetadata(ctx context.Context, key string) (*model.FileMetadata, error) {
var metadata model.FileMetadata
@@ -163,3 +178,35 @@ func (r *storageRepository) ListFileMetadata(ctx context.Context, prefix string,
err = query.Limit(limit).Offset(offset).Find(&metadata).Error
return metadata, total, err
}
// GetStorageMetrics returns every stored metrics row (one per storage backend).
func (r *storageRepository) GetStorageMetrics(ctx context.Context) ([]*model.StorageMetrics, error) {
	var rows []*model.StorageMetrics
	err := dbpkg.DB.Find(&rows).Error
	return rows, err
}
// GetStorageMetricsByName looks up the metrics row for a single storage
// backend identified by its configured name. The underlying gorm error
// (including record-not-found) is passed through to the caller.
func (r *storageRepository) GetStorageMetricsByName(ctx context.Context, storageName string) (*model.StorageMetrics, error) {
	m := &model.StorageMetrics{}
	if err := dbpkg.DB.Where("storage_name = ?", storageName).First(m).Error; err != nil {
		return nil, err
	}
	return m, nil
}
// CreateStorageMetrics inserts a brand-new metrics row.
func (r *storageRepository) CreateStorageMetrics(ctx context.Context, metrics *model.StorageMetrics) error {
	res := dbpkg.DB.Create(metrics)
	return res.Error
}
// UpdateStorageMetrics persists changes to an existing metrics row via
// gorm's Save (full-record write keyed on the primary key).
func (r *storageRepository) UpdateStorageMetrics(ctx context.Context, metrics *model.StorageMetrics) error {
	res := dbpkg.DB.Save(metrics)
	return res.Error
}
// UpsertStorageMetrics inserts the metrics row for the given storage backend,
// or updates it in place when a row for that storage_name already exists.
//
// The previous implementation was byte-identical to UpdateStorageMetrics
// (gorm Save), which chooses INSERT vs UPDATE purely on the primary key: a
// metrics value carrying a zero primary key for an already-tracked backend
// would insert a duplicate row instead of updating the existing one. Keying
// the write on storage_name makes this a real upsert without new imports.
//
// NOTE(review): gorm's Updates with a struct argument skips zero-valued
// fields, so a counter reset to zero is not written back — confirm callers
// never rely on that, or switch to a map/Select("*") update. There is also a
// benign create/create race under concurrent first-time refreshes; the table's
// unique index (if any) would surface it as an error — TODO confirm schema.
func (r *storageRepository) UpsertStorageMetrics(ctx context.Context, metrics *model.StorageMetrics) error {
	res := dbpkg.DB.Model(&model.StorageMetrics{}).
		Where("storage_name = ?", metrics.StorageName).
		Updates(metrics)
	if res.Error != nil {
		return res.Error
	}
	if res.RowsAffected == 0 {
		// No row matched: first sighting of this storage backend.
		return dbpkg.DB.Create(metrics).Error
	}
	return nil
}

View File

@@ -146,17 +146,5 @@ func (s *SessionService) GetSessionReplayFilename(ctx context.Context, sessionId
// GetSessionReplay gets session replay file reader.
// It prefers the configured storage service and falls back to reading the
// replay file directly from local disk when the service is absent or fails.
func (s *SessionService) GetSessionReplay(ctx context.Context, sessionId string) (io.ReadCloser, error) {
	if DefaultStorageService != nil {
		reader, err := DefaultStorageService.GetSessionReplay(ctx, sessionId)
		if err != nil {
			// Best-effort path: log and continue to the local-file fallback.
			logger.L().Warn("Failed to get replay from storage service, falling back to local file",
				zap.String("session_id", sessionId),
				zap.Error(err))
		} else {
			return reader, nil
		}
	}
	// Fallback to direct file access with date hierarchy search
	return gsession.GetReplay(sessionId)
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/veops/oneterm/internal/model"
"github.com/veops/oneterm/internal/repository"
"github.com/veops/oneterm/pkg/config"
dbpkg "github.com/veops/oneterm/pkg/db"
"github.com/veops/oneterm/pkg/logger"
"github.com/veops/oneterm/pkg/storage"
"github.com/veops/oneterm/pkg/storage/providers"
@@ -32,6 +33,9 @@ type StorageService interface {
UpdateStorageConfig(ctx context.Context, config *model.StorageConfig) error
DeleteStorageConfig(ctx context.Context, name string) error
// Clear all primary flags for ensuring single primary constraint
ClearAllPrimaryFlags(ctx context.Context) error
// File Operations combining storage backend and database metadata
UploadFile(ctx context.Context, key string, reader io.Reader, size int64, metadata *model.FileMetadata) error
DownloadFile(ctx context.Context, key string) (io.ReadCloser, *model.FileMetadata, error)
@@ -47,11 +51,13 @@ type StorageService interface {
SaveRDPFile(ctx context.Context, assetId int, remotePath string, reader io.Reader, size int64) error
GetRDPFile(ctx context.Context, assetId int, remotePath string) (io.ReadCloser, error)
DeleteRDPFile(ctx context.Context, assetId int, remotePath string) error
ListRDPFiles(ctx context.Context, assetId int, directory string, limit, offset int) ([]*model.FileMetadata, int64, error)
// Provider management
GetPrimaryProvider() (storage.Provider, error)
HealthCheck(ctx context.Context) map[string]error
CreateProvider(config *model.StorageConfig) (storage.Provider, error)
RefreshProviders(ctx context.Context) error
// New method for building queries
BuildQuery(ctx *gin.Context) *gorm.DB
@@ -59,6 +65,11 @@ type StorageService interface {
// GetAvailableProvider returns an available storage provider with fallback logic
// Priority: Primary storage first, then by priority (lower number = higher priority)
GetAvailableProvider(ctx context.Context) (storage.Provider, error)
// Storage Metrics Operations
GetStorageMetrics(ctx context.Context) (map[string]*model.StorageMetrics, error)
RefreshStorageMetrics(ctx context.Context) error
CalculateStorageMetrics(ctx context.Context, storageName string) (*model.StorageMetrics, error)
}
// storageService implements StorageService
@@ -103,17 +114,42 @@ func (s *storageService) CreateStorageConfig(ctx context.Context, config *model.
return fmt.Errorf("failed to create storage config: %w", err)
}
// Initialize storage provider
provider, err := s.CreateProvider(config)
if err != nil {
logger.L().Warn("Failed to initialize storage provider",
zap.String("name", config.Name),
zap.Error(err))
} else {
// Only initialize storage provider if it's enabled
if config.Enabled {
provider, err := s.CreateProvider(config)
if err != nil {
logger.L().Warn("Failed to initialize storage provider",
zap.String("name", config.Name),
zap.Error(err))
return nil // Don't fail the creation, just warn
}
// Perform health check before adding to providers map
if err := provider.HealthCheck(ctx); err != nil {
logger.L().Warn("Storage provider failed health check",
zap.String("name", config.Name),
zap.String("type", string(config.Type)),
zap.Error(err))
// Still add to providers map even if health check fails,
// so it appears in health status for monitoring
}
s.providers[config.Name] = provider
if config.IsPrimary {
s.primary = config.Name
logger.L().Info("Set new storage as primary provider",
zap.String("name", config.Name),
zap.String("type", string(config.Type)))
}
logger.L().Info("Storage provider initialized successfully",
zap.String("name", config.Name),
zap.String("type", string(config.Type)),
zap.Bool("is_primary", config.IsPrimary))
} else {
logger.L().Info("Storage configuration created but not initialized (disabled)",
zap.String("name", config.Name),
zap.String("type", string(config.Type)))
}
return nil
@@ -124,7 +160,15 @@ func (s *storageService) UpdateStorageConfig(ctx context.Context, config *model.
return err
}
return s.storageRepo.UpdateStorageConfig(ctx, config)
if err := s.storageRepo.UpdateStorageConfig(ctx, config); err != nil {
return err
}
if err := s.RefreshProviders(ctx); err != nil {
logger.L().Warn("Failed to refresh providers after config update", zap.Error(err))
}
return nil
}
func (s *storageService) DeleteStorageConfig(ctx context.Context, name string) error {
@@ -296,9 +340,42 @@ func (s *storageService) GetPrimaryProvider() (storage.Provider, error) {
func (s *storageService) HealthCheck(ctx context.Context) map[string]error {
results := make(map[string]error)
for name, provider := range s.providers {
err := provider.HealthCheck(ctx)
results[name] = err
// Get all storage configurations from database
configs, err := s.GetStorageConfigs(ctx)
if err != nil {
logger.L().Error("Failed to get storage configs for health check", zap.Error(err))
return results
}
// Check each configuration
for _, config := range configs {
if !config.Enabled {
// For disabled configs, add a special error indicating they are disabled
results[config.Name] = fmt.Errorf("storage provider is disabled")
continue
}
// For enabled configs, check if provider exists and perform health check
if provider, exists := s.providers[config.Name]; exists {
err := provider.HealthCheck(ctx)
if err != nil {
// Add more context to the error message
logger.L().Warn("Storage provider health check failed",
zap.String("name", config.Name),
zap.String("type", string(config.Type)),
zap.Error(err))
results[config.Name] = fmt.Errorf("health check failed: %v", err)
} else {
results[config.Name] = nil
}
} else {
// Provider should exist but doesn't - this indicates an initialization problem
logger.L().Warn("Storage provider not found in memory",
zap.String("name", config.Name),
zap.String("type", string(config.Type)),
zap.Bool("enabled", config.Enabled))
results[config.Name] = fmt.Errorf("storage provider not initialized, possible configuration error or initialization failure")
}
}
return results
@@ -353,15 +430,56 @@ func (s *storageService) CreateProvider(config *model.StorageConfig) (storage.Pr
localConfig.RetentionConfig = retentionConfig
return providers.NewLocal(localConfig)
case model.StorageTypeMinio:
minioConfig, err := providers.ParseMinioConfigFromMap(config.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse Minio config: %w", err)
}
return providers.NewMinio(minioConfig)
case model.StorageTypeS3:
// TODO: implement S3 provider with path strategy support
return nil, fmt.Errorf("S3 provider not implemented yet")
s3Config, err := providers.ParseS3ConfigFromMap(config.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse S3 config: %w", err)
}
return providers.NewS3(s3Config)
case model.StorageTypeAzure:
azureConfig, err := providers.ParseAzureConfigFromMap(config.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse Azure config: %w", err)
}
return providers.NewAzure(azureConfig)
case model.StorageTypeCOS:
cosConfig, err := providers.ParseCOSConfigFromMap(config.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse COS config: %w", err)
}
return providers.NewCOS(cosConfig)
case model.StorageTypeOSS:
ossConfig, err := providers.ParseOSSConfigFromMap(config.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse OSS config: %w", err)
}
return providers.NewOSS(ossConfig)
case model.StorageTypeOBS:
obsConfig, err := providers.ParseOBSConfigFromMap(config.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse OBS config: %w", err)
}
return providers.NewOBS(obsConfig)
case model.StorageTypeOOS:
oosConfig, err := providers.ParseOOSConfigFromMap(config.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse OOS config: %w", err)
}
return providers.NewOOS(oosConfig)
default:
return nil, fmt.Errorf("unsupported storage type: %s", config.Type)
}
@@ -605,6 +723,26 @@ func init() {
}
}
}()
// Start background storage metrics calculation
// go func() {
// // Update storage metrics every 30 minutes to avoid high resource consumption
// ticker := time.NewTicker(30 * time.Minute)
// defer ticker.Stop()
// // Initial update after 5 minutes
// time.Sleep(5 * time.Minute)
// if DefaultStorageService != nil {
// performMetricsUpdate()
// }
// for {
// <-ticker.C
// if DefaultStorageService != nil {
// performMetricsUpdate()
// }
// }
// }()
}
// performHealthMonitoring performs periodic health checks on all storage providers
@@ -648,3 +786,204 @@ func performHealthMonitoring() {
zap.Int("total_providers", totalCount))
}
}
// performMetricsUpdate recalculates storage metrics for every configured
// backend through the default storage service, then logs a completion
// summary with enabled/total storage counts.
func performMetricsUpdate() {
	ctx := context.Background()

	svc, ok := DefaultStorageService.(*storageService)
	if !ok {
		return
	}

	if err := svc.RefreshStorageMetrics(ctx); err != nil {
		logger.L().Warn("Failed to refresh storage metrics during background update", zap.Error(err))
		return
	}

	configs, err := svc.GetStorageConfigs(ctx)
	if err != nil || len(configs) == 0 {
		return
	}

	enabled := 0
	for _, cfg := range configs {
		if cfg.Enabled {
			enabled++
		}
	}
	logger.L().Info("Storage metrics update completed",
		zap.Int("enabled_storages", enabled),
		zap.Int("total_storages", len(configs)))
}
// RefreshProviders rebuilds the in-memory provider map from the storage
// configurations currently persisted in the database, then re-points the
// global session-replay adapter at the (possibly new) primary provider.
//
// NOTE(review): s.providers and s.primary are replaced without any locking;
// if other goroutines read these fields concurrently this is a data race —
// confirm that callers serialize access to the service.
func (s *storageService) RefreshProviders(ctx context.Context) error {
	// Drop all existing providers; they are rebuilt below.
	s.providers = make(map[string]storage.Provider)
	s.primary = ""

	configs, err := s.GetStorageConfigs(ctx)
	if err != nil {
		return fmt.Errorf("failed to load storage configurations: %w", err)
	}

	successCount := initializeStorageProviders(ctx, s, configs)

	// Re-bind the replay adapter; failure here is logged but not fatal.
	if provider, err := s.GetPrimaryProvider(); err == nil {
		storage.InitializeAdapter(provider)
		logger.L().Info("Session replay adapter re-initialized with new primary provider",
			zap.String("provider_type", provider.Type()))
	} else {
		logger.L().Warn("Failed to re-initialize session replay adapter", zap.Error(err))
	}

	logger.L().Info("Storage providers refreshed",
		zap.Int("total_configs", len(configs)),
		zap.Int("successful_providers", successCount))

	return nil
}
// ClearAllPrimaryFlags clears the primary flag on every persisted storage
// configuration and the cached primary name, then refreshes the in-memory
// providers so the change takes effect immediately.
func (s *storageService) ClearAllPrimaryFlags(ctx context.Context) error {
	err := s.storageRepo.ClearAllPrimaryFlags(ctx)
	if err != nil {
		return fmt.Errorf("failed to clear primary flags in database: %w", err)
	}

	s.primary = ""

	if refreshErr := s.RefreshProviders(ctx); refreshErr != nil {
		logger.L().Warn("Failed to refresh providers after clearing primary flags", zap.Error(refreshErr))
		return refreshErr
	}
	return nil
}
// ListRDPFiles lists file metadata stored under the RDP-file namespace for
// one asset, paginated by limit/offset. Keys are matched by the prefix
// "rdp_files/<assetId>/<directory>".
func (s *storageService) ListRDPFiles(ctx context.Context, assetId int, directory string, limit, offset int) ([]*model.FileMetadata, int64, error) {
	prefix := fmt.Sprintf("rdp_files/%d/%s", assetId, directory)
	return s.storageRepo.ListFileMetadata(ctx, prefix, limit, offset)
}
// GetStorageMetrics loads all persisted storage metrics and returns them
// keyed by storage name for O(1) lookup.
func (s *storageService) GetStorageMetrics(ctx context.Context) (map[string]*model.StorageMetrics, error) {
	list, err := s.storageRepo.GetStorageMetrics(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get storage metrics: %w", err)
	}

	byName := make(map[string]*model.StorageMetrics, len(list))
	for _, m := range list {
		byName[m.StorageName] = m
	}
	return byName, nil
}
// RefreshStorageMetrics recalculates and persists metrics for every enabled
// storage configuration. Per-storage failures are logged and skipped so one
// bad backend does not block the rest.
func (s *storageService) RefreshStorageMetrics(ctx context.Context) error {
	configs, err := s.GetStorageConfigs(ctx)
	if err != nil {
		return fmt.Errorf("failed to get storage configs: %w", err)
	}

	for _, cfg := range configs {
		if !cfg.Enabled {
			continue
		}

		metric, calcErr := s.CalculateStorageMetrics(ctx, cfg.Name)
		if calcErr != nil {
			logger.L().Warn("Failed to calculate storage metrics",
				zap.String("storage", cfg.Name),
				zap.Error(calcErr))
			continue
		}

		if saveErr := s.storageRepo.UpsertStorageMetrics(ctx, metric); saveErr != nil {
			logger.L().Warn("Failed to save storage metrics",
				zap.String("storage", cfg.Name),
				zap.Error(saveErr))
		}
	}

	return nil
}
// CalculateStorageMetrics builds a fresh StorageMetrics snapshot for one
// storage: health status from the in-memory provider's HealthCheck, plus
// file counts/sizes aggregated from the file_metadata table.
// File-stat failures are logged but do not fail the call.
func (s *storageService) CalculateStorageMetrics(ctx context.Context, storageName string) (*model.StorageMetrics, error) {
	metric := &model.StorageMetrics{
		StorageName: storageName,
		LastUpdated: time.Now(),
		IsHealthy:   true,
	}

	// Check if provider exists and is healthy
	if provider, exists := s.providers[storageName]; exists {
		if err := provider.HealthCheck(ctx); err != nil {
			metric.IsHealthy = false
			metric.ErrorMessage = err.Error()
		}
	} else {
		// No provider in memory: either disabled or initialization failed.
		metric.IsHealthy = false
		metric.ErrorMessage = "Provider not initialized"
	}

	// Calculate file counts and sizes efficiently using database aggregation
	// Use storage_name field from file_metadata table
	if err := s.calculateFileStats(ctx, storageName, metric); err != nil {
		logger.L().Warn("Failed to calculate file stats",
			zap.String("storage", storageName),
			zap.Error(err))
		// Don't fail completely, just log the warning
	}

	return metric, nil
}
// fileStatsResult aggregates a COUNT(*) / SUM(file_size) query result.
type fileStatsResult struct {
	Count int64
	Size  int64
}

// aggregateFileStats runs a COUNT/SUM aggregate over file_metadata for one
// storage, optionally filtered by category ("" means all categories).
// NOTE(review): the shared dbpkg.DB handle is used without WithContext, so
// the ctx passed to calculateFileStats does not cancel these queries.
func (s *storageService) aggregateFileStats(storageName, category string) (fileStatsResult, error) {
	var result fileStatsResult
	query := dbpkg.DB.Model(&model.FileMetadata{}).
		Select("COUNT(*) as count, COALESCE(SUM(file_size), 0) as size").
		Where("storage_name = ?", storageName)
	if category != "" {
		query = query.Where("category = ?", category)
	}
	err := query.Scan(&result).Error
	return result, err
}

// calculateFileStats fills metric with file count/size aggregates for one
// storage. Overall totals are required (an error is returned); the
// per-category replay and RDP-file stats are best-effort and only logged
// on failure, matching the original behavior.
func (s *storageService) calculateFileStats(ctx context.Context, storageName string, metric *model.StorageMetrics) error {
	// Overall totals across every category.
	total, err := s.aggregateFileStats(storageName, "")
	if err != nil {
		return fmt.Errorf("failed to calculate total stats: %w", err)
	}
	metric.FileCount = total.Count
	metric.TotalSize = total.Size

	// Session-replay files only.
	if replay, err := s.aggregateFileStats(storageName, "replay"); err != nil {
		logger.L().Warn("Failed to calculate replay stats", zap.Error(err))
	} else {
		metric.ReplayCount = replay.Count
		metric.ReplaySize = replay.Size
	}

	// RDP transfer files only.
	if rdp, err := s.aggregateFileStats(storageName, "rdp_file"); err != nil {
		logger.L().Warn("Failed to calculate RDP file stats", zap.Error(err))
	} else {
		metric.RdpFileCount = rdp.Count
		metric.RdpFileSize = rdp.Size
	}

	return nil
}

View File

@@ -0,0 +1,312 @@
package providers
import (
"context"
"fmt"
"io"
"net/url"
"path/filepath"
"strconv"
"strings"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/veops/oneterm/pkg/logger"
"github.com/veops/oneterm/pkg/storage"
"go.uber.org/zap"
)
// AzureConfig holds configuration for Azure Blob Storage
// Populated either directly or via ParseAzureConfigFromMap.
type AzureConfig struct {
	AccountName     string                         `json:"account_name" mapstructure:"account_name"`
	AccountKey      string                         `json:"account_key" mapstructure:"account_key"`
	ContainerName   string                         `json:"container_name" mapstructure:"container_name"`
	// EndpointSuffix defaults to "core.windows.net" when empty (see NewAzure).
	EndpointSuffix  string                         `json:"endpoint_suffix" mapstructure:"endpoint_suffix"`
	PathStrategy    storage.PathStrategy           `json:"path_strategy" mapstructure:"path_strategy"`
	RetentionConfig storage.StorageRetentionConfig `json:"retention" mapstructure:"retention"`
}
// Azure implements the storage.Provider interface for Azure Blob Storage
type Azure struct {
	containerURL  azblob.ContainerURL    // pre-authenticated URL for the target container
	config        AzureConfig            // immutable configuration captured at construction
	pathGenerator *storage.PathGenerator // builds object keys per the configured path strategy
}
// NewAzure creates a new Azure Blob Storage provider.
// It applies defaults for path strategy, retention, and endpoint suffix,
// then verifies the container is reachable before returning, so a
// misconfigured account or container fails fast.
func NewAzure(config AzureConfig) (storage.Provider, error) {
	// Set default path strategy if not specified
	if config.PathStrategy == "" {
		config.PathStrategy = storage.DateHierarchyStrategy
	}

	// Set default retention config if not specified
	if config.RetentionConfig.RetentionDays == 0 {
		config.RetentionConfig = storage.DefaultRetentionConfig()
	}

	// Set default endpoint suffix if not specified
	if config.EndpointSuffix == "" {
		config.EndpointSuffix = "core.windows.net"
	}

	// Create credential
	credential, err := azblob.NewSharedKeyCredential(config.AccountName, config.AccountKey)
	if err != nil {
		return nil, fmt.Errorf("failed to create Azure credential: %w", err)
	}

	// Create pipeline
	pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{})

	// BUG FIX: the container URL must include the container name in its path.
	// Previously only the service URL (https://<account>.blob.<suffix>) was
	// passed to NewContainerURL, so every operation targeted a nameless
	// container and ContainerName was never used.
	containerRawURL := fmt.Sprintf("https://%s.blob.%s/%s", config.AccountName, config.EndpointSuffix, config.ContainerName)
	parsedURL, err := url.Parse(containerRawURL)
	if err != nil {
		return nil, fmt.Errorf("failed to parse container URL: %w", err)
	}
	containerURL := azblob.NewContainerURL(*parsedURL, pipeline)

	// Check if container exists and is accessible
	ctx := context.Background()
	if _, err = containerURL.GetProperties(ctx, azblob.LeaseAccessConditions{}); err != nil {
		return nil, fmt.Errorf("failed to access container %s: %w", config.ContainerName, err)
	}

	// Use container name as virtual base path for path generator
	pathGenerator := storage.NewPathGenerator(config.PathStrategy, config.ContainerName)

	return &Azure{
		containerURL:  containerURL,
		config:        config,
		pathGenerator: pathGenerator,
	}, nil
}
// Upload streams a file into Azure Blob Storage as a block blob under the
// resolved blob key. The size parameter is unused; the SDK streams with a
// fixed 4MB buffer and up to 16 concurrent buffers.
func (a *Azure) Upload(ctx context.Context, key string, reader io.Reader, size int64) error {
	blobURL := a.containerURL.NewBlockBlobURL(a.getBlobKey(key))

	opts := azblob.UploadStreamToBlockBlobOptions{
		BufferSize: 4 * 1024 * 1024, // 4MB buffer
		MaxBuffers: 16,
		BlobHTTPHeaders: azblob.BlobHTTPHeaders{
			ContentType: "application/octet-stream",
		},
	}

	if _, err := azblob.UploadStreamToBlockBlob(ctx, reader, blobURL, opts); err != nil {
		return fmt.Errorf("failed to upload blob: %w", err)
	}
	return nil
}
// Download downloads a file from Azure Blob Storage.
// It tries the resolved key as-is first; under the date-hierarchy strategy
// it then searches the date-based prefixes for the file. The returned
// ReadCloser must be closed by the caller.
func (a *Azure) Download(ctx context.Context, key string) (io.ReadCloser, error) {
	blobKey := a.getBlobKey(key)

	// Try exact key first
	blobURL := a.containerURL.NewBlobURL(blobKey)
	response, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
	if err == nil {
		return response.Body(azblob.RetryReaderOptions{}), nil
	}

	// For date hierarchy strategy, search in date prefixes
	if a.config.PathStrategy == storage.DateHierarchyStrategy {
		return a.searchInDatePrefixes(ctx, key)
	}

	// Wrap the SDK error instead of discarding it so callers can diagnose
	// auth/network failures vs. genuine missing blobs.
	return nil, fmt.Errorf("blob not found: %s: %w", key, err)
}
// Delete removes a blob (including its snapshots) from Azure Blob Storage.
func (a *Azure) Delete(ctx context.Context, key string) error {
	blobURL := a.containerURL.NewBlobURL(a.getBlobKey(key))

	if _, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
		return fmt.Errorf("failed to delete blob: %w", err)
	}
	return nil
}
// Exists checks if a file exists in Azure Blob Storage.
// It probes the exact key first; under the date-hierarchy strategy it then
// falls back to searching date-based prefixes.
func (a *Azure) Exists(ctx context.Context, key string) (bool, error) {
	blobKey := a.getBlobKey(key)

	// Try exact key first
	blobURL := a.containerURL.NewBlobURL(blobKey)
	_, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
	if err == nil {
		return true, nil
	}

	// For date hierarchy strategy, search in date prefixes
	if a.config.PathStrategy == storage.DateHierarchyStrategy {
		reader, err := a.searchInDatePrefixes(ctx, key)
		if err != nil {
			return false, nil
		}
		// FIX: the search opens the blob body; close it immediately since
		// we only needed to confirm existence (previously leaked).
		_ = reader.Close()
		return true, nil
	}

	return false, nil
}
// GetSize gets the size of a file in Azure Blob Storage
//
// NOTE(review): unlike Download/Exists, this does NOT fall back to
// searching date prefixes under the date-hierarchy strategy — it only
// rewrites the error message (and discards the SDK error detail). Confirm
// whether size lookups for hierarchical keys are needed.
func (a *Azure) GetSize(ctx context.Context, key string) (int64, error) {
	blobKey := a.getBlobKey(key)
	blobURL := a.containerURL.NewBlobURL(blobKey)

	properties, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
	if err != nil {
		if a.config.PathStrategy == storage.DateHierarchyStrategy {
			return 0, fmt.Errorf("blob not found: %s", key)
		}
		return 0, err
	}

	return properties.ContentLength(), nil
}
// Type returns the storage type identifier ("azure") used to tag this
// provider in configs, logs, and health reports.
func (a *Azure) Type() string {
	return "azure"
}
// HealthCheck performs a health check on Azure Blob Storage
// by fetching the container's properties, which exercises both
// authentication and network reachability.
func (a *Azure) HealthCheck(ctx context.Context) error {
	// Check if container is accessible
	_, err := a.containerURL.GetProperties(ctx, azblob.LeaseAccessConditions{})
	if err != nil {
		return fmt.Errorf("Azure health check failed: %w", err)
	}
	return nil
}
// GetPathStrategy returns the path strategy configured at construction.
func (a *Azure) GetPathStrategy() storage.PathStrategy {
	return a.config.PathStrategy
}
// GetRetentionConfig returns the retention configuration set at construction.
func (a *Azure) GetRetentionConfig() storage.StorageRetentionConfig {
	return a.config.RetentionConfig
}
// getBlobKey resolves the blob key for a given storage key, stripping a
// leading "<container>/" segment if present (the path generator prepends
// the container name as a virtual base path).
func (a *Azure) getBlobKey(key string) string {
	prefix := a.config.ContainerName + "/"
	if strings.HasPrefix(key, prefix) {
		return key[len(prefix):]
	}
	return key
}
// searchInDatePrefixes searches for a blob in date-based prefixes.
// It lists "<category>/" with a "/" delimiter to discover YYYY-MM-DD
// subdirectories, then tries "<category>/<date>/<filename>" from the last
// discovered prefix backwards and returns the first blob that downloads.
//
// NOTE(review): "newest first" relies on the listing returning prefixes in
// ascending lexical order — no explicit sort is performed; confirm the
// service guarantees this ordering across pages.
func (a *Azure) searchInDatePrefixes(ctx context.Context, key string) (io.ReadCloser, error) {
	dir := filepath.Dir(key)
	filename := filepath.Base(key)

	// For flat keys like "sessionID.cast", treat as replays category
	if dir == "." {
		dir = "replays"
		key = "replays/" + filename
	}

	logger.L().Debug("Searching in date prefixes",
		zap.String("original_key", key),
		zap.String("category", dir),
		zap.String("filename", filename))

	// List blobs with category prefix to find date directories
	var datePrefixes []string
	// Marker-based pagination: loop until the service reports no more pages.
	for marker := (azblob.Marker{}); marker.NotDone(); {
		listResponse, err := a.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{
			Prefix: dir + "/",
		})
		if err != nil {
			return nil, fmt.Errorf("failed to list blobs: %w", err)
		}

		// Collect date prefixes from blob prefixes
		for _, prefix := range listResponse.Segment.BlobPrefixes {
			prefixStr := prefix.Name
			// Extract date directory from prefix
			parts := strings.Split(strings.TrimPrefix(prefixStr, dir+"/"), "/")
			if len(parts) >= 1 {
				dateStr := parts[0]
				// Check if it looks like a date (YYYY-MM-DD)
				if len(dateStr) == 10 && dateStr[4] == '-' && dateStr[7] == '-' {
					datePrefixes = append(datePrefixes, prefixStr)
				}
			}
		}

		marker = listResponse.NextMarker
	}

	logger.L().Debug("Found date prefixes",
		zap.String("key", key),
		zap.Strings("prefixes", datePrefixes))

	// Search in date prefixes (newest first by sorting in reverse)
	for i := len(datePrefixes) - 1; i >= 0; i-- {
		possibleKey := datePrefixes[i] + filename

		logger.L().Debug("Trying date prefix",
			zap.String("possible_key", possibleKey))

		blobURL := a.containerURL.NewBlobURL(possibleKey)
		response, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
		if err == nil {
			logger.L().Debug("Found blob in date prefix",
				zap.String("key", possibleKey))
			return response.Body(azblob.RetryReaderOptions{}), nil
		}
	}

	return nil, fmt.Errorf("blob not found in any date prefix: %s (searched %d prefixes)", key, len(datePrefixes))
}
// ParseAzureConfigFromMap creates an AzureConfig from a flat string map as
// persisted in the database. Missing keys fall back to defaults: the
// date-hierarchy path strategy and the default retention configuration.
// Malformed numeric values are silently ignored (defaults kept).
func ParseAzureConfigFromMap(configMap map[string]string) (AzureConfig, error) {
	cfg := AzureConfig{
		AccountName:    configMap["account_name"],
		AccountKey:     configMap["account_key"],
		ContainerName:  configMap["container_name"],
		EndpointSuffix: configMap["endpoint_suffix"],
		PathStrategy:   storage.DateHierarchyStrategy,
	}

	// An explicit path_strategy entry (even empty) overrides the default.
	if strategy, ok := configMap["path_strategy"]; ok {
		cfg.PathStrategy = storage.PathStrategy(strategy)
	}

	retention := storage.DefaultRetentionConfig()
	if v, ok := configMap["retention_days"]; ok {
		if days, err := strconv.Atoi(v); err == nil {
			retention.RetentionDays = days
		}
	}
	if v, ok := configMap["archive_days"]; ok {
		if days, err := strconv.Atoi(v); err == nil {
			retention.ArchiveDays = days
		}
	}
	if v, ok := configMap["cleanup_enabled"]; ok {
		retention.CleanupEnabled = v == "true"
	}
	if v, ok := configMap["archive_enabled"]; ok {
		retention.ArchiveEnabled = v == "true"
	}
	cfg.RetentionConfig = retention

	return cfg, nil
}

View File

@@ -0,0 +1,313 @@
package providers
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"path/filepath"
"strconv"
"strings"
"github.com/tencentyun/cos-go-sdk-v5"
"github.com/veops/oneterm/pkg/logger"
"github.com/veops/oneterm/pkg/storage"
"go.uber.org/zap"
)
// COSConfig holds configuration for Tencent Cloud COS storage
// Populated either directly or via ParseCOSConfigFromMap.
type COSConfig struct {
	Region          string                         `json:"region" mapstructure:"region"`
	SecretID        string                         `json:"secret_id" mapstructure:"secret_id"`
	SecretKey       string                         `json:"secret_key" mapstructure:"secret_key"`
	BucketName      string                         `json:"bucket_name" mapstructure:"bucket_name"`
	// AppID is combined with BucketName to form the COS endpoint host
	// (<bucket>-<appid>.cos.<region>.myqcloud.com; see NewCOS).
	AppID           string                         `json:"app_id" mapstructure:"app_id"`
	PathStrategy    storage.PathStrategy           `json:"path_strategy" mapstructure:"path_strategy"`
	RetentionConfig storage.StorageRetentionConfig `json:"retention" mapstructure:"retention"`
}
// COS implements the storage.Provider interface for Tencent Cloud COS storage
type COS struct {
	client        *cos.Client            // authenticated COS SDK client bound to one bucket
	config        COSConfig              // immutable configuration captured at construction
	pathGenerator *storage.PathGenerator // builds object keys per the configured path strategy
}
// NewCOS creates a new Tencent Cloud COS storage provider, applying default
// path-strategy/retention settings and verifying the bucket is reachable
// before returning.
func NewCOS(config COSConfig) (storage.Provider, error) {
	if config.PathStrategy == "" {
		config.PathStrategy = storage.DateHierarchyStrategy
	}
	if config.RetentionConfig.RetentionDays == 0 {
		config.RetentionConfig = storage.DefaultRetentionConfig()
	}

	// COS bucket endpoints follow <bucket>-<appid>.cos.<region>.myqcloud.com.
	rawURL := fmt.Sprintf("https://%s-%s.cos.%s.myqcloud.com", config.BucketName, config.AppID, config.Region)
	bucketURL, err := url.Parse(rawURL)
	if err != nil {
		return nil, fmt.Errorf("failed to parse bucket URL: %w", err)
	}

	httpClient := &http.Client{
		Transport: &cos.AuthorizationTransport{
			SecretID:  config.SecretID,
			SecretKey: config.SecretKey,
		},
	}
	client := cos.NewClient(&cos.BaseURL{BucketURL: bucketURL}, httpClient)

	// Fail fast if the bucket is missing or the credentials are wrong.
	if _, err := client.Bucket.Head(context.Background()); err != nil {
		return nil, fmt.Errorf("failed to access bucket %s: %w", config.BucketName, err)
	}

	return &COS{
		client:        client,
		config:        config,
		pathGenerator: storage.NewPathGenerator(config.PathStrategy, config.BucketName),
	}, nil
}
// Upload streams a file into COS under the resolved object key with an
// octet-stream content type. The size parameter is unused by the SDK call.
func (c *COS) Upload(ctx context.Context, key string, reader io.Reader, size int64) error {
	opts := &cos.ObjectPutOptions{
		ObjectPutHeaderOptions: &cos.ObjectPutHeaderOptions{
			ContentType: "application/octet-stream",
		},
	}

	if _, err := c.client.Object.Put(ctx, c.getObjectKey(key), reader, opts); err != nil {
		return fmt.Errorf("failed to upload object: %w", err)
	}
	return nil
}
// Download downloads a file from COS storage.
// It tries the resolved key as-is first; under the date-hierarchy strategy
// it then searches the date-based prefixes. The returned ReadCloser must be
// closed by the caller.
func (c *COS) Download(ctx context.Context, key string) (io.ReadCloser, error) {
	objectKey := c.getObjectKey(key)

	// Try exact key first
	response, err := c.client.Object.Get(ctx, objectKey, nil)
	if err == nil {
		return response.Body, nil
	}

	// For date hierarchy strategy, search in date prefixes
	if c.config.PathStrategy == storage.DateHierarchyStrategy {
		return c.searchInDatePrefixes(ctx, key)
	}

	// Wrap the SDK error instead of discarding it so callers can diagnose
	// auth/network failures vs. genuine missing objects.
	return nil, fmt.Errorf("object not found: %s: %w", key, err)
}
// Delete removes an object from COS storage by its resolved key.
func (c *COS) Delete(ctx context.Context, key string) error {
	if _, err := c.client.Object.Delete(ctx, c.getObjectKey(key)); err != nil {
		return fmt.Errorf("failed to delete object: %w", err)
	}
	return nil
}
// Exists checks if a file exists in COS storage.
// It probes the exact key with a HEAD request first; under the
// date-hierarchy strategy it then falls back to searching date prefixes.
func (c *COS) Exists(ctx context.Context, key string) (bool, error) {
	objectKey := c.getObjectKey(key)

	// Try exact key first
	_, err := c.client.Object.Head(ctx, objectKey, nil)
	if err == nil {
		return true, nil
	}

	// For date hierarchy strategy, search in date prefixes
	if c.config.PathStrategy == storage.DateHierarchyStrategy {
		reader, err := c.searchInDatePrefixes(ctx, key)
		if err != nil {
			return false, nil
		}
		// FIX: the search opens the object body; close it immediately since
		// we only needed to confirm existence (previously leaked).
		_ = reader.Close()
		return true, nil
	}

	return false, nil
}
// GetSize gets the size of a file in COS storage
// via a HEAD request, parsing the Content-Length response header.
//
// NOTE(review): unlike Download/Exists this does not fall back to searching
// date prefixes, and the Head response is not explicitly closed — confirm
// whether the SDK drains/closes HEAD responses for connection reuse.
func (c *COS) GetSize(ctx context.Context, key string) (int64, error) {
	objectKey := c.getObjectKey(key)

	response, err := c.client.Object.Head(ctx, objectKey, nil)
	if err != nil {
		if c.config.PathStrategy == storage.DateHierarchyStrategy {
			return 0, fmt.Errorf("object not found: %s", key)
		}
		return 0, err
	}

	contentLength := response.Header.Get("Content-Length")
	if contentLength == "" {
		// Missing header: report zero rather than failing.
		return 0, nil
	}

	size, err := strconv.ParseInt(contentLength, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("failed to parse content length: %w", err)
	}

	return size, nil
}
// Type returns the storage type identifier ("cos") used to tag this
// provider in configs, logs, and health reports.
func (c *COS) Type() string {
	return "cos"
}
// HealthCheck performs a health check on COS storage
// via a HEAD request on the bucket, which exercises both credentials and
// network reachability.
func (c *COS) HealthCheck(ctx context.Context) error {
	// Check if bucket is accessible
	_, err := c.client.Bucket.Head(ctx)
	if err != nil {
		return fmt.Errorf("COS health check failed: %w", err)
	}
	return nil
}
// GetPathStrategy returns the path strategy configured at construction.
func (c *COS) GetPathStrategy() storage.PathStrategy {
	return c.config.PathStrategy
}
// GetRetentionConfig returns the retention configuration set at construction.
func (c *COS) GetRetentionConfig() storage.StorageRetentionConfig {
	return c.config.RetentionConfig
}
// getObjectKey resolves the object key for a given storage key, stripping a
// leading "<bucket>/" segment if present (the path generator prepends the
// bucket name as a virtual base path).
func (c *COS) getObjectKey(key string) string {
	prefix := c.config.BucketName + "/"
	if strings.HasPrefix(key, prefix) {
		return key[len(prefix):]
	}
	return key
}
// searchInDatePrefixes searches for an object in date-based prefixes.
// It lists "<category>/" with a "/" delimiter to discover YYYY-MM-DD
// subdirectories, then tries "<category>/<date>/<filename>" from the last
// discovered prefix backwards and returns the first object that downloads.
//
// NOTE(review): "newest first" relies on the listing returning common
// prefixes in ascending lexical order — no explicit sort is performed;
// confirm the service guarantees this ordering across pages.
func (c *COS) searchInDatePrefixes(ctx context.Context, key string) (io.ReadCloser, error) {
	dir := filepath.Dir(key)
	filename := filepath.Base(key)

	// For flat keys like "sessionID.cast", treat as replays category
	if dir == "." {
		dir = "replays"
		key = "replays/" + filename
	}

	logger.L().Debug("Searching in date prefixes",
		zap.String("original_key", key),
		zap.String("category", dir),
		zap.String("filename", filename))

	// List objects with category prefix to find date directories
	var datePrefixes []string
	// Marker-based pagination: loop until IsTruncated reports no more pages.
	marker := ""
	for {
		result, _, err := c.client.Bucket.Get(ctx, &cos.BucketGetOptions{
			Prefix:    dir + "/",
			Delimiter: "/",
			Marker:    marker,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to list objects: %w", err)
		}

		// Collect date prefixes from common prefixes
		for _, prefix := range result.CommonPrefixes {
			// Extract date directory from prefix
			parts := strings.Split(strings.TrimPrefix(prefix, dir+"/"), "/")
			if len(parts) >= 1 {
				dateStr := parts[0]
				// Check if it looks like a date (YYYY-MM-DD)
				if len(dateStr) == 10 && dateStr[4] == '-' && dateStr[7] == '-' {
					datePrefixes = append(datePrefixes, prefix)
				}
			}
		}

		if !result.IsTruncated {
			break
		}
		marker = result.NextMarker
	}

	logger.L().Debug("Found date prefixes",
		zap.String("key", key),
		zap.Strings("prefixes", datePrefixes))

	// Search in date prefixes (newest first by sorting in reverse)
	for i := len(datePrefixes) - 1; i >= 0; i-- {
		possibleKey := datePrefixes[i] + filename

		logger.L().Debug("Trying date prefix",
			zap.String("possible_key", possibleKey))

		response, err := c.client.Object.Get(ctx, possibleKey, nil)
		if err == nil {
			logger.L().Debug("Found object in date prefix",
				zap.String("key", possibleKey))
			return response.Body, nil
		}
	}

	return nil, fmt.Errorf("object not found in any date prefix: %s (searched %d prefixes)", key, len(datePrefixes))
}
// ParseCOSConfigFromMap creates a COSConfig from a flat string map as
// persisted in the database. Missing keys fall back to defaults: the
// date-hierarchy path strategy and the default retention configuration.
// Malformed numeric values are silently ignored (defaults kept).
func ParseCOSConfigFromMap(configMap map[string]string) (COSConfig, error) {
	cfg := COSConfig{
		Region:       configMap["region"],
		SecretID:     configMap["secret_id"],
		SecretKey:    configMap["secret_key"],
		BucketName:   configMap["bucket_name"],
		AppID:        configMap["app_id"],
		PathStrategy: storage.DateHierarchyStrategy,
	}

	// An explicit path_strategy entry (even empty) overrides the default.
	if strategy, ok := configMap["path_strategy"]; ok {
		cfg.PathStrategy = storage.PathStrategy(strategy)
	}

	retention := storage.DefaultRetentionConfig()
	if v, ok := configMap["retention_days"]; ok {
		if days, err := strconv.Atoi(v); err == nil {
			retention.RetentionDays = days
		}
	}
	if v, ok := configMap["archive_days"]; ok {
		if days, err := strconv.Atoi(v); err == nil {
			retention.ArchiveDays = days
		}
	}
	if v, ok := configMap["cleanup_enabled"]; ok {
		retention.CleanupEnabled = v == "true"
	}
	if v, ok := configMap["archive_enabled"]; ok {
		retention.ArchiveEnabled = v == "true"
	}
	cfg.RetentionConfig = retention

	return cfg, nil
}

View File

@@ -0,0 +1,311 @@
package providers
import (
"context"
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"github.com/huaweicloud/huaweicloud-sdk-go-obs/obs"
"github.com/veops/oneterm/pkg/logger"
"github.com/veops/oneterm/pkg/storage"
"go.uber.org/zap"
)
// OBSConfig holds configuration for Huawei Cloud OBS storage
type OBSConfig struct {
	// Endpoint is the OBS service endpoint passed to the SDK client.
	Endpoint string `json:"endpoint" mapstructure:"endpoint"`
	// AccessKeyID / SecretAccessKey are the OBS credentials.
	AccessKeyID     string `json:"access_key_id" mapstructure:"access_key_id"`
	SecretAccessKey string `json:"secret_access_key" mapstructure:"secret_access_key"`
	// BucketName is the target bucket; NewOBS verifies it is reachable.
	BucketName string `json:"bucket_name" mapstructure:"bucket_name"`
	// PathStrategy selects the object-key layout (defaults to date hierarchy).
	PathStrategy storage.PathStrategy `json:"path_strategy" mapstructure:"path_strategy"`
	// RetentionConfig carries retention/archive/cleanup settings.
	RetentionConfig storage.StorageRetentionConfig `json:"retention" mapstructure:"retention"`
}
// OBS implements the storage.Provider interface for Huawei Cloud OBS storage
type OBS struct {
	client        *obs.ObsClient         // Huawei OBS SDK client
	config        OBSConfig              // configuration snapshot taken at construction
	pathGenerator *storage.PathGenerator // builds object keys per PathStrategy
}
// NewOBS creates a new Huawei Cloud OBS storage provider.
//
// Defaults are applied for an empty path strategy (date hierarchy) and a
// zero retention configuration. Construction fails if the SDK client cannot
// be built or the bucket is not accessible with the given credentials.
func NewOBS(config OBSConfig) (storage.Provider, error) {
	if config.PathStrategy == "" {
		config.PathStrategy = storage.DateHierarchyStrategy
	}
	if config.RetentionConfig.RetentionDays == 0 {
		config.RetentionConfig = storage.DefaultRetentionConfig()
	}

	client, err := obs.New(config.AccessKeyID, config.SecretAccessKey, config.Endpoint)
	if err != nil {
		return nil, fmt.Errorf("failed to create OBS client: %w", err)
	}

	// Verify the bucket exists and the credentials can reach it.
	if _, err = client.HeadBucket(config.BucketName); err != nil {
		return nil, fmt.Errorf("failed to access bucket %s: %w", config.BucketName, err)
	}

	// The bucket name doubles as the virtual base path for generated keys.
	return &OBS{
		client:        client,
		config:        config,
		pathGenerator: storage.NewPathGenerator(config.PathStrategy, config.BucketName),
	}, nil
}
// Upload uploads a file to OBS storage.
//
// The size argument is accepted for interface compatibility; the SDK reads
// the body to completion itself.
func (o *OBS) Upload(ctx context.Context, key string, reader io.Reader, size int64) error {
	input := &obs.PutObjectInput{}
	input.Bucket = o.config.BucketName
	input.Key = o.getObjectKey(key)
	input.Body = reader
	input.ContentType = "application/octet-stream"
	if _, err := o.client.PutObject(input); err != nil {
		return fmt.Errorf("failed to upload object: %w", err)
	}
	return nil
}
// Download downloads a file from OBS storage.
//
// The exact object key is probed first; with the date-hierarchy layout a
// miss falls back to scanning the date prefixes. The caller must close the
// returned body.
func (o *OBS) Download(ctx context.Context, key string) (io.ReadCloser, error) {
	input := &obs.GetObjectInput{}
	input.Bucket = o.config.BucketName
	input.Key = o.getObjectKey(key)
	if output, err := o.client.GetObject(input); err == nil {
		return output.Body, nil
	}

	if o.config.PathStrategy == storage.DateHierarchyStrategy {
		return o.searchInDatePrefixes(ctx, key)
	}
	return nil, fmt.Errorf("object not found: %s", key)
}
// Delete deletes a file from OBS storage.
func (o *OBS) Delete(ctx context.Context, key string) error {
	input := &obs.DeleteObjectInput{}
	input.Bucket = o.config.BucketName
	input.Key = o.getObjectKey(key)
	if _, err := o.client.DeleteObject(input); err != nil {
		return fmt.Errorf("failed to delete object: %w", err)
	}
	return nil
}
// Exists checks if a file exists in OBS storage.
//
// It first probes the exact object key via a metadata request; with the
// date-hierarchy layout it falls back to scanning date-based prefixes. The
// body returned by the prefix search is closed immediately so the underlying
// HTTP response is not leaked (the original code discarded it unclosed).
func (o *OBS) Exists(ctx context.Context, key string) (bool, error) {
	objectKey := o.getObjectKey(key)

	// Try exact key first
	input := &obs.GetObjectMetadataInput{}
	input.Bucket = o.config.BucketName
	input.Key = objectKey
	if _, err := o.client.GetObjectMetadata(input); err == nil {
		return true, nil
	}

	// For date hierarchy strategy, search in date prefixes
	if o.config.PathStrategy == storage.DateHierarchyStrategy {
		body, err := o.searchInDatePrefixes(ctx, key)
		if err != nil {
			return false, nil
		}
		// Only existence matters here; release the connection.
		body.Close()
		return true, nil
	}
	return false, nil
}
// GetSize gets the size of a file in OBS storage.
//
// With the date-hierarchy layout a metadata miss is reported as "object not
// found" (no prefix fallback is attempted for sizes); otherwise the SDK
// error is returned as-is.
func (o *OBS) GetSize(ctx context.Context, key string) (int64, error) {
	input := &obs.GetObjectMetadataInput{}
	input.Bucket = o.config.BucketName
	input.Key = o.getObjectKey(key)

	output, err := o.client.GetObjectMetadata(input)
	if err == nil {
		return output.ContentLength, nil
	}
	if o.config.PathStrategy == storage.DateHierarchyStrategy {
		return 0, fmt.Errorf("object not found: %s", key)
	}
	return 0, err
}
// Type returns the storage type identifier ("obs").
func (o *OBS) Type() string {
	return "obs"
}
// HealthCheck performs a health check on OBS storage by issuing a
// HeadBucket request against the configured bucket.
func (o *OBS) HealthCheck(ctx context.Context) error {
	if _, err := o.client.HeadBucket(o.config.BucketName); err != nil {
		return fmt.Errorf("OBS health check failed: %w", err)
	}
	return nil
}
// GetPathStrategy returns the path strategy configured for this provider.
func (o *OBS) GetPathStrategy() storage.PathStrategy {
	return o.config.PathStrategy
}
// GetRetentionConfig returns the retention configuration for this provider.
func (o *OBS) GetRetentionConfig() storage.StorageRetentionConfig {
	return o.config.RetentionConfig
}
// getObjectKey resolves the object key for a given storage key.
//
// Path generators may prepend the bucket name as a virtual base path; strip
// it so the SDK receives a bucket-relative key.
func (o *OBS) getObjectKey(key string) string {
	if rest, ok := strings.CutPrefix(key, o.config.BucketName+"/"); ok {
		return rest
	}
	return key
}
// searchInDatePrefixes searches for an object in date-based prefixes.
//
// Used when the exact key misses under the date-hierarchy layout: it lists
// the category directory (e.g. "replays/") with a "/" delimiter to discover
// "YYYY-MM-DD/" sub-prefixes, then probes prefix+filename from newest to
// oldest (listing order is lexicographic, so reverse iteration is newest
// first). The caller owns — and must close — the returned body.
func (o *OBS) searchInDatePrefixes(ctx context.Context, key string) (io.ReadCloser, error) {
	dir := filepath.Dir(key)
	filename := filepath.Base(key)
	// For flat keys like "sessionID.cast", treat as replays category
	if dir == "." {
		dir = "replays"
		key = "replays/" + filename
	}
	logger.L().Debug("Searching in date prefixes",
		zap.String("original_key", key),
		zap.String("category", dir),
		zap.String("filename", filename))
	// List objects with category prefix to find date directories
	var datePrefixes []string
	input := &obs.ListObjectsInput{}
	input.Bucket = o.config.BucketName
	input.Prefix = dir + "/"
	input.Delimiter = "/"
	// Paginate until the listing is no longer truncated.
	for {
		output, err := o.client.ListObjects(input)
		if err != nil {
			return nil, fmt.Errorf("failed to list objects: %w", err)
		}
		// Collect date prefixes from common prefixes
		for _, prefix := range output.CommonPrefixes {
			// Extract date directory from prefix
			parts := strings.Split(strings.TrimPrefix(prefix, dir+"/"), "/")
			if len(parts) >= 1 {
				dateStr := parts[0]
				// Check if it looks like a date (YYYY-MM-DD)
				if len(dateStr) == 10 && dateStr[4] == '-' && dateStr[7] == '-' {
					datePrefixes = append(datePrefixes, prefix)
				}
			}
		}
		if !output.IsTruncated {
			break
		}
		input.Marker = output.NextMarker
	}
	logger.L().Debug("Found date prefixes",
		zap.String("key", key),
		zap.Strings("prefixes", datePrefixes))
	// Search in date prefixes (newest first by sorting in reverse)
	for i := len(datePrefixes) - 1; i >= 0; i-- {
		possibleKey := datePrefixes[i] + filename
		logger.L().Debug("Trying date prefix",
			zap.String("possible_key", possibleKey))
		getInput := &obs.GetObjectInput{}
		getInput.Bucket = o.config.BucketName
		getInput.Key = possibleKey
		output, err := o.client.GetObject(getInput)
		if err == nil {
			logger.L().Debug("Found object in date prefix",
				zap.String("key", possibleKey))
			return output.Body, nil
		}
	}
	return nil, fmt.Errorf("object not found in any date prefix: %s (searched %d prefixes)", key, len(datePrefixes))
}
// ParseOBSConfigFromMap creates OBSConfig from string map (for database storage).
//
// Connection fields are copied verbatim; path strategy defaults to the date
// hierarchy when absent, and retention values start from the package
// defaults and are overridden only by parseable map entries.
func ParseOBSConfigFromMap(configMap map[string]string) (OBSConfig, error) {
	cfg := OBSConfig{
		Endpoint:        configMap["endpoint"],
		AccessKeyID:     configMap["access_key_id"],
		SecretAccessKey: configMap["secret_access_key"],
		BucketName:      configMap["bucket_name"],
		PathStrategy:    storage.DateHierarchyStrategy,
	}
	if raw, ok := configMap["path_strategy"]; ok {
		cfg.PathStrategy = storage.PathStrategy(raw)
	}

	retention := storage.DefaultRetentionConfig()
	// setDays overwrites *dst only when the map holds a valid integer.
	setDays := func(mapKey string, dst *int) {
		if raw, ok := configMap[mapKey]; ok {
			if n, err := strconv.Atoi(raw); err == nil {
				*dst = n
			}
		}
	}
	setDays("retention_days", &retention.RetentionDays)
	setDays("archive_days", &retention.ArchiveDays)
	if raw, ok := configMap["cleanup_enabled"]; ok {
		retention.CleanupEnabled = raw == "true"
	}
	if raw, ok := configMap["archive_enabled"]; ok {
		retention.ArchiveEnabled = raw == "true"
	}
	cfg.RetentionConfig = retention
	return cfg, nil
}

View File

@@ -0,0 +1,525 @@
package providers
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"io"
"net/http"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"github.com/veops/oneterm/pkg/logger"
"github.com/veops/oneterm/pkg/storage"
"go.uber.org/zap"
)
// OOSConfig holds configuration for China Telecom Cloud Object Storage Service
type OOSConfig struct {
	// Endpoint is the base URL of the OOS service; requests are built as
	// "<endpoint>/<bucket>/<key>".
	Endpoint string `json:"endpoint" mapstructure:"endpoint"`
	// AccessKeyID / SecretAccessKey are the signing credentials.
	AccessKeyID     string `json:"access_key_id" mapstructure:"access_key_id"`
	SecretAccessKey string `json:"secret_access_key" mapstructure:"secret_access_key"`
	// BucketName is the target bucket.
	BucketName string `json:"bucket_name" mapstructure:"bucket_name"`
	// Region is optional and not used in request signing here.
	Region string `json:"region" mapstructure:"region"`
	// PathStrategy selects the object-key layout (defaults to date hierarchy).
	PathStrategy storage.PathStrategy `json:"path_strategy" mapstructure:"path_strategy"`
	// RetentionConfig carries retention/archive/cleanup settings.
	RetentionConfig storage.StorageRetentionConfig `json:"retention" mapstructure:"retention"`
}
// OOS implements the storage.Provider interface for China Telecom Cloud Object Storage Service.
// Unlike the SDK-backed providers, OOS speaks HTTP directly and signs each
// request itself (see signRequest).
type OOS struct {
	config        OOSConfig              // configuration snapshot taken at construction
	httpClient    *http.Client           // shared client with a 30s timeout
	pathGenerator *storage.PathGenerator // builds object keys per PathStrategy
}
// NewOOS creates a new China Telecom Cloud Object Storage Service provider.
//
// Defaults are applied for the path strategy and retention settings, the
// mandatory connection fields are validated, and a bounded health check is
// run so a misconfigured provider fails fast.
func NewOOS(config OOSConfig) (storage.Provider, error) {
	if config.PathStrategy == "" {
		config.PathStrategy = storage.DateHierarchyStrategy
	}
	if config.RetentionConfig.RetentionDays == 0 {
		config.RetentionConfig = storage.DefaultRetentionConfig()
	}
	if config.Endpoint == "" || config.AccessKeyID == "" || config.SecretAccessKey == "" || config.BucketName == "" {
		return nil, fmt.Errorf("endpoint, access_key_id, secret_access_key, and bucket_name are required")
	}

	oos := &OOS{
		config:        config,
		httpClient:    &http.Client{Timeout: 30 * time.Second},
		pathGenerator: storage.NewPathGenerator(config.PathStrategy, config.BucketName),
	}

	// Verify the bucket is reachable before handing the provider out.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := oos.HealthCheck(ctx); err != nil {
		return nil, fmt.Errorf("failed to connect to OOS: %w", err)
	}
	return oos, nil
}
// Upload uploads a file to OOS storage.
//
// The payload is buffered in memory so it can back a replayable request
// body; the size argument is advisory and unused. The Content-Length header
// is intentionally NOT set by hand: net/http ignores a user-supplied
// Content-Length on client requests and derives req.ContentLength from the
// *bytes.Reader body instead, so the original Header.Set was dead code.
func (o *OOS) Upload(ctx context.Context, key string, reader io.Reader, size int64) error {
	objectKey := o.getObjectKey(key)

	// Read all data from reader
	data, err := io.ReadAll(reader)
	if err != nil {
		return fmt.Errorf("failed to read data: %w", err)
	}

	// Create PUT request
	url := fmt.Sprintf("%s/%s/%s", strings.TrimRight(o.config.Endpoint, "/"), o.config.BucketName, objectKey)
	req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	// Set headers (Date participates in the signature)
	req.Header.Set("Content-Type", "application/octet-stream")
	req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))

	// Sign request
	if err := o.signRequest(req, "PUT", objectKey, data); err != nil {
		return fmt.Errorf("failed to sign request: %w", err)
	}

	// Send request
	resp, err := o.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()

	// 200 and 201 both indicate success.
	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
	}
	return nil
}
// Download downloads a file from OOS storage.
//
// The exact object key is probed first; with the date-hierarchy layout a
// miss falls back to scanning recent date prefixes. The caller must close
// the returned body.
func (o *OOS) Download(ctx context.Context, key string) (io.ReadCloser, error) {
	if reader, err := o.downloadObject(ctx, o.getObjectKey(key)); err == nil {
		return reader, nil
	}

	if o.config.PathStrategy == storage.DateHierarchyStrategy {
		return o.searchInDatePrefixes(ctx, key)
	}
	return nil, fmt.Errorf("object not found: %s", key)
}
// Delete deletes a file from OOS storage via a signed DELETE request.
func (o *OOS) Delete(ctx context.Context, key string) error {
	objectKey := o.getObjectKey(key)
	url := fmt.Sprintf("%s/%s/%s", strings.TrimRight(o.config.Endpoint, "/"), o.config.BucketName, objectKey)

	req, err := http.NewRequestWithContext(ctx, "DELETE", url, nil)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	// Date participates in the signature.
	req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
	if err := o.signRequest(req, "DELETE", objectKey, nil); err != nil {
		return fmt.Errorf("failed to sign request: %w", err)
	}

	resp, err := o.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()

	// 200 and 204 both indicate a successful delete.
	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("delete failed with status %d: %s", resp.StatusCode, string(body))
	}
	return nil
}
// Exists checks if a file exists in OOS storage.
//
// It first probes the exact object key with a HEAD request; with the
// date-hierarchy layout it falls back to scanning recent date prefixes. The
// body returned by the prefix search is closed immediately so the underlying
// HTTP response is not leaked (the original code discarded it unclosed).
func (o *OOS) Exists(ctx context.Context, key string) (bool, error) {
	objectKey := o.getObjectKey(key)

	// Try exact key first
	exists, err := o.objectExists(ctx, objectKey)
	if err == nil && exists {
		return true, nil
	}

	// For date hierarchy strategy, search in date prefixes
	if o.config.PathStrategy == storage.DateHierarchyStrategy {
		body, err := o.searchInDatePrefixes(ctx, key)
		if err != nil {
			return false, nil
		}
		// Only existence matters here; release the connection.
		body.Close()
		return true, nil
	}
	return false, nil
}
// GetSize gets the size of a file in OOS storage via a signed HEAD request.
//
// A missing Content-Length header is reported as size 0; a non-200 response
// is reported as "object not found".
func (o *OOS) GetSize(ctx context.Context, key string) (int64, error) {
	objectKey := o.getObjectKey(key)
	url := fmt.Sprintf("%s/%s/%s", strings.TrimRight(o.config.Endpoint, "/"), o.config.BucketName, objectKey)

	req, err := http.NewRequestWithContext(ctx, "HEAD", url, nil)
	if err != nil {
		return 0, fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
	if err := o.signRequest(req, "HEAD", objectKey, nil); err != nil {
		return 0, fmt.Errorf("failed to sign request: %w", err)
	}

	resp, err := o.httpClient.Do(req)
	if err != nil {
		return 0, fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("object not found: %s", key)
	}

	raw := resp.Header.Get("Content-Length")
	if raw == "" {
		return 0, nil
	}
	n, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid content length: %s", raw)
	}
	return n, nil
}
// Type returns the storage type identifier ("oos").
func (o *OOS) Type() string {
	return "oos"
}
// HealthCheck performs a health check on OOS storage by issuing a signed
// bucket listing limited to a single key.
func (o *OOS) HealthCheck(ctx context.Context) error {
	url := fmt.Sprintf("%s/%s?max-keys=1", strings.TrimRight(o.config.Endpoint, "/"), o.config.BucketName)
	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
	// The empty object key signs the bare "/bucket" resource.
	if err := o.signRequest(req, "GET", "", nil); err != nil {
		return fmt.Errorf("failed to sign request: %w", err)
	}

	resp, err := o.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("health check failed with status %d: %s", resp.StatusCode, string(body))
	}
	return nil
}
// GetPathStrategy returns the path strategy configured for this provider.
func (o *OOS) GetPathStrategy() storage.PathStrategy {
	return o.config.PathStrategy
}
// GetRetentionConfig returns the retention configuration for this provider.
func (o *OOS) GetRetentionConfig() storage.StorageRetentionConfig {
	return o.config.RetentionConfig
}
// getObjectKey generates the object key based on the path strategy.
//
// NOTE(review): date-based keys embed time.Now(), so resolving the key for
// an object stored on an earlier day yields a different path than the one it
// was written under; Download compensates by falling back to
// searchInDatePrefixes. Confirm this is the intended layout — it also
// differs from the OBS/OSS getObjectKey, which only strips a bucket prefix.
func (o *OOS) getObjectKey(key string) string {
	if o.pathGenerator != nil {
		// For session replay files, use the replay path generator
		if strings.HasSuffix(key, ".cast") {
			sessionID := strings.TrimSuffix(key, ".cast")
			return o.pathGenerator.GenerateReplayPath(sessionID, time.Now())
		}
		// For other files, generate path based on strategy
		switch o.pathGenerator.Strategy {
		case storage.DateHierarchyStrategy:
			dateDir := time.Now().Format("2006/01/02")
			return filepath.Join(dateDir, key)
		case storage.FlatStrategy:
			fallthrough
		default:
			return key
		}
	}
	return key
}
// downloadObject downloads an object from OOS.
//
// Issues a signed GET for the exact object key and hands the response body
// to the caller, who must close it. On a non-200 status the body is closed
// here before the error is returned.
func (o *OOS) downloadObject(ctx context.Context, objectKey string) (io.ReadCloser, error) {
	url := fmt.Sprintf("%s/%s/%s", strings.TrimRight(o.config.Endpoint, "/"), o.config.BucketName, objectKey)
	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
	if err := o.signRequest(req, "GET", objectKey, nil); err != nil {
		return nil, fmt.Errorf("failed to sign request: %w", err)
	}

	resp, err := o.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to send request: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("download failed with status %d", resp.StatusCode)
	}
	return resp.Body, nil
}
// objectExists checks if an object exists in OOS by issuing a signed HEAD
// request; a 200 status means the object is present.
func (o *OOS) objectExists(ctx context.Context, objectKey string) (bool, error) {
	url := fmt.Sprintf("%s/%s/%s", strings.TrimRight(o.config.Endpoint, "/"), o.config.BucketName, objectKey)
	req, err := http.NewRequestWithContext(ctx, "HEAD", url, nil)
	if err != nil {
		return false, fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
	if err := o.signRequest(req, "HEAD", objectKey, nil); err != nil {
		return false, fmt.Errorf("failed to sign request: %w", err)
	}

	resp, err := o.httpClient.Do(req)
	if err != nil {
		return false, fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK, nil
}
// searchInDatePrefixes searches for a file in date-based prefixes.
//
// This provider cannot list prefixes via an SDK, so it probes
// "YYYY/MM/DD/<key>" for each of the last 30 days, newest first, and returns
// the first hit. NOTE(review): objects older than 30 days are unreachable
// through this fallback — confirm that is acceptable for replay retention.
// The caller must close the returned reader.
func (o *OOS) searchInDatePrefixes(ctx context.Context, key string) (io.ReadCloser, error) {
	// Generate possible date prefixes for the last 30 days
	now := time.Now()
	for i := 0; i < 30; i++ {
		date := now.AddDate(0, 0, -i)
		datePrefix := date.Format("2006/01/02")
		objectKey := filepath.Join(datePrefix, key)
		reader, err := o.downloadObject(ctx, objectKey)
		if err == nil {
			return reader, nil
		}
		// Log the attempt for debugging
		logger.L().Debug("Searched for object in date prefix",
			zap.String("date_prefix", datePrefix),
			zap.String("object_key", objectKey),
			zap.Error(err))
	}
	return nil, fmt.Errorf("object not found in any date prefix: %s", key)
}
// signRequest signs an HTTP request using OOS signature algorithm.
//
// The body parameter is currently unused: the signature covers only the
// method, selected headers, the date, and the canonicalized resource.
func (o *OOS) signRequest(req *http.Request, method, objectKey string, body []byte) error {
	// Ensure a Date header exists; it is part of the signed string.
	date := req.Header.Get("Date")
	if date == "" {
		date = time.Now().UTC().Format(http.TimeFormat)
		req.Header.Set("Date", date)
	}

	stringToSign := o.buildStringToSign(method, objectKey, req.Header, date)
	req.Header.Set("Authorization",
		fmt.Sprintf("AWS %s:%s", o.config.AccessKeyID, o.calculateSignature(stringToSign)))
	return nil
}
// buildStringToSign builds the string to sign for OOS authentication,
// following the AWS Signature Version 2 layout (one field per line):
//
//	HTTP-Verb
//	Content-MD5
//	Content-Type
//	Date
//	[CanonicalizedHeaders]   x-oos-* headers, if any
//	CanonicalizedResource    "/bucket[/objectKey]"
func (o *OOS) buildStringToSign(method, objectKey string, headers http.Header, date string) string {
	parts := []string{
		method,
		headers.Get("Content-MD5"),
		headers.Get("Content-Type"),
		date,
	}
	// getCanonicalizedHeaders returns "k1:v1\n...kn:vn\n". Trim the trailing
	// newline before joining, otherwise the final Join would insert a blank
	// line between the headers and the resource, which does not match the
	// v2 signature layout (headers block is immediately followed by the
	// resource).
	if canonical := o.getCanonicalizedHeaders(headers); canonical != "" {
		parts = append(parts, strings.TrimSuffix(canonical, "\n"))
	}
	// Canonicalized resource
	resource := fmt.Sprintf("/%s", o.config.BucketName)
	if objectKey != "" {
		resource += "/" + objectKey
	}
	parts = append(parts, resource)
	return strings.Join(parts, "\n")
}
// getCanonicalizedHeaders gets canonicalized OOS headers.
//
// Every "x-oos-*" header is collected as "name:value" (name lowercased,
// value trimmed), sorted lexicographically, and newline-joined with a
// trailing newline. Returns "" when no such headers are present.
func (o *OOS) getCanonicalizedHeaders(headers http.Header) string {
	var collected []string
	for name, values := range headers {
		lower := strings.ToLower(name)
		if !strings.HasPrefix(lower, "x-oos-") {
			continue
		}
		for _, v := range values {
			collected = append(collected, lower+":"+strings.TrimSpace(v))
		}
	}
	if len(collected) == 0 {
		return ""
	}
	sort.Strings(collected)
	return strings.Join(collected, "\n") + "\n"
}
// calculateSignature calculates the HMAC-SHA1 signature of stringToSign
// keyed by the secret access key, base64-encoded for the Authorization header.
func (o *OOS) calculateSignature(stringToSign string) string {
	h := hmac.New(sha1.New, []byte(o.config.SecretAccessKey))
	h.Write([]byte(stringToSign))
	return base64.StdEncoding.EncodeToString(h.Sum(nil))
}
// ParseOOSConfigFromMap parses OOS configuration from a map.
//
// endpoint, access_key_id, secret_access_key, and bucket_name are mandatory
// (presence is checked, emptiness is not); region, path_strategy, and
// retention_days are optional.
func ParseOOSConfigFromMap(configMap map[string]string) (OOSConfig, error) {
	config := OOSConfig{}

	// Mandatory keys, each mapped to its destination field.
	required := []struct {
		key string
		dst *string
	}{
		{"endpoint", &config.Endpoint},
		{"access_key_id", &config.AccessKeyID},
		{"secret_access_key", &config.SecretAccessKey},
		{"bucket_name", &config.BucketName},
	}
	for _, r := range required {
		v, ok := configMap[r.key]
		if !ok {
			return config, fmt.Errorf("%s is required", r.key)
		}
		*r.dst = v
	}

	// Optional fields
	config.Region = configMap["region"]
	if raw, ok := configMap["path_strategy"]; ok {
		config.PathStrategy = storage.PathStrategy(raw)
	} else {
		config.PathStrategy = storage.DateHierarchyStrategy
	}

	// Retention: keep a parsed positive override, else fall back to defaults.
	if raw, ok := configMap["retention_days"]; ok {
		if days, err := strconv.Atoi(raw); err == nil {
			config.RetentionConfig.RetentionDays = days
		}
	}
	if config.RetentionConfig.RetentionDays == 0 {
		config.RetentionConfig = storage.DefaultRetentionConfig()
	}
	return config, nil
}

View File

@@ -0,0 +1,308 @@
package providers
import (
"context"
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/veops/oneterm/pkg/logger"
"github.com/veops/oneterm/pkg/storage"
"go.uber.org/zap"
)
// OSSConfig holds configuration for Alibaba Cloud OSS storage
type OSSConfig struct {
	// Endpoint is the OSS service endpoint passed to the SDK client.
	Endpoint string `json:"endpoint" mapstructure:"endpoint"`
	// AccessKeyID / AccessKeySecret are the OSS credentials.
	AccessKeyID     string `json:"access_key_id" mapstructure:"access_key_id"`
	AccessKeySecret string `json:"access_key_secret" mapstructure:"access_key_secret"`
	// BucketName is the target bucket; NewOSS verifies it exists.
	BucketName string `json:"bucket_name" mapstructure:"bucket_name"`
	// PathStrategy selects the object-key layout (defaults to date hierarchy).
	PathStrategy storage.PathStrategy `json:"path_strategy" mapstructure:"path_strategy"`
	// RetentionConfig carries retention/archive/cleanup settings.
	RetentionConfig storage.StorageRetentionConfig `json:"retention" mapstructure:"retention"`
}
// OSS implements the storage.Provider interface for Alibaba Cloud OSS storage
type OSS struct {
	client        *oss.Client            // OSS SDK client (bucket-independent operations)
	bucket        *oss.Bucket            // handle for object operations on BucketName
	config        OSSConfig              // configuration snapshot taken at construction
	pathGenerator *storage.PathGenerator // builds object keys per PathStrategy
}
// NewOSS creates a new Alibaba Cloud OSS storage provider.
//
// Defaults are applied for an empty path strategy (date hierarchy) and a
// zero retention configuration. Construction fails if the client or bucket
// handle cannot be created, or if the bucket does not exist.
func NewOSS(config OSSConfig) (storage.Provider, error) {
	if config.PathStrategy == "" {
		config.PathStrategy = storage.DateHierarchyStrategy
	}
	if config.RetentionConfig.RetentionDays == 0 {
		config.RetentionConfig = storage.DefaultRetentionConfig()
	}

	client, err := oss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret)
	if err != nil {
		return nil, fmt.Errorf("failed to create OSS client: %w", err)
	}
	bucket, err := client.Bucket(config.BucketName)
	if err != nil {
		return nil, fmt.Errorf("failed to get bucket %s: %w", config.BucketName, err)
	}

	// Verify the bucket actually exists before handing the provider out.
	switch exists, err := client.IsBucketExist(config.BucketName); {
	case err != nil:
		return nil, fmt.Errorf("failed to check bucket existence: %w", err)
	case !exists:
		return nil, fmt.Errorf("bucket %s does not exist", config.BucketName)
	}

	// The bucket name doubles as the virtual base path for generated keys.
	return &OSS{
		client:        client,
		bucket:        bucket,
		config:        config,
		pathGenerator: storage.NewPathGenerator(config.PathStrategy, config.BucketName),
	}, nil
}
// Upload uploads a file to OSS storage.
//
// The size argument is accepted for interface compatibility; the SDK reads
// the body to completion itself.
func (o *OSS) Upload(ctx context.Context, key string, reader io.Reader, size int64) error {
	putErr := o.bucket.PutObject(o.getObjectKey(key), reader, oss.ContentType("application/octet-stream"))
	if putErr != nil {
		return fmt.Errorf("failed to upload object: %w", putErr)
	}
	return nil
}
// Download downloads a file from OSS storage.
//
// The exact object key is probed first; with the date-hierarchy layout a
// miss falls back to scanning the date prefixes. The caller must close the
// returned reader.
func (o *OSS) Download(ctx context.Context, key string) (io.ReadCloser, error) {
	if reader, err := o.bucket.GetObject(o.getObjectKey(key)); err == nil {
		return reader, nil
	}

	if o.config.PathStrategy == storage.DateHierarchyStrategy {
		return o.searchInDatePrefixes(ctx, key)
	}
	return nil, fmt.Errorf("object not found: %s", key)
}
// Delete deletes a file from OSS storage.
func (o *OSS) Delete(ctx context.Context, key string) error {
	if err := o.bucket.DeleteObject(o.getObjectKey(key)); err != nil {
		return fmt.Errorf("failed to delete object: %w", err)
	}
	return nil
}
// Exists checks if a file exists in OSS storage.
//
// It first probes the exact object key; with the date-hierarchy layout it
// falls back to scanning date-based prefixes. The reader returned by the
// prefix search is closed immediately so the underlying connection is not
// leaked (the original code discarded it unclosed).
func (o *OSS) Exists(ctx context.Context, key string) (bool, error) {
	objectKey := o.getObjectKey(key)

	// Try exact key first
	exists, err := o.bucket.IsObjectExist(objectKey)
	if err != nil {
		return false, err
	}
	if exists {
		return true, nil
	}

	// For date hierarchy strategy, search in date prefixes
	if o.config.PathStrategy == storage.DateHierarchyStrategy {
		reader, err := o.searchInDatePrefixes(ctx, key)
		if err != nil {
			return false, nil
		}
		// Only existence matters here; release the connection.
		reader.Close()
		return true, nil
	}
	return false, nil
}
// GetSize gets the size of a file in OSS storage from its object metadata.
//
// A missing Content-Length header is reported as size 0. With the
// date-hierarchy layout a metadata miss is reported as "object not found";
// otherwise the SDK error is returned as-is.
func (o *OSS) GetSize(ctx context.Context, key string) (int64, error) {
	meta, err := o.bucket.GetObjectMeta(o.getObjectKey(key))
	if err != nil {
		if o.config.PathStrategy == storage.DateHierarchyStrategy {
			return 0, fmt.Errorf("object not found: %s", key)
		}
		return 0, err
	}

	raw := meta.Get("Content-Length")
	if raw == "" {
		return 0, nil
	}
	n, parseErr := strconv.ParseInt(raw, 10, 64)
	if parseErr != nil {
		return 0, fmt.Errorf("failed to parse content length: %w", parseErr)
	}
	return n, nil
}
// Type returns the storage type identifier ("oss").
func (o *OSS) Type() string {
	return "oss"
}
// HealthCheck performs a health check on OSS storage by verifying the
// configured bucket still exists and is reachable.
func (o *OSS) HealthCheck(ctx context.Context) error {
	switch exists, err := o.client.IsBucketExist(o.config.BucketName); {
	case err != nil:
		return fmt.Errorf("OSS health check failed: %w", err)
	case !exists:
		return fmt.Errorf("bucket %s does not exist", o.config.BucketName)
	}
	return nil
}
// GetPathStrategy returns the path strategy configured for this provider.
func (o *OSS) GetPathStrategy() storage.PathStrategy {
	return o.config.PathStrategy
}
// GetRetentionConfig returns the retention configuration for this provider.
func (o *OSS) GetRetentionConfig() storage.StorageRetentionConfig {
	return o.config.RetentionConfig
}
// getObjectKey resolves the object key for a given storage key.
//
// Path generators may prepend the bucket name as a virtual base path; strip
// it so the SDK receives a bucket-relative key.
func (o *OSS) getObjectKey(key string) string {
	if rest, ok := strings.CutPrefix(key, o.config.BucketName+"/"); ok {
		return rest
	}
	return key
}
// searchInDatePrefixes searches for an object in date-based prefixes.
//
// Used when the exact key misses under the date-hierarchy layout: it lists
// the category directory (e.g. "replays/") with a "/" delimiter to discover
// "YYYY-MM-DD/" sub-prefixes, then probes prefix+filename from newest to
// oldest (listing order is lexicographic, so reverse iteration is newest
// first). The caller owns — and must close — the returned reader.
func (o *OSS) searchInDatePrefixes(ctx context.Context, key string) (io.ReadCloser, error) {
	dir := filepath.Dir(key)
	filename := filepath.Base(key)
	// For flat keys like "sessionID.cast", treat as replays category
	if dir == "." {
		dir = "replays"
		key = "replays/" + filename
	}
	logger.L().Debug("Searching in date prefixes",
		zap.String("original_key", key),
		zap.String("category", dir),
		zap.String("filename", filename))
	// List objects with category prefix to find date directories
	var datePrefixes []string
	marker := ""
	// Paginate until the listing is no longer truncated.
	for {
		result, err := o.bucket.ListObjects(oss.Prefix(dir+"/"), oss.Delimiter("/"), oss.Marker(marker))
		if err != nil {
			return nil, fmt.Errorf("failed to list objects: %w", err)
		}
		// Collect date prefixes from common prefixes
		for _, prefix := range result.CommonPrefixes {
			// Extract date directory from prefix
			parts := strings.Split(strings.TrimPrefix(prefix, dir+"/"), "/")
			if len(parts) >= 1 {
				dateStr := parts[0]
				// Check if it looks like a date (YYYY-MM-DD)
				if len(dateStr) == 10 && dateStr[4] == '-' && dateStr[7] == '-' {
					datePrefixes = append(datePrefixes, prefix)
				}
			}
		}
		if !result.IsTruncated {
			break
		}
		marker = result.NextMarker
	}
	logger.L().Debug("Found date prefixes",
		zap.String("key", key),
		zap.Strings("prefixes", datePrefixes))
	// Search in date prefixes (newest first by sorting in reverse)
	for i := len(datePrefixes) - 1; i >= 0; i-- {
		possibleKey := datePrefixes[i] + filename
		logger.L().Debug("Trying date prefix",
			zap.String("possible_key", possibleKey))
		reader, err := o.bucket.GetObject(possibleKey)
		if err == nil {
			logger.L().Debug("Found object in date prefix",
				zap.String("key", possibleKey))
			return reader, nil
		}
	}
	return nil, fmt.Errorf("object not found in any date prefix: %s (searched %d prefixes)", key, len(datePrefixes))
}
// ParseOSSConfigFromMap creates OSSConfig from string map (for database storage).
//
// Connection fields are copied verbatim; path strategy defaults to the date
// hierarchy when absent, and retention values start from the package
// defaults and are overridden only by parseable map entries.
func ParseOSSConfigFromMap(configMap map[string]string) (OSSConfig, error) {
	cfg := OSSConfig{
		Endpoint:        configMap["endpoint"],
		AccessKeyID:     configMap["access_key_id"],
		AccessKeySecret: configMap["access_key_secret"],
		BucketName:      configMap["bucket_name"],
		PathStrategy:    storage.DateHierarchyStrategy,
	}
	if raw, ok := configMap["path_strategy"]; ok {
		cfg.PathStrategy = storage.PathStrategy(raw)
	}

	retention := storage.DefaultRetentionConfig()
	// setDays overwrites *dst only when the map holds a valid integer.
	setDays := func(mapKey string, dst *int) {
		if raw, ok := configMap[mapKey]; ok {
			if n, err := strconv.Atoi(raw); err == nil {
				*dst = n
			}
		}
	}
	setDays("retention_days", &retention.RetentionDays)
	setDays("archive_days", &retention.ArchiveDays)
	if raw, ok := configMap["cleanup_enabled"]; ok {
		retention.CleanupEnabled = raw == "true"
	}
	if raw, ok := configMap["archive_enabled"]; ok {
		retention.ArchiveEnabled = raw == "true"
	}
	cfg.RetentionConfig = retention
	return cfg, nil
}

View File

@@ -0,0 +1,362 @@
package providers
import (
"context"
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/veops/oneterm/pkg/logger"
"github.com/veops/oneterm/pkg/storage"
"go.uber.org/zap"
)
// S3Config holds configuration for AWS S3 storage.
// Instances are typically built by ParseS3ConfigFromMap from values stored
// in the database; zero-value fields are given defaults in NewS3.
type S3Config struct {
	Region          string                         `json:"region" mapstructure:"region"`                       // AWS region; NewS3 defaults this to "us-east-1" when empty
	AccessKeyID     string                         `json:"access_key_id" mapstructure:"access_key_id"`         // static credential; used only when SecretAccessKey is also set
	SecretAccessKey string                         `json:"secret_access_key" mapstructure:"secret_access_key"` // static credential; used only when AccessKeyID is also set
	BucketName      string                         `json:"bucket_name" mapstructure:"bucket_name"`             // target bucket; verified for access at construction time
	Endpoint        string                         `json:"endpoint" mapstructure:"endpoint"`                   // For S3-compatible services; forces path-style addressing
	UseSSL          bool                           `json:"use_ssl" mapstructure:"use_ssl"`                     // false disables SSL via aws.Config.DisableSSL
	PathStrategy    storage.PathStrategy           `json:"path_strategy" mapstructure:"path_strategy"`         // object key layout; NewS3 defaults to DateHierarchyStrategy
	RetentionConfig storage.StorageRetentionConfig `json:"retention" mapstructure:"retention"`                 // lifecycle settings; NewS3 defaults when RetentionDays is 0
}
// S3 implements the storage.Provider interface for AWS S3 storage.
// It is constructed via NewS3, which validates bucket access up front.
type S3 struct {
	client        *s3.S3               // low-level S3 API client
	uploader      *s3manager.Uploader  // managed multipart uploads
	downloader    *s3manager.Downloader // managed downloads (currently unused by the visible methods)
	config        S3Config             // resolved configuration (defaults applied)
	pathGenerator *storage.PathGenerator // generates object keys per the configured path strategy
}
// NewS3 creates a new S3 storage provider.
//
// It applies defaults (date-hierarchy path strategy, default retention,
// "us-east-1" region), builds an AWS session from the supplied config, and
// verifies that the configured bucket is reachable before returning.
//
// Returns an error if the AWS session cannot be created or the bucket is
// not accessible with the given credentials/endpoint.
func NewS3(config S3Config) (storage.Provider, error) {
	// Set default path strategy if not specified.
	if config.PathStrategy == "" {
		config.PathStrategy = storage.DateHierarchyStrategy
	}
	// Set default retention config if not specified.
	if config.RetentionConfig.RetentionDays == 0 {
		config.RetentionConfig = storage.DefaultRetentionConfig()
	}
	// Set default region if not specified.
	if config.Region == "" {
		config.Region = "us-east-1"
	}

	awsConfig := &aws.Config{
		Region: aws.String(config.Region),
	}
	// Use static credentials only when both halves are provided; otherwise
	// the SDK's default credential chain (env vars, shared config, IAM role)
	// is left in effect.
	if config.AccessKeyID != "" && config.SecretAccessKey != "" {
		awsConfig.Credentials = credentials.NewStaticCredentials(
			config.AccessKeyID,
			config.SecretAccessKey,
			"",
		)
	}
	// A custom endpoint (for S3-compatible services such as MinIO) requires
	// path-style addressing, since virtual-host style assumes AWS DNS.
	if config.Endpoint != "" {
		awsConfig.Endpoint = aws.String(config.Endpoint)
		awsConfig.S3ForcePathStyle = aws.Bool(true)
	}
	awsConfig.DisableSSL = aws.Bool(!config.UseSSL)

	sess, err := session.NewSession(awsConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create AWS session: %w", err)
	}
	client := s3.New(sess)

	// Verify the bucket is accessible. Bound the probe with a timeout so a
	// misconfigured or unreachable endpoint cannot hang provider
	// initialization indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if _, err = client.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(config.BucketName),
	}); err != nil {
		return nil, fmt.Errorf("failed to access bucket %s: %w", config.BucketName, err)
	}

	return &S3{
		client:     client,
		uploader:   s3manager.NewUploaderWithClient(client),
		downloader: s3manager.NewDownloaderWithClient(client),
		config:     config,
		// Use bucket name as virtual base path for the path generator.
		pathGenerator: storage.NewPathGenerator(config.PathStrategy, config.BucketName),
	}, nil
}
// Upload uploads a file to S3 storage, streaming the content of reader into
// the object resolved from key. The size argument is accepted for interface
// compatibility; the uploader streams the body itself.
func (s *S3) Upload(ctx context.Context, key string, reader io.Reader, size int64) error {
	input := &s3manager.UploadInput{
		Bucket:      aws.String(s.config.BucketName),
		Key:         aws.String(s.getObjectKey(key)),
		Body:        reader,
		ContentType: aws.String("application/octet-stream"),
	}
	if _, err := s.uploader.UploadWithContext(ctx, input); err != nil {
		return fmt.Errorf("failed to upload object: %w", err)
	}
	return nil
}
// Download downloads a file from S3 storage.
//
// It first tries the exact object key; when the provider uses the date
// hierarchy strategy it falls back to scanning date-based prefixes so that
// flat keys (e.g. "sessionID.cast") can still be located. The returned
// ReadCloser must be closed by the caller.
func (s *S3) Download(ctx context.Context, key string) (io.ReadCloser, error) {
	objectKey := s.getObjectKey(key)

	// Try exact key first.
	output, err := s.client.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s.config.BucketName),
		Key:    aws.String(objectKey),
	})
	if err == nil {
		return output.Body, nil
	}

	// For date hierarchy strategy, search in date prefixes.
	if s.config.PathStrategy == storage.DateHierarchyStrategy {
		return s.searchInDatePrefixes(ctx, key)
	}
	// Wrap the underlying SDK error so callers can distinguish a genuine
	// "not found" from auth/network failures instead of losing the cause.
	return nil, fmt.Errorf("object not found: %s: %w", key, err)
}
// Delete deletes a file from S3 storage by its resolved object key.
// Note: S3 DeleteObject succeeds even when the object does not exist.
func (s *S3) Delete(ctx context.Context, key string) error {
	input := &s3.DeleteObjectInput{
		Bucket: aws.String(s.config.BucketName),
		Key:    aws.String(s.getObjectKey(key)),
	}
	if _, err := s.client.DeleteObjectWithContext(ctx, input); err != nil {
		return fmt.Errorf("failed to delete object: %w", err)
	}
	return nil
}
// Exists checks if a file exists in S3 storage.
//
// It probes the exact key with HeadObject first; under the date hierarchy
// strategy it falls back to searching date-based prefixes.
// NOTE(review): HeadObject errors other than "not found" (auth, network)
// are also reported as non-existence — callers cannot distinguish the cases.
func (s *S3) Exists(ctx context.Context, key string) (bool, error) {
	objectKey := s.getObjectKey(key)

	// Try exact key first.
	_, err := s.client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s.config.BucketName),
		Key:    aws.String(objectKey),
	})
	if err == nil {
		return true, nil
	}

	// For date hierarchy strategy, search in date prefixes.
	if s.config.PathStrategy == storage.DateHierarchyStrategy {
		reader, err := s.searchInDatePrefixes(ctx, key)
		if err != nil {
			return false, nil
		}
		// searchInDatePrefixes returns an open object body; close it so an
		// existence probe does not leak the underlying HTTP response.
		reader.Close()
		return true, nil
	}
	return false, nil
}
// GetSize gets the size of a file in S3 storage via HeadObject.
// Unlike Download/Exists, it does not fall back to searching date prefixes;
// under the date hierarchy strategy a miss is reported as "object not found".
func (s *S3) GetSize(ctx context.Context, key string) (int64, error) {
	output, err := s.client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s.config.BucketName),
		Key:    aws.String(s.getObjectKey(key)),
	})
	switch {
	case err != nil && s.config.PathStrategy == storage.DateHierarchyStrategy:
		return 0, fmt.Errorf("object not found: %s", key)
	case err != nil:
		return 0, err
	case output.ContentLength != nil:
		return *output.ContentLength, nil
	default:
		return 0, nil
	}
}
// Type returns the storage type identifier for this provider ("s3").
func (s *S3) Type() string {
	return "s3"
}
// HealthCheck performs a health check on S3 storage by verifying that the
// configured bucket is still reachable with the current credentials.
func (s *S3) HealthCheck(ctx context.Context) error {
	input := &s3.HeadBucketInput{
		Bucket: aws.String(s.config.BucketName),
	}
	if _, err := s.client.HeadBucketWithContext(ctx, input); err != nil {
		return fmt.Errorf("S3 health check failed: %w", err)
	}
	return nil
}
// GetPathStrategy returns the path strategy configured for this provider.
func (s *S3) GetPathStrategy() storage.PathStrategy {
	return s.config.PathStrategy
}
// GetRetentionConfig returns the retention configuration for this provider.
func (s *S3) GetRetentionConfig() storage.StorageRetentionConfig {
	return s.config.RetentionConfig
}
// getObjectKey resolves the object key for a given storage key.
// The path generator prefixes keys with the bucket name, so that prefix is
// stripped to make the key valid relative to the bucket itself.
func (s *S3) getObjectKey(key string) string {
	if rest, ok := strings.CutPrefix(key, s.config.BucketName+"/"); ok {
		return rest
	}
	return key
}
// searchInDatePrefixes searches for an object in date-based prefixes
// (category/YYYY-MM-DD/filename), used as a fallback when the flat key
// lookup under the date hierarchy strategy misses.
//
// The returned ReadCloser is an open object body the caller must close.
func (s *S3) searchInDatePrefixes(ctx context.Context, key string) (io.ReadCloser, error) {
	dir := filepath.Dir(key)
	filename := filepath.Base(key)
	// For flat keys like "sessionID.cast", treat as replays category.
	if dir == "." {
		dir = "replays"
		key = "replays/" + filename
	}
	logger.L().Debug("Searching in date prefixes",
		zap.String("original_key", key),
		zap.String("category", dir),
		zap.String("filename", filename))

	// List date directories under the category prefix. Paginate: a single
	// ListObjectsV2 call returns at most 1000 entries, so without paging
	// some date directories could be silently missed in large categories.
	var datePrefixes []string
	err := s.client.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:    aws.String(s.config.BucketName),
		Prefix:    aws.String(dir + "/"),
		Delimiter: aws.String("/"),
	}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
		for _, prefix := range page.CommonPrefixes {
			if prefix.Prefix == nil {
				continue
			}
			prefixStr := *prefix.Prefix
			// Extract the first path segment after the category prefix.
			parts := strings.Split(strings.TrimPrefix(prefixStr, dir+"/"), "/")
			if len(parts) >= 1 {
				dateStr := parts[0]
				// Keep only prefixes that look like a date (YYYY-MM-DD).
				if len(dateStr) == 10 && dateStr[4] == '-' && dateStr[7] == '-' {
					datePrefixes = append(datePrefixes, prefixStr)
				}
			}
		}
		return true // continue through all pages
	})
	if err != nil {
		return nil, fmt.Errorf("failed to list objects: %w", err)
	}
	logger.L().Debug("Found date prefixes",
		zap.String("key", key),
		zap.Strings("prefixes", datePrefixes))

	// Search newest first: prefixes arrive in lexicographic (chronological)
	// order, so iterating in reverse visits the most recent dates first.
	for i := len(datePrefixes) - 1; i >= 0; i-- {
		possibleKey := datePrefixes[i] + filename
		logger.L().Debug("Trying date prefix",
			zap.String("possible_key", possibleKey))
		output, err := s.client.GetObjectWithContext(ctx, &s3.GetObjectInput{
			Bucket: aws.String(s.config.BucketName),
			Key:    aws.String(possibleKey),
		})
		if err == nil {
			logger.L().Debug("Found object in date prefix",
				zap.String("key", possibleKey))
			return output.Body, nil
		}
	}
	return nil, fmt.Errorf("object not found in any date prefix: %s (searched %d prefixes)", key, len(datePrefixes))
}
// ParseS3ConfigFromMap creates S3Config from string map (for database storage)
func ParseS3ConfigFromMap(configMap map[string]string) (S3Config, error) {
	cfg := S3Config{
		Region:          configMap["region"],
		AccessKeyID:     configMap["access_key_id"],
		SecretAccessKey: configMap["secret_access_key"],
		BucketName:      configMap["bucket_name"],
		Endpoint:        configMap["endpoint"],
	}

	// SSL defaults to enabled unless explicitly configured.
	cfg.UseSSL = true
	if v, ok := configMap["use_ssl"]; ok {
		cfg.UseSSL = v == "true"
	}

	// Path strategy: fall back to date hierarchy when not configured.
	cfg.PathStrategy = storage.DateHierarchyStrategy
	if v, ok := configMap["path_strategy"]; ok {
		cfg.PathStrategy = storage.PathStrategy(v)
	}

	// Retention: start from defaults and overlay values present in the map.
	// Unparseable integers are ignored on purpose (best-effort parsing).
	retention := storage.DefaultRetentionConfig()
	if v, ok := configMap["retention_days"]; ok {
		if days, err := strconv.Atoi(v); err == nil {
			retention.RetentionDays = days
		}
	}
	if v, ok := configMap["archive_days"]; ok {
		if days, err := strconv.Atoi(v); err == nil {
			retention.ArchiveDays = days
		}
	}
	if v, ok := configMap["cleanup_enabled"]; ok {
		retention.CleanupEnabled = v == "true"
	}
	if v, ok := configMap["archive_enabled"]; ok {
		retention.ArchiveEnabled = v == "true"
	}
	cfg.RetentionConfig = retention

	return cfg, nil
}