1. 爬取逻辑流程
2. 准备工作 2.1 分析网页
2.2 编写结构体
根据上面的网页分析,可以根据 div[class='txt_cont'] 选中 table,然后根据第 n 行第 x 列来编写对应的结构体,如下:
// BasisCrawl holds the raw cell text scraped from the table inside
// div[class='txt_cont'] of a fund's basic-profile page. Every field is filled
// by colly's Unmarshal using the CSS selector in its `selector` tag, which
// addresses one cell as "row n, column x". Values are unprocessed strings;
// ConvertToEntity turns them into an entity.FundBasis.
type BasisCrawl struct {
	// Row 1: names.
	FullName  string `selector:"tr:nth-child(1) > td:nth-of-type(1)"`
	ShortName string `selector:"tr:nth-child(1) > td:nth-of-type(2)"`

	// Row 2: code and type. Type is split on "-" by ConvertToEntity.
	Code string `selector:"tr:nth-child(2) > td:nth-of-type(1)"`
	Type string `selector:"tr:nth-child(2) > td:nth-of-type(2)"`

	// Row 3: dates. EstablishDate and EstablishShares deliberately read the
	// same cell; ConvertToEntity splits it on "/" and uses the first part as
	// the date and the second part as the share count.
	ReleaseDate     string `selector:"tr:nth-child(3) > td:nth-of-type(1)"`
	EstablishDate   string `selector:"tr:nth-child(3) > td:nth-of-type(2)"`
	EstablishShares string `selector:"tr:nth-child(3) > td:nth-of-type(2)"`

	// Row 5: management company.
	Company string `selector:"tr:nth-child(5) > td:nth-of-type(1)"`

	// Row 6: manager name, plus the href of the link inside the same cell.
	Manager     string `selector:"tr:nth-child(6) > td:nth-of-type(1)"`
	ManagerDesc string `selector:"tr:nth-child(6) > td:nth-of-type(1) > a[href]" attr:"href"`

	// Rows 7 and 8: fee rates, still as display text.
	ManageFeeRate  string `selector:"tr:nth-child(7) > td:nth-of-type(1)"`
	CustodyFeeRate string `selector:"tr:nth-child(7) > td:nth-of-type(2)"`
	SaleFeeRate    string `selector:"tr:nth-child(8) > td:nth-of-type(1)"`

	// Row 10: performance benchmark.
	Benchmark string `selector:"tr:nth-child(10) > td:nth-of-type(1)"`
}
3. 请求流程预览
4. 代码实现 4.1 批量抓取入口函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func BatchBasicCrawl () { basicFundList := dao.FilterBasicFund() total := len (basicFundList) if total > 0 { var baseRowsChannel = make (chan entity.FundBasis, total) crawlByGroup(basicFundList, baseRowsChannel) var fundBasisRows []entity.FundBasis for item := range baseRowsChannel { fundBasisRows = append (fundBasisRows, item) } if fundBasisRows != nil { create := global.GvaMysqlClient.Create(fundBasisRows) if create.Error != nil { global.GvaLogger.Sugar().Errorf("基金详情入库失败" , create.Error) return } global.GvaLogger.Sugar().Infof("基金详情抓取成功,共: %v 条" , create.RowsAffected) } } }
4.2 过滤有详情code(dao.FilterBasicFund
) 1 2 3 4 5 6 func FilterBasicFund () []FilterBasicResult { res := []FilterBasicResult{} global.GvaMysqlClient.Raw("SELECT A.fund_code,B.`code` from fas_fund_day_top as A LEFT JOIN fas_fund_basis as B on A.fund_code = B.`code` WHERE B.`code` is NULL GROUP BY A.fund_code" ).Scan(&res) return res }
4.3 分组抓取函数(crawlByGroup
) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func crawlByGroup (basicResults []dao.FilterBasicResult, c chan <- entity.FundBasis) { groupNum := 15 fundCodeGroup := splitFundBasicList(basicResults, groupNum) var wg sync.WaitGroup wg.Add(groupNum) for _, results := range fundCodeGroup { basicFundList := results go func () { for _, item := range basicFundList { filterBasicResult := item f := BasisCrawl{} f.CrawlHtml(filterBasicResult.FundCode) if f.Code != "" { toEntity := f.ConvertToEntity() c <- toEntity } } wg.Done() }() } wg.Wait() close (c) }
4.4 根据Code
爬取详情(CrawlHtml
) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (f *BasisCrawl) CrawlHtml(fundCode string ) { collector := colly.NewCollector(colly.UserAgent(crawl.UserAgent), colly.Async(true )) collector.OnError(func (response *colly.Response, err error ) { global.GvaLogger.Sugar().Errorf("基金%s,信息获取失败: %s" , fundCode, err) return }) collector.OnHTML("div[class='txt_cont']" , func (element *colly.HTMLElement) { err := element.Unmarshal(f) if err != nil { fmt.Println("element.Unmarshal error: " , err) } }) err := collector.Limit(&colly.LimitRule{ DomainGlob: "*fundf10.eastmoney.*" , Delay: 500 * time.Millisecond, RandomDelay: 500 * time.Millisecond, Parallelism: 20 , }) if err != nil { global.GvaLogger.Sugar().Errorf("设置限速失败: %s" , err) return } err = collector.Visit(fmt.Sprintf("https://fundf10.eastmoney.com/jbgk_%s.html" , fundCode)) if err != nil { global.GvaLogger.Sugar().Errorf("基金%s,信息请求失败: %s" , fundCode, err) } collector.Wait() }
4.5 数据清洗(ConvertToEntity
) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func (f *BasisCrawl) ConvertToEntity() entity.FundBasis { var fundBaseEntity entity.FundBasis if strings.Contains(f.Code, "、" ) { f.Code = strings.Split(f.Code, "、" )[0 ] } fundBaseEntity.Code = utils.ExtractNumberFromString(f.Code) fundBaseEntity.FullName = f.FullName fundBaseEntity.ShortName = f.ShortName typeInfo := strings.Split(f.Type, "-" ) fundBaseEntity.MainType = typeInfo[0 ] fundBaseEntity.SubType = typeInfo[1 ] fundBaseEntity.Company = f.Company fundBaseEntity.Manager = f.Manager fundBaseEntity.ManagerDesc = strings.ReplaceAll(f.ManagerDesc,"//" ,"" ) fundBaseEntity.Benchmark = f.Benchmark fundBaseEntity.ReleaseDate = replaceDateChinese(f.ReleaseDate) fundBaseEntity.EstablishDate = strings.TrimSpace(replaceDateChinese(strings.Split(f.EstablishDate, "/" )[0 ])) establishShares := utils.ExtractNumberFromString(replaceDateChinese(strings.Split(f.EstablishShares, "/" )[1 ])) fundBaseEntity.EstablishShares, _ = strconv.ParseFloat(establishShares, 64 ) manageFeeRate := utils.ExtractNumberFromString(f.ManageFeeRate) fundBaseEntity.ManageFeeRate, _ = strconv.ParseFloat(manageFeeRate, 64 ) fundBaseEntity.CustodyFeeRate, _ = strconv.ParseFloat(utils.ExtractNumberFromString(f.CustodyFeeRate), 64 ) fundBaseEntity.SaleFeeRate, _ = strconv.ParseFloat(utils.ExtractNumberFromString(f.SaleFeeRate), 64 ) return fundBaseEntity }
5. 注册定时任务 5.1 实现Job 1 2 3 4 5 6 7 8 9 10 11 type FundBasicCron struct { Code string } func (c FundBasicCron) Run() { begin := time.Now().UnixMilli() fmt.Println("基金详情-定时任务开始运行" ) fund.BatchBasicCrawl() fmt.Printf("基金详情-定时任务运行完成,耗时:%vms\n" ,time.Now().UnixMilli() - begin) }
// 5.2 Setting the run frequency

// addJob registers crontab.FundBasicCron on c with the spec
// "0 30 22 */1 * *". Both return values of AddJob are discarded, so a
// rejected spec goes unnoticed here.
// NOTE(review): the spec has six fields, which presumably means a leading
// seconds field — confirm that c is configured to parse seconds.
func addJob(c *cron.Cron) {
	// The "..." below is the article's own elision of other job registrations.
	...
	_, _ = c.AddJob("0 30 22 */1 * *", crontab.FundBasicCron{})
}
6. 运行效果