0%

You can click on the Blog Home to browser my blogs and find more interesting content. If there are any issues or suggestions, please feel free to reach out to me or simply submit a merge request on
GitHub Link


What’s zookeeper?

Zookeeper is a distributed system. In most cases, zookeeper acts as a coordinator, rather than a storage or message queue. For instance, let’s say we wanna inform other systems after we have done some stuffs, in this scerario, we can create a path as a signal on zabbix, and systems which want to know the events can subscribe the path event. It’s a powful coordinator providing data consistency services and most important as it is a distributed system, it also conform to the CAP rules.

What’s CAP?

Theoretically, the CAP theorem, also known as Brewer’s theorem, states that it is impossible for a distributed computing system to satisfy the following three points at the same time:

  • Consistence (all nodes can access the same latest copy of data)
  • Availability (every request can get a non-error response in a certain period of time - but the data obtained is not guaranteed to be the latest data)
  • Partition fault tolerance (Network partitioning) (System can always keep in consistency in a certain period of time. For instance, the network split into two regions, System can only choose to provide the service but with inconsistent data or stop providing service to keep the data consistent. That means If the node on one side of the partition is set to be unavailable in order to keep data consistency, the item A will fail. If two nodes can communicate with each other, both C and A can be guaranteed, but this leads to the loss of the Item C.)

How zookeeper keep data consistency?

Before we discuss of the data consistency, we must understand the principle of zookeeper’s election. Here is some specialist terms:

  • Leader, responsible for all the transaction requests, like data update, data add, and so forth.
  • Follower, responsible for the non-transaction requests, like data query.
  • ZXID, ZooKeeper Transaction Id, denotes the latest data version one node has.

Let’s assume we have three nodes, only if we elect the leader then the cluster can continue to work. So how they elect the leader?

  1. Choose a node with the latest data version(ZXID). When the election start, each node deem themself as the node with latest ZXID. They will sign their Identifier Card number (SID) on the vote and broadcast vote requests for themselves. Actually the reason why they gotta broadcast requests is every vode has their own vote ballot box.
  2. It’s Kinda different from the way we human vote. Each node of the zabbix cluster mantain a ballot box. As each node will send their vote to all the nodes cluster has, so all the nodes will hold the same votes eventually.
  3. They will try to find the one who holds the most votes will be the leader. Of cause there is a precondition, the number of votes leader hold must be more than half of the current cluster.

Election Analysis

Let’s jump int the code.

1
2
3
4
5
6
   public interface Election {

Vote lookForLeader() throws InterruptedException;
void shutdown();

}

This Election interface is very important, zookeeper implement the interface by developed a class called FastLeaderElection , let’s jump into the makeOffer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Messages that a peer wants to send to other peers.
* These messages can be both Notifications and Acks
* of reception of notification.
*/
public static class ToSend {...}

LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;

/**
* Multi-threaded implementation of message handler. Messenger
* implements two sub-classes: WorkReceiver and WorkSender. The
* functionality of each is obvious from the name. Each of these
* spawns a new thread.
*/

protected class Messenger {...}

QuorumPeer self;
Messenger messenger;
AtomicLong logicalclock = new AtomicLong(); /* Election instance */
long proposedLeader;
long proposedZxid;
long proposedEpoch;

The most important thing is you can see that FastLeaderElection use the most typical Composite Pattern. It include QuorumPeer, which mainly involve all the core logics of ZK election. Let’s jump into QuorumPeer to investigate.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase();
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
}
startLeaderElection();
startJvmPauseMonitor();
super.start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public synchronized void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch (IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}

this.electionAlg = createElectionAlgorithm(electionType);
}
1

actually in createElectionAlgorithm, it composite a FastLeaderElection.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}

LOG.debug(
"Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
n.sid,
n.leader,
Long.toHexString(n.zxid),
Long.toHexString(n.electionEpoch));

// don't care about the version if it's in LOOKING state
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

if (voteSet.hasAllQuorums()) {

// Verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}

/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Check if a pair (server id, zxid) succeeds our
* current vote.
*
*/
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {

if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}

return ((newEpoch > curEpoch)
|| ((newEpoch == curEpoch)
&& ((newZxid > curZxid)
|| ((newZxid == curZxid)
&& (newId > curId)))));
}

this is right part we refered to in the former section.

  1. if new epoch is higher, update vote as the new one.
  2. if new epoch is the same as current epoch, but new zxid is higher, update vote as the new one.
  3. if new epoch is the same as current epoch, new zxid is the same, update vote as the new one.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (voteSet.hasAllQuorums()) {

// Verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}

/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}

