重试框架使用
应用场景
- 业务处理过程中的非核心操作执行失败后,不影响主流程执行,但后台需要尽最大可能重试失败操作直到成功
- 分布式事务实现中,可以将主业务的事务消息作为一个重试任务,由重试框架保证该消息发送到从业务。
创建重试任务
重试任务是在业务逻辑中创建的,下面是个示例:
RetryTask task = new RetryTask();
// 重试任务类型。
task.setTaskType(RetryTaskType.CREDIT_LIMIT_RECOVERY.name());
task.setTaskId(repaymentRecord.getId());
// 指定回退策略。下面两行的意思是:前一次重试失败后,隔10秒钟发起下一次。
task.setBackoffPolicy(RetryTaskBackOffPolicy.fixed.name());
task.setFixedBackoffPeriod(10L);
//retryPolicy目前只支持这一种。
task.setRetryPolicy(RetryTaskPolicy.simple.name());
//保存重试执行需要的上下文数据。
task.setData(new Gson().toJson(recoveryRequestDTO));
//这行的意思是:重试任务最多执行一天。 还有另外一种方式: task.setMaxAttempts(100), 最多重试100次。
task.setTaskDeadline(DateUtils.addDays(new Date(), 1));
// 向数据库中,插入重试任务
retryTaskMapper.insertSelective(task);
调度器执行重试任务
调度器从数据库中扫描要重试的任务,根据taskType调用相应的handler来执行。
- 创建调度器。不需要实现任何方法,只需要在类上加个@Service,保证父类中的cron被触发。
package com.cana.repayment.scheduler.retry;
import org.springframework.stereotype.Service;
import com.travelzen.framework.retry.scheduler.RetryTaskScheduler;
@Service
public class RepaymentRetryTaskScheduler extends RetryTaskScheduler{
}
- 创建RetryTaskHandlerFactory的实现类。
package com.cana.repayment.scheduler.retry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import com.travelzen.framework.retry.dict.RetryTaskType;
import com.travelzen.framework.retry.handler.IRetryTaskHandler;
import com.travelzen.framework.retry.handler.IRetryTaskHandlerFactory;
@Component
public class RetryTaskHandlerFactory implements IRetryTaskHandlerFactory {
private static Map<RetryTaskType, IRetryTaskHandler> cache = new HashMap<>();
@Override
public IRetryTaskHandler getHandler(String taskType) {
taskType = StringUtils.trimToEmpty(taskType);
if (StringUtils.isBlank(taskType))
return null;
RetryTaskType type = RetryTaskType.valueOf(taskType);
IRetryTaskHandler handler = cache.get(type);
if (handler != null)
return handler;
switch (type) {
case CREDIT_LIMIT_RECOVERY:
handler = new CreditLimitRecoveryRetryTaskHandler();
cache.put(type, handler);
break;
case REPAYMENT_SUCCESS_NOTIFY:
handler = new RepaymentSuccessNotifyRetryTaskHandler();
cache.put(type, handler);
break;
case ADJUST_SUCCESS_NOTIFY:
handler = new AdjustSuccessNotifyRetryTaskHandler();
cache.put(type, handler);
break;
default:
break;
}
return handler;
}
@Override
public List<RetryTaskType> canHandleTaskTypes() {
return Arrays.asList(RetryTaskType.CREDIT_LIMIT_RECOVERY,
RetryTaskType.REPAYMENT_SUCCESS_NOTIFY,
RetryTaskType.ADJUST_SUCCESS_NOTIFY);
}
}
- 实现具体的handler
为每种taskType创建一个IRetryTaskHandler接口的实现类。
package com.cana.repayment.scheduler.retry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cana.credit.api.ICreditApi;
import com.cana.vbam.common.credit.dto.limit.CreditLimitRecoveryRequstDTO;
import com.google.gson.Gson;
import com.travelzen.framework.retry.dao.po.RetryTask;
import com.travelzen.framework.retry.handler.HandlerStatus;
import com.travelzen.framework.retry.handler.IRetryTaskHandler;
import com.travelzen.framework.spring.web.context.SpringApplicationContext;
public class CreditLimitRecoveryRetryTaskHandler implements IRetryTaskHandler {
private Logger logger = LoggerFactory.getLogger(getClass());
private ICreditApi creditApi = SpringApplicationContext.getApplicationContext().getBean(ICreditApi.class);
@Override
public void execute(RetryTask task, HandlerStatus status) throws Exception {
logger.info("发送授信额度恢复请求:" + task.getData());
creditApi.recoveryLimit(new Gson().fromJson(task.getData(), CreditLimitRecoveryRequstDTO.class));
logger.info("授信额度恢复成功");
}
}