大庆电影院:ActiveMQ入门

admin 3个月前 (07-21) 科技 36 1

Apache ActiveMQ是当前最盛行的开源的,支持多协议的,基于Java的新闻中间件,官网的原话是:Apache ActiveMQ™ is the most popular open source, multi-protocol, Java-based messaging server.

ActiveMQ是一个完全支持JMS1.1和J2EE规范的JMS Provider实现,只管JMS规范出台已经是良久的事情了,然则JMS在当今J2EE应用中仍扮演者特殊的职位。

JMS是什么

JMS全称Java Message Service,即Java新闻服务应用程序接口,是一个Java平台中关于面向新闻中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送新闻,举行异步通讯。Java新闻服务是一个与详细平台无关的API。

JMS工具模子


JMS新闻模子

在JMS尺度中,有两种新闻模子PTP(Point to Point)以及Publish/Subscribe(Pub/Sub)。

PTP,点对点新闻传送模子

在点对点新闻传送模子中,发送者将新闻发送给一个特殊的新闻行列,该行列保留了所有发送给它的新闻,消费者从这个行列中获取新闻。

PTP的特点:

  • 每个新闻只有一个消费者,即一旦被消费,新闻就不再在新闻行列中

  • 发送者和吸收者之间在时间上没有依赖性,也就是说当发送者发送了新闻之后,不管吸收者有没有正在运行,都不会影响到新闻被发送到行列

  • 吸收者在乐成吸收新闻之后需向行列发送确认收到通知


Pub/Sub,公布/订阅新闻通报模子

在公布/订阅新闻模子中,公布者公布一个新闻,该新闻通过topic通报给所有的客户端。在这种模子中,公布者和订阅者相互不知道对方,是匿名的且可以动态公布和订阅topic。在公布/订阅新闻模子中,目的地被称为主题(topic),topic主要用于保留和通报新闻,且会一直保留新闻直到新闻被通报给客户端。

Pub/Sub特点:

  • 每个新闻可以有多个消费者

  • 公布者和订阅者之间有时间上的依赖性。针对某个topic的订阅者,它必须建立一个或多个订阅者之后,才气消费公布者的新闻,而且为了消费新闻,订阅者必须保持运行的状态。

  • 为了缓和这样严酷的时间相关性,JMS允许订阅者建立一个可持久化的订阅,这样就可以在订阅者没有运行的时刻也能吸收到公布者的新闻


JMS新闻结构

Message主要由三部门组成,分别是新闻头Header,新闻属性Properties,以及新闻体Body。

新闻头中主要内容:

新闻属性可以理解为新闻的附加新闻头,属性名可以自定义。新闻的属性值可以是String, boolean , byte,short, double, int ,long或float型,Message接口为读取和写入属性提供了若干个取值函数和赋值函数方式。

新闻体的类型:

ActiveMQ的特征

  • 支持多种编程语言

  • 支持多种传输协议

  • 有多种持久化方式

ActiveMQ的安装

安装环境:JDK1.8,CentOS7
下载地址:http://activemq.apache.org/components/classic/download/
CentOS在连网的情况下也可以通过wget(若是wget下令不存在可以通过yum install wget举行安装)下令获取软件包,如:wget https://archive.apache.org/dist/activemq/5.15.10/apache-activemq-5.15.10-bin.tar.gz

提取文件: tar -zxvf apache-activemq-5.15.10-bin.tar.gz -C /vartar -zxvf apache-activemq-5.15.10-bin.tar.gz -C /var

重命名:mv /var/apache-activemq-5.15.10/ /var/activemq/

ActiveMQ解压后的目录结构:

在/etc/profile文件中添加Java环境变量:

export JAVA_HOME=/var/jdk1.8.0
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar


ActiveMQ解压后就可以使用,bin目录下可执行activemq可以举行ActiveMQ的启动住手。

ActiveMQ服务

前面使用下令运行ActiveMQ,但最好的方式是将ActiveMQ作为服务启动,使用system服务可以保证ActiveMQ在系统启动时自动启动。

建立ActiveMQ服务步骤:

  1. 建立一个systemd服务文件:

vi /usr/lib/systemd/system/activemq.service


  1. 在服务文件中添加以下内容

[Unit]
Description=ActiveMQ service
After=network.target

[Service]
Type=forking
ExecStart=/var/activemq/bin/activemq start
ExecStop=/var/activemq/bin/activemq stop
User=root
Group=root
Restart=always
RestartSec=9
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=activemq

[Install]
WantedBy=multi-user.target


  1. 产看Java安装目录:whereis java

  2. 设置activemq设置文件/var/activemq/bin/env中的JAVA_HOME

# Location of the java installation
# Specify the location of your java installation using JAVA_HOME, or specify the
# path to the "java" binary using JAVACMD
# (set JAVACMD to "auto" for automatic detection)
JAVA_HOME="/var/jdk1.8.0"
JAVACMD="auto"


  1. 通过systemctl治理activemq启停

  • 启动activemq服务:systemctl start activemq

  • 查看服务状态:systemctl status activemq

  • 建立软件链接:ln -s /usr/lib/systemd/system/activemq.service /etc/systemd/system/multi-user.target.wants/activemq.service

  • 开机自启:systemctl enable activemq

  • 检测是否开启乐成:systemctl list-unit-files |grep activemq

  • 住手activemq服务:systemctl stop activemq

ActiveMQ的Web治理平台

