Commit e7942610 by O'Reilly Media, Inc.

Initial commit

parents
File added
## Example files for the title:
# Programming Actors with Akka, by Richard Warburton
[![Programming Actors with Akka, by Richard Warburton](http://akamaicovers.oreilly.com/images/9781491990230/cat.gif)](https://www.safaribooksonline.com/library/view/title/9781491990247//)
The following applies to example files from material published by O’Reilly Media, Inc. Content from other publishers may include different rules of usage. Please refer to any additional usage rights explained in the actual example files or refer to the publisher’s website.
O'Reilly books are here to help you get your job done. In general, you may use the code in O'Reilly books in your programs and documentation. You do not need to contact us for permission unless you're reproducing a significant portion of the code. For example, writing a program that uses several chunks of code from our books does not require permission. Answering a question by citing our books and quoting example code does not require permission. On the other hand, selling or distributing a CD-ROM of examples from O'Reilly books does require permission. Incorporating a significant amount of example code from our books into your product's documentation does require permission.
We appreciate, but do not require, attribution. An attribution usually includes the title, author, publisher, and ISBN.
If you think your use of code examples falls outside fair use or the permission given here, feel free to contact us at <permissions@oreilly.com>.
Please note that the examples are not production code and have not been carefully testing. They are provided "as-is" and come with no warranty of any kind.
#!/bin/sh
set -eu
ab -c 10 -n $1 -k http://localhost:8080/account?customerId=bob
#!/bin/sh
set -eu
ab -c 10 -n $1 -k http://localhost:8080/mortgage?customerId=bob
// make sure to have $ mvn clean package
1. start services: ./run_all.sh sync
2. show API: curl http://localhost:8080/mortgage?customerId=bob
3. Warmup: ./bench-mortgage.sh 10000 and ./bench-account.sh 10000
4. Load visualvm and slow both services via mxbeans
5. Show slowdown: ./bench-mortgage.sh 100
6. Remove slowdown from balance service
7. Show threadpool exhaustion: ./bench-mortgage.sh 10000 and ./bench-account.sh 100
logo.png

17.4 KB

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.iteratrlearning</groupId>
<artifactId>asynchrony</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<jetty.version>9.2.13.v20150730</jetty.version>
<jackson.version>2.8.2</jackson.version>
<rxjava.version>2.0.1</rxjava.version>
</properties>
<dependencies>
<!-- Common -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.2.15</version>
</dependency>
<!-- Asynchronous vs Synchronous -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>2.1.0-alpha1</version>
</dependency>
<!-- CF -->
<!-- https://mvnrepository.com/artifact/io.javaslang/javaslang -->
<dependency>
<groupId>io.javaslang</groupId>
<artifactId>javaslang</artifactId>
<version>2.0.0</version>
</dependency>
<!-- Akka -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.11</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-core_2.11</artifactId>
<version>2.4.11</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-testkit_2.11</artifactId>
<version>2.4.11</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<version>2.4.11</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-typed-experimental_2.11</artifactId>
<version>2.4.11</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-experimental_2.11</artifactId>
<version>2.4.11</version>
</dependency>
<!-- Reactive Streams -->
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>
</dependencies>
</project>
#!/bin/sh
set -eu
version=$1
prefix="java -Xmx80m -Dbank.port=8080 -DcreditCheck.port=8081 -Daccount.port=8082 -cp target/asynchrony-1.0-SNAPSHOT-jar-with-dependencies.jar com.iteratrlearning.examples"
eval "(${prefix}.synchronous.account.AccountService) &"
accountPid=$!
if [ $version = "sync" ]; then
eval "(${prefix}.synchronous.bank.BankService) &"
bankPid=$!
else
eval "(${prefix}.asynchronous.bank.AsyncBankService) &"
bankPid=$!
fi
eval "(${prefix}.synchronous.credit_check.CreditCheckService) &"
creditCheckPid=$!
echo "Press any key to exit all processes"
IFS= read -r REPLY
echo
echo "----------------------"
echo
jps
echo
echo "----------------------"
echo
echo "Killing 3 services"
kill -9 $accountPid
kill -9 $bankPid
kill -9 $creditCheckPid
echo
echo "----------------------"
echo
jps
File added
package com.iteratrlearning;
import java.io.IOException;
import java.net.ServerSocket;
public class PortFinder
{
private static final int START_PORT = 9090;
private static final int MAX_ATTEMPTS = 10;
private static int candidatePort = START_PORT;
public static int findPort()
{
for (int i = 0; i < MAX_ATTEMPTS; i++)
{
try
{
try (final ServerSocket serverSocket = new ServerSocket(candidatePort))
{
if (serverSocket.isBound())
{
final int port = candidatePort;
System.out.println("********* Found Port: " + port);
candidatePort++;
return port;
}
}
}
catch (IOException e)
{
candidatePort++;
}
}
throw new Error(
"Unable to start service, attempted 10 ports, ranging from " +
(candidatePort - 10) + " to " + candidatePort +
", maybe you have a firewall enabled?");
}
private static int findConfigurablePort(final String property)
{
final Integer value = Integer.getInteger(property);
if (value == null)
{
return findPort();
}
return value;
}
public static int findBankServicePort()
{
return findConfigurablePort("bank.port");
}
public static int findCreditCheckServicePort()
{
return findConfigurablePort("creditCheck.port");
}
public static int findAccountServicePort()
{
return findConfigurablePort("account.port");
}
}
package com.iteratrlearning.answers.actors.akkabasics;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class CounterActor extends UntypedActor {
private int count = 0;
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
public void onReceive(Object message) throws Throwable {
log.info("Received Message: " + message);
if("Count".equals(message)) {
getSender().tell(count, getSelf());
}
else if("Stop".equals(message)) {
getContext().stop(getSelf());
}
count++;
}
public int getCount() {
return count;
}
}
package com.iteratrlearning.answers.actors.akkabasics;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class PingActor extends UntypedActor {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
public void onReceive(Object message) throws Throwable {
log.info("Received Message: " + message);
if("PONG".equals(message)) {
getSender().tell("PING", getSelf());
}
}
}
package com.iteratrlearning.answers.actors.akkabasics;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
public class PingPongApp {
public static void main(String[] args) {
ActorSystem actorSystem = ActorSystem.create();
// Creates a SimpleActor and returns reference to it
ActorRef pingActor
= actorSystem.actorOf(Props.create(PingActor.class), "ping-actor");
ActorRef pongActor
= actorSystem.actorOf(Props.create(PongActor.class), "pong-actor");
pingActor.tell("PONG", pongActor);
}
}
package com.iteratrlearning.answers.actors.akkabasics;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class PongActor extends UntypedActor {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
public void onReceive(Object message) throws Throwable {
log.info("Received Message: " + message);
if("PING".equals(message)) {
getSender().tell("PONG", getSelf());
}
}
}
package com.iteratrlearning.answers.actors.intro;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
public class ActorSystem {
private final static ExecutorService executorService = Executors.newCachedThreadPool();
public static <T> CustomActor<T> spawn(BiConsumer<CustomActor<T>, T> behaviourHandler,
BiConsumer<CustomActor<T>, Throwable> errorHandler) {
CustomActor<T> actor = CustomActor.create(behaviourHandler, errorHandler);
executorService.submit(actor);
return actor;
}
public static void shutdown() {
executorService.shutdown();
}
}
package com.iteratrlearning.answers.actors.intro;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
public class CustomActor<T> implements Runnable {
private final ConcurrentLinkedQueue<T> mailbox;
private final BiConsumer<CustomActor<T>, T> actionHandler;
private final BiConsumer<CustomActor<T>, Throwable> errorHandler;
private CustomActor(BiConsumer<CustomActor<T>, T> behaviourHandler,
BiConsumer<CustomActor<T>, Throwable> errorHandler) {
this.mailbox = new ConcurrentLinkedQueue<>();
this.actionHandler = behaviourHandler;
this.errorHandler = errorHandler;
}
static <T> CustomActor<T> create(BiConsumer<CustomActor<T>, T> behaviourHandler,
BiConsumer<CustomActor<T>, Throwable> errorHandler) {
return new CustomActor<>(behaviourHandler, errorHandler);
}
public void send(T message) {
mailbox.offer(message);
}
@Override
public void run() {
try {
while(true) {
T message = mailbox.poll();
if(message != null) {
actionHandler.accept(this, message);
}
}
} catch(Exception e) {
errorHandler.accept(this, e);
}
}
}
package com.iteratrlearning.answers.actors.intro;
public class CustomActorRunner {
private static CustomActor<String> pingActor;
private static CustomActor<String> pongActor;
public static void main(String[] args) throws InterruptedException {
pingActor = ActorSystem.spawn((actor, message) -> {
if("PING".equals(message)) {
pongActor.send("PONG");
}
else if("STOP".equals(message)) {
System.exit(0);
}
System.out.println(message);
}, (actor, exception) -> System.out.println(exception)
);
pongActor = ActorSystem.spawn((actor, message) -> {
if("PONG".equals(message)) {
pingActor.send("PING");
}
System.out.println(message);
}, (actor, exception) -> System.out.println(exception)
);
pingActor.send("PING");
Thread.sleep(10);
pingActor.send("STOP");
ActorSystem.shutdown();
}
}
package com.iteratrlearning.answers.actors.movie;
public final class InfoMovieMessage {
private final String movie;
public InfoMovieMessage(String data) {
this.movie = data;
}
public String getMovie() {
return this.movie;
}
@Override
public String toString() {
return "InfoMovieMessage{" +
"movie='" + movie + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InfoMovieMessage that = (InfoMovieMessage) o;
return getMovie().equals(that.getMovie());
}
@Override
public int hashCode() {
return getMovie().hashCode();
}
}
package com.iteratrlearning.answers.actors.movie;
public final class InfoReplyMovieMessage {
private final String movie;
private final Integer views;
public InfoReplyMovieMessage(String movie, int views) {
this.movie = movie;
this.views = views;
}
public String getMovie() {
return movie;
}
public Integer getViews() {
return views;
}
@Override
public String toString() {
return "InfoReplyMovieMessage{" +
"movie='" + movie + '\'' +
", views=" + views +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InfoReplyMovieMessage that = (InfoReplyMovieMessage) o;
if (!getMovie().equals(that.getMovie())) return false;
return getViews().equals(that.getViews());
}
@Override
public int hashCode() {
int result = getMovie().hashCode();
result = 31 * result + getViews().hashCode();
return result;
}
}
package com.iteratrlearning.answers.actors.movie;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.model.HttpEntities;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import akka.pattern.PatternsCS;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Flow;
import akka.util.Timeout;
import scala.concurrent.duration.Duration;