refactor: 重构docker调用
This commit is contained in:
parent
2834f73d06
commit
907f41234d
@ -3,7 +3,6 @@ package exec
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
@ -18,20 +17,20 @@ var DockerContainer *dockerContainer
|
||||
|
||||
func init() {
|
||||
// Initialize DockerContainer with memory, CPU, and disk limits
|
||||
DockerContainer, _ = NewDockerContainer(int64(2*1024*1024*1024), int64(1000000000), "10G")
|
||||
DockerContainer, _ = NewDockerContainer(int64(2*1024*1024*1024), int64(1000000000))
|
||||
DockerContainer.CreateAndStartContainer()
|
||||
}
|
||||
|
||||
type dockerContainer struct {
|
||||
Client *client.Client
|
||||
Ctx context.Context
|
||||
ContainerID string
|
||||
Memory int64 // Memory in bytes
|
||||
CPU int64 // CPU in NanoCPUs
|
||||
Disk string // Disk size (string as required by Docker)
|
||||
Memory int64 // Memory in bytes
|
||||
CPU int64 // CPU in NanoCPUs
|
||||
}
|
||||
|
||||
// NewDockerContainer initializes Docker client, context, and resource limits
|
||||
func NewDockerContainer(memory int64, cpu int64, disk string) (*dockerContainer, error) {
|
||||
func NewDockerContainer(memory int64, cpu int64) (*dockerContainer, error) {
|
||||
ctx := context.Background()
|
||||
cli, err := client.NewClientWithOpts(client.FromEnv)
|
||||
if err != nil {
|
||||
@ -44,22 +43,22 @@ func NewDockerContainer(memory int64, cpu int64, disk string) (*dockerContainer,
|
||||
Ctx: ctx,
|
||||
Memory: memory,
|
||||
CPU: cpu,
|
||||
Disk: disk,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CreateAndExecuteCommand creates a container, sets resource limits, and executes a command inside it
|
||||
func (dc *dockerContainer) CreateAndExecuteCommand(command string) (string, error) {
|
||||
// 如果容器已经存在,则直接执行命令
|
||||
// CreateAndStartContainer creates a container, sets resource limits, and starts it
|
||||
// The container runs `tail -f /dev/null` to keep running, waiting for new commands
|
||||
func (dc *dockerContainer) CreateAndStartContainer() error {
|
||||
// 如果容器已经存在,则跳过创建
|
||||
if dc.ContainerID != "" {
|
||||
fmt.Println("Container already exists, using existing container.")
|
||||
return dc.ExecCommandInContainer(command)
|
||||
fmt.Println("Container already exists, skipping creation.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pull debian image if not available locally
|
||||
_, err := dc.Client.ImagePull(dc.Ctx, "debian", image.PullOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
return err
|
||||
}
|
||||
|
||||
// Define resource limits based on DockerContainer struct fields
|
||||
@ -68,26 +67,23 @@ func (dc *dockerContainer) CreateAndExecuteCommand(command string) (string, erro
|
||||
Memory: dc.Memory, // Set memory limit
|
||||
NanoCPUs: dc.CPU, // Set CPU limit
|
||||
},
|
||||
StorageOpt: map[string]string{
|
||||
"size": dc.Disk, // Set disk limit
|
||||
},
|
||||
}
|
||||
|
||||
// Use the custom network to isolate the container
|
||||
networkConfig := &network.NetworkingConfig{
|
||||
EndpointsConfig: map[string]*network.EndpointSettings{
|
||||
"my_custom_network": {}, // Attach container to the custom network
|
||||
"qq_bot_net": {}, // Attach container to the custom network
|
||||
},
|
||||
}
|
||||
|
||||
// Create a container with debian image, resource limits, and network settings
|
||||
// Create a container that runs `tail -f /dev/null` to keep it alive
|
||||
resp, err := dc.Client.ContainerCreate(dc.Ctx, &container.Config{
|
||||
Image: "debian",
|
||||
Cmd: []string{"sh", "-c", command}, // Set the command to execute
|
||||
Cmd: []string{"tail", "-f", "/dev/null"}, // Keep container running
|
||||
Tty: true,
|
||||
}, hostConfig, networkConfig, nil, "")
|
||||
if err != nil {
|
||||
return "", err
|
||||
return err
|
||||
}
|
||||
|
||||
// Store container ID in the struct
|
||||
@ -95,43 +91,38 @@ func (dc *dockerContainer) CreateAndExecuteCommand(command string) (string, erro
|
||||
|
||||
// Start the container
|
||||
if err := dc.Client.ContainerStart(dc.Ctx, resp.ID, container.StartOptions{}); err != nil {
|
||||
return "", err
|
||||
return err
|
||||
}
|
||||
|
||||
// Attach to the container logs
|
||||
out, err := dc.Client.ContainerLogs(dc.Ctx, resp.ID, container.LogsOptions{ShowStdout: true, ShowStderr: true, Follow: true})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
// Capture the output
|
||||
var outputBuffer bytes.Buffer
|
||||
_, err = io.Copy(&outputBuffer, out)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return outputBuffer.String(), nil
|
||||
fmt.Println("Container created and started, waiting for commands...")
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExecCommandInContainer executes a command in the existing container and returns its output
|
||||
func (dc *dockerContainer) ExecCommandInContainer(command string) (string, error) {
|
||||
if dc.ContainerID == "" {
|
||||
return "", fmt.Errorf("no container found, please create and start the container first")
|
||||
}
|
||||
|
||||
execConfig := container.ExecOptions{
|
||||
Cmd: []string{"sh", "-c", command},
|
||||
AttachStdout: true,
|
||||
AttachStderr: true,
|
||||
Tty: true,
|
||||
}
|
||||
var cancel context.CancelFunc
|
||||
timeout := 10 * time.Second
|
||||
ctxWithTimeout, cancel := context.WithTimeout(dc.Ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
// 创建一个 exec 实例
|
||||
execIDResp, err := dc.Client.ContainerExecCreate(dc.Ctx, dc.ContainerID, execConfig)
|
||||
execIDResp, err := dc.Client.ContainerExecCreate(ctxWithTimeout, dc.ContainerID, execConfig)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// 开始执行命令
|
||||
resp, err := dc.Client.ContainerExecAttach(dc.Ctx, execIDResp.ID, container.ExecStartOptions{
|
||||
resp, err := dc.Client.ContainerExecAttach(ctxWithTimeout, execIDResp.ID, container.ExecStartOptions{
|
||||
Tty: true,
|
||||
})
|
||||
if err != nil {
|
||||
@ -149,52 +140,6 @@ func (dc *dockerContainer) ExecCommandInContainer(command string) (string, error
|
||||
return outputBuffer.String(), nil
|
||||
}
|
||||
|
||||
// MonitorCPUUsage monitors the CPU usage of the container and restarts it if usage exceeds 90% for 30 seconds
|
||||
func (dc *dockerContainer) MonitorCPUUsage() error {
|
||||
cpuOverLimitDuration := time.Duration(30) * time.Second
|
||||
cpuUsageThreshold := 90.0
|
||||
|
||||
var cpuOverLimitStart time.Time
|
||||
for {
|
||||
// Get the container stats
|
||||
stats, err := dc.Client.ContainerStats(dc.Ctx, dc.ContainerID, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Parse the stats and calculate CPU usage
|
||||
var usage container.StatsResponse
|
||||
err = json.NewDecoder(stats.Body).Decode(&usage)
|
||||
if err != nil {
|
||||
stats.Body.Close()
|
||||
return err
|
||||
}
|
||||
stats.Body.Close()
|
||||
|
||||
// Calculate CPU usage percentage
|
||||
cpuDelta := float64(usage.CPUStats.CPUUsage.TotalUsage - usage.PreCPUStats.CPUUsage.TotalUsage)
|
||||
systemDelta := float64(usage.CPUStats.SystemUsage - usage.PreCPUStats.SystemUsage)
|
||||
cpuUsagePercent := (cpuDelta / systemDelta) * float64(len(usage.CPUStats.CPUUsage.PercpuUsage)) * 100.0
|
||||
|
||||
if cpuUsagePercent > cpuUsageThreshold {
|
||||
if cpuOverLimitStart.IsZero() {
|
||||
cpuOverLimitStart = time.Now()
|
||||
} else if time.Since(cpuOverLimitStart) > cpuOverLimitDuration {
|
||||
// Restart the container if CPU usage exceeds the threshold for the specified duration
|
||||
fmt.Println("CPU usage exceeded limit for 30 seconds, restarting container...")
|
||||
if err := dc.RestartAndCleanContainer(); err != nil {
|
||||
return err
|
||||
}
|
||||
cpuOverLimitStart = time.Time{} // Reset timer
|
||||
}
|
||||
} else {
|
||||
cpuOverLimitStart = time.Time{} // Reset timer if CPU usage is below threshold
|
||||
}
|
||||
|
||||
time.Sleep(5 * time.Second) // Poll CPU usage every 5 seconds
|
||||
}
|
||||
}
|
||||
|
||||
// RestartAndCleanContainer stops, removes, and recreates the container
|
||||
func (dc *dockerContainer) RestartAndCleanContainer() error {
|
||||
if dc.ContainerID == "" {
|
||||
@ -216,16 +161,13 @@ func (dc *dockerContainer) RestartAndCleanContainer() error {
|
||||
// Recreate the container with the same resource limits
|
||||
resp, err := dc.Client.ContainerCreate(dc.Ctx, &container.Config{
|
||||
Image: "debian",
|
||||
Cmd: []string{"sh", "-c", "echo 'Container restarted'"},
|
||||
Cmd: []string{"tail", "-f", "/dev/null"},
|
||||
Tty: true,
|
||||
}, &container.HostConfig{
|
||||
Resources: container.Resources{
|
||||
Memory: dc.Memory, // Reuse memory limit
|
||||
NanoCPUs: dc.CPU, // Reuse CPU limit
|
||||
},
|
||||
StorageOpt: map[string]string{
|
||||
"size": dc.Disk, // Reuse disk limit
|
||||
},
|
||||
}, nil, nil, "")
|
||||
if err != nil {
|
||||
return err
|
||||
@ -242,3 +184,25 @@ func (dc *dockerContainer) RestartAndCleanContainer() error {
|
||||
fmt.Println("Container restarted and cleaned.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveContainer stops and removes the container
|
||||
func (dc *dockerContainer) RemoveContainer() error {
|
||||
if dc.ContainerID == "" {
|
||||
return fmt.Errorf("no container to remove")
|
||||
}
|
||||
|
||||
// Stop the container
|
||||
timeout := 10 * time.Second
|
||||
timeoutSecs := int(timeout.Seconds())
|
||||
if err := dc.Client.ContainerStop(dc.Ctx, dc.ContainerID, container.StopOptions{Timeout: &timeoutSecs}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the container
|
||||
if err := dc.Client.ContainerRemove(dc.Ctx, dc.ContainerID, container.RemoveOptions{Force: true}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println("Container removed.")
|
||||
return nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user