ActiveMQ自带有Web治理平台,默认使用8161端口,服务启动后在浏览器输入http://服务IP:8161/admin 即可进入,默认设置的账户admin,密码也是admin。

若是服务启动后页面无法访问可能是防火墙内需要添加需要的端口。
查看防火墙状态:systemctl status firewalld
防火墙添加端口:firewall-cmd —zone=public —add-port=61616/tcp —permanent
重启防护墙:systemctl restart firewalld.service
或者直接关闭防火墙:systemctl stop firewalld.service

ActiveMQ的Web治理平台是基于jetty的,在ActiveMQ的安装目录下conf文件中有jetty.xml设置文件,通过该文件可以对Web治理平台举行设置治理, 如:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
  <!-- the default port number for the web console -->
  <property name="host" value="0.0.0.0"/>
  <!--此处即为治理平台的端口-->
  <property name="port" value="8161"/>
</bean>

<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
  <property name="name" value="BASIC" />
  <property name="roles" value="user,admin" />
  <!-- 改为false即可关闭上岸 -->
  <property name="authenticate" value="true" />
</bean>


通过jetty-realm.properties设置文件可以对Web治理平台的用户举行治理:

# 在此即可维护账号密码,花样:
# 用户名:密码,角色
# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: admin, admin
user: 1234, user


ActiveMQ的Java示例

Maven治理的Jar包:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.10</version>
</dependency>


Producer代码示例:

package com.demo.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ProducerDemo {

  private static final String BORKER_URL = "tcp://ip:61616";
  private static final String QUEUE_NAME = "queue-test";

  public static void main(String[] args) throws Exception {
    // 建立毗邻工厂
    ActiveMQConnectionFactory activeMQConnectionFactory =
        new ActiveMQConnectionFactory("admin", "admin", BORKER_URL);
    // 建立毗邻工具
    Connection connection = activeMQConnectionFactory.createConnection();
    connection.start();
    // 建立会话
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 建立点对点发送的目的Queue
    Queue queue = session.createQueue(QUEUE_NAME);
    // 建立新闻生产者
    MessageProducer producer = session.createProducer(queue);
    //    Topic topic1 = session.createTopic("topic-test");
    //    MessageProducer producer1 = session.createProducer(topic1);
    // 设置生产者的模式,有两种可选 持久化 / 不持久化
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    // 文本新闻
    TextMessage message = session.createTextMessage("Hello ActiveMQ message");
    // 发送新闻
    producer.send(message);
    // 关闭毗邻
    producer.close();
    session.close();
    connection.close();
  }
}


运行之后可以在Web控制台Queues tab下看到新闻:

Consumer代码示例:

package com.demo.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ConsumerDemo {

  private static final String BORKER_URL = "tcp://192.168.0.242:61616";
  private static final String QUEUE_NAME = "queue-test";

  public static void main(String[] args) throws Exception {
    // 建立毗邻工厂
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BORKER_URL);
    // 建立毗邻工具
    Connection connection = activeMQConnectionFactory.createConnection("admin", "admin");
    connection.start();
    // 建立会话
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 建立点对点消费的目的Queue
    Queue queue = session.createQueue(QUEUE_NAME);
    //    Topic topic1 = session.createTopic("topic-test");
    //    MessageConsumer consumer1 = session.createConsumer(topic1);
    // 建立新闻消费者
    MessageConsumer consumer = session.createConsumer(queue);
    // 吸收新闻
    Message message = consumer.receive();
    if (message instanceof TextMessage) {
      System.out.println("收到文本新闻:" + ((TextMessage) message).getText());
    } else {
      System.out.println(message);
    }
    // 关闭毗邻
    consumer.close();
    session.close();
    connection.close();
  }
}


运行后可以看到新闻被消费:

SpringBoot中使用ActiveMQ的代码示例

Maven依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>


yml设置文件:

spring:
  activemq:
    broker-url: tcp://ip:61616
    user: admin
    password: admin


代码示例:

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.core.JmsTemplate;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Producer {

  @Autowired private JmsTemplate jmsTemplate;

  @PostConstruct
  public void init() {
    ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic-test");
    jmsTemplate.convertAndSend(activeMQTopic, "Hello SpringBoot ActiveMQ!");
  }

  public static void main(String[] args) {
    SpringApplication.run(Producer.class);
  }
}

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

@EnableJms
@SpringBootApplication
public class Consumer {

  @Bean
  public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
    SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setPubSubDomain(true);
    return factory;
  }

  @JmsListener(destination = "topic-test", containerFactory = "myFactory")
  public void receive(String message) {
    System.out.println("Received Message: " + message);
  }

  public static void main(String[] args) {
    SpringApplication.run(Consumer.class);
  }
},

阳光在线

阳光在线www.leegalo.com(原诚信在线)现已开放阳光在线手机版下载。阳光在线游戏公平、公开、公正,用实力赢取信誉。

Allbet Gaming声明:该文看法仅代表作者自己,与本平台无关。转载请注明:大庆电影院:ActiveMQ入门

网友评论

  • (*)

最新评论

  • UG环球充值 2020-07-21 00:01:28 回复

    Allbet Gaming欢迎进入allbetGaming网址。AllbetGaming网址开放AllbetGaming代理会员登录网址、AllbetGaming会员开户、AllbetGaming代理开户、AllbetGaming客户端下载、AllbetGamingAPP下载等业务。这个不火天理难容

    1

站点信息

  • 文章总数:982
  • 页面总数:0
  • 分类总数:8
  • 标签总数:1742
  • 评论总数:203
  • 浏览总数:6740