You can click on the Blog Home to browser my blogs and find more interesting
content. If there are any issues or suggestions, please feel free to reach out to me or simply submit a merge request
on GitHub Link


What’s DDD?

DDD(Domain Driven Design) is just a concept, an abstract instruction or direction to help us reduce the complexity of
our own application. But because it’s a concept, it has already confused so many developers. We need to implement the
concept by ourselves without any restrictions, which means you don’t have a tutorial sample to refer at the beginning.
Here we try to provide an implement an application architecture by using DDD concept, but before that, what’s
application Architecture?

What’s Application Architecture

As we mentioned above, DDD is just a concept, but how we start to develop our own business logics by using this concept?
It’s a quite open question which actually does not have a fixed question. But from my perspective, I list the
requirements of Application Architecture:

  1. Application Architecture should cater kinds of team members in different levels by reduce the communication cost,
    keep them in the same context, and help us to improve the code quality.
  2. isolate our business codes by any dependencies, for instances the storage logics, the out system api dependencies.
  3. every functionality should be independent and easy to test.

here I will provide my own practical experience for you. Hopefully it can benefit you and help you generate your own
specific DDD implementations.

Example

Scenario

Let’s say we need to design a system to support users to transfer money.

Development Requirement Analise

  • we use h2 as the storage tech stack.
  • we need to think about the exchange rate.
  • we have some policy to protect our user from scam.
  • we can transfer money from accountA to accountB

implement by using script like coding style

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class TransferController {

private TransferService transferService;

public Result<Boolean> transfer(String targetAccountNumber, BigDecimal amount, HttpSession session) {
Long userId = (Long) session.getAttribute("userId");
return transferService.transfer(userId, targetAccountNumber, amount, "CNY");
}
}

public class TransferServiceImpl implements TransferService {

private static final String TOPIC_AUDIT_LOG = "TOPIC_AUDIT_LOG";
private AccountMapper accountDAO;
private KafkaTemplate<String, String> kafkaTemplate;
private YahooForexService yahooForex;

@Override
public Result<Boolean> transfer(Long sourceUserId, String targetAccountNumber, BigDecimal targetAmount, String targetCurrency) {
// 1. 从数据库读取数据,忽略所有校验逻辑如账号是否存在等
AccountDO sourceAccountDO = accountDAO.selectByUserId(sourceUserId);
AccountDO targetAccountDO = accountDAO.selectByAccountNumber(targetAccountNumber);

// 2. 业务参数校验
if (!targetAccountDO.getCurrency().equals(targetCurrency)) {
throw new InvalidCurrencyException();
}

// 3. 获取外部数据,并且包含一定的业务逻辑
// exchange rate = 1 source currency = X target currency
BigDecimal exchangeRate = BigDecimal.ONE;
if (sourceAccountDO.getCurrency().equals(targetCurrency)) {
exchangeRate = yahooForex.getExchangeRate(sourceAccountDO.getCurrency(), targetCurrency);
}
BigDecimal sourceAmount = targetAmount.divide(exchangeRate, RoundingMode.DOWN);

// 4. 业务参数校验
if (sourceAccountDO.getAvailable().compareTo(sourceAmount) < 0) {
throw new InsufficientFundsException();
}

if (sourceAccountDO.getDailyLimit().compareTo(sourceAmount) < 0) {
throw new DailyLimitExceededException();
}

// 5. 计算新值,并且更新字段
BigDecimal newSource = sourceAccountDO.getAvailable().subtract(sourceAmount);
BigDecimal newTarget = targetAccountDO.getAvailable().add(targetAmount);
sourceAccountDO.setAvailable(newSource);
targetAccountDO.setAvailable(newTarget);

// 6. 更新到数据库
accountDAO.update(sourceAccountDO);
accountDAO.update(targetAccountDO);

// 7. 发送审计消息
String message = sourceUserId + "," + targetAccountNumber + "," + targetAmount + "," + targetCurrency;
kafkaTemplate.send(TOPIC_AUDIT_LOG, message);

return Result.success(true);
}

}

As we can see this epitomise the typical three layer development approach. But actually there are several big downsides
about this type of development.

  • Cost of the maintenance of the system is poor. Every time you’re going to change the verification logics or exchange
    rate logics or something, you have to update this part of code that involves the parameter verification, storage,
    compute, calling external service, and so on. For example if we wanna sharding the tables, it will be a disaster.
  • The scalability of this system is bad. If we wanna add a new transfer function which happens between different bank,
    none of the codes can be reused.
  • Very difficult to develop the unit test cases. In this assumption, one method contains the dependency of db, external
    system, and we need to mock each of them if we just wanna add a unit test case focused on our own business logics.

