qq_bot/service/exec/exec.go
2024-10-21 20:33:41 +08:00

245 lines
6.8 KiB
Go

package exec
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"time"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
)
// DockerContainer is the package-level container handle. It is assigned in
// init from NewDockerContainer; when the Docker client cannot be constructed
// there, it is left nil.
var DockerContainer *dockerContainer
// init builds the package-level DockerContainer with the default resource
// limits: 2 GiB of memory, 1e9 NanoCPUs (one CPU) and a "10G" disk quota.
//
// A failure to construct the Docker client is reported instead of being
// silently discarded. Start-up stays non-fatal, as before, and
// DockerContainer remains nil in that case.
func init() {
	dc, err := NewDockerContainer(int64(2*1024*1024*1024), int64(1000000000), "10G")
	if err != nil {
		fmt.Printf("exec: initializing Docker client: %v\n", err)
		return
	}
	DockerContainer = dc
}
// dockerContainer bundles a Docker API client with the ID of the container it
// manages and the resource limits applied whenever that container is created.
type dockerContainer struct {
Client *client.Client // Docker API client used for every daemon call
Ctx context.Context // Context passed to each Docker API call made through this struct
ContainerID string // ID of the managed container; empty until one has been created
Memory int64 // Memory limit in bytes
CPU int64 // CPU limit in NanoCPUs
Disk string // Disk size limit, passed to Docker as the "size" storage option
}
// NewDockerContainer returns a dockerContainer wrapping a Docker client that is
// configured from the environment and has negotiated its API version with the
// daemon. The memory (bytes), cpu (NanoCPUs) and disk limits are only stored
// here; no container is created. The client construction error, if any, is
// returned unchanged.
func NewDockerContainer(memory int64, cpu int64, disk string) (*dockerContainer, error) {
	dockerClient, clientErr := client.NewClientWithOpts(client.FromEnv)
	if clientErr != nil {
		return nil, clientErr
	}

	background := context.Background()
	dockerClient.NegotiateAPIVersion(background)

	dc := new(dockerContainer)
	dc.Client = dockerClient
	dc.Ctx = background
	dc.Memory = memory
	dc.CPU = cpu
	dc.Disk = disk
	return dc, nil
}
// CreateAndExecuteCommand runs command inside the managed debian container and
// returns the output produced by ExecCommandInContainer.
//
// If dc.ContainerID is already set, the command is executed in that container.
// Otherwise the debian image is pulled and a container is created with the
// memory, CPU and disk limits stored on dc, attached to "my_custom_network".
// The container is started and the command is then executed in it.
//
// The container's main process is a keep-alive (`sleep infinity`) rather than
// the command itself. A container whose main process is the command exits as
// soon as the command finishes, after which every later exec into the recorded
// container fails.
func (dc *dockerContainer) CreateAndExecuteCommand(command string) (string, error) {
	// If the container already exists, execute the command in it directly.
	if dc.ContainerID != "" {
		fmt.Println("Container already exists, using existing container.")
		return dc.ExecCommandInContainer(command)
	}

	// ImagePull only starts the pull: the returned stream has to be read to
	// the end for the pull to complete, and closed to release the connection.
	pullProgress, err := dc.Client.ImagePull(dc.Ctx, "debian", image.PullOptions{})
	if err != nil {
		return "", fmt.Errorf("pulling debian image: %w", err)
	}
	_, err = io.Copy(io.Discard, pullProgress)
	// The stream is read-only progress output; a close error carries no
	// information the caller could act on.
	pullProgress.Close()
	if err != nil {
		return "", fmt.Errorf("reading debian image pull progress: %w", err)
	}

	// Resource limits come from the dockerContainer fields.
	hostConfig := &container.HostConfig{
		Resources: container.Resources{
			Memory:   dc.Memory,
			NanoCPUs: dc.CPU,
		},
		StorageOpt: map[string]string{
			"size": dc.Disk,
		},
	}

	// Attach the container to the custom network.
	networkConfig := &network.NetworkingConfig{
		EndpointsConfig: map[string]*network.EndpointSettings{
			"my_custom_network": {},
		},
	}

	resp, err := dc.Client.ContainerCreate(dc.Ctx, &container.Config{
		Image: "debian",
		Cmd:   []string{"sleep", "infinity"}, // keep-alive main process
		Tty:   true,
	}, hostConfig, networkConfig, nil, "")
	if err != nil {
		return "", fmt.Errorf("creating container: %w", err)
	}

	if err := dc.Client.ContainerStart(dc.Ctx, resp.ID, container.StartOptions{}); err != nil {
		// Best-effort cleanup so a container that never started is not left
		// behind; the start error is the one worth reporting.
		_ = dc.Client.ContainerRemove(dc.Ctx, resp.ID, container.RemoveOptions{Force: true})
		return "", fmt.Errorf("starting container: %w", err)
	}

	// Record the container only once it is actually running, so later calls
	// never try to exec into a container that failed to start.
	dc.ContainerID = resp.ID

	return dc.ExecCommandInContainer(command)
}
// ExecCommandInContainer runs command through `sh -c` in the container named
// by dc.ContainerID and returns everything read from the attached exec
// session (stdout and stderr, TTY mode). Errors from creating the exec,
// attaching to it, or reading its output are returned unchanged; the exit
// status of the command is not inspected.
func (dc *dockerContainer) ExecCommandInContainer(command string) (string, error) {
	// Create an exec instance.
	created, err := dc.Client.ContainerExecCreate(dc.Ctx, dc.ContainerID, container.ExecOptions{
		Cmd:          []string{"sh", "-c", command},
		AttachStdout: true,
		AttachStderr: true,
		Tty:          true,
	})
	if err != nil {
		return "", err
	}

	// Start executing the command and attach to its output.
	attached, err := dc.Client.ContainerExecAttach(dc.Ctx, created.ID, container.ExecStartOptions{
		Tty: true,
	})
	if err != nil {
		return "", err
	}
	defer attached.Close()

	// Collect the whole session output in memory.
	captured := new(bytes.Buffer)
	if _, copyErr := io.Copy(captured, attached.Reader); copyErr != nil {
		return "", copyErr
	}
	return captured.String(), nil
}
// MonitorCPUUsage polls the CPU usage of the container in dc.ContainerID
// roughly every 5 seconds. It calls RestartAndCleanContainer once usage has
// stayed above 90% for more than 30 seconds.
//
// The loop runs until an error occurs or dc.Ctx is cancelled. It returns the
// error from fetching or decoding stats, the error from the restart, or
// dc.Ctx.Err() on cancellation. It never returns nil.
func (dc *dockerContainer) MonitorCPUUsage() error {
	const (
		cpuUsageThreshold    = 90.0
		cpuOverLimitDuration = 30 * time.Second
		pollInterval         = 5 * time.Second
	)

	var cpuOverLimitStart time.Time
	for {
		usage, err := dc.sampleStats()
		if err != nil {
			return err
		}

		percent := containerCPUPercent(
			usage.CPUStats.CPUUsage.TotalUsage, usage.PreCPUStats.CPUUsage.TotalUsage,
			usage.CPUStats.SystemUsage, usage.PreCPUStats.SystemUsage,
			usage.CPUStats.OnlineCPUs, len(usage.CPUStats.CPUUsage.PercpuUsage),
		)

		switch {
		case percent <= cpuUsageThreshold:
			// Back under the threshold: forget any running over-limit window.
			cpuOverLimitStart = time.Time{}
		case cpuOverLimitStart.IsZero():
			// First sample over the threshold: open the window.
			cpuOverLimitStart = time.Now()
		case time.Since(cpuOverLimitStart) > cpuOverLimitDuration:
			fmt.Println("CPU usage exceeded limit for 30 seconds, restarting container...")
			if err := dc.RestartAndCleanContainer(); err != nil {
				return err
			}
			cpuOverLimitStart = time.Time{}
		}

		// Wait for the next poll, but stop promptly if the context ends.
		select {
		case <-dc.Ctx.Done():
			return dc.Ctx.Err()
		case <-time.After(pollInterval):
		}
	}
}

// sampleStats fetches one non-streaming stats sample for the managed container
// and decodes it, always closing the response body.
func (dc *dockerContainer) sampleStats() (container.StatsResponse, error) {
	stats, err := dc.Client.ContainerStats(dc.Ctx, dc.ContainerID, false)
	if err != nil {
		return container.StatsResponse{}, err
	}
	defer stats.Body.Close()

	var usage container.StatsResponse
	if err := json.NewDecoder(stats.Body).Decode(&usage); err != nil {
		return container.StatsResponse{}, fmt.Errorf("decoding container stats: %w", err)
	}
	return usage, nil
}

// containerCPUPercent converts two consecutive CPU counters into a usage
// percentage, where 100 means one fully busy CPU.
//
// The deltas are computed in floating point so a counter that went backwards
// cannot wrap around as an unsigned subtraction would. A non-positive delta
// yields 0 rather than Inf or NaN. onlineCPUs is preferred as the CPU count;
// perCPULen (the length of the per-CPU usage list) is only the fallback,
// because that list can be empty.
func containerCPUPercent(total, preTotal, system, preSystem uint64, onlineCPUs uint32, perCPULen int) float64 {
	cpuDelta := float64(total) - float64(preTotal)
	systemDelta := float64(system) - float64(preSystem)
	if cpuDelta <= 0 || systemDelta <= 0 {
		return 0
	}

	cpus := float64(onlineCPUs)
	if cpus == 0 {
		cpus = float64(perCPULen)
	}
	return cpuDelta / systemDelta * cpus * 100.0
}
// RestartAndCleanContainer stops and removes the container in dc.ContainerID,
// then creates and starts a fresh debian container with the same memory, CPU
// and disk limits, attached to "my_custom_network".
//
// It returns an error if no container is recorded or if any Docker call fails.
// dc.ContainerID is cleared as soon as the old container has been removed and
// is set to the new ID only after the new container has started. A failure
// part-way through therefore never leaves the ID pointing at a container that
// no longer exists or never ran.
//
// The new container's main process is a keep-alive (`sleep infinity`), so the
// container stays running and can accept later exec calls.
func (dc *dockerContainer) RestartAndCleanContainer() error {
	if dc.ContainerID == "" {
		return fmt.Errorf("no container to restart")
	}

	// Give the container 10 seconds to stop before it is killed.
	timeoutSecs := 10
	if err := dc.Client.ContainerStop(dc.Ctx, dc.ContainerID, container.StopOptions{Timeout: &timeoutSecs}); err != nil {
		return fmt.Errorf("stopping container: %w", err)
	}

	// Remove the container so the replacement starts from a clean filesystem.
	if err := dc.Client.ContainerRemove(dc.Ctx, dc.ContainerID, container.RemoveOptions{Force: true}); err != nil {
		return fmt.Errorf("removing container: %w", err)
	}
	dc.ContainerID = ""

	hostConfig := &container.HostConfig{
		Resources: container.Resources{
			Memory:   dc.Memory,
			NanoCPUs: dc.CPU,
		},
		StorageOpt: map[string]string{
			"size": dc.Disk,
		},
	}

	// Same network attachment as the initial creation, so a restart does not
	// move the container onto a different network.
	networkConfig := &network.NetworkingConfig{
		EndpointsConfig: map[string]*network.EndpointSettings{
			"my_custom_network": {},
		},
	}

	resp, err := dc.Client.ContainerCreate(dc.Ctx, &container.Config{
		Image: "debian",
		Cmd:   []string{"sleep", "infinity"}, // keep-alive main process
		Tty:   true,
	}, hostConfig, networkConfig, nil, "")
	if err != nil {
		return fmt.Errorf("recreating container: %w", err)
	}

	if err := dc.Client.ContainerStart(dc.Ctx, resp.ID, container.StartOptions{}); err != nil {
		// Best-effort cleanup of the container that never started; the start
		// error is the one worth reporting.
		_ = dc.Client.ContainerRemove(dc.Ctx, resp.ID, container.RemoveOptions{Force: true})
		return fmt.Errorf("starting recreated container: %w", err)
	}
	dc.ContainerID = resp.ID

	fmt.Println("Container restarted and cleaned.")
	return nil
}