Why?

Why we encountered these issues? the reason behind it is the typical three layer architecture breaks several design
principles:

  • Single Responsibility Principle. Let’s say we need to do a sharding to one table, we need to update all the logics
    related to the dao, do in the service method. That’s a violation of Single Responsibility Principle.
  • Dependency Inversion Principle. You can see our service depend on the implements of db and external service. Once we
    wanna do some updates of the db or external service, we have to go through the business service code.
  • Open Closed Principle. If we wanna add some other logics, for instance bank need to charge for a transfer fee. we have
    to modify the service code rather than add code.

Solution

The main objective of our architecture is we can conform the Dependency Inversion Principle. All the database
implementations, all the api implementations, all the web api will depend on the kernel layer(domain). But how we
implement it in our case?

As we can see, we can start develop from domain layer, all the business logics will depend on the abstractions rather
than the specific impleme ntations. The most basic dependency is no longer the database, it’s Domain Layer. Domain Layer
doesn’t have any dependency on other external dependencies. For instance, we don’t need to think about what kind of
database we need to use, how to design the interaction ways with other external systems when we start to develop our
system. All these services will depend on the abstractions(ACL classes and Repository classes defined in domain layer).

From this perspective, web controller, rpc service, databases, external api provided by other systems are all at the
same level, all of them depend on the abstractions defined by domain layer.

Infrastructure Layer

First, Let’s have a look at the Infrastructure Layer. The implementation(AccountRepositoryImpl) will depend on the
AccountRepository defined in Domain Layer. So when we develop our domain business logics, there is no need for us to
think how we communicate with database. Relatively speaking when we development how to implement the
AccountRepositoryImpl, we also just need to focus on how to store the domain object(Account).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Component
@RequiredArgsConstructor
public class AccountRepositoryImpl implements AccountRepository {
private final InfrastructureConverter infrastructureConverter;
private final AccountDao accountDao;
private final BalanceDao balanceDao;
private final CustomerDao customerDao;
private final CurrencySystem currencySystem;

@Override
public Long store(Account account) {
AccountDO accountDO = infrastructureConverter.copy(account);
CustomerDO customerDO = customerDao.save(accountDO.getCustomerDO());
BalanceDO balanceDO = balanceDao.save(accountDO.getBalanceDO());
accountDO.setCustomerDO(customerDO);
accountDO.setBalanceDO(balanceDO);
return accountDao.save(accountDO).getAccountNumber();
}

@Override
public Account load(Long id) {
Optional<AccountDO> accountDOOptional = accountDao.findById(id);
if (accountDOOptional.isEmpty()) {
return null;
}
AccountDO accountDO = accountDOOptional.get();
Account account = infrastructureConverter.copy(accountDO);
Customer customer = infrastructureConverter.copy(accountDO.getCustomerDO());
Money money = infrastructureConverter.copy(accountDO.getBalanceDO());
account.setBalance(money);
account.setCustomer(customer);
account.setCurrencySystem(currencySystem);
return account;
}

Anti Corruption Layer

Besides, we also put ACL in this Infrastructure Layer, once again, the only thing we need to do is focusing on the
domain logics. So in Anti Corruption Layer, we need to convert the objects from external system to the objects defined
in our own system.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class MockCurrencySystem implements CurrencySystem {
@Override
public Money toDollar(Money money) {
Float convertedAmount;
switch (money.getCurrencyType()) {
case RMB: convertedAmount = money.getAmount() * 7.8F; break;
case USD: convertedAmount = money.getAmount(); break;
default: throw new RuntimeException("unrecognised currency");
}
return new Money(CurrencyType.USD, convertedAmount);
}
}

Domain Layer

In the domain layer, we abstract all the business logics into the domain object.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@AllArgsConstructor
public class Account {
@Getter
@Setter
private Long accountNumber;

@Getter
@Setter
private Customer customer;

// to simplify this scenario, Let's assume that all bank account money is denominated in USD
@Getter
@Setter
private Money balance;

@Setter
private CurrencySystem currencySystem;

@SneakyThrows
public void withDraw(Money money) {
Money withDrawMoney = currencySystem.toDollar(money);
if (balance.getAmount() >= withDrawMoney.getAmount()) {
balance.setAmount(balance.getAmount() - withDrawMoney.getAmount());
} else {
throw new Exception("no enough money");
}
}

public void deposit(Money money) {
Money depositMoney = currencySystem.toDollar(money);
balance.setAmount(balance.getAmount() + depositMoney.getAmount());
}
}

In domain service, we abstract all the business logics that not suitable to put in the entity/aggregate object. Here we
put all the Limit Policies into the domain service. And suppose someday we need to add more limitStrategy, there is no
need for us to update the code. All we need to do is adding code.

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
@RequiredArgsConstructor
public class AccountDomainService {
private final List<LimitStrategy> limitStrategyList;

public void transferMoney(Account source, Account dest, Money money) {
for (LimitStrategy limitStrategy : limitStrategyList) {
limitStrategy.allowable(source, dest, money);
}
source.withDraw(money);
dest.deposit(money);
}
}

Application Layer

In the application layer, we can find that the ApplicationService will only coordinate the domain objects and domain
service to implement the business logics. And it’s so clean and easy for us to meet other requirements by
re-coordinating the domain objects.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
@Service
@RequiredArgsConstructor
public class AccountApplicationService {
private final AccountDomainService accountDomainService;
private final AccountRepository accountRepository;
private final AccountFactory accountFactory;

@SneakyThrows
@Transactional
public void transferMoney(Long sourceAccountNumber, Long destAccountNumber, Money amount) {
Account source = accountRepository.load(sourceAccountNumber);
Account dest = accountRepository.load(destAccountNumber);
if (source == null || dest == null) {
log.error("error account number: source {}, dest: {}, amount {}",
sourceAccountNumber, destAccountNumber, amount);
throw new Exception("invalid account number");
}
accountDomainService.transferMoney(source, dest, amount);
accountRepository.store(source);
accountRepository.store(dest);
}

public void createAccount(AccountDTO accountDTO) {
Account account = accountFactory.newAccount(accountDTO);
accountRepository.store(account);
}
}

Unit Test

  • test cases in the Domain Layer, are very easy to develop. we just need to mock the external system results.
  • test cases in the Infrastructure Layer, are also easy for us to develop. we can use test database like h2, and we can
    also take advantage of JPA test framework to make our test case development easier.
  • test cases in Biz Layer, can focus on the coordinate work rather than every business approaches. Practically we just
    mock several cases to make sure that we can cover the application service.
  • test cases in Adaptor Layer. We can use spring mvc test tools to test the Adaptors.

Ideally, each point above we can reach out to 100% coverage.

System Structure

This is the screenshot of our system’s packages.



And actually we borrowed some good ideas from Clean Architecture, domain layer is located in the right middle of the
circal, Coordinate work, application service wraps the domain layer, and provide service to the Adapter Layer(
web/api/rpc). There is a special point, the infrastructure layer will depend on the domain directly.

Summary

Let’s say someday we need to add transaction fee when user transfer money to another account. So first thing come up to
our mind is we need to modify some code in our domain layer, maybe we need to add some entity, maybe we need to create
another dependency on other system, but the most important thing is, we start to think from the domain layer, and then
we think about how to implement the abstraction defined by domain layer, how to save the status of domain objects. The
way how we develop, is the very proof that why we call it “driven by the domain”.

You can click on the Blog Home to browser my blogs and find more interesting content. If there are any
issues or suggestions, please feel free to reach out to me or simply submit a merge request on
GitHub Link


What’s OAuth?

OAuth is a protocol that designed to protect the user who wanna share their data to third part application or systems.
OAuth 2.0 is subsequent version of OAuth 1.0, mostly we use OAuth 2.0 nowadays.

Sample

Let’s say when we’re going to log in YOUTUBE by using the account of GOOGLE

Terms

In this case some here are the professional introduction of terms we may use.

  • Resource Server: In this case, YOUTUBE is going to get the username, profile, avatar from GOOGLE, so we call GOOGLE
    service which stored this kind of user information as Resource Server
  • Resource Owner: Denotes the user here.
  • Client: Denotes YOUTUBE here.
  • Authorization Server: In this case, it should be the server of GOOGLE also.(we just differentiate Authorization Server
    with Resource Service logically, they can be provided by a same server)
  • Useragent: The browser you use when you’re browsing the YOUTUBE videos.

Base on authorization code mode

if your application is based on frontend and backend separation architecture, then every request you send should involve
the token.

For more details
see Basic writing and formatting syntax
.

AUTH MODE

We have 5 kinds of authorization modes.

  1. Authorization code
  2. Implicit
  3. Resource Owner Password Credentials
  4. Client Credentials

Authorization Code Mode is the most functional, secure and commonly used mode in our projects. So in our next blog we
prepared Resource Service, Auth Service, and Client in which we use Spring Security to implement the OAuth 2.0 protocol
and help us understand all the details related the OAuth 2.0.

How to implement OAuth by Using Spring Security Framework